
Reading Nested Parquet File in Scala and Exporting to CSV


In this brief yet code-heavy tutorial, learn how to handle nested Parquet-compressed content and remove certain columns from your data.


Recently, we were working on a problem where a Parquet compressed file had lots of nested tables. Some of the tables had columns with an Array type. Our objective was to read the file and save it to CSV.
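
For context, the schema had roughly the shape sketched below. The field names here are hypothetical, not the ones from our actual file; the point is the pattern: structs nested inside structs, with the occasional array-typed column that CSV cannot represent directly.

// Hypothetical shape of the nested schema (illustrative only, not the actual file)
import org.apache.spark.sql.types._

val exampleSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("user", StructType(Seq(
    StructField("name", StringType),
    StructField("address", StructType(Seq(
      StructField("city", StringType),
      StructField("zip", StringType)
    )))
  ))),
  StructField("tags", ArrayType(StringType))  // array-typed column, problematic for CSV
))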

We wrote a script in Scala that does the following:

  • Handles nested Parquet compressed content
  • Looks for columns with an Array type and removes them

Here is the script:

// Recursively flatten a nested schema into one Column per leaf field.
// Struct fields are expanded, array-typed fields are skipped, and each remaining
// column is aliased with the dots and backticks of its path replaced by underscores.
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns, which cannot be flattened into a single CSV cell
      case _: ArrayType => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
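
To see what this produces on a schema like the hypothetical one sketched above, you can call flattenSchema on the DataFrame's schema and inspect the result before selecting it. This is a minimal sketch; df is the Parquet-backed DataFrame loaded further below.

// Minimal sketch: inspect the flattened column list before selecting it
val flatCols = flattenSchema(df.schema)

// Each entry is a Column whose alias is the nested path with dots and backticks
// replaced by underscores; array-typed fields are dropped entirely.
flatCols.foreach(println)

// Selecting the flattened columns turns nested struct fields into plain top-level
// columns, so the result can be written out as CSV.
val flatDf = df.select(flatCols: _*)
flatDf.printSchema()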

Here are all the steps you need to take to read the Parquet-compressed content and then export it to disk as a CSV.

// Spark 1.x entry point in the spark-shell (sc is the shell's SparkContext)
val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
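
A side note: SQLContext is the Spark 1.x entry point. On Spark 2.x the shell already gives you a SparkSession named spark, so the SQLContext line above is unnecessary there; in a standalone application you would build one yourself, roughly like this (a sketch, with a made-up app name):

// Spark 2.x sketch: build a SparkSession instead of a SQLContext (app name is hypothetical)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flatten-parquet-to-csv")
  .getOrCreate()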

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns, which cannot be flattened into a single CSV cell
      case _: ArrayType => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

scala>

val df = spark.read.parquet("/user/avkash/test.parquet")

df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")
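
The com.databricks.spark.csv format is the external spark-csv package needed on Spark 1.x. On Spark 2.x the CSV data source is built in, so an equivalent write (a sketch, with a hypothetical output path and an optional header row) would be:

// Spark 2.x sketch: the CSV writer is built in, no external package required
df.select(flattenSchema(df.schema): _*)
  .write
  .option("header", "true")                                 // include column names
  .csv("/Users/avkashchauhan/Downloads/saveit/result_csv")  // hypothetical output directory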

If you want to see the full working scripts with output, you can visit any of the following links based on your Spark Version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x

We got some help from this StackOverflow discussion. Michal Kurka and Michal Malohlava helped me write the above solution. Thanks, guys!

That's it. Enjoy!

