Data Processing With Apache Spark
A step-by-step tutorial on using Apache Spark to process data efficiently from several different types of data files, with a performance comparison against SQL Server.
Spark has emerged as a favorite for analytics, especially for workloads that handle massive volumes of data while delivering high performance compared to conventional database engines. Spark SQL lets users express their complex business requirements to Spark in the familiar language of SQL.
In this blog, we will see how you can process data with Apache Spark. And what better way to establish Spark's capabilities than to put it through its paces with a Hadoop-DS-style benchmark, comparing performance, throughput, and SQL compatibility against SQL Server?
Before we begin, ensure that the following test environment is available:
- SQL Server: 32 GB RAM, running Windows Server 2012 R2.
- Hadoop cluster: two machines with 8 GB RAM each, running Ubuntu.
Sample Data
For the purpose of this demo, we will use AdventureWorks2016DW data.
The following tables are used in the query, along with their record counts:
Table Name | No. of Records
FactInternetSales | 60,458,398
DimProduct | 606
DimProductSubcategory | 37
DimProductCategory | 4
DimCustomer | 18,484
We will compare the performance of three data processing setups: SQL Server, Spark with CSV files as the data source, and Spark with Parquet files as the data source.
Query
We will use the following query to process the data:
select pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName, sum(SalesAmount)
from FactInternetSales f
inner join dimProduct p on f.productkey = p.productkey
inner join DimProductSubcategory ps on p.ProductSubcategoryKey = ps.ProductSubcategoryKey
inner join DimProductCategory pc on pc.ProductCategoryKey = ps.ProductCategoryKey
inner join dimcustomer c on c.customerkey = f.customerkey
group by pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName
Let’s measure the performance of each processing engine:
SQL Server
Running the query on our SQL Server instance (32 GB RAM, Windows Server 2012 R2), it takes around 2.33 minutes to execute and return the data.
[Screenshot: query execution time in SQL Server]
Spark With CSV Datafiles
Now, let’s export the same dataset to CSV and move it to HDFS.
[Screenshots: the exported CSV files for each table listed in HDFS]
Now that the files for the input tables have been moved to HDFS as CSV files, we can start the Spark shell and create DataFrames for each source file.
Run the following commands to create the SQLContext:
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
val sqlContext = new SQLContext(sc)
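Note that on Spark 2.x, the spark-shell also provides a ready-made SparkSession named spark (which is what the spark.sql(...) calls later in this walkthrough use), so the explicit SQLContext above is mainly for compatibility with the older API. A minimal sketch of the equivalent SparkSession-based setup, assuming Spark 2.x (the application name is hypothetical):
import org.apache.spark.sql.SparkSession

// spark-shell already defines `spark`; building a session explicitly would look like this:
val sparkSession = SparkSession.builder()
  .appName("SparkVsSqlServerDemo")   // hypothetical application name
  .getOrCreate()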
Run the following command to create Fact Schema:
val factSchema = StructType(Array(
StructField("ProductKey", IntegerType, true),
StructField("OrderDateKey", IntegerType, true),
StructField("DueDateKey", IntegerType, true),
StructField("ShipDateKey", IntegerType, true),
StructField("CustomerKey", IntegerType, true),
StructField("PromotionKey", IntegerType, true),
StructField("CurrencyKey", IntegerType, true),
StructField("SalesTerritoryKey", IntegerType, true),
StructField("SalesOrderNumber", StringType, true),
StructField("SalesOrderLineNumber", IntegerType, true),
StructField("RevisionNumber", IntegerType, true),
StructField("OrderQuantity", IntegerType, true),
StructField("UnitPrice", DoubleType, true),
StructField("ExtendedAmount", DoubleType, true),
StructField("UnitPriceDiscountPct", DoubleType, true),
StructField("DiscountAmount", DoubleType, true),
StructField("ProductStandardCost", DoubleType, true),
StructField("TotalProductCost", DoubleType, true),
StructField("SalesAmount", DoubleType, true),
StructField("TaxAmt", DoubleType, true),
StructField("Freight", DoubleType, true),
StructField("CarrierTrackingNumber", StringType, true),
StructField("CustomerPONumber", StringType, true),
StructField("OrderDate", TimestampType, true),
StructField("DueDate", TimestampType, true),
StructField("ShipDate", TimestampType, true)
));
Run the following command to create a DataFrame for Sales with Fact Schema:
val salesCSV = sqlContext.read.format("csv")
.option("header", "false")
.schema(factSchema)
.load("/data/FactSalesNew/part-m-00000")
Run the following command to create a Customer schema:
val customerSchema = StructType(Array(
StructField("CustomerKey", IntegerType, true),
StructField("GeographyKey", IntegerType, true),
StructField("CustomerAlternateKey", StringType, true),
StructField("Title", StringType, true),
StructField("FirstName", StringType, true),
StructField("MiddleName", StringType, true),
StructField("LastName", StringType, true),
StructField("NameStyle", BooleanType, true),
StructField("BirthDate", TimestampType, true),
StructField("MaritalStatus", StringType, true),
StructField("Suffix", StringType, true),
StructField("Gender", StringType, true),
StructField("EmailAddress", StringType, true),
StructField("YearlyIncome", DoubleType, true),
StructField("TotalChildren", IntegerType, true),
StructField("NumberChildrenAtHome", IntegerType, true),
StructField("EnglishEducation", StringType, true),
StructField("SpanishEducation", StringType, true),
StructField("FrenchEducation", StringType, true),
StructField("EnglishOccupation", StringType, true),
StructField("SpanishOccupation", StringType, true),
StructField("FrenchOccupation", StringType, true),
StructField("HouseOwnerFlag", StringType, true),
StructField("NumberCarsOwned", IntegerType, true),
StructField("AddressLine1", StringType, true),
StructField("AddressLine2", StringType, true),
StructField("Phone", StringType, true),
StructField("DateFirstPurchase", TimestampType, true),
StructField("CommuteDistance", StringType, true)
));
Run the following command to create the Customer DataFrame with the customer schema:
val customer = sqlContext.read.format("csv")
.option("header", "false")
.schema(customerSchema)
.load("/data/dimCustomer/part-m-00000")
Now create the product schema with the following command:
val productSchema = StructType(Array(
StructField("ProductKey", IntegerType, true),
StructField("ProductAlternateKey", StringType, true),
StructField("ProductSubcategoryKey", IntegerType, true),
StructField("WeightUnitMeasureCode", StringType, true),
StructField("SizeUnitMeasureCode", StringType, true),
StructField("EnglishProductName", StringType, true),
StructField("SpanishProductName", StringType, true),
StructField("FrenchProductName", StringType, true),
StructField("StandardCost", DoubleType, true),
StructField("FinishedGoodsFlag", BooleanType, true),
StructField("Color", StringType, true),
StructField("SafetyStockLevel", IntegerType, true),
StructField("ReorderPoint", IntegerType, true),
StructField("ListPrice", DoubleType, true),
StructField("Size", StringType, true),
StructField("SizeRange", StringType, true),
StructField("Weight", DoubleType, true),
StructField("DaysToManufacture", IntegerType, true),
StructField("ProductLine", StringType, true),
StructField("DealerPrice", DoubleType, true),
StructField("Class", StringType, true),
StructField("Style", StringType, true),
StructField("ModelName", StringType, true),
StructField("LargePhoto", StringType, true),
StructField("EnglishDescription", StringType, true),
StructField("FrenchDescription", StringType, true),
StructField("ChineseDescription", StringType, true),
StructField("ArabicDescription", StringType, true),
StructField("HebrewDescription", StringType, true),
StructField("ThaiDescription", StringType, true),
StructField("GermanDescription", StringType, true),
StructField("JapaneseDescription", StringType, true),
StructField("TurkishDescription", StringType, true),
StructField("StartDate", TimestampType, true),
StructField("EndDate", TimestampType, true),
StructField("Status", StringType, true)
))
Create the product dataframe with the Product schema.
val product = sqlContext.read.format("csv")
.option("header", "false")
.schema(productSchema)
.load("/data/dimProduct/part-m-00000")
Now create the Product Category schema using the following command:
val productCategorySchema = StructType(Array(
StructField("ProductCategoryKey", IntegerType, true),
StructField("ProductCategoryAlternateKey", IntegerType, true),
StructField("EnglishProductCategoryName", StringType, true),
StructField("SpanishProductCategoryName", StringType, true),
StructField("FrenchProductCategoryName", StringType, true)
))
Now create the Product Category dataframe with the Product Category Schema:
val productCategory = sqlContext.read.format("csv")
.option("header", "false")
.schema(productCategorySchema)
.load("/data/dimProductCategory/part-m-00000")
Now create the Product Sub Category schema using the following command:
val productSubCategorySchema = StructType(Array(
StructField("ProductSubcategoryKey", IntegerType, true),
StructField("ProductSubcategoryAlternateKey", IntegerType, true),
StructField("EnglishProductSubcategoryName", StringType, true),
StructField("SpanishProductSubcategoryName", StringType, true),
StructField("FrenchProductSubcategoryName", StringType, true),
StructField("ProductCategoryKey", IntegerType, true)
))
And create the productSubCategory DataFrame using the following command:
val productSubCategory = sqlContext.read.format("csv")
.option("header", "false")
.schema(productSubCategorySchema)
.load("/data/dimProductSubCategory/part-m-00000")
Now create temporary views of each dataframe that we have created so far:
salesCSV.createOrReplaceTempView("salesV")
customer.createOrReplaceTempView("customerV")
product.createOrReplaceTempView("productV")
productCategory.createOrReplaceTempView("productCategoryV")
productSubCategory.createOrReplaceTempView("productSubCategoryV")
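If you want to confirm that all five views are registered before querying, a quick check (assuming the Spark 2.x spark-shell session) is:
// list the temporary views registered in this session
spark.sql("SHOW TABLES").show()
// or, equivalently:
spark.catalog.listTables().show()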
And run the same query which we ran in SQL Server:
val df_1 = spark.sql("""select pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName, sum(SalesAmount)
from salesV f
inner join productV p on f.productkey = p.productkey
inner join productSubCategoryV ps on p.ProductSubcategoryKey = ps.ProductSubcategoryKey
inner join productCategoryV pc on pc.ProductCategoryKey = ps.ProductCategoryKey
inner join customerV c on c.customerkey = f.customerkey
group by pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName """)
df_1.show()
It took around 3 minutes to execute and return the result set.
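For a repeatable measurement (rather than a stopwatch), spark-shell on Spark 2.1+ offers spark.time, which prints the wall-clock time of the wrapped action; a minimal sketch:
// time the full job, including the action triggered by show()
spark.time(df_1.show())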
Spark With a Parquet File for a Fact Table
Now, let’s convert the FactInternetSales data to a Parquet file and save it to HDFS using the following command:
salesCSV.write.format("parquet").save("/user/nituser/sales.parquet")
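By default, Spark writes Parquet with Snappy compression; if you want to make that explicit (or switch codecs), the same write could be spelled out as follows:
// equivalent write with the compression codec stated explicitly
salesCSV.write
  .format("parquet")
  .option("compression", "snappy")   // Spark's default codec for Parquet
  .save("/user/nituser/sales.parquet")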
Create a DataFrame on top of the Parquet file using the following command:
val sales = sqlContext.read.parquet("/user/nituser/sales.parquet")
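Because Parquet files are self-describing, the schema comes from the file's own metadata and does not need to be redeclared; an optional quick check:
// schema and row count are recovered from the Parquet metadata and data
sales.printSchema()
sales.count()   // should again be 60,458,398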
And create a temp view using the sales dataframe:
sales.createOrReplaceTempView("salesV")
Now, we will run the same query we used in the CSV step:
val df_1=spark.sql("""select pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName, sum(SalesAmount)
from salesV f
inner join productV p on f.productkey = p.productkey
inner join productSubCategoryV ps on p.ProductSubcategoryKey = ps.ProductSubcategoryKey
inner join productCategoryV pc on pc.ProductCategoryKey = ps.ProductCategoryKey
inner join customerV c on c.customerkey = f.customerkey
group by pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName """)
It will return the same result set in less than 20 seconds.
We can conclude that Spark on commodity hardware performs very similarly to a high-end SQL Server machine.
Moreover, Spark outshines other engines when it works with efficient column-oriented, compressed storage formats such as Parquet.
So, choose the processing engine and storage format based on your business requirements, and understand how to exploit such a highly efficient processing engine to get the performance you need.