Spark Streaming: Unit Testing DStreams

DZone 's Guide to

Spark Streaming: Unit Testing DStreams

Unit testing is important because it is one of the earliest testing efforts performed on the code. The earlier defects are detected, the easier they are to fix.

· Big Data Zone ·
Free Resource

Frankly, I don't think there's any need of telling us, "the developers," that we have to have proper testing or unit testing to be correct (QAs, don't be flattered :P). The unit test cases are the quickest way to know whether or not there's something wrong with our code.

Unit testing is important because it is one of the earliest testing efforts performed on the code. The earlier defects are detected, the easier they are to fix.

Spark Streaming

Spark Streaming is the API provided by Spark alongside the Spark-Core API. It is used for scalable, high-throughput, fault-tolerant stream processing of live data streams and supports many data sources like Kafka, HDFS, etc.

It takes a continuous stream of data for ingestion, and through Spark Streaming API, this stream is converted to batches of input data. These batches are then processed by the Spark engine. To convert this stream of data to small batches and then process them, the DStream API is used as an abstraction.


DStream, or Discretized Stream, is an abstraction provided by the Spark Streaming. DStream denotes a series of RDDs (i.e small immutable batches of data), which makes DStream resilient. We can easily apply any transformation on the input data stream using DStream (using map, flatMap, etc). We only have to provide our business logic to the DStream and we can easily achieve the required result.

For example, the following method removes duplicate messages from the stream of (key, value) pairs using mapWithState:

def distinct(dStream: DStream[(Int, String)]): DStream[(Int, String)] = {
    .flatMap {
      case Some(value) => Seq(value)
      case _ => Seq()

val dedup = (key: Int, value: Option[String], state: State[List[Int]]) => {
  (value, state.getOption()) match {
    case (Some(data), Some(keys)) if !keys.contains(key) =>
      state.update(key :: keys)
      Some(key, data)
    case (Some(data), None) =>
      Some(key, data)
    case _ =>

And another method 'update' which modifies the values of our DStreams:

def update(dStream: DStream[(Int, String)]): DStream[(Int, String)] = {
case (key, value) => (key, s"""{"value":[$value]}""")


So, until here, we all know the basic way of how Spark Streaming works. All is well. But how do we actually get to know if our business logic is working well with Spark Streaming and transforming the input stream in the way we want? We need to test the DStream with input data and check the output of our Streaming code.


So, what's the problem? How do we execute streaming logic in a test environment?

Typically, we can write integration test cases and provide the actual environment in the integration test. But for unit testing, we need a testing environment which should not depend on any external application.


StreamingSuiteBase provides the testing environment for DStream. It sends the inputs as batches and performs the provided operation on these batches and provides as output. And these outputs can be matched to the expected result.

Import the following dependency into your build.sbt:

"com.holdenkarau" %% "spark-testing-base" % "2.1.0_0.8.0" % Test

And mixin the trait ' StreamingSuiteBase ' into your spec:

class StreamingOperationsSpec extends WordSpec with StreamingSuiteBase {

The trait StreamingSuiteBase provides the method ' testOperation' which takes the input values, our business logic, expected output values as parameters and provides the result. Testing our method distinct and update as follows:

"StreamingOperations" should{

"remove duplicates" in{
val inputPair = List(List((1, "value"), (1, "value")))
val outputPair = List(List((1, "value")))

testOperation(inputPair, distinct _ , outputPair, ordered = false)

"update stream" in {
val inputPair = List(List((1, """{"name": "Steve"}, {"name": "Tony"}""")))
val outputPair = List(List((1, """{"value":[{"name": "Steve"}, {"name": "Tony"}]}""")))

testOperation(inputPair, update _, outputPair, ordered = false)

The method ' testOperation ' takes the output of the operation performed on the 'inputPair' and check whether it is equal to the 'outputPair' and just like this, we can test our business logic.

This short snippet lets you test your business logic without forcing you to create even a Spark session. You can mock the whole streaming environment and test your business logic easily.

This was a simple example of unary operations on DStreams. Similarly, we can test binary operations and window operations on DStreams.

You can find the code here.

spark ,big data ,unit testing ,spark streaming

Published at DZone with permission of Anuj Saxena , DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}