DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Supercharging Pytest: Integration With External Tools
  • Exploring Playwright’s Feature “Copy Prompt”
  • Mastering Redirects With Cloudflare Bulk Redirects
  • Top Tools for Object Storage and Data Management

Trending

  • Can You Run a MariaDB Cluster on a $150 Kubernetes Lab? I Gave It a Shot
  • Simplifying Multi-LLM Integration With KubeMQ
  • Intro to RAG: Foundations of Retrieval Augmented Generation, Part 1
  • Data Lake vs. Warehouse vs. Lakehouse vs. Mart: Choosing the Right Architecture for Your Business
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Kafka Message Testing

Kafka Message Testing

Utilizing tools like RecordCaptor, as well as adhering to isolation principles and clear separation of test stages, ensures high accuracy and efficiency.

By 
Anton Belyaev user avatar
Anton Belyaev
·
Sep. 18, 24 · Tutorial
Likes (2)
Comment
Save
Tweet
Share
4.3K Views

Join the DZone community and get the full member experience.

Join For Free

This article offers an approach to writing integration tests for Kafka-based applications that focuses on interaction specification, making tests more readable and easier to maintain. The proposed method not only enhances testing efficiency but also contributes to a better understanding of the integration processes within the application.

Duck with net and envelopes

The article builds on three ideas presented in relevant articles: writing tests with a clear separation of Arrange-Act-Assert stages, isolation in Kafka tests, and using tools to enhance test visibility. I recommend reviewing these before delving into the material of this article.

Demonstration Scenario

Let's take a Telegram bot that forwards requests to the OpenAI API and returns the result to the user as an example. If the request to OpenAI violates the system's security rules, the client will be notified. Additionally, a message will be sent to Kafka for the behavioral control system so that the manager can contact the user, explain that their request was too sensitive even for our bot, and ask them to review their preferences.

The interaction contracts with services are described in a simplified manner to emphasize the core logic. Below is a sequence diagram demonstrating the application's architecture. I understand that the design may raise questions from a system architecture perspective, but please approach it with understanding — the main goal here is to demonstrate the approach to writing tests.

Telegram bot that forwards requests to the OpenAI API and returns the result to the user as an example

Message Capture

The main testing tool will be the message capture object — RecordCaptor. Its operation is quite similar to the outgoing request capture object — RequestCaptor, which can be read about in the article Ordering Chaos: Arranging HTTP Request Testing in Spring (linked earlier).

Message capture will be performed through a standard Kafka consumer. The list of topics must be specified explicitly via a configuration parameter.

Java
 
@KafkaListener(id = "recordCaptor", topics = "#{'${test.record-captor.topics}'.split(',')}", groupId = "test")
public void eventCaptorListener(ConsumerRecord<Object, Object> record,
                                @Headers Map<String, Object> boundedHeaders) {
    RecordSnapshot recordSnapshot = mapper.recordToSnapshot(record, boundedHeaders);
    recordCaptor.capture(recordSnapshot);
}


The RecordCaptor object accumulates information from captured messages.

Using this approach requires adhering to isolation in Kafka tests. Waiting for offset commit confirmation before verifying test results should be done using the KafkaSupport#waitForPartitionOffsetCommit method.

Test Example

Below is the test code for the described scenario.

Java
 
def "User Message Processing with OpenAI"() {
    setup:
    KafkaSupport.waitForPartitionAssignment(applicationContext)                           // 1
    and:                                                                                  // 2
    def openaiRequestCaptor = restExpectation.openai.completions(withBadRequest().contentType(APPLICATION_JSON)
            .body("""{
                "error": {
                "code": "content_policy_violation",
                "message": "Your request was rejected as a result of our safety system."
                }
            }"""))
    def telegramRequestCaptor = restExpectation.telegram.sendMessage(withSuccess('{}', APPLICATION_JSON))
    when:
    mockMvc.perform(post("/telegram/webhook")                                             // 3
            .contentType(APPLICATION_JSON_VALUE)
            .content("""{
                "message": {
                "from": {
                    "id": 10000000
                },
                "chat": {
                    "id": 20000000
                },
                "text": "Hello!"
                }
            }""".toString())
            .accept(APPLICATION_JSON_VALUE))
            .andExpect(status().isOk())
    KafkaSupport.waitForPartitionOffsetCommit(applicationContext)                         // 4
    then:
    openaiRequestCaptor.times == 1                                                        // 5
    JSONAssert.assertEquals("""{
        "content": "Hello!"
    }""", openaiRequestCaptor.bodyString, false)
    and:
    telegramRequestCaptor.times == 1
    JSONAssert.assertEquals("""{
        "chatId": "20000000",
        "text": "Your request was rejected as a result of our safety system."
    }""", telegramRequestCaptor.bodyString, false)
    when:                                                                                 // 6
    def message = recordCaptor.getRecords("topicC", "20000000").last
    then:
    message != null
    JSONAssert.assertEquals("""{
        "webhookMessage": {
        "message": {
            "chat": {
            "id": "20000000"
            },
            "text": "Hello!"
        }
        },
        "error": {
            "code": "content_policy_violation",
            "message": "Your request was rejected as a result of our safety system."
        }
    }""", message.value as String, false)
}


Key steps:

  1. Wait for partition assignment before starting the test scenario.
  2. Mock requests to OpenAI and Telegram.
  3. Execute the test scenario.
  4. Wait for offset confirmation.
  5. Verify requests to OpenAI and Telegram.
  6. Check the message in Kafka.

Using JSONAssert.assertEquals ensures consistency in data representation across Kafka messages, logs, and tests. This simplifies testing by providing flexibility in comparison and accuracy in error diagnosis.

The article provides an example with JSON message format; other formats are not covered, but the described approach does not impose format restrictions.

How To Find Your Message in RecordCaptor

Messages in RecordCaptor are organized by topic name and key. In the provided test, the key used is the Kafka message key. When sending, we explicitly specify it:

Java
 
sendMessage("topicC", chatId, ...);
...
private void sendMessage(String topic, String key, Object payload) {
    Message message = MessageBuilder
            .withPayload(objectMapper.writeValueAsString(payload))
            .setHeader(KafkaHeaders.TOPIC, topic)
            .setHeader(KafkaHeaders.KEY, key)                          <-- set key
            .build();
    kafkaTemplate.send(message).get();
}


To search by message key within a topic:

Java
 
when:                                                                                
def message = recordCaptor.getRecords("topicC", "20000000").last       <-- use key


If this option is not suitable, you need to describe your own indexes based on message parameters for constructing the search. An example can be seen in the tests PolicyViolationTestsCustomIndex.groovy.

Connecting RecordCaptor

The code for connecting RecordCaptor looks as follows:

Java
 
@TestConfiguration(proxyBeanMethods = false)
public class RecordCaptorConfiguration {
    @Bean
    RecordCaptor recordCaptor() {
        return new RecordCaptor();
    }

    @Bean
    RecordCaptorConsumer recordCaptorConsumer(RecordCaptor recordCaptor) {
        return new RecordCaptorConsumer(recordCaptor, new RecordSnapshotMapper());
    }
}


OffsetSnapshotFrame

Experience has shown that working with Kafka-based applications requires tools to facilitate understanding the state of consumers and message consumption status. For this task, you can compare topic offsets and consumer groups in the offset confirmation waiting operation and log discrepancies, as illustrated in the image:

Compare topic offsets and consumer groups in the offset confirmation waiting operation and log discrepancies

The code for OffsetComparisonFrame is available for review.

Conclusion

Testing messages in Kafka using the proposed approach not only simplifies test writing but also makes it more structured and understandable. Utilizing tools like RecordCaptor, as well as adhering to isolation principles and clear separation of test stages, ensures high accuracy and efficiency.

  • Link to the project repository with test demonstrations: kafka-test-support.

Thank you for reading the article, and good luck in your efforts to write effective and clear tests!

Tool kafka Object (computer science) Requests Testing

Published at DZone with permission of Anton Belyaev. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Supercharging Pytest: Integration With External Tools
  • Exploring Playwright’s Feature “Copy Prompt”
  • Mastering Redirects With Cloudflare Bulk Redirects
  • Top Tools for Object Storage and Data Management

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!