
How to Use Dynamic Data Transpose in Spark

We take a look at how to use in-memory operators and the Scala language to work with dynamically transposed data in Apache Spark.

By Subhasish Guha · Nov. 26, 18 · Tutorial

Dynamic transpose is a critical transformation in Spark because it requires many iterations over the data. This article will give you a clear idea of how to handle this complex scenario with in-memory operators.

First, let's look at the source data we are working with:

idoc_number,orderid,idoc_qualifier_org,idoc_org
7738,2364,6,0
7738,2364,7,0
7738,2364,8,mystr1
7738,2364,12,mystr2
7739,2365,12,mystr3
7739,2365,7,mystr4

We also have a lookup table for the idoc_qualifier_org column in the source records. Because the lookup table is small, we can expect it to fit comfortably in the cache and in driver memory as well.

qualifier,desc
6,Division
7,Distribution Channel
8,Sales Org
12,Order type
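
Because the lookup table is this small, it can be read once and passed to Spark's broadcast() hint, so every executor gets its own in-memory copy and the join never shuffles the larger source data. A minimal sketch (the paths are placeholders, and the data_df/lkp_df names match the full example further down):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Both inputs are small, headered CSVs, as shown above.
val data_df = spark.read.option("header", "true").csv("<data path src>")
val lkp_df  = spark.read.option("header", "true").csv("<lookup path src>")

// broadcast() ships the lookup table to each executor, so the join is map-side only.
val joined = data_df.join(broadcast(lkp_df),
  data_df("idoc_qualifier_org") === lkp_df("qualifier"), "inner")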

The expected output of the Dynamic Transpose operation is:

idoc_number,orderid,Division,Distribution Channel,Sales Org,Order type
7738,2364,0,0,mystr1,mystr2
7739,2365,null,mystr3,null,mystr4
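
For reference, Spark's built-in pivot can also reach this shape from the joined data; the sketch below assumes the data_df and lkp_df DataFrames from the snippet above. Note that pivot("desc") without an explicit value list triggers an extra job to collect the distinct descriptions before the plan can be built:

import org.apache.spark.sql.functions.{broadcast, first}

// Join, then pivot the qualifier description into columns, keeping the first idoc_org per cell.
val pivoted = data_df
  .join(broadcast(lkp_df), data_df("idoc_qualifier_org") === lkp_df("qualifier"), "inner")
  .groupBy("idoc_number", "orderid")
  .pivot("desc")
  .agg(first("idoc_org"))

pivoted.show()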

The code below transposes the data based on whichever qualifier columns are present. Rather than relying on the built-in pivot, it leans on Spark's complex data types (arrays and maps) and keeps the cost of the repeated column generation in check.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col, collect_list, lit, map, split, udf}

object DynamicTranspose {

 // Builds a "!"-delimited string of values ordered by the rule string (the "#!"-delimited
 // list of lookup descriptions). Qualifiers absent for a group default to the "#" placeholder.
 def dataValidator(map_val: Seq[Map[String, String]], rule: String): String = {
  try {
   val rule_array = rule.split("#!").toList
   val src_map = map_val.toList.flatten.toMap
   rule_array.map(desc => src_map.getOrElse(desc, "#")).mkString("!")
  } catch {
   case t: Throwable =>
    t.printStackTrace()
    "0"
  }
 }

 def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder()
   .master("local[*]")
   .config("spark.sql.warehouse.dir", "<src dir>")
   .getOrCreate()
  import spark.implicits._

  val data_df = spark.read.option("header", "true").csv("<data path src>")
  val lkp_df = spark.read.option("header", "true").csv("<lookup path src>")

  // The lookup table is small, so broadcast it to turn the join into a map-side join.
  val result_df = data_df.join(broadcast(lkp_df), $"idoc_qualifier_org" === $"qualifier", "inner")

  // Collect each group's (desc -> idoc_org) pairs into one array of single-entry maps.
  val df1 = result_df
   .groupBy(col("idoc_number"), col("orderid"))
   .agg(collect_list(map($"desc", $"idoc_org")) as "map")

  // The "rule": every lookup description joined with "#!", which fixes the output column order.
  val map_val = lkp_df.rdd.map(row => row.getString(1)).collect().mkString("#!")

  // UDF that flattens the array of maps into one delimited string, ordered by the rule.
  val recdValidator = udf(dataValidator _)
  var latest_df = df1
   .withColumn("explode_out", split(recdValidator(df1("map"), lit(map_val)), "!"))
   .drop("map")

  // Promote each array position to its own column, named after the lookup description.
  val columns = map_val.split("#!").toList
  latest_df = columns.zipWithIndex.foldLeft(latest_df) { (memoDF, column) =>
   memoDF.withColumn(column._1, col("explode_out")(column._2))
  }.drop("explode_out")

  latest_df.show()
 }

}
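
If the transposed result needs to be persisted, a short write at the end of main would do it (the output path is a placeholder):

// Save the transposed DataFrame as a headered CSV; overwrite any previous run.
latest_df.write
 .mode("overwrite")
 .option("header", "true")
 .csv("<output path>")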

Hope this helps!
