Processing Time Series Data With Redis and Apache Kafka
A practical example of how to use Kafka, Redis, and Spring on Azure to process time series data and to create a scalable data pipeline that acts as a buffer.
RedisTimeSeries is a Redis Module that brings a native time series data structure to Redis. Time series solutions that were previously built on top of Sorted Sets (or Redis Streams) can benefit from RedisTimeSeries features such as high-volume inserts, low-latency reads, a flexible query language, down-sampling, and much more!
Generally speaking, time series data is (relatively) simple. Having said that, we need to factor in other characteristics as well:
- Data velocity: For example, think hundreds of metrics from thousands of devices per second
- Volume (big data): Think data accumulation over months (even years)
Thus, time series databases such as RedisTimeSeries are just a part of the overall solution. You also need to think about how to collect (ingest), process, and send all your data to RedisTimeSeries. What you really need is a scalable data pipeline that can act as a buffer to decouple producers and consumers.
That's where Apache Kafka comes in! In addition to the core broker, it has a rich ecosystem of components, including Kafka Connect (which is a part of the solution architecture presented in this blog post), client libraries in multiple languages, Kafka Streams, MirrorMaker, etc.
This blog post provides a practical example of how to use RedisTimeSeries with Apache Kafka for analyzing time series data.
GitHub repo - https://github.com/abhirockzz/redis-timeseries-kafka
Let's start off by exploring the use case first. Please note that it has been kept simple for the purposes of this blog post, but the subsequent sections explain it further.
Imagine there are many locations, each of which has multiple devices, and you're tasked with monitoring the device metrics (for now, we will only consider temperature and pressure). We will store these metrics in RedisTimeSeries (of course!) and use the following naming convention for the keys: <metric name>:<location>:<device>. For example, the temperature for device 1 in location 5 will be represented as temp:5:1. Each time series will also have the following labels (metadata): metric, location, and device. This allows for flexible querying, as you will see in the upcoming sections.
Here are a couple of examples to give you an idea of how you would add data points using the TS.ADD command:
# temperature for device 2 in location 3 along with labels
TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2
# pressure for device 2 in location 3
TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2
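To make the key and label convention concrete, here is a minimal Python sketch that only builds the command text; it does not talk to Redis. The function names series_key and ts_add_command are made up for this illustration and are not part of the GitHub repo.

```python
def series_key(metric: str, location: int, device: int) -> str:
    """Build a key following the <metric name>:<location>:<device> convention."""
    return f"{metric}:{location}:{device}"


def ts_add_command(metric: str, location: int, device: int, value) -> str:
    """Return the TS.ADD command text for one sample.

    '*' asks the server to assign the timestamp, as in the examples above.
    """
    key = series_key(metric, location, device)
    labels = f"metric {metric} location {location} device {device}"
    return f"TS.ADD {key} * {value} LABELS {labels}"


# Hypothetical helper names; the output mirrors the two commands shown above.
print(ts_add_command("temp", 3, 2, 20))
print(ts_add_command("pressure", 3, 2, 60))
```

Keeping the key and the labels in sync like this is what makes the label-based queries later in the post possible.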
Here is what the solution looks like at a high level:
To summarise the end-to-end flow:
Simulated device data is sent to a local MQTT broker. This data is picked up by the MQTT Kafka Connect source connector and sent to a Confluent Cloud Kafka cluster in Azure. A Spring application running in Azure Spring Cloud then processes the data and sends it to a Redis database in Azure.
It's time to dive in! Before that, make sure you have the following:
- An Azure account
- The Azure CLI
- JDK 11 (for example, OpenJDK)
- A recent version of Maven and Git
Follow the documentation to provision Azure Cache for Redis (Enterprise tier), which comes with the RedisTimeSeries module.
Provision Confluent Cloud cluster on Azure Marketplace and also create a Kafka topic (for example,
az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g. southeastasia>
Before moving on, make sure to clone the GitHub repo:
git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka
The components include:
- Mosquitto MQTT broker
- Grafana for tracking time series data in the dashboard
- Kafka Connect with the MQTT source connector
I installed and started the mosquitto broker locally on Mac.
brew install mosquitto
brew services start mosquitto
You can follow the steps corresponding to your OS or feel free to use this Docker image.
I installed and started Grafana locally on Mac.
brew install grafana
brew services start grafana
You can do the same for your OS or feel free to use this Docker image.
docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana
You should be able to find the connect-distributed.properties file in the repo that you just cloned. Replace the values for properties such as
First, download and unzip Apache Kafka locally.
Start a Local Kafka Connect Cluster
export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0>
$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties
To install MQTT source connector manually:
- Download the connector/plugin ZIP file from this link
- Extract it into one of the directories that are listed on the Connect worker's plugin.path configuration property
Note: If you're using Confluent Platform locally, simply use the CLI:
confluent-hub install confluentinc/kafka-connect-mqtt:latest
Create MQTT Source Connector Instance
Make sure to check the mqtt-source-config.json file. Make sure you enter the right topic name for kafka.topic and leave the
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @mqtt-source-config.json
# wait for a minute before checking the connector status
curl http://localhost:8083/connectors/mqtt-source/status
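If you would rather check the status response programmatically than eyeball it, the sketch below shows one way to interpret it in Python. It assumes the usual shape of the Kafka Connect status response (a connector object plus a tasks list, each with a state field); the sample values, including the worker_id, are illustrative, and connector_is_healthy is a name made up for this example.

```python
import json


def connector_is_healthy(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks yet is treated as not ready.
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


# Sample body shaped like a status response (illustrative values only).
sample = json.loads("""
{
  "name": "mqtt-source",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
  "type": "source"
}
""")
print(connector_is_healthy(sample))
```

In practice you would feed it the parsed output of the status curl command above instead of the hard-coded sample.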
Build the application JAR file:
cd consumer
export JAVA_HOME=/Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home
mvn clean package
Create an Azure Spring Cloud application and deploy the JAR file:
az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11
az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar
Use the script to send data to local MQTT broker. You can use the script in the GitHub repo you just cloned:
All it does is use the mosquitto_pub CLI command to send data. Data is sent to the device-stats MQTT topic (this is not the Kafka topic). You can double-check it by using the CLI subscriber:
mosquitto_sub -h localhost -t device-stats
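For a rough idea of what a data generator built around mosquitto_pub could look like, here is a small Python sketch. It is not the script from the repo: the comma-separated payload layout (location,device,temperature,pressure) and the value ranges are assumptions made purely for illustration, so check the repo script for the real message format. Only the mosquitto_pub flags -h, -t, and -m are used.

```python
import random
from typing import List


def simulated_payload(location: int, device: int, rng: random.Random) -> str:
    """Hypothetical CSV payload: location,device,temperature,pressure."""
    temp = rng.randint(20, 30)       # assumed range, for illustration only
    pressure = rng.randint(50, 70)   # assumed range, for illustration only
    return f"{location},{device},{temp},{pressure}"


def publish_command(topic: str, payload: str, host: str = "localhost") -> List[str]:
    """Argument list for mosquitto_pub (-h host, -t topic, -m message)."""
    return ["mosquitto_pub", "-h", host, "-t", topic, "-m", payload]


rng = random.Random(42)
cmd = publish_command("device-stats", simulated_payload(3, 2, rng))
# Pass cmd to subprocess.run(cmd, check=True) to actually publish the message.
print(cmd)
```

Building the argument list separately from running it keeps the generator easy to dry-run before pointing it at a broker.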
To validate the end-to-end pipeline, check the Kafka topic in the Confluent Cloud portal. You should also check the logs for the device data processor app in Azure Spring Cloud:
az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>
Go to the Grafana UI at
The Redis Data Source plugin for Grafana works with any Redis database, including Azure Cache for Redis. Follow the instructions in this blog post to configure a data source.
Import the dashboards in the grafana_dashboards folder in the GitHub repo you cloned. Refer to the Grafana documentation if you need assistance on how to import dashboards.
For instance, here is a dashboard that shows the average pressure (over 30 seconds) for device 5 in location 1 (uses
Here is another dashboard, that shows the maximum temperature (over 15 seconds) for multiple devices in location 3 (again, thanks to
Crank up redis-cli and connect to the Azure Cache for Redis instance:
redis-cli -h <azure redis hostname e.g. redisdb.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls
Start with simple queries:
# pressure in device 5 for location 1
TS.GET pressure:1:5
# temperature in device 5 for location 4
TS.GET temp:4:5
Filter by location, and then get temperature and pressure for all devices:
TS.MGET WITHLABELS FILTER location=3
Extract temp and pressure for all devices in one or more locations within a specific time range:
TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)
- + refers to everything from the beginning up until the latest timestamp, but you can be more specific.
MRANGE is what we need! We can get back multiple time series and use a filter.
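To be more specific than - +, you can pass explicit from and to timestamps. The Python sketch below computes a "last N minutes" window and builds the command text. It assumes the series timestamps are Unix time in milliseconds (which is what you get when samples are added with *); the helper names and the fixed now_ms value are made up for the example.

```python
from typing import Tuple


def last_minutes_window(now_ms: int, minutes: int) -> Tuple[int, int]:
    """Return (from, to) in milliseconds covering the last `minutes` minutes."""
    return now_ms - minutes * 60_000, now_ms


def mrange_command(from_ms: int, to_ms: int, label_filter: str) -> str:
    """Build the TS.MRANGE command text for an explicit time range."""
    return f"TS.MRANGE {from_ms} {to_ms} WITHLABELS FILTER {label_filter}"


# Fixed timestamp so the example is reproducible;
# use int(time.time() * 1000) for the current time.
now_ms = 1_700_000_000_000
start, end = last_minutes_window(now_ms, 5)
print(mrange_command(start, end, "location=3"))
```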
We can also filter by a specific device in a location and further drill down by either temperature or pressure:
TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp
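If it helps to reason about how these filters select series, here is a tiny in-memory model in Python. It is a simplification, not how RedisTimeSeries is implemented: every filter has to match (logical AND), and a filter such as location=(3,5) is modeled as a set of allowed values. The series names and labels are sample data following the naming convention from earlier.

```python
from typing import Dict, List, Set

# Sample series and their labels, following <metric name>:<location>:<device>.
SERIES_LABELS: Dict[str, Dict[str, str]] = {
    "temp:3:1": {"metric": "temp", "location": "3", "device": "1"},
    "temp:3:2": {"metric": "temp", "location": "3", "device": "2"},
    "pressure:3:2": {"metric": "pressure", "location": "3", "device": "2"},
    "temp:5:1": {"metric": "temp", "location": "5", "device": "1"},
}


def matching_keys(series: Dict[str, Dict[str, str]],
                  filters: Dict[str, Set[str]]) -> List[str]:
    """Return keys whose labels satisfy every filter (AND across labels)."""
    return sorted(
        key
        for key, labels in series.items()
        if all(labels.get(name) in allowed for name, allowed in filters.items())
    )


# Models: FILTER location=3 device=2
print(matching_keys(SERIES_LABELS, {"location": {"3"}, "device": {"2"}}))
# Models: FILTER location=(3,5) metric=temp
print(matching_keys(SERIES_LABELS, {"location": {"3", "5"}, "metric": {"temp"}}))
```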
All these can be combined with aggregations.
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
# not all the temp data points are useful. how about an average (or max) instead of every temp data point?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
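To see what AGGREGATION avg 10000 does conceptually, the Python sketch below groups samples into 10,000 ms buckets and averages each one. It is a simplified model that assumes buckets are aligned to timestamp 0; the sample values are made up, and the actual aggregation happens inside RedisTimeSeries.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def average_by_bucket(samples: List[Tuple[int, float]],
                      bucket_ms: int) -> List[Tuple[int, float]]:
    """Average (timestamp_ms, value) samples per fixed-size time bucket."""
    buckets: Dict[int, List[float]] = defaultdict(list)
    for timestamp_ms, value in samples:
        # Bucket start is the timestamp rounded down to a multiple of bucket_ms.
        buckets[timestamp_ms - timestamp_ms % bucket_ms].append(value)
    return [(start, sum(values) / len(values))
            for start, values in sorted(buckets.items())]


# Made-up temperature samples: three in the first bucket, two in the second.
samples = [(1_000, 20.0), (4_000, 22.0), (9_000, 24.0),
           (12_000, 30.0), (18_000, 32.0)]
print(average_by_bucket(samples, 10_000))
```

Five raw data points collapse into two averaged ones, which is usually what a dashboard panel needs.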
It's also possible to create a rule to do this aggregation and store it in a different time series.
Once you're done, don't forget to delete resources to avoid unwanted costs.
Follow the steps in the documentation to delete the Confluent Cloud cluster — all you need to do is delete the Confluent organization. Similarly, you should delete the Azure Cache for Redis instance, as well.
On your local machine:
- Stop the Kafka Connect cluster
- Stop the Mosquitto broker (e.g. brew services stop mosquitto)
- Stop the Grafana service (e.g. brew services stop grafana)
We explored a data pipeline to ingest, process, and query time series data using Redis and Kafka. When you think about the next steps and move towards a production-grade solution, you should consider a few more things.
- Retention policy: Think about this, since your time series data points do not get trimmed/deleted by default
- Down-sampling/aggregation rules: You don't want to store data forever, right? Make sure to configure appropriate rules to take care of this (e.g. TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000)
- Duplicate data policy: How would you like to handle duplicate samples? Make sure that the default policy (BLOCK) is indeed what you need. If not, consider other options
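To get a feel for how retention and duplicate policies interact, here is a simplified in-memory model in Python. It is only a sketch of the concepts, not the RedisTimeSeries implementation: retention is modeled as a maximum age relative to the newest sample, and the class name and exact trimming boundary are assumptions made for the example.

```python
from typing import Callable, Dict

# How to combine an existing value with a new one at the same timestamp.
RESOLVERS: Dict[str, Callable[[float, float], float]] = {
    "FIRST": lambda old, new: old,
    "LAST": lambda old, new: new,
    "MIN": min,
    "MAX": max,
    "SUM": lambda old, new: old + new,
}


class InMemorySeries:
    """Toy time series with a retention window and a duplicate policy."""

    def __init__(self, retention_ms: int = 0, duplicate_policy: str = "BLOCK"):
        self.retention_ms = retention_ms          # 0 means keep everything
        self.duplicate_policy = duplicate_policy
        self.samples: Dict[int, float] = {}       # timestamp_ms -> value

    def add(self, timestamp_ms: int, value: float) -> None:
        if timestamp_ms in self.samples:
            if self.duplicate_policy == "BLOCK":
                raise ValueError(f"duplicate sample at {timestamp_ms}")
            resolve = RESOLVERS[self.duplicate_policy]
            value = resolve(self.samples[timestamp_ms], value)
        self.samples[timestamp_ms] = value
        self._trim()

    def _trim(self) -> None:
        # Drop samples older than the retention window, measured from the newest sample.
        if self.retention_ms > 0:
            cutoff = max(self.samples) - self.retention_ms
            self.samples = {t: v for t, v in self.samples.items() if t >= cutoff}


series = InMemorySeries(retention_ms=10_000, duplicate_policy="MAX")
for ts, value in [(0, 20.0), (5_000, 21.0), (20_000, 25.0), (20_000, 23.0)]:
    series.add(ts, value)
print(series.samples)
```

With a 10-second retention window the two oldest samples are trimmed once the sample at 20,000 ms arrives, and the MAX policy keeps the larger of the two values written at that timestamp.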
This is not an exhaustive list. For other configuration options, please refer to the
What About Long-Term Data Retention?
Data is precious, and time series data is no exception! You may want to process it further, for example, to run machine learning that extracts insights or supports predictive maintenance. For this to be possible, you will need to retain the data for a longer time frame. To do that cost-effectively and efficiently, you would want to use a scalable object storage service such as Azure Data Lake Storage Gen2 (ADLS Gen2).
There is a connector for that! You could enhance your existing data pipeline by using the fully-managed Azure Data Lake Storage Gen2 sink connector for Confluent Cloud to process and store the data in ADLS and then run machine learning using Azure Synapse Analytics or Azure Databricks.
Your time series data volumes can only move one way — up! It's critical for your solution to be scalable from a variety of angles:
- Core infrastructure: Managed services allow teams to focus on the solution rather than setting up and maintaining infrastructure, especially when it comes to complex distributed systems such as databases and streaming platforms (for example, Redis and Kafka).
- Kafka Connect: As far as the data pipeline is concerned, you're in good hands since the Kafka Connect platform is inherently stateless and horizontally scalable. You have a lot of options in terms of how you want to architect and size your Kafka Connect worker clusters.
- Custom applications: As was the case in this solution, we built a custom application to process data in Kafka topics. Fortunately, the same scalability characteristics apply to them as well. In terms of horizontal scale, it is limited only by the number of Kafka topic partitions you have.
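The partition limit mentioned above can be illustrated with a toy assignment model in Python. This is a deliberately simplified round-robin sketch; the real assignment is handled by the Kafka consumer group protocol and its configured assignor, and the function names here are made up for the example.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, num_instances: int) -> Dict[int, List[int]]:
    """Spread partitions over application instances in round-robin fashion."""
    assignment: Dict[int, List[int]] = {i: [] for i in range(num_instances)}
    for partition in range(num_partitions):
        assignment[partition % num_instances].append(partition)
    return assignment


def idle_instances(assignment: Dict[int, List[int]]) -> List[int]:
    """Instances that received no partitions and therefore do no work."""
    return [instance for instance, parts in assignment.items() if not parts]


# Six partitions: four instances all get work, eight instances leave two idle.
print(assign_partitions(6, 4))
print(idle_instances(assign_partitions(6, 8)))
```

Adding instances beyond the partition count does not add throughput in this model, so it is worth sizing the topic's partitions with future scale in mind.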
Integration: It's not just Grafana! RedisTimeSeries also integrates with Prometheus and Telegraf. However, there was no Kafka connector at the time this blog post was written. This would be a great add-on!
Sure, you can use Redis for (almost) everything, including time series workloads! Be sure to think about the end-to-end architecture for the data pipeline and integration, from time series data sources all the way to Redis and beyond.
Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.