Reading Nested Parquet File in Scala and Exporting to CSV
In this brief yet code-heavy tutorial, learn how to handle nested Parquet compressed content and remove array-typed columns from your data.
Recently, we were working on a problem where a Parquet compressed file had lots of nested tables. Some of the tables had columns with an Array type. Our objective was to read the file and save it to CSV.
We wrote a script in Scala that does the following:
- Handles nested Parquet compressed content
- Looks for columns whose type is Array and removes those columns
Here is the script:
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"
    f.dataType match {
      // Recurse into nested structs, carrying the column path as the prefix
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns (arrays, vectors, and the like)
      case _: ArrayType => Array.empty[Column]
      // Leaf column: select it and alias the full path, replacing dots and backticks with underscores
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
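To make the behavior concrete, here is a minimal sketch that runs flattenSchema against a hand-built schema. The field names (id, address, tags) are made up purely for illustration and are not part of the original example.
// Illustration only: a hypothetical nested schema with a struct column and an array column
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val sampleSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType)
  ))),
  StructField("tags", ArrayType(StringType))  // array-typed column: flattenSchema drops it
))
// Struct fields become prefixed top-level columns; dots and backticks in each
// column path are replaced with underscores in the alias. The array column
// produces no output column at all.
flattenSchema(sampleSchema).foreach(println)
// Against a real DataFrame, the same call drives the flattening select:
//   df.select(flattenSchema(df.schema): _*)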
Here are all the steps you would need to take to read the Parquet compressed content and then export it to disk as a CSV.
// Create a SQLContext from the SparkContext (sc) that spark-shell provides
val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
scala> :paste
// Entering paste mode (ctrl-D to finish)
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"
    f.dataType match {
      // Recurse into nested structs, carrying the column path as the prefix
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns (arrays, vectors, and the like)
      case _: ArrayType => Array.empty[Column]
      // Leaf column: select it and alias the full path, replacing dots and backticks with underscores
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
// Exiting paste mode, now interpreting.
flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]
scala>
val df = spark.read.parquet("/user/avkash/test.parquet")
df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")
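If you are on Spark 2.x or later, the same flow can be expressed with a SparkSession and the built-in CSV writer, so the spark-csv package is not required. This is only a sketch of the equivalent steps (the appName and the optional header setting are illustrative, not from the original), reusing the same paths as above:
// Spark 2.x sketch: the CSV data source ships with Spark, no external package needed
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("flatten-parquet").getOrCreate()
val df = spark.read.parquet("/user/avkash/test.parquet")
df.select(flattenSchema(df.schema): _*)
  .write
  .option("header", "true")  // optional: write a header row
  .csv("/Users/avkashchauhan/Downloads/saveit/result.csv")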
If you want to see the full working scripts with output, you can visit any of the following links based on your Spark version:
We got some help from this StackOverflow discussion. Michal Kurka and Michal Malohlava helped me write the above solution; thanks, guys.
That's it. Enjoy!