
Spark Stream-Stream Join


In this post, we are going to look at how Spark now lets developers and data scientists join two streaming data frames.


Spark 2.3 added support for stream-stream joins, i.e., we can now join two streaming Datasets/DataFrames. In this post, we are going to see how cleanly Spark handles joining two streaming data frames.

In this example, I am going to use:

Apache Spark 2.3.0
Apache Kafka 0.11.0.1
Scala 2.11.8

The build.sbt for our code looks like the following:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1")

To create the two streaming data frames, I am going to send data to Kafka at regular time intervals on two separate topics; here I've named them "dataTopic1" and "dataTopic2."

To send the data, I first build a list of integers and then send each integer from the list to the Kafka topic at a regular time interval, as follows:

val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}
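The snippet above assumes a Kafka producer already exists; a minimal KafkaProducer setup might look like this, where the bootstrap server address is an assumption:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumption: local broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)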

In the same way, I send the data to the second Kafka topic called “dataTopic2.”

val records2 = (5 to 15).toList
records2.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}
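On the consumer side, the streaming reads below need a SparkSession, along with bootStrapServers, the broker address used throughout. A minimal sketch, where the app name, master, and broker address are assumptions:

import org.apache.spark.sql.SparkSession

val bootStrapServers = "localhost:9092" // assumption: same local broker as above

val spark = SparkSession.builder()
  .appName("stream-stream-join") // assumption: any app name works
  .master("local[*]")            // assumption: running locally
  .getOrCreate()

import spark.implicits._ // required for the .as[(String, Timestamp)] conversions below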

After sending the data to Kafka, I start reading a streaming data frame from each topic. Here I read dataFrame1 from the Kafka topic "dataTopic1" as follows:

import java.sql.Timestamp
import org.apache.spark.sql.functions.col

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load() // load() is needed; readStream alone only returns a DataStreamReader

val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))
  .select("data", "timestamp1")

In the same way, I read dataFrame2 from the Kafka topic "dataTopic2" and derive streamingDf2 from it.
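A sketch mirroring the dataTopic1 code above; only the topic name and the timestamp alias (timestamp2) change:

// Second stream: same shape as streamingDf1, but aliased timestamp2.
val dataFrame2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic2")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf2 = dataFrame2
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp2"))
  .select("data", "timestamp2")

With both streaming data frames in place, apply the join on the common column "data" as follows: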

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This will join the two streaming data frames into one, whose schema is as follows:

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)
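The joined frame is still only a definition; to actually see matched rows, it has to be started as a streaming query. The article doesn't show this step, but a minimal console-sink sketch would be:

val query = streamingDfAfterJoin.writeStream
  .format("console")
  .outputMode("append") // stream-stream inner joins support only append mode
  .start()

query.awaitTermination()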

Since the join finds the same values of "data" in both data frames, it gives us two timestamps for each matched value, one from each stream. The final output therefore looks as follows:

[Image: console output of the joined stream, showing each matched value of data with its timestamp1 and timestamp2]
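One caveat worth noting: without watermarks, an inner stream-stream join forces Spark to buffer all past input as streaming state indefinitely. To let Spark drop old state, you can define watermarks on both sides together with an event-time constraint in the join condition; a sketch, where the 10-second thresholds and the renamed data2 column are assumptions for illustration:

import org.apache.spark.sql.functions.expr

// Watermark both sides, and rename one join key so the event-time
// condition can reference the two sides unambiguously.
val left = streamingDf1.withWatermark("timestamp1", "10 seconds")
val right = streamingDf2.withWatermark("timestamp2", "10 seconds")
  .withColumnRenamed("data", "data2")

val boundedJoin = left.join(right, expr(
  "data = data2 AND " +
  "timestamp2 >= timestamp1 - interval 10 seconds AND " +
  "timestamp2 <= timestamp1 + interval 10 seconds"))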

If you want to run this code on your own, you can find the full example in the GitHub repo. Before running it, make sure your Kafka server is already running.


