
Apache Spark on YARN – Performance and Bottlenecks


In this series, we learn about performance tuning and fixing bottlenecks in high-level APIs with an Apache Spark application on YARN.


Overview

This is the first article in a four-part series about Apache Spark on YARN. Apache Spark 2.x ships with the second-generation Tungsten engine. Building on ideas from modern compilers, this engine emits optimized code at runtime that collapses an entire query into a single function, a technique called “whole-stage code generation.” This eliminates virtual function calls and keeps intermediate data in CPU registers. The optimization applies only to Spark's high-level APIs, such as DataFrame and Dataset, and not to the low-level RDD API.
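To make the idea of collapsing a query into a single function more concrete, here is a minimal sketch in plain Scala (no Spark involved). It contrasts an operator-at-a-time pipeline, where every row passes through a virtual next() call, with a hand-fused loop of the kind code generation aims to produce. All names (WholeStageSketch, Scan, Filter, and so on) are hypothetical and exist only for illustration; this is not the code Spark actually generates.

```scala
// Illustrative only: plain Scala, not Spark's generated code.
object WholeStageSketch {
  // Operator-at-a-time style: each operator pulls rows from its child
  // through a virtual next() call.
  trait Operator { def next(): Option[Int] }

  final class Scan(data: Array[Int]) extends Operator {
    private var i = 0
    def next(): Option[Int] =
      if (i < data.length) { val v = data(i); i += 1; Some(v) } else None
  }

  final class Filter(child: Operator, p: Int => Boolean) extends Operator {
    def next(): Option[Int] = {
      var row = child.next()
      // Skip rows until one satisfies the predicate or the input ends.
      while (row.isDefined && !p(row.get)) row = child.next()
      row
    }
  }

  // "SELECT COUNT(*) WHERE value % 2 = 0" expressed as chained operators.
  def countWithOperators(data: Array[Int]): Long = {
    val plan = new Filter(new Scan(data), _ % 2 == 0)
    var n = 0L
    while (plan.next().isDefined) n += 1
    n
  }

  // The same query collapsed into one loop: no per-row virtual calls,
  // and the counter can stay in a CPU register.
  def countFused(data: Array[Int]): Long = {
    var n = 0L
    var i = 0
    while (i < data.length) {
      if (data(i) % 2 == 0) n += 1
      i += 1
    }
    n
  }

  def main(args: Array[String]): Unit = {
    val data = Array.range(0, 10)
    println(countWithOperators(data)) // 5
    println(countFused(data))         // 5
  }
}
```

Both functions return the same count; the difference lies only in how much per-row overhead the execution model adds.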

In this article, we compare the performance of the high-level and low-level Spark APIs. The SFO Fire Department call service dataset and the YARN cluster manager are used to test and tune application performance.

The other articles in this four-part series are:

  • Part 2 – Apache Spark on YARN – Resource Planning
  • Part 3 – Apache Spark Performance Tuning – Degree of Parallelism
  • Part 4 – Apache Spark Performance Tuning – Straggler Tasks

About Dataset

The SFO Fire Calls-For-Service dataset includes the responses of all fire units to calls. It has 34 columns and 4.36 million rows and is updated daily. For more details about this dataset, refer to the SFO website (the link is provided in the reference section).

SFO Fire Department Dataset in HDFS

About Apache Hadoop Cluster

A two-node Apache Hadoop cluster is set up using the HDP 2.6 distribution, which comes with Spark 2.1. This distribution is used to execute the Spark application.

Instance details: m4.xlarge (4 cores, 16 GB RAM).

Cluster details: The cluster setup is summarized in the diagram below:

HDP Cluster Summary

Use Case

To understand Spark performance and application tuning, a Spark application is created using the RDD, DataFrame, Spark SQL, and Dataset APIs to answer the following questions about the SFO Fire Department call service dataset.

  • How many types of calls were made to the fire department?
  • How many incidents of each call type were there?
  • How many years of fire service calls are in the data file?
  • How many service calls were logged in for the past 7 days?
  • Which neighborhood in SF generated the most calls last year?

Answering every question except the first requires grouping the data, which in Spark terms means a data shuffle.

Note: Each Spark task processes one partition (partition = data + computation logic).
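The relationship between partitions, tasks, and the shuffle can be sketched with plain Scala collections. In this minimal illustration, each inner sequence stands for one partition handled by one task; counts are first combined inside each partition and then merged across partitions, which is the step that requires moving data between tasks. The object name, helper names, and sample call types are assumptions made for this example and are not taken from the application's source code.

```scala
// Illustrative only: local collections standing in for partitions.
object ShuffleSketch {
  // Work done by one task on its own partition: count values per key.
  def combineLocally(partition: Seq[String]): Map[String, Int] =
    partition.groupBy(identity).map { case (key, values) => (key, values.size) }

  // Merging partial counts by key needs records with the same key to meet
  // in one place; in Spark this is what the shuffle provides.
  def mergeAcrossPartitions(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.flatMap(_.toSeq).groupBy(_._1).map {
      case (key, pairs) => (key, pairs.map(_._2).sum)
    }

  def main(args: Array[String]): Unit = {
    // Two hypothetical partitions of call types.
    val partitions = Seq(
      Seq("Medical Incident", "Alarms", "Medical Incident"),
      Seq("Alarms", "Structure Fire", "Medical Incident"))

    val counts = mergeAcrossPartitions(partitions.map(combineLocally))
    counts.toSeq.sortBy(-_._2).foreach(println)
  }
}
```

This mirrors, in spirit, the map followed by reduceByKey pattern used in the RDD implementation, where partial sums are computed per partition before the shuffle.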

Low-Level and High-Level API Implementation

In this section, we walk through the low-level and high-level Spark API implementations that answer the above questions. For more details about the APIs, refer to the Spark website.

Resilient Distributed Dataset (RDD) Implementation (Low-Level API)

  • The RDD API, in Spark since the 1.0 release, can easily and efficiently process both structured and unstructured data.
  • The RDD API does not take advantage of Spark’s optimizers such as Catalyst and Tungsten. Developers need to optimize each RDD based on its characteristics.
// NUMBER OF RECORDS IN THE FILE
val totalRecords = filteredFireServiceCallRDD.count()
println(s"Number of records in the data file: $totalRecords")

// Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?
println(s"Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?")
val distinctTypesOfCallsRDD = filteredFireServiceCallRDD.map(x => x(3))
distinctTypesOfCallsRDD.distinct().collect().foreach(println)

// Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?
println(s"Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?")
val distinctTypesOfCallsSortedRDD = distinctTypesOfCallsRDD.map(x => (x, 1)).reduceByKey((x, y) => (x + y)).map(x => (x._2, x._1)).sortByKey(false)
distinctTypesOfCallsSortedRDD.collect().foreach(println)

// Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?
println(s"Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?")
val fireServiceCallYearsRDD = filteredFireServiceCallRDD.map(convertToYear).map(x => (x, 1)).reduceByKey((x, y) => (x + y)).map(x => (x._2, x._1)).sortByKey(false)
fireServiceCallYearsRDD.take(20).foreach(println)

// Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?
println(s"Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?")
val last7DaysServiceCallRDD = filteredFireServiceCallRDD.map(convertToDate).map(x => (x, 1)).reduceByKey((x, y) => (x + y)).sortByKey(false)
last7DaysServiceCallRDD.take(7).foreach(println)

// Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR? 
println(s"Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?")
val neighborhoodDistrictCallsRDD = filteredFireServiceCallRDD.filter(row => (convertToYear(row) == "2016")).map(x => x(31)).map(x => (x, 1)).reduceByKey((x, y) => (x + y)).map(x => (x._2, x._1)).sortByKey(false)
neighborhoodDistrictCallsRDD.collect().foreach(println)

DataFrame Implementation (High-Level API)

  • The DataFrame API was introduced in Spark 1.3 and is closely tied to the Project Tungsten initiative to improve the performance and scalability of Spark.
  • It introduces the concept of a schema to describe the data. It is radically different from the RDD API because it is an API for building a relational query plan that Spark’s Catalyst optimizer can then optimize and execute.
  • It benefits from Spark’s optimizers such as Catalyst and Tungsten.

// Column function used below for descending sort order
import org.apache.spark.sql.functions.desc

// NUMBER OF RECORDS IN THE FILE
val totalRecords = fireServiceCallDF.count()
println(s"Number of records in the data file: $totalRecords")

// Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?
println(s"Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?")
val distinctTypesOfCallsDF = fireServiceCallDF.select("CallType").distinct()
distinctTypesOfCallsDF.collect().foreach(println)

// Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?
println(s"Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?")
val distinctTypesOfCallsSortedDF = fireServiceCallDF.select("CallType").groupBy("CallType").count().orderBy(desc("count"))
distinctTypesOfCallsSortedDF.collect().foreach(println)

// Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?
println(s"Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?")
val fireServiceCallYearsDF = fireServiceCallDF.select("CallYear").groupBy("CallYear").count().orderBy(desc("count"))
fireServiceCallYearsDF.show()

// Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?
println(s"Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?")
val last7DaysServiceCallDF = fireServiceCallDF.select("CallDateTS").groupBy("CallDateTS").count().orderBy(desc("CallDateTS"))
last7DaysServiceCallDF.show(7)

// Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?
println(s"Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?")
val neighborhoodDistrictCallsDF = fireServiceCallDF.filter("CallYear == 2016").select("NeighborhooodsDistrict").groupBy("NeighborhooodsDistrict").count().orderBy(desc("count"))
neighborhoodDistrictCallsDF.collect().foreach(println)

Spark SQL Implementation (High-Level API)

  • Spark SQL lets you query data using SQL, both inside a Spark program and from external tools, such as the Business Intelligence tool Tableau, that connect to Spark SQL through standard database connectors (JDBC/ODBC).
  • It provides a DataFrame abstraction in Python, Java, and Scala to simplify working with structured datasets. DataFrames are similar to tables in a relational database.
  • Because its abstraction is the DataFrame, Spark SQL benefits from Spark’s optimizers such as Catalyst and Tungsten.
// NUMBER OF RECORDS IN THE FILE
val totalRecords = spark.sql("SELECT COUNT(*) from fireServiceCallsView")
println(s"Number of records in the data file")
totalRecords.show()

// Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?
println(s"Q1: HOW MANY DIFFERENT TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?")
val distinctTypesOfCallsDF = spark.sql("SELECT DISTINCT CallType from fireServiceCallsView")
distinctTypesOfCallsDF.collect().foreach(println)

// Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?
println(s"Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?")
val distinctTypesOfCallsSortedDF = spark.sql("SELECT CallType, COUNT(CallType) as count from fireServiceCallsView GROUP BY CallType ORDER BY count desc")
distinctTypesOfCallsSortedDF.collect().foreach(println)

// Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?
println(s"Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?")
val fireServiceCallYearsDF = spark.sql("SELECT CallYear, COUNT(CallYear) as count from fireServiceCallsView GROUP BY CallYear ORDER BY count desc")
fireServiceCallYearsDF.show()

// Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?
println(s"Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?")
val last7DaysServiceCallDF = spark.sql("SELECT CallDateTS, COUNT(CallDateTS) as count from fireServiceCallsView GROUP BY CallDateTS ORDER BY CallDateTS desc")
last7DaysServiceCallDF.show(7)

// Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?
println(s"Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?")
val neighborhoodDistrictCallsDF = spark.sql("SELECT NeighborhooodsDistrict, COUNT(NeighborhooodsDistrict) as count from fireServiceCallsView WHERE CallYear == 2016 GROUP BY NeighborhooodsDistrict ORDER BY count desc")
neighborhoodDistrictCallsDF.collect().foreach(println)

Dataset Implementation (High-Level API)

  • The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both RDD and DataFrame.
  • Datasets offer two distinct API characteristics: strongly typed and untyped.
  • Datasets and DataFrames use Spark's built-in encoders. The encoders provide on-demand access to individual attributes without deserializing an entire object, and they generate bytecode to interact with off-heap data.
  • The Dataset API benefits from Spark’s optimizers such as Catalyst and Tungsten.

// Column functions used below
import org.apache.spark.sql.functions.{col, count, desc}

// NUMBER OF RECORDS IN THE FILE
val totalRecords = fireServiceCallDS.count()
println(s"Number of records in the data file: $totalRecords")

// Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?
println(s"Q1: HOW MANY TYPES OF CALLS WERE MADE TO THE FIRE SERVICE DEPARTMENT?")
val distinctTypesOfCallsDS = fireServiceCallDS.select(col("CallType"))
distinctTypesOfCallsDS.distinct().collect().foreach(println)

// Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?
println(s"Q2: HOW MANY INCIDENTS OF EACH CALL TYPE WERE THERE?")
val distinctTypesOfCallsSortedDS = fireServiceCallDS.select(col("CallType")).groupBy(col("CallType")).agg(count(col("CallType")).alias("count")).orderBy(desc("count"))
distinctTypesOfCallsSortedDS.collect().foreach(println)

// Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?
println(s"Q3: HOW MANY YEARS OF FIRE SERVICE CALLS IS IN THE DATA FILES AND INCIDENTS PER YEAR?")
val fireServiceCallYearsDS = fireServiceCallDS.select(col("CallYear")).groupBy(col("CallYear")).agg(count(col("CallYear")).alias("count")).orderBy(desc("count"))
fireServiceCallYearsDS.show()

// Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?
println(s"Q4: HOW MANY SERVICE CALLS WERE LOGGED IN FOR THE PAST 7 DAYS?")
val last7DaysServiceCallDS = fireServiceCallDS.select(col("CallDateTS")).groupBy(col("CallDateTS")).agg(count(col("CallDateTS")).alias("count")).orderBy(desc("CallDateTS"))
last7DaysServiceCallDS.show(7)

// Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?
println(s"Q5: WHICH NEIGHBORHOOD IN SF GENERATED THE MOST CALLS LAST YEAR?")
val neighborhoodDistrictCallsDS = fireServiceCallDS.filter(fireServiceCall => fireServiceCall.CallYear == 2016).select(col("NeighborhooodsDistrict")).groupBy(col("NeighborhooodsDistrict")).agg(count(col("NeighborhooodsDistrict")).alias("count")).orderBy(desc("count"))
neighborhoodDistrictCallsDS.collect().foreach(println)

Running Spark on YARN

Spark applications can be launched on YARN in two deployment modes: cluster and client.

  • In cluster mode, the Spark driver runs inside an application master process managed by YARN on the cluster. The client goes away after initiating the application.
  • In client mode, the application master only requests resources from YARN and the Spark driver runs in the client process.

Resource planning (executors, cores, and memory) is essential when running a Spark application in Standalone mode, on YARN, or on Apache Mesos. On YARN in particular, “memory overhead” is a vital setting to account for when planning Spark application resources.

Default Spark Configuration for YARN

Many properties can be configured when submitting a Spark application on YARN. For resource planning, the important ones are the driver, executor, and Application Master settings for memory, memory overhead, and cores, along with the number of executors.

Note: In cluster mode, the Spark driver runs inside a YARN Application Master (AM), which is launched with the resources allocated for the driver plus memory overhead. In client mode, the Spark driver runs in the client process, so resources for the YARN Application Master (AM) must be allocated separately. In both modes, executor resources should be planned and allocated.

Submitting Spark Applications in YARN

Prerequisites: All the Spark applications are assembled into a JAR using the Scala Build Tool (SBT). The Spark applications are launched on YARN in cluster mode with the default Spark configuration.

./bin/spark-submit --master yarn --deploy-mode cluster --class com.treselle.fscalls.analysis.FireServiceCallAnalysis /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
./bin/spark-submit --master yarn --deploy-mode cluster --class com.treselle.fscalls.analysis.FireServiceCallAnalysisDF /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
./bin/spark-submit --master yarn --deploy-mode cluster --class com.treselle.fscalls.analysis.FireServiceCallAnalysisDFSQL /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
./bin/spark-submit --master yarn --deploy-mode cluster --class com.treselle.fscalls.analysis.FireServiceCallAnalysisDS /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv

Monitoring Driver and Executor Resources

Once the Spark application is submitted successfully, the message below is displayed in the console. It states the amount of memory allocated for the Application Master (AM): 1408 MB, including 384 MB of memory overhead, which is the driver's default configuration.

spark.driver.memory + spark.yarn.driver.memoryOverhead = 1024 MB (1 GB) + 384 MB = 1408 MB.
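The arithmetic above can be reproduced with a small helper. This is a minimal sketch that assumes the overhead rule documented for Spark 2.x on YARN, namely the larger of 384 MB and 10% of the configured memory; the object and method names are hypothetical. YARN may additionally round a request up to its own allocation increment, which this sketch does not model.

```scala
// Illustrative only: estimates the memory Spark requests from YARN
// for one container (driver, executor, or client-mode AM).
object YarnMemorySketch {
  val MinOverheadMb = 384      // documented minimum overhead
  val OverheadFraction = 0.10  // documented default overhead factor

  // Memory overhead in MB for a given heap size in MB.
  def overheadMb(memoryMb: Int): Int =
    math.max(MinOverheadMb, (memoryMb * OverheadFraction).toInt)

  // Total container request: heap plus overhead.
  def containerMb(memoryMb: Int): Int = memoryMb + overheadMb(memoryMb)

  def main(args: Array[String]): Unit = {
    // Default driver memory of 1 GB, as in the article.
    println(containerMb(1024)) // 1408
    // A hypothetical 8 GB executor: the 10% rule now exceeds the minimum.
    println(containerMb(8192)) // 9011
  }
}
```

With the default 1 GB setting, the 384 MB minimum dominates, which is why the console reports 1408 MB. For larger heaps, the 10% factor takes over and the overhead grows with the configured memory.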

Driver AM Resource

Executor memory and cores can be monitored in both the Resource Manager UI and the Spark UI. The Executors tab in the Spark UI displays the number of executors and the resources allocated to each executor. The driver core count in the diagram below is ‘0’ even though the default number of driver cores is 1, because that one core is used by the YARN Application Master.

Storage memory under each executor is shown as memory used / total memory available for storing data such as RDD partitions cached in memory.

Fire Service Analysis DF Executor Stats

Understanding Spark Internals

Spark constructs a Directed Acyclic Graph (DAG) using the DAGScheduler, based on the transformations and actions used in the application. Jobs, stages, and tasks are the internal units of Spark execution. To understand the Spark DAG and its internals, refer to Text Normalization with Spark - Part 2.

The Jobs view for the RDD implementation of the Spark application is shown in the diagram below. The Jobs view in the Spark UI provides a high-level overview of Spark application statistics such as the number of jobs, overall and individual job duration, number of stages, and total number of tasks.

Fire Service Analysis RDD Jobs Stats

Jobs statistics were collected in the same way for the RDD, DataFrame, Spark SQL, and Dataset implementations of the Spark application.

Note: These statistics are based on the default Spark configuration for the different Spark API implementations in our use case; no tuning has been applied. The performance bottlenecks are identified using the Stages view in the Spark UI.

RDD Implementation of Stages View

Fire Service Analysis RDD Stages Stats

DataFrame Implementation of Stages View

Fire Service Analysis DF Stages Stats

Spark SQL Implementation of Stages View

Fire Service Analysis DF SQL Stages Stats

Dataset Implementation of Stages View

Fire Service Analysis DS Stages Stats

Low-Level and High-Level API Outputs

The results for the five questions in this use case are the same across the different Spark API implementations, but the time taken by each implementation varies.

  • The high-level API implementations of the application completed and returned results in 1.8 to 1.9 minutes.
  • The low-level RDD API implementation of the application completed in 22 minutes. Even with Kryo serialization, it took 21 minutes.

The time difference comes from Spark's optimizers, Catalyst and Tungsten, which apply when the Spark application is written using the high-level APIs but not the low-level API.

Fire Service Call Output

Note: The results of these implementations and the source code have been uploaded to GitHub. See the Reference section for the GitHub location and the dataset link.

Identifying Performance Bottlenecks

To tune performance, first identify the bottlenecks in the application. The following bottlenecks were identified in the RDD, DataFrame, Spark SQL, and Dataset implementations of the Spark application:

Resource Planning (Executors, Core, and Memory)

A balanced allocation of executors, cores, and memory can significantly improve performance without any code changes in the Spark application when running on YARN.

Degree of Parallelism – Partition Tuning (Avoid Small Partition Problems)

Looking at the Stages view for both the high-level and low-level APIs, a large number of tasks (200) appear in a few stages. Drilling into those stages and examining the 200 tasks in the Event Timeline shows that task computation time is very low compared to scheduler delay. The rule of thumb for partition size when running on YARN is ~128 MB.

Parallelism Bottleneck
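The ~128 MB rule of thumb translates into a simple estimate of how many partitions a given amount of data warrants. The sketch below is a rough illustration under that assumption; the object name, the function name, and the 1.5 GB input size are hypothetical, since the article does not state the size of the data being shuffled. For the DataFrame-based APIs, the figure of 200 tasks matches the default value of spark.sql.shuffle.partitions.

```scala
// Illustrative only: partition count estimate from the ~128 MB rule of thumb.
object PartitionSketch {
  val TargetPartitionBytes: Long = 128L * 1024 * 1024

  // Round up so the last, partially filled partition is counted,
  // and never suggest fewer than one partition.
  def suggestedPartitions(inputBytes: Long): Int =
    math.max(1, math.ceil(inputBytes.toDouble / TargetPartitionBytes).toInt)

  def main(args: Array[String]): Unit = {
    val hypotheticalInput = 1536L * 1024 * 1024 // 1.5 GB, assumed for the example
    println(suggestedPartitions(hypotheticalInput)) // 12
  }
}
```

Under this estimate, splitting about 1.5 GB into 200 partitions would leave each task with under 8 MB of data, which is consistent with scheduler delay dominating computation time in the Event Timeline.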

Straggler Tasks (Long Running Tasks) 

Straggler tasks take a long time to complete and can be identified in the Stages view. In this use case, the following straggler tasks took longer than the rest.

RDD Implementation Straggler Task

RDD Straggler Task

DataFrame Implementation Straggler Task

DF Straggler Task
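One way to make "took longer than the rest" precise is to compare each task's duration with the median duration of its stage. The sketch below flags tasks that exceed a multiple of the median. The 1.5 multiplier is an assumption borrowed from the default of spark.speculation.multiplier, and the object name, function names, and sample durations are hypothetical; the article itself identifies stragglers visually in the Stages view.

```scala
// Illustrative only: flag tasks whose duration is far above the stage median.
object StragglerSketch {
  def median(values: Seq[Double]): Double = {
    require(values.nonEmpty, "at least one task duration is required")
    val sorted = values.sorted
    val n = sorted.length
    if (n % 2 == 1) sorted(n / 2) else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }

  // Returns the indices of tasks running longer than multiplier x median.
  def stragglers(durationsSec: Seq[Double], multiplier: Double = 1.5): Seq[Int] = {
    val threshold = multiplier * median(durationsSec)
    durationsSec.zipWithIndex.collect { case (d, i) if d > threshold => i }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical task durations in seconds for one stage.
    val durations = Seq(2.0, 2.5, 3.0, 2.2, 45.0)
    println(stragglers(durations).mkString(", ")) // 4
  }
}
```

Here the median is 2.5 seconds, so the threshold is 3.75 seconds and only the 45-second task (index 4) is flagged.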

Conclusion

In this article, we discussed running a Spark application on YARN with the default configuration, implemented with both high-level and low-level APIs. All the implementations completed within the default resources allocated to the application for this use case, but this may not hold for every use case. Resource planning helps determine a balanced allocation of executors, cores, and memory. The applications written with the high-level APIs completed in less time than the one written with the low-level API, so programming with the high-level APIs is recommended when using Spark. Bottlenecks were identified in both the high-level and low-level API implementations. These bottlenecks, and the performance tuning that eliminates them, are covered in the remaining parts of this series: resource planning, degree of parallelism, and straggler tasks.

After performance tuning and fixing the bottlenecks, the final times taken to complete the application with both the high-level and low-level APIs are shown in the diagram below:

Straggler Fix Output

The high-level API implementations of the application originally completed in 1.8 to 1.9 minutes; after performance tuning, the time was reduced to ~41 seconds. The low-level RDD API implementation originally completed in 22 minutes (21 minutes with Kryo serialization); after performance tuning, the time was reduced to ~3 minutes.


Published at DZone with permission of Rathnadevi Manivannan. See the original article here.
