Spark Structured Streaming Joins

In this article, we'll look at the different joins available in Spark Structured Streaming and how to implement them using Scala.

By Sylvester Daniel · Apr. 02, 19 · Tutorial

Objective

In this article, we'll look at the different joins available in Spark Structured Streaming. In a streaming job, you may have multiple static and streaming data sources. You may have to join them to implement various functionalities. We will see how Spark Structured Streaming handles various types of joins with both static and streaming datasets.

Overview

The table below gives you an overview of different types of streams on the left/right and what is supported/unsupported in Structured Streaming.

Inner Join

From the table above, it's clear that an inner join is supported whether the datasets are static or streaming. An inner join between a streaming dataset and a static dataset doesn't have to be stateful, because each set of streaming records is matched against a fixed set of static records. When you inner join two streaming datasets, you can optionally add watermarks and a time constraint. We will discuss the inner join of two streaming datasets in a later section.

Full Outer Join

A full outer join is supported only when two static datasets are joined. From the table above, it's clear that a full outer join is not supported if a streaming dataset is involved. One reason is that, in a full outer join, neither the left nor the right side waits for the other side's matching data to arrive. In streaming, because of late-arriving data, a given record may not find its matching record in the same window, which can produce very inconsistent output with a full outer join.

Static Data and Static Joins

When a static dataset is used in a streaming job, the data is loaded only once at the beginning of the job. When you join two static datasets, all types of joins are supported. We will not go into detail about joins between two static datasets in this article.

Streaming Data and Static Joins

When a streaming dataset is on the left and a static dataset is on the right, only an inner join and a left outer join are supported. Right outer joins and full outer joins are not supported.

In my view, the right outer join is not required, as only static data will be returned as output in that micro-batch, which may not be of much use. For the same reason, I believe the full outer join is also not supported. A left outer join, on its own, is good enough to return a streaming record when there is no matching static record.

Inner joins and left outer joins on streaming and static datasets don’t have to be stateful, as the streaming set of records in a given window can be matched with a static set of records.

Sample Code

Refer to the code below to join a streaming dataset with a static dataset. Check out working examples here.

val leftOuterJoinDS = streamingEmployeeDS.join(staticDepartmentDS, $"departmentId" === $"id", "left_outer")
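The one-line join above can be expanded into a self-contained sketch. The built-in rate source stands in for a real employee stream, and the department rows, the column mapping, and the object and method names are assumptions made for illustration, not part of the original example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object StreamStaticJoinSketch {
  // Static side: a small in-memory table (hypothetical data), loaded once.
  def staticDepartments(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((0L, "Engineering"), (1L, "Sales")).toDF("id", "departmentName")
  }

  // Streaming side: the rate source emits (timestamp, value) rows.
  // value % 3 yields department ids 0, 1, 2; id 2 has no static match.
  def streamingEmployees(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select(
        col("value").as("employeeId"),
        (col("value") % 3).as("departmentId"),
        col("timestamp").as("empTimestamp"))

  // Stream on the left, static on the right: left outer join is supported.
  def leftOuterJoin(spark: SparkSession): DataFrame =
    streamingEmployees(spark)
      .join(staticDepartments(spark), col("departmentId") === col("id"), "left_outer")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-static-left-outer-join")
      .master("local[2]")
      .getOrCreate()

    // Print each micro-batch to the console until the job is stopped.
    val query = leftOuterJoin(spark).writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }
}
```

With this mapping, employees whose departmentId is 2 should appear with null department columns, which is the case a left outer join exists to cover.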

Static Data and Stream Joins

When we have a static dataset on the left and a streaming dataset on the right, only an inner join and a right outer join are supported. Left outer joins and full outer joins are not supported.

In my view, a left outer join is not required, as only static data will be returned as output in that micro-batch, which may not be of any use. For the same reason, I believe the full outer join is also not supported. Right outer joins, alone, are good enough to return a streaming record when there is no matching static record.

Inner joins and right outer joins on static and streaming datasets don’t have to be stateful, as the streaming set of records in a given window will be matched with a static set of records.

Sample Code

Refer to the code below to join a static dataset with a streaming dataset. Check out working examples here.

val rightOuterJoinDS = staticDepartmentDS.join(streamingEmployeeDS, $"id" === $"departmentId", "right_outer")
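To see the supported and unsupported cases side by side, the sketch below tries to start a static-to-stream join with a given join type and reports whether Spark accepts it. It assumes the unsupported-operation check runs when the query is started; the rate source, the memory sink, the sample rows, and the helper name tryStart are illustrative choices, not from the original example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import scala.util.Try

object StaticStreamJoinCheck {
  // Builds static.join(stream, ..., joinType) and tries to start it.
  // Success means Spark accepted the plan; Failure carries the exception.
  def tryStart(spark: SparkSession, joinType: String): Try[Unit] = {
    import spark.implicits._

    // Static side on the left (hypothetical data).
    val staticDepartmentDS =
      Seq((0L, "Engineering"), (1L, "Sales")).toDF("id", "departmentName")

    // Streaming side on the right, backed by the rate source.
    val streamingEmployeeDS = spark.readStream
      .format("rate")
      .load()
      .select(
        col("value").as("employeeId"),
        (col("value") % 3).as("departmentId"))

    val joined = staticDepartmentDS
      .join(streamingEmployeeDS, col("id") === col("departmentId"), joinType)

    Try {
      // The memory sink needs a query name; stop right after a successful start.
      val query = joined.writeStream
        .format("memory")
        .queryName(s"check_$joinType")
        .outputMode("append")
        .start()
      query.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("static-stream-join-check")
      .master("local[2]")
      .getOrCreate()

    Seq("inner", "right_outer", "left_outer").foreach { joinType =>
      println(s"$joinType accepted: ${tryStart(spark, joinType).isSuccess}")
    }
    spark.stop()
  }
}
```

Based on the rules in this section, the inner and right outer joins should be accepted and the left outer join rejected.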

Streams and Stream Joins

Joining two streaming datasets is supported only from Spark version 2.3 on.

Stream to Stream (Inner Join)

When you inner join two streaming datasets, watermarks and a time constraint are optional. If they are not specified, data is kept in state indefinitely. Setting a watermark on both sides together with a time constraint enables state cleanup.

Stream to Stream (Left and Right Outer Join)

Both left and right outer joins are conditionally supported in Structured Streaming. When you left or right outer join two streaming datasets, watermarking and a time constraint are mandatory. For a left outer join, the dataset on the right-hand side must be watermarked and the join must have a time constraint, and vice versa for a right outer join. I strongly recommend watermarking both sides for state cleanup.

Watermark vs. Time Constraint

It’s very important to understand the difference between a watermark and a time constraint. A watermark on a stream decides how late a record can arrive and sets a threshold after which late records can be dropped. For example, if you set a watermark of 30 minutes, records that arrive more than 30 minutes behind the latest event time seen on that stream will be dropped/ignored. A time constraint decides how long records are retained in Spark's state in relation to the other stream.

In the example below, with the following values set, Spark automatically derives a state duration of three hours for stream 1 and one hour for stream 2.

Stream 1 watermark = 1 hour

Stream 2 watermark = 2 hours

Time constraint: Stream 1 time >= stream 2 time AND stream 1 time <= stream 2 time + interval 1 hour
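The watermark rule described in this section can be sketched in plain Scala, without Spark. This is a simplified model: it updates the watermark after every record, whereas Spark advances it between micro-batches, and the Event type and function names are hypothetical.

```scala
object WatermarkSketch {
  // Event time in seconds; a hypothetical record type for illustration.
  final case class Event(id: String, eventTimeSec: Long)

  // Watermark = latest event time seen so far minus the allowed delay.
  def watermark(maxEventTimeSec: Long, delaySec: Long): Long =
    maxEventTimeSec - delaySec

  // Processes events in arrival order and returns (accepted ids, dropped ids).
  // An event is dropped when its event time is older than the current watermark.
  def process(events: Seq[Event], delaySec: Long): (Seq[String], Seq[String]) = {
    val start = (Option.empty[Long], Vector.empty[String], Vector.empty[String])
    val (_, accepted, dropped) = events.foldLeft(start) {
      case ((maxSeen, acc, drop), event) =>
        val tooLate = maxSeen.exists(m => event.eventTimeSec < watermark(m, delaySec))
        if (tooLate) (maxSeen, acc, drop :+ event.id)
        else {
          val newMax = Some(maxSeen.fold(event.eventTimeSec)(m => math.max(m, event.eventTimeSec)))
          (newMax, acc :+ event.id, drop)
        }
    }
    (accepted, dropped)
  }

  def main(args: Array[String]): Unit = {
    val thirtyMinutes = 30L * 60
    val events = Seq(
      Event("a", 3600), // 01:00
      Event("b", 7200), // 02:00, watermark becomes 01:30
      Event("c", 5700), // 01:35, late but within 30 minutes of 02:00
      Event("d", 5000)  // 01:23, older than the 01:30 watermark
    )
    println(process(events, thirtyMinutes))
  }
}
```

In this run, events a, b, and c are accepted and d is dropped, because d is more than 30 minutes behind the latest event time already seen.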

Sample Code

Refer to the code below to join two streaming datasets. Check out working examples here.

Watermark

val employeeStreamDS = rateSourceData.withWatermark("empTimestamp", "10 seconds")

Time constraint

val joinedDS = departmentStreamDS.join(
  employeeStreamDS,
  expr("""
    id = departmentId AND
    empTimestamp >= depTimestamp - interval 1 minutes AND
    empTimestamp <= depTimestamp + interval 1 minutes
  """),
  "left_outer")
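Putting the watermark and the time constraint together, a self-contained version of this left outer join might look like the sketch below. Both sides are watermarked, as recommended earlier. The rate sources, the modulo mapping to department ids, and the object and method names are assumptions for illustration; the column names follow the snippets above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr}

object StreamStreamJoinSketch {
  // Left stream: departments, watermarked on depTimestamp.
  def departmentStream(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select(col("value").as("id"), col("timestamp").as("depTimestamp"))
      .withWatermark("depTimestamp", "10 seconds")

  // Right stream: employees, watermarked on empTimestamp.
  // A left outer join requires the right side to be watermarked.
  def employeeStream(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select(
        col("value").as("employeeId"),
        (col("value") % 10).as("departmentId"),
        col("timestamp").as("empTimestamp"))
      .withWatermark("empTimestamp", "10 seconds")

  // Key equality plus a time-range constraint between the two event times.
  def leftOuterJoin(spark: SparkSession): DataFrame =
    departmentStream(spark).join(
      employeeStream(spark),
      expr("""
        id = departmentId AND
        empTimestamp >= depTimestamp - interval 1 minutes AND
        empTimestamp <= depTimestamp + interval 1 minutes
      """),
      "left_outer")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-stream-left-outer-join")
      .master("local[2]")
      .getOrCreate()

    val query = leftOuterJoin(spark).writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }
}
```

Unmatched department rows are emitted with null employee columns only after the watermark guarantees that no matching employee record can still arrive, so outer results appear with some delay.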

Summary

For two static datasets, all types of joins are supported. When a static dataset and a streaming dataset are joined, an inner join and the outer join on the side of the streaming dataset are supported, i.e., if the streaming dataset is on the left, a left outer join is supported; otherwise, a right outer join is supported. When two streams are joined, an inner join is supported, and left outer and right outer joins are conditionally supported. It is recommended to apply watermarks on both sides and a time constraint for all stream-to-stream joins.
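The rules summarized above can also be written down as a small lookup function. This is only an encoding of the support matrix as described in this article, not a Spark API; the type names and the join type strings accepted here are assumptions for illustration.

```scala
object JoinSupportSketch {
  sealed trait Side
  case object Static extends Side
  case object Stream extends Side

  sealed trait Support
  case object Supported extends Support
  case object Conditional extends Support // needs a watermark and a time constraint
  case object NotSupported extends Support

  // Encodes the support matrix described in this article.
  // Unknown join type strings fall through to NotSupported when a stream is involved.
  def support(left: Side, right: Side, joinType: String): Support =
    (left, right, joinType) match {
      case (Static, Static, _)                            => Supported
      case (_, _, "inner")                                => Supported // watermark optional
      case (Stream, Static, "left_outer")                 => Supported
      case (Static, Stream, "right_outer")                => Supported
      case (Stream, Stream, "left_outer" | "right_outer") => Conditional
      case _                                              => NotSupported
    }

  def main(args: Array[String]): Unit = {
    println(support(Stream, Static, "left_outer"))  // Supported
    println(support(Static, Stream, "left_outer"))  // NotSupported
    println(support(Stream, Stream, "right_outer")) // Conditional
    println(support(Stream, Stream, "full_outer"))  // NotSupported
  }
}
```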
