
Apache Spark: Handle Null Timestamp While Reading CSV in Spark 2.0.0

When Spark 2.0.0 tries to read a CSV, it throws an error whenever it gets null values for the timestamp field. Learn how to solve that issue.

In this blog, I will discuss a problem that I faced recently. One thing to keep in mind is that this problem is specific to Spark version 2.0.0; it does not occur in other versions.

Problem: My Spark code was reading a CSV file. This particular CSV file had one timestamp column that could contain null values. Whenever Spark encountered a null value for the timestamp field, it threw an error. I needed a solution that could handle null timestamp fields.

You can find the code snippet below:

import org.apache.spark.sql.SparkSession

object CsvReader extends App {

  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("POC")
    .getOrCreate()

  val df = sparkSession.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("test.csv")

  df.printSchema()
  df.show()
}

As you can see, the above code infers the schema while reading the CSV file.

Solution: To solve this problem, we need to take the following approach:

  1. Provide a custom schema where the timestamp field must be read as a string type.
  2. Cast the timestamp field explicitly.

This approach solves the null timestamp field issue. But there is one thing to note: we must first know which field in the CSV holds the timestamp, as well as the schema of the whole CSV file. Only then can we explicitly cast that field from string to timestamp while maintaining the original schema of the file.
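The reason the cast step works is that an empty or unparseable string simply yields a null timestamp instead of raising an error. The same idea can be sketched in plain Scala, independent of Spark (the object and method names here are illustrative, not from the article):

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.Try

object SafeTimestamp {
  private val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // Parse a possibly-null or empty string into an optional Timestamp,
  // mirroring how the explicit cast in Spark turns missing or
  // unparseable values into null rather than failing the read.
  def parse(s: String): Option[Timestamp] =
    Option(s).filter(_.nonEmpty)
      .flatMap(v => Try(new Timestamp(fmt.parse(v).getTime)).toOption)
}
```

A null or empty input produces `None`, while a well-formed value produces a `Timestamp`, which is exactly the tolerant behavior we want from the CSV read.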

In my case, I am using a CSV file, test.csv, along the following lines (illustrative values only; note the empty timestamp in the second row):

ID,PHONE,BIRTH_DT
1,1234567890,1990-05-17 10:30:00
2,9876543210,

The schema for the CSV file is as follows:

  • ID: String.

  • PHONE: Integer.

  • BIRTH_DT: Timestamp.

The solution code is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, unix_timestamp}
import org.apache.spark.sql.types._

object CsvReader extends App {

  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("POC")
    .getOrCreate()

  // Custom schema: BIRTH_DT is read as a string so that null or empty
  // values do not break the CSV read.
  val schema = StructType(List(
    StructField("ID", StringType),
    StructField("PHONE", IntegerType),
    StructField("BIRTH_DT", StringType)
  ))

  val df = sparkSession.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .load("test.csv")

  // Explicitly cast the string column to a timestamp; values that do
  // not match the format (including nulls) become null timestamps.
  val columnName = "BIRTH_DT"
  val updatedDF = df.withColumn(columnName,
    unix_timestamp(col(columnName), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))

  updatedDF.printSchema()
  updatedDF.show()
}
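One practical caveat: the format string passed to unix_timestamp must match the CSV values exactly, or every row will come back null. As a quick sanity check outside Spark, the same pattern can be exercised with java.time (the object name and sample values below are illustrative):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object TimestampFormatCheck {
  // The same pattern used in the unix_timestamp call above
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Returns true if the value parses under the expected layout.
  def matches(s: String): Boolean =
    scala.util.Try(LocalDateTime.parse(s, fmt)).isSuccess
}
```

Running a few representative values through a check like this before the Spark job can save a round of debugging silently-null columns.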

That’s it. I hope this blog is helpful to you!


Topics: big data, tutorial, apache spark, csv, null

Published at DZone with permission of Rishi Khandelwal, DZone MVB. See the original article here.

