Unit Testing in Kafka
Learn about unit testing in Apache Kafka with Embedded Kafka, a library that allows you to test Kafka without running Zookeeper and a Kafka broker.
Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables you to pass messages from one endpoint to another. Generally, data is published to topics via the Producer API, and consumers read data from subscribed topics via the Consumer API.
In this blog, we will see how to do unit testing in Kafka. Unit testing your Kafka code is incredibly important: it is the code that transports your most important data. Until now, we had to explicitly run Zookeeper and a Kafka server to test our producers and consumers.
Now there is an alternative that lets us test Kafka code without running Zookeeper and a Kafka broker ourselves.
Wondering how? EmbeddedKafka is there for you. EmbeddedKafka is a library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.10.2.1 and ZooKeeper 3.4.8. It starts Zookeeper and a Kafka broker before the test and stops them after the test. We can also start and stop Zookeeper and the Kafka broker programmatically.
How to Use EmbeddedKafka
Before testing, follow these instructions:
1) Add the following dependency to your build.sbt:
libraryDependencies += "net.manub" %% "scalatest-embedded-kafka" % "0.14.0" % "test"
2) Have your TestSpec extend the EmbeddedKafka trait.
Using the withRunningKafka closure gives you a running instance of Kafka. It automatically starts Zookeeper and a Kafka broker on ports 6000 and 6001 respectively, and shuts them down at the end of the test.
import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.FlatSpec

class KafkaSpec extends FlatSpec with EmbeddedKafka {

  "runs with embedded kafka" should "work" in {
    withRunningKafka {
      // test cases go here
    }
  }
}
An EmbeddedKafka companion object is provided for use without the EmbeddedKafka trait. Zookeeper and Kafka can be started and stopped in a programmatic way.
import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

class KafkaSpec extends FlatSpec with EmbeddedKafka with BeforeAndAfterAll {

  override def beforeAll(): Unit = {
    EmbeddedKafka.start()
  }

  // test cases go here

  override def afterAll(): Unit = {
    EmbeddedKafka.stop()
  }
}
EmbeddedKafka also supports custom configurations. We can change the ports on which Zookeeper and Kafka are started by providing an implicit EmbeddedKafkaConfig, and we can also supply implicit serializers to match our requirements.
implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2182)
implicit val serializer = new StringSerializer
The same implicit EmbeddedKafkaConfig can be used to define custom producer/consumer properties.
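For instance, such a configuration might look like the sketch below. The customProducerProperties and customConsumerProperties parameters are assumptions based on the library's configuration case class and may be named differently (or be absent) in your version, so check EmbeddedKafkaConfig for the exact fields.
import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig

// Sketch: custom ports plus producer/consumer overrides that the
// publish/consume helpers will pick up (field names are assumptions,
// verify them against your version of EmbeddedKafkaConfig).
implicit val customKafkaConfig = EmbeddedKafkaConfig(
  kafkaPort = 9092,
  zooKeeperPort = 2182,
  customProducerProperties = Map(ProducerConfig.MAX_REQUEST_SIZE_CONFIG -> "3000000"),
  customConsumerProperties = Map(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> "3000000")
)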
The EmbeddedKafka trait also provides some utility methods to interact with embedded Kafka, in order to test our Kafka producer and consumer.
def publishToKafka(topic: String, message: String): Unit
def consumeFirstMessageFrom(topic: String): String
It also provides many more methods that can be used as needed; see the library documentation for examples.
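Putting these helpers together, a minimal round-trip test could look like the sketch below. The spec name, topic name, and the implicit string serializer/deserializer are illustrative assumptions, and the exact helper signatures can vary slightly between library versions.
import net.manub.embeddedkafka.EmbeddedKafka
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.FlatSpec

class ProducerConsumerSpec extends FlatSpec with EmbeddedKafka {

  // implicit (de)serializers used by the publish/consume helpers
  implicit val serializer = new StringSerializer
  implicit val deserializer = new StringDeserializer

  "the embedded broker" should "deliver a published message to a consumer" in {
    withRunningKafka {
      // publish a message and read it back from the same topic
      publishToKafka("test-topic", "hello kafka")
      val received: String = consumeFirstMessageFrom("test-topic")
      assert(received == "hello kafka")
    }
  }
}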
The good thing is that we can also test our Kafka Streams topologies in a similar way. For that, we have to add the following dependency in build.sbt and extend our spec with EmbeddedKafkaStreamsAllInOne.
libraryDependencies += "net.manub" %% "scalatest-embedded-kafka-streams" % "0.14.0" % "test"
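As a rough sketch, a stream test then follows the same pattern. It assumes the trait exposes a runStreams helper that takes the topics to create and a topology builder, as described in the library's documentation for this Kafka version; the topology, topic names, and implicit (de)serializers are illustrative and may need adjusting for your setup.
import net.manub.embeddedkafka.streams.EmbeddedKafkaStreamsAllInOne
import org.apache.kafka.common.serialization.{Serdes, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.scalatest.FlatSpec

class StreamsSpec extends FlatSpec with EmbeddedKafkaStreamsAllInOne {

  implicit val serializer = new StringSerializer
  implicit val deserializer = new StringDeserializer

  "a pass-through topology" should "copy messages from the input topic to the output topic" in {
    val stringSerde = Serdes.String()
    val builder = new KStreamBuilder()

    // the simplest possible topology: read from one topic, write to another
    builder.stream(stringSerde, stringSerde, "input-topic")
      .to(stringSerde, stringSerde, "output-topic")

    // runStreams creates the topics, starts Zookeeper, Kafka, and the streams
    // application, runs the block, and tears everything down afterwards
    runStreams(Seq("input-topic", "output-topic"), builder) {
      publishToKafka("input-topic", "hello streams")
      val received: String = consumeFirstMessageFrom("output-topic")
      assert(received == "hello streams")
    }
  }
}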
For more information on testing Kafka Streams, you can use the links in the references below.
EmbeddedKafka has made unit testing Kafka much easier, and it is also very easy to use.
Hope this blog helps you.
References:
Published at DZone with permission of Mahesh Chand Kandpal, DZone MVB.