Kafka Streams and Unit Testing

Kafka stores data for consumers to read, and Kafka Streams lets us process that data in place. That makes it very important to unit test the code you write with the Kafka Streams API.

Kafka Streams is a powerful API. On its own, Kafka only stores data for consumers to read. But we have always needed a way to process that data without turning to an external tool such as Spark or Storm, and Kafka Streams fills that gap. To learn more and for a quick start, check out the first blog of this series.

The Need

We are now using Kafka Streams in our applications. The implementation is done, but the most important part, testing, is still left. This blog is about how to test the application we have created. For this, I'll use the sample app I created in my previous blog, which covers both the high-level DSL and the low-level Processor API.

Traditionally, we test a Kafka application with an integration test, for which we need to start ZooKeeper and a real Kafka broker. After that, we need a mock producer and a mock consumer so that our app can be fed its inputs and we can read its outputs. That is a big hassle just to test our app, although for real-world scenarios and actual integration tests this setup is needed without a doubt.

Besides this, there is embedded Kafka. Embedded Kafka gives us a virtual Kafka broker environment, along with built-in methods to publish data to topics and read it back for testing purposes.

Both approaches work well for integration tests of Kafka applications. But we needed something that lets us write unit tests. For unit testing Kafka Streams, there is a library called Mocked Streams.

Mocked Streams

Mocked Streams is a Scala library for unit testing Kafka Streams. It is available for Scala 2.11.8 onward. It saves us from writing all the setup code we used to need for our tests and from creating a separate Kafka broker just for testing. We can use it with any testing style (e.g., FunSuite or FlatSpec). The following is a sample test case.

it should "change characters to upper case" in {
  // Build the topology under test, feed it input records, and collect the output.
  val res = MockedStreams().topology { builder =>
    streamOperations.toUpperCase(builder, inTopic).to(outTopic)
  }
    .input[String, String](inTopic, stringSerde, stringSerde, keyValueSeq)
    .output[String, String](outTopic, stringSerde, stringSerde, keyValueSeq.size)

  // The collected records should match the expected upper-cased records.
  assert(res.toList == updateKeyValueSeq)
}

In this test case, we don't need to do anything except import the Mocked Streams library. There is no need for a producer, a consumer, a Kafka broker, and so on.
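The test case refers to keyValueSeq and updateKeyValueSeq without showing how they are defined. As a minimal sketch, assuming the input is a sequence of string key/value pairs and the expected output keeps the keys while upper-casing the values, the fixtures might look like the following. The object name TestFixtures and the sample records are hypothetical and are not taken from the sample app.

```scala
// Hypothetical test fixtures; the object name and the records are illustrative assumptions.
object TestFixtures {
  // Input records as (key, value) pairs.
  val keyValueSeq: Seq[(String, String)] =
    Seq(("k1", "hello"), ("k2", "kafka"), ("k3", "streams"))

  // Expected output: same keys, values converted to upper case (assumed behaviour).
  val updateKeyValueSeq: List[(String, String)] =
    keyValueSeq.map { case (key, value) => (key, value.toUpperCase) }.toList
}
```

Deriving the expected records from the input keeps the two in sync, and using keyValueSeq.size as the expected output size matches a one-record-in, one-record-out transformation.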


To start writing test cases for our Kafka Streams app, we first need to add the Mocked Streams dependency to our build:

"com.madewithtea" %% "mockedstreams" % "1.3.0" % "test"

Now all we need to do is import the library (com.madewithtea.mockedstreams.MockedStreams) in our test class, and we are ready to write our test cases.

The library provides a topology() method, which hands us a KStreamBuilder instance. We apply the method we want to test to that builder.

MockedStreams().topology { builder => /* apply the stream logic under test to builder */ }

This method returns an instance of the Builder class provided by the Mocked Streams library. We supply inputs with the input() method and then call the output() method to receive the output of the method under test. We can also apply our streams config to this Builder instance to supply our serdes, timestamp extractor class, and so on. This config is used to create the builder inside the topology method.
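To make the streams config more concrete, here is a rough sketch that builds a java.util.Properties object with default serdes and a timestamp extractor. It assumes the Kafka 0.11-era configuration keys (default.key.serde, default.value.serde, default.timestamp.extractor) and writes them as plain strings so the snippet has no dependencies. The object name StreamConfigSketch is hypothetical.

```scala
import java.util.Properties

// Hypothetical helper; the keys are Kafka Streams config names (assumed 0.11-era).
object StreamConfigSketch {
  def props: Properties = {
    val p = new Properties()
    // Default serdes used when a topology step does not specify its own.
    p.setProperty("default.key.serde",
      "org.apache.kafka.common.serialization.Serdes$StringSerde")
    p.setProperty("default.value.serde",
      "org.apache.kafka.common.serialization.Serdes$StringSerde")
    // Class that assigns a timestamp to each record.
    p.setProperty("default.timestamp.extractor",
      "org.apache.kafka.streams.processor.WallclockTimestampExtractor")
    p
  }
}
```

With Mocked Streams, properties like these would typically be handed to the Builder through its config method before calling input() and output(). Check the library's documentation for the exact usage in your version.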

val result = MockedStreams().topology { builder =>
  // apply the stream logic under test to builder here
}
  .input(firstInTopic, stringSerde, stringSerde, keyValueSeq)
  .output(firstOutTopic, stringSerde, stringSerde, keyValueSeq.size)

We can use multiple inputs on this Builder instance:

val result = MockedStreams().topology { builder =>
  // apply the stream logic under test to builder here
}
  .input(firstInTopic, stringSerde, stringSerde, keyValueSeq)
  .input(secondInTopic, stringSerde, stringSerde, keyValueSeq)
  .output(firstOutTopic, stringSerde, stringSerde, keyValueSeq.size)

Similarly, we can call the output() method more than once to read from multiple output topics:

val resultBuilder = MockedStreams().topology { builder =>
  // apply the stream logic under test to builder here
}
  .input(firstInTopic, stringSerde, stringSerde, keyValueSeq)
  .input(secondInTopic, stringSerde, stringSerde, keyValueSeq)

// Read each output topic separately from the same builder.
val firstOutput = resultBuilder.output(firstOutTopic, stringSerde, stringSerde, keyValueSeq.size)
val secondOutput = resultBuilder.output(secondOutTopic, stringSerde, stringSerde, keyValueSeq.size)

For more details, you can visit the official GitHub documentation.

I have continued with the code in my previous repo. Here is the link to the sample application, which now includes these test cases.



Published at DZone with permission of Anuj Saxena, DZone MVB. See the original article here.
