
Using Apache Spark DataFrames for Processing of Tabular Data


Learn from a worked example how to process tons of tabular data quickly using Scala, Apache Spark, and the MapR Sandbox.


This post will help you get started using Apache Spark DataFrames with Scala on the MapR Sandbox. The new Spark DataFrames API is designed to make big data processing on tabular data easier. A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases. In this post, you’ll learn how to:

  • Load data into Spark DataFrames
  • Explore data with Spark SQL

This post assumes a basic understanding of Spark concepts. If you have not already read the tutorial on Getting Started with Spark on MapR Sandbox, it would be good to read that first.

Software

This tutorial runs on the MapR v5.0 Sandbox, which includes Spark 1.3.

The Sample Data Sets

We will use two example datasets - one from eBay online auctions and one from the SFPD Crime Incident Reporting system.

The eBay online auction dataset has the following data fields:

  • auctionid - unique identifier of an auction
  • bid - the proxy bid placed by a bidder
  • bidtime - the time (in days) from the start of the auction at which the bid was placed
  • bidder - eBay username of the bidder
  • bidderrate - eBay feedback rating of the bidder
  • openbid - the opening bid set by the seller
  • price - the closing price that the item sold for (equivalent to the second-highest bid plus an increment)

The table below shows the data fields with some sample data:

auctionid  bid  bidtime  bidder   bidderrate openbid price item daystolive
8213034705 95.0 2.927373 jake7870 0          95.0    117.5 xbox 3

Using Spark DataFrames, we will explore the data with questions like:

  • How many auctions were held?
  • How many bids were made per item?
  • What's the minimum, maximum, and average number of bids per item?
  • Show the bids with price > 100

The table below shows the SFPD data fields with some sample data:

IncidntNum Category      Descript                     DayOfWeek Date       Time  PdDistrict Resolution Address                 X                 Y                Location                              PdId
150467944  LARCENY/THEFT GRAND THEFT FROM LOCKED AUTO Thursday  05/28/2015 23:59 TENDERLOIN NONE       TAYLOR ST / OFARRELL ST -122.411328369311 37.7859963050476 (37.7859963050476, -122.411328369311) 15046794406244

Using Spark DataFrames, we will explore the SFPD data with questions like:

  • What are the top 10 Resolutions?
  • How many Categories are there?
  • What are the top 10 incident Categories?

Loading Data Into Spark DataFrames

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01 and password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the Spark shell with:
$ spark-shell

First, we will import some packages and instantiate a SQLContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects. (In the code snippets below, output is shown in comments.)

//  SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._

Below we load the data from the ebay.csv file into a Resilient Distributed Dataset (RDD). RDDs can have transformations and actions; the first() action returns the first element in the RDD, which is the String “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”

// load the data into a  new RDD
val ebayText = sc.textFile("ebay.csv")

// Return the first element in this RDD
ebayText.first()
// res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

Below we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then map() transformations are applied to each element of ebayText to create the ebay RDD of Auction objects.

//define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)

// create an RDD of Auction objects 
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt ))

The ebay RDD's first() action returns the first element in the RDD: Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3).

// Return the first element in this RDD
ebay.first()
//res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)
// Return the number of elements in the RDD
ebay.count()
//res8: Long = 10654

A DataFrame is a distributed collection of data organized into named columns. Spark SQL supports automatically converting an RDD containing case classes to a DataFrame with the method toDF():

// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()

The previous RDD transformations can also be written on one line like this:

val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p => 
Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt )).toDF()

Explore and Query the eBay Auction Data With Spark DataFrames

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples with the auction DataFrame. The DataFrame show() action displays the top 20 rows in a tabular form.

// Display the top 20 rows of DataFrame 
auction.show()
// auctionid  bid   bidtime  bidder         bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870       0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2  1          95.0    117.5 xbox 3 …

The DataFrame printSchema() operation prints the schema to the console in a tree format:

// Return the schema of this DataFrame
auction.printSchema()
//root
// |-- auctionid: string (nullable = true)
// |-- bid: float (nullable = false)
// |-- bidtime: float (nullable = false)
// |-- bidder: string (nullable = true)
// |-- bidderrate: integer (nullable = true)
// |-- openbid: float (nullable = false)
// |-- price: float (nullable = false)
// |-- item: string (nullable = true)
// |-- daystolive: integer (nullable = true)

Once a DataFrame has been instantiated, you can query it with DataFrame operations or SQL. Here are some example queries using the Scala DataFrame API:

// How many auctions were held?
auction.select("auctionid").distinct.count
// Long = 627

// How many bids per item?
auction.groupBy("auctionid", "item").count.show
//auctionid  item    count
//3016429446 palm    10
//8211851222 xbox    28
//3014480955 palm    12
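
As a quick extension, the same grouped count can be sorted to see which auctions drew the most bids. This is a small sketch, not part of the original walkthrough; it assumes the column produced by groupBy(...).count is named count, as in the output above, and bidsPerItem is just an illustrative name:

// group bids per auction, then sort the counts in descending order
val bidsPerItem = auction.groupBy("auctionid", "item").count
bidsPerItem.sort($"count".desc).show()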

// What's the min number of bids per item? What's the average? What's the max?
// min, avg, and max are aggregate functions from org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{min, avg, max}
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"), max("count")).show

// MIN(count) AVG(count)         MAX(count)
// 1          16.992025518341308 75

// Get the auctions with closing price > 100
val highprice = auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float,
// bidder: string, bidderrate: int, openbid: float, price: float, item: string, daystolive: int]

// display dataframe in a tabular format
highprice.show()
// auctionid  bid   bidtime  bidder         bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870       0          95.0    117.5 xbox 3        
// 8213034705 115.0 2.943484 davidbresler2  1          95.0    117.5 xbox 3
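
The same filter can also be written with a Column expression instead of a SQL string; a minimal sketch (highpriceExpr is just an illustrative name):

// equivalent filter expressed with a Column rather than a SQL string
val highpriceExpr = auction.filter(auction("price") > 100)
highpriceExpr.show()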

You can register a DataFrame as a temporary table using a given name, and then run SQL statements using the sql methods provided by sqlContext. Here are some example queries using sqlContext:

// register the DataFrame as a temp table 
auction.registerTempTable("auction")
// SQL statements can be run 
// How many bids per auction?
val results = sqlContext.sql("SELECT auctionid, item, count(bid) FROM auction GROUP BY auctionid, item")
// display dataframe in a tabular format
results.show()
// auctionid  item    count
// 3016429446 palm    10
// 8211851222 xbox    28. . . 

val results = sqlContext.sql("SELECT auctionid, MAX(price) FROM auction GROUP BY item, auctionid")
results.show()
// auctionid  c1
// 3019326300 207.5
// 8213060420 120.0 . . . 
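
For comparison, the same aggregation can be expressed with the DataFrame API instead of SQL; a small sketch using the grouped max shortcut:

// max closing price per auction, using the DataFrame API instead of SQL
auction.groupBy("item", "auctionid").max("price").show()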

Loading the SFPD Data Into Spark DataFrames Using a CSV Parsing Library

Now we will load the SFPD dataset into a Spark DataFrame using the spark-csv parsing library from Databricks. You can use this library in the Spark shell by specifying --packages com.databricks:spark-csv_2.10:1.0.3 when starting the shell, as shown below:

$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

The load operation parses the sfpd.csv file and returns a DataFrame, using the file's first (header) line for column names.

import sqlContext.implicits._
import org.apache.spark.sql._

//  Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))

The take operation returns the specified number of rows from the DataFrame.

// Return the first n rows in the DataFrame
df.take(1)

// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])

// Print the schema to the console in a tree format
df.printSchema()
//root
// |-- IncidntNum: string (nullable = true)
// |-- Category: string (nullable = true)
// |-- Descript: string (nullable = true)
// |-- DayOfWeek: string (nullable = true)
// |-- Date: string (nullable = true)
// |-- Time: string (nullable = true)
// |-- PdDistrict: string (nullable = true)
// |-- Resolution: string (nullable = true)
// |-- Address: string (nullable = true)
// |-- X: string (nullable = true)
// |-- Y: string (nullable = true)
// |-- Location: string (nullable = true)
// |-- PdId: string (nullable = true)
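
Note that the CSV data source loads every column as a string here. If you need numeric values, for example the X and Y coordinates, columns can be cast; a minimal sketch, assuming the standard Spark SQL types (coords is just an illustrative name):

// cast the coordinate columns from string to float (illustrative)
import org.apache.spark.sql.types.FloatType
val coords = df.select(df("X").cast(FloatType).as("X"), df("Y").cast(FloatType).as("Y"))
coords.show()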

// display dataframe in a tabular format
df.show()
//IncidntNum Category Descript DayOfWeek Date Time PdDistrict Resolution Address X Y Location PdId
//150467944  LARCENY/THEFT GRAND THEFT FROM ... Thursday  05/28/2015 23:59 TENDERLOIN NONE           TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244
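
Since several queries will be run over the SFPD data, it can be worth caching the DataFrame so it is not recomputed from the CSV file each time; a small sketch:

// cache the DataFrame in memory for repeated queries
df.cache()
// an action such as count() materializes the cache
df.count()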

Here are some example queries using sqlContext:

// How many categories are there?
df.select("Category").distinct.count
// res5: Long = 39

// register as a temp table in order to use SQL
df.registerTempTable("sfpd")

// What are the distinct Categories?
sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)

// [ASSAULT]
// [MISSING PERSON]
// [TREA] . . .
// What are the top 10 Resolutions?
sqlContext.sql("SELECT Resolution , count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033] . . .
// What are the top 10 incident Categories?
val t =  sqlContext.sql("SELECT Category , count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")

t.show()
// Category       catcount
// LARCENY/THEFT  353793
// OTHER OFFENSES 253627
// NON-CRIMINAL   186272. . .

// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal
t.map(t => "column 0: " + t(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT …
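
Row values can also be read with typed getters by ordinal; a minimal sketch, assuming the count column comes back as a Long:

// read the category name and its count with typed getters
t.map(row => (row.getString(0), row.getLong(1))).collect().foreach(println)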

The Physical Plan For DataFrames

The Catalyst query optimizer creates the physical execution plan for DataFrames, as shown in the diagram below:

[Diagram: Catalyst query optimization producing the physical execution plan for a DataFrame]

Printing the Physical Plan to the Console

DataFrames are designed to take the queries constructed against them and optimize their execution as sequences of Spark jobs as required. You can print the physical plan for a DataFrame using the explain operation, as shown below:

// Prints the physical plan to the console for debugging purposes
auction.select("auctionid").distinct.explain()

// == Physical Plan ==
// Distinct false
// Exchange (HashPartitioning [auctionid#0], 200)
//  Distinct true
//   Project [auctionid#0]
//    PhysicalRDD [auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37
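
For more detail, explain can also print the logical plans along with the physical plan by passing the extended flag; a small sketch:

// print the parsed, analyzed, and optimized logical plans plus the physical plan
auction.select("auctionid").distinct.explain(true)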

Summary

In this blog post, you've learned how to load data into Spark DataFrames and explore data with Spark SQL. If you have any further questions, or want to share how you are using Spark DataFrames, please add your comments in the section below.
