Event-Driven Architecture With Apache Kafka for .NET Developers Part 1: Event Producer
Learn to set up the development environment and build an end-to-end microservices application with .NET Core and Apache Kafka.
In This Series:
- Development Environment and Event Producer (this article)
- Event Consumer
- Azure Event Hubs Integration
An event-driven architecture uses events to trigger and communicate between microservices. An event is a change in a service's state, such as an item being added to the shopping cart. When an event occurs, the service produces an event notification, which is a packet of information about the event.
The architecture consists of an event producer, an event router, and an event consumer. The producer sends events to the router, and the consumer receives the events from the router. Depending on the capability, the router can push the events to the consumer or send the events to the consumer on request (poll). The producer and the consumer services are decoupled, which allows them to scale, deploy, and update independently.
Apache Kafka is one of the most popular open-source event streaming platforms. It is horizontally scalable, distributed, and fault-tolerant by design. Kafka's programming model is based on the publish-subscribe pattern. With Kafka, publishers send messages to topics, which are named logical channels. A subscriber to a topic receives all the messages published to the topic. In an event-driven architecture, Kafka is used as an event router, and the microservices publish and subscribe to the events.
In this article, we will learn how to prepare the local environment for development and publish messages to Kafka. My subsequent articles will focus on building the components of an end-to-end application that will help you build event-driven microservices.
The complete source code of the application and other artifacts is available in my GitHub repository.
I will briefly discuss the components of Kafka that are relevant to us for using Kafka as a message broker. Apart from the publish-subscribe model, Kafka also supports a Streams API that is useful for transforming data from one topic to another, and a Connect API that helps you implement connectors that pull data from external systems into Kafka or push data from Kafka to external systems. These APIs are outside the scope of this article. To understand Kafka's architecture in detail, please read the Introduction to Kafka article on the Confluent docs website.
We understand that Kafka acts as a middleman that enables exchanging information from producers to consumers. Kafka can be set up across multiple servers, which are called Kafka brokers. With multiple brokers, you get the benefit of data replication, fault tolerance, and high availability of your Kafka cluster.
Following is a high-level system design of a Kafka cluster:
The metadata of Kafka cluster processes is stored in an independent system called Apache Zookeeper. Zookeeper helps Kafka perform several critical functions, such as electing a leader in case of node failure. It also maintains the list of consumers in a consumer group and manages the access control list of Kafka topics.
The first level segregation of events/messages in Kafka occurs through a Kafka object called the topic. The event producer publishes events to a topic which Kafka can subsequently broadcast to interested consumers. Think of a topic as a collection of FIFO (First In First Out) queues. You can either randomly store a message in one of the queues or place related messages on a single queue to guarantee FIFO. Each of the queues within a topic is called a topic partition. Each message in a queue is placed at a unique position number called an offset.
You can combine multiple consumers in a consumer group to scale out the consumption of messages from a topic. A consumer group is identified through a unique group id. Kafka balances the allocation of partitions between individual consumers of a consumer group to avoid the duplicate processing of messages.
After a consumer consumes a message stored at an offset, it commits the offset to inform Kafka that it has finished processing the message. On a subsequent request, the consumer will receive the message at the next offset, and so on.
Local Environment Setup
Setting up a development environment to work with Kafka is reasonably easy with Docker Compose. You can share Docker Compose specifications with other developers in your team to ensure environment consistency. We will use Docker Compose to set up a Kafka cluster that consists of the following components:
- Apache Zookeeper: Note that some vendors, such as Confluent, plan to remove the Zookeeper dependency from Kafka in future releases. Read the latest documentation from the vendor that you intend to use for Kafka.
- Kafdrop: Kafdrop is a popular web-based user interface for viewing Kafka topics and browsing consumer groups. It makes your Kafka cluster observable, which helps you diagnose issues and helps you with development.
- Schema Registry: Schema Registry is a service that lives outside of your cluster and allows the developers to manage the message schemas. Kafka supports messages in Avro, JSON, and Protobuf formats, and the Schema Registry supports the storage and retrieval of versioned schemas in all those formats. You can read more about Schema Registry on the Confluent docs website.
Several vendors publish Zookeeper and Kafka Docker images with slight differences in behavior and configuration. I typically use the distributions from Bitnami. However, you can also use the distributions from Confluent, Spotify, and Wurstmeister. Bitnami and Confluent build and test their images nightly, and the images are compatible with each other, so I recommend using them.
Create a file named docker-compose.yml and populate the file with the contents of the following listing:
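The following Compose specification is a minimal sketch of such a cluster. The image tags, listener names, and port mappings are illustrative; consult the documentation of the images you choose (here Bitnami's Zookeeper and Kafka, Obsidian Dynamics' Kafdrop, and Confluent's Schema Registry) before relying on them:

```yaml
version: "3.8"
services:
  zookeeper:
    image: bitnami/zookeeper:3
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # CLIENT is used inside the Docker network, EXTERNAL from the host.
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:29092,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:29092,EXTERNAL://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    depends_on:
      - kafka
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka:29092
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:29092
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
```

The dual listener setup lets services inside the Docker network reach the broker at kafka:29092 while applications on your host machine connect to localhost:9092.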
We created the services that we previously discussed within the same network so that they can discover each other. The environment variables of the services are set to their recommended values, so modify them with caution.
From your terminal, change to the directory where you stored this file and run the following command to start the services:
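Assuming the file is named docker-compose.yml, the standard command is (newer Docker versions use `docker compose` without the hyphen):

```shell
docker-compose up -d
```

The -d flag starts the containers in the background so your terminal stays free.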
After some time, execute the following command to verify whether the services are healthy.
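Compose reports the state of each container, including health, with:

```shell
docker-compose ps
```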
The following screenshot presents the output of the command:
Our monitoring utility, Kafdrop, is now available at http://localhost:9000.
You can only view the two special topics at this time: the __consumer_offsets topic, which records the offsets processed by the consumers, and the _schemas topic, which the Schema Registry uses to store the versioned schemas of messages. You will see more topics here when we create them programmatically.
Demo Application: TimeOff
To explore how we can use Kafka to build event-driven applications, we will build a very simple employee leave management system. Our application consists of the following services:
- Employee service: An employee can use this service to submit a leave application. This service submits the leave application received event to the leave-applications Kafka topic.
- Manager service: This service consumes the events from the leave-applications topic and records the manager's input. The application's result is sent as an event named leave application processed to the leave-applications-results Kafka topic.
- Result reader service: This service displays the approved or unapproved leave applications by consuming the messages from the leave-applications-results Kafka topic.
Following is the sequence diagram of the interaction between the services:
Producer Example: Employee Service
Use Visual Studio or VS Code to create a new .NET Core console application and name it TimeOff.Employee. For reference, you can locate this project in the GitHub repository under the same name. Please note that the code documented in this article might differ slightly from the code in the repository because I have enabled code reuse through common functions and shared models. However, the code in the repository is deliberately kept only lightly refactored so that it is easy to comprehend and does not require much navigation.
Before we proceed, I want to briefly discuss the message formats available in Kafka. Kafka supports Avro, Protobuf, and JSON formatted messages, and all three formats are supported by the Schema Registry. Avro is preferred over the other formats if all the services in the ecosystem can support it. You can read about why Avro is the better data serialization format for stream data on the Confluent website.
To enable our application to work with Kafka, you need the Kafka .NET Client. Also, since we will use the Confluent Schema Registry to enforce contracts between the producer and consumer, we need the serializer (for the producer) and deserializer (for the consumer) for our applications. We will use Avro formatted messages in our application, and so we will install the Avro serializer in our project. Use the following commands to install the required NuGet packages in your project:
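Assuming the Confluent client packages, the install commands look like this when run from the TimeOff.Employee project directory:

```shell
dotnet add package Confluent.Kafka
dotnet add package Confluent.SchemaRegistry
dotnet add package Confluent.SchemaRegistry.Serdes.Avro
```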
Open the Program class file in your editor and begin populating the Main method as per the directions. Let's start by initializing an Admin client (IAdminClient) to create a topic, a Producer client (IProducer) to publish messages to Kafka, and a Schema Registry client (CachedSchemaRegistryClient) to enforce schema constraints on the producer.
Each client requires certain initialization parameters, such as the bootstrap servers, which is the list of brokers that the client connects to initially. After the initial connection, the client discovers the rest of the brokers automatically. The Schema Registry client requires the address of the Schema Registry server. Use the following code to create the configurations that will be used to initialize the clients.
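A minimal sketch of these configurations, assuming the local addresses from the Docker Compose setup (localhost:9092 for Kafka and localhost:8081 for the Schema Registry):

```csharp
using Confluent.Kafka;
using Confluent.SchemaRegistry;

// Address of the brokers used for the initial connection.
var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };

var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    // Wait for acknowledgement from all in-sync replicas before reporting success.
    Acks = Acks.All
};

// Address of the Schema Registry server.
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
```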
Let's first create the topic that will receive our messages. Add the following code to your program to create a new topic named leave-applications with three partitions.
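A sketch of the topic creation step, assuming a .NET console app with top-level statements and a local single-broker cluster (the block creates its own admin configuration so it is self-contained):

```csharp
using System;
using System.Linq;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };

using var adminClient = new AdminClientBuilder(adminConfig).Build();
try
{
    await adminClient.CreateTopicsAsync(new[]
    {
        new TopicSpecification
        {
            Name = "leave-applications",
            ReplicationFactor = 1, // single-broker local cluster
            NumPartitions = 3      // one partition per department
        }
    });
    Console.WriteLine("Topic created.");
}
catch (CreateTopicsException e) when (
    e.Results.Any(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
{
    // The topic already exists, which is fine on reruns of the app.
    Console.WriteLine("Topic already exists; nothing to do.");
}
```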
You must be wondering why we created three partitions. We want to explore how the producer can write to different partitions of a topic. An employee belongs to a department, so we will create a partition for each department in the leave-applications topic. Leave applications will be queued sequentially within each department's partition. Let's create an enumeration named Department now, which we will later use in the producer's logic.
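A sketch of the enumeration; the department names are illustrative:

```csharp
public enum Department
{
    // Each department maps to one topic partition.
    HR,  // partition 0
    IT,  // partition 1
    OPS  // partition 2
}
```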
We are now ready to write to our topic. But before we do that, we need to specify the schema of the message that we will write to our topic.
Avro Schema and Avrogen
Create an Avro schema file named LeaveApplicationReceived.avsc in a class library project named TimeOff.Models so that we can share it between the producer and the consumer. Add the following schema specification to the file:
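A hedged example of such a specification; the field names below are illustrative, and your schema should carry whatever leave-application data your services need:

```json
{
  "type": "record",
  "name": "LeaveApplicationReceived",
  "namespace": "TimeOff.Models",
  "fields": [
    { "name": "EmpEmail", "type": "string" },
    { "name": "EmpDepartment", "type": "string" },
    { "name": "LeaveDurationInHours", "type": "int" },
    { "name": "LeaveStartDateTicks", "type": "long" }
  ]
}
```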
We will convert the Avro schema to a C# class file so that it can be used by our .NET Core Avro serializers and deserializers. We will use the avrogen tool from Confluent to automatically generate the C# class file from our Avro specification.
To install the avrogen tool, execute the following command:
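The exact package id depends on the distribution you pick; for example, the Apache Avro tools package installs an avrogen command as a global .NET tool like this (check the current package name of the Confluent tooling in its documentation):

```shell
dotnet tool install --global Apache.Avro.Tools
```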
Next, in your terminal, change to the directory containing the LeaveApplicationReceived.avsc file and execute the following command to generate the C# file.
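With the Apache Avro tooling, the command takes the schema file via -s and an output directory:

```shell
avrogen -s LeaveApplicationReceived.avsc .
```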
Copy the generated file LeaveApplicationReceived.cs from the output folder and paste it at the root of the project.
Let's go back to the Program class and continue editing the Main method to write the message producer logic as follows:
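A sketch of that logic, assuming top-level statements and the illustrative field names used earlier (EmpEmail, EmpDepartment, and so on must match your generated LeaveApplicationReceived class). The block creates its own configuration objects so it is self-contained:

```csharp
using System;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using TimeOff.Models;

var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };

// The schema registry client registers/caches schemas used by the Avro serializers.
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
using var producer = new ProducerBuilder<string, LeaveApplicationReceived>(producerConfig)
    .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
    .SetValueSerializer(new AvroSerializer<LeaveApplicationReceived>(schemaRegistry))
    .Build();

// Sample leave application; the values are illustrative.
var leaveApplication = new LeaveApplicationReceived
{
    EmpEmail = "jo@worker.com",
    EmpDepartment = nameof(Department.IT),
    LeaveDurationInHours = 8,
    LeaveStartDateTicks = DateTime.UtcNow.Ticks
};

// Route the message to the partition that maps to the employee's department.
var topicPartition = new TopicPartition("leave-applications", new Partition((int)Department.IT));
var result = await producer.ProduceAsync(
    topicPartition,
    new Message<string, LeaveApplicationReceived>
    {
        Key = leaveApplication.EmpEmail,
        Value = leaveApplication
    });

Console.WriteLine($"Delivered to {result.Topic}:{result.Partition.Value} at offset {result.Offset.Value}");
```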
Let's navigate through the code together. We created an instance of the CachedSchemaRegistryClient class, which allows us to access the Schema Registry. The Kafka .NET client exposes message producer capabilities through the IProducer interface. We embedded the Avro key and value serializers into the IProducer instance. The Avro serializers use the Schema Registry client to register new schemas, and they record the schema id with each message sent to the Kafka topic. CachedSchemaRegistryClient maintains a local cache of schemas for validation to minimize the number of calls to the Schema Registry. The ProduceAsync method accepts a partition and a message, and sends the message to the specified partition of the topic.
Let's execute the application now to record a few leave applications as follows:
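From the project directory, a run looks like:

```shell
dotnet run
```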
Let's use Kafdrop to view the newly registered schema.
Let's also view the newly added messages with Kafdrop.
In this article, we learned the basics of Kafka as a message mediator. We set up a local Kafka environment and learned how to use Schema Registry and the Kafka Producer API to send messages to a Kafka topic. We used Kafdrop to inspect the schema and the messages in Kafka.
In the following article, we will learn to write a message consumer using the Kafka Consumer API.
Please share your comments and feedback in the comments section or on my Twitter handle @rahulrai_in.
Published at DZone with permission of Rahul Rai, DZone MVB. See the original article here.