
Example of ETL Application Using Apache Spark and Hive


Hello everyone!

In this article, I will read a sample data set with Spark on HDFS (Hadoop File System), do a simple analytical operation, then write to a table that I will create in Hive.

Today, the amount of data produced is increasing day by day, and as data volumes grow, analyzing the collected data becomes critical. Technologies that enable analytical work on big data are a focus area for every company; as a result, almost all companies maintain big data environments and do their data analysis there.

Hadoop is the best-known and most widely used big data environment in the industry. The Hadoop ecosystem contains solutions that meet several different needs, such as data storage, data processing, visualization, and data querying. Each solution and library in the ecosystem is a technology deep enough to require its own expertise. You can view the different technologies of this ecosystem and their usage areas in the picture below.

Big data technologies

Now let's build the sample application. In the example, we will first send the data from our Linux file system to the data storage layer of the Hadoop ecosystem, HDFS (Extraction). Then we will read the data back with Spark, apply a simple transformation (Transformation), and write the result to Hive (Load). Hive is an infrastructure that lets us query the data stored in the Hadoop ecosystem; with it, we can easily query the data in our big data environment using the SQL language.

First, let's take a look at the data set we will work with: the Sample Sales Data set on Kaggle. The following table shows the column names and types.

Column Name Data Type
ORDERNUMBER Number
QUANTITYORDERED Number
PRICEEACH Number
ORDERLINENUMBER Number
SALES Number
ORDERDATE Date
STATUS String
QTR_ID Number
MONTH_ID Number
YEAR_ID Number
PRODUCTLINE String
MSRP Number
PRODUCTCODE String
CUSTOMERNAME String
PHONE Number
ADDRESSLINE1 String
ADDRESSLINE2 String
CITY String
STATE String
POSTALCODE Number
COUNTRY String
TERRITORY String
CONTACTLASTNAME String
CONTACTFIRSTNAME String
DEALSIZE String

This data set contains sample sales transactions. We will apply a few transformations to it, build a simple report, and insert the result into a Hive table.

First, copy the sample data that we downloaded from Kaggle to HDFS. For this, let's create a directory in HDFS.

hadoop fs -mkdir samplesales

Yes, we created the directory in HDFS; now let's copy the sample data from our local directory to HDFS.

hadoop fs -copyFromLocal sales_sample_data.csv samplesales

hadoop fs -ls samplesales/

We've written the data to HDFS, so we can launch the PySpark shell and begin processing the data with Spark.

/user/spark-2.1.0-bin-hadoop2.7/bin/pyspark --master yarn-client --num-executors 4 --driver-memory 2g --executor-memory 4g

Apache Spark

Spark started successfully. Let's import the libraries that we will use at this stage.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql import HiveContext
from pyspark.sql.functions import *

hive_context = HiveContext(sc)
sqlContext = SQLContext(sc)

Now, let's create the line format in which we will keep our data.

RecSales = Row('ordernumber','quantityordered','priceeach','orderlinenumber','sales','orderdate','status','qtr_id','month_id','year_id','productline','msrp','productcode','customername','phone','addressline1','addressline2','city','state','postalcode','country','territory','contactlastname','contactfirstname','dealsize')

Now it's time to read the data we loaded into HDFS and write it into a DataFrame. Since the columns are separated by the delimiter ",", we must specify this delimiter when splitting the data. Once the data is in the DataFrame, we give it a name and register it as a temp table. We will be able to use this name later when writing SQL with hive_context or sqlContext.

dataSales = sc.textFile("/user/samplesales/")
header = dataSales.first()
dataSales= dataSales.filter(lambda line: line != header)
recSales = dataSales.map(lambda l: l.split(","))
dataSales = recSales.map(lambda l: RecSales(l[0],l[1],l[2],l[3],l[4],l[5],l[6],l[7],l[8],l[9],l[10],l[11],l[12],l[13],l[14],l[15],l[16],l[17],l[18],l[19],l[20],l[21],l[22],l[23],l[24]))
dfRecSales = hive_context.createDataFrame(dataSales)
dfRecSales.registerTempTable("sales")
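One caveat worth knowing: the pipeline above splits each line on "," directly, which is only safe if no field contains an embedded comma. Python's standard csv module handles quoted fields correctly; the sketch below contrasts the two on an invented line (not taken from the sample data):

```python
# Naive split(",") vs. a real CSV parse on a line with a quoted comma.
import csv
import io

line = '10107,30,"Land of Toys, Inc.",USA'

naive = line.split(",")                      # breaks the quoted field apart
proper = next(csv.reader(io.StringIO(line))) # respects the quotes

print(naive)   # ['10107', '30', '"Land of Toys', ' Inc."', 'USA']
print(proper)  # ['10107', '30', 'Land of Toys, Inc.', 'USA']
```

If your data can contain quoted commas, parse each line with `csv.reader` inside the map step instead of `l.split(",")`.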

We have successfully read the data from HDFS and written it into a DataFrame object. Now let's run a few simple queries on the data we loaded, using Spark SQL.

hive_context.sql("select count(*) from sales").collect()

hive_context.sql("select * from sales").head()

hive_context.sql("select ordernumber,priceeach  from sales").head(2)

Now let's group the sales by territory, and write the results to a Hive table.

dfterritory = hive_context.sql("select territory, sum(priceeach) total from sales group by territory")
dfterritory.registerTempTable("sumterr")
dfterritory.show()
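To make clear what this GROUP BY computes, here is the same aggregation sketched in plain Python on a few made-up (territory, priceeach) pairs (illustrative values only, not from the Kaggle file):

```python
# Per-territory running sum -- the plain-Python equivalent of
# "select territory, sum(priceeach) ... group by territory".
rows = [
    ("EMEA", 95.70),
    ("EMEA", 81.35),
    ("APAC", 100.00),
    ("NA",   55.09),
]

totals = {}
for territory, priceeach in rows:
    totals[territory] = totals.get(territory, 0.0) + priceeach

for territory, total in totals.items():
    print(territory, round(total, 2))
```

Spark performs the same per-key summation, but distributed across executors instead of in a single loop.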

Hive Table

Let's create a Hive table and write the result.

hive_context.sql("CREATE TABLE IF NOT EXISTS territory_sum (territory STRING, total DOUBLE)")

hive_context.sql("insert into territory_sum select * from sumterr")

Finally, check the data written to Hive.

hive_context.sql("select * from territory_sum").show()

hive_context.sql("select count(*) from territory_sum").show()

hive_context.sql("select count(*) from sumterr").show()

Hive data
