Spark: Write to CSV File
In this post, we explore how to work with Scala and Apache Spark in order to import data from another source into a CSV file.
Join the DZone community and get the full member experience.
Join For FreeA couple of weeks ago I wrote how I’d been using Spark to explore a City of Chicago Crime dataset and having worked out how many of each crime had been committed I wanted to write that to a CSV file.
Spark provides a saveAsTextFile
function which allows us to save RDDs so I refactored my code into the following format to allow me to use that:
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
def dropHeader(data: RDD[String]): RDD[String] = {
data.mapPartitionsWithIndex((idx, lines) => {
if (idx == 0) {
lines.drop(1)
}
lines
})
}
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
val parser = new CSVParser(',')
lines.map(line => {
val columns = parser.parseLine(line)
(columns(5), 1)
})
})
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
If we run that code from the Spark shell, we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:
$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x 66 markneedham wheel 2.2K 30 Nov 07:17 .
drwxrwxrwt 80 root wheel 2.7K 30 Nov 07:16 ..
-rw-r--r-- 1 markneedham wheel 8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00000.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00001.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00002.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx 1 markneedham wheel 0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx 1 markneedham wheel 28B 30 Nov 07:16 part-00000
-rwxrwxrwx 1 markneedham wheel 17B 30 Nov 07:16 part-00001
-rwxrwxrwx 1 markneedham wheel 23B 30 Nov 07:16 part-00002
-rwxrwxrwx 1 markneedham wheel 16B 30 Nov 07:16 part-00003
...
If we look at some of those part files we can see that it’s written the crime types and counts as expected:
$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310
This is fine if we’re going to pass those CSV files into another Hadoop-based job, but I actually want a single CSV file so it’s not quite what I want.
One way to achieve this is to force everything to be calculated on one partition which will mean we only get one part file generated:
val counts = partitions.repartition(1).
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
part-00000 now looks like this:
$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2
This works but it’s quite a bit slower than when we were doing the aggregation across partitions so it’s not ideal.
Instead, what we can do is make use of one of Hadoop’s merge functions which squashes part files together into a single file.
First, we import Hadoop into our SBT file:
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"
Now let’s bring our merge
function into the Spark shell:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
And now let’s make use of it:
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
merge(file, destinationFile)
And now we’ve got the best of both worlds:
$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2
The full code is available as a gist if you want to play around with it.
Published at DZone with permission of Mark Needham, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments