
Spark Stream-Stream Join


In this post, we are going to look at how Spark now gives developers and data scientists the ability to join two streaming data frames.


Spark 2.3 added support for stream-stream joins, i.e., we can now join two streaming Datasets/DataFrames, and in this post we are going to see how beautifully Spark supports joining two streaming data frames.

In this example, I am going to use:

Apache Spark 2.3.0
Apache Kafka 0.11.0.1
Scala 2.11.8

In build.sbt, our dependencies look like the following:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1")

To create the two streaming data frames, I am going to send the data to Kafka at a regular time interval on two separate topics; here, I've named them "dataTopic1" and "dataTopic2."
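The snippets below use a producer that the post never constructs; a minimal setup sketch, assuming a local broker at localhost:9092 (adjust for your environment), would be:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal producer setup (a sketch; the broker address is an assumption).
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)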

To send the data, I first make a list of integers and then send each integer from the list to the Kafka topic at a regular time interval, as follows.

// Send the integers 1 to 10 to "dataTopic1", pausing 200 ms between records.
val records1 = (1 to 10).toList
records1.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic1", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}

In the same way, I send the data to the second Kafka topic called “dataTopic2.”

// Send the integers 5 to 15 to "dataTopic2", again pausing 200 ms between records.
val records2 = (5 to 15).toList
records2.foreach { record =>
  val producerRecord = new ProducerRecord[String, String]("dataTopic2", record.toString)
  producer.send(producerRecord)
  Thread.sleep(200)
}

After sending the data to Kafka, I start reading a streaming data frame from each topic. I read dataFrame1 from the Kafka topic "dataTopic1" as follows:

import java.sql.Timestamp
import org.apache.spark.sql.functions.col
import spark.implicits._ // needed for .as[(String, Timestamp)]

val dataFrame1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic1")
  .option("includeTimestamp", value = true)
  .load() // without load(), this is only a DataStreamReader, not a DataFrame

val streamingDf1 = dataFrame1
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp1"))
  .select("data", "timestamp1")

In the same way, I read dataFrame2 from the Kafka topic "dataTopic2" and derive streamingDf2, this time naming the timestamp column "timestamp2."
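The post doesn't show this second snippet; a minimal sketch mirroring streamingDf1 would be:

// A sketch mirroring streamingDf1 (my reconstruction, not shown in the original post).
val dataFrame2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("subscribe", "dataTopic2")
  .option("includeTimestamp", value = true)
  .load()

val streamingDf2 = dataFrame2
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
  .select(col("value").cast("Integer").as("data"), col("timestamp").as("timestamp2"))

After that, apply the join on the shared "data" column as follows: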

val streamingDfAfterJoin: DataFrame = streamingDf1.join(streamingDf2, "data")

This will join the two streaming data frames into one, whose schema is as follows:

root
 |-- data: integer (nullable = true)
 |-- timestamp1: timestamp (nullable = true)
 |-- timestamp2: timestamp (nullable = true)
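The post doesn't show how the streaming query is actually started; a minimal sketch (my addition) that writes the joined frame to the console would be:

// Start the streaming query and print the joined rows to the console.
val query = streamingDfAfterJoin.writeStream
  .format("console")
  .option("truncate", value = false)
  .start()

query.awaitTermination()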

Since the same values of "data" appear in both data frames (values 5 through 10, given the two ranges above), each matching value produces a row carrying two timestamps, one from each stream. The final output is as follows:

[Image: console output of the joined stream, showing data with timestamp1 and timestamp2]
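One caveat worth noting (my addition; see the Spark Structured Streaming guide): an inner stream-stream join without watermarks must buffer all past input as streaming state, which grows without bound. Defining watermarks and a time-range join condition lets Spark clean up old state; a rough sketch:

import org.apache.spark.sql.functions.expr

// Hypothetical variant, not part of the original example: bound the join state.
val left = streamingDf1.withWatermark("timestamp1", "10 minutes")
val right = streamingDf2
  .withWatermark("timestamp2", "10 minutes")
  .withColumnRenamed("data", "data2") // rename to avoid an ambiguous "data" reference

val boundedJoin = left.join(
  right,
  expr("data = data2 AND timestamp2 BETWEEN timestamp1 AND timestamp1 + interval 10 minutes"))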

If you want to run this code on your own, you can find the full example in the accompanying GitHub repo. Before running it, make sure your Kafka server is already running.

