Isolation in Testing With Kafka

This article explores simplifying Kafka integration tests by ensuring test isolation and offering tools for better control.

By Anton Belyaev · Jul. 11, 24 · Tutorial
Running Kafka in test scenarios has become very convenient thanks to Testcontainers and the enhanced support in Spring Boot 3.1 through the @ServiceConnection annotation. However, writing and maintaining integration tests with Kafka remains a challenge. This article describes an approach that significantly simplifies such tests by ensuring test isolation, and it provides a set of tools to achieve this. With isolation in place, Kafka tests can be organized so that, at the result verification stage, every message produced during the test is available, which removes the need for forced waits such as Thread.sleep().

This approach works with Testcontainers, Embedded Kafka, or any other way of running the Kafka service (e.g., a local instance).

Test Isolation

As detailed in the article Eradicating Non-Determinism in Tests, clear control over the testing environment is crucial for the reliable execution of tests. This ensures that each test begins from a known state. An example could be a situation where one test creates data in a database and fails to clean it up afterward, negatively impacting the execution of subsequent tests that expect a different database state.

Various methods can be applied to achieve test isolation, including:

  • Restoring the system's original state before each test run.
  • Mandatory data cleanup after the execution of each test to prevent influence on subsequent tests.

Isolation in Kafka Testing

Restoring Kafka's initial environment state from scratch for each test scenario can be achieved by restarting Kafka. This option is straightforward in terms of implementation but costly in terms of startup time. There are methods to speed up this process (for more details, you can read about running Kafka on GraalVM), but this article suggests considering an option where we work with a common Kafka instance across all test scenarios.

This approach poses certain challenges: if one test sends messages to a topic and does not wait for them to be received and processed, those messages can interfere with another test that relies on a different topic state.

To ensure isolation, whenever a message is sent to a topic that the same application listens to, all processing triggered by that message must complete before the test scenario ends.

Implementation

Consider the example of a hypothetical Telegram bot that redirects requests to the OpenAI API and sends responses to users.

The interaction contracts with the services are deliberately simplified to highlight the main logic. Below is a sequence diagram showing the architecture of the application. The design may raise questions from a system architecture perspective, but the main goal here is to demonstrate the approach to isolation in testing.

[Figure: sequence diagram showing the architecture of the application]

Below is a diagram illustrating the testing approach:

[Figure: diagram illustrating the testing approach]

The key feature of the proposed solution is the strict division of test code into phases corresponding to the Arrange-Act-Assert pattern. You can read more about this approach in the article Ordering Chaos: Arranging HTTP Request Testing in Spring.

To achieve isolation, it is critically important to maintain the following relationships between the key elements of the scheme (numbers correspond to those indicated in the diagram):

  1. Scenario setup (Arrange) occurs before Scenario execution (Act).
  2. The message is sent and (3) confirmed by the partition leader before ServiceA's request processing is considered complete.
  4. Manual offset management with commit occurs only after all processing by ServiceB or ServiceA is fully completed.
  5. Scenario execution (Act) occurs before Result verification (Assert).

This approach ensures that by the time of result verification, all processes within the test scenario are completed, and all messages are sent and received, ensuring the test environment is in a known and final state.

Scenario Setup (Arrange) Occurs Before Scenario Execution (Act)

The goal of this stage is to prepare everything necessary for the test scenario. In the context of our example, the main components of the test environment include the application context, HTTP request mocks, Kafka, and Record Captor.

Regarding integration with Kafka, it is critical to ensure that all consumers are ready to receive messages. This verification is implemented in the KafkaSupport#waitForPartitionAssignment method. The solution is based on the original ContainerTestUtils from the org.springframework.kafka:spring-kafka-test library, modified according to the described usage scenario. This method guarantees that at least one partition will be assigned to each Kafka consumer. This implies a limitation: each topic must have exactly one partition in the test environment. The limitation stems from the current implementation of the method and can be lifted.

Using a common Kafka instance requires setting the auto.offset.reset = latest parameter for consumers, so that a consumer group without committed offsets starts at the end of the topic and does not read messages left over from earlier tests. For Spring applications, this is done as follows:

Properties files
 
spring.kafka.consumer.auto-offset-reset=latest


Record Captor is a key element of this solution. Its task is to "catch" messages from the topics specified in the configuration and make them available to the result verification step of the test scenario. Technically, it is a simple consumer for a Kafka topic with a message storage mechanism and an access interface. The Record Captor code is available in the project repository.

The current implementation of Record Captor uses the message key to identify messages related to a specific test case. This is useful in systems where unique identifiers are present, such as a client ID or a process identifier in the domain model. Using such identifiers as the key makes it possible to group and track all messages related to the same test scenario, even if they are distributed across different topics or processed by different system components.
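
To make the idea of key-based grouping concrete, here is a minimal in-memory sketch. It is not the project's Record Captor: the class name RecordStoreSketch and its methods are hypothetical, values are plain strings, and keys are assumed to be non-null. In a real test, capture would be called from a Kafka listener for every consumed record.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-memory record store; not the project's RecordCaptor. */
final class RecordStoreSketch {

    // topic -> message key -> values in arrival order
    private final Map<String, Map<String, List<String>>> records = new ConcurrentHashMap<>();

    /** Stores one consumed record. Assumes topic and key are non-null. */
    public void capture(String topic, String key, String value) {
        records.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
               .computeIfAbsent(key, k -> new CopyOnWriteArrayList<>())
               .add(value);
    }

    /** Returns an immutable copy of the values captured for one topic and key. */
    public List<String> getRecords(String topic, String key) {
        return List.copyOf(records.getOrDefault(topic, Map.of()).getOrDefault(key, List.of()));
    }

    public static void main(String[] args) {
        RecordStoreSketch captor = new RecordStoreSketch();
        captor.capture("topicB", "20000000", "Hello!");
        captor.capture("topicB", "30000000", "Message from another test");
        // Only the record that belongs to the first scenario is returned.
        System.out.println(captor.getRecords("topicB", "20000000")); // prints [Hello!]
    }
}
```

Because lookups are scoped by key, messages produced by another scenario on the same shared topic do not show up in this scenario's assertions.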

Synchronous Message Sending With Acknowledgment

The goal is to implement synchronous message sending to Kafka with acknowledgment from the partition leader. To achieve this, the acks = 1 parameter must be set for the producer. In the context of a Spring application, this setting is specified as follows:

Properties files
 
spring.kafka.producer.acks=1


When using KafkaTemplate to send messages, the sending must be made synchronous explicitly, because this component provides only an asynchronous interface: org.springframework.kafka.core.KafkaTemplate#send(org.springframework.messaging.Message<?>) returns CompletableFuture<SendResult<K, V>>. The following approach can be used for synchronous sending:

Java
 
kafkaTemplate.send(message).get();


This ensures that the message sending will be completed synchronously, waiting for Kafka's acknowledgment before proceeding with the program execution.
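
A call to get() without arguments blocks indefinitely if the acknowledgment never arrives, so in tests a bounded wait can be preferable. The sketch below uses only the JDK: the CompletableFuture stands in for the one returned by KafkaTemplate#send, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; the future stands in for the result of KafkaTemplate#send. */
final class SyncSendSketch {

    /** Blocks until the send future completes or the timeout elapses. */
    public static <T> T await(CompletableFuture<T> sendFuture, Duration timeout) {
        try {
            return sendFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for the send result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No acknowledgment within " + timeout, e);
        }
    }

    public static void main(String[] args) {
        // Simulated acknowledgment; in the application this would be kafkaTemplate.send(message).
        CompletableFuture<String> ack = CompletableFuture.supplyAsync(() -> "acknowledged");
        System.out.println(await(ack, Duration.ofSeconds(5))); // prints acknowledged
    }
}
```

Wrapping the checked exceptions keeps the calling code short, and a timeout turns a hung broker into a clear test failure.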

Manual Offset Management

Manual offset management means that the consumer commits the offset of a message only after that message has been fully processed. In this context, the offset for topicA will only be committed after the message has been successfully sent to topicB and the corresponding acknowledgment has been received.

To implement this logic, it is necessary to disable the automatic offset commit for consumers by setting the enable.auto.commit = false parameter. In the context of a Spring application, this is configured as follows:

Properties files
 
spring.kafka.consumer.enable-auto-commit=false


Furthermore, the commit mechanism should be configured so that the offset is committed after processing each individual message, which is achieved by setting the parameter:

Properties files
 
spring.kafka.listener.ack-mode=record
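
The ordering that these settings are meant to produce can be illustrated with a small in-memory model. It involves no Kafka client at all: the class CommitAfterProcessingSketch is hypothetical, a list stands in for a single partition, and a counter stands in for the committed offset. The point is only that the offset moves after the handler has returned, so a committed offset implies finished processing.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory model of a single-partition consumer; no Kafka client involved. */
final class CommitAfterProcessingSketch {

    private final List<String> partition; // stands in for the records of one partition
    private long committedOffset = 0;     // next offset to read, as Kafka stores it

    public CommitAfterProcessingSketch(List<String> partition) {
        this.partition = partition;
    }

    /** Processes records in order; the offset moves only after the handler returns normally. */
    public void poll(Consumer<String> handler) {
        while (committedOffset < partition.size()) {
            String value = partition.get((int) committedOffset);
            handler.accept(value); // all processing, including downstream sends, happens here
            committedOffset++;     // "commit" only once processing has finished
        }
    }

    public long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        CommitAfterProcessingSketch consumer = new CommitAfterProcessingSketch(List.of("a", "b", "c"));
        try {
            consumer.poll(v -> {
                if (v.equals("b")) {
                    throw new IllegalStateException("downstream failure");
                }
            });
        } catch (IllegalStateException e) {
            // "a" was processed and committed, "b" failed and was not committed.
            System.out.println("committed after failure: " + consumer.committedOffset()); // prints 1
        }
        consumer.poll(v -> { });
        System.out.println("committed after retry: " + consumer.committedOffset()); // prints 3
    }
}
```

For the test this means that once the committed offset equals the end of the partition, no handler is still running for that partition.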


Scenario Execution (Act) Occurs Before Result Verification (Assert)

Before starting the result verification stage, it is necessary to ensure that all processes related to the scenario are completed and all messages are sent and received, so that the test environment is in a known final state. The preceding stages establish a happens-before relationship between the actions of producers and consumers, as well as between all processing steps within the application. What remains is to check that offsets have been committed for every partition and every consumer group.

To carry out this verification, the solution presented in the method pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit can be used.
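
The idea behind such a check can be sketched as a polling loop that compares committed offsets with end offsets. This is not the code of KafkaSupport#waitForPartitionOffsetCommit: the class and method names are hypothetical, and the two suppliers are stand-ins for broker lookups (for example through an admin client). A partition without a committed offset is treated as offset 0 here, which is a simplification a real implementation would need to revisit.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical polling helper; the suppliers stand in for broker lookups. */
final class OffsetCommitWaitSketch {

    /** Polls until every partition's committed offset has reached its end offset. */
    public static void waitForCommits(Supplier<Map<String, Long>> endOffsets,
                                      Supplier<Map<String, Long>> committedOffsets,
                                      Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Map<String, Long> end = endOffsets.get();
            Map<String, Long> committed = committedOffsets.get();
            // Simplification: a partition without a committed offset counts as offset 0.
            boolean caughtUp = end.entrySet().stream()
                    .allMatch(e -> committed.getOrDefault(e.getKey(), 0L) >= e.getValue());
            if (caughtUp) {
                return;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Offsets not committed within " + timeout
                        + ": end=" + end + ", committed=" + committed);
            }
            try {
                Thread.sleep(50); // short pause between polls, not a fixed wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for offset commits", e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong committed = new AtomicLong(0);
        // Simulated consumer that commits one more offset each time it is queried.
        waitForCommits(() -> Map.of("topicA-0", 3L),
                       () -> Map.of("topicA-0", committed.incrementAndGet()),
                       Duration.ofSeconds(5));
        System.out.println("caught up at offset " + committed.get()); // prints caught up at offset 3
    }
}
```

Unlike a fixed Thread.sleep(), the loop returns as soon as the condition holds and fails with a descriptive message if it never does.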

Result Verification (Assert)

The final stage involves analyzing the results of the test scenario. This process includes checking the state of mocks and analyzing the messages caught in RecordCaptor.

Key Solution Elements Summarized

Here is a brief summary of the key components of the proposed solution for effective testing with Kafka:

  1. One partition per topic.
  2. Start position policy for consumers: spring.kafka.consumer.auto-offset-reset=latest.
  3. Acknowledgment policy for producers: spring.kafka.producer.acks=1.
  4. Synchronous message sending: kafkaTemplate.send(message).get().
  5. Manual offset control: spring.kafka.consumer.enable-auto-commit=false, spring.kafka.listener.ack-mode=record.
  6. Waiting for partition assignment before starting the test scenario: pw.avvero.emk.KafkaSupport#waitForPartitionAssignment.
  7. Waiting for offset commit before verifying test results: pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit.

Result

The application code is available in the examples module. The test code is as follows.

Groovy
 
def "User Message Processing with OpenAI"() {
    setup:
    KafkaSupport.waitForPartitionAssignment(applicationContext)                           // (1)
    and:
    def openaiRequestCaptor = new RequestCaptor()
    restMock.expect(manyTimes(), requestTo("https://api.openai.com/v1/chat/completions")) // (2)
            .andExpect(method(HttpMethod.POST))
            .andExpect(openaiRequestCaptor)
            .andRespond(withSuccess('{"content": "Hi, how can i help you?"}', MediaType.APPLICATION_JSON))
    and:
    def telegramRequestCaptor = new RequestCaptor()
    restMock.expect(manyTimes(), requestTo("https://api.telegram.org/sendMessage"))       // (3)
            .andExpect(method(HttpMethod.POST))
            .andExpect(telegramRequestCaptor)
            .andRespond(withSuccess('{}', MediaType.APPLICATION_JSON))
    when:
    mockMvc.perform(post("/telegram/webhook")                                             // (4)
            .contentType(APPLICATION_JSON_VALUE)
            .content("""{
              "message": {
                "from": {
                  "id": 10000000
                },
                "chat": {
                  "id": 20000000
                },
                "text": "Hello!"
              }
            }""".toString())
            .accept(APPLICATION_JSON_VALUE))
            .andExpect(status().isOk())
    KafkaSupport.waitForPartitionOffsetCommit(applicationContext)                         // (5)
    then:
    openaiRequestCaptor.times == 1                                                        // (6)
    JSONAssert.assertEquals("""{
        "content": "Hello!"
    }""", openaiRequestCaptor.bodyString, false)
    and:
    telegramRequestCaptor.times == 1
    JSONAssert.assertEquals("""{
        "chatId": "20000000",
        "text": "Hi, how can i help you?"
    }""", telegramRequestCaptor.bodyString, false)
}


Here are the key steps described in the test scenario:

  1. Waiting for partition assignment before starting the test scenario.
  2. Mocking requests to OpenAI.
  3. Mocking requests to Telegram.
  4. Executing the test scenario.
  5. Waiting for offset commit.
  6. Result verification.

Additional tests, including scenarios with intensive message sending and the use of the @RetryableTopic mechanism for retries, are also available in the project repository, providing opportunities for study and adaptation to your own development needs.

Conclusion

Successful testing of interactions with Kafka requires a careful approach to test isolation and environment control. Testcontainers and the capabilities of Spring Boot 3.1 greatly simplify this process, and the techniques and tools proposed here let developers focus on the application logic, making development more efficient and less prone to errors.

Thank you for paying attention to the article, and good luck in your pursuit of writing convenient and reliable tests!

Published at DZone with permission of Anton Belyaev. See the original article here.

Opinions expressed by DZone contributors are their own.
