
PySpark Join Explained

Everything you need to know about PySpark's Join method.


PySpark provides multiple ways to combine dataframes: join, union, the SQL interface, and so on. In this article, we will take a look at how the PySpark join function is similar to a SQL join, where two or more tables or dataframes can be combined based on conditions.

Let's take a look at some of the join operations supported by PySpark, with examples. First, we create two dataframes from Python dictionaries; we will be using these two dataframes throughout this article.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('joins_example').getOrCreate()
sc = spark.sparkContext

dataset1 = [
  {
  'key' : 'abc',
  'val11' : 1.1,
  'val12' : 1.2
  },
  {
  'key' : 'def',
  'val11' : 3.0,
  'val12' : 3.4
  }
]

dataset2 = [
  {
  'key' : 'abc',
  'val21' : 2.1,
  'val22' : 2.2
  },
  {
  'key' : 'xyz',
  'val21' : 3.1,
  'val22' : 3.2
  }
]

# createDataFrame can infer the schema directly from a list of dicts
df1 = spark.createDataFrame(dataset1)
print('df1')
df1.show()

df2 = spark.createDataFrame(dataset2)
print('df2')
df2.show()

##################################################################################

df1
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
|def|  3.0|  3.4|
+---+-----+-----+

df2
+---+-----+-----+
|key|val21|val22|
+---+-----+-----+
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
+---+-----+-----+


The following kinds of joins are explained in this article. 

  • Inner Join.

  • Outer Join.

  • Left Join.

  • Right Join.

  • Left Semi Join.

  • Left Anti Join.

  • Inner Join with advanced conditions.


Let's take a detailed look at each of them.

Inner Join


An inner join selects the records that match in both dataframes. The match is performed on the column(s) specified in the on parameter. In this example, rows are joined when the column named key has the same value in both dataframes, i.e. 'abc'.

df = df1.join(df2, on=['key'], how='inner')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
+---+-----+-----+-----+-----+


Outer Join

An outer join combines data from both dataframes whether or not the on column matches. If there is a match, a single combined row is created; if there is no match, the missing columns for that row are filled with null.

df = df1.join(df2, on=['key'], how='outer')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|xyz| null| null|  3.1|  3.2|
|abc|  1.1|  1.2|  2.1|  2.2|
|def|  3.0|  3.4| null| null|
+---+-----+-----+-----+-----+


Left Join

A left join keeps all rows from the left dataframe (df1 in this example) and matches on the key column. If a match is found, values are filled from the matching df2 row; if not, the unavailable values are filled with null.

df = df1.join(df2, on=['key'], how='left')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
|def|  3.0|  3.4| null| null|
+---+-----+-----+-----+-----+


Right Join

This is the same operation as the left join, but keeping all rows of the right-side dataframe instead, i.e. df2 in this example.

df = df1.join(df2, on=['key'], how='right')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|xyz| null| null|  3.1|  3.2|
|abc|  1.1|  1.2|  2.1|  2.2|
+---+-----+-----+-----+-----+


Left Semi Join

This is like an inner join, but only the left dataframe's columns and values are selected.

df = df1.join(df2, on=['key'], how='left_semi')
df.show()

##################################################################################

+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
+---+-----+-----+


Left Anti Join

This join works like df1 - df2: it selects all rows from df1 whose key is not present in df2.

df = df1.join(df2, on=['key'], how='left_anti')
df.show()


##################################################################################

+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|def|  3.0|  3.4|
+---+-----+-----+


Inner Join With Advanced Conditions

In addition, PySpark allows arbitrary join conditions to be specified instead of the on parameter. For example, to join geolocation-based data by range, you may want to match on latitude and longitude ranges.

print('Inner join with condition df1.key == df2.key')
df = df1.join(df2, df1.key == df2.key, how='inner')
df.show()
print('Inner join with condition df1.key > df2.key')
df = df1.join(df2, df1.key > df2.key, how='inner')
df.show()
print('Inner join with multiple conditions [df1.val11 < df2.val21, df1.val12 < df2.val22]')
df = df1.join(df2, [df1.val11 < df2.val21, df1.val12 < df2.val22], how='inner')
df.show()
print('Inner join with multiple or conditions (df1.val11 < df2.val21) | (df1.val12 > df2.val22)')
df = df1.join(df2, [(df1.val11 < df2.val21) | (df1.val12 > df2.val22)], how='inner')
df.show()

##################################################################################

Inner join with condition df1.key == df2.key
+---+-----+-----+---+-----+-----+
|key|val11|val12|key|val21|val22|
+---+-----+-----+---+-----+-----+
|abc|  1.1|  1.2|abc|  2.1|  2.2|
+---+-----+-----+---+-----+-----+

Inner join with condition df1.key > df2.key
+---+-----+-----+---+-----+-----+
|key|val11|val12|key|val21|val22|
+---+-----+-----+---+-----+-----+
|def|  3.0|  3.4|abc|  2.1|  2.2|
+---+-----+-----+---+-----+-----+

Inner join with multiple conditions [df1.val11 < df2.val21, df1.val12 < df2.val22]
+---+-----+-----+---+-----+-----+
|key|val11|val12|key|val21|val22|
+---+-----+-----+---+-----+-----+
|abc|  1.1|  1.2|abc|  2.1|  2.2|
|abc|  1.1|  1.2|xyz|  3.1|  3.2|
+---+-----+-----+---+-----+-----+

Inner join with multiple or conditions (df1.val11 < df2.val21) | (df1.val12 > df2.val22)
+---+-----+-----+---+-----+-----+
|key|val11|val12|key|val21|val22|
+---+-----+-----+---+-----+-----+
|abc|  1.1|  1.2|abc|  2.1|  2.2|
|abc|  1.1|  1.2|xyz|  3.1|  3.2|
|def|  3.0|  3.4|abc|  2.1|  2.2|
|def|  3.0|  3.4|xyz|  3.1|  3.2|
+---+-----+-----+---+-----+-----+


I hope this article helps you understand some of the functionality that PySpark joins provide.


