
Spark Transformations for Pair RDD


Resilient Distributed Datasets (RDD) are an interesting piece of the Apache Spark puzzle. Learn how to transform the data they hold using Scala.



In this post, we will look at some common transformations for pair RDDs.

Here is a use case you can relate to. Suppose you have a dataset that contains the number of employees at the regional level, and you want to roll it up to the division level: you need to group the rows by division and sum the number of employees of all the regions in each division.

Spark provides a specific RDD type, the key/value pair RDD, for these use cases.

Let's look at some examples of key/value pair RDD transformations.

1. Creating a Key/Value Pair RDD: 
A pair RDD splits each row's data into two parts: the first part is the key and the second part is the value. In the example below, I use the parallelize method to create an RDD and then use map to build a pair RDD in which the key is the length of each word and the value is the word itself.

scala> val rdd = sc.parallelize(List("hello","world","good","morning"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val pairRdd = rdd.map(a => (a.length,a))
pairRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:26

scala> pairRdd.collect().foreach(println)
(5,hello)
(5,world)
(4,good)
(7,morning)
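
The same pairing logic can be previewed with plain Scala collections, no cluster needed — a sketch of the map-to-tuple step using the same four words:

```scala
// Build (length, word) pairs exactly as pairRdd.map does above,
// but on a local List instead of an RDD.
val words = List("hello", "world", "good", "morning")
val pairs = words.map(a => (a.length, a))
// pairs: List((5,hello), (5,world), (4,good), (7,morning))
```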

2. groupByKey(): 
This transformation groups all the rows with the same key into a single row. The number of rows in the resulting RDD equals the number of unique keys in the input RDD.

scala> val groupPairKey = pairRdd.groupByKey()
groupPairKey: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[2] at groupByKey at <console>:28

scala> groupPairKey.collect().foreach(println)
(4,CompactBuffer(good))
(5,CompactBuffer(hello, world))
(7,CompactBuffer(morning))

Each row in the resulting RDD contains a unique key and a list of the values that share that key.
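
The grouping semantics can be reproduced locally with Scala's own groupBy — a sketch of what the transformation computes, not of Spark's shuffle machinery:

```scala
// Group (length, word) pairs by key, keeping only the values,
// mirroring what groupByKey produces (minus the CompactBuffer type).
val pairs = List((5, "hello"), (5, "world"), (4, "good"), (7, "morning"))
val grouped = pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
// grouped: Map(5 -> List(hello, world), 4 -> List(good), 7 -> List(morning))
```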

3. reduceByKey(): 
This transformation reduces all the values of the same key to a single value. Conceptually, it performs two steps:

  • Group the values of the same key.

  • Apply the reduce function to the list of values of each key.

scala> val reducePairKey = pairRdd.reduceByKey((total,value)=> total + value)
reducePairKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at reduceByKey at <console>:28

scala> reducePairKey.collect()
res2: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))
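
Applied to the employee roll-up use case from the introduction, reduceByKey would sum the regional counts per division. The semantics can be sketched locally (the division names and headcounts here are made-up illustration data):

```scala
// Hypothetical (division, regionalHeadcount) pairs. In Spark this would be
// regionRdd.reduceByKey(_ + _); locally, groupBy + sum gives the same result.
val regions = List(("North", 120), ("South", 80), ("North", 45), ("West", 200))
val byDivision = regions.groupBy(_._1).map { case (d, rows) => (d, rows.map(_._2).sum) }
// byDivision: Map(North -> 165, South -> 80, West -> 200)
```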

4. sortByKey():
This transformation sorts the rows by key. You can also specify whether the results should be in ascending (the default) or descending order.

scala> reducePairKey.map(t => (t._1,t._2)).sortByKey().collect()
res4: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))

scala> reducePairKey.map(t => (t._1,t._2)).sortByKey(false).collect()
res5: Array[(Int, String)] = Array((7,morning), (5,helloworld), (4,good))
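
Locally, the same two orderings can be sketched with sortBy on the key:

```scala
// Ascending and descending sorts by key, like sortByKey() and sortByKey(false).
val results = List((4, "good"), (5, "helloworld"), (7, "morning"))
val asc  = results.sortBy(_._1)
val desc = results.sortBy(t => -t._1)
```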

5. join(): 
This transformation joins two datasets on their keys. Joining a dataset of type (K, V) with a dataset of type (K, W) produces a dataset of type (K, (V, W)).

scala> val rdd1 = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),(105, 6.0),(165, 31.0), (110, 40.11)))
rdd1: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List((110, "a"), (127, "b"), (126, "b"),  (105, "a"),(165, "c")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> val join = rdd1.join(rdd2)
join: org.apache.spark.rdd.RDD[(Int, (Double, String))] = MapPartitionsRDD[32] at join at <console>:28

scala> join.collect().foreach(println)
(105,(6.0,a))
(165,(31.0,c))
(110,(50.35,a))
(110,(40.11,a))
(126,(211.0,b))
(127,(305.2,b))
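
The inner-join semantics can be sketched locally with a for-comprehension over the same two lists — per key, every value from the left side is paired with every matching value from the right, which is why key 110 appears twice above:

```scala
// Pair every (k, v) with every (k, w) that shares the key,
// mirroring what rdd1.join(rdd2) computes per key.
val amounts = List((110, 50.35), (127, 305.2), (126, 211.0), (105, 6.0), (165, 31.0), (110, 40.11))
val labels  = List((110, "a"), (127, "b"), (126, "b"), (105, "a"), (165, "c"))
val joined = for {
  (k1, v) <- amounts
  (k2, w) <- labels
  if k1 == k2
} yield (k1, (v, w))
// six rows: key 110 matches twice, once per amount
```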

Thank you for reading this article. I hope it was helpful to you.

