Integration Testing of Non-Blocking Retries With Spring Kafka

How to write integration tests for your Spring Kafka implementation of consumers having retries and Dead Letter Publishing enabled.

Mukut Bhattacharjee · Aug. 14, 23 · Tutorial


Kafka Non-Blocking Retries

Non-blocking retries in Kafka are implemented by configuring retry topics for the main topic. An additional Dead Letter Topic (DLT) can also be configured if required; events are forwarded to the DLT once all retries are exhausted. Plenty of resources are available in the public domain that cover the technicalities:

  • Kafka Consumer Non-Blocking Retry: Spring Retry Topics
  • Spring Retry Kafka Consumer
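Conceptually, the framework derives one retry topic per retry attempt, plus a dead letter topic, from the main topic's name. A rough sketch of the index-suffixed naming scheme (illustrative only; `RetryTopicNames` and `topicsFor` are hypothetical helpers, not Spring Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the index-suffixed naming scheme; this is NOT
// Spring Kafka's internal code, just the naming convention it follows.
public class RetryTopicNames {

    static List<String> topicsFor(String mainTopic, int attempts) {
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);                    // initial delivery
        for (int i = 0; i < attempts - 1; i++) {  // one retry topic per retry attempt
            topics.add(mainTopic + "-retry-" + i);
        }
        topics.add(mainTopic + "-dlt");           // events land here after all retries fail
        return topics;
    }

    public static void main(String[] args) {
        // With 3 attempts: [test, test-retry-0, test-retry-1, test-dlt]
        System.out.println(topicsFor("test", 3));
    }
}
```

These are exactly the topic names we will pre-create for the embedded broker later in this article.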

What To Test?

Writing integration tests for the retry mechanism in your code can be a challenging job.

  • How do you test that the event has been retried the required number of times?
  • How do you test that retries are performed only for certain exceptions and not for others?
  • How do you test that no further retry is attempted once the exception is resolved in a previous retry?
  • How do you test that the nth attempt succeeds after (n-1) retry attempts have failed?
  • How do you test that the event is sent to the Dead Letter Topic when all retry attempts have been exhausted?

Let's look at some code. Plenty of good articles show how to set up non-blocking retries with Spring Kafka; one such implementation is given below, using the @RetryableTopic and @DltHandler annotations from Spring Kafka.

Setting up the Retryable Consumer

Java
 
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

import static org.springframework.kafka.retrytopic.DltStrategy.FAIL_ON_ERROR;

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

    private final CustomEventHandler handler;

    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }

    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }

}


Notice that in the code snippet above, the include parameter contains CustomRetryableException.class. This tells the consumer to retry only when a CustomRetryableException is thrown by the CustomEventHandler#handleEvent method. You can list as many exception classes as you like. There is an exclude parameter as well, but only one of the two can be used at a time.

The event processing should be retried for a maximum of ${retry.attempts} times before publishing to the DLT.
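For completeness, the two exception types referenced throughout can be plain unchecked exceptions. A minimal sketch (only the class names appear in the original code; keeping a no-argument constructor matters because the tests below stub with Mockito's doThrow(Class), which instantiates the exception reflectively):

```java
// Minimal sketches of the exception types used by the consumer above.
// Only CustomRetryableException is listed in `include`, so only it triggers retries.
class CustomRetryableException extends RuntimeException { }

class CustomNonRetryableException extends RuntimeException { }
```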

Setting up Test Infra

To write integration tests, you need a functioning Kafka broker (embedded is preferred) and a fully functioning publisher. Let's set up our infrastructure:

Java
 
@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}",
                            "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {

    @Autowired
    private KafkaTemplate<String, CustomEvent> testKafkaTemplate;

    // the handler is mocked so the tests can stub and verify it
    @MockBean
    private CustomEventHandler customEventHandler;

    // tests

}


** Configurations are imported from the application-test.yml file.

When using an embedded Kafka broker, it is important to specify the topics to be created; they will not be created automatically. In this case, we are creating four topics:

"test", "test-retry-0", "test-retry-1", "test-dlt"


The maximum number of attempts is set to three, with each retry topic corresponding to one retry attempt. Events are forwarded to the DLT once all attempts are exhausted.
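For reference, a possible application-test.yml for this setup might look like the following. The property names come from the placeholders used in the annotations; all of the values, and the broker listener settings, are illustrative assumptions:

```yaml
# Hypothetical application-test.yml; values are examples only.
topic: test
default-consumer-group: test-consumer

retry.attempts: 3
retry.delay: 1000
retry.delay.multiplier: 2.0

kafka.broker.listeners: PLAINTEXT://localhost:9092
kafka.broker.port: 9092
```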

Test Cases

Retry should not be performed if consumption succeeds on the first attempt.

This can be tested by verifying that the CustomEventHandler#handleEvent method is called only once. Further assertions on log statements can also be added.

Java
 
    @Test
    void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }


Retry should not be performed if a non-retryable exception is raised.

In this case too, the CustomEventHandler#handleEvent method should be invoked only once:

Java
 
    @Test
    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }


The consumer should retry the maximum configured number of times when a RetryableException is thrown, and the event should then be published to the Dead Letter Topic once retries are exhausted.

In this case, the CustomEventHandler#handleEvent method should be invoked three (maxRetries) times, and the CustomEventHandler#handleEventFromDlt method should be invoked once.

Java
 
    @Test
    void test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));
    }


**A considerable timeout has been set in the verification stage so that the exponential back-off delays are taken into account before the test completes. This is important: if the timeout is set too low, the assertions may fail.
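To size that timeout, you can estimate the total back-off the retries will incur. A back-of-the-envelope sketch, assuming illustrative values of retry.delay = 1000 ms and retry.delay.multiplier = 2.0 (`BackoffBudget` is a hypothetical helper, not part of the project):

```java
// Rough estimate of the total exponential back-off across all retry attempts.
public class BackoffBudget {

    static long totalBackoffMs(long initialDelayMs, double multiplier, int attempts) {
        long total = 0;
        long delay = initialDelayMs;
        for (int i = 0; i < attempts - 1; i++) { // one delay before each retry
            total += delay;
            delay = (long) (delay * multiplier);
        }
        return total;
    }

    public static void main(String[] args) {
        // 1000 ms + 2000 ms = 3000 ms of back-off for 3 attempts,
        // comfortably inside the 10-second verification timeout used above.
        System.out.println(totalBackoffMs(1000, 2.0, 3));
    }
}
```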

The consumer should retry until the RetryableException is resolved, and should stop retrying once a non-retryable exception is raised or consumption eventually succeeds.

The first test is set up to throw a CustomRetryableException first and then a CustomNonRetryableException, so that exactly one retry is performed.

Java
 
    @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }
The second test resolves the retryable exception with a successful consumption on the second attempt, so again exactly one retry is performed:

Java
 
    @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }


Conclusion

As you can see, a good integration test mixes and matches stubbing strategies, timeouts, delays, and verifications so as to foolproof the retry mechanism of your Kafka event-driven architecture.

Kudos. Feel free to suggest improvements and reach out to me on LinkedIn.

The full code can be found here. 


Published at DZone with permission of Mukut Bhattacharjee. See the original article here.

Opinions expressed by DZone contributors are their own.
