{{announcement.body}}
{{announcement.title}}

Tutorial: Data Ingestion From Kafka to Azure Data Explorer

DZone 's Guide to

Tutorial: Data Ingestion From Kafka to Azure Data Explorer

Use the Azure Data Explorer sink connector to ingest data from Apache Kafka.

· Big Data Zone ·
Free Resource

This blog will cover data ingestion from Kafka to Azure Data Explorer (Kusto) using Kafka Connect.

Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from any diverse sources, such as websites, applications, IoT devices, and more. Kafka Connect platform allows you to stream data between Apache Kafka and external systems in a scalable and reliable manner. The Kafka Connect Sink connector for Azure Data Explorer allows you to move data in Kafka topics to Azure Data Explorer tables which you can later query and analyze.

Here is the GitHub repo for this blog - https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial

The goal is to get started quickly, so we will keep things simple and Docker-ize everything! This includes Kafka, Zookeeper, Kafka Connect worker and the event generator application - defined in docker-compose.yaml

Over the course of this tutorial, you will:

  • Get an overview of the individual components
  • Configure and setup Azure Data Explorer and install the connector
  • Run the end to end demo

Prerequisites

Overview

As previously mentioned, all the components are defined inside docker-compose.yaml file. Let's go over it bit by bit:

The Kafka and Zookeeper part is pretty straightforward - using the debezium images

Java
 




x
15


 
1
  zookeeper:
2
    image: debezium/zookeeper:1.2
3
    ports:
4
      - 2181:2181
5
  kafka:
6
    image: debezium/kafka:1.2
7
    ports:
8
      - 9092:9092
9
    links:
10
      - zookeeper
11
    depends_on:
12
      - zookeeper
13
    environment:
14
      - ZOOKEEPER_CONNECT=zookeeper:2181
15
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092



The events-producer service is a simple application that sends Storm Events data to a Kafka topic. Storm Events data is a canonical example used throughout the Azure Data Explorer documentation (for example, check this Quickstart and the complete CSV file). The producer app uses the original CSV, but only includes selected fields (such as start and end time, state, source etc.) rather than the entire row (which has more than 20 columns). Here is the sample data:

Java
 




xxxxxxxxxx
1


 
1
2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
2
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
3
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9487,NEW YORK,Winter Weather,Department of Highways
4
...



The service component in Docker Compose is defined as such:

Java
 




xxxxxxxxxx
1
11


 
1
  events-producer:
2
    build:
3
      context: ./storm-events-producer
4
    links:
5
      - kafka
6
    depends_on:
7
      - kafka
8
    environment:
9
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
10
      - KAFKA_TOPIC=storm-events
11
      - SOURCE_FILE=StormEvents.csv



The sink connector is where a lot of the magic happens! Let's explore it:

Kafka Sink Connector for Azure Data Explorer

Here is the kusto-connect service in docker compose file:

Java
 




xxxxxxxxxx
1
15


 
1
  kusto-connect:
2
    build:
3
      context: ./connector
4
    ports:
5
      - 8083:8083
6
    links:
7
      - kafka
8
    depends_on:
9
      - kafka
10
    environment:
11
      - BOOTSTRAP_SERVERS=kafka:9092
12
      - GROUP_ID=adx
13
      - CONFIG_STORAGE_TOPIC=my_connect_configs
14
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
15
      - STATUS_STORAGE_TOPIC=my_connect_statuses



The container is built from a Dockerfile - this makes it easier for you to run it locally as opposed to pulling it from an external Docker registry

Java
 




xxxxxxxxxx
1


 
1
FROM debezium/connect:1.2
2
WORKDIR $KAFKA_HOME/connect
3
ARG KUSTO_KAFKA_SINK_VERSION
4
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar



It's based on top of the Debezium Kafka Connect image. Simply download the Kusto Connector JAR (version 1.0.1 at the time of writing) and place it in the Kafka Connect plugins directory. That's it!

Here is what the sink connector configuration file looks like:

Java
 




xxxxxxxxxx
1
17


 
1
{
2
    "name": "storm",
3
    "config": {
4
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
5
        "flush.size.bytes": 10000,
6
        "flush.interval.ms": 50000,
7
        "tasks.max": 1,
8
        "topics": "storm-events",
9
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
10
        "aad.auth.authority": "<enter tenant ID>",
11
        "aad.auth.appid": "<enter application ID>",
12
        "aad.auth.appkey": "<enter client secret>",
13
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
14
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
15
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
16
    }
17
}



The process of loading/importing data into a table in Azure Data Explorer is known as Ingestion. This is how the the connector operates as well.

https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview

Behind the scenes, it uses the following modules in the Java SDK for Azure Data Explorer
  • data: to connect, issue (control) commands and query data
  • ingest: to ingest data

At the time of writing, the data formats supported by the connector are: csv, json, txt, avro, apacheAvro, tsv, scsv, sohsv and psv. Data in the Kafka topics is written to files on disk. These are then sent to Azure Data Explorer based on the following connector configurations - when file has reached flush.size.bytes or the flush.interval.ms interval has passed.

The only exception to the above mechanism is the avro and apacheAvro data types which are handled as byte arrays

By "sent to Azure Data Explorer", what I really mean that the file is queued for Ingestion (using IngestClient.ingestFromFile)

Alright, lots of theory so far...

Let's Try It Out!

Clone this repo:

Java
 




xxxxxxxxxx
1


 
1
git clone https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial
2
cd kafka-kusto-ingestion-tutorial



Start off creating an Azure Data Explorer cluster and database using Azure Portal, Azure CLI or any of the client SDKs such as Python.

Once that's done, create a table (Storms) and respective mapping (Storms_CSV_Mapping):

Java
 




xxxxxxxxxx
1


 
1
.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
2
 
          
3
.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'



Start Containers and Install the Connector

Before installing the connector, we need to create a Service Principal in order for the connector to authenticate and connect to Azure Data Explorer service.

Use az ad sp create-for-rbac command:

Java
 




xxxxxxxxxx
1


 
1
az ad sp create-for-rbac -n "kusto-sp"



You should get a JSON response as such - please note down the appId, password and tenant as you will be using them in subsequent steps

Java
 




xxxxxxxxxx
1


 
1
{
2
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
3
  "displayName": "kusto-sp",
4
  "name": "http://kusto-sp",
5
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
6
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
7
}



Start the containers:

Java
 




xxxxxxxxxx
1


 
1
docker-compose up



The producer application will start sending events to the storm-events topic. You should see logs similar to:

Java
 




xxxxxxxxxx
1
16


 
1
....
2
events-producer_1  | sent message to partition 0 offset 0
3
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
4
events-producer_1  | 
5
events-producer_1  | sent message to partition 0 offset 1
6
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
7
events-producer_1  | 
8
events-producer_1  | sent message to partition 0 offset 2
9
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
10
events-producer_1  | 
11
events-producer_1  | sent message to partition 0 offset 3
12
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9494,NEW YORK,Winter Weather,Department of Highways
13
events-producer_1  | 
14
events-producer_1  | sent message to partition 0 offset 4
15
events-producer_1  | 2020/08/20 16:51:35 event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
16
....



We can now install the sink connector to consume these events and ingest them into Azure Data Explorer

Replace the values for following attributes in adx-sink-config.json: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name) and kusto.url

Java
 




xxxxxxxxxx
1
17


 
1
{
2
    "name": "storm",
3
    "config": {
4
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
5
        "flush.size.bytes": 10000,
6
        "flush.interval.ms": 50000,
7
        "tasks.max": 1,
8
        "topics": "storm-events",
9
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
10
        "aad.auth.authority": "<enter tenant ID>",
11
        "aad.auth.appid": "<enter application ID>",
12
        "aad.auth.appkey": "<enter client secret>",
13
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
14
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
15
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
16
    }
17
}



In a different terminnal, keep a track of the connector service logs:

Java
 




xxxxxxxxxx
1


 
1
docker-compose logs -f | grep kusto-connect



Install the connector:

Java
 




xxxxxxxxxx
1


 
1
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
2
 
          
3
//check status
4
curl http://localhost:8083/connectors/storm/status



The connector should spring into action. Meanwhile in the other terminal, you should see logs similar to:

Java
 




xxxxxxxxxx
1


 
1
kusto-connect_1    | INFO   ||  Refreshing Ingestion Resources   [com.microsoft.azure.kusto.ingest.ResourceManager]
2
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_0.csv.gz) of size (9192) at current offset (93)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
3
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 1: {storm-events-0=OffsetAndMetadata{offset=94, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
4
ct.runtime.WorkerSinkTask]
5
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_94.csv.gz) of size (1864) at current offset (111)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
6
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 2: {storm-events-0=OffsetAndMetadata{offset=112, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
7
....



Wait for sometime before data ends up in the Storms table. To confirm, check the row count and confirm that there are no failures in the ingestion process:

Java
 




xxxxxxxxxx
1


 
1
Storms | count
2
 
          
3
. show ingestion failures



Once there is some data, try out a few queries. To see all the records:

Java
 




xxxxxxxxxx
1


 
1
Storms



Use where and project to filter specific data

Java
 




xxxxxxxxxx
1


 
1
Storms
2
| where EventType == 'Drought' and State == 'TEXAS'
3
| project StartTime, EndTime, Source, EventId



Use the summarize operator

Java
 




xxxxxxxxxx
1


 
1
Storms
2
| summarize event_count=count() by State
3
| where event_count > 10
4
| project State, event_count
5
| render columnchart



These are just few examples. Dig into the Kusto Query Language documentation or explore tutorials about how to ingest JSON formatted sample data into Azure Data Explorer, using scalar operators, timecharts etc.

If you want to re-start from scratch, simply stop the containers (docker-compose down -v), delete (drop table Storms) and re-create the Storms table (along with the mapping) and re-start containers (docker-compose up)

Clean Up

To delete the Azure Data Explorer cluster/database, use az cluster delete or az kusto database delete

Java
 




xxxxxxxxxx
1


 
1
az kusto cluster delete -n <cluster name> -g <resource group name>
2
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>



That's a Wrap!

I hope this helps you get started building data ingestion pipelines from Kafka to Azure Data Explorer using the Kafka Connect sink connector. This is not the only way to ingest data into Azure Data Explorer (of course!). You're welcome to explore the documentation and explore other techniques such as One-click Ingestion, using Event Grid, IoT Hub and much more!

Until next time, happy exploring!

Topics:
azure, big data, cloud (add topic), docker, kafka, kafka connect platform, tutorial

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}