
Reading Nested Parquet File in Scala and Exporting to CSV


In this brief yet code-heavy tutorial, learn how to handle nested Parquet compressed content and remove certain columns from your data.


Recently, we were working on a problem where a Parquet compressed file had lots of nested tables. Some of the tables had columns with an Array type. Our objective was to read the file and save it to CSV.
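
If you want to check whether your own file has this kind of nesting before running the script, printing the schema is the quickest way. A minimal sketch, assuming a spark-shell session where spark is bound to a SQLContext or SparkSession and the path is just a placeholder:

val df = spark.read.parquet("/user/avkash/test.parquet")
df.printSchema()  // nested structs and array columns show up as indented fields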

We wrote a script in Scala that does the following:

  • Handles nested Parquet compressed content
  • Looks for columns of an Array type and removes those columns

Here is the script:

// Recursively flatten a nested schema into a list of top-level columns.
// Struct fields are expanded with their parent path as a prefix; array
// columns are skipped; dots and backticks in the path are replaced with
// underscores when the column is aliased.
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array columns (and similar collection types)
      case _: ArrayType => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
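
To see what the function produces, here is a hypothetical example (assuming Spark 2.2+ with a SparkSession named spark, so that spark.read.json accepts a Dataset[String]); the nested name struct is flattened and the tags array is dropped:

import spark.implicits._

// Build a tiny nested DataFrame from an inline JSON record (hypothetical data)
val nestedDf = spark.read.json(Seq(
  """{"id": 1, "name": {"first": "Ada", "last": "Lovelace"}, "tags": ["a", "b"]}"""
).toDS())

// Flatten it with the function above; the resulting columns are
// _id_, _name___first_ and _name___last_ (dots/backticks become underscores),
// while the array column tags is skipped entirely.
nestedDf.select(flattenSchema(nestedDf.schema): _*).show()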

Here are all the steps you need to take to read the Parquet compressed content and then export it to disk as a CSV.

// Spark 1.6.x: create a SQLContext from the existing SparkContext (sc)
val spark = new org.apache.spark.sql.SQLContext(sc)

import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
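
If you are on Spark 2.x, you would normally use a SparkSession instead of a SQLContext; in spark-shell one is already bound to spark, but here is a minimal sketch for a standalone application (the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

// Spark 2.x entry point; spark.read.parquet works the same way as on SQLContext
val spark = SparkSession.builder()
  .appName("flatten-parquet")
  .getOrCreate()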

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array columns (and similar collection types)
      case _: ArrayType => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

scala>

val df = spark.read.parquet("/user/avkash/test.parquet")

df.select(flattenSchema(df.schema): _*)
  .write
  .format("com.databricks.spark.csv")
  .save("/Users/avkashchauhan/Downloads/saveit/result.csv")
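
The com.databricks.spark.csv format is the external spark-csv package needed on Spark 1.x. On Spark 2.0 and later, CSV support is built in, so a sketch like the following works without any extra package (the output path is a placeholder, and note that Spark writes a directory of part files rather than a single CSV file):

// Spark 2.x: built-in CSV writer, with a header row for the column names
df.select(flattenSchema(df.schema): _*)
  .write
  .option("header", "true")
  .csv("/Users/avkashchauhan/Downloads/saveit/result_csv")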

If you want to see the full working scripts with output, you can visit either of the following links, depending on your Spark version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x

We got some help from this StackOverflow discussion. Michal Kurka and Michal Malohlava helped me write the above solution. Thanks, guys!

That's it. Enjoy!


