
Producer Consumer With Kafka and Kotlin

By Unni Mana · Jun. 08, 20 · Tutorial

Introduction

In this article, we will develop a simple Spring Boot application using Kafka and Kotlin. 

Let's get started. Visit https://start.spring.io and add the following dependencies:

```groovy
implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
```

This demo uses Gradle as the build tool; you can choose Maven as well.

Generate and download the project. Then, import it into IntelliJ IDEA.
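Spring Boot also reads broker settings from application.properties. A minimal configuration for this demo might look like the following; the values are assumptions that match the local broker address and the consumer group id used later in this article:

```properties
# Address of the local Kafka broker (default port)
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group used by the listener
spring.kafka.consumer.group-id=test_id
```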

Download Apache Kafka

Download the latest version of Apache Kafka from its site and unzip it to a folder. I am using Windows 10, so you may run into a problem when starting Kafka: an error that the input line is too long. This happens because Kafka builds a very long classpath from its folder structure. If this problem occurs, rename the installation folder to a shorter path and start the scripts from PowerShell.

The following commands are used to start ZooKeeper and Kafka:

```shell
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties
```

You can find these two scripts in the bin/windows folder.

In order to run Kafka, you need to start the ZooKeeper service first. ZooKeeper is an Apache project that provides a distributed configuration and coordination service, which Kafka uses to manage its cluster state.
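The broker can auto-create topics on first use, but it is cleaner to create the topic explicitly. Assuming a recent Kafka release (one that accepts the --bootstrap-server flag) and a broker already running on localhost:9092, the topic used in this article can be created from the same bin/windows folder:

```shell
# Create the topic used throughout this article
.\kafka-topics.bat --create --topic test_topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

# Verify that the topic exists
.\kafka-topics.bat --list --bootstrap-server localhost:9092
```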

Spring Boot Starter

The first step is to create a class called KafkaDemoApplication.kt using your IDE. If you generated the project from the Spring Initializr website, this class is created automatically.

Add the following lines of code:

```kotlin
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class KafkaDemoApplication

fun main(args: Array<String>) {
    runApplication<KafkaDemoApplication>(*args)
}
```

Producer

We can send a message to the topic in two ways; both are described below.

First, we need to develop a controller class, which is used to send and receive messages. Let's call this class KafkaController.kt. Then, add the following code:

```kotlin
@RestController
class KafkaController(private val kafkaTemplate: KafkaTemplate<String, String>) {

    val topic: String = "test_topic"

    @GetMapping("/send")
    fun sendMessage(@RequestParam("message") message: String): ResponseEntity<String> {
        val future: ListenableFuture<SendResult<String, String>> = kafkaTemplate.send(topic, message)
        val sendResult: SendResult<String, String> = future.get()
        return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
    }
}
```


We are using KafkaTemplate to send the message to a topic called test_topic. The send() call returns a ListenableFuture object, from which we can get the result of the operation. This approach is the easiest one if you just want to send a message to a topic.

Another Method

The second way to send a message to a Kafka topic is to use a KafkaProducer object directly. We will develop that piece of code next.

```kotlin
@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)

    val map = mutableMapOf<String, Any>()
    map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["bootstrap.servers"] = "localhost:9092"

    val producer = KafkaProducer<String, String>(map)
    val future: Future<RecordMetadata> = producer.send(producerRecord)
    producer.close()
    return ResponseEntity.ok("message sent to " + future.get().topic())
}
```

 

This code deserves some explanation.

We need to initialize the KafkaProducer object with a map that configures serializers for the key and the value. In this example, we are dealing with string messages, so we only need StringSerializer.

Basically, a Serializer is a Kafka interface that converts an object into bytes. Apache Kafka ships with other serializers as well, such as ByteArraySerializer, IntegerSerializer, and FloatSerializer.
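To make that contract concrete, here is a toy re-implementation of the serializer idea in plain Kotlin. ToySerializer and ToyStringSerializer are made-up names for illustration only; the real interface lives in org.apache.kafka.common.serialization:

```kotlin
// Toy version of Kafka's Serializer contract, for illustration only.
interface ToySerializer<T> {
    fun serialize(topic: String, data: T): ByteArray
}

// Mirrors what Kafka's StringSerializer does: encode the string as UTF-8 bytes.
class ToyStringSerializer : ToySerializer<String> {
    override fun serialize(topic: String, data: String): ByteArray =
        data.toByteArray(Charsets.UTF_8)
}

fun main() {
    val bytes = ToyStringSerializer().serialize("test_topic", "hello")
    println(bytes.size) // "hello" is 5 UTF-8 bytes
}
```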

We specify the key and value serializers of the map with StringSerializer:

```kotlin
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
```

The next entry is the bootstrap server address, which is required to communicate with the Kafka cluster:

```kotlin
map["bootstrap.servers"] = "localhost:9092"
```

All three of these properties are necessary if we use KafkaProducer directly.

Then, we need to create a ProducerRecord with the name of the topic and the message itself. This is what is achieved in this line:

```kotlin
val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)
```

Now we can send our message to the topic using the following code:

```kotlin
val future: Future<RecordMetadata> = producer.send(producerRecord)
```

This operation returns a Future; calling get() on it blocks until the broker acknowledges, and the resulting RecordMetadata carries the name of the topic the message was sent to.
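The blocking behaviour of future.get() can be illustrated without Kafka at all, using a CompletableFuture from the standard library to play the role of the broker's acknowledgement (a sketch, not Kafka code):

```kotlin
import java.util.concurrent.CompletableFuture

fun main() {
    // Simulate an acknowledgement that arrives asynchronously,
    // like the RecordMetadata returned for a sent record.
    val future: CompletableFuture<String> = CompletableFuture.supplyAsync {
        "test_topic" // stands in for RecordMetadata.topic()
    }
    // get() blocks the caller until the result is available.
    println(future.get()) // prints "test_topic"
}
```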

Consumer

We have seen how to send a message to a topic. But we also need to listen for incoming messages. In order to achieve this, we will develop a listener that consumes messages from the topic.

Let's create a class called MessageConsumer.kt and annotate it with the @Service annotation.

```kotlin
@Service
class MessageConsumer {

    @KafkaListener(topics = ["test_topic"], groupId = "test_id")
    fun consume(message: String) {
        println("message received from topic: $message")
    }
}
```

This method listens for messages with the help of the @KafkaListener annotation and prints each message on the console as soon as it is available in the topic. Make sure you use the same topic name that was used to send the message.
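With ZooKeeper, Kafka, and the Spring Boot application all running, the whole round trip can be exercised from a terminal; the port assumes Spring Boot's default of 8080. Each request should make the listener print the message on the application console:

```shell
curl "http://localhost:8080/send?message=hello"
curl "http://localhost:8080/produce?message=world"
```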

You can check out the whole source code in my GitHub repository.
