
Apache Kafka + Spark Streaming Integration

Let's take a look at Apache Kafka + Spark Streaming integration and also explore the receiver-based approach.

Kafka-Spark Streaming Integration

There are two approaches to configuring Spark Streaming to receive data from Kafka. The first uses Receivers and Kafka’s high-level API; the second, newer approach works without Receivers. The two approaches have different programming models, performance characteristics, and semantics guarantees.

Let’s study both approaches in detail.

Receiver-Based Approach

Here, we use a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. The received data is stored in Spark executors, and jobs launched by Spark Streaming then process the data.

Under the default configuration, however, this approach can lose data on failure. To ensure zero data loss, we have to additionally enable write-ahead logs in Spark Streaming. This synchronously saves all the received Kafka data into write-ahead logs on a distributed file system, so that all the data can be recovered on failure.
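As a minimal sketch of turning the write-ahead log on, the snippet below sets the `spark.streaming.receiver.writeAheadLog.enable` property and configures a checkpoint directory, which is where the log is kept. The object name, application name, batch interval, and checkpoint path are assumptions chosen for illustration, not values from this article.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledContext {
  // checkpointDir should point at a fault-tolerant file system such as HDFS;
  // the caller supplies it (any concrete path is an assumption of this sketch).
  def create(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("kafka-receiver-with-wal") // hypothetical application name
      // Persist received data to the write-ahead log before it is processed.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    // 10-second batches are an arbitrary choice for the example.
    val ssc = new StreamingContext(conf, Seconds(10))

    // The write-ahead log is written under the checkpoint directory.
    ssc.checkpoint(checkpointDir)
    ssc
  }
}
```

The master URL is deliberately left unset here, on the assumption that it is supplied by spark-submit.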

Next, we will discuss how to use the Receiver-Based Approach in a Kafka-Spark Streaming application.

Linking

For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0
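In an SBT build, these coordinates might be declared as follows. This is a sketch that assumes the project is built with a Scala 2.11.x release, so that `%%` resolves to the `_2.11` artifact suffix; the exact patch version shown is an assumption.

```scala
// build.sbt (sketch)
scalaVersion := "2.11.12" // assumption: any 2.11.x release matches the _2.11 artifact

// %% appends the Scala binary version, yielding spark-streaming-kafka-0-8_2.11
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
```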

For Python applications, we instead have to add this library and its dependencies when deploying the application (see the Deploying section below).

Programming

Next, import KafkaUtils in the streaming application code and create an input DStream:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

Using other variations of createStream, we can also specify the key and value classes and their corresponding decoder classes.
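With the placeholders filled in, a receiver-based stream could look like the sketch below. The Zookeeper hosts, consumer group, and topic name are hypothetical; the storage level is passed explicitly on the assumption that write-ahead logs are enabled, in which case in-memory replication is not needed.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverStreamSketch {
  def create(ssc: StreamingContext): ReceiverInputDStream[(String, String)] = {
    val zkQuorum = "zk1:2181,zk2:2181" // hypothetical Zookeeper quorum
    val groupId  = "example-group"     // hypothetical consumer group id
    // topic name -> number of partitions to consume, each in its own thread
    val topics   = Map("events" -> 2)  // hypothetical topic

    // Yields (key, value) pairs decoded as Strings.
    KafkaUtils.createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER)
  }
}
```

Calling `ReceiverStreamSketch.create(ssc).map(_._2)` would then give a DStream of message values only.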

Deploying

As with any Spark applications, spark-submit is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.

Python applications lack SBT/Maven project management, so spark-streaming-kafka-0-8_2.11 and its dependencies can be added directly to spark-submit using --packages:

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

Alternatively, we can download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars.

Direct Approach (No Receivers)

After the Receiver-Based Approach, a newer receiver-less “direct” approach was introduced to ensure stronger end-to-end guarantees. Rather than using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition and defines the offset ranges to process in each batch accordingly. When the jobs that process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka, much like reading files from a file system.
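The idea of turning "latest offsets" into per-batch offset ranges can be illustrated in plain Scala. This is a conceptual sketch only, not Spark's implementation: the types `TopicPartitionId` and `BatchRange` and the function `nextBatch` are hypothetical names invented for the example (Spark's own class for this is OffsetRange).

```scala
// Hypothetical identifier for one Kafka topic+partition.
final case class TopicPartitionId(topic: String, partition: Int)

// Hypothetical offset range for one batch: fromOffset inclusive, untilOffset exclusive.
final case class BatchRange(tp: TopicPartitionId, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object OffsetRangeSketch {
  // Given the offsets consumed so far and the latest offsets reported by Kafka,
  // compute the ranges for the next batch and the updated consumed offsets.
  def nextBatch(
      consumed: Map[TopicPartitionId, Long],
      latest: Map[TopicPartitionId, Long]
  ): (Seq[BatchRange], Map[TopicPartitionId, Long]) = {
    val ranges = latest.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .map { case (tp, until) =>
        // A partition not seen before starts at the latest offset (empty range).
        val from = consumed.getOrElse(tp, until)
        BatchRange(tp, from, math.max(from, until))
      }
    val updated = consumed ++ ranges.map(r => r.tp -> r.untilOffset)
    (ranges, updated)
  }

  def main(args: Array[String]): Unit = {
    val p0 = TopicPartitionId("events", 0)
    val p1 = TopicPartitionId("events", 1)
    val (ranges, _) = nextBatch(Map(p0 -> 100L, p1 -> 40L), Map(p0 -> 150L, p1 -> 40L))
    ranges.foreach { r =>
      println(s"${r.tp.topic} ${r.tp.partition} ${r.fromOffset} ${r.untilOffset} (${r.count} records)")
    }
  }
}
```

Because each batch is fully described by its ranges, a failed batch can be re-read from Kafka with the same ranges, which is what makes the file-system analogy apt.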

Note: This feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

Now, let’s discuss how to use this approach in our streaming application.

Linking

Linking through an SBT/Maven project applies only to Scala/Java applications. Link the project with the following artifact.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

Programming

Next, import KafkaUtils in the streaming application code and create an input DStream:

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])

In the Kafka parameters, we must specify either metadata.broker.list or bootstrap.servers. By default, the stream starts consuming from the latest offset of each Kafka partition. If we set the configuration auto.offset.reset in the Kafka parameters to smallest, it starts consuming from the smallest offset instead.
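Filled in for String keys and values, the call might look like the following sketch. The broker addresses and topic name are hypothetical, and `auto.offset.reset` is set to `smallest` only to illustrate the option described above.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def create(ssc: StreamingContext): InputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092", // hypothetical brokers
      "auto.offset.reset"    -> "smallest"                   // start from the smallest offset
    )
    val topics = Set("events") // hypothetical topic

    // Type parameters: key class, value class, key decoder, value decoder.
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
  }
}
```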

Using other variations of KafkaUtils.createDirectStream, we can also start consuming from an arbitrary offset. To access the Kafka offsets consumed in each batch, do the following.

// Hold a reference to the current offset ranges, so downstream can use it
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}

If we want Zookeeper-based Kafka monitoring tools to show the progress of the streaming application, we can use these offsets to update Zookeeper ourselves.

Deploying

Here, the deployment process is the same as for the Receiver-Based Approach.

Advantages of Direct Approach

The direct approach has the following advantages over the receiver-based approach:

Simplified Parallelism

There is no need to create multiple input Kafka streams and union them. With the direct stream, Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, and all of them read data from Kafka in parallel. There is thus a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
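For contrast, this is roughly what parallel reading looks like with the receiver-based approach, where several input streams are created and unioned by hand. It is a sketch: the Zookeeper host, group id, and topic are hypothetical, and the number of receivers is left to the caller.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverParallelismSketch {
  def create(ssc: StreamingContext, numReceivers: Int): DStream[(String, String)] = {
    // One receiver-based stream per desired unit of read parallelism.
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "example-group", Map("events" -> 1))
    }
    // Merge them into a single DStream for downstream processing.
    ssc.union(streams)
  }
}
```

With the direct stream none of this is needed, since the partitioning follows Kafka's own.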

Efficiency

Achieving zero data loss in the first approach required the data to be stored in a write-ahead log, which further replicated the data. This is inefficient, as the data effectively gets replicated twice: once by Kafka, and a second time by the write-ahead log. The second approach eliminates the problem, as there is no receiver and hence no need for write-ahead logs. As long as Kafka retention is sufficient, messages can be recovered from Kafka.

Exactly-Once Semantics

The first approach uses Kafka’s high-level API to store consumed offsets in Zookeeper, which is the traditional way to consume data from Kafka. Even though this approach (combined with write-ahead logs) can ensure zero data loss, there is a small chance that some records get consumed twice under some failures. This happens because of inconsistencies between the data reliably received by Spark Streaming and the offsets tracked by Zookeeper. The second approach therefore uses the simple Kafka API, which does not use Zookeeper. Spark Streaming tracks the offsets itself within its checkpoints, which removes the inconsistencies between Spark Streaming and Zookeeper/Kafka.

Thus each record is received by Spark Streaming effectively exactly once despite failures. To achieve exactly-once semantics for the output of our results as well, the output operation that saves the data to an external data store must be either idempotent or an atomic transaction that saves results and offsets.
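To see why an idempotent output tolerates a replayed batch, consider the plain-Scala sketch below. Everything in it is hypothetical and stands in for a real data store: `RecordKey`, `IdempotentStore`, and `writeBatch` are names invented for the example, and keying rows by topic, partition, and offset is one possible design, not the only one.

```scala
import scala.collection.mutable

// Hypothetical unique identity of a Kafka record.
final case class RecordKey(topic: String, partition: Int, offset: Long)

// Stand-in for an external store with upsert (insert-or-overwrite) semantics.
final class IdempotentStore {
  private val rows = mutable.Map.empty[RecordKey, String]
  def upsert(key: RecordKey, value: String): Unit = rows.update(key, value)
  def size: Int = rows.size
}

object IdempotentOutputSketch {
  // Writing the same batch twice leaves the store in the same state.
  def writeBatch(store: IdempotentStore, batch: Seq[(RecordKey, String)]): Unit =
    batch.foreach { case (key, value) => store.upsert(key, value) }

  def main(args: Array[String]): Unit = {
    val store = new IdempotentStore
    val batch = Seq(
      RecordKey("events", 0, 100L) -> "a",
      RecordKey("events", 0, 101L) -> "b"
    )
    writeBatch(store, batch)
    writeBatch(store, batch) // simulated replay after a failure
    println(s"rows stored: ${store.size}")
  }
}
```

The alternative named above, an atomic transaction, would instead write the results and the batch's offsets together, so that a replay can be detected and skipped.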

There is one disadvantage, however: this approach does not update offsets in Zookeeper, so Zookeeper-based Kafka monitoring tools will not show progress. We can still access the offsets processed by this approach in each batch and update Zookeeper ourselves.

So, this was all about Apache Kafka-Spark Streaming Integration. Hope you like our explanation.

Conclusion

In this article, we covered Spark Streaming integration with Apache Kafka. We discussed the two approaches to configuring it, the Receiver-Based Approach and the Direct Approach, and looked at the advantages of the Direct Approach. If you have any questions, feel free to ask in the comments section.

Topics:
kafka ,apache kafka ,spark streaming ,scala ,java ,zookeeper ,integration ,integration tutorial ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
