
Achieving a 300% Speedup in ETL With Apache Spark

Large or frequent file dumps can slow the ingest pipeline down. See how Cloudera combated this to achieve a 300% speedup instead.


A common design pattern often emerges when teams begin to stitch together existing systems and an enterprise data hub (EDH) cluster: file dumps, typically in a format like CSV, are regularly uploaded to the EDH, where they are then unpacked, transformed into an optimal query format, and tucked away in HDFS where various EDH components can use them. When these file dumps are large or happen very often, these simple steps can significantly slow down an ingest pipeline. Part of this delay is inevitable; moving large files across the network is time-consuming because of physical limitations and can’t be readily sped up. However, the rest of the basic ingest workflow described above can often be improved.

Let’s establish a simple use case for file processing in an EDH: a directory of CSV files exists in hdfs:///user/example/zip_dir/, but the files have been compressed into raw *.zip archives. To make them usable, they need to be extracted and compacted into a single text file, which will be placed into hdfs:///user/example/quoteTable_csv/. Since these are CSV files, we’ll assume that each has a simple header on its first line. One common way to do this is by executing a script similar to the one detailed below on an EDH “edge node,” a node in the cluster which has all of the required configuration files and libraries for applications to interact with the rest of the cluster. Details on the edge node and cluster that we are using for these examples can be found below in the section titled Cluster Details.

#!/bin/bash

mkdir ./zips                                                #create a directory for staging *.zip files
mkdir ./files                                               #create a directory for staging *.csv files

hdfs dfs -get '/user/example/zip_dir/*.zip' ./zips          #copy zip files from HDFS to this node

for file in ./zips/*.zip; do
   unzip -a -j "$file" -d ./files/                          #for each *.zip file copied, unzip it to ./files
done

sed -i '1d' ./files/*                                       #remove the header (first line) from each file in ./files
cat ./files/* > ./result.csv                                #concatenate all of the files in ./files

hdfs dfs -put ./result.csv /user/example/quoteTable_csv/    #upload the concatenated file to HDFS

rm -rf ./zips                                               #remove staging directory for *.zip files
rm -rf ./files                                              #remove staging directory for *.csv files
rm ./result.csv                                             #remove the concatenated staging file

The following diagram shows the basic flow of this solution, where each arrow represents data being copied into a file at a new location. In other words, each arrow between blocks shows a time that the data is copied from the left-hand block to the right-hand block. Purple arrows represent times that computations are performed on the data, and red arrows represent times that the data is simply copied.

[Diagram: data flow of the edge-node bash workflow, showing each copy of the data between HDFS and the edge node's local disk]

Although this solution is familiar and easy to implement, it has an obvious bottleneck. On our example cluster, this script took 125 seconds to complete over zip files containing 10,000,000 records.

A Better Way

By leveraging Spark for distribution, we can achieve the same results much more quickly and with roughly the same amount of code. By keeping data in HDFS throughout the process, we were able to ingest the same data as before in about 36 seconds. Let’s take a look at the Spark code that produced results equivalent to the bash script shown above. (Note that a more parameterized version of this code, and of all code referenced in this article, can be found below in the Resources section.)

[Figure: the Spark version of this ingest job, shown as a code screenshot in the original article]
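
Since the original post presents that code only as an image, here is a minimal sketch of what such a job might look like, assuming the same Spark 1.x APIs used in the later examples in this article. It mirrors those examples but simply writes the decompressed, header-stripped lines back to HDFS as text (Spark writes a directory of part files rather than a single result.csv):

import java.util.zip.ZipInputStream

import scala.io.Source

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream

//sketch only: unzip *.zip files from HDFS and write the CSV lines (minus headers)
//back to HDFS as plain text, all within a single distributed Spark job
def main(args: Array[String]): Unit = {
   val sc = new SparkContext(new SparkConf())                             //build a spark context

   sc.binaryFiles("/user/example/zip_dir")                                //make an RDD from *.zip files in HDFS
       .flatMap((file: (String, PortableDataStream)) => {                 //flatmap to unzip each file
           val zipStream = new ZipInputStream(file._2.open)               //open a java.util.zip.ZipInputStream
           zipStream.getNextEntry                                         //advance to the first entry in the archive
           val iter = Source.fromInputStream(zipStream).getLines          //place entry lines into an iterator
           iter.next                                                      //pop off the header line
           iter                                                           //return the remaining lines
       })
       .saveAsTextFile("/user/example/quoteTable_csv")                    //write the lines back to HDFS as text
 }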

As shown below, by moving this ingest workload from an edge node script to a Spark application, we saw a significant speed boost: the average time taken to unzip our files on the example cluster decreased to 35.7 seconds, which is equivalent to a speedup of more than 300%. The following graph shows the results of running these two workflows over several different inputs:

[Graph: runtimes of the bash workflow versus the Spark workflow over several input sizes]

Over the larger dataset, the Spark workflow consistently finished more than 900% faster than the simple bash workflow. Now, we will examine a more complicated workflow that involves processing the files after they’re decompressed. In this workflow, rows from the compressed *.csv files in hdfs:///user/example/zip_dir/ are to be unzipped and placed into the Impala table quoteTable, which is backed by parquet files at hdfs:///user/example/quoteTable/. Additionally, we want to filter out certain rows based on their value. Our previous bash script can still be used, alongside a call to Impala to convert the *.csv files to parquet:

#!/bin/bash

impala-shell -q "
 CREATE TABLE quoteTable_csv
 (quote STRING)
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 LOCATION '/user/example/quoteTable_csv';

 INSERT INTO quoteTable SELECT * FROM quoteTable_csv WHERE quote != 'EMPTY';

 DROP TABLE quoteTable_csv;
"

Although Impala performs the conversion and filtering rather quickly, this common pattern still involves copying the data across HDFS. This problem is illustrated in the following diagram, which depicts the new workflow:

[Diagram: data flow of the bash-plus-Impala workflow, showing the additional copies of the data across the edge node and HDFS]

Running the bash script and Impala statements defined above took 138.5 seconds over our dataset. By contrast, we can modify our Spark job from above, rewritten below with the new functionality, to achieve the same thing:

import java.util.zip.ZipInputStream

import scala.io.Source

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql.SQLContext

def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf()                                        //create a new spark config
   val sc = new SparkContext(sparkConf)                                   //build a spark context
   val sqlContext = new SQLContext(sc)                                    //build a SQLContext
   import sqlContext.implicits._                                          //enable toDF and the $"..." column syntax

   sc.binaryFiles("/user/example/zip_dir")                                //make an RDD from *.zip files in HDFS
       .flatMap((file: (String, PortableDataStream)) => {                 //flatmap to unzip each file
           val zipStream = new ZipInputStream(file._2.open)               //open a java.util.zip.ZipInputStream
           val entry = zipStream.getNextEntry                             //get the first entry in the stream
           val iter = Source.fromInputStream(zipStream).getLines          //place entry lines into an iterator
           iter.next                                                      //pop off the iterator's first line (the header)
           iter                                                           //return the iterator
       })
       .toDF("quote")                                                     //convert RDD to DataFrame
       .filter($"quote" !== "EMPTY")                                      //filter out unwanted rows
       .saveAsParquetFile("/user/example/quoteTable/result.parquet")      //write to parquet
 }

[Diagram: data flow of the modified Spark job, which unzips, filters, and converts the data in a single pass]

Diagrammed, this program doesn’t look any different from before: the “process” step is more intensive, since it now includes filtering and conversion as well as unzipping, but the data is still not written to disk any additional times. As an additional bonus, data that is filtered out is never copied to disk at all, which is not the case in our previous solution.

This Spark job completed in 64 seconds, over 200% faster than the bash-script-based solution. On the larger 100M-record datasets, our Spark job was over 300% faster. Our cluster’s datanodes had only 2 disks each, and only enough cores to support 2 single-core executors each. With significantly more powerful datanodes, Spark’s support for multithreaded writing to parquet would show even larger gains over Impala for workloads like ours. Even on a small cluster, Spark’s performance advantages are clear:

[Graph: runtimes of the bash-plus-Impala workflow versus the Spark workflow over several input sizes]
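
As an aside, the executor sizing described above is just a matter of configuration. The snippet below is a hypothetical illustration of how such sizing might be requested when building the SparkConf; the property values are assumptions for a small cluster like ours, not settings taken from the original article:

import org.apache.spark.{SparkConf, SparkContext}

//hypothetical executor sizing for a small cluster; tune these values to your own nodes
val sparkConf = new SparkConf()
   .setAppName("zip-ingest")                //assumed application name
   .set("spark.executor.cores", "1")        //single-core executors, as on our test cluster
   .set("spark.executor.instances", "10")   //e.g. 2 executors on each of 5 worker nodes (illustrative)
   .set("spark.executor.memory", "2g")      //assumed value; size to the node's available memory
val sc = new SparkContext(sparkConf)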

Once Spark has loaded information into a DataFrame, it’s easy to perform any additional transformations in-memory. In our final example, let’s imagine a more complex pipeline: multiple columns now exist in our dataset, with two quote-enclosed string columns that may contain our delimiter (‘,’), an integer column which needs to be clamped between -100 and 100, a double column which needs to be squared, and several simple filters that need to be applied. We’ll use the Apache Commons CSV library to easily handle parsing this more complex input. A Spark implementation of this process is shown below:

import java.util.zip.ZipInputStream

import scala.io.Source

import org.apache.commons.csv.{CSVFormat, CSVParser}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql.SQLContext

def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf()                                        //create a new spark config
   val sc = new SparkContext(sparkConf)                                   //build a spark context
   val sqlContext = new SQLContext(sc)                                    //build a SQLContext
   import sqlContext.implicits._                                          //enable toDF and the $"..." column syntax

   sc.binaryFiles("/user/example/zip_dir")                                //make an RDD from *.zip files in HDFS
       .flatMap((file: (String, PortableDataStream)) => {                 //flatmap to unzip each file
           val zipStream = new ZipInputStream(file._2.open)               //open a java.util.zip.ZipInputStream
           val entry = zipStream.getNextEntry                             //get the first entry in the stream
           val iter = Source.fromInputStream(zipStream).getLines          //place entry lines into an iterator
           iter.next                                                      //pop off the iterator's first line (the header)
           iter                                                           //return the iterator
       })
       .map(parse)                                                        //parse each line into a typed tuple
       .toDF(Seq("name", "score", "confidence", "comment") : _*)          //convert RDD to DataFrame
       .filter($"name" !== "EMPTY")                                       //filter out unwanted rows
       .filter($"score" > 50 || $"score" < -50)                           //keep only extreme scores
       .filter($"confidence" > 3.0)                                       //keep only high-confidence rows
       .saveAsParquetFile("/user/example/quoteTable/result.parquet")      //write to parquet
 }

 //a helper function to convert a line into a tuple of the correct datatypes using org.apache.commons.csv
 val parse = (line : String) => {
   val parser =                                                           //build a CSVParser
       CSVParser.parse(line, CSVFormat.DEFAULT.withRecordSeparator("\n")) //with a custom CSVFormat
   val fields = parser.getRecords.get(0)                                  //parse the line into columns
   parser.close                                                           //close the parser
   var tempInt = fields.get(1).toInt                                      //cast int field to int
   if(tempInt < -100)        tempInt = -100                               //clamp int to -100
   else if(tempInt > 100)    tempInt = 100                                //clamp int to 100
   val tempDouble = fields.get(2).toDouble                                //cast double field to double
   (fields.get(0), tempInt, tempDouble * tempDouble, fields.get(3))       //return a tuple of our columns
 }
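
For illustration, here is what the parse helper would produce for a hypothetical input line (the values are made up); note that the quoted fields may safely contain the delimiter, the score is clamped, and the confidence is squared:

//hypothetical sample line, for illustration only
val row = parse("\"Doe, Jane\",250,4.5,\"said, simply\"")
//row == ("Doe, Jane", 100, 20.25, "said, simply")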

Because it writes out more compact datatypes, this final test finished significantly faster than the previous one. Our Spark workflow finished in 52 seconds and involved significantly less code than the traditional solution, which finished in 148 seconds. The graph below shows runtimes for this example over the same datasets used in the previous examples:

[Graph: runtimes of the traditional and Spark workflows for this more complex pipeline over the same datasets]

As you can see above, our example ingest workflow was markedly faster than a more intuitive solution which uses bash and Impala — and the difference in speed only gets larger as the size of the input grows. By leveraging Spark to its full potential to concisely perform distributed computations and to execute custom or third-party code in a distributed fashion, our ingest process in the final example was sped up by over 600% when compared to a more obvious implementation. Now that you’ve seen the basics, consider how Spark could speed up your own ETL!

Cluster Details

Hardware: 6 EC2 c3.xlarge nodes

CDH version: 5.8.2

Resources

The scripts, Spark code, and benchmark results referenced in this article are linked from the original post.

Check out the original article here: http://blog.cloudera.com/blog/2016/12/achieving-a-300-speedup-in-etl-with-spark/



Published at DZone with permission of Eric Maynard, DZone MVB.
