Over a million developers have joined DZone.

Convert RDD to DataFrame with Spark

Learn how to convert an RDD to DataFrame in Databricks Spark CSV library.

· Big Data Zone

Read this eGuide to discover the fundamental differences between iPaaS and dPaaS and how the innovative approach of dPaaS gets to the heart of today’s most pressing integration problems, brought to you in partnership with Liaison.

As I mentioned in a previous blog post I’ve been playing around with the Databricks Spark CSV library and wanted to take a CSV file, clean it up and then write out a new CSV file containing some of the columns.

I started by processing the CSV file and writing it into a temporary table:

import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc)
val crimeFile = "Crimes_-_2001_to_present.csv"
sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

I wanted to get to the point where I could call the following function which writes a DataFrame to disk:

private def createFile(df: DataFrame, file: String, header: String): Unit = {
  FileUtil.fullyDelete(new File(file))
  val tmpFile = "tmp/" + System.currentTimeMillis() + "-" + file
  df.distinct.save(tmpFile, "com.databricks.spark.csv")

The first file only needs to contain the primary type of crime, which we can extract with the following query:

val rows = sqlContext.sql("select `Primary Type` as primaryType FROM crimes LIMIT 10")

res4: Array[org.apache.spark.sql.Row] = Array([ASSAULT], [ROBBERY], [CRIMINAL DAMAGE], [THEFT], [THEFT], [BURGLARY], [THEFT], [BURGLARY], [THEFT], [CRIMINAL DAMAGE])

Some of the primary types have trailing spaces which I want to get rid of. As far as I can tell Spark’s variant of SQL doesn’t have the LTRIM or RTRIM functions but we can map over ‘rows’ and use the String ‘trim’ function instead:

rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
res8: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[29] at map at DataFrame.scala:776

Now we’ve got an RDD of Rows which we need to convert back to a DataFrame again. ‘sqlContext’ has a function which we might be able to use:

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })

<console>:27: error: overloaded method value createDataFrame with alternatives:
  [A <: Product](data: Seq[A])(implicit evidence$4: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
  [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
              sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })

These are the signatures we can choose from:

2015 08 06 21 58 12

If we want to pass in an RDD of type Row we’re going to have to define a StructType or we can convert each row into something more strongly typed:

case class CrimeType(primaryType: String)

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) })
res14: org.apache.spark.sql.DataFrame = [primaryType: string]

Great, we’ve got our DataFrame which we can now plug into the ‘createFile’ function like so:

  sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }),

We can actually do better though!

Since we’ve got an RDD of a specific class we can make use of the ‘rddToDataFrameHolder’ implicit function and then the ‘toDF’ function on ‘DataFrameHolder’. This is what the code looks like:

import sqlContext.implicits._
  rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }.toDF(),

And we’re done!

Discover the unprecedented possibilities and challenges, created by today’s fast paced data climate and why your current integration solution is not enough, brought to you in partnership with Liaison

spark,databricks,rdd,dataframe,big data

Published at DZone with permission of Mark Needham, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

The best of DZone straight to your inbox.

Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}