Apache Spark: Reading CSV Using Custom Timestamp Format

Here's the solution to a timestamp format issue that occurs when reading CSV in Spark, both for Spark 2.0.1 or newer and for Spark 2.0.0 or older.

Suppose you want to read a CSV file with Spark, and the file contains some timestamp columns. Will those columns be a problem when Spark infers the schema while reading the CSV?

Well, the answer may be no, provided the timestamp fields in the CSV use the specific yyyy-MM-dd hh:mm:ss format. In this particular case, the Spark CSV reader treats it as the default format and infers the columns as timestamps.

id,name,age,joining_date,wedding_date
1,Joseph,25,1999-09-04 45:50:46,2014-11-22 00:00:00

// `session` is an existing SparkSession.
val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .load("<path of the csv file>")

csvDataframe.printSchema()

When you print the schema of the DataFrame after reading the CSV, you will see that every field has been inferred correctly by the CSV reader.

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_date: timestamp (nullable = true)
 |-- wedding_date: timestamp (nullable = true)

But what if the timestamp fields in the CSV are in some other timestamp format? (For example, MM-dd-yyyy hh mm ss format.)

The content of the CSV file will be:

id,name,age,joining_date,wedding_date
1,Joseph,25,09-04-1999 45 50 46,11-22-2014 00 00 00

In this case, Spark does not recognize the timestamp fields. Because the values do not match the default format, it cannot infer the columns correctly and treats them as strings.

When you print the DataFrame schema this time, the timestamp fields show up as strings:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_date: string (nullable = true)
 |-- wedding_date: string (nullable = true)
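
To make the failure concrete, here is a minimal JDK-only sketch of a parser that is tied to one layout. It is not Spark's inference code: it uses java.sql.Timestamp.valueOf, which accepts only the yyyy-[m]m-[d]d hh:mm:ss[.f...] layout, and the object and method names are made up for this illustration.

```scala
import java.sql.Timestamp
import scala.util.Try

// Hypothetical helper, not part of Spark: shows how a parser bound to
// one fixed layout rejects values written in another layout.
object DefaultFormatCheck {
  // Some(timestamp) when the text matches yyyy-[m]m-[d]d hh:mm:ss[.f...],
  // None when Timestamp.valueOf throws IllegalArgumentException.
  def parseDefault(text: String): Option[Timestamp] =
    Try(Timestamp.valueOf(text)).toOption

  def main(args: Array[String]): Unit = {
    println(parseDefault("2014-11-22 00:00:00").isDefined) // true
    println(parseDefault("11-22-2014 00 00 00").isDefined) // false
  }
}
```

A reader that only knows the first layout has no choice but to fall back to a string for the second one, which is what the schema above shows.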

The solution to this issue depends on the version of Spark you are using.

Solution 1: Using Spark Version 2.0.1 and Above

Here, you have the straightforward timestampFormat option, which lets you specify any timestamp format while reading the CSV. You just add one extra option defining the custom timestamp format, like option("timestampFormat", "MM-dd-yyyy hh mm ss").

val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .option("timestampFormat", "MM-dd-yyyy hh mm ss")
  .load("<path of the csv file>")

csvDataframe.printSchema()

This way, the timestamp fields are inferred correctly even when the CSV file uses some other timestamp format.

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_date: timestamp (nullable = true)
 |-- wedding_date: timestamp (nullable = true)

Remember: This solution will work only in Spark versions greater than 2.0.0 (2.0.1 and above). If you have Spark version 2.0.0 or older, check out Solution 2 with a workaround.
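
When choosing the pattern string, the individual letters matter. The Spark 2.x function documentation describes date patterns in terms of java.text.SimpleDateFormat, so the following JDK-only sketch uses that class to show the difference between hh (12-hour clock) and HH (24-hour clock). The CSV reader's own parser may differ in details, and the object name, method name, the fixed UTC zone, and the sample value are all assumptions made for this illustration.

```scala
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

// Hypothetical helper, not part of Spark: parses a value with a custom
// pattern and prints it back in the yyyy-MM-dd HH:mm:ss layout.
object PatternLetters {
  private val utc = TimeZone.getTimeZone("UTC") // assumption: fixed zone for repeatable output

  def normalize(text: String, pattern: String): String = {
    val in = new SimpleDateFormat(pattern, Locale.ROOT)
    val out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT)
    in.setTimeZone(utc)
    out.setTimeZone(utc)
    out.format(in.parse(text))
  }

  def main(args: Array[String]): Unit = {
    // hh is the 12-hour field: without an AM/PM marker, "12" means midnight.
    println(normalize("11-22-2014 12 30 00", "MM-dd-yyyy hh mm ss")) // 2014-11-22 00:30:00
    // HH is the 24-hour field: "12" stays at noon.
    println(normalize("11-22-2014 12 30 00", "MM-dd-yyyy HH mm ss")) // 2014-11-22 12:30:00
  }
}
```

If your data is written on a 24-hour clock, it is worth checking whether HH fits it better than hh before relying on the inferred values.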

Solution 2: Using Spark Version 2.0.0 or Older

In older versions of Spark, the timestampFormat option does not exist, but there is a workaround: let the field be inferred as a string, then explicitly cast the string field holding the timestamp value to a timestamp.

For this, you must know which columns need to be converted to timestamps.

For example, I know all my timestamp fields end with _date, so those fields can be explicitly converted to timestamps using the known format.

import org.apache.spark.sql.functions.{col, unix_timestamp}

// Read the CSV; columns in the custom format are inferred as strings.
val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .load("<path of the csv file>")

// Convert every column whose name ends with "_date" from string to timestamp.
val updatedDF = csvDataframe.columns.filter(colName => colName.endsWith("_date"))
  .foldLeft(csvDataframe) { (outputDF, columnName) =>
    outputDF.withColumn(columnName, unix_timestamp(col(columnName), "MM-dd-yyyy hh mm ss").cast("timestamp"))
  }
updatedDF.printSchema()

This way, you will be able to get the correct data type for timestamp fields with other formats as well.

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_date: timestamp (nullable = true)
 |-- wedding_date: timestamp (nullable = true)
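
The conversion in Solution 2 happens in two steps: unix_timestamp turns the string into seconds since the epoch, and the cast turns that number into a timestamp. The following JDK-only sketch mimics those two steps for a single value. It is not Spark code: the names are hypothetical, the UTC zone is an assumption chosen for repeatable results (Spark works with its own time zone setting), and None stands in for the null that unix_timestamp returns for a value it cannot parse.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}
import scala.util.Try

// Hypothetical helper, not part of Spark: string -> epoch seconds -> timestamp.
object StringToTimestamp {
  // Step 1: text to seconds since the epoch, or None when the text
  // does not match the pattern.
  def toEpochSeconds(text: String, pattern: String): Option[Long] = {
    val format = new SimpleDateFormat(pattern, Locale.ROOT)
    format.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: fixed zone
    Try(format.parse(text).getTime / 1000L).toOption
  }

  // Step 2: seconds to a timestamp value (java.sql.Timestamp takes milliseconds).
  def toTimestamp(text: String, pattern: String): Option[Timestamp] =
    toEpochSeconds(text, pattern).map(seconds => new Timestamp(seconds * 1000L))

  def main(args: Array[String]): Unit = {
    val pattern = "MM-dd-yyyy hh mm ss"
    println(toEpochSeconds("01-02-1970 01 00 00", pattern)) // Some(90000)
    println(toTimestamp("not a date", pattern))              // None
  }
}
```

One consequence worth keeping in mind is that a value that does not match the pattern does not raise an error in this approach; it simply ends up empty, so a quick count of nulls in the converted columns is a reasonable sanity check.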

Conclusion

When you read a CSV using Spark, you may run into problems with timestamp fields whose format differs from the default one, i.e., yyyy-MM-dd hh:mm:ss. This blog showed how to solve this timestamp format issue for both Spark 2.0.1 or newer and Spark 2.0.0 or older.

Published at DZone with permission of
