DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Spark: Write to CSV File

In this post, we explore how to work with Scala and Apache Spark in order to import data from another source into a CSV file.

Mark Needham user avatar by
Mark Needham
·
Dec. 04, 14 · Tutorial
Like (5)
Save
Tweet
Share
141.85K Views

Join the DZone community and get the full member experience.

Join For Free

A couple of weeks ago I wrote how I’d been using Spark to explore a City of Chicago Crime dataset and having worked out how many of each crime had been committed I wanted to write that to a CSV file.

Spark provides a saveAsTextFile function which allows us to save RDDs so I refactored my code into the following format to allow me to use that:

import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}

// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"

val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))

val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})

val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }

counts.saveAsTextFile(file)

If we run that code from the Spark shell, we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:

$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x  66 markneedham  wheel   2.2K 30 Nov 07:17 .
drwxrwxrwt  80 root         wheel   2.7K 30 Nov 07:16 ..
-rw-r--r--   1 markneedham  wheel     8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00000.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00001.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00002.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx   1 markneedham  wheel     0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx   1 markneedham  wheel    28B 30 Nov 07:16 part-00000
-rwxrwxrwx   1 markneedham  wheel    17B 30 Nov 07:16 part-00001
-rwxrwxrwx   1 markneedham  wheel    23B 30 Nov 07:16 part-00002
-rwxrwxrwx   1 markneedham  wheel    16B 30 Nov 07:16 part-00003
...

If we look at some of those part files we can see that it’s written the crime types and counts as expected:

$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530

$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310

This is fine if we’re going to pass those CSV files into another Hadoop-based job, but I actually want a single CSV file so it’s not quite what I want.

One way to achieve this is to force everything to be calculated on one partition which will mean we only get one part file generated:

val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }


counts.saveAsTextFile(file)

part-00000 now looks like this:

$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

This works but it’s quite a bit slower than when we were doing the aggregation across partitions so it’s not ideal.

Instead, what we can do is make use of one of Hadoop’s merge functions which squashes part files together into a single file.

First, we import Hadoop into our SBT file:

libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

Now let’s bring our merge function into the Spark shell:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

And now let’s make use of it:

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))

val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))

val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }

counts.saveAsTextFile(file)

merge(file, destinationFile)

And now we’ve got the best of both worlds:

$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

The full code is available as a gist if you want to play around with it.

CSV

Published at DZone with permission of Mark Needham, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Too Many Tools? Streamline Your Stack With AIOps
  • Cloud-Native Application Networking
  • Deploying Java Serverless Functions as AWS Lambda
  • GPT-3 Playground: The AI That Can Write for You

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: