
Processing Time Series Data With QuestDB and Apache Kafka

In this article, we create a reference implementation that polls real-time market data and uses Kafka to stream it to QuestDB via Kafka Connect.

By Yitaek Hwang · Apr. 04, 23 · Tutorial

Apache Kafka is a battle-tested distributed stream-processing platform popular in the financial industry for handling mission-critical transactional workloads. Kafka’s ability to handle large volumes of real-time market data makes it a core infrastructure component for trading, risk management, and fraud detection. Financial institutions use Kafka to stream data from market data feeds, transaction systems, and other external sources to drive decisions.

A common data pipeline to ingest and store financial data involves publishing real-time data to Kafka and using Kafka Connect to stream it to databases. For example, the market data team may continuously publish real-time quotes for a security to Kafka, and the trading team may consume that data to make buy/sell orders. Processed market data and orders may then be saved to a time series database for further analysis.

In this article, we’ll create a sample data pipeline to illustrate how this could work in practice. We will poll an external data source (FinnHub) for real-time quotes of stocks and ETFs, and publish that information to Kafka. Kafka Connect will then grab those records and publish them to a time series database (QuestDB) for analysis.

FinnHub → Kafka → QuestDB

Prerequisites

  • Git
  • Docker Engine: 20.10+
  • Golang 1.19+
  • FinnHub API Token

Setup

To run the example locally, first clone the repo.

The codebase is organized into three parts:

  • Golang code is located at the root of the repo.
  • The Dockerfile for the Kafka Connect QuestDB image and the Docker Compose YAML file are under docker.
  • JSON files for Kafka Connect sinks are under kafka-connect-sinks. 

Building the Kafka Connect QuestDB Image

We first need to build the Kafka Connect Docker image with the QuestDB Sink connector. Navigate to the docker directory and run docker-compose build.
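
Concretely, from the repo root:

cd docker
docker-compose build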

The Dockerfile simply installs the Kafka QuestDB Connector via Confluent Hub on top of the Confluent Kafka Connect base image:

FROM confluentinc/cp-kafka-connect-base:7.3.2
RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6


Start Kafka, Kafka Connect, and QuestDB

Next, we will set up the infrastructure via Docker Compose. From the same docker directory, run Docker Compose in the background:

docker-compose up -d


This will start Kafka + Zookeeper, our custom Kafka Connect image with the QuestDB Connector installed, as well as QuestDB. The full content of the Docker Compose file is as follows:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-connect:
    image: cp-kafka-connect-questdb
    build:
      context: .
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - zookeeper
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter

  questdb:
    image: questdb/questdb
    hostname: questdb
    container_name: questdb
    ports:
      - "9000:9000"
      - "9009:9009"


Start the QuestDB Kafka Connect Sink

Wait for the Docker containers to become healthy (the kafka-connect container will log a “Finished starting connectors and tasks” message), and then we can create our Kafka Connect sinks. We will create two sinks: one for Tesla and one for SPY (SPDR S&P 500 ETF) to compare the price trends of a volatile stock against the overall market.
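
One way to watch for that message is to tail the Kafka Connect container's logs (the container name connect comes from the Docker Compose file above):

docker logs -f connect 2>&1 | grep "Finished starting connectors and tasks"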

From the kafka-connect-sinks directory, issue the following curl command to create the Tesla sink:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-TSLA.json http://localhost:8083/connectors


The JSON file it posts contains the following configuration:

{
  "name": "questdb-sink-TSLA",
  "config": {
    "connector.class":"io.questdb.kafka.QuestDBSinkConnector",
    "tasks.max":"1",
    "topics": "topic_TSLA",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "host": "questdb",
    "timestamp.field.name": "timestamp"
  }
}


Create the sink for SPY as well:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-SPY.json http://localhost:8083/connectors
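
Once both requests return, we can confirm that the connectors are registered and running through the standard Kafka Connect REST API exposed on port 8083:

curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/questdb-sink-TSLA/status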


Streaming Real-Time Stock Quotes

Now that we have our data pipeline set up, we are ready to stream real-time stock quotes to Kafka and store them in QuestDB.

First, we need to get a free API token from Finnhub Stock API. Create a free account online and copy the API key.

Export that key in your shell as FINNHUB_TOKEN:

export FINNHUB_TOKEN=<my-token-here>


The real-time quote endpoint returns various attributes such as the current price, high/low/open quotes, and the previous close price. Since we are just interested in the current price, we only grab that field and add the ticker symbol and timestamp to the Kafka JSON message.
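
To inspect the raw response yourself, you can call the same endpoint the Go program uses (with FINNHUB_TOKEN exported as shown above):

curl "https://finnhub.io/api/v1/quote?symbol=TSLA&token=$FINNHUB_TOKEN"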

The code below will grab the quote every 30 seconds and publish it to the Kafka topic topic_TSLA.

package main

import (
 "encoding/json"
 "fmt"
 "io/ioutil"
 "net/http"
 "os"
 "time"

 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type StockData struct {
 Price float64 `json:"c"`
}

type StockDataWithTime struct {
 Symbol    string  `json:"symbol"`
 Price     float64 `json:"price"`
 Timestamp int64   `json:"timestamp"`
}

func main() {
 // Create a new Kafka producer instance
 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
 if err != nil {
  panic(fmt.Sprintf("Failed to create producer: %s\n", err))
 }
 defer p.Close()

 for {
  token, found := os.LookupEnv("FINNHUB_TOKEN")
  if !found {
   panic("FINNHUB_TOKEN is undefined")
  }
  symbol := "TSLA"

  url := fmt.Sprintf("https://finnhub.io/api/v1/quote?symbol=%s&token=%s", symbol, token)

  // Retrieve the stock data
  resp, err := http.Get(url)
  if err != nil {
   fmt.Println(err)
   return
  }
  // Read the response body and close it right away; a deferred Close
  // inside the loop would not run until main returns
  body, err := ioutil.ReadAll(resp.Body)
  resp.Body.Close()
  if err != nil {
   fmt.Println(err)
   return
  }

  // Unmarshal the JSON data into a struct
  var data StockData
  err = json.Unmarshal(body, &data)
  if err != nil {
   fmt.Println(err)
   return
  }

  // Format data with timestamp
  tsData := StockDataWithTime{
   Symbol:    symbol,
   Price:     data.Price,
   Timestamp: time.Now().UnixNano() / 1000000,
  }

  jsonData, err := json.Marshal(tsData)
  if err != nil {
   fmt.Println(err)
   return
  }

  topic := fmt.Sprintf("topic_%s", symbol)
  err = p.Produce(&kafka.Message{
   TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
   Value:          jsonData,
  }, nil)

  if err != nil {
   fmt.Printf("Failed to produce message: %s\n", err)
  }

  fmt.Printf("Message published to Kafka: %s", string(jsonData))

  time.Sleep(30 * time.Second)
 }
}


To start streaming the data, run the code:

$ go run main.go


To also get data for SPY, open another terminal window, change the symbol in the code to SPY, and run it as well with the token value set.
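
If you would rather not edit the file by hand, one quick workaround (just a sketch; main_spy.go is a hypothetical file name, not part of the repo) is to generate a one-off copy with the symbol swapped and run that copy directly:

sed 's/"TSLA"/"SPY"/' main.go > main_spy.go
go run main_spy.go
# delete main_spy.go afterwards so the package still builds as a whole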

Result

After running the producer code, it will print out the messages it sends to Kafka, like: Message published to Kafka: {"symbol":"TSLA","price":174.48,"timestamp":1678743220215}. This data is sent to the Kafka topic topic_TSLA and forwarded to QuestDB via the Kafka Connect sink.
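
To double-check that the messages are reaching Kafka before they land in QuestDB, we can also tail the topic from inside the broker container (using the broker container name and internal listener defined in the Docker Compose file):

docker exec -it broker kafka-console-consumer --bootstrap-server broker:29092 --topic topic_TSLA --from-beginning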

We can then navigate to localhost:9000 to access the QuestDB console. Searching for all records in the topic_TSLA table, we can see our real-time market quotes:

SELECT * FROM 'topic_TSLA'
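
The same query can also be run without the web console through QuestDB's HTTP query endpoint, which the container exposes on port 9000 (the LIMIT clause is only there to keep the output short):

curl -G "http://localhost:9000/exec" --data-urlencode "query=SELECT * FROM topic_TSLA LIMIT 10"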

We can also look at SPY data from topic_SPY:

SELECT * FROM 'topic_SPY'

With the data now in QuestDB, we can query for aggregate information by getting the average price over a 2m window:

SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m;


Conclusion

Kafka is a trusted component of data pipelines handling large amounts of time series data such as financial data. Kafka can be used to stream mission-critical source data to multiple destinations, including time series databases suited for real-time analytics.

In this article, we created a reference implementation of how to poll real-time market data and use Kafka to stream that to QuestDB via Kafka Connect. For more information on the QuestDB Kafka connector, check out the overview page on the QuestDB website. It lists more information on the configuration details and FAQs on setting it up. The GitHub repo for the connector also has sample projects including a Node.js and Java example for those looking to extend this reference architecture.


Published at DZone with permission of Yitaek Hwang. See the original article here.

Opinions expressed by DZone contributors are their own.
