Using Apache Spark DataFrames for Processing of Tabular Data
Learn from a great example how to process tones of tabular data very fast using Scala, Apache Spark, and the MapR Sandbox.
Join the DZone community and get the full member experience.
Join For FreeThis post will help you get started using Apache Spark DataFrames with Scala on the MapR Sandbox. The new Spark DataFrames API is designed to make big data processing on tabular data easier. A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases. In this post, you’ll learn how to:
- Load data into Spark DataFrames
- Explore data with Spark SQL
This post assumes a basic understanding of Spark concepts. If you have not already read the tutorial on Getting Started with Spark on MapR Sandbox, it would be good to read that first.
Software
This tutorial will run on the MapR v5.0 Sandbox, which includes Spark 1.3
- You can download the code and data to run these examples from here:
- The examples in this post can be run in the spark-shell, after launching with the spark-shell command.
- You can also run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.
The Sample Data Sets
We will use two example datasets - one from eBay online auctions and one from the SFPD Crime Incident Reporting system.
The eBay online auction dataset has the following data fields:
auctionid - unique identifier of an auction
bid - the proxy bid placed by a bidder
bidtime - the time (in days) that the bid was placed, from the start of the auction
bidder - eBay username of the bidder
bidderrate - eBay feedback rating of the bidder
openbid - the opening bid set by the seller
price - the closing price that the item sold for (equivalent to the second highest bid + an increment)
The table below shows the data fields with some sample data:
Using Spark DataFrames we will explore the data with questions like:
- How many auctions were held?
- How many bids were made per item?
- What's the minimum, maximum, and average number of bids per item?
- Show the bids with price > 100
The table below shows the SFPD data fields with some sample data:
Using Spark DataFrames, we will explore the SFPD data with questions like:
- What are the top 10 Resolutions?
- How many Categories are there?
- What are the top 10 incident Categories?
Loading Data Into Spark DataFrames
Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the spark shell with: $ spark-shell
First, we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects. (In the code snippets below , output is shown in comments)
// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
Below we load the data from the ebay.csv file into a Resilient Distributed Dataset (RDD). RDDs can have transformations and actions; the first() action returns the first element in the RDD, which is the String “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”
// load the data into a new RDD
val ebayText = sc.textFile("ebay.csv")
// Return the first element in this RDD
ebayText.first()
// res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3
Below we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then map() transformations are applied to each element of ebayText to create the ebay RDD of Auction objects.
//define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)
// create an RDD of Auction objects
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt ))
The ebay RDD first() action returns the first element in the RDD, Auction = Auction( 8213034705, 95.0, 2.927373, jake7870, 0, 95.0, 117.5, xbox,3).
// Return the first element in this RDD
ebay.first()
//res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)
// Return the number of elements in the RDD
ebay.count()
//res8: Long = 10654
A DataFrame is a distributed collection of data organized into named columns. Spark SQL supports automatically converting an RDD containing case classes to a DataFrame with the method toDF():
// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()
The previous RDD transformations can also be written on one line like this:
val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p =>
Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt )).toDF()
Explore and Query the eBay Auction Data With Spark DataFrames
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples with the auction DataFrame. The DataFrame show() action displays the top 20 rows in a tabular form.
// Display the top 20 rows of DataFrame
auction.show()
// auctionid bid bidtime bidder bidderrate openbid price item daystolive
// 8213034705 95.0 2.927373 jake7870 0 95.0 117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1 95.0 117.5 xbox 3 …
DataFrame printSchema() Prints the schema to the console in a tree format
// Return the schema of this DataFrame
auction.printSchema()
//root
// |-- auctionid: string (nullable = true)
// |-- bid: float (nullable = false)
// |-- bidtime: float (nullable = false)
// |-- bidder: string (nullable = true)
// |-- bidderrate: integer (nullable = true)
// |-- openbid: float (nullable = false)
// |-- price: float (nullable = false)
// |-- item: string (nullable = true)
// |-- daystolive: integer (nullable = true)
After a dataframe is instantiated, you can query it using SQL queries. Here are some example queries using the Scala DataFrame API:
// How many auctions were held?
auction.select("auctionid").distinct.count
// Long = 627
// How many bids per item?
auction.groupBy("auctionid", "item").count.show
//auctionid item count
//3016429446 palm 10
//8211851222 xbox 28
//3014480955 palm 12
// What's the min number of bids per item? what's the average? what's the max?
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"),max("count")).show
// MIN(count) AVG(count) MAX(count)
// 1 16.992025518341308 75
// Get the auctions with closing price > 100
val highprice= auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float, bidder: // string, bidderrate: int, openbid: float, price: float, item: string, daystolive: int]
// display dataframe in a tabular format
highprice.show()
// auctionid bid bidtime bidder bidderrate openbid price item daystolive
// 8213034705 95.0 2.927373 jake7870 0 95.0 117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1 95.0 117.5 xbox 3
You can register a DataFrame as a temporary table using a given name, and then run SQL statements using the sql methods provided by sqlContext. Here are some example queries using sqlContext:
// register the DataFrame as a temp table
auction.registerTempTable("auction")
// SQL statements can be run
// How many bids per auction?
val results =sqlContext.sql("SELECT auctionid, item, count(bid) FROM auction GROUP BY auctionid, item")
// display dataframe in a tabular format
results.show()
// auctionid item count
// 3016429446 palm 10
// 8211851222 xbox 28. . .
val results =sqlContext.sql("SELECT auctionid, MAX(price) FROM auction GROUP BY item,auctionid")
results.show()
// auctionid c1
// 3019326300 207.5
// 8213060420 120.0 . . .
Loading the SFPD Data Into Spark Dataframes Using a csv Parsing Library
Now we will load the SFPD dataset into a Spark dataframe using the spark-csv parsing library from Databricks. You can use this library at the Spark shell by specifying --packages com.databricks:spark-csv_2.10:1.0.3 when starting the shell as shown below:
$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
The load operation will parse the sfpd.csv file and return a dataframe using the first header line of the file for column names.
import sqlContext.implicits._
import org.apache.spark.sql._
// Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))
The take operation returns the specified number of rows in the DataFame.
// Return the first n rows in the DataFrame
df.take(1)
// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])
// Print the schema to the console in a tree format
df.printSchema()
//root
// |-- IncidntNum: string (nullable = true)
// |-- Category: string (nullable = true)
// |-- Descript: string (nullable = true)
// |-- DayOfWeek: string (nullable = true)
// |-- Date: string (nullable = true)
// |-- Time: string (nullable = true)
// |-- PdDistrict: string (nullable = true)
// |-- Resolution: string (nullable = true)
// |-- Address: string (nullable = true)
// |-- X: string (nullable = true)
// |-- Y: string (nullable = true)
// |-- Location: string (nullable = true)
// |-- PdId: string (nullable = true)
// display dataframe in a tabular format
df.show()
//IncidntNum Category Descript DayOfWeek Date Time PdDistrict Resolution Address X Y Location PdId
//150467944 LARCENY/THEFT GRAND THEFT FROM ... Thursday 05/28/2015 23:59 TENDERLOIN NONE TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244
Here are some example queries using sqlContext:
// how many categories are there?
df.select("Category").distinct.count
// res5: Long = 39
// register as a temp table inorder to use sql
df.registerTempTable("sfpd")
// How many categories are there
sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)
// [ASSAULT]
// [MISSING PERSON]
// [TREA] . . .
// What are the top 10 Resolutions ?
sqlContext.sql("SELECT Resolution , count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033] . . .
// What are the top 10 most incident Categories?
val t = sqlContext.sql("SELECT Category , count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")
t.show()
// Category catcount
// LARCENY/THEFT 353793
// OTHER OFFENSES 253627
// NON-CRIMINAL 186272. . .
// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal
t.map(t => "column 0: " + t(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT …
The Physical Plan For DataFrames
The Catalyst query optimizer creates the physical Execution Plan for DataFrames as shown in the diagram below:
Printing the Physical Plan to the Console
DataFrames are designed to take the SQL queries constructed against them and optimize the execution as sequences of Spark Jobs as required. You can print the physical plan for a DataFrame using the explain operation as shown below:
// Prints the physical plan to the console for debugging purpose
auction.select("auctionid").distinct.explain()
// == Physical Plan ==
// Distinct false
// Exchange (HashPartitioning [auctionid#0], 200)
// Distinct true
// Project [auctionid#0]
// PhysicalRDD //[auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37
Summary
In this blog post, you’ve learned how to load data into Spark DataFrames, and explore data with Spark SQL. If you have any further questions, or want to share how you are using Spark DataFrames, please add your comments in the section below.
Opinions expressed by DZone contributors are their own.
Comments