
Kafka Message Testing

Utilizing tools like RecordCaptor, as well as adhering to isolation principles and clear separation of test stages, ensures high accuracy and efficiency.

By Anton Belyaev · Sep. 18, 24 · Tutorial

This article offers an approach to writing integration tests for Kafka-based applications that focuses on interaction specification, making tests more readable and easier to maintain. The proposed method not only enhances testing efficiency but also contributes to a better understanding of the integration processes within the application.

[Image: duck with net and envelopes]

This article builds on three ideas covered in related articles: writing tests with a clear separation of the Arrange-Act-Assert stages, isolation in Kafka tests, and using tools to improve test visibility. I recommend reviewing them before reading on.

Demonstration Scenario

As an example, let's take a Telegram bot that forwards requests to the OpenAI API and returns the result to the user. If the request to OpenAI violates the system's security rules, the client is notified. In addition, a message is sent to Kafka for the behavioral control system so that a manager can contact the user, explain that their request was too sensitive even for our bot, and ask them to review their preferences.

The interaction contracts with services are described in a simplified manner to emphasize the core logic. Below is a sequence diagram demonstrating the application's architecture. I understand that the design may raise questions from a system architecture perspective, but please approach it with understanding — the main goal here is to demonstrate the approach to writing tests.

[Figure: sequence diagram of the Telegram bot that forwards requests to the OpenAI API and returns the result to the user]

Message Capture

The main testing tool is the message capture object, RecordCaptor. It works much like RequestCaptor, the object that captures outgoing requests, which is described in the article Ordering Chaos: Arranging HTTP Request Testing in Spring (linked earlier).

Message capture will be performed through a standard Kafka consumer. The list of topics must be specified explicitly via a configuration parameter.

Java
 
@KafkaListener(id = "recordCaptor", topics = "#{'${test.record-captor.topics}'.split(',')}", groupId = "test")
public void eventCaptorListener(ConsumerRecord<Object, Object> record,
                                @Headers Map<String, Object> boundedHeaders) {
    RecordSnapshot recordSnapshot = mapper.recordToSnapshot(record, boundedHeaders);
    recordCaptor.capture(recordSnapshot);
}


The RecordCaptor object accumulates information from captured messages.
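
To make the idea of accumulation concrete, here is a minimal sketch of what such an object could look like. It is not the RecordCaptor implementation from the project: the class name RecordCaptorSketch and the trimmed-down RecordSnapshot record with its three fields are assumptions made for illustration. The sketch only shows the core idea of storing snapshots per topic and per key in arrival order, in a thread-safe structure, since the listener writes from a consumer thread while the test reads from the test thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified snapshot of a consumed record (not the project's class).
// Requires Java 16+ because it uses a record.
record RecordSnapshot(String topic, String key, String value) {}

// Hypothetical sketch of a captor: topic -> key -> snapshots in arrival order.
class RecordCaptorSketch {
    private final Map<String, Map<String, List<RecordSnapshot>>> byTopicAndKey =
            new ConcurrentHashMap<>();

    // Called from the listener thread for every consumed record.
    // Null keys are not handled in this sketch (ConcurrentHashMap rejects them).
    void capture(RecordSnapshot snapshot) {
        byTopicAndKey
                .computeIfAbsent(snapshot.topic(), t -> new ConcurrentHashMap<>())
                .computeIfAbsent(snapshot.key(), k -> new CopyOnWriteArrayList<>())
                .add(snapshot);
    }

    // Called from the test thread; returns an immutable copy,
    // or an empty list if nothing was captured for this topic and key.
    List<RecordSnapshot> getRecords(String topic, String key) {
        return List.copyOf(byTopicAndKey
                .getOrDefault(topic, Map.of())
                .getOrDefault(key, List.of()));
    }
}
```

Returning a copy rather than the live list keeps assertions stable even if another record arrives while the test is inspecting the result.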

This approach relies on isolation in Kafka tests. Before verifying test results, wait for offset commit confirmation using the KafkaSupport#waitForPartitionOffsetCommit method, so that the messages produced during the scenario have been consumed before the assertions run.
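
The internals of KafkaSupport are not shown in this article, so the following is only a sketch of the general idea behind waiting for an offset commit: poll until the committed offset of every partition has reached that partition's end offset, and fail if this does not happen within a timeout. To keep the sketch free of Kafka client dependencies, offsets are supplied as plain maps keyed by a partition name. The class OffsetWaitSketch, its methods, and the 50 ms polling interval are all assumptions.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper; not the project's KafkaSupport.
class OffsetWaitSketch {

    // True when every partition's committed offset has reached its end offset.
    // A partition with no committed offset is treated as committed at 0.
    static boolean caughtUp(Map<String, Long> endOffsets, Map<String, Long> committedOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> committedOffsets.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }

    // Polls both offset sources until the group has caught up or the timeout expires.
    static void waitForOffsetCommit(Supplier<Map<String, Long>> endOffsets,
                                    Supplier<Map<String, Long>> committedOffsets,
                                    Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!caughtUp(endOffsets.get(), committedOffsets.get())) {
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Offsets were not committed within " + timeout);
            }
            try {
                Thread.sleep(50); // polling interval, chosen arbitrarily for the sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while waiting for offset commit", e);
            }
        }
    }
}
```

In a real test the two suppliers would be backed by calls to the Kafka cluster; here they are left abstract so the waiting logic can be read on its own.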

Test Example

Below is the test code for the described scenario.

Groovy
 
def "User Message Processing with OpenAI"() {
    setup:
    KafkaSupport.waitForPartitionAssignment(applicationContext)                           // 1
    and:                                                                                  // 2
    def openaiRequestCaptor = restExpectation.openai.completions(withBadRequest().contentType(APPLICATION_JSON)
            .body("""{
                "error": {
                "code": "content_policy_violation",
                "message": "Your request was rejected as a result of our safety system."
                }
            }"""))
    def telegramRequestCaptor = restExpectation.telegram.sendMessage(withSuccess('{}', APPLICATION_JSON))
    when:
    mockMvc.perform(post("/telegram/webhook")                                             // 3
            .contentType(APPLICATION_JSON_VALUE)
            .content("""{
                "message": {
                "from": {
                    "id": 10000000
                },
                "chat": {
                    "id": 20000000
                },
                "text": "Hello!"
                }
            }""".toString())
            .accept(APPLICATION_JSON_VALUE))
            .andExpect(status().isOk())
    KafkaSupport.waitForPartitionOffsetCommit(applicationContext)                         // 4
    then:
    openaiRequestCaptor.times == 1                                                        // 5
    JSONAssert.assertEquals("""{
        "content": "Hello!"
    }""", openaiRequestCaptor.bodyString, false)
    and:
    telegramRequestCaptor.times == 1
    JSONAssert.assertEquals("""{
        "chatId": "20000000",
        "text": "Your request was rejected as a result of our safety system."
    }""", telegramRequestCaptor.bodyString, false)
    when:                                                                                 // 6
    def message = recordCaptor.getRecords("topicC", "20000000").last
    then:
    message != null
    JSONAssert.assertEquals("""{
        "webhookMessage": {
        "message": {
            "chat": {
            "id": "20000000"
            },
            "text": "Hello!"
        }
        },
        "error": {
            "code": "content_policy_violation",
            "message": "Your request was rejected as a result of our safety system."
        }
    }""", message.value as String, false)
}


Key steps:

  1. Wait for partition assignment before starting the test scenario.
  2. Mock requests to OpenAI and Telegram.
  3. Execute the test scenario.
  4. Wait for offset confirmation.
  5. Verify requests to OpenAI and Telegram.
  6. Check the message in Kafka.

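Using JSONAssert.assertEquals keeps the data representation consistent across Kafka messages, logs, and tests. The final argument, false, selects non-strict comparison: fields that are absent from the expected JSON are ignored and array order is not enforced, so a test can assert only the fields it cares about and still get a field-level message when something does not match.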
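
To illustrate the non-strict comparison requested by the false argument in the test, where fields absent from the expected JSON are ignored, the sketch below applies the same rule to already-parsed objects represented as plain maps. It is not how JSONAssert is implemented and it leaves arrays out entirely; the class LenientMatchSketch and its method are hypothetical and exist only to show the semantics.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of lenient matching on already-parsed JSON objects.
// Requires Java 16+ because it uses pattern matching for instanceof.
class LenientMatchSketch {

    // True when every field of `expected` is present in `actual` with a matching value;
    // fields that exist only in `actual` are ignored.
    static boolean matches(Map<String, Object> expected, Map<String, Object> actual) {
        for (Map.Entry<String, Object> field : expected.entrySet()) {
            Object expectedValue = field.getValue();
            Object actualValue = actual.get(field.getKey());
            if (expectedValue instanceof Map<?, ?> expectedChild) {
                // Nested objects are compared with the same lenient rule.
                if (!(actualValue instanceof Map<?, ?> actualChild)
                        || !matches(asObject(expectedChild), asObject(actualChild))) {
                    return false;
                }
            } else if (!Objects.equals(expectedValue, actualValue)) {
                return false;
            }
        }
        return true;
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> asObject(Map<?, ?> map) {
        return (Map<String, Object>) map;
    }
}
```

This is why the expected Kafka message in the test can omit fields such as the sender's id: only the fields listed in the expected document are checked.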

The article provides an example with JSON message format; other formats are not covered, but the described approach does not impose format restrictions.

How To Find Your Message in RecordCaptor

Messages in RecordCaptor are organized by topic name and key. In the provided test, the key used is the Kafka message key. When sending, we explicitly specify it:

Java
 
sendMessage("topicC", chatId, ...);
...
private void sendMessage(String topic, String key, Object payload) {
    Message<String> message = MessageBuilder
            .withPayload(objectMapper.writeValueAsString(payload))
            .setHeader(KafkaHeaders.TOPIC, topic)
            .setHeader(KafkaHeaders.KEY, key)                          // <-- set key
            .build();
    kafkaTemplate.send(message).get();
}


To search by message key within a topic:

Groovy
 
when:
def message = recordCaptor.getRecords("topicC", "20000000").last       // <-- use key


If this option is not suitable, you can define your own indexes based on message parameters and search by them. See the PolicyViolationTestsCustomIndex.groovy tests for an example.
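
The referenced test is not reproduced here. As a rough picture of what a custom index can mean, the sketch below lets the caller supply a function that derives the lookup key from the message itself, for example from a header, instead of relying on the Kafka message key. The names IndexedCaptorSketch and CapturedMessage, as well as the chatId header used in the usage note, are hypothetical and are not taken from the project.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical, simplified view of a consumed record for this sketch (Java 16+).
record CapturedMessage(String topic, Map<String, String> headers, String value) {}

// Hypothetical captor that indexes messages by a caller-supplied key.
class IndexedCaptorSketch {
    private final Function<CapturedMessage, String> indexKey;
    private final Map<String, List<CapturedMessage>> index = new ConcurrentHashMap<>();

    IndexedCaptorSketch(Function<CapturedMessage, String> indexKey) {
        this.indexKey = indexKey;
    }

    void capture(CapturedMessage message) {
        // Combine topic and derived key so the same key in two topics stays separate.
        String slot = message.topic() + "/" + indexKey.apply(message);
        index.computeIfAbsent(slot, s -> new CopyOnWriteArrayList<>()).add(message);
    }

    List<CapturedMessage> getRecords(String topic, String key) {
        return List.copyOf(index.getOrDefault(topic + "/" + key, List.of()));
    }
}
```

With a captor created as new IndexedCaptorSketch(m -> m.headers().get("chatId")), a test could look up messages by chat id even when the producer sets no message key.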

Connecting RecordCaptor

The code for connecting RecordCaptor looks as follows:

Java
 
@TestConfiguration(proxyBeanMethods = false)
public class RecordCaptorConfiguration {
    @Bean
    RecordCaptor recordCaptor() {
        return new RecordCaptor();
    }

    @Bean
    RecordCaptorConsumer recordCaptorConsumer(RecordCaptor recordCaptor) {
        return new RecordCaptorConsumer(recordCaptor, new RecordSnapshotMapper());
    }
}


OffsetSnapshotFrame

In practice, working with Kafka-based applications calls for tools that show the state of consumers and how far message consumption has progressed. One option is to compare topic offsets with consumer group offsets while waiting for offset confirmation and to log any discrepancies, as illustrated in the image:

[Figure: log output comparing topic offsets and consumer group offsets while waiting for offset confirmation, with discrepancies listed]

The code for OffsetComparisonFrame is available for review.
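
The project's code is not reproduced here. As a rough sketch of the comparison itself, the example below takes end offsets per partition and the offsets committed by one consumer group, both as plain maps, and produces one log line for each partition where the two differ. The class OffsetDiffSketch, the map-based input, and the line format are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not the project's frame class.
class OffsetDiffSketch {

    // One line per partition whose committed offset differs from the topic end offset.
    static List<String> discrepancies(Map<String, Long> endOffsets,
                                      Map<String, Long> committedOffsets) {
        List<String> lines = new ArrayList<>();
        // TreeMap gives a stable, sorted order for readable logs.
        for (Map.Entry<String, Long> e : new TreeMap<>(endOffsets).entrySet()) {
            long end = e.getValue();
            // A partition with no committed offset is treated as committed at 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            if (committed != end) {
                lines.add(String.format("%s: end=%d, committed=%d, lag=%d",
                        e.getKey(), end, committed, end - committed));
            }
        }
        return lines;
    }
}
```

Logging these lines on every polling attempt makes it visible which partition a stuck test is still waiting for.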

Conclusion

Testing messages in Kafka using the proposed approach not only simplifies test writing but also makes it more structured and understandable. Utilizing tools like RecordCaptor, as well as adhering to isolation principles and clear separation of test stages, ensures high accuracy and efficiency.

  • Link to the project repository with test demonstrations: kafka-test-support.

Thank you for reading the article, and good luck in your efforts to write effective and clear tests!

Published at DZone with permission of Anton Belyaev. See the original article here.
