
Spark Streaming: Unit Testing DStreams


Frankly, I don't think we developers need to be told that we must have proper testing, unit testing in particular (QAs, don't be flattered :P). Unit test cases are the quickest way to find out whether something is wrong with our code.

Unit testing is important because it is one of the earliest testing efforts performed on the code. The earlier defects are detected, the easier they are to fix.

Spark Streaming

Spark Streaming is an extension of the core Spark API. It enables scalable, high-throughput, fault-tolerant processing of live data streams and supports many data sources, such as Kafka and HDFS.

It ingests a continuous stream of data and, through the Spark Streaming API, divides that stream into batches of input data. These batches are then processed by the Spark engine. The DStream API is the abstraction used to represent this stream of small batches and the operations on them.


DStream, or Discretized Stream, is the abstraction provided by Spark Streaming. A DStream denotes a series of RDDs (i.e., small, immutable batches of data), which is what makes it resilient. We can easily apply any transformation to the input stream through the DStream API (map, flatMap, etc.); we only have to provide our business logic to the DStream to achieve the required result.
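To build intuition for this micro-batch model before touching the real API, here is a plain-Scala sketch (no Spark required; all names are illustrative) that treats a stream as a sequence of micro-batches and applies a flatMap-style word count to each one, the way a DStream transformation is applied batch by batch:

```scala
object MicroBatchSketch {
  // A DStream can be pictured as a sequence of micro-batches
  // (here modeled as plain Seqs of lines).
  type Batch = Seq[String]

  // Applying a transformation to a DStream means applying it
  // to every micro-batch in turn.
  def wordCount(batches: Seq[Batch]): Seq[Map[String, Int]] =
    batches.map { batch =>
      batch
        .flatMap(_.split("\\s+"))   // split each line into words
        .groupBy(identity)          // group equal words together
        .map { case (word, ws) => word -> ws.size }
    }

  def main(args: Array[String]): Unit = {
    val stream = Seq(Seq("spark streaming", "spark"), Seq("unit testing"))
    println(wordCount(stream))
  }
}
```

Each input batch produces one output batch; the real DStream API works the same way, only with RDDs instead of plain collections.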

For example, the following method removes duplicate messages from the stream of (key, value) pairs using mapWithState:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{State, StateSpec}

def distinct(dStream: DStream[(Int, String)]): DStream[(Int, String)] =
  dStream
    .mapWithState(StateSpec.function(dedup))
    .flatMap {
      case Some(value) => Seq(value)
      case _           => Seq()
    }

val dedup = (key: Int, value: Option[String], state: State[List[Int]]) => {
  (value, state.getOption()) match {
    case (Some(data), Some(keys)) if !keys.contains(key) =>
      state.update(key :: keys)
      Some((key, data))
    case (Some(data), None) =>
      state.update(List(key))
      Some((key, data))
    case _ =>
      None
  }
}

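Because the deduplication decision is a pure function of the incoming value and the current state, you can sanity-check its logic without Spark at all. A minimal plain-Scala simulation (hypothetical names; it assumes the state holds the list of keys seen so far) might look like this:

```scala
object DedupSketch {
  // One step of the dedup logic: given a record and the keys seen so far,
  // decide whether to emit it and compute the next state.
  def step(key: Int, value: Option[String], seen: List[Int])
      : (Option[(Int, String)], List[Int]) =
    (value, seen) match {
      case (Some(data), keys) if !keys.contains(key) =>
        (Some((key, data)), key :: keys)  // first sighting: emit and record
      case _ =>
        (None, seen)                      // duplicate (or empty): drop
    }

  // Fold a batch of records through the state, keeping only emitted pairs.
  def distinctBatch(records: Seq[(Int, String)]): Seq[(Int, String)] =
    records.foldLeft((Vector.empty[(Int, String)], List.empty[Int])) {
      case ((out, seen), (k, v)) =>
        val (emitted, next) = step(k, Some(v), seen)
        (out ++ emitted.toSeq, next)
    }._1
}
```

This is only a model of the state transition, not the mapWithState API itself, but it lets you reason about the match cases in isolation.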
And here is another method, update, which modifies the values in our DStream:

def update(dStream: DStream[(Int, String)]): DStream[(Int, String)] =
  dStream.map {
    case (key, value) => (key, s"""{"value":[$value]}""")
  }


So far, so good: this is the basic way Spark Streaming works. But how do we actually verify that our business logic works with Spark Streaming and transforms the input stream the way we want? We need to feed the DStream input data and check the output of our streaming code.


So, what's the problem? How do we execute streaming logic in a test environment?

Typically, we could write integration test cases and provide the actual environment in the integration test. But for unit testing, we need a testing environment that does not depend on any external application.


StreamingSuiteBase, from the spark-testing-base library, provides a testing environment for DStreams. It feeds the inputs to the stream as batches, performs the provided operation on those batches, and collects the output, which can then be matched against the expected result.

Add the following dependency to your build.sbt:

"com.holdenkarau" %% "spark-testing-base" % "2.1.0_0.8.0" % Test
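The version string encodes both the Spark version (before the underscore) and the spark-testing-base release (after it), so it has to match the Spark version your project uses. If your build pins Spark in one place, a common sbt pattern (version numbers here are illustrative) is:

```scala
// build.sbt -- illustrative versions only; pick the spark-testing-base
// release whose prefix matches your project's Spark version.
val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"    % sparkVersion % Provided,
  "com.holdenkarau"  %% "spark-testing-base" % s"${sparkVersion}_0.8.0" % Test
)
```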

And mix the StreamingSuiteBase trait into your spec:

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.scalatest.WordSpec

class StreamingOperationsSpec extends WordSpec with StreamingSuiteBase {

The trait StreamingSuiteBase provides the method testOperation, which takes the input values, our business logic, and the expected output values as parameters, and runs the comparison for us. We can test our distinct and update methods as follows:

"StreamingOperations" should {

  "remove duplicates" in {
    val inputPair = List(List((1, "value"), (1, "value")))
    val outputPair = List(List((1, "value")))

    testOperation(inputPair, distinct _, outputPair, ordered = false)
  }

  "update stream" in {
    val inputPair = List(List((1, """{"name": "Steve"}, {"name": "Tony"}""")))
    val outputPair = List(List((1, """{"value":[{"name": "Steve"}, {"name": "Tony"}]}""")))

    testOperation(inputPair, update _, outputPair, ordered = false)
  }
}
The method testOperation takes the output of the operation performed on the inputPair and checks whether it is equal to the outputPair. Just like that, we can test our business logic.

This short snippet lets you test your business logic without requiring you to create a Spark session yourself: the whole streaming environment is set up for you, so you can focus on testing your business logic.

This was a simple example of unary operations on DStreams. Similarly, we can test binary operations and window operations on DStreams.
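As a taste of the window case, the semantics under test are easy to model outside Spark too. This plain-Scala sketch (illustrative names; it assumes a window length measured in batches and a slide interval of one batch) shows what a windowed stream of micro-batches looks like:

```scala
object WindowSketch {
  // Model of window(windowLength = `length` batches, slideInterval = 1 batch):
  // each output batch is the concatenation of the last `length` input batches.
  def windowed[A](batches: Seq[Seq[A]], length: Int): Seq[Seq[A]] =
    batches.indices.map { i =>
      batches.slice((i - length + 1).max(0), i + 1).flatten
    }

  def main(args: Array[String]): Unit =
    println(windowed(Seq(Seq(1), Seq(2), Seq(3), Seq(4)), length = 3))
}
```

With expectations shaped like this, a window test follows the same input-batches/expected-batches pattern as the unary examples above.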

You can find the code here.



Published at DZone with permission of the author.

Opinions expressed by DZone contributors are their own.
