Spring Boot With Kafka Communication
In this article, we will look at how to communicate with Kafka from a Spring Boot application to send and receive messages or events.
Introduction
Over the years, Kafka has gained a great deal of popularity for its high throughput and real-time asynchronous messaging. It is considered a de facto standard for event streaming and provides fault-tolerant storage that is stable, reliable, and scalable.
So today, we will connect a Spring Boot application to Kafka and use it to both produce and consume messages.
Creating a Producer
Let’s go to https://start.spring.io and create an application, adding the spring-kafka dependency as below.
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
Now let’s create a producer that will send messages to a Kafka topic.
```java
@Component
public class KafkaProducer {

    @Value("${topic.name}")
    private String topicName;

    private final KafkaTemplate<String, Message> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(cron = "*/2 * * * * *")
    public void sendMessage() {
        UUID key = UUID.randomUUID();
        Message payload = new Message("jack");
        System.out.println("Sending Data " + payload);
        ProducerRecord<String, Message> record =
                new ProducerRecord<>(topicName, key.toString(), payload);
        kafkaTemplate.send(record);
    }
}
```
Here I have created a producer that is scheduled to send a message every 2 seconds. To send the message, we make use of the KafkaTemplate. Note that @Scheduled only fires if scheduling is enabled, for example with @EnableScheduling on a configuration class.
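The Message class the producer sends is not shown in the article; here is a minimal sketch of what it could look like. The single name field is an assumption — any Jackson-friendly POJO with a no-args constructor and accessors will work.

```java
// Minimal sketch of the Message payload class used by the producer and
// consumer. The "name" field is an assumption made for illustration.
public class Message {

    private String name;

    public Message() {
        // no-args constructor required by Jackson for deserialization
    }

    public Message(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Message{name='" + name + "'}";
    }
}
```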
To send the message to the right Kafka broker, we need to provide some configuration. For this, we add the following settings to the application configuration file (application.yml).
```yaml
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      client-id: my-client-consumer
      group-id: spring-application-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageDeSerializer
    producer:
      client-id: my-client-application
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageSerializer

topic:
  name: "first-topic"
```
Here, we have set the broker address and the key and value serializer and deserializer properties. You can find all supported properties in the class org.springframework.boot.autoconfigure.kafka.KafkaProperties.
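Kafka client settings that Spring Boot does not expose as dedicated properties can still be passed through via the properties map. As a sketch, the linger.ms value below is purely illustrative:

```yaml
spring:
  kafka:
    producer:
      properties:
        # entries here are passed straight through to the Kafka producer client
        linger.ms: 5
```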
Now, since I am using a custom message class, I need to provide a custom serializer and deserializer to send and receive the message.
The serializer and deserializer are pretty simple. You need to implement the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces as follows.
```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MessageSerializer implements Serializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Message data) {
        try {
            // convert the Message object to its JSON byte representation
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
}
```
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;

public class MessageDeSerializer implements Deserializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Message deserialize(String topic, byte[] data) {
        try {
            // convert the JSON bytes back into a Message object
            return objectMapper.readValue(data, Message.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}
```
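As an aside, spring-kafka also ships generic JSON converters, so you could skip the hand-written classes and configure those instead. A sketch, assuming the DTO package from this example:

```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # JsonDeserializer only instantiates classes from trusted packages
        spring.json.trusted.packages: "com.amrut.prabhu.kafkacommunicationservice.dto"
```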
Creating a Consumer
Along with the producer, we have set some consumer properties. So let's create a consumer for the topic.
```java
@Component
public class KafkaConsumer {

    @KafkaListener(id = "my-client-application", topics = "${topic.name}")
    public void consumer(ConsumerRecord<String, Message> consumerRecord) {
        System.out.println("Consumed Record Details: " + consumerRecord);
        Message message = consumerRecord.value();
        System.out.println("Consumed Message: " + message);
    }
}
```
Here we have created a component with a method annotated with @KafkaListener. This method will be invoked whenever a message arrives on the Kafka topic.
So with this, let's start the application.
```shell
java -jar \
    target/spring-kafka-communication-service-0.0.1-SNAPSHOT.jar
```
When we run the application, it sends a message every 2 seconds and the consumer reads the message.
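You can also verify independently that messages are reaching the topic by attaching Kafka's console consumer, assuming the Kafka CLI tools are installed and the broker from the configuration above is running:

```shell
# read everything on the topic from the beginning, printing keys as well
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic first-topic \
    --from-beginning \
    --property print.key=true
```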

You can find more configuration options in the Spring for Apache Kafka reference documentation.
Conclusion
In this article, we saw how we can send and read messages on a Kafka topic using Spring for Apache Kafka. In my next post, I will use Spring Cloud Stream to communicate with Kafka.
I have uploaded the code on GitHub.
Enjoy!!
Published at DZone with permission of Amrut Prabhu. See the original article here.