
Spark Transformations for Pair RDD

Resilient Distributed Datasets (RDDs) are an interesting piece of the Apache Spark puzzle. Learn how to transform the data they hold using Scala.

By Teena Vashist · Dec. 03, 18 · Tutorial


In this post, we will look at some common transformations for pair RDDs.

Here is a use case you can relate to: suppose you have a dataset that contains the number of employees at the regional level, and you want to roll it up to the division level. To do that, you need to group the rows by division and sum the employee counts of all the regions in each division.

Spark gives you a specific RDD type called a key/value pair RDD for these use cases. 
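As a preview, here is a minimal sketch of that roll-up, assuming a small, made-up dataset of (division, regionalEmployeeCount) pairs; the reduceByKey transformation it uses is covered in detail below.

// Hypothetical data: two regions per division, rolled up to one row per division.
val regional = sc.parallelize(List(("North", 120), ("North", 80), ("South", 200), ("South", 150)))
val byDivision = regional.reduceByKey((total, count) => total + count)
byDivision.collect().foreach(println)
// (North,200)
// (South,350)   (output order may vary)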

Let's look at some examples of key/value pair RDD transformations:

1. Creating a Key/Value Pair RDD: 
A pair RDD arranges the data of a row into two parts: the first part is the key and the second part is the value. In the example below, I used the parallelize method to create an RDD and then mapped each word to a pair whose key is the length of the word and whose value is the word itself.

scala> val rdd = sc.parallelize(List("hello","world","good","morning"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val pairRdd = rdd.map(a => (a.length,a))
pairRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:26

scala> pairRdd.collect().foreach(println)
(5,hello)
(5,world)
(4,good)
(7,morning)
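An equivalent way to build the same pair RDD is keyBy, which computes the key with the supplied function and keeps the original element as the value; a minimal sketch:

val pairRdd2 = rdd.keyBy(word => word.length)   // RDD[(Int, String)], same pairs as above
pairRdd2.collect().foreach(println)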

2. groupByKey(): 
This transformation groups all the rows with the same key into a single row. The number of rows in the resulting RDD equals the number of unique keys in the input RDD.

scala> val groupPairKey = pairRdd.groupByKey()
groupPairKey: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[2] at groupByKey at <console>:28

scala> groupPairKey.collect().foreach(println)
(4,CompactBuffer(good))
(5,CompactBuffer(hello, world))
(7,CompactBuffer(morning))

Each row in the resulting RDD contains a unique key and a list of values of the same key.
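If all you need is an aggregate of the grouped values (for example, how many words there are of each length), you can apply mapValues to the grouped result. A minimal sketch; output order may vary:

val countsPerLength = groupPairKey.mapValues(words => words.size)
countsPerLength.collect().foreach(println)
// (4,1)
// (5,2)
// (7,1)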

3. reduceByKey(): 
This transformation reduces all the values of the same key to a single value. It happens in two steps:

  • Group the values of the same key.

  • Apply the reduce function to the list of values of each key.

scala> val reducePairKey = pairRdd.reduceByKey((total,value)=> total + value)
reducePairKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at reduceByKey at <console>:28

scala> reducePairKey.collect()
res2: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))
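Because the values here are strings, the reduce function concatenates them. With numeric values, the same transformation sums per key; a brief word-count-style sketch with made-up (word, 1) pairs:

val wordCounts = sc.parallelize(List(("hello", 1), ("world", 1), ("hello", 1)))
  .reduceByKey((total, value) => total + value)
wordCounts.collect().foreach(println)
// (hello,2)
// (world,1)   (output order may vary)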

4. sortByKey():
This transformation sorts the results by key. You can also specify whether the results should be in ascending (the default) or descending order.

scala> reducePairKey.map(t => (t._1,t._2)).sortByKey().collect()
res4: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))

scala> reducePairKey.map(t => (t._1,t._2)).sortByKey(false).collect()
res5: Array[(Int, String)] = Array((7,morning), (5,helloworld), (4,good))
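If you need to order by the value rather than the key, the general sortBy transformation works on pair RDDs too; a minimal sketch that sorts by the length of the value, longest first:

reducePairKey.sortBy(pair => pair._2.length, ascending = false).collect()
// Array((5,helloworld), (7,morning), (4,good))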

5. join(): 
This transformation joins the information of two datasets by key. Joining a dataset of type (K, V) with a dataset of type (K, W) produces a dataset of type (K, (V, W)).

scala> val rdd1 = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),(105, 6.0),(165, 31.0), (110, 40.11)))
rdd1: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List((110, "a"), (127, "b"), (126, "b"),  (105, "a"),(165, "c")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> val join = rdd1.join(rdd2)
join: org.apache.spark.rdd.RDD[(Int, (Double, String))] = MapPartitionsRDD[32] at join at <console>:28

scala> join.collect().foreach(println)
(105,(6.0,a))
(165,(31.0,c))
(110,(50.35,a))
(110,(40.11,a))
(126,(211.0,b))
(127,(305.2,b))
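join() keeps only the keys that appear in both RDDs. To keep every row of the left RDD even when the right RDD has no matching key, leftOuterJoin wraps the right-side value in an Option. A brief sketch with made-up data in which key 200 has no match:

val left  = sc.parallelize(List((110, 50.35), (200, 9.99)))
val right = sc.parallelize(List((110, "a")))
left.leftOuterJoin(right).collect().foreach(println)
// (110,(50.35,Some(a)))
// (200,(9.99,None))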

Thank you for reading this article. I hope it was helpful.


