
A Quick and Practical Example of Kafka Testing


In this tutorial, we learn some of the fundamental aspects of Kafka testing in a declarative way and how to test microservices involving both Kafka and REST.


1. Introduction

In this tutorial, we will quickly explore some basic to high-level approaches for testing microservice applications built using Kafka. Also, we will learn about the advantages of the declarative way of testing Kafka applications over the traditional/existing way of testing.

For everything explained here, we can find running code examples in the "Conclusion" section of this post.

To keep the tutorial concise, we will demonstrate only the following aspects:

  1. Producer Testing
  2. Consumer Testing
  3. Hooking Both Producer and Consumer Tests
  4. Producing RAW records and JSON records
  5. Consuming RAW records and JSON records
  6. Traditional Testing Challenges
  7. Advantages of Declarative Style Testing (IEEE Paper)
  8. Combining REST API Testing with Kafka Testing
  9. Spinning Up Kafka in Docker - Single Node and Multi-Node

I strongly recommend reading through the Minimum Things We Need To Know For Kafka Testing post before proceeding with this tutorial.

For more details about Kafka streams and how to develop a streaming application, please visit Developing Streaming Applications Tutorial by Confluent.

2. Kafka Testing Challenges

The difficult part is that one part of the application logic, or a DB procedure, keeps producing records to a topic while another part of the application keeps consuming the records and continuously processing them based on business rules.

The records, partitions, offsets, exception scenarios, etc. keep on changing, making it difficult to think in terms of what to test, when to test, and how to test.


3. Testing Solution Approach

We can go for an end-to-end testing approach that validates producing, consuming, and DLQ records, as well as the application processing logic. This will give us good confidence in releasing our application to higher environments.

We can do this by bringing up Kafka in dockerized containers or by pointing our tests to any integrated test environment somewhere in our Kubernetes-Kafka cluster or any other microservices infrastructure.  

Here we pick a functionality, produce the desired record and validate it, then consume the intended record and validate it, alongside the HTTP REST or SOAP API validation, which helps keep our tests much cleaner and less noisy.


4. Producer Testing

When we produce a record to a topic, we can verify the acknowledgment from the Kafka broker. This acknowledgment comes in the form of recordMetadata.

For example, visualizing the "recordMetadata" as JSON would look like:

Response from the broker after a successful "produce".

{
    "recordMetadata": {
        "offset": 0,
        "timestamp": 1547760760264,
        "serializedKeySize": 13,
        "serializedValueSize": 34,
        "topicPartition": {
            "hash": 749715182,
            "partition": 0,   //<--- To which partition the record landed
            "topic": "demo-topic"
        }
    }
}

5. Consumer Testing

When we read or consume from a topic, we can verify the record(s) fetched from the topic. Here we can validate/assert some of the metadata too, but most of the time we need to deal only with the records (not the metadata).

There may be times, for instance, when we validate only the number of records, i.e. the size, not the actual records.

For example, visualizing the fetched "records" as JSON would look like:

Records fetched after a successful "consume".

{
    "records": [
        {
            "topic": "demo-topic",
            "key": "1547792460796",
            "value": "Hello World 1"
        },
        {
            // ...
        }
    ]
}

The full record(s) with the metadata information look like the below, which we can also validate/assert if we have a test requirement to do so.

The fetched records with the metadata from the broker.

{
    "records": [
        {
            "topic": "demo-topic",
            "partition": 0,
            "offset": 3,
            "key": "1547792460796", //<---- Record key
            "value": "Hello World"  //<---- Record value
        }
    ]
}

6. Producer and Consumer Testing

In the same end-to-end test, we can perform two steps like below for the same record(s):

  • Step 1:
    • Produce to the topic "demo-topic" and validate the received recordMetadata from the broker.
      • For example, produce a record with "key":"1234", "value":"Hello World"
  • Step 2:
    • Consume from the same topic "demo-topic" and validate records.
      • Assert that the same record was present in the response, i.e. "key": "1234", "value": "Hello World".
      • We might have consumed more than one record if they were produced to the same topic before we started consuming.
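The two steps above can be written as one scenario file, using the same "scenarioName"/"steps" structure the framework uses for multi-step tests. This is a sketch: the topic, key, and value are illustrative, and the consume-step assertions simply mirror the "records" response structure shown in section 5.

```json
{
    "scenarioName": "produce_and_consume_same_record",
    "steps": [
        {
            "name": "produce_step",
            "url": "kafka-topic:demo-topic",
            "operation": "produce",
            "request": {
                "records": [
                    {
                        "key": "1234",
                        "value": "Hello World"
                    }
                ]
            },
            "assertions": {
                "status": "Ok",
                "recordMetadata": "$NOT.NULL"
            }
        },
        {
            "name": "consume_step",
            "url": "kafka-topic:demo-topic",
            "operation": "consume",
            "request": {},
            "assertions": {
                "records": [
                    {
                        "key": "1234",
                        "value": "Hello World"
                    }
                ]
            }
        }
    ]
}
```

Remember that the consume step may fetch more than one record if others were produced to the same topic earlier.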

7. Challenges With the Traditional Style of Testing

Point 1

In the first place, there is nothing wrong with the traditional style. But it has a steep learning curve when it comes to dealing with Kafka brokers.

For instance, when we deal with the brokers, we need to get thoroughly acquainted with the Kafka client APIs, e.g. key SerDe, value SerDe, timeouts while polling records, commitSync, record types, etc., and many more things at the API level.

For functional testing, we don't really need to know these concepts at the API level.
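To illustrate the point, below is a rough sketch of what the raw client APIs involve for a simple produce-then-consume check. It uses the standard kafka-clients API (KafkaProducer/KafkaConsumer); the broker address, topic, and group id are illustrative, and a running broker plus the kafka-clients jar are required.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RawClientApiSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Key/Value SerDes must be chosen explicitly
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Produce and block for the broker acknowledgment (RecordMetadata)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata meta = producer
                    .send(new ProducerRecord<>("demo-topic", "1234", "Hello World"))
                    .get();
            System.out.println("partition=" + meta.partition() + ", offset=" + meta.offset());
        }

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "demo-group");

        // Consume: poll with a timeout, then commit synchronously
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " -> " + record.value());
            }
            consumer.commitSync();
        }
    }
}
```

None of this API-level detail tells a reader what business scenario is being tested, which is exactly the noise the declarative style removes.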

Point 2

Our test code gets tightly coupled with the client API code. This means we introduce many challenges in maintaining the test suites along with the test framework's code.

8. Advantages of the Declarative Style of Testing

To draw an analogy, the interesting way 'docker-compose' works is called the "Declarative Way." We tell the Docker Compose framework (in a YAML file) to spin up certain things on certain ports, link certain services to other services, etc., and the framework does these things for us. We can drive our tests in a similar declarative fashion, as we will see in the next sections.

How neat is that? Just think how much of a hassle it would be if we had to write code/shell scripts for the same repetitive tasks.

Point 1

In the declarative style, we can completely skip the API level that deals with brokers and focus only on the test scenarios. But we still have the flexibility to use the Kafka client APIs and add our own flavors to them.

Point 2

This contributes to finding more defects, because we don't spend time writing code; we spend more time writing tests and covering more business scenarios/user journeys.

How?

  • Here, we tell the test to use the Kafka-Topic which is our "end point" or "url"

     i.e. "url": "kafka-topic:demo-topic" 

  • Next, we tell the test to use operation "produce"

     i.e. "operation":"produce" 

  • Next, we need to set the records in the request payload:

"request": {
    "records": [
        {
            "key": "KEY-1234",
            "value": "Hello World"
        }
    ]
}
  • Then, we tell the test that we are expecting the response "status" to be returned as "Ok" and some record metadata from the broker, i.e. a not-null value. This is the "assertions" part of our test.

"assertions": {
    "status" : "Ok",
    "recordMetadata" : "$NOT.NULL"
}
  • Note: We can even assert all the 'recordMetadata' at once, which we will see in the later sections. For now, let's keep it simple and proceed.

  • Once we are done, our full test will look like the code below:

{
    "name": "produce_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType" : "RAW",
        "records": [
            {
                "key": 101,
                "value": "Hello World"
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
    }
}

And that's it. We are done with the test case and ready to run.

Now, looking at the test above, anyone can easily figure out what scenario is being tested.

Note that:

  1. We eliminated the coding hassles of using the client API to deal with Kafka brokers.

  2. We eliminated the coding hassles of asserting each field key/value by traversing through their object path, parsing request-payloads, parsing response-payloads, etc.

At the same time, we used the JSON comparison feature of the framework to assert the outcome at once, therefore, making the tests a lot easier and cleaner.

We escaped two major hassles while testing.

And, the order of the fields doesn't matter here. The below code is also correct (field order swapped).

"assertions": {
        "recordMetadata": "$NOT.NULL",
        "status": "Ok"
}

9. Running a Single Test Using JUnit

It's super easy. We just need to point our JUnit @Test method to the JSON file. That's it really.

@TargetEnv("kafka_servers/kafka_test_server.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaProduceTest {

    @Test
    @JsonTestCase("kafka/produce/test_kafka_produce.json")
    public void testProduce() throws Exception {
         // No code is needed here. What? 
         // Where are the 'assertions' gone ?
    }

}

In the above code:

  • 'test_kafka_produce.json' is the test case which contains the JSON step(s) we talked about earlier.

  • 'kafka_test_server.properties' contains the "Broker" details and producer/consumer configs.

  • '@RunWith(ZeroCodeUnitRunner.class)' is a JUnit custom runner to run the test.
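For reference, a minimal 'kafka_test_server.properties' might look like the below sketch. The key names follow the Zerocode HelloWorld examples; treat them as assumptions and verify against the linked repo.

```properties
# Kafka broker(s) the tests connect to
kafka.bootstrap.servers=localhost:9092

# Producer and consumer configs, externalized into their own files
kafka.producer.properties=kafka_servers/kafka_producer.properties
kafka.consumer.properties=kafka_servers/kafka_consumer.properties
```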

Also, we can use the Suite runner or Package runner to run the entire test suite.

Please visit these RAW and JSON examples and explanations.
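For instance, a package runner sketch might look like the below. The 'ZeroCodePackageRunner' and '@TestPackageRoot' names follow the Zerocode documentation; verify them against the linked examples.

```java
@TargetEnv("kafka_servers/kafka_test_server.properties")
@TestPackageRoot("kafka/produce") // root folder of the JSON test files
@RunWith(ZeroCodePackageRunner.class)
public class KafkaProduceSuiteTest {
    // No @Test methods needed; every JSON test under the package root is picked up.
}
```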

10. Writing Our First Producer Test

We learned in the above section how to produce a record and assert the broker response/acknowledgment.

But we don't have to stop there. We can go further and ask our test to assert the "recordMetadata" field-by-field to verify it was written to the correct "partition" of the correct "topic" and much more, as shown below.

"assertions": {
    "status": "Ok",
    "recordMetadata": {
        "offset": 0,   //<--- This is the record 'offset' in the partition
        "topicPartition": {
            "partition": 0,   //<--- This is the partition number
            "topic": "demo-topic"  //<--- This is the topic name
        }
    }
}

That's it. In the above "assertions" block, we finished comparing the expected vs. actual values.

Note: The comparisons and assertions are done for us automatically. The "assertions" block is compared against the actual "status" and "recordMetadata" received from the Kafka broker. The order of the fields doesn't really matter here; the test only fails if the field values or structures don't match.
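Putting this together with the earlier produce test, a full test with field-by-field metadata assertions would look like the below. Note that asserting "offset": 0 only holds for the first record on a fresh partition.

```json
{
    "name": "produce_and_verify_metadata",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType": "RAW",
        "records": [
            {
                "key": 101,
                "value": "Hello World"
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": {
            "offset": 0,
            "topicPartition": {
                "partition": 0,
                "topic": "demo-topic"
            }
        }
    }
}
```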

11. Writing Our First Consumer Test

Similarly, to write a "consumer" test, we need to know:

  • The topic name 'demo-topic' is our "end point," a.k.a. "url": "kafka-topic:demo-topic".

  • The operation, i.e. 'consume':  "operation": "consume".

  • While consuming message(s) from the topic, we need to send as below: "request": { }

The above 'request' means: do nothing but consume, and do not do a 'commit'.

Or we can mention in our test to do certain things while consuming or after consuming the records.

"request": {
    "consumerLocalConfigs": {
        "commitSync": true,
        "maxNoOfRetryPollsOrTimeouts": 3
    }
}
  • "commitSync": true: Here, we are telling the test to do a `commitSync` after consuming the message, which means it won't read the message again on the next `poll`. It will only read new messages if any arrive on the topic.

  • "maxNoOfRetryPollsOrTimeouts": 3: Here, we are telling the test to retry the poll a maximum of three times, then stop polling. If we expect more records, we can set this to a larger value. The default value is 1.

  • "pollingTime": 500: Here, we are telling the test to poll for 500 milliseconds each time it polls. The default value is 100 milliseconds if you skip this flag.

Visit this page for All configurable keys - ConsumerLocalConfigs from the source code.

Visit the HelloWorld Kafka examples repo to try it at home.

Note: These config values can be set globally in the properties file, which means they apply to all the tests in our test pack. We can also override any of the configs for a particular test or tests inside the suite. Hence, it gives us the flexibility to cover all kinds of test scenarios.

Well, setting up these properties is not a big deal, and we have to do it anyway to externalize them. Hence, the simpler they are kept, the better for us! But we do need an idea of what goes inside them.

We will discuss this in the coming sections.
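Assembling the pieces above, a complete consumer test might look like the below sketch. The "records" block in the assertions mirrors the response structure from section 5, and the "size" field assumes the framework reports a record count alongside the records; key and value are illustrative.

```json
{
    "name": "consume_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "consume",
    "request": {
        "consumerLocalConfigs": {
            "commitSync": true,
            "maxNoOfRetryPollsOrTimeouts": 3,
            "pollingTime": 500
        }
    },
    "assertions": {
        "size": 1,
        "records": [
            {
                "key": "1547792460796",
                "value": "Hello World 1"
            }
        ]
    }
}
```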

12. Combining REST API Testing With Kafka Testing

Most of the time in a microservices architecture, we build applications using RESTful services, SOAP services (probably legacy), and Kafka.

Therefore, we need to cover all API contract validations in our end-to-end test scenarios, including Kafka.

But it's not a big deal. After all, nothing changes here, except that we point our "url" to the HTTP endpoint of our REST or SOAP service and adjust the payload/assertions block accordingly. That's it, really.

Please visit Combining Kafka testing with REST API testing for a full step-by-step approach.

Suppose we have a use case:

Step 1: Kafka call - We send an "Address" record with id "id-lon-123" to the "address-topic," which eventually gets processed and written to the "Address" database (e.g. Postgres or Hadoop). We then assert the broker acknowledgment.

Step 2: REST call - Query (GET) the "Address" REST API by using "/api/v1/addresses/id-lon-123" and assert the response.

The corresponding test case looks like below.

{
    "scenarioName": "Kafka and REST api validation example",
    "steps": [
        {
            "name": "produce_to_kafka",
            "url": "kafka-topic:people-address",
            "operation": "produce",
            "request": {
                "recordType" : "JSON",
                "records": [
                    {
                        "key": "id-lon-123",
                        "value": {
                            "id": "id-lon-123",
                            "postCode": "UK-BA9"
                        }
                    }
                ]
            },
            "assertions": {
                "status": "Ok",
                "recordMetadata" : "$NOT.NULL"
            }
        },
        {
            "name": "verify_updated_address",
            "url": "/api/v1/addresses/${$.produce_to_kafka.request.records[0].value.id}",
            "operation": "GET",
            "request": {
                "headers": {
                    "X-GOVT-API-KEY": "top-key-only-known-to-secu-cleared"
                }
            },
            "assertions": {
                "status": 200,
                "value": {
                    "id": "${$.produce_to_kafka.request.records[0].value.id}",
                    "postCode": "${$.produce_to_kafka.request.records[0].value.postCode}"
                }
            }
        }
    ]
}

Easy to read! Easy to write!

Field values are reused via JSON paths instead of being hardcoded. It's a great time saver!

13. Producing RAW Records vs. JSON Records

1. In the case of RAW records, we simply set:

"recordType" : "RAW",

Then, our test case looks like below:

{
    "name": "produce_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType" : "RAW",
        "records": [
            {
                "key": 101,
                "value": "Hello World"
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
    }
}

2. And for the JSON record, we mention it in the same way:

"recordType" : "JSON"

And, our test case looks like below:

{
    "name": "produce_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType" : "JSON",
        "records": [
            {
                "key": 101,
                "value": { 
                    "name" : "Jey"
                }
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
    }
}

Note: The "value" section has a JSON record this time.

14. Kafka in a Docker Container

Ideally, this section should have been at the beginning. But what's the point of just running a docker-compose file without even knowing the outcome? We have kept it here to make everyone's life easier!

We can find the docker-compose files and the step-by-step instructions below.

  1. Single Node Kafka in Docker
  2. Multi-Node Kafka Cluster in Docker
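To give an idea of what the single-node setup involves, a minimal docker-compose sketch using the Confluent images might look like the below. The image tags, ports, and environment variables here are assumptions; use the repo's compose files for the real runs.

```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:5.1.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise on localhost so tests outside Docker can connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single node, so the internal offsets topic needs replication factor 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```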

15. Conclusion

In this tutorial, we learned some of the fundamental aspects of Kafka testing in a declarative way. Also, we learned how easily we can test microservices involving both Kafka and REST. 

Using this approach, we have tested and validated clustered Kafka data pipelines to Hadoop, as well as HTTP REST/SOAP APIs deployed in Kubernetes-orchestrated pods. We found this approach very straightforward, and it reduced the complexity of maintaining and promoting the artifacts to higher environments.

With this approach, we were able to cover a lot of test scenarios with full clarity and find more defects in the early stages of the development cycle, even without writing any test code. This helped us to build up and maintain our regression pack in an easy and clean manner.

The complete source code of these examples is available in the GitHub repo (Try at Home) linked below.

To run any test(s), we can directly navigate to its corresponding JUnit @Test under 'src/test/java'. We need to bring up Kafka in Docker before running any JUnit tests.

Use "kafka-schema-registry.yml (See Wiki)" to be able to run all the tests.

If you found this page helpful for testing Kafka and HTTP APIs, please leave a "star" on GitHub!

Happy testing!


Opinions expressed by DZone contributors are their own.
