Spark Stream-Stream Join

In this post, we are going to look at how Spark now lets developers and data scientists join two streaming data frames.


Spark 2.3 added support for stream-stream joins, i.e., we can now join two streaming Datasets/DataFrames. In this post, we are going to see how cleanly Spark supports joining two streaming data frames.

In this example, I am going to use:

Apache Spark 2.3.0
Apache Kafka
Scala 2.11.8

With these dependencies, our build.sbt looks like the following:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "1.0.0" // pick a kafka-clients version that matches your broker
)

To create the two streaming data frames, I am going to send data to Kafka at a regular time interval on two separate topics; here I've named them "dataTopic1" and "dataTopic2."
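Before any records can be sent, a Kafka producer has to be configured. The following is a minimal configuration sketch: "localhost:9092" is an assumed broker address, and both key and value are serialized as plain strings, matching the ProducerRecord[String, String] used below.

```scala
import java.util.Properties

// Producer configuration sketch. "localhost:9092" is an assumption --
// point it at your own broker. String serializers match the
// ProducerRecord[String, String] records sent in the snippets below.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// With kafka-clients on the classpath, the producer would then be built as:
//   val producer = new org.apache.kafka.clients.producer.KafkaProducer[String, String](props)
```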

For sending the data, I first simply make a list of integers and send these integers from the list to the Kafka topic at a regular time interval, as follows:

val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(1000) // pause between records to space out the stream
}
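The pattern here, send a record and then pause, can be factored into a small helper. This is only a sketch: sendFn stands in for the producer.send call, so the interval logic can be seen on its own.

```scala
// Generic "send at a regular interval" loop. `sendFn` stands in for the
// Kafka producer.send call (hypothetical); the pause spaces out the
// records so the two streams overlap in time.
def sendWithInterval[A](records: List[A], intervalMs: Long)(sendFn: A => Unit): Unit =
  records.foreach { record =>
    sendFn(record)
    Thread.sleep(intervalMs)
  }
```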

In the same way, I send the data to the second Kafka topic called “dataTopic2.”

val records2 = (5 to 15).toList
records2.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(1000)
}

After sending the data to Kafka, I start reading the data frames from the topics. For example, I read dataFrame1 from the Kafka topic "dataTopic1" as follows:

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))
  .select("data", "timestamp1")
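The two casts mirror what happens to each Kafka record: Kafka delivers the value as raw bytes, which the query first casts to a string and then to an integer. A plain-Scala sketch of the same conversion:

```scala
// Kafka delivers the record value as raw bytes; the streaming query
// casts it to a string and then to an integer, equivalent to:
val rawValue: Array[Byte] = "7".getBytes("UTF-8")
val data: Int = new String(rawValue, "UTF-8").toInt
// data == 7
```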

In the same way, I read dataFrame2 from the Kafka topic "dataTopic2" into streamingDf2, with its timestamp column named "timestamp2." After that, I apply an inner join on the data frame column called "data" as follows:

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This will join the two streaming data frames into one, whose schema is as follows:

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)

Since the same values of "data" (5 through 10) appear in both data frames, each matching row carries two timestamps, one from each stream. The final output therefore contains one row per matching value, with both timestamps attached.
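To see why only the overlapping values survive, the inner-join semantics can be mimicked with plain collections. This is a sketch of the logic only, not Spark code:

```scala
// records1 carries 1..10 and records2 carries 5..15; an inner join on
// "data" keeps only the values present in both streams.
val records1 = (1 to 10).toList
val records2 = (5 to 15).toList
val joinedData = records1.intersect(records2)
// joinedData is List(5, 6, 7, 8, 9, 10); in the streaming result each of
// these rows carries both timestamp1 and timestamp2.
```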


If you want to run this code on your own, you can find the complete example in the GitHub repo. Before running it, make sure your Kafka server is already running.


Published at DZone with permission of Ayush Tiwari, DZone MVB. See the original article here.

