Data Ingestion Into Azure Data Explorer Using Kafka Connect
In this blog, we will go over how to ingest data into Azure Data Explorer using the open-source Kafka Connect Sink connector for Azure Data Explorer.
In this blog, we will go over how to ingest data into Azure Data Explorer using the open-source Kafka Connect sink connector for Azure Data Explorer, running on Kubernetes using Strimzi. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems using source and sink connectors, and Strimzi provides a "Kubernetes-native" way of running Kafka clusters as well as Kafka Connect workers.
Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from many diverse sources, such as websites, applications, IoT devices, and more. It has a rich connector ecosystem that supports ingestion into Azure Data Explorer as detailed here. One of the supported sources is Apache Kafka and the sink connector allows you to move data from Kafka topics into Azure Data Explorer tables which you can later query and analyze. The best part is that you can do so in a scalable and fault-tolerant way using just configuration!
Here is an overview of the scenario depicted in this blog post:
The Azure Data Explorer Kafka Connector picks up data from the configured Kafka topic and queues up ingestion processes (in batches) which eventually write data to a table in Azure Data Explorer. Behind the scenes, the connector leverages the Java SDK for Azure Data Explorer.
Resources for this blog post are available on GitHub.
Here are some quick pointers to setting up an Azure Data Explorer cluster and a managed Kubernetes service on Azure. I recommend installing the services below as part of a single Azure Resource Group, which makes it easy to manage them.
You can set up an Azure Data Explorer cluster and database using Azure Portal, Azure CLI, or any of the client SDKs such as Python. Once that's done, create a table (named Storms) and a respective mapping (named Storms_CSV_Mapping) using the below queries:
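Here is a sketch of what these queries could look like, assuming a six-column schema matching the StormEvents.csv records used later in this post (adjust the columns and mapping to your data):

.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)

.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0},{"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'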
Azure Kubernetes Service (optional)
I have used Azure Kubernetes Service (AKS), but the instructions in this blog post should work for other options as well (e.g. with a local minikube cluster on your laptop). You can set up an AKS cluster using Azure CLI, Azure portal, or an ARM template.
Start by installing the Strimzi Operator and use it to spin up a single-node Kafka cluster on Kubernetes. Installing Strimzi using Helm is pretty easy:
helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-kafka strimzi/strimzi-kafka-operator
To confirm successful installation:
kubectl get pods -l=name=strimzi-cluster-operator
You should see the cluster operator Pod in Running status.
To deploy a single-node Kafka cluster (along with Zookeeper):
kubectl apply -f https://github.com/abhirockzz/
Wait for the cluster to start:
kubectl get pod my-kafka-cluster-kafka-0 -w
The Kafka Pod should transition to Running status and both the containers should be in Ready state.
Kafka Connect Cluster Setup
The Strimzi container images for Kafka Connect include two built-in file connectors - FileStreamSourceConnector and FileStreamSinkConnector. For this blog, a custom Docker image seeded with the Azure Data Explorer connector (version 1.0.1) is available on Docker Hub, and it is referenced in the KafkaConnect resource definition (via its image attribute).
If you want to build your own Docker image, use the Strimzi Kafka Docker image as a base and add the Azure Data Explorer connector JAR to the plugin path. Start by downloading the connector JAR file:
Then, you can use this Dockerfile to build the Docker image:
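A minimal sketch of such a Dockerfile - the Strimzi base image tag and the connector JAR file name are assumptions, so substitute the ones matching your Kafka and connector versions:

FROM strimzi/kafka:0.19.0-kafka-2.5.0
USER root:root
# copy the Azure Data Explorer sink connector JAR into the Kafka Connect plugin path
COPY ./kafka-sink-azure-kusto-1.0.1-jar-with-dependencies.jar /opt/kafka/plugins/
USER 1001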
This technique is illustrated in the Strimzi documentation.
Before installing the connector, we need to create an Azure Service Principal for the connector to authenticate and connect to the Azure Data Explorer service. You can use the az ad sp create-for-rbac command:
az ad sp create-for-rbac -n "kusto-sp"
You will get a JSON response as below - please note down the appId, password, and tenant values, as you will be using them in subsequent steps.
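The response looks roughly like this (the values are placeholders, and the exact fields may differ slightly across Azure CLI versions):

{
  "appId": "xxxx-xxxx-xxxx-xxxx",
  "displayName": "kusto-sp",
  "password": "xxxx-xxxx-xxxx-xxxx",
  "tenant": "xxxx-xxxx-xxxx-xxxx"
}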
Add permissions to your database
Provide an appropriate role to the Service Principal you just created. To assign the admin role, follow this guide to use the Azure portal, or use the following command in your Data Explorer cluster:
.add database <database name> admins ('aadapp=<service principal AppID>;<service principal TenantID>') 'AAD App'
We will seed the auth-related config as a Kubernetes Secret - later on, you will see where this Secret is referenced.
Create a file called adx-auth.yaml with the below contents. Replace values for the following:
- kustoURL: Azure Data Explorer ingestion URL, e.g. https://ingest-<cluster name>.<region>.kusto.windows.net
- tenantID: Service principal tenant ID
- appID: Service principal application ID
- password: Service principal password
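Here is a sketch of what adx-auth.yaml could look like - the Secret name and property keys are assumptions; keep them consistent with the volume and file names referenced later in the KafkaConnect and KafkaConnector definitions:

apiVersion: v1
kind: Secret
metadata:
  name: adx-auth
type: Opaque
stringData:
  adx-auth.properties: |-
    kustoURL=<Azure Data Explorer ingestion URL>
    tenantID=<service principal tenant ID>
    appID=<service principal application ID>
    password=<service principal password>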
Install Kafka Connect
Create the Secret and initiate the Kafka Connect cluster creation:
kubectl apply -f adx-auth.yaml
kubectl apply -f https://github.com/abhirockzz/
While you wait for the Kafka Connect cluster to start, take a look at this snippet of the KafkaConnect cluster resource definition.
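The sketch below shows the parts that matter here - the image name, Kafka configuration details, and Secret name are assumptions, while the externalConfiguration and FileConfigProvider settings are the key pieces:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # lets connectors be managed via KafkaConnector custom resources
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  image: <custom image seeded with the Azure Data Explorer connector>
  config:
    group.id: my-connect-cluster
    # expose the mounted properties file to connector configs via ${file:...} references
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: adx-auth-config
        secret:
          secretName: adx-auth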
Notice the externalConfiguration attribute that points to the Secret we just created. It is loaded into the Kafka Connect Pod as a Volume, and the Kafka FileConfigProvider is used to access the values.
To check Kafka Connect cluster status:
kubectl get pod -l=strimzi.io/cluster=my-connect-cluster -w
Wait for the Kafka Connect Pod to transition into Running status.
Create the Topic and Install Connector
You can use the Strimzi Entity Operator to create the storm-events topic. Here is the KafkaTopic definition:
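A minimal sketch - the API version and partition count are assumptions, and the strimzi.io/cluster label must match the Kafka cluster name:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: storm-events
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 3
  replicas: 1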
Use kubectl get kafkatopic to see the topic you just created as well as the internal Kafka topics.
Here is a snippet of the connector (KafkaConnector) definition - it's just a way to capture configuration and metadata for the connector you want to install.
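A sketch of what this definition could look like - the connector name is an assumption, and the class and configuration property names follow version 1.x of the Azure Data Explorer sink connector, so verify them against the connector documentation for the version you use:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: adx-sink-connector
  labels:
    # must match the name of the KafkaConnect cluster
    strimzi.io/cluster: my-connect-cluster
spec:
  class: com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
  tasksMax: 1
  config:
    topics: storm-events
    flush.size.bytes: 10000
    flush.interval.ms: 50000
    kusto.tables.topics.mapping: "[{'topic': 'storm-events','db': '<database name>','table': 'Storms','format': 'csv','mapping': 'Storms_CSV_Mapping'}]"
    kusto.url: "${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:kustoURL}"
    aad.auth.authority: "${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:tenantID}"
    aad.auth.appid: "${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:appID}"
    aad.auth.appkey: "${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:password}"
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter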
The flush.size.bytes and flush.interval.ms attributes work in tandem with each other and serve as a performance knob for batching. Please refer to the connector GitHub repo for details on these and other configuration parameters.
Notice how the individual properties (from the Secret) are referenced. For example, to reference the Service Principal application ID, we used ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:appID}, where:
- /opt/kafka/external-configuration is a fixed path inside the container
- adx-auth-config is the name of the volume in the KafkaConnect resource's externalConfiguration
- adx-auth.properties is the name of the file as defined in the Secret
- appID is the name of the key
The direct attribute name has been used to define non-sensitive connector configs (e.g. topics: storm-events). Alternatively, you can encapsulate these in a ConfigMap, load them as a Volume, and reference them (just like the sensitive attributes using a Secret).
Copy the above definition for the KafkaConnector to a local file adx-connect-config.yaml. Make sure you use the correct database name in the kusto.tables.topics.mapping attribute. To create the connector:
kubectl apply -f adx-connect-config.yaml
Check the Kafka Connect logs with kubectl logs -l=strimzi.io/cluster=my-connect-cluster. If everything is working fine, you should see logs confirming that the connector tasks have started.
Data Ingestion in Action
So, we have everything set up. All we need now is to send events to the Kafka topic so that we can see the connector in action and ingest data into Azure Data Explorer.
You can use this handy event generator application (available on Docker Hub) and deploy it to your Kubernetes cluster - the Dockerfile is available in the GitHub repo in case you want to reference it.
To deploy the producer application:
The application picks up records from the StormEvents.csv file and sends them to a Kafka topic. Each event is a CSV record that represents data for a Storm occurrence (start and end time, state, type, etc.), for example:
05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer.
The producer application waits for 3 seconds between subsequent produce operations to Kafka. This is intentional so that you can monitor the Kafka Connect logs and make sense of what's going on. The StormEvents.csv file contains more than 50,000 records, so it might take a while for all of them to be batched and ingested into Azure Data Explorer.
You can track the application logs using kubectl logs -f -l app=adx-event-producer. If all is well, you should see log entries confirming that events are being sent to Kafka.
The storm-events topic will now start getting events, and these will be picked up by the sink connector. If you were to track the connector logs:
kubectl logs -f -l strimzi.io/cluster=my-connect-cluster
... you should see logs showing the connector batching and ingesting the events.
Query Azure Data Explorer
Wait for some time until data ends up in the Storms table. To confirm, check the row count and confirm that there are no failures in the ingestion process:
Storms | count
.show ingestion failures
Once there is some data, try out a few queries. To see all the records, simply run Storms. Use where and project to filter specific data:

Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId
Use summarize to aggregate, e.g. the count of storm events per state, rendered as a column chart:

Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart
These are just a few examples. Please take a look at the Kusto Query Language documentation or explore tutorials about how to ingest JSON formatted sample data into Azure Data Explorer, using scalar operators, timecharts, etc.
Clean Up Resources
To delete the connector and/or Kafka cluster:
kubectl delete kafkaconnect/my-connect-cluster
kubectl delete kafka/my-kafka-cluster
To delete the AKS and Azure Data Explorer clusters, simply delete the resource group:
az group delete --name <AZURE_RESOURCE_GROUP> --yes --no-wait
That's all for this blog post, and I hope you found it useful! Please note that this is not the only way to ingest data into Azure Data Explorer. You're welcome to refer to the documentation and explore other techniques such as One-click Ingestion, using Event Grid, IoT Hub, etc.
Please consider exploring the following topics as additional learning resources: