Tutorial: Data Ingestion From Kafka to Azure Data Explorer

Use the Azure Data Explorer sink connector to ingest data from Apache Kafka.

by Abhishek Gupta · Aug. 26, 20 · Tutorial

This blog will cover data ingestion from Kafka to Azure Data Explorer (Kusto) using Kafka Connect.

Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from diverse sources, such as websites, applications, IoT devices, and more. The Kafka Connect platform allows you to stream data between Apache Kafka and external systems in a scalable and reliable manner. The Kafka Connect sink connector for Azure Data Explorer allows you to move data from Kafka topics to Azure Data Explorer tables, which you can later query and analyze.

Here is the GitHub repo for this blog - https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial

The goal is to get started quickly, so we will keep things simple and Docker-ize everything! This includes Kafka, Zookeeper, the Kafka Connect worker, and the event generator application, all defined in docker-compose.yaml.

Over the course of this tutorial, you will:

  • Get an overview of the individual components
  • Configure and set up Azure Data Explorer and install the connector
  • Run the end-to-end demo

Prerequisites

  • You will need a Microsoft Azure account. Maybe try a free one?
  • Install Azure CLI if you don't have it already (should be quick!) or just use the Azure Cloud Shell from your browser.
  • Docker and Docker Compose installed

Overview

As previously mentioned, all the components are defined inside the docker-compose.yaml file. Let's go over it bit by bit.

The Kafka and Zookeeper part is pretty straightforward and uses the debezium images:

  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

The events-producer service is a simple application that sends Storm Events data to a Kafka topic. Storm Events data is a canonical example used throughout the Azure Data Explorer documentation (for example, check this Quickstart and the complete CSV file). The producer app uses the original CSV, but only includes selected fields (such as start and end time, state, source etc.) rather than the entire row (which has more than 20 columns). Here is the sample data:

2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9487,NEW YORK,Winter Weather,Department of Highways
...
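
To make the shape of each event concrete, here is a minimal Python sketch that parses one of the sample rows above into named fields. The field names follow the columns of the Storms table created later in this tutorial. The StormEvent class and the parse_event and parse_timestamp helpers are hypothetical names used only for illustration; they are not part of the producer app.

```python
import csv
from dataclasses import dataclass
from datetime import datetime


@dataclass
class StormEvent:
    start_time: datetime
    end_time: datetime
    event_id: int
    state: str
    event_type: str
    source: str


def parse_timestamp(value: str) -> datetime:
    # The sample data carries 7 fractional digits, but %f accepts at most 6,
    # so trim the fraction to microsecond precision before parsing.
    date_part, _, fraction = value.partition(".")
    return datetime.strptime(
        f"{date_part}.{fraction[:6] or '0'}", "%Y-%m-%d %H:%M:%S.%f"
    )


def parse_event(line: str) -> StormEvent:
    # csv.reader takes care of quoting in case a field ever contains a comma.
    start, end, event_id, state, event_type, source = next(csv.reader([line]))
    return StormEvent(
        start_time=parse_timestamp(start),
        end_time=parse_timestamp(end),
        event_id=int(event_id),
        state=state,
        event_type=event_type,
        source=source,
    )


sample = "2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer"
print(parse_event(sample))
```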



The service component in Docker Compose is defined as such:

  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

The sink connector is where a lot of the magic happens! Let's explore it:

Kafka Sink Connector for Azure Data Explorer

Here is the kusto-connect service in the Docker Compose file:

  kusto-connect:
    build:
      context: ./connector
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

The container is built from a Dockerfile, which makes it easier for you to run it locally as opposed to pulling it from an external Docker registry:

FROM debezium/connect:1.2
WORKDIR $KAFKA_HOME/connect
ARG KUSTO_KAFKA_SINK_VERSION
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar

It's based on top of the Debezium Kafka Connect image. Simply download the Kusto Connector JAR (version 1.0.1 at the time of writing) and place it in the Kafka Connect plugins directory. That's it!
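
To show how the KUSTO_KAFKA_SINK_VERSION build argument shapes the download URL in the Dockerfile, here is a small Python sketch that expands the same pattern. The connector_jar_url function and the RELEASE_URL_TEMPLATE constant are hypothetical names for illustration; the URL pattern itself is copied from the RUN line above.

```python
# URL pattern copied from the Dockerfile's RUN line; {version} stands in for
# the KUSTO_KAFKA_SINK_VERSION build argument.
RELEASE_URL_TEMPLATE = (
    "https://github.com/Azure/kafka-sink-azure-kusto/releases/download/"
    "v{version}/kafka-sink-azure-kusto-{version}-jar-with-dependencies.jar"
)


def connector_jar_url(version: str) -> str:
    """Return the release download URL for the given connector version."""
    return RELEASE_URL_TEMPLATE.format(version=version)


print(connector_jar_url("1.0.1"))
```

Printing the URL before building the image is a quick way to check that the version you pass as a build argument points at an existing release asset.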

Here is what the sink connector configuration file looks like:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

The process of loading/importing data into a table in Azure Data Explorer is known as Ingestion. This is how the connector operates as well.

https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview

Behind the scenes, it uses the following modules in the Java SDK for Azure Data Explorer:
  • data: to connect, issue (control) commands and query data
  • ingest: to ingest data

At the time of writing, the data formats supported by the connector are: csv, json, txt, avro, apacheAvro, tsv, scsv, sohsv and psv. Data in the Kafka topics is written to files on disk. These are then sent to Azure Data Explorer based on the following connector configurations: when the file has reached flush.size.bytes or the flush.interval.ms interval has passed.

The only exception to the above mechanism is the avro and apacheAvro formats, which are handled as byte arrays.

By "sent to Azure Data Explorer", what I really mean is that the file is queued for Ingestion (using IngestClient.ingestFromFile).
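
The flush rule described above can be sketched in a few lines of Python. This is only an illustration of the "size or interval, whichever comes first" idea, not the connector's actual implementation: the FlushPolicy class is a hypothetical name, and treating the thresholds as inclusive (>=) is an assumption.

```python
from dataclasses import dataclass


@dataclass
class FlushPolicy:
    flush_size_bytes: int   # mirrors the flush.size.bytes setting
    flush_interval_ms: int  # mirrors the flush.interval.ms setting

    def should_flush(self, file_size_bytes: int, elapsed_ms: int) -> bool:
        # Flush when either threshold is reached, whichever happens first.
        # Inclusive comparison is an assumption made for this sketch.
        return (
            file_size_bytes >= self.flush_size_bytes
            or elapsed_ms >= self.flush_interval_ms
        )


# Values taken from the sink connector configuration shown earlier.
policy = FlushPolicy(flush_size_bytes=10000, flush_interval_ms=50000)

print(policy.should_flush(file_size_bytes=2500, elapsed_ms=1000))   # neither threshold reached
print(policy.should_flush(file_size_bytes=12000, elapsed_ms=1000))  # size threshold reached
print(policy.should_flush(file_size_bytes=2500, elapsed_ms=50000))  # interval elapsed
```

With small values such as these, files are queued for ingestion frequently, which is convenient for a demo because data shows up in the table sooner.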

Alright, lots of theory so far...

Let's Try It Out!

Clone this repo:

git clone https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial
cd kafka-kusto-ingestion-tutorial

Start off by creating an Azure Data Explorer cluster and database using the Azure Portal, Azure CLI or any of the client SDKs such as Python.

Once that's done, create a table (Storms) and respective mapping (Storms_CSV_Mapping):

.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)

.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
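
The mapping is easier to read once you notice that each entry simply pairs a table column with the zero-based position (Ordinal) of the matching field in the CSV row. As a rough illustration, the Python sketch below generates both commands from a single column list, so the table schema and the mapping cannot drift apart. The helper names (csv_mapping, create_table_command, create_mapping_command) are hypothetical; the generated text follows the two commands above.

```python
import json

# Column names and types, in the same order as the fields in each CSV row.
COLUMNS = [
    ("StartTime", "datetime"),
    ("EndTime", "datetime"),
    ("EventId", "int"),
    ("State", "string"),
    ("EventType", "string"),
    ("Source", "string"),
]


def csv_mapping(columns):
    # The ordinal is the position of the field within the CSV row.
    return [
        {"Name": name, "datatype": datatype, "Ordinal": ordinal}
        for ordinal, (name, datatype) in enumerate(columns)
    ]


def create_table_command(table, columns):
    column_list = ", ".join(f"{name}: {datatype}" for name, datatype in columns)
    return f".create table {table} ({column_list})"


def create_mapping_command(table, mapping_name, columns):
    mapping_json = json.dumps(csv_mapping(columns), separators=(",", ":"))
    return (
        f".create table {table} ingestion csv mapping "
        f"'{mapping_name}' '{mapping_json}'"
    )


print(create_table_command("Storms", COLUMNS))
print(create_mapping_command("Storms", "Storms_CSV_Mapping", COLUMNS))
```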



Start Containers and Install the Connector

Before installing the connector, we need to create a Service Principal in order for the connector to authenticate and connect to Azure Data Explorer service.

Use the az ad sp create-for-rbac command:

az ad sp create-for-rbac -n "kusto-sp"

You should get a JSON response like the one below. Note down the appId, password and tenant, as you will be using them in subsequent steps.

{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}
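
As a convenience, the three values can be pulled out of the JSON response programmatically. The Python sketch below maps them onto the connector's authentication settings. The mapping (tenant to aad.auth.authority, appId to aad.auth.appid, password to aad.auth.appkey) is my reading of the placeholder text in the connector configuration, and connector_auth_settings is a hypothetical helper name; the values in the example are made up.

```python
import json


def connector_auth_settings(sp_output: str) -> dict:
    """Map the service principal JSON onto the connector's auth settings."""
    sp = json.loads(sp_output)
    return {
        "aad.auth.authority": sp["tenant"],   # tenant ID
        "aad.auth.appid": sp["appId"],        # application ID
        "aad.auth.appkey": sp["password"],    # client secret
    }


# Made-up values, shaped like the response above.
example_output = """
{
  "appId": "00000000-0000-0000-0000-000000000001",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "example-client-secret",
  "tenant": "00000000-0000-0000-0000-000000000002"
}
"""

print(connector_auth_settings(example_output))
```

Treat the password like any other secret: avoid committing it to source control or pasting it into shared logs.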



Start the containers:

docker-compose up

The producer application will start sending events to the storm-events topic. You should see logs similar to:

....
events-producer_1  | sent message to partition 0 offset 0
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 1
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 2
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 3
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9494,NEW YORK,Winter Weather,Department of Highways
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 4
events-producer_1  | 2020/08/20 16:51:35 event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
....

We can now install the sink connector to consume these events and ingest them into Azure Data Explorer.

Replace the values for the following attributes in adx-sink-config.json: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name) and kusto.url

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}
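
It is easy to miss one of the placeholders, so a quick sanity check before installing the connector can save a debugging round trip. The Python sketch below scans the configuration for values that still contain text in angle brackets. The unfilled_placeholders function is a hypothetical helper, and the assumption is that real values never contain a <...> sequence.

```python
import json
import re

# Matches leftover placeholders such as <enter tenant ID> or <region>.
PLACEHOLDER = re.compile(r"<[^<>]+>")


def unfilled_placeholders(config_text: str) -> dict:
    """Return {setting: [placeholders]} for values that still contain <...>."""
    config = json.loads(config_text)["config"]
    found = {}
    for key, value in config.items():
        matches = PLACEHOLDER.findall(str(value))
        if matches:
            found[key] = matches
    return found


# Shortened example with one filled-in and one untouched setting.
example = """
{
    "name": "storm",
    "config": {
        "topics": "storm-events",
        "aad.auth.authority": "<enter tenant ID>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net"
    }
}
"""

print(unfilled_placeholders(example))
```

To check the real file, read it first, for example with open("adx-sink-config.json").read(), and pass the text to unfilled_placeholders; an empty dictionary means nothing was left behind.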



In a different terminal, keep track of the connector service logs:

docker-compose logs -f | grep kusto-connect

Install the connector:

curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors

# check status
curl http://localhost:8083/connectors/storm/status
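
If you would rather script these two calls, the same requests can be made with Python's standard library. This is a minimal sketch that mirrors the curl commands above; the function names are hypothetical, and the URL, connector name and file name are taken from this tutorial. Nothing is sent until you call install_connector or connector_status yourself.

```python
import json
import urllib.request

# Kafka Connect REST endpoint exposed by the kusto-connect service.
CONNECT_URL = "http://localhost:8083"


def build_install_request(config: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    # Same request as the curl POST: JSON body sent to /connectors.
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def install_connector(config_path: str = "adx-sink-config.json") -> dict:
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)
    with urllib.request.urlopen(build_install_request(config)) as response:
        return json.load(response)


def connector_status(name: str = "storm", base_url: str = CONNECT_URL) -> dict:
    # Same request as the status check: GET /connectors/<name>/status.
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)
```

A typical session would call install_connector() once and then connector_status() to confirm that the connector and its task are running.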



The connector should spring into action. Meanwhile in the other terminal, you should see logs similar to:

kusto-connect_1    | INFO   ||  Refreshing Ingestion Resources   [com.microsoft.azure.kusto.ingest.ResourceManager]
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_0.csv.gz) of size (9192) at current offset (93)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 1: {storm-events-0=OffsetAndMetadata{offset=94, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_94.csv.gz) of size (1864) at current offset (111)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 2: {storm-events-0=OffsetAndMetadata{offset=112, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
....
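
If you want to follow ingestion progress without reading every log line, the "Kusto ingestion" entries can be picked out with a regular expression. The Python sketch below is based only on the log format shown above, which may differ in other connector versions; parse_ingestion_line is a hypothetical helper name.

```python
import re

# Pattern derived from the "Kusto ingestion" log lines shown above.
INGESTION_LINE = re.compile(
    r"Kusto ingestion: file \((?P<path>[^)]+)\) of size \((?P<size>\d+)\) "
    r"at current offset \((?P<offset>\d+)\)"
)


def parse_ingestion_line(line: str):
    """Return path, size and offset for an ingestion log line, else None."""
    match = INGESTION_LINE.search(line)
    if match is None:
        return None
    return {
        "path": match["path"],
        "size": int(match["size"]),
        "offset": int(match["offset"]),
    }


line = (
    "kusto-connect_1    | INFO   ||  Kusto ingestion: file "
    "(/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/"
    "kafka_storm-events_0_0.csv.gz) of size (9192) at current offset (93)   "
    "[com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]"
)
print(parse_ingestion_line(line))
```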



Wait for some time before data ends up in the Storms table. To confirm, check the row count and verify that there are no failures in the ingestion process:

Storms | count

.show ingestion failures

Once there is some data, try out a few queries. To see all the records:

Storms

Use where and project to filter specific data:

Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId

Use the summarize operator:

Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart
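
If the pipeline syntax is new to you, it may help to see the same aggregation written out in Python. This is only an analogy over in-memory rows, not how Azure Data Explorer executes the query: events_per_state is a hypothetical function, the rows are made up, and the render step has no counterpart here.

```python
from collections import Counter


def events_per_state(rows, min_count=10):
    # summarize event_count=count() by State
    counts = Counter(row["State"] for row in rows)
    # where event_count > 10, then project State, event_count
    return {state: count for state, count in counts.items() if count > min_count}


# Made-up rows: 11 events for TEXAS, 3 for NEW YORK.
rows = [{"State": "TEXAS"}] * 11 + [{"State": "NEW YORK"}] * 3
print(events_per_state(rows))
```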



These are just a few examples. Dig into the Kusto Query Language documentation or explore tutorials about how to ingest JSON formatted sample data into Azure Data Explorer, using scalar operators, timecharts etc.

If you want to restart from scratch, simply stop the containers (docker-compose down -v), drop the table (.drop table Storms), re-create the Storms table along with the mapping, and restart the containers (docker-compose up).

Clean Up

To delete the Azure Data Explorer cluster or database, use az kusto cluster delete or az kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

That's a Wrap!

I hope this helps you get started building data ingestion pipelines from Kafka to Azure Data Explorer using the Kafka Connect sink connector. This is not the only way to ingest data into Azure Data Explorer (of course!). You're welcome to read the documentation and explore other techniques such as One-click Ingestion, Event Grid, IoT Hub and much more!

Until next time, happy exploring!
