Spark Transformations for Pair RDD
Resilient Distributed Datasets (RDDs) are an interesting piece of the Apache Spark puzzle. Learn how to transform the data they hold using Scala.
In this post, we will see some common transformations for Pair RDDs.
Here is a use case you can relate to. Suppose you have a dataset that contains the number of employees at the regional level and you want to roll it up to the division level. To do that, you need to group the rows by division and sum the number of employees of all the regions in each division.
Spark gives you a specific RDD type called a key/value pair RDD for these use cases.
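To make this concrete, here is a minimal sketch of that roll-up, assuming a tiny hypothetical dataset of (division, employee count) pairs; it uses the reduceByKey transformation covered later in this post.
val regional = sc.parallelize(List(
  ("North", 120), ("North", 80), ("South", 200), ("South", 50)))
// Sum the employee counts of all the regions in each division.
val byDivision = regional.reduceByKey(_ + _)
byDivision.collect().foreach(println)
// (North,200)
// (South,250)   <- output order may vary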
Let's see some examples of key/value pair RDD transformations:
1. Creating a Key/Value Pair RDD:
A pair RDD arranges the data of each row into two parts: the first part is the key and the second part is the value. In the example below, I use the parallelize method to create an RDD and then map each word to a pair, where the key is the length of the word and the value is the word itself.
scala> val rdd = sc.parallelize(List("hello","world","good","morning"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val pairRdd = rdd.map(a => (a.length,a))
pairRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:26
scala> pairRdd.collect().foreach(println)
(5,hello)
(5,world)
(4,good)
(7,morning)
2. groupByKey():
This transformation groups all the rows with the same key into a single row. The number of rows in the resulting RDD equals the number of unique keys in the input RDD.
scala> val groupPairKey = pairRdd.groupByKey()
groupPairKey: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[2] at groupByKey at <console>:28
scala> groupPairKey.collect().foreach(println)
(4,CompactBuffer(good))
(5,CompactBuffer(hello, world))
(7,CompactBuffer(morning))
Each row in the resulting RDD contains a unique key and the list of values associated with that key.
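As a small illustrative follow-up (not part of the original example), the grouped values can be processed further, for instance to count how many words share each length:
val countsPerKey = groupPairKey.mapValues(words => words.size)
countsPerKey.collect().foreach(println)
// (4,1)
// (5,2)
// (7,1)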
3. reduceByKey():
This transformation reduces all the values of the same key to a single value. This happens in two steps:
Group the values of the same key.
Apply the reduce function to the list of values of each key.
scala> val reducePairKey = pairRdd.reduceByKey((total,value)=> total + value)
reducePairKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at reduceByKey at <console>:28
scala> reducePairKey.collect()
res2: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))
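A common pattern built on reduceByKey is counting occurrences. The following is a minimal word-count sketch with made-up input that shows the two steps above: map each word to a (word, 1) pair, then reduce the values of each key with addition.
val words = sc.parallelize(List("hello", "world", "hello", "good"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect().foreach(println)
// (hello,2)
// (world,1)
// (good,1)   <- output order may vary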
4. sortByKey():
This transformation sorts the rows according to the key. You can also specify whether the results should be in ascending (the default) or descending order.
scala> reducePairKey.map(t => (t._1,t._2)).sortByKey().collect()
res4: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))
scala> reducePairKey.map(t => (t._1,t._2)).sortByKey(false).collect()
res5: Array[(Int, String)] = Array((7,morning), (5,helloworld), (4,good))
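If you need to sort by value rather than by key, one simple option (a sketch, not part of the original example) is to swap the pair before calling sortByKey:
reducePairKey.map(t => (t._2, t._1)).sortByKey().collect()
// e.g. Array((good,4), (helloworld,5), (morning,7))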
5. join():
This transformation is used to join the information of two datasets. Joining a dataset of type (K, V) with a dataset of type (K, W) produces a dataset of type (K, (V, W)).
scala> val rdd1 = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),(105, 6.0),(165, 31.0), (110, 40.11)))
rdd1: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List((110, "a"), (127, "b"), (126, "b"), (105, "a"),(165, "c")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> val join = rdd1.join(rdd2)
join: org.apache.spark.rdd.RDD[(Int, (Double, String))] = MapPartitionsRDD[32] at join at <console>:28
scala> join.collect().foreach(println)
(105,(6.0,a))
(165,(31.0,c))
(110,(50.35,a))
(110,(40.11,a))
(126,(211.0,b))
(127,(305.2,b))
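As a follow-up that only reuses transformations covered above, the joined rows can be re-keyed on their second field and summed with reduceByKey, for example to total the amounts per letter:
val totalPerCategory = join.map { case (_, (amount, category)) => (category, amount) }
                           .reduceByKey(_ + _)
totalPerCategory.collect().foreach(println)
// roughly (a,96.46), (b,516.2), (c,31.0)   <- floating-point rounding and order may vary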
Thank you for reading this article. I hope it was helpful to you.