Data Ingestion Into Azure Data Explorer Using Kafka Connect

In this blog, we will go over how to ingest data into Azure Data Explorer using the open-source Kafka Connect Sink connector for Azure Data Explorer.

By Abhishek Gupta · Sep. 28, 2020 · Tutorial

In this blog, we will go over how to ingest data into Azure Data Explorer using the open-source Kafka Connect sink connector for Azure Data Explorer, running on Kubernetes with Strimzi. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems using source and sink connectors, and Strimzi provides a "Kubernetes-native" way of running Kafka clusters as well as Kafka Connect workers.

Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from many diverse sources, such as websites, applications, IoT devices, and more. It has a rich connector ecosystem that supports ingestion into Azure Data Explorer, as detailed here. One of the supported sources is Apache Kafka, and the sink connector allows you to move data from Kafka topics into Azure Data Explorer tables, which you can later query and analyze. The best part is that you can do so in a scalable and fault-tolerant way using just configuration!

Here is an overview of the scenario depicted in this blog post:

The Azure Data Explorer Kafka Connector picks up data from the configured Kafka topic and queues up ingestion processes (in batches) which eventually write data to a table in Azure Data Explorer. Behind the scenes, the connector leverages the Java SDK for Azure Data Explorer.

Resources for this blog post are available on GitHub.

Prerequisites

You will need an Azure account along with Azure CLI or Azure Cloud Shell.

Here are some quick pointers to setting up an Azure Data Explorer cluster and a managed Kubernetes service on Azure. I recommend creating both services as part of a single Azure Resource Group, which makes them easy to manage.
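
For example, you could create a dedicated resource group up front with the Azure CLI (the group name and region below are placeholders; use your own):

az group create --name <AZURE_RESOURCE_GROUP> --location <AZURE_REGION>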

Azure Data Explorer

You can set up an Azure Data Explorer cluster and database using the Azure Portal, Azure CLI, or any of the client SDKs such as Python. Once that's done, create a table (named Storms) and the corresponding mapping (named Storms_CSV_Mapping) using the queries below:

.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)

.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'

Azure Kubernetes Service (optional)

I have used Azure Kubernetes Service (AKS), but the instructions in this blog post should work for other options as well (e.g. a local minikube cluster on your laptop). You can set up an AKS cluster using the Azure CLI, Azure portal, or an ARM template.
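
If you go with AKS, a minimal CLI sketch might look like the following (the cluster name and node count are illustrative; adjust them to your needs):

az aks create --resource-group <AZURE_RESOURCE_GROUP> --name <AKS_CLUSTER_NAME> --node-count 2 --generate-ssh-keys
az aks get-credentials --resource-group <AZURE_RESOURCE_GROUP> --name <AKS_CLUSTER_NAME>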

Base Installation

Start by installing the Strimzi Operator and use it to spin up a single-node Kafka cluster on Kubernetes. Installing Strimzi using Helm is pretty easy:

helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-kafka strimzi/strimzi-kafka-operator

To confirm successful installation:

kubectl get pods -l=name=strimzi-cluster-operator

You should see the cluster operator Pod in the Running status:

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-5c66f679d5-69rgk   1/1     Running   0          43s

To deploy a single-node Kafka cluster (along with Zookeeper):

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/kafka.yaml
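
For reference, kafka.yaml defines a Strimzi Kafka custom resource. A minimal single-node definition with the v1beta1 API generally looks something like this (an illustrative sketch, not the exact contents of the file in the repo):

# illustrative single-node setup; the kafka.yaml in the repo is the source of truth
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-kafka-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}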

Wait for the cluster to start:

kubectl get pod my-kafka-cluster-kafka-0 -w

The Kafka Pod should transition to the Running status and both containers should be in the READY state:

NAME                       READY   STATUS    RESTARTS   AGE
my-kafka-cluster-kafka-0   2/2     Running   0          1m

Kafka Connect Cluster Setup

The Strimzi container images for Kafka Connect include two built-in file connectors - FileStreamSourceConnector and FileStreamSinkConnector. For this blog, a custom Docker image seeded with the Azure Data Explorer connector (version 1.0.1) is available on Docker Hub and is referenced in the KafkaConnect resource definition (image: abhirockzz/adx-connector-strimzi:1.0.1):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: abhirockzz/adx-connector-strimzi:1.0.1
  version: 2.4.0
  ....

If you want to build your own Docker image, use the Strimzi Kafka Docker image as a base and add the Azure Data Explorer connector JAR to the plugin path. Start by downloading the connector JAR file:

export KUSTO_KAFKA_SINK_VERSION=1.0.1
mkdir connector && cd connector
curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar

Then, you can use this Dockerfile to build the Docker image:

FROM strimzi/kafka:0.19.0-kafka-2.4.0
USER root:root
COPY ./connector/ /opt/kafka/plugins/
RUN ls -lrt /opt/kafka/plugins/
USER 1001

This technique has been illustrated in the Strimzi documentation.
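
If you go down this route, building and publishing the image is the standard Docker workflow; replace the repository with your own and update the image reference in the KafkaConnect definition accordingly:

docker build -t <your-docker-repo>/adx-connector-strimzi:1.0.1 .
docker push <your-docker-repo>/adx-connector-strimzi:1.0.1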

Authentication

Before installing the connector, we need to create an Azure Service Principal for the connector to authenticate and connect to the Azure Data Explorer service. You can use the az ad sp create-for-rbac command:

az ad sp create-for-rbac -n "kusto-sp"

You will get a JSON response as below. Please note down the appId, password, and tenant, as you will be using them in subsequent steps.

{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}

Add permissions to your database

Provide an appropriate role to the Service Principal you just created. To assign the admin role, follow this guide to use the Azure portal or run the following command in your Data Explorer cluster:

.add database <database name> admins ('aadapp=<service principal AppID>;<service principal TenantID>') 'AAD App'

We will seed the auth-related config as a Kubernetes Secret. Later on, you will see where this Secret is referenced.

Create a file called adx-auth.yaml with the below contents.

apiVersion: v1
kind: Secret
metadata:
  name: adx-auth
type: Opaque
stringData:
  adx-auth.properties: |-
    kustoURL: <replace ADX Ingest URL>
    tenantID: <enter service principal tenant ID>
    appID: <enter service principal app ID>
    password: <enter service principal password>

Replace values for the following:

  • kustoURL: the Azure Data Explorer ingestion URL, e.g. https://ingest-[cluster name].[region].kusto.windows.net
  • tenantID: the service principal tenant ID
  • appID: the service principal application ID
  • password: the service principal password

Install Kafka Connect

Create the Secret and initiate the Kafka Connect cluster creation:

kubectl apply -f adx-auth.yaml

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/kafka-connect.yaml

While you wait for the Kafka Connect cluster to start, take a look at this snippet of the KafkaConnect cluster resource definition. Notice the externalConfiguration attribute that points to the Secret we just created. It is loaded into the Kafka Connect Pod as a Volume, and the Kafka FileConfigProvider is used to access its contents.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: abhirockzz/adx-connector-strimzi:1.0.1
  config:
    ...
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: adx-auth-config
        secret:
          secretName: adx-auth

To check Kafka Connect cluster status:

kubectl get pod -l=strimzi.io/cluster=my-connect-cluster -w

Wait for the Kafka Connect Pod to transition into Running state.

NAME                                          READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4   1/1     Running   0          1m

Create the Topic and Install Connector

You can use the Strimzi Entity Operator to create the storm-events topic. Here is the Topic definition:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: storm-events
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 3
  replicas: 1

To create:

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/topic.yaml

Use kubectl get kafkatopic to see the topic you just created, as well as the internal Kafka topics:

NAME                                                          PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   50           1
storm-events                                                  3            1
strimzi-connect-cluster-configs                               1            1
strimzi-connect-cluster-offsets                               25           1
strimzi-connect-cluster-status                                5            1

Here is a snippet of the connector (KafkaConnector) definition; it's just a way to capture configuration and metadata for the connector you want to install.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: adx-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
  tasksMax: 3
  config:
    topics: storm-events
    flush.size.bytes: 10000
    flush.interval.ms: 50000
    kusto.tables.topics.mapping: "[{'topic': 'storm-events','db': '[REPLACE DATABASE NAME]', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]"
    kusto.url: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:kustoURL}
    aad.auth.authority: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:tenantID}
    aad.auth.appid: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:appID}
    aad.auth.appkey: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:password}
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"

The flush.size.bytes and flush.interval.ms attributes work in tandem with each other and serve as a performance knob for batching. Please refer to the connector GitHub repo for details on these and other configuration parameters.

Notice how the individual properties (from the Secret) are referenced. For example, to reference the Service Principal application ID, we used this:

aad.auth.appid: ${file:/opt/kafka/external-configuration/adx-auth-config/adx-auth.properties:appID}

  • /opt/kafka/external-configuration is a fixed path inside the container
  • adx-auth-config is the name of the volume in the KafkaConnect definition
  • adx-auth.properties is the name of the file as defined in the Secret
  • appID is the name of the key

Non-sensitive connector configs (e.g. topics: storm-events) are defined directly by attribute name. Alternatively, you can encapsulate these in a ConfigMap, load it as a Volume, and reference the values just like the sensitive attributes that use a Secret.
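
As a rough sketch of that ConfigMap-based alternative (the file and ConfigMap names here are hypothetical), you could create the ConfigMap from a properties file and then mount it via externalConfiguration, referencing individual keys with the same FileConfigProvider syntax:

kubectl create configmap adx-connect-params --from-file=adx-connect.properties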

Copy the above KafkaConnector definition to a local file adx-connect-config.yaml. Make sure you replace [REPLACE DATABASE NAME] in the kusto.tables.topics.mapping attribute with your database name. To create:

kubectl apply -f adx-connect-config.yaml
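
Since the connector itself is a Strimzi-managed custom resource, a quick sanity check is to inspect it and look at the status stanza reported by the operator:

kubectl get kafkaconnector adx-sink-connector -o yaml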

Check the Kafka Connect logs with kubectl logs -l=strimzi.io/cluster=my-connect-cluster. If everything is working fine, you should see logs similar to this:

....
INFO [Consumer clientId=connector-consumer-adx-sink-connector-1, groupId=connect-adx-sink-connector] Resetting offset for partition storm-events-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-adx-sink-connector-1]

INFO [Consumer clientId=connector-consumer-adx-sink-connector-2, groupId=connect-adx-sink-connector] Resetting offset for partition storm-events-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-adx-sink-connector-2]

Data Ingestion in Action

So, we have everything set up. All we need now is to send events to the Kafka topic so that we can see the connector in action and ingest data into Azure Data Explorer.

You can use this handy event generator application (available on Docker Hub) and deploy it to your Kubernetes cluster - the Dockerfile is available in the GitHub repo in case you want to reference it.

Kubernetes Deployment snippet:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: adx-event-producer
spec:
  replicas: 1
  ....
    spec:
      containers:
        - name: adx-event-producer
          image: abhirockzz/adx-event-producer
          imagePullPolicy: Always
          env:
            - name: KAFKA_BOOTSTRAP_SERVER
              value: my-kafka-cluster-kafka-bootstrap:9092
            - name: KAFKA_TOPIC
              value: storm-events
            - name: SOURCE_FILE
              value: StormEvents.csv

To deploy the producer application:

kubectl apply -f https://github.com/abhirockzz/kusto-kafka-connect-strimzi/raw/master/deploy/producer.yaml

The application picks up records from the StormEvents.csv file and sends them to the Kafka topic. Each event is a CSV record that represents data for a storm occurrence (start and end time, state, type, etc.), for example: 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer.

The producer application waits for 3 seconds between subsequent produce operations to Kafka. This is intentional so that you can monitor the Kafka Connect logs and make sense of what's going on. The StormEvents.csv file contains more than 50,000 records, so it might take a while for all of them to be batched and ingested into Azure Data Explorer.
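
If you want to peek at the raw events on the topic in the meantime, one option is to run a console consumer from inside the Kafka broker Pod (a quick sketch that assumes the standard /opt/kafka layout and the kafka container name used by Strimzi):

kubectl exec -it my-kafka-cluster-kafka-0 -c kafka -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic storm-events --from-beginning --max-messages 5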

You can track the application logs using: kubectl logs -f -l app=adx-event-producer. If all is well, you should see something similar to this:

...
sent message to partition 0 offset 0
event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public

sent message to partition 0 offset 1
event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer

sent message to partition 0 offset 2
event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer

The storm-events topic will now start receiving events, and these will be picked up by the sink connector. If you track the connector logs:

kubectl logs -f -l strimzi.io/cluster=my-connect-cluster

... you should see logs similar to this:

....
INFO Kusto ingestion: file (/tmp/kusto-sink-connector-17d03941-f8ca-498e-bc52-68ced036dc69/kafka_storm-events_0_0.csv.gz) of size (1722) at current offset (16) (com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter) [Timer-6]

INFO WorkerSinkTask{id=adx-sink-connector-0} Committing offsets asynchronously using sequence number 17: {storm-events-0=OffsetAndMetadata{offset=17, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-adx-sink-connector-0]

INFO Kusto ingestion: file (/tmp/kusto-sink-connector-17d03941-f8ca-498e-bc52-68ced036dc69/kafka_storm-events_0_17.csv.gz) of size (1666) at current offset (33) (com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter) [Timer-7]
....

Query Azure Data Explorer

Wait for some time before data ends up in the Storms table. To confirm, check the row count and make sure there are no failures in the ingestion process:

Storms | count
.show ingestion failures

Once there is some data, try out a few queries. To see all the records:

Storms

Use where and project to filter specific data:

Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId

Use the summarize operator:

Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart

These are just a few examples. Please take a look at the Kusto Query Language documentation or explore tutorials about how to ingest JSON formatted sample data into Azure Data Explorer, use scalar operators, render timecharts, etc.
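
For instance, a daily timechart of event counts over the same Storms table (a small illustrative query, not from the original post) could look like:

Storms
| summarize event_count = count() by bin(StartTime, 1d)
| render timechart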

Clean Up Resources

To delete the connector and/or Kafka cluster:

kubectl delete kafkaconnect/my-connect-cluster
kubectl delete kafka/my-kafka-cluster

To delete the AKS and Azure Data Explorer clusters, simply delete the resource group:

az group delete --name <AZURE_RESOURCE_GROUP> --yes --no-wait

Conclusion

That's all for this blog post, and I hope you found it useful! Please note that this is not the only way to ingest data into Azure Data Explorer. You're welcome to refer to the documentation and explore other techniques such as One-click Ingestion, Event Grid, IoT Hub, etc.

Please consider exploring the following topics as additional learning resources:

Resources

  • Configuring Kafka Connect cluster using Strimzi
  • Strimzi KafkaConnect schema reference
  • Strimzi KafkaConnector schema reference
  • Just Enough Azure Data Explorer for Cloud Architects
  • What's new in Azure Data Explorer connector 1.x
  • Kusto Query Language quick reference