
Preventing Data Loss With Kafka Listeners in Spring Boot

In this article, we'll look at how to build Kafka listeners with Spring Boot and how to use Kafka's acknowledgment mechanisms.

By Viacheslav Shago · May 08, 2023 · Tutorial

Data loss is one of the biggest problems developers face when building distributed systems. Whether due to network issues or code bugs, data loss can have serious consequences for enterprises. In this article, we'll look at how to build Kafka listeners with Spring Boot and how to use Kafka's acknowledgment mechanisms to prevent data loss and ensure the reliability of our systems.

Apache Kafka

Apache Kafka is a distributed messaging platform used to store and deliver messages. Once a message is written to Kafka, it is kept there according to a retention policy. Messages are read through the consumer group mechanism: for each consumer group, Kafka tracks an offset per partition that records how far that group has progressed in reading messages. This allows each consumer group to read a topic independently and to resume from where it left off after a failure or restart. In a simplified way, this can be represented as follows:
[Figure: Apache Kafka process]

After successfully processing a message, the consumer sends an acknowledgment to Kafka, and the offset pointer for its consumer group is advanced. As mentioned earlier, each consumer group stores its own offset in the message broker, so different groups can read the same messages independently.
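
Kafka ships with a kafka-consumer-groups tool that lets you inspect these offsets, showing the committed offset and remaining lag per partition. For example (assuming the broker container and the demo-group consumer group that we set up later in this article):

Shell
 
docker exec broker \
    kafka-consumer-groups --bootstrap-server broker:9092 \
    --describe --group demo-group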

When we talk about high-reliability systems that must guarantee no data loss, we must consider all possible scenarios. Apache Kafka, by design, already has the features to ensure reliability. We, as consumers of messages, must also provide proper reliability. But what can go wrong?

  • The consumer receives the message and crashes before it can process it
  • The consumer receives the message, processes it, and then crashes
  • Any network problems

This can happen for reasons beyond our control — temporary network unavailability, an incident on the instance, pod eviction in a K8s cluster, etc.

Kafka can guarantee message delivery through its acknowledgment mechanism: at-least-once delivery. This means that a message will be delivered at least once, but under certain circumstances it can be delivered several times. All we need to do is configure Apache Kafka correctly and be able to react to duplicate messages when needed. Let's try to implement this in practice.
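
Since at-least-once delivery can produce duplicates, the consumer itself should be idempotent. Below is a minimal sketch of deduplication, written in the Spring Kafka listener style introduced later in this article; the in-memory set is for illustration only, and a real system would back it with a database table or cache:

Java
 
@Component
@Slf4j
public class IdempotentDemoListener {

    // Illustration only: an in-memory set does not survive restarts.
    // In a real system, store processed keys in a database or cache.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    @KafkaListener(topics = "demo", groupId = "demo-group")
    void processKafkaEvents(ConsumerRecord<String, String> record) {
        // topic-partition-offset uniquely identifies a record, so a
        // redelivered message maps to the same key
        String key = record.topic() + "-" + record.partition() + "-" + record.offset();
        if (!processedKeys.add(key)) {
            log.info("Duplicate message, skipping: {}", record.value());
            return;
        }
        log.info("Processed value: {}", record.value());
    }
}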

Run Apache Kafka

To start the message broker, we also need ZooKeeper. The easiest way to run both is with docker-compose. Create the file docker-compose.yml:

YAML
 
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.3
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1


Create a new topic:

Shell
 
docker exec broker \
    kafka-topics --bootstrap-server broker:9092 \
    --create \
    --topic demo


To produce messages, you can run the command:

Shell
 
docker exec -ti broker \
    kafka-console-producer --bootstrap-server broker:9092 \
    --topic demo


Each line is a new message. When finished, press Ctrl+C:

Shell
 
>first
>second
>third
>^C


Messages have been written and will be stored in Apache Kafka.
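
To double-check that the messages were stored, you can read them back with the console consumer (press Ctrl+C to stop):

Shell
 
docker exec -ti broker \
    kafka-console-consumer --bootstrap-server broker:9092 \
    --topic demo \
    --from-beginning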

Spring Boot Application

Create a Gradle project and add the necessary dependencies to build.gradle:

Groovy
 
plugins {
	id 'java'
	id 'org.springframework.boot' version '2.7.10'
	id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

repositories {
	mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok:1.18.26'
    annotationProcessor 'org.projectlombok:lombok:1.18.26'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testCompileOnly 'org.projectlombok:lombok:1.18.26'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.26'
}


application.yml:

YAML
 
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


Let's write an event handler:

Java
 
@Component
@Slf4j
public class DemoListener {

    @KafkaListener(topics = "demo", groupId = "demo-group")
    void processKafkaEvents(ConsumerRecord<String, String> record) {
        log.info("Try to process message");
        
        // Some code
        
        log.info("Processed value: " + record.value());
    }
}


Execution Result:

Shell
 
Try to process message
Processed value: first
Try to process message
Processed value: second
Try to process message
Processed value: third


But what if an error happens during message processing? In that case, we need to handle it correctly. If the error is caused by an invalid message, we can log it or publish the message to a separate dead letter topic (DLT) for later analysis. And what if processing involves calling another microservice, but that microservice doesn't respond? In this case, we may need a retry mechanism.

To implement retries, we can configure a DefaultErrorHandler:

Java
 
@Configuration
@Slf4j
public class KafkaConfiguration {
    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(5000, 3);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
            log.error("Couldn't process message: {}; {}", consumerRecord.value().toString(), exception.toString());
        }, fixedBackOff);

        errorHandler.addNotRetryableExceptions(NullPointerException.class);
        return errorHandler;
    }
}


Here we have specified that in case of an error, the message will be retried (a maximum of three times) at intervals of five seconds. But if a NullPointerException is thrown, we don't retry at all; we just write a message to the log and skip the record.
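
For the dead letter topic scenario mentioned above, Spring Kafka also ships a ready-made recoverer that republishes failed records once the retries are exhausted. A minimal sketch, assuming Spring Boot's auto-configured KafkaTemplate (by default the record is sent to a topic named after the original with a .DLT suffix, i.e., demo.DLT here):

Java
 
@Bean
public DefaultErrorHandler dltErrorHandler(KafkaOperations<Object, Object> template) {
    // After the retries are exhausted, publish the failed record to "demo.DLT"
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(5000, 3));
}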

But if we want more flexibility in error handling, we can do it manually:

YAML
 
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        enable.auto.commit: false
    listener:
      ack-mode: MANUAL


Here we set spring.kafka.consumer.properties.enable.auto.commit=false (if true, the consumer's offset is periodically committed in the background at the interval defined by auto.commit.interval.ms, which defaults to 5000 ms) and spring.kafka.listener.ack-mode=MANUAL, which means we want to control the commit mechanism ourselves.

Now we can control the sending of the acknowledgment ourselves:

Java
 
@KafkaListener(topics = "demo", groupId = "demo-group")
void processKafkaEvents(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    log.info("Try to process message");
    try {
        //Some code

        log.info("Processed value: " + record.value());
        acknowledgment.acknowledge();
    } catch (SocketTimeoutException e) {
        log.error("Error while processing message. Try again later");
        acknowledgment.nack(Duration.ofSeconds(5));
    } catch (Exception e) {
        log.error("Error while processing message: {}" + record.value());
        acknowledgment.acknowledge();
    }
}


The Acknowledgment object allows you to explicitly acknowledge or reject (nack) the message. By calling acknowledge(), you tell Kafka that the message has been successfully processed and its offset can be committed. By calling nack(), you tell Kafka that the message should be redelivered after the specified delay (for example, when another microservice isn't responding); the consumer pauses for that duration, and the failed record, along with any unprocessed records from the last poll, is fetched again.
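
One more nuance: with ack-mode: MANUAL, acknowledgments are queued, and the offsets are committed once all the records returned by the poll have been processed. If you want each acknowledge() call to commit the offset immediately, you can use MANUAL_IMMEDIATE instead:

YAML
 
spring:
  kafka:
    listener:
      ack-mode: MANUAL_IMMEDIATE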

Conclusion

Preventing data loss is critical for Kafka consumer applications. In this article, we looked at some best practices for exception handling and data loss prevention with Spring Boot. By applying these strategies, you can build a robust and reliable Kafka consumer application that is resilient to failures and can recover from errors gracefully, without losing data.
