DZone
Related

  • Kafka JDBC Source Connector for Large Data
  • How to Integrate a Distributed Database With Event Streaming
  • Common Performance Management Mistakes
  • How To Get Closer to Consistency in Microservice Architecture

Data Ingestion Into Azure Data Explorer Using Kafka Connect

In this blog, we will go over how to ingest data into Azure Data Explorer using the open-source Kafka Connect Sink connector for Azure Data Explorer.

By Abhishek Gupta · DZone Core · Sep. 28, 20 · Tutorial

In this blog, we will go over how to ingest data into Azure Data Explorer using the open-source Kafka Connect Sink connector for Azure Data Explorer running on Kubernetes using Strimzi. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems using source and sink connectors. Strimzi provides a "Kubernetes-native" way of running Kafka clusters as well as Kafka Connect workers.

Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from many diverse sources, such as websites, applications, IoT devices, and more. It has a rich connector ecosystem that supports ingestion into Azure Data Explorer, as detailed in its documentation. One of the supported sources is Apache Kafka: the sink connector allows you to move data from Kafka topics into Azure Data Explorer tables, which you can later query and analyze. The best part is that you can do so in a scalable and fault-tolerant way using just configuration!

Here is an overview of the scenario depicted in this blog post:

The Azure Data Explorer Kafka Connector picks up data from the configured Kafka topic and queues up ingestion processes (in batches) which eventually write data to a table in Azure Data Explorer. Behind the scenes, the connector leverages the Java SDK for Azure Data Explorer.

Resources for this blog post are available on GitHub.

Prerequisites

You will need an Azure account along with Azure CLI or Azure Cloud Shell.

Here are some quick pointers for setting up an Azure Data Explorer cluster and a managed Kubernetes service on Azure. I recommend creating these services in a single Azure Resource Group, which makes them easy to manage.

Azure Data Explorer

You can set up an Azure Data Explorer cluster and database using the Azure Portal, Azure CLI, or any of the client SDKs such as Python. Once that's done, create a table (named Storms) and the corresponding CSV ingestion mapping (named Storms_CSV_Mapping) using the following commands:

.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)

.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
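To make the mapping concrete, here is a minimal Python sketch (purely illustrative, not part of Azure Data Explorer or the connector) that applies Storms_CSV_Mapping to one CSV record: each column takes the field at its Ordinal position and is converted according to its datatype. The helper names (apply_mapping, parse_datetime) and the simplified type conversion are assumptions; Azure Data Explorer does its own parsing at ingestion time.

```python
import csv
import io
import json
from datetime import datetime

# The mapping JSON from the .create ... ingestion csv mapping command above.
STORMS_CSV_MAPPING = json.loads(
    '[{"Name":"StartTime","datatype":"datetime","Ordinal":0},'
    '{"Name":"EndTime","datatype":"datetime","Ordinal":1},'
    '{"Name":"EventId","datatype":"int","Ordinal":2},'
    '{"Name":"State","datatype":"string","Ordinal":3},'
    '{"Name":"EventType","datatype":"string","Ordinal":4},'
    '{"Name":"Source","datatype":"string","Ordinal":5}]'
)


def parse_datetime(value: str) -> datetime:
    # Assumes timestamps like "2007-01-01 05:00:00.0000000" (7 fractional digits);
    # Python's %f accepts at most 6, so the extra digit is dropped.
    date_part, _, fraction = value.partition(".")
    return datetime.strptime(f"{date_part}.{fraction[:6] or '0'}", "%Y-%m-%d %H:%M:%S.%f")


# Simplified converters for the three datatypes used in the mapping.
CONVERTERS = {"datetime": parse_datetime, "int": int, "string": str}


def apply_mapping(line: str, mapping=STORMS_CSV_MAPPING) -> dict:
    """Return {column name: typed value}, taking each column from its Ordinal position."""
    fields = next(csv.reader(io.StringIO(line)))
    return {
        column["Name"]: CONVERTERS[column["datatype"]](fields[column["Ordinal"]])
        for column in mapping
    }


if __name__ == "__main__":
    record = ("2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,"
              "23357,WISCONSIN,Winter Storm,COOP Observer")
    print(apply_mapping(record))
```

The point of the Ordinal values is that the field order of every CSV event sent to Kafka must match them: the first field lands in StartTime, the third in EventId, and so on.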



Azure Kubernetes Service (optional)

I have used Azure Kubernetes Service (AKS), but the instructions in this blog post should work for other options as well (e.g. a local minikube cluster on your laptop). You can set up an AKS cluster using the Azure CLI, the Azure portal, or an ARM template.

Base Installation

Start by installing the Strimzi operator, then use it to spin up a single-node Kafka cluster on Kubernetes. Installing Strimzi using Helm is pretty easy:

helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-kafka strimzi/strimzi-kafka-operator

To confirm successful installation:

kubectl get pods -l=name=strimzi-cluster-operator

You should see the cluster operator Pod in Running status:

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-5c66f679d5-69rgk   1/1     Running   0          43s



To deploy a single-node Kafka cluster (along with Zookeeper):

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/kafka.yaml

Wait for the cluster to start:

kubectl get pod my-kafka-cluster-kafka-0 -w

The Kafka Pod should transition to Running status, and both of its containers should be READY:

NAME                       READY   STATUS    RESTARTS   AGE
my-kafka-cluster-kafka-0   2/2     Running   0          1m



Kafka Connect Cluster Setup

The Strimzi container images for Kafka Connect include two built-in file connectors: FileStreamSourceConnector and FileStreamSinkConnector. For this blog, a custom Docker image seeded with the Azure Data Explorer connector (version 1.0.1) is available on Docker Hub, and it is referenced in the KafkaConnect resource definition (image: abhirockzz/adx-connector-strimzi:1.0.1):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: abhirockzz/adx-connector-strimzi:1.0.1
  version: 2.4.0
  ....

If you want to build your own Docker image, use the Strimzi Kafka Docker image as a base and add the Azure Data Explorer connector JAR to the plugin path. Start by downloading the connector JAR file:

export KUSTO_KAFKA_SINK_VERSION=1.0.1
mkdir connector && cd connector
curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar



Then, you can use this Dockerfile to build the Docker image:

FROM strimzi/kafka:0.19.0-kafka-2.4.0
USER root:root
COPY ./connector/ /opt/kafka/plugins/
RUN ls -lrt /opt/kafka/plugins/
USER 1001


This technique is illustrated in the Strimzi documentation.

Authentication

Before installing the connector, we need to create an Azure Service Principal for the connector to authenticate and connect to the Azure Data Explorer service. You can use the az ad sp create-for-RBAC command:

az ad sp create-for-rbac -n "kusto-sp"

You will get a JSON response like the one below. Note down the appId, password, and tenant values, as you will use them in subsequent steps:

{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}



Add permissions to your database

Grant an appropriate role to the Service Principal you just created. To assign the admin role, either follow the Azure Data Explorer documentation to use the Azure portal, or run the following command in your Data Explorer cluster:

.add database <database name> admins ('aadapp=<service principal AppID>;<service principal TenantID>') 'AAD App'
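If you script this step, a small helper can assemble the command from the values you noted earlier. This is a hypothetical Python sketch: add_admin_command is an illustrative name, and it assumes the database name is a plain identifier (letters, digits, underscores), rejecting anything else rather than guessing at Kusto's escaping rules.

```python
import re

# Assumption: the database name is a plain identifier; other names may need
# Kusto's escaping, which this sketch deliberately does not attempt.
PLAIN_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
GUID = re.compile(r"[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}")


def add_admin_command(database: str, app_id: str, tenant_id: str,
                      description: str = "AAD App") -> str:
    """Build the .add database ... admins command for an AAD application principal."""
    if not PLAIN_IDENTIFIER.fullmatch(database):
        raise ValueError(f"unsupported database name: {database!r}")
    if not GUID.fullmatch(app_id):
        raise ValueError(f"app ID does not look like a GUID: {app_id!r}")
    principal = f"aadapp={app_id};{tenant_id}"
    return f".add database {database} admins ('{principal}') '{description}'"


if __name__ == "__main__":
    # Sample values from the az ad sp create-for-rbac output shown earlier.
    print(add_admin_command("mydb", "fe7280c7-5705-4789-b17f-71a472340429",
                            "42f988bf-86f1-42af-91ab-2d7cd011db42"))
```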

We will store the authentication-related configuration in a Kubernetes Secret. Later on, you will see where this Secret is referenced.

Create a file called adx-auth.yaml with the below contents.

apiVersion: v1
kind: Secret
metadata:
  name: adx-auth
type: Opaque
stringData:
  adx-auth.properties: |-
    kustoURL: <replace ADX Ingest URL>
    tenantID: <enter service principal tenant ID>
    appID: <enter service principal app ID>
    password: <enter service principal password>



Replace the values for the following keys:

  • kustoURL: Azure Data Explorer ingestion URL, e.g. https://ingest-[cluster name].[region].kusto.windows.net
  • tenantID: service principal tenant ID
  • appID: service principal application ID
  • password: service principal password
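Rather than pasting values by hand, you could render adx-auth.yaml from the az ad sp create-for-rbac output. The following Python sketch is one way to do that under stated assumptions: render_adx_auth_secret is a hypothetical helper, the JSON has the appId, password, and tenant fields shown earlier, and the values need no escaping in the properties format (a backslash in the password, for example, would, so the sketch refuses such values).

```python
import json

# Same layout as adx-auth.yaml above; the values are filled in below.
SECRET_TEMPLATE = """\
apiVersion: v1
kind: Secret
metadata:
  name: adx-auth
type: Opaque
stringData:
  adx-auth.properties: |-
    kustoURL: {kusto_url}
    tenantID: {tenant}
    appID: {app_id}
    password: {password}
"""


def render_adx_auth_secret(sp_json: str, kusto_url: str) -> str:
    """Fill the adx-auth Secret from `az ad sp create-for-rbac` JSON output."""
    sp = json.loads(sp_json)
    if not kusto_url.startswith("https://ingest-"):
        raise ValueError("kustoURL should be the ingestion URL (https://ingest-...)")
    values = {
        "kusto_url": kusto_url,
        "tenant": sp["tenant"],
        "app_id": sp["appId"],
        "password": sp["password"],
    }
    for name, value in values.items():
        # java.util.Properties treats backslashes as escapes, and a newline would
        # break the YAML block scalar: fail instead of writing a wrong value.
        if "\\" in value or "\n" in value:
            raise ValueError(f"{name} needs escaping, which this sketch does not handle")
    return SECRET_TEMPLATE.format(**values)


if __name__ == "__main__":
    sample = json.dumps({"appId": "<appId>", "password": "<password>", "tenant": "<tenant>"})
    print(render_adx_auth_secret(sample, "https://ingest-mycluster.westus.kusto.windows.net"))
```

Since the generated file contains the service principal password, keep it out of version control.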

Install Kafka Connect

Create the Secret and initiate the Kafka Connect cluster creation:

kubectl apply -f adx-auth.yaml

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/kafka-connect.yaml

While you wait for the Kafka Connect cluster to start, take a look at this snippet of the KafkaConnect cluster resource definition. Notice the externalConfiguration attribute, which points to the Secret we just created. The Secret is mounted into the Kafka Connect Pod as a volume, and the Kafka FileConfigProvider is used to read the values from it.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: abhirockzz/adx-connector-strimzi:1.0.1
  config:
    ...
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: adx-auth-config
        secret:
          secretName: adx-auth



To check Kafka Connect cluster status:

kubectl get pod -l=strimzi.io/cluster=my-connect-cluster -w

Wait for the Kafka Connect Pod to transition into Running state.

NAME                                          READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4   1/1     Running   0          1m



Create the Topic and Install Connector

You can use the Strimzi Entity Operator to create the storm-events topic. Here is the Topic definition:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: storm-events
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 3
  replicas: 1



To create:

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/topic.yaml



Use kubectl get kafkatopic to see the topic you just created, as well as the internal Kafka topics:

NAME                                                          PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   50           1
storm-events                                                  3            1
strimzi-connect-cluster-configs                               1            1
strimzi-connect-cluster-offsets                               25           1
strimzi-connect-cluster-status                                5            1



Here is a snippet of the connector (KafkaConnector) definition. It is simply a way to capture the configuration and metadata for the connector you want to install:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: adx-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
  tasksMax: 3
  config:
    topics: storm-events
    flush.size.bytes: 10000
    flush.interval.ms: 50000
    kusto.tables.topics.mapping: "[{'topic': 'storm-events','db': '[REPLACE DATABASE NAME]', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]"
    kusto.url: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:kustoURL}
    aad.auth.authority: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:tenantID}
    aad.auth.appid: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:appID}
    aad.auth.appkey: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:password}
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"


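The flush.size.bytes and flush.interval.ms attributes work in tandem and serve as a performance knob for batching: roughly, buffered records are handed off for ingestion once the buffer reaches flush.size.bytes or once flush.interval.ms has elapsed, whichever comes first. Please refer to the connector GitHub repo for details on these and other configuration parameters.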
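The interplay between the two settings can be modeled with a small buffer. The Python class below is a simplified sketch of size-or-interval batching, not the connector's actual implementation: FlushBuffer and its ingest callback are hypothetical, and measuring the interval from the first record in a batch is an assumption made for illustration.

```python
import time
from typing import Callable, List, Optional


class FlushBuffer:
    """Simplified size-or-interval batching model (not the connector's actual code).

    Records accumulate until the buffered bytes reach flush_size_bytes, or until
    flush_interval_ms has passed since the batch's first record (checked by tick()).
    The whole batch is then passed to `ingest`, a stand-in for queuing an ingestion.
    """

    def __init__(self, flush_size_bytes: int, flush_interval_ms: int,
                 ingest: Callable[[List[bytes]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.flush_size_bytes = flush_size_bytes
        self.flush_interval_s = flush_interval_ms / 1000.0
        self.ingest = ingest
        self.clock = clock
        self.batch: List[bytes] = []
        self.size = 0
        self.opened_at: Optional[float] = None

    def add(self, record: bytes) -> None:
        if not self.batch:
            self.opened_at = self.clock()  # the interval starts with the first record
        self.batch.append(record)
        self.size += len(record)
        if self.size >= self.flush_size_bytes:
            self.flush()

    def tick(self) -> None:
        """Call periodically (e.g. from a timer); flushes once the interval has elapsed."""
        if self.batch and self.clock() - self.opened_at >= self.flush_interval_s:
            self.flush()

    def flush(self) -> None:
        if self.batch:
            self.ingest(self.batch)
        self.batch, self.size, self.opened_at = [], 0, None
```

With the values in this post (10000 bytes, 50000 ms) and one small record roughly every 3 seconds, the interval would usually fire before the size threshold, giving about 16 to 17 records per batch. That lines up with the offset jumps (0, 17, 33) in the connector logs later in this post.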

Notice how the individual properties (from the Secret) are referenced. For example, to reference the Service Principal application ID, we used this:

aad.auth.appid: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:appID}


  • /opt/kafka/external-configuration is a fixed path inside the container
  • adx-auth-config is the name of the volume in the KafkaConnect definition
  • adx-auth.properties is the name of the file as defined in the Secret
  • appID is the name of the key
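To see how such a placeholder turns into a value, here is a simplified Python model of what Kafka's FileConfigProvider does for ${file:<path>:<key>}: read the properties file at <path> and return the value stored under <key>. This is an illustration only; resolve, load_properties, and mounted_path are hypothetical helpers, and the properties parsing covers just the simple key: value lines used in adx-auth.properties.

```python
import re
import tempfile
from pathlib import Path

# ${file:<path>:<key>}; this sketch assumes the path itself contains no ':' or '}'.
PLACEHOLDER = re.compile(r"\$\{file:([^:}]+):([^}]+)\}")
# Simple "key: value" or "key=value" lines, as in adx-auth.properties.
PROPERTY_LINE = re.compile(r"([^:=\s]+)\s*[:=]\s*(.*)")


def mounted_path(volume: str, file_name: str) -> str:
    """Where the externalConfiguration volume file ends up, per the bullets above."""
    return f"/opt/kafka/external-configuration/{volume}/{file_name}"


def load_properties(path: str) -> dict:
    props = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        match = PROPERTY_LINE.match(line)
        if match:
            props[match.group(1)] = match.group(2)
    return props


def resolve(value: str) -> str:
    """Replace every ${file:path:key} in value with the key's value from that file."""
    cache = {}

    def substitute(match):
        path, key = match.group(1), match.group(2)
        if path not in cache:
            cache[path] = load_properties(path)
        return cache[path][key]  # a missing key raises KeyError in this sketch

    return PLACEHOLDER.sub(substitute, value)


if __name__ == "__main__":
    # Demo with a temporary file standing in for the mounted Secret.
    with tempfile.TemporaryDirectory() as tmp:
        props = Path(tmp) / "adx-auth.properties"
        props.write_text("appID: my-app-id\n")
        print(resolve("${file:" + str(props) + ":appID}"))
```

Note that the key is everything after the last colon of the placeholder, while the value of kustoURL may itself contain colons (https://...), which the properties parser keeps intact.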

Non-sensitive connector configuration (e.g. topics: storm-events) is defined directly by attribute name. Alternatively, you can encapsulate these values in a ConfigMap, load it as a volume, and reference them the same way as the sensitive attributes from the Secret.

Copy the above KafkaConnector definition to a local file named adx-connect-config.yaml. Make sure you replace [REPLACE DATABASE NAME] with your database name in the kusto.tables.topics.mapping attribute. To create:

kubectl apply -f adx-connect-config.yaml
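A leftover [REPLACE DATABASE NAME] placeholder is easy to miss. As a hedged sketch, the helper below (hypothetical, not part of the connector) parses the mapping value and flags obvious problems; you could run it on the value before applying or re-applying adx-connect-config.yaml. The set of expected keys is simply the one used in this post, not an authoritative list from the connector.

```python
import ast

# Keys used by each entry in this post's kusto.tables.topics.mapping value.
EXPECTED_KEYS = {"topic", "db", "table", "format", "mapping"}


def check_topics_mapping(value: str) -> list:
    """Return a list of problems found in a kusto.tables.topics.mapping value."""
    try:
        # The value uses single-quoted strings, which literal_eval can parse.
        entries = ast.literal_eval(value)
    except (ValueError, SyntaxError) as err:
        return [f"cannot parse mapping: {err}"]
    if not isinstance(entries, list):
        return ["mapping should be a list of entries"]
    problems = []
    for i, entry in enumerate(entries):
        if not isinstance(entry, dict):
            problems.append(f"entry {i}: not an object")
            continue
        missing = EXPECTED_KEYS - set(entry)
        if missing:
            problems.append(f"entry {i}: missing {sorted(missing)}")
        if "REPLACE" in str(entry.get("db", "")):
            problems.append(f"entry {i}: database name placeholder not replaced")
    return problems
```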

Check the Kafka Connect logs using kubectl logs -l=strimzi.io/cluster=my-connect-cluster. If everything is working fine, you should see logs similar to this:

....
INFO [Consumer clientId=connector-consumer-adx-sink-connector-1, groupId=connect-adx-sink-connector] Resetting offset for partition storm-events-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-adx-sink-connector-1]
INFO [Consumer clientId=connector-consumer-adx-sink-connector-2, groupId=connect-adx-sink-connector] Resetting offset for partition storm-events-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-adx-sink-connector-2]
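In these logs, task 1 reads partition 1 and task 2 reads partition 2 of storm-events: with tasksMax: 3 and a three-partition topic, each task ends up with one partition. The following Python function is a simplified model of Kafka's RangeAssignor (the default consumer assignment strategy in the Kafka 2.x releases) for a single topic. range_assign is a hypothetical name, and it assumes tasks are ordered by task number, whereas the real assignor orders consumers by member ID.

```python
from typing import Dict, List


def range_assign(num_partitions: int, num_tasks: int) -> Dict[int, List[int]]:
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per task.

    When the split is uneven, the lower-numbered tasks get one extra partition.
    """
    per_task, extra = divmod(num_partitions, num_tasks)
    assignment: Dict[int, List[int]] = {}
    start = 0
    for task in range(num_tasks):
        count = per_task + (1 if task < extra else 0)
        assignment[task] = list(range(start, start + count))
        start += count
    return assignment
```

One consequence of this design: a tasksMax higher than the partition count leaves the extra tasks idle (range_assign(3, 4) gives task 3 an empty list), so the number of partitions caps the sink's parallelism.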



Data Ingestion in Action

So, we have everything set up. All we need now is to send events to the Kafka topic so that we can see the connector in action and ingest data into Azure Data Explorer.

You can use this handy event generator application (available on Docker Hub) and deploy it to your Kubernetes cluster. The Dockerfile is available in the GitHub repo in case you want to reference it.

Kubernetes Deployment snippet:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: adx-event-producer
spec:
  replicas: 1
  ....
    spec:
      containers:
        - name: adx-event-producer
          image: abhirockzz/adx-event-producer
          imagePullPolicy: Always
          env:
            - name: KAFKA_BOOTSTRAP_SERVER
              value: my-kafka-cluster-kafka-bootstrap:9092
            - name: KAFKA_TOPIC
              value: storm-events
            - name: SOURCE_FILE
              value: StormEvents.csv



To deploy the producer application:

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/producer.yaml



The application picks up records from the StormEvents.csv file and sends them to a Kafka topic. Each event is a CSV record that represents data for a storm occurrence (start and end time, event ID, state, event type, and source), for example: 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer.

The producer application waits for 3 seconds between subsequent produce operations to Kafka. This is intentional, so that you can monitor the Kafka Connect logs and make sense of what's going on. The StormEvents.csv file contains more than 50,000 records, so at one record every 3 seconds, it takes well over a day for all of them to be sent, batched, and ingested into Azure Data Explorer.
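The producer's behavior can be outlined in a few lines. This is not the source of abhirockzz/adx-event-producer; it is a hypothetical Python sketch that reads KAFKA_TOPIC and SOURCE_FILE from the environment (as in the Deployment above), treats each non-empty line of the file as one event (no header handling), and pauses 3 seconds after each send. The send callable stands in for a real Kafka producer connected to KAFKA_BOOTSTRAP_SERVER.

```python
import os
import time
from typing import Callable, Iterable, Iterator


def read_records(path: str) -> Iterator[str]:
    """Yield each non-empty line of the source file as one CSV event."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.rstrip("\r\n")
            if line:
                yield line


def produce(records: Iterable[str], send: Callable[[str, str], None], topic: str,
            delay_s: float = 3.0, sleep: Callable[[float], None] = time.sleep) -> int:
    """Send each record to `topic`, pausing `delay_s` seconds after every send."""
    count = 0
    for record in records:
        send(topic, record)
        count += 1
        sleep(delay_s)
    return count


if __name__ == "__main__":
    topic = os.environ.get("KAFKA_TOPIC", "storm-events")
    source = os.environ.get("SOURCE_FILE", "StormEvents.csv")
    if os.path.exists(source):
        # Placeholder sender: a real producer would publish to KAFKA_BOOTSTRAP_SERVER.
        produce(read_records(source), lambda t, r: print(f"sent to {t}: {r}"), topic)
    else:
        print(f"{source} not found; nothing to send")
```

Passing send and sleep in as parameters keeps the loop testable without a broker or real delays.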

You can track the application logs using: kubectl logs -f -l app=adx-event-producer. If all is well, you should see something similar to this:

...
sent message to partition 0 offset 0
event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
sent message to partition 0 offset 1
event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
sent message to partition 0 offset 2
event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer



The storm-events topic will now start receiving events, and these will be picked up by the sink connector. If you track the connector logs:

kubectl logs -f -l strimzi.io/cluster=my-connect-cluster

... you should see logs similar to this:

....
INFO Kusto ingestion: file (/tmp/kusto-sink-connector-17d03941-f8ca-498e-bc52-68ced036dc69/kafka_storm-events_0_0.csv.gz) of size (1722) at current offset (16) (com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter) [Timer-6]
INFO WorkerSinkTask{id=adx-sink-connector-0} Committing offsets asynchronously using sequence number 17: {storm-events-0=OffsetAndMetadata{offset=17, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-adx-sink-connector-0]
INFO Kusto ingestion: file (/tmp/kusto-sink-connector-17d03941-f8ca-498e-bc52-68ced036dc69/kafka_storm-events_0_17.csv.gz) of size (1666) at current offset (33) (com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter) [Timer-7]
....
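To keep an eye on ingestion progress without reading every line, a small script can tally these "Kusto ingestion" lines per partition. This is a hypothetical Python helper (summarize_ingestion is an illustrative name); its regex is based only on the log lines shown above, where the file name appears to encode the topic, partition, and starting offset, and the format may differ in other connector versions.

```python
import re
from typing import Dict, Iterable, Tuple

# Based only on the log lines shown above; other connector versions may differ.
INGESTION_LINE = re.compile(
    r"Kusto ingestion: file \([^)]*?kafka_(?P<topic>.+)_(?P<partition>\d+)_\d+\.csv\.gz\)"
    r" of size \((?P<size>\d+)\) at current offset \((?P<offset>\d+)\)"
)


def summarize_ingestion(lines: Iterable[str]) -> Dict[Tuple[str, int], dict]:
    """Tally ingested files, bytes, and the highest reported offset per (topic, partition)."""
    stats: Dict[Tuple[str, int], dict] = {}
    for line in lines:
        match = INGESTION_LINE.search(line)
        if not match:
            continue  # e.g. offset-commit lines
        key = (match["topic"], int(match["partition"]))
        entry = stats.setdefault(key, {"files": 0, "bytes": 0, "last_offset": -1})
        entry["files"] += 1
        entry["bytes"] += int(match["size"])
        entry["last_offset"] = max(entry["last_offset"], int(match["offset"]))
    return stats
```

You could save the output of kubectl logs to a file and pass the open file object to summarize_ingestion, since iterating over a file yields it line by line.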



Query Azure Data Explorer

Wait for some time before the data shows up in the Storms table. To confirm, check the row count and verify that there are no failures in the ingestion process:

Storms | count

.show ingestion failures

Once there is some data, try out a few queries. To see all the records:

Storms

Use where and project to filter specific data:

Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId

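Use the summarize operator to count events per state, keep only the states with more than 10 events, and render the result as a column chart: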

Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart
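If the KQL operators are new to you, it can help to see the same logic in a familiar language. These Python functions are hypothetical, local equivalents of the two queries above, operating on a list of dicts (one per row); they are for intuition only and do not query Azure Data Explorer. render columnchart has no counterpart here, since it only affects visualization.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def texas_droughts(rows: Iterable[Dict]) -> List[Dict]:
    """Storms | where EventType == 'Drought' and State == 'TEXAS'
              | project StartTime, EndTime, Source, EventId"""
    columns = ("StartTime", "EndTime", "Source", "EventId")
    return [{c: row[c] for c in columns} for row in rows
            if row["EventType"] == "Drought" and row["State"] == "TEXAS"]


def events_per_state(rows: Iterable[Dict], threshold: int = 10) -> List[Tuple[str, int]]:
    """Storms | summarize event_count=count() by State
              | where event_count > 10 | project State, event_count"""
    counts = Counter(row["State"] for row in rows)
    # KQL does not guarantee row order here; sorting by State just makes output stable.
    return sorted((state, n) for state, n in counts.items() if n > threshold)
```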

These are just a few examples. Please take a look at the Kusto Query Language documentation, or explore tutorials on how to ingest JSON-formatted sample data into Azure Data Explorer, use scalar operators, create timecharts, etc.

Clean Up Resources

To delete the Kafka Connect cluster and/or the Kafka cluster:

kubectl delete kafkaconnect/my-connect-cluster
kubectl delete kafka/my-kafka-cluster

To delete the AKS and Azure Data Explorer clusters, simply delete the resource group:

az group delete --name <AZURE_RESOURCE_GROUP> --yes --no-wait

Conclusion

That's all for this blog post, and I hope you found it useful! Please note that this is not the only way to ingest data into Azure Data Explorer. You're welcome to refer to the documentation and explore other techniques such as One-click Ingestion, Event Grid, IoT Hub, etc.

Please consider exploring the following topics as additional learning resources:

Resources

  • Configuring Kafka Connect cluster using Strimzi
  • Strimzi KafkaConnect schema reference
  • Strimzi KafkaConnector schema reference
  • Just Enough Azure Data Explorer for Cloud Architects
  • What's new in Azure Data Explorer connector 1.x
  • Kusto Query Language quick reference

Opinions expressed by DZone contributors are their own.
