Why Your Spark Apps Are Slow Or Failing, Part II: Data Skew and Garbage Collection
We look at the concepts of data skew and garbage collection and how they can impact the performance and speed of your Spark application.
The second part of the series “Why Your Spark Apps Are Slow or Failing” follows Part I on memory management and deals with issues that arise with data skew and garbage collection in Spark. Like many performance challenges with Spark, the symptoms increase as the scale of the data handled by the application increases.
What Is Data Skew?
In an ideal Spark application run, when Spark wants to perform a join, for example, join keys would be evenly distributed and each partition that needed processing would be nicely organized. However, real business data is rarely so neat and cooperative. We often end up with less than ideal data organization across the Spark cluster that results in degraded performance due to data skew.
Data skew is not an issue with Spark per se; rather, it is a data problem. The cause of the data skew problem is the uneven distribution of the underlying data. Uneven partitioning is sometimes unavoidable because of the overall data layout or the nature of the query.
For joins and aggregations, Spark needs to co-locate all records of a single key in a single partition, while records of other keys are distributed across other partitions. If a single partition becomes very large, it causes data skew, which is problematic for any query engine if no special handling is done.
Dealing With Data Skew
Data skew problems are more apparent in situations where data needs to be shuffled in an operation such as a join or an aggregation. Shuffle is an operation done by Spark to keep related data (data pertaining to a single key) in a single partition. For this, Spark needs to move data around the cluster. Hence, shuffle is considered the most costly operation.
Common symptoms of data skew are:
- Frozen stages and tasks.
- Low utilization of CPU.
- Out of memory errors.
There are several tricks we can employ to deal with the data skew problem in Spark.
Identifying and Resolving Data Skew
Spark users often observe that most tasks finish within a reasonable amount of time, only to have one task take forever. In all likelihood, this is an indication that your dataset is skewed. This behavior also results in the overall underutilization of the cluster. This is especially a problem when running Spark in the cloud, where over-provisioning of cluster resources is wasteful and costly.
If skew is at the data source level (e.g. a Hive table is partitioned on the _month key and the table has a lot more records for a particular _month), this will cause skewed processing in the stage that is reading from the table. In such a case, restructuring the table with different partition key(s) helps. However, sometimes it is not feasible, as the table might be used by other data pipelines in an enterprise.
In such cases, there are several things that we can do to avoid skewed data processing.
If we are doing a join operation on a skewed dataset, one of the tricks is to increase the spark.sql.autoBroadcastJoinThreshold value so that smaller tables get broadcast. This should be done only after ensuring that there is sufficient driver and executor memory.
If there are too many null values in a join or group-by key, they will skew the operation. Try to preprocess the null values by replacing them with random ids, and handle them in the application.
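The null-handling idea can be illustrated with a minimal sketch in plain Python (not Spark code). It assumes real keys are positive integers, so negative substitute ids cannot collide with them; `NUM_PARTITIONS`, `NULL_BUCKETS`, and the helper names are hypothetical and chosen only for illustration.

```python
import random
from collections import Counter

NUM_PARTITIONS = 8    # hypothetical number of shuffle partitions
NULL_BUCKETS = 1000   # hypothetical range for the substitute ids

rng = random.Random(0)  # seeded only to make the sketch repeatable


def fill_null_key(key):
    """Replace a missing key with a random negative id; real keys pass through."""
    if key is None:
        return -rng.randint(1, NULL_BUCKETS)
    return key


def restore_key(key):
    """Application-side handling: treat every negative id as a null again."""
    return None if key < 0 else key


# 5000 rows with a null key plus 100 rows with real (positive) keys.
keys = [None] * 5000 + list(range(1, 101))
filled = [fill_null_key(k) for k in keys]

# Python's % returns a non-negative result for a positive modulus.
rows_per_partition = Counter(k % NUM_PARTITIONS for k in filled)
print(rows_per_partition)
```

Without the substitution, all 5000 null-key rows would share one group. After it, they are spread over many ids, and `restore_key` recovers the original meaning once the heavy operation is done.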
In a SQL join operation, the join key can be changed to redistribute data evenly so that no single partition takes much longer to process than the others. This technique is called salting. Let’s look at an example to better understand the outcome of salting. In a join or group-by operation, Spark maps a key to a particular partition id by computing a hash code on the key and taking the remainder after dividing it by the number of shuffle partitions.
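The key-to-partition mapping can be sketched in a few lines of plain Python. This is an illustration under assumptions, not Spark's implementation: `zlib.crc32` stands in for Spark's own hash function, and `NUM_SHUFFLE_PARTITIONS` is a hypothetical value.

```python
import zlib
from collections import Counter

NUM_SHUFFLE_PARTITIONS = 8  # hypothetical value for illustration


def partition_id(key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    """Map a key to a partition id: hash the key, then take the remainder."""
    # crc32 is a deterministic stand-in; Spark uses its own hash function.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


# A skewed dataset: key 1 appears far more often than the other keys.
keys = [1] * 1000 + [2] * 10 + [3] * 10 + [4] * 10

rows_per_partition = Counter(partition_id(k) for k in keys)

# Every row with the same key lands in the same partition,
# so the partition that receives key 1 holds at least 1000 rows.
print(rows_per_partition)
```

Because the mapping depends only on the key, adding more partitions does not split a single heavy key. That is why the key itself has to change.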
Let’s assume there are two tables with the following schema.
Let’s consider a case where a particular key is skewed heavily, e.g. key 1, and we want to join both the tables and do a grouping to get a count. For example:
After the shuffle stage induced by the join operation, all the rows with the same key need to be in the same partition. Look at the above diagram. Here, all the rows with key 1 are in Partition 1. Similarly, all the rows with key 2 are in Partition 2. It is quite natural that processing Partition 1 will take more time, as the partition contains more data. Let’s check Spark’s UI for the shuffle stage run time for the above query.
As we can see, one task took a lot more time than other tasks. With more data it would be even more significant. Also, this might cause application instability in terms of memory usage as one partition would be heavily loaded.
Can we add something to the data so that our dataset will be more evenly distributed? Most users with skew problems use the salting technique. Salting is a technique where we add random values to the join key of one of the tables. In the other table, we need to replicate the rows to match the random keys. The idea is that if the join condition is satisfied by key1 == key1, it should also be satisfied by key1_<salt> = key1_<salt>. The salt value helps the dataset to be more evenly distributed.
Here is an example of how to do that in our use case. Note the number 20, used both when generating the random salt and when exploding the dataset. This is the distinct number of divisions we want for our skewed key. This is a very basic example and can be improved to include only keys which are skewed.
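As a rough illustration of the salting steps, here is a minimal sketch in plain Python. It is not Spark code and not the query used above: the `big` and `small` tables and all variable names are hypothetical, and 20 is reused as the number of salts.

```python
import random
from collections import Counter

NUM_SALTS = 20  # number of divisions for the skewed key, as in the text

# Hypothetical tables: `big` is skewed on key 1, `small` has one row per key.
big = [(1, "b%d" % i) for i in range(1000)] + [(2, "x"), (3, "y")]
small = [(1, "one"), (2, "two"), (3, "three")]

rng = random.Random(42)  # seeded only to make the sketch repeatable

# Step 1: append a random salt to the join key of the skewed table.
big_salted = [((key, rng.randrange(NUM_SALTS)), value) for key, value in big]

# Step 2: replicate ("explode") each row of the other table once per salt.
small_exploded = [((key, salt), value)
                  for key, value in small
                  for salt in range(NUM_SALTS)]

# Step 3: join on the salted key, then group by the original key and count.
lookup = {salted_key: value for salted_key, value in small_exploded}
joined = [(salted_key[0], left, lookup[salted_key])
          for salted_key, left in big_salted
          if salted_key in lookup]
counts = Counter(key for key, _, _ in joined)

# Rows for key 1 are now spread over up to NUM_SALTS distinct join keys.
rows_per_salted_key = Counter(salted_key for salted_key, _ in big_salted)
print(counts)
print(max(rows_per_salted_key.values()))
```

The per-key counts match what an unsalted join would produce, while the largest group of rows sharing one join key is much smaller than the original 1000. The cost is that the second table grows by a factor of `NUM_SALTS`, which is why salting only the skewed keys is a worthwhile refinement.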
Now let’s check the Spark UI again. As we can see, processing time is more even now.
Note that for smaller data the performance difference won’t be very large. Sometimes shuffle compression also plays a role in the overall runtime. For skewed data, shuffled data can be compressed heavily due to the repetitive nature of the data. Hence the overall disk IO and network transfer are also reduced. We need to run our app both with and without salt to decide which approach best fits our needs.
Garbage Collection
Spark runs on the Java Virtual Machine (JVM). Because Spark can store large amounts of data in memory, it relies heavily on Java’s memory management and garbage collection (GC). Therefore, GC can be a major issue that affects many Spark applications.
Common symptoms of excessive GC in Spark are:
- Slow application speed.
- Executor heartbeat timeout.
- GC overhead limit exceeded error.
Spark’s memory-centric approach and data-intensive applications make it a more common issue than in other Java applications. Thankfully, it’s easy to diagnose if your Spark application is suffering from a GC problem. The Spark UI marks executors in red if they have spent too much time doing GC.
Whether Spark executors are spending a significant amount of CPU cycles on garbage collection can be determined by looking at the “Executors” tab in the Spark application UI. Spark will mark an executor in red if the executor has spent more than 10% of its task time in garbage collection.
The Spark UI indicates excessive GC in red
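The 10% rule described above can be expressed as a small check. This is a plain-Python sketch for reasoning about numbers read off the “Executors” tab, not code from the Spark UI; the function name and the sample executors are hypothetical.

```python
GC_TIME_FRACTION = 0.10  # threshold described in the text


def gc_is_excessive(gc_time_ms, task_time_ms, threshold=GC_TIME_FRACTION):
    """Return True when GC time exceeds the given fraction of task time."""
    if task_time_ms <= 0:
        return False  # nothing has run yet, so there is nothing to flag
    return gc_time_ms > threshold * task_time_ms


# Hypothetical executors: (name, total GC time in ms, total task time in ms)
executors = [("exec-1", 4_000, 100_000), ("exec-2", 25_000, 100_000)]
flagged = [name for name, gc, task in executors if gc_is_excessive(gc, task)]
print(flagged)
```

Here only the second executor, with 25% of its task time spent in GC, would be flagged.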
Addressing Garbage Collection Issues
Here are some of the basic things we can do to try to address GC issues.
If using RDD-based applications, use data structures with fewer objects. For example, use an array instead of a list.
Specialized Data Structures
If you are dealing with primitive data types, consider using specialized data structures like Koloboke or fastutil. These structures optimize memory usage for primitive types.
Storing Data Off-Heap
The Spark execution engine and Spark storage can both store data off-heap. You can switch on off-heap storage using the following options:
- --conf spark.memory.offHeap.enabled=true
- --conf spark.memory.offHeap.size=Xgb
Be careful when using off-heap storage as it does not impact on-heap memory size, i.e. it won’t shrink heap memory. So, to define an overall memory limit, assign a smaller heap size.
Built-in vs. User Defined Functions (UDFs)
If you are using Spark SQL, try to use the built-in functions as much as possible, rather than writing new UDFs. Most of Spark’s built-in functions can work on UnsafeRow and don’t need to convert to wrapper data types. This avoids creating garbage and also plays well with code generation.
Be Stingy About Object Creation
Remember that we may be working with billions of rows. If we create even a small temporary object of 100 bytes for each of 1 billion rows, it will create 1 billion * 100 bytes (roughly 100 GB) of garbage.
End of Part II
So far we have focused on memory management, data skew, and garbage collection as the causes of slowdowns and failures in your Spark applications. For Part III of the series, we will turn our attention to resource management and cluster configuration, where issues such as data locality, IO-bound workloads, partitioning, and parallelism can cause some real headaches unless you have good visibility and intelligence about your data runtime.