Kafka and Spark Streams: Living Happily Ever After

Learn how to use Spark Streaming to stream, transform, and transport data between Kafka topics.

Today, we are going to look at how to use Spark Streaming to transform and transport data between Kafka topics.

The demand for stream processing is increasing every day. Often, processing big volumes of data is not enough; we also need to process that data in real time, especially when the volume keeps growing and the data has to be both processed and maintained.

Recently, in one of our projects, we faced such a requirement. Being a newbie to Apache Spark, I had only a little idea about what to do, so I decided that the best option was to refer to the Apache Spark documentation. It did help me understand the basic concepts of Spark, streaming, and transporting data using streams.

To give you a heads up, Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

It provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data.

DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs (Resilient Distributed Datasets).
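
To make the "sequence of RDDs" idea concrete, here is a minimal sketch that needs no Kafka at all. It assumes the spark-streaming dependency is on the classpath and uses queueStream, which turns a queue of RDDs into a DStream, so each queued RDD shows up as one batch. The object name, the normalize helper, and the sample data are made up for illustration.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRdds {
  // Hypothetical transformation, only here to show a high-level operation on a DStream.
  def normalize(line: String): String = line.trim.toUpperCase

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("dstream-as-rdds")
    val ssc = new StreamingContext(conf, Seconds(1)) // one batch (one RDD) per second

    // Each RDD pushed into the queue becomes one batch of the DStream.
    val queue = mutable.Queue[RDD[String]]()
    queue += ssc.sparkContext.parallelize(Seq(" a ", " b "))
    queue += ssc.sparkContext.parallelize(Seq(" c "))

    val lines = ssc.queueStream(queue)       // DStream created from an input source
    val normalized = lines.map(normalize)    // DStream created from another DStream

    // foreachRDD exposes the RDD behind every batch, together with the batch time.
    normalized.foreachRDD { (rdd, time) =>
      println(s"batch at $time: ${rdd.collect().mkString(", ")}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

With Kafka as the source, only the way the first DStream is created changes; map and foreachRDD are used in the same way.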

Although Structured Streaming was introduced in Spark 2.0, a lot of big companies still use DStreams, the original Spark Streaming API.

We were a bit confused, but after a lot of discussion, we decided to go with DStreams, simply because they seemed to be the more logical choice.

So, our next step was integrating Kafka with Spark Streaming.

This is where our challenge began. With the help of the documentation, I started the coding journey.

  • The very first thing we require is a SparkConf object, which loads defaults from system properties and the classpath.
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-demo")
    Note: local[*] gives it access to all the available cores, and setAppName sets the application name, which Spark requires. Both can be changed as per requirement.
  • The next important thing is kafkaParams, which is nothing but a Map[String, Object].
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      // StringDeserializer is used because the input Kafka topic was written with a string serializer
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-demo",
      "kafka.consumer.id" -> "kafka-consumer-01"
    )
    These are required by the input stream to know where the input data lives and how to fetch it.
  • Next, we require a StreamingContext that will take SparkConf as one of the arguments:
    val ssc = new StreamingContext(conf, Seconds(1))
  • Once we are through with StreamingContext, we now require an input stream that will actually stream the data (requires KafkaParams):
    val inputStream = KafkaUtils.createDirectStream(ssc,
      PreferConsistent, Subscribe[String, String](Array(inputTopic), kafkaParams))
    This stream now contains the consumer records, i.e., the records from the input topic. Here, I am not dealing with state or checkpointing, because that is a separate topic that is out of scope here and can be integrated independently.
  • Once we have inputStream, we can perform the required operations on it (flatMap, map, filter, etc.) to get a processed stream, i.e., a stream that contains the modified/transformed data.
    val processedStream = inputStream.map(record => record.value) //Any operation can be performed here.
      //checking the batches for data
  • Once you are done with the operations, you will have a processedStream, which now needs to be stored in an output Kafka topic. This is where I was not sure how to proceed. I went through a lot of blogs but couldn't find any decent solution. Finally, after a lot of digging, I found a simple one.
    processedStream.foreachRDD { rdd =>
      rdd.foreach { data =>
        // producer is a KafkaProducer[String, String] that must be reachable on the executors (it is not serializable);
        // outputTopic is the Kafka topic where we need to store the processed data
        val message = new ProducerRecord[String, String](outputTopic, data)
        producer.send(message)
      }
    }
  • Finally, your messages in the processed stream will be added to the output Kafka topic using the producer in the above case.
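
Putting the steps above together, a minimal end-to-end sketch could look like the following. It is not the code from our project: the topic names, the app name, the object name, and the transform helper are placeholders, and it assumes the spark-streaming-kafka-0-10 and kafka-clients dependencies plus a broker on localhost:9092. One deliberate difference from the snippet above is that the producer is created inside foreachPartition, because a KafkaProducer cannot be serialized and shipped from the driver to the executors.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaToKafka {
  // Placeholder topic names; replace them with your own.
  val inputTopic = "input-topic"
  val outputTopic = "output-topic"

  // Placeholder transformation; any map/flatMap/filter logic can go here.
  def transform(value: String): String = value.toUpperCase

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-to-kafka")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-demo"
    )

    // Input stream of ConsumerRecord[String, String] read from the input topic.
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array(inputTopic), kafkaParams))

    val processedStream = inputStream.map(record => transform(record.value))

    processedStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One producer per partition, created on the executor that handles it.
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)
        val producer = new KafkaProducer[String, String](props)
        try {
          partition.foreach { data =>
            producer.send(new ProducerRecord[String, String](outputTopic, data))
          }
        } finally {
          producer.close() // flushes pending records before closing
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Creating a producer for every partition of every batch is simple but not cheap; a lazily initialized producer held in an object, one per executor JVM, is a common refinement once the basic flow works.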

A piece of cake. Right?

We have also added our code for reference, which you can find here.

To learn more about Spark Streaming and its integration with Kafka, you can refer here.

In this article, we tried to understand how we can use Spark Streaming to stream data between Kafka topics. I hope you enjoyed reading this article. Stay tuned for more.

Published at DZone with permission of Anmol Sarna, DZone MVB. See the original article here.
