
Tutorial: Data Ingestion From Kafka to Azure Data Explorer

Use the Azure Data Explorer sink connector to ingest data from Apache Kafka.

By Abhishek Gupta · Aug. 26, 2020 · Tutorial

This blog will cover data ingestion from Kafka to Azure Data Explorer (Kusto) using Kafka Connect.

Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from diverse sources, such as websites, applications, IoT devices, and more. The Kafka Connect platform allows you to stream data between Apache Kafka and external systems in a scalable and reliable manner. The Kafka Connect sink connector for Azure Data Explorer allows you to move data from Kafka topics to Azure Data Explorer tables, which you can later query and analyze.

Here is the GitHub repo for this blog - https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial

The goal is to get started quickly, so we will keep things simple and Docker-ize everything! This includes Kafka, ZooKeeper, the Kafka Connect worker, and the event generator application, all defined in docker-compose.yaml.

Over the course of this tutorial, you will:

  • Get an overview of the individual components
  • Configure and set up Azure Data Explorer and install the connector
  • Run the end-to-end demo

Prerequisites

  • You will need a Microsoft Azure account. Maybe try a free one?
  • Install Azure CLI if you don't have it already (should be quick!) or just use the Azure Cloud Shell from your browser.
  • Docker and Docker Compose installed

Overview

As previously mentioned, all the components are defined inside the docker-compose.yaml file. Let's go over it bit by bit.

The Kafka and ZooKeeper part is pretty straightforward, using the Debezium images:

YAML
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092


The events-producer service is a simple application that sends Storm Events data to a Kafka topic. Storm Events data is a canonical example used throughout the Azure Data Explorer documentation (for example, see this Quickstart and the complete CSV file). The producer app uses the original CSV but only includes selected fields (such as start and end time, state, and source) rather than the entire row (which has more than 20 columns). Here is the sample data:

CSV
2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9487,NEW YORK,Winter Weather,Department of Highways
...



The service component in Docker Compose is defined as follows:

YAML
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv



The sink connector is where a lot of the magic happens! Let's explore it:

Kafka Sink Connector for Azure Data Explorer

Here is the kusto-connect service in the Docker Compose file:

YAML
  kusto-connect:
    build:
      context: ./connector
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses



The container is built from a Dockerfile, which makes it easier for you to run it locally as opposed to pulling it from an external Docker registry:

Dockerfile
FROM debezium/connect:1.2
WORKDIR $KAFKA_HOME/connect
ARG KUSTO_KAFKA_SINK_VERSION
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar



It's built on top of the Debezium Kafka Connect image. It simply downloads the Kusto connector JAR (version 1.0.1 at the time of writing) and places it in the Kafka Connect plugins directory. That's it!
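If you ever want to build (or rebuild) just this image outside of Docker Compose, a command along these lines should work; the image tag is arbitrary and the build argument value is an assumption based on the connector release mentioned above:

Shell
# build the connector image from the ./connector directory,
# passing the connector release version to the Dockerfile's ARG
docker build --build-arg KUSTO_KAFKA_SINK_VERSION=1.0.1 -t kusto-connect ./connector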

Here is what the sink connector configuration file looks like:

JSON
{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}



The process of loading/importing data into a table in Azure Data Explorer is known as ingestion, and this is how the connector operates as well (see the ingestion overview: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview).

Behind the scenes, it uses the following modules of the Java SDK for Azure Data Explorer:
  • data: to connect, issue (control) commands, and query data
  • ingest: to ingest data

At the time of writing, the data formats supported by the connector are: csv, json, txt, avro, apacheAvro, tsv, scsv, sohsv, and psv. Data in the Kafka topics is written to files on disk, and these files are then sent to Azure Data Explorer when either of the following connector configurations kicks in: the file has reached flush.size.bytes, or the flush.interval.ms interval has passed.

The only exception to the above mechanism is the avro and apacheAvro data types, which are handled as byte arrays.

By "sent to Azure Data Explorer," what I really mean is that the file is queued for ingestion (using IngestClient.ingestFromFile).
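To make that concrete, here is a minimal sketch of what a queued ingestion call looks like with the Java SDK. This is not the connector's actual code, and exact package and enum names vary slightly between SDK versions, so treat the class names, file path, and placeholders below as assumptions for illustration:

Java
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;

public class QueuedIngestSketch {
    public static void main(String[] args) throws Exception {
        // authenticate against the *ingestion* endpoint using the service principal
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
                "https://ingest-<name of cluster>.<region>.kusto.windows.net",
                "<application ID>", "<client secret>", "<tenant ID>");

        try (IngestClient ingestClient = IngestClientFactory.createClient(csb)) {
            // target database/table; a mapping reference (Storms_CSV_Mapping)
            // would also be set on these properties
            IngestionProperties props = new IngestionProperties("<database name>", "Storms");
            props.setDataFormat(IngestionProperties.DATA_FORMAT.csv); // enum name may differ by SDK version

            // the connector batches Kafka records into a local file first;
            // ingestFromFile only *queues* that file for ingestion
            FileSourceInfo file = new FileSourceInfo("/tmp/kafka_storm-events_0_0.csv.gz", 0);
            ingestClient.ingestFromFile(file, props);
        }
    }
}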

Alright, lots of theory so far...

Let's Try It Out!

Clone this repo:

Shell
git clone https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial
cd kafka-kusto-ingestion-tutorial



Start off by creating an Azure Data Explorer cluster and database using the Azure portal, the Azure CLI, or any of the client SDKs, such as Python.

Once that's done, create a table (Storms) and the respective mapping (Storms_CSV_Mapping):

Kusto
.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)

.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'



Start Containers and Install the Connector

Before installing the connector, we need to create a service principal for the connector to authenticate and connect to the Azure Data Explorer service.

Use the az ad sp create-for-rbac command:

Shell
az ad sp create-for-rbac -n "kusto-sp"



You should get a JSON response like the one below. Please note down the appId, password, and tenant, as you will be using them in subsequent steps.

JSON
{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}
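Depending on how your cluster permissions are set up, you may also need to grant this service principal rights on the target database so that ingestion is allowed. A typical way to do that is a command like the following in the Azure Data Explorer query window (the database name is the one you created above, and the appId/tenant values come from the JSON response):

Kusto
.add database <enter database name> ingestors ('aadapp=<appId>;<tenant>') 'Kafka Kusto sink connector'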



Start the containers:

Shell
docker-compose up



The producer application will start sending events to the storm-events topic. You should see logs similar to:

....
events-producer_1  | sent message to partition 0 offset 0
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 1
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 2
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 3
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9494,NEW YORK,Winter Weather,Department of Highways
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 4
events-producer_1  | 2020/08/20 16:51:35 event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
....
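If you want to independently confirm that events are reaching the topic, you can peek at it with the Kafka console consumer inside the broker container. This step is optional, and the script path below is an assumption based on the Debezium Kafka image layout (KAFKA_HOME=/kafka):

Shell
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic storm-events \
    --from-beginning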



We can now install the sink connector to consume these events and ingest them into Azure Data Explorer.

Replace the values for the following attributes in adx-sink-config.json: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name), and kusto.url.

JSON
{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}



In a different terminal, keep track of the connector service logs:

Shell
docker-compose logs -f | grep kusto-connect



Install the connector:

Shell
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors

# check the connector status
curl http://localhost:8083/connectors/storm/status



The connector should spring into action. Meanwhile, in the other terminal, you should see logs similar to:

kusto-connect_1    | INFO   ||  Refreshing Ingestion Resources   [com.microsoft.azure.kusto.ingest.ResourceManager]
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_0.csv.gz) of size (9192) at current offset (93)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 1: {storm-events-0=OffsetAndMetadata{offset=94, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_94.csv.gz) of size (1864) at current offset (111)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 2: {storm-events-0=OffsetAndMetadata{offset=112, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
....



Wait for some time for data to end up in the Storms table. To confirm, check the row count and make sure there are no failures in the ingestion process:

Kusto
Storms | count

.show ingestion failures



Once there is some data, try out a few queries. To see all the records:

Kusto
Storms



Use where and project to filter specific data:

Kusto
Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId



Use the summarize operator:

Kusto
Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart



These are just a few examples. Dig into the Kusto Query Language documentation, or explore tutorials on how to ingest JSON-formatted sample data into Azure Data Explorer, use scalar operators, build timecharts, and more.

If you want to restart from scratch, simply stop the containers (docker-compose down -v), delete the Storms table (.drop table Storms), re-create it along with the mapping, and restart the containers (docker-compose up), as sketched below.
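For reference, the reset sequence looks roughly like this (the .create commands are the same ones shown earlier):

Shell
# stop everything and remove volumes
docker-compose down -v

# in the Azure Data Explorer query window:
#   .drop table Storms
#   (then re-run the .create table and .create table ... ingestion csv mapping commands)

# bring the stack back up
docker-compose up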

Clean Up

To delete the Azure Data Explorer cluster/database, use az kusto cluster delete or az kusto database delete:

Shell
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>



That's a Wrap!

I hope this helps you get started building data ingestion pipelines from Kafka to Azure Data Explorer using the Kafka Connect sink connector. This is not the only way to ingest data into Azure Data Explorer (of course!). You're welcome to dig into the documentation and explore other techniques, such as one-click ingestion, Event Grid, IoT Hub, and much more!

Until next time, happy exploring!


Opinions expressed by DZone contributors are their own.
