Apache Spark Performance Tuning – Straggler Tasks
The final part of this performance tuning series discusses straggler tasks and shuffle principles in our example Spark app.
This is the last article of a four-part series about Apache Spark on YARN. Apache Spark divides "transformation" operations into two types: "narrow" and "wide." The distinction matters because it strongly affects how transformations are evaluated and how their performance can be improved. Spark depends heavily on the key/value pair paradigm for defining and parallelizing operations, especially the wide transformations that require data to be redistributed between machines.
A few performance bottlenecks were identified in the SFO Fire Department call service dataset use case with the YARN cluster manager. To understand the use case and the bottlenecks identified, refer to our previous blog on Apache Spark on YARN – Performance and Bottlenecks. The resource planning bottleneck was addressed, and notable performance improvements were achieved in the use case Spark application, as discussed in our previous blog on Apache Spark on YARN – Resource Planning. To learn about partition tuning in the use case Spark application, refer to our previous blog on Apache Spark Performance Tuning – Degree of Parallelism.
In this blog, let us discuss shuffle behavior and the straggler task problem so as to improve the performance of the use case application.
Spark Shuffle Principles
Two primary techniques to avoid the performance problems associated with shuffles are "shuffle less" and "shuffle better":
- Shuffle Less Often – To minimize the number of shuffles in a computation requiring several transformations, preserve partitioning across narrow transformations so that data does not need to be redistributed again (see the sketch after this list).
- Shuffle Better – Sometimes a computation cannot be completed without a shuffle. However, not all wide transformations and not all shuffles are equally expensive or prone to failure.
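The below sketch illustrates the "shuffle less" principle. It is not part of the use case application; the input path, the key choice, and the partition count (23) are assumptions used only for illustration, and sc is an existing SparkContext:
import org.apache.spark.HashPartitioner
// Hypothetical key/value data: key on the first comma-separated field.
val pairs = sc.textFile("hdfs:///some/keyed/input.csv").map(line => (line.split(",")(0), 1))
// Partition once by key and persist; later key-based operations reuse this partitioner.
val partitioned = pairs.partitionBy(new HashPartitioner(23)).persist()
// reduceByKey sees the existing HashPartitioner, so no additional shuffle is needed.
// Note: map() would discard the partitioner, while mapValues() preserves it.
val counts = partitioned.reduceByKey((x, y) => x + y)
counts.take(10).foreach(println)
Persisting the pre-partitioned RDD is what makes the saved shuffle pay off when several downstream key-based operations reuse it.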
Operations on key/value pairs can cause:
- Out-of-memory errors in the driver.
- Out-of-memory errors on the executor nodes.
- Shuffle failures.
- Straggler tasks or partitions that are especially slow to compute.
The memory errors in the driver are mainly caused by actions. The last three performance issues (out-of-memory errors on the executors, shuffle failures, and straggler tasks) are caused by the shuffles associated with wide transformations.
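As a minimal sketch of why actions strain the driver (the RDD name and input path here are hypothetical, not from the use case code, and sc is an existing SparkContext), bounding what an action returns keeps the driver's memory footprint small:
// bigRDD stands in for any large RDD.
val bigRDD = sc.textFile("hdfs:///some/large/input")
// Risky on a large dataset: collect() materializes every record in the driver JVM.
// bigRDD.collect().foreach(println)
// Safer: bound the data returned to the driver.
bigRDD.take(20).foreach(println)
println(s"Record count: ${bigRDD.count()}")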
Understanding Use Case Application Shuffle
The number of partitions tuned based on the input dataset size is explained in our previous blog on Apache Spark Performance Tuning – Degree of Parallelism. The DataFrame API implementation of the application was submitted with the following configuration, as shown in the below screenshot:
./bin/spark-submit --name FireServiceCallAnalysisDataFramePartitionTest --master yarn --deploy-mode cluster --executor-memory 2g --executor-cores 2 --num-executors 2 --conf spark.sql.shuffle.partitions=23 --conf spark.default.parallelism=23 --class com.treselle.fscalls.analysis.FireServiceCallAnalysisDF /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
Looking at the Shuffle Read and Shuffle Write columns, the shuffled data is only in the range of bytes and kilobytes (KB) across all the stages, so the use case application already follows the "shuffle less" principle.
The input of ~849 MB is carried over in all the shuffle stages.
The “Executors” tab in the Spark UI provides the summary of input, shuffles read, and write as shown in the below diagram:
The overall input size is 5.9 GB including original input of 1.5 GB and entire shuffle input of ~849 MB.
Detecting Straggler Tasks in the Use Case
"Stragglers" are tasks within a stage that take much longer to execute than other tasks. The total time taken for DataFrame API implementation is 1.3 minutes. On considering the Stages wise durations, Stage 0 and 2 consumed 10 s and 46 s, respectively. Totally, 56 seconds (~ 1 minute).
Internally, Spark does the following:
- Spark optimizers such as Catalyst and Tungsten optimize the code at runtime.
- Spark's high-level DataFrame and Dataset API encoders reduce the input size by encoding the data efficiently.
By reducing the input size and filtering only the data needed from the input dataset in both the low-level and high-level API implementations, performance can be improved further.
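To see the optimizer at work, the small sketch below (an illustration, using the fireServiceCallDF DataFrame defined in the next section and column names from the dataset) prints the plans that Catalyst produces; the Project node in the optimized plan lists only the selected column, confirming that column pruning limits what is scanned and shuffled:
val plannedDF = fireServiceCallDF.select("CallType").groupBy("CallType").count()
// explain(true) prints the parsed, analyzed, optimized, and physical plans.
plannedDF.explain(true)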
Low-Level and High-Level API Implementation
Our input dataset has 34 columns. Three columns were used for computation to answer the use case scenario questions.
The below updated RDD and DataFrame API implementation code improves performance by selecting only the data needed for the use case scenarios:
val filteredFireServiceCallRDD = filteredFireServiceCallWithoutHeaderRDD.map(x => Array(x(3), x(4), x(31)))
The above line is added at the beginning of the RDD API implementation to select the three needed columns and drop the remaining 31 from the RDD, reducing the input size carried into all the shuffle stages.
The below code does the same thing in the DataFrame API implementation:
// FILTERING NEEDED COLUMNS FOR USE CASE SCENARIOS
val fireServiceCallDF = fireServiceCallYearAddedDF.select("CallType", "NeighborhooodsDistrict", "CallDateTS", "CallYear")
The code block of the RDD API implementation is given below:
// FILTER THE HEADER ROW AND SPLIT THE COLUMNS IN THE DATA FILE (EXCLUDE COMMA WITH IN DOUBLE QUOTES)
val filteredFireServiceCallWithoutHeaderRDD = fireServiceCallRawRDD.filter(row => row != header).map(x => x.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)"))
val filteredFireServiceCallRDD = filteredFireServiceCallWithoutHeaderRDD.map(x => Array(x(3), x(4), x(31)))
// CACHE/PERSIST THE RDD
filteredFireServiceCallRDD.setName("FireServiceCallsRDD").persist().take(10)
// NUMBER OF RECORDS IN THE FILE
val totalRecords = filteredFireServiceCallRDD.count()
println(s"Number of records in the data file: $totalRecords")
// Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?
println(s"Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?")
val distinctTypesOfCallsRDD = filteredFireServiceCallRDD.map(x => x(0))
distinctTypesOfCallsRDD.distinct().collect().foreach(println)
// Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?
println(s"Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?")
val distinctTypesOfCallsSortedRDD = distinctTypesOfCallsRDD.map(x => (x, 1)).reduceByKey((x, y) => (x + y)).map(x => (x._2, x._1)).sortByKey(false)
distinctTypesOfCallsSortedRDD.collect().foreach(println)
// Q3: HOW MANY YEARS OF FIRE SERVICE CALLS ARE IN THE DATA FILE, AND HOW MANY INCIDENTS PER YEAR?
println(s"Q3: HOW MANY YEARS OF FIRE SERVICE CALLS ARE IN THE DATA FILE, AND HOW MANY INCIDENTS PER YEAR?")
val fireServiceCallYearsRDD = filteredFireServiceCallRDD.map(convertToYear).map(x => (x, 1)).reduceByKey((x, y) => (x + y)).map(x => (x._2, x._1)).sortByKey(false)
fireServiceCallYearsRDD.take(20).foreach(println)
// Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?
println(s"Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?")
val last7DaysServiceCallRDD = filteredFireServiceCallRDD.map(convertToDate).map(x => (x, 1)).reduceByKey((x, y) => (x + y)).sortByKey(false)
last7DaysServiceCallRDD.take(7).foreach(println)
// Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?
println(s"Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?")
val neighborhoodDistrictCallsRDD = filteredFireServiceCallRDD.filter(row => (convertToYear(row) == "2016")).map(x => x(2)).map(x => (x, 1)).reduceByKey((x, y) => (x + y)).map(x => (x._2, x._1)).sortByKey(false)
neighborhoodDistrictCallsRDD.collect().foreach(println)
The code block of the DataFrame API implementations is given below:
// FILTERING NEEDED COLUMNS FOR USE CASE SCENARIOS
val fireServiceCallDF = fireServiceCallYearAddedDF.select("CallType", "NeighborhooodsDistrict", "CallDateTS", "CallYear")
// CACHE/PERSIST THE DATAFRAME
fireServiceCallDF.cache().take(10)
// PRINT SCHEMA
fireServiceCallDF.printSchema()
// LOOK INTO TOP 20 ROWS IN THE DATA FILE
fireServiceCallDF.show()
// NUMBER OF RECORDS IN THE FILE
val totalRecords = fireServiceCallDF.count()
println(s"Number of records in the data file: $totalRecords")
// Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?
println(s"Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?")
val distinctTypesOfCallsDF = fireServiceCallDF.select("CallType").distinct()
distinctTypesOfCallsDF.collect().foreach(println)
// Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?
println(s"Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?")
val distinctTypesOfCallsSortedDF = fireServiceCallDF.select("CallType").groupBy("CallType").count().orderBy(desc("count"))
distinctTypesOfCallsSortedDF.collect().foreach(println)
// Q3: HOW MANY YEARS OF FIRE SERVICE CALLS ARE IN THE DATA FILE, AND HOW MANY INCIDENTS PER YEAR?
println(s"Q3: HOW MANY YEARS OF FIRE SERVICE CALLS ARE IN THE DATA FILE, AND HOW MANY INCIDENTS PER YEAR?")
val fireServiceCallYearsDF = fireServiceCallDF.select("CallYear").groupBy("CallYear").count().orderBy(desc("count"))
fireServiceCallYearsDF.show()
// Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?
println(s"Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?")
val last7DaysServiceCallDF = fireServiceCallDF.select("CallDateTS").groupBy("CallDateTS").count().orderBy(desc("CallDateTS"))
last7DaysServiceCallDF.show(7)
// Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?
println(s"Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?")
val neighborhoodDistrictCallsDF = fireServiceCallDF.filter("CallYear == 2016").select("NeighborhooodsDistrict").groupBy("NeighborhooodsDistrict").count().orderBy(desc("count"))
neighborhoodDistrictCallsDF.collect().foreach(println)
Submitting a Spark Application in YARN
The Spark submit command with partition tuning used to execute the RDD and DataFrame API implementation in YARN is as follows:
Submit RDD Application
./bin/spark-submit --name FireServiceCallAnalysisRDDStragglerFixTest --master yarn --deploy-mode cluster --executor-memory 2g --executor-cores 2 --num-executors 2 --conf spark.default.parallelism=23 --class com.treselle.fscalls.analysis.SFOFireServiceCallAnalysis /data/SFFireServiceCall/SFFireServiceCallAnalysisPF.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
Submit DataFrame Application
./bin/spark-submit --name FireServiceCallAnalysisDataFrameStragglerFixTest --master yarn --deploy-mode cluster --executor-memory 2g --executor-cores 2 --num-executors 2 --conf spark.sql.shuffle.partitions=23 --conf spark.default.parallelism=23 --class com.treselle.fscalls.analysis.SFOFireServiceCallAnalysisDF /data/SFFireServiceCall/SFFireServiceCallAnalysisPF.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
The input, shuffle read, and shuffle write of the DataFrame API implementation are monitored in the stages view. The below diagram shows that the input size of the shuffle stages is now ~17 MB, compared with ~849 MB previously; the shuffle read and write sizes do not change much. The "Executors" tab in the Spark UI provides the summary of input, shuffle read, and shuffle write, as shown in the below diagram:
The summary shows that the input size is now 1.5 GB, compared with 5.9 GB previously.
The time duration after reducing the input size in RDD and DataFrame API implementation is shown in the below diagram:
Understanding Use Case Performance
The performance durations (without any performance tuning) of the different API implementations of the use case Spark application running on YARN are shown in the below diagram:
For more details, refer to our previous blog on Apache Spark on YARN – Performance and Bottlenecks.
We tuned the number of executors, cores, and memory for the RDD and DataFrame implementations of the use case Spark application. The below diagram shows the performance improvements after tuning the resources:
For more details, refer to our previous blog on Apache Spark on YARN – Resource Planning.
We tuned the default parallelism and shuffle partitions of both the RDD and DataFrame implementations in our previous blog on Apache Spark Performance Tuning – Degree of Parallelism. We did not achieve a performance improvement there, but we reduced the scheduler overhead.
Finally, after identifying the straggler tasks and reducing the input size, we got a 2x performance improvement in the DataFrame implementation and a 4x improvement in the RDD implementation.
Conclusion
In this blog, we discussed shuffle principles, examined the use case application's shuffle behavior, detected straggler tasks in the application, and reduced the input size to improve the performance of the different API implementations of the Spark application.
In addition to the gains from resource and partition tuning, we achieved a 2x performance improvement in the DataFrame implementation and a 4x improvement in the RDD implementation.
The code examples are available on GitHub.