Apache Spark Performance Tuning – Degree of Parallelism

Today we learn about improving performance and increasing speed through partition tuning in a Spark application running on YARN.

This is the third article of a four-part series about Apache Spark on YARN. Apache Spark allows developers to run multiple tasks in parallel across machines in a cluster, or across multiple cores on a desktop. A partition, or split, is a logical chunk of a distributed data set. Apache Spark builds a Directed Acyclic Graph (DAG) with jobs, stages, and tasks for the submitted application. The number of tasks will be determined based on the number of partitions.

A few performance bottlenecks were identified in the SFO Fire Department call service dataset use case with the YARN cluster manager. To understand the use case and the performance bottlenecks identified, refer to our previous blog on Apache Spark on YARN – Performance and Bottlenecks. The resource planning bottleneck was addressed and notable performance improvements were achieved in the use case Spark application, as discussed in our previous blog on Apache Spark on YARN – Resource Planning.

In this blog post, let us discuss the partition problem and tuning the partitions of the use case Spark application.

Spark Partition Principles

The general principles to follow when tuning partitions for a Spark application are as follows:

  • Too few partitions – Cannot utilize all cores available in the cluster.
  • Too many partitions – Excessive overhead in managing many small tasks.
  • Reasonable partitions – Helps us to utilize the cores available in the cluster and avoids excessive overhead in managing small tasks.
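To make the trade-off concrete, here is a minimal Python sketch (not part of the original use case application). It estimates how many "waves" of tasks a stage needs and how many cores sit idle in the last wave. It assumes one task per core, which is Spark's default (spark.task.cpus=1), and a hypothetical cluster with 4 cores. The helper name task_waves and the sample partition counts are illustrative only.

```python
import math


def task_waves(num_partitions: int, total_cores: int) -> dict:
    """Estimate how a stage's tasks map onto the available cores.

    Assumes one task per core and one task per partition.
    """
    if num_partitions <= 0 or total_cores <= 0:
        raise ValueError("num_partitions and total_cores must be positive")
    # Each wave runs at most `total_cores` tasks at the same time.
    waves = math.ceil(num_partitions / total_cores)
    # Cores left without a task during the last (possibly partial) wave.
    idle_in_last_wave = waves * total_cores - num_partitions
    return {"waves": waves, "idle_cores_in_last_wave": idle_in_last_wave}


# Hypothetical cluster with 4 cores available for tasks.
for partitions in (2, 12, 200):
    print(partitions, task_waves(partitions, total_cores=4))
```

With 2 partitions, half of the 4 cores are never used. With 200 partitions, the stage needs 50 waves, and each task carries its own scheduling overhead, which this simple count does not model.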

Understanding Use Case Performance 

The run time of the use case Spark application on YARN without any performance tuning, for each API implementation, is shown in the below diagram:

[Figure: SparkApplicationWithDefaultConfigurationPerformance]

The run time after tuning the number of executors, cores, and memory for the RDD and DataFrame implementations of the use case Spark application is shown in the below diagram:

[Figure: SparkApplicationAfterResourceTuningPerformance]

For details on tuning the number of executors, cores, and memory for the RDD and DataFrame implementations of the use case Spark application, refer to our previous blog on Apache Spark on YARN – Resource Planning.

Let us understand the Spark data partitions of the use case application and decide whether to increase or decrease the number of partitions using Spark configuration properties.

Understanding Spark Data Partitions

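The two configuration properties in Spark to tune the number of partitions at runtime are as follows:

  • spark.default.parallelism – The default number of partitions in RDDs returned by transformations such as join, reduceByKey, and parallelize when not set by the user.
  • spark.sql.shuffle.partitions – The number of partitions to use when shuffling data for joins or aggregations in the DataFrame API (default: 200).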

The default parallelism and shuffle partition problems in both the RDD and DataFrame API-based implementations are shown in the below diagram:

[Figure: FireServiceCallAnalysisDataFrameTest1StagesStats]

The count() action stage using default parallelism (12 partitions) is shown in the below diagram:

[Figure: SparkDefaultParallelism12_200Stats]

From the Summary Metrics for Input Size/Records section, the Max partition size is ~128 MB.

In the event timeline for the 200 shuffle partition tasks, there are tasks with more scheduler delay than computation time. This indicates that 200 tasks are not necessary here, and that the number of shuffle partitions can be decreased to reduce the scheduling burden.

[Figure: HighNumberOfTasksProblem]

The Stages view in the Spark UI indicates that most of the tasks are simply launched and terminated without any computation, as shown in the below diagram:

[Figure: NumberOfTasksProblem]

Spark Partition Tuning

Let us first decide the number of partitions based on the input dataset size. The rule of thumb to decide the partition size while working with HDFS is 128 MB. As our input dataset size is about 1.5 GB (1500 MB) and going with 128 MB per partition, the number of partitions will be:

Number of partitions = Total input dataset size / partition size => 1500 / 128 ≈ 11.72, or about 12 partitions.

This is equal to the Spark default parallelism (spark.default.parallelism) value. The metrics based on default parallelism are shown in the above section.

Now, let us perform a test by reducing the partition size and increasing the number of partitions.

Consider partition size as 64 MB.

Number of partitions = Total input dataset size / partition size => 1500 / 64 ≈ 23.44, or about 23 partitions.
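The two calculations above can be expressed as a small Python helper. This is only a sketch of the arithmetic, not code from the use case application. The function name estimate_partitions is hypothetical, and it rounds to the nearest whole number to match the figures in this article.

```python
def estimate_partitions(dataset_size_mb: float, partition_size_mb: float) -> int:
    """Estimate the partition count as dataset size / target partition size.

    Rounds to the nearest integer (as the article does) and never returns
    fewer than one partition.
    """
    if dataset_size_mb <= 0 or partition_size_mb <= 0:
        raise ValueError("sizes must be positive")
    return max(1, round(dataset_size_mb / partition_size_mb))


# Input dataset of about 1500 MB, as in the use case.
print(estimate_partitions(1500, 128))  # 12
print(estimate_partitions(1500, 64))   # 23
```

Rounding up with math.ceil instead would give 24 partitions for the 64 MB case, which keeps every partition at or below the target size. Either choice is reasonable as a starting point for testing.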

The DataFrame API implementation is executed using the below partition configurations:

[Table: DataFrame API partition configurations]

The RDD API implementation is executed using the below partition configurations:

[Table: RDD API partition configurations]

Note: The spark.sql.shuffle.partitions property does not apply to the RDD API-based implementation.

Running Spark on YARN With Partition Tuning

DataFrame API Spark-Submit

./bin/spark-submit --name FireServiceCallAnalysisDataFramePartitionTest --master yarn --deploy-mode cluster --executor-memory 2g --executor-cores 2 --num-executors 2 --conf spark.sql.shuffle.partitions=23 --conf spark.default.parallelism=23 --class com.treselle.fscalls.analysis.FireServiceCallAnalysisDF /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv

Note: Update the values of the spark.default.parallelism and spark.sql.shuffle.partitions properties for each run, as testing has to be performed with different numbers of partitions.
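When testing several partition counts, it can help to generate the spark-submit command rather than edit it by hand. The following Python sketch rebuilds the command shown above for a given partition count. The helper name build_spark_submit and the list of candidate values are illustrative assumptions, and using the same value for both properties simply mirrors the 23/23 run described in this article.

```python
import shlex


def build_spark_submit(partitions: int) -> list:
    """Return the spark-submit argument list for one partition setting."""
    return [
        "./bin/spark-submit",
        "--name", "FireServiceCallAnalysisDataFramePartitionTest",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--executor-memory", "2g",
        "--executor-cores", "2",
        "--num-executors", "2",
        # Both properties get the same value here (an assumption for the sketch).
        "--conf", f"spark.sql.shuffle.partitions={partitions}",
        "--conf", f"spark.default.parallelism={partitions}",
        "--class", "com.treselle.fscalls.analysis.FireServiceCallAnalysisDF",
        "/data/SFFireServiceCall/SFFireServiceCallAnalysis.jar",
        "/user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv",
    ]


# Illustrative candidate values; print each command for review before running it.
for n in (12, 23):
    print(shlex.join(build_spark_submit(n)))
```

The argument list can also be passed directly to subprocess.run on a machine where Spark is installed, which avoids shell quoting issues.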

The Stages view based on spark.default.parallelism=23 and spark.sql.shuffle.partitions=23 is shown in the below diagram:

[Figure: Partition23_23_StagesStats]

Consider the Tasks: Succeeded/Total column in the above diagram. Both default and shuffle partitions are applied and the number of tasks is 23.

The count() action stage using default parallelism (23 partitions) is shown in the below screenshot:

[Figure: SparkParallelism23_23_Stats]

From the Summary Metrics for Input Size/Records section, the max partition size is ~66 MB.

Looking at the shuffle stage tasks, the scheduler has launched 23 tasks, and most of their time is spent on shuffle (read/write). There are no tasks without computation.

[Figure: Partition23_23_ShuflleEventStats]

[Figure: Partition23_23_TasksStats]

The output obtained after executing the Spark application with different numbers of partitions is shown in the below diagram:

[Figure: PartitionTuningOutput]

In this blog, we discussed partition principles, reviewed the use case performance, decided on the number of partitions, and tuned the partitions using Spark configuration properties.

The resource planning bottleneck was addressed and notable performance improvements were achieved in the use case Spark application, as discussed in our previous blog on Apache Spark on YARN – Resource Planning. To understand the use case and the performance bottlenecks identified, refer to our previous blog on Apache Spark on YARN – Performance and Bottlenecks. However, the performance of the Spark application remains unchanged.

In our upcoming blog, let us discuss the final bottleneck of the use case in “Apache Spark Performance Tuning – Straggler Tasks.”

The final performance achieved after resource tuning, partition tuning, and fixing the straggler tasks problem is shown in the below diagram:

[Figure: Straggler Fix Output]

Topics:
yarn, spark, performance tuning, parallelism, performance

Published at DZone with permission of Rathnadevi Manivannan. See the original article here.

Opinions expressed by DZone contributors are their own.
