
Apache Kafka + Spark Streaming Integration

Let's take a look at Apache Kafka and Spark Streaming integration and explore both the receiver-based approach and the direct (receiver-less) approach.

By Rinu Gour · Oct. 17, 18 · Tutorial

Kafka-Spark Streaming Integration

In Kafka-Spark Streaming integration, there are two approaches to configuring Spark Streaming to receive data from Kafka. The first uses Receivers and Kafka's high-level API; the second, newer approach works without Receivers. The two approaches have different programming models, performance characteristics, and semantics guarantees.


Let’s study both approaches in detail.

Receiver-Based Approach

Here, we use a Receiver to receive the data. The Receiver is implemented using Kafka's high-level consumer API, and the received data is stored in Spark executors. Jobs launched by Kafka-Spark Streaming then process the data.

However, under the default configuration, this approach can lose data on failures. To ensure zero data loss, we additionally have to enable write-ahead logs in Kafka-Spark Streaming. These synchronously save all the received Kafka data to write-ahead logs on a distributed file system, so all the data can be recovered on failure.
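
Enabling the write-ahead log is a Spark configuration setting plus a checkpoint directory. Here is a minimal sketch, assuming a hypothetical checkpoint path on HDFS; the application name and batch interval are placeholders as well:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log so received Kafka data is persisted
// before it is acknowledged.
val conf = new SparkConf()
  .setAppName("KafkaReceiverWithWAL")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The write-ahead log and checkpoint metadata are written to this fault-tolerant directory.
ssc.checkpoint("hdfs:///checkpoints/kafka-receiver")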

Further, we will discuss how to use this Receiver-Based Approach in our Kafka-Spark Streaming application.

Linking

For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

For Python applications, however, we will have to add this library and its dependencies when deploying the application (see the Deploying section below).
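
For SBT, the coordinates above translate into a single dependency line. This is a minimal sketch, assuming the project's scalaVersion is 2.11 so that %% resolves to the _2.11 artifact:

// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"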

Programming

Afterward, create an input DStream by importing KafkaUtils in the streaming application code:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

Also, using variations of createStream, we can specify the key and value classes and their corresponding decoder classes.
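
As a concrete illustration of the template above, here is a minimal sketch; the ZooKeeper quorum, consumer group id, and topic name are assumptions for the example:

import org.apache.spark.streaming.kafka.KafkaUtils

// streamingContext is an existing StreamingContext.
val kafkaStream = KafkaUtils.createStream(
  streamingContext,
  "zk1:2181,zk2:2181",      // ZooKeeper quorum
  "spark-consumer-group",   // consumer group id
  Map("my-topic" -> 2)      // topic -> number of receiver threads
)

// The stream carries (key, message) pairs; keep only the message payload.
kafkaStream.map(_._2).print()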

Deploying

As with any Spark application, spark-submit is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.

For Python applications, which lack SBT/Maven project management, the spark-streaming-kafka-0-8_2.11 package and its dependencies can be added directly to spark-submit with --packages:

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

Alternatively, we can download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars.
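
For example, assuming the assembly JAR built for Scala 2.11 and Spark 2.2.0 has been downloaded into the working directory (the exact file name depends on the versions you pick):

./bin/spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar ...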

Direct Approach (No Receivers)

After the receiver-based approach, a new receiver-less "direct" approach was introduced. It ensures stronger end-to-end guarantees. Rather than using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic and partition and, accordingly, defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka, much like reading files from a file system.

Note: This feature was introduced in Spark 1.3 for the Scala and Java API, and in Spark 1.4 for the Python API.

Now, let’s discuss how to use this approach in our streaming application.

Linking

However, this approach is supported only in Scala/Java applications. Link your SBT/Maven project with the following artifact:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

Programming

Further, import KafkaUtils and create an input DStream in the streaming application code:

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])

In the Kafka parameters, we must specify either metadata.broker.list or bootstrap.servers. By default, the stream will start consuming from the latest offset of each Kafka partition. However, if you set the configuration auto.offset.reset in the Kafka parameters to smallest, it will start consuming from the smallest offset.
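
Putting this together, here is a minimal sketch of the direct stream with string keys and values; the broker list and topic name are assumptions for the example:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset"    -> "smallest"   // start from the earliest available offset
)

val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, Set("my-topic"))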

Moreover, using other variations of KafkaUtils.createDirectStream, we can start consuming from an arbitrary offset (a sketch of that variation follows the offset-tracking example below). To access the Kafka offsets consumed in each batch, do the following:

// Hold a reference to the current offset ranges, so downstream can use it
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
          ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}
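
Here is a sketch of the variation that starts from arbitrary offsets, mentioned above. The starting offsets shown are assumptions; in practice they would be loaded from wherever your application stores them:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// Resume topic "my-topic", partition 0, from offset 1234.
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 1234L)

// Decide what each consumed record looks like downstream; here, a (key, message) pair.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val streamFromOffsets = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  streamingContext, kafkaParams, fromOffsets, messageHandler)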

If we want Zookeeper-based Kafka monitoring tools to show the progress of the streaming application, we can use this to update Zookeeper ourselves.

Deploying

Here, the deployment process is similar to that of the receiver-based approach.

Advantages of Direct Approach

The second approach has the following advantages over the first in Spark Streaming integration with Kafka:


Simplified Parallelism

There is no requirement to create multiple input Kafka streams and union them. With the direct stream, Kafka-Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, all of which read data from Kafka in parallel. Hence, there is a one-to-one mapping between Kafka partitions and RDD partitions, which is easier to understand and tune.

Efficiency

Achieving zero-data-loss in the first approach required the data to be stored in a write-ahead log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice – once by Kafka, and a second time by the write-ahead log. The second approach eliminates the problem as there is no receiver, and hence no need for write-ahead logs. As long as we have sufficient Kafka retention, it is possible to recover messages from Kafka.

Exactly-Once Semantics

In the first approach, we used Kafka's high-level API to store consumed offsets in Zookeeper, which is the traditional way to consume data from Kafka. While this can ensure zero data loss, there is a small chance some records may get consumed twice under certain failures. This happens due to inconsistencies between the data reliably received by Kafka-Spark Streaming and the offsets tracked by Zookeeper. Therefore, the second approach uses a simple Kafka API that does not rely on Zookeeper. Here, Kafka-Spark Streaming tracks the offsets through its checkpoints, which removes inconsistencies between Spark Streaming and Zookeeper/Kafka.

Thus, each record is received by Spark Streaming effectively exactly once despite failures. To achieve exactly-once semantics for the output of our results, the output operation that saves the data to an external data store must be either idempotent or an atomic transaction that saves both results and offsets.
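
As a minimal sketch of this idea, assuming a hypothetical saveResultsAndOffsetsAtomically helper that commits a batch's results and its offset ranges in a single transaction against your external store:

import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  // The offset ranges this batch covers, taken from the RDD itself.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Toy example: collect the message payloads of this small batch.
  val results = rdd.map(_._2).collect()

  // Hypothetical helper: write results and offsets in one atomic transaction,
  // so a replayed batch is either fully rewritten or skipped.
  saveResultsAndOffsetsAtomically(results, offsetRanges)
}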

There is one disadvantage, however: this approach does not update offsets in Zookeeper, so Zookeeper-based Kafka monitoring tools will not show progress. Still, we can access the offsets processed in each batch and update Zookeeper ourselves.

So, this was all about Apache Kafka-Spark Streaming Integration. Hope you like our explanation.

Conclusion

In this Kafka-Spark Streaming integration tutorial, we have learned the whole concept of Spark Streaming integration with Apache Kafka in detail. We discussed the two different approaches for Kafka-Spark Streaming configuration, the receiver-based approach and the direct approach, and the advantages of the direct approach. Furthermore, if any doubt occurs, feel free to ask in the comments section.


Published at DZone with permission of Rinu Gour. See the original article here.

Opinions expressed by DZone contributors are their own.
