
Spring Boot With Kafka Communication

In this article, we will look at how to communicate with Kafka from a Spring Boot application to send and receive messages or events.

By Amrut Prabhu · Dec. 09, 2021 · Microservices Zone · Tutorial


In this article, we will look at how to publish to and subscribe from a Kafka topic.

Introduction

Over the years, Kafka has gained a lot of popularity for its high throughput and real-time asynchronous messaging. It is considered a de facto standard for event streaming and provides fault-tolerant storage that is stable, reliable, and scalable.

So today we will look at how to communicate with Kafka from a Spring Boot application to send and receive messages or events.

Creating a Producer

Let’s go to https://start.spring.io and create an application, adding the spring-kafka dependency as below.

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>


Now let’s create a producer that will send messages to a Kafka topic.

@Component
public class KafkaProducer {

    @Value("${topic.name}")
    private String topicName;

    private final KafkaTemplate<String, Message> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Fires every 2 seconds and publishes a Message keyed by a random UUID.
    @Scheduled(cron = "*/2 * * * * *")
    public void sendMessage() {
        UUID key = UUID.randomUUID();
        Message payload = new Message("jack");
        System.out.println("Sending Data " + payload);

        ProducerRecord<String, Message> record = new ProducerRecord<>(topicName,
                key.toString(),
                payload);

        kafkaTemplate.send(record);
    }
}


Here I have created a producer that is scheduled to send a message every 2 seconds. To send the message, we make use of the KafkaTemplate.
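Note that @Scheduled only works if scheduling is enabled in the application. If it isn't already, add @EnableScheduling to the main application class; here is a minimal sketch (the class name is just an assumption based on the project name):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // required so the @Scheduled producer method actually fires
public class KafkaCommunicationServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaCommunicationServiceApplication.class, args);
    }
}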

To send the message to the right Kafka broker, we need to provide some configuration. For this, we add the following settings to the application's YAML configuration file.

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    consumer:
      client-id: my-client-consumer
      group-id: spring-application-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageDeSerializer
    producer:
      client-id: my-client-application
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageSerializer
        
topic:
  name: "first-topic"

The properties may look garbled here, but you can view them in a cleaner format on my website.

Here, we have set the broker properties along with the key and value serializer and deserializer properties. You can find all supported properties in the class org.springframework.boot.autoconfigure.kafka.KafkaProperties.
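This setup assumes the topic first-topic already exists on the broker (or that the broker auto-creates topics, which is the default broker behavior). If you would rather have Spring Boot create it on startup, one option is to declare a NewTopic bean, which the auto-configured KafkaAdmin picks up. A small sketch (the configuration class name is just an example):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfiguration {

    // Spring Boot's auto-configured KafkaAdmin creates this topic on startup if it does not exist yet.
    @Bean
    public NewTopic firstTopic(@Value("${topic.name}") String topicName) {
        return TopicBuilder.name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }
}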

Now, since I am using a custom message class, I need to provide a custom serializer and deserializer for sending and receiving the message.
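For reference, the Message class is just a small DTO. Assuming it only carries the name we pass in with new Message("jack"), a minimal version could look like this:

public class Message {

    private String name;

    // No-arg constructor needed by Jackson when deserializing.
    public Message() {
    }

    public Message(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Message{name='" + name + "'}";
    }
}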

The serializer and deserializer are pretty simple. You need to implement the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces as follows.

public class MessageSerializer implements Serializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Message data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }

    }
}

public class MessageDeSerializer implements Deserializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Message deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, Message.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}


Creating a Consumer

Along with the producer properties, we have also set some consumer properties above. So let's create a consumer for the topic.

@Component
public class KafkaConsumer {

    @KafkaListener(id = "my-client-application", topics = "${topic.name}")
    public void consumer(ConsumerRecord<String, Message> consumerRecord) {
        System.out.println("Consumed Record Details: " + consumerRecord);
        Message message = consumerRecord.value();
        System.out.println("Consumed Message" + message);
    }
}


Here we have created a component with a method annotated with @KafkaListener. This method will be invoked whenever a message arrives on the Kafka topic.

So with this, let's start the application.
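Before running it, build the jar (for example with ./mvnw clean package, assuming the project uses the Maven wrapper) and make sure a Kafka broker is reachable on localhost:9092, the bootstrap server we configured above.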

java -jar \ 
target/spring-kafka-communication-service-0.0.1-SNAPSHOT.jar


When we run the application, it sends a message every 2 seconds and the consumer reads the message.

[Screenshot of the application run]


You can find more config options in the documentation here.

Conclusion

In this article, we saw how to send and read messages on a Kafka topic using Spring Kafka. In my next post, I will use Spring Cloud Stream to communicate with Kafka.

I have uploaded the code on GitHub.

Enjoy!!


Published at DZone with permission of Amrut Prabhu. See the original article here.

