An Intro to Apache Spark Partitioning: What You Need to Know
Get some detailed insight into how partitioning plays a role in Apache Spark and learn about skewed data and shuffle blocks.
Apache Spark's resilient distributed datasets (RDDs) are collections of data that are too large to fit on a single node and therefore must be partitioned across multiple nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes. RDDs are evaluated lazily: execution does not start until an action is triggered, which improves manageability, saves computation, and thus increases optimization and speed. The transformations are stored as a directed acyclic graph (DAG), so every action on an RDD makes Apache Spark recompute the DAG (unless the RDD has been cached or persisted).
It's important to understand the characteristics of partitions in Apache Spark to guide you in achieving better performance, debugging, and error handling.
Here are some of the basics of partitioning:
- Every node in a Spark cluster contains one or more partitions.
- The number of partitions used in Spark is configurable. Too few partitions cause less concurrency, data skew, and poor resource utilization; too many cause task scheduling to take more time than the actual execution. By default, it is set to the total number of cores on all the executor nodes.
- Partitions in Spark do not span multiple machines.
- Tuples in the same partition are guaranteed to be on the same machine.
- Spark assigns one task per partition, and each executor core can process one task at a time.
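To see why both too few and too many partitions hurt, here is a minimal back-of-the-envelope sketch in plain Python. It does not use Spark; it only assumes the rule above that one task runs per partition and one task runs per core at a time. The function names are hypothetical, chosen for this illustration.

```python
import math


def task_waves(num_partitions: int, total_cores: int) -> int:
    """Sequential 'waves' of tasks needed when each core runs one task at a time."""
    if num_partitions < 0 or total_cores <= 0:
        raise ValueError("need num_partitions >= 0 and total_cores > 0")
    return math.ceil(num_partitions / total_cores)


def idle_cores_in_last_wave(num_partitions: int, total_cores: int) -> int:
    """Cores left without a task during the final wave."""
    remainder = num_partitions % total_cores
    return 0 if remainder == 0 else total_cores - remainder


# Hypothetical cluster with 16 cores in total.
cores = 16
for partitions in (4, 40, 10_000):
    print(
        partitions,
        "partitions ->",
        task_waves(partitions, cores),
        "wave(s),",
        idle_cores_in_last_wave(partitions, cores),
        "idle core(s) in the last wave",
    )
```

With 4 partitions most cores sit idle, while with 10,000 small partitions the job needs hundreds of waves, each paying scheduling overhead.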
Hash Partitioning vs. Range Partitioning in Apache Spark
Apache Spark supports two types of partitioning: hash partitioning and range partitioning. Knowing how the keys in your data are distributed or sequenced, as well as the operations you want to perform on your data, can help you select the appropriate technique. Many factors affect partitioning choices:
- Available resources: Number of cores on which the task can run.
- External data sources: The size of a local collection, Cassandra table, or HDFS file determines the number of partitions.
- Transformations used to derive RDD: There are a number of rules to determine the number of partitions when an RDD is derived from another RDD.
As you can see, there are multiple aspects you'll need to keep in mind when working with Apache Spark. In this post, I want to highlight how important it is to know your business data and its keys, as well as the physical resources available for Spark processing: most importantly, network, CPU, and memory.
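To make the difference between hash and range partitioning concrete, here is a minimal plain-Python sketch of the two ideas. It is an illustration only, not Spark's own implementation: the helper names, the CRC32 hash, and the hand-picked range boundaries are all assumptions made for this example.

```python
import zlib
from bisect import bisect_left


def hash_partition(key: str, num_partitions: int) -> int:
    """Hash partitioning: partition index = hash(key) mod number of partitions.

    CRC32 is used here (an assumption for this sketch) because Python's
    built-in hash() of strings changes between processes.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def range_partition(key: str, boundaries: list) -> int:
    """Range partitioning: sorted upper bounds split the key space.

    Keys <= boundaries[0] go to partition 0, and keys greater than the
    last boundary go to the final partition, so there are
    len(boundaries) + 1 partitions in total.
    """
    return bisect_left(boundaries, key)


def group(keys, partitioner):
    """Collect keys into a {partition index: [keys]} mapping."""
    parts = {}
    for k in keys:
        parts.setdefault(partitioner(k), []).append(k)
    return parts


keys = ["apple", "banana", "cherry", "kiwi", "mango", "peach", "zebra"]
boundaries = ["c", "m"]  # hypothetical, hand-picked split points

hash_parts = group(keys, lambda k: hash_partition(k, 3))
range_parts = group(keys, lambda k: range_partition(k, boundaries))

print("hash :", hash_parts)   # keys scattered, neighbours not kept together
print("range:", range_parts)  # each partition holds a contiguous key range
```

Hash partitioning spreads keys without regard to order, which suits joins and aggregations by key; range partitioning keeps neighbouring keys together, which suits sorted or sequenced data.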
Let's look at some common pitfalls when working with Apache Spark partitioning.
Skewed Data and Shuffle Blocks
Processing with Apache Spark's default partitioning might cause data to be skewed, which, in turn, can cause problems related to shuffles during aggregation operations or single executors not having sufficient memory.
Consider a case in which "key-a" holds a much larger amount of data than the other keys and its partition is processed on Exec-5: that task will take much longer to complete than the other five tasks. Another important thing to remember is that Spark shuffle blocks can be no larger than 2 GB (internally, because blocks are held in a ByteBuffer, which is indexed by an int and therefore limited to Integer.MAX_VALUE bytes). Operations such as aggregations and joins trigger a Spark shuffle, and having a small number of partitions or skewed data can produce oversized shuffle blocks. Hence, if you start seeing errors about this size limit being exceeded during shuffles, you know why it's happening: it may be tied to skewed data.
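The effect of a hot key can be reproduced with a small plain-Python simulation. This is a sketch under stated assumptions, not Spark code: the record counts are made up, and CRC32 stands in for the hash function. Because all records with the same key land in the same partition, the partition that receives "key-a" is far larger than the rest, whichever partition that turns out to be.

```python
import zlib
from collections import Counter


def hash_partition(key: str, num_partitions: int) -> int:
    """Stable stand-in for a hash partitioner (an assumption for this sketch)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical dataset: one hot key and five small keys.
records = [("key-a", i) for i in range(1000)]
for name in ["key-b", "key-c", "key-d", "key-e", "key-f"]:
    records += [(name, i) for i in range(10)]

num_partitions = 6
counts = Counter(hash_partition(key, num_partitions) for key, _ in records)
partition_sizes = [counts.get(p, 0) for p in range(num_partitions)]

mean_size = sum(partition_sizes) / num_partitions
skew_ratio = max(partition_sizes) / mean_size  # 1.0 would be perfectly even

print("records per partition:", partition_sizes)
print("largest partition is %.1fx the mean" % skew_ratio)
```

Adding more partitions does not help here, because a single key can never be split by hash partitioning alone.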
How do you avoid skewed data and oversized shuffle blocks? By partitioning wisely. It's critical to partition wisely in order to manage memory pressure and to ensure full resource utilization on the executor nodes. You must always know your data: its size, its types, and how it's distributed. A couple of best practices to remember are:
- Understand and select the right operators, such as reduceByKey or aggregateByKey, so that your driver is not put under pressure and the tasks are properly executed on the executors.
- If your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions after loading the data will allow the operations that come after it to leverage more of the cluster's CPU.
- If data is skewed, repartitioning using an appropriate key that spreads the load evenly is also recommended.
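One reason operators like reduceByKey are recommended is that they combine values within each partition before anything is shuffled. The plain-Python sketch below simulates that idea. It is an illustration with hypothetical helper names and made-up data, not Spark's implementation.

```python
def shuffle_without_combine(partitions):
    """Every (key, value) record crosses the shuffle unchanged."""
    return [record for part in partitions for record in part]


def shuffle_with_combine(partitions, reduce_fn):
    """Combine values per key inside each partition first, then shuffle
    one record per (partition, key) pair."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = reduce_fn(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled


def final_reduce(records, reduce_fn):
    """Merge the shuffled records into one value per key."""
    result = {}
    for key, value in records:
        result[key] = reduce_fn(result[key], value) if key in result else value
    return result


def add(x, y):
    return x + y


# Two hypothetical input partitions of (word, 1) pairs.
partitions = [
    [("a", 1)] * 4 + [("b", 1)],
    [("a", 1)] * 3 + [("b", 1)] * 2,
]

plain = shuffle_without_combine(partitions)
combined = shuffle_with_combine(partitions, add)

print("records shuffled without combining:", len(plain))
print("records shuffled with combining:   ", len(combined))
print("same final result:", final_reduce(plain, add) == final_reduce(combined, add))
```

Both paths give the same totals, but the combined path moves far fewer records, which matters most for exactly the hot keys that cause skew.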
How Do You Get the Right Number of Partitions?
Apache Spark can run only one concurrent task per partition of an RDD, up to the number of cores in your cluster. Hence, when choosing a "good" number of partitions, you generally want at least as many partitions as there are executor cores for full parallelism, and often two to three times that number. You can get the default value by calling sc.defaultParallelism. The maximum size of a partition is ultimately limited by the available memory of an executor.
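As a rough starting point, the two constraints above (enough partitions to keep the cores busy, and partitions small enough to fit in executor memory) can be combined into a simple heuristic. The sketch below is plain Python, and everything in it is an assumption for illustration: the function name, the default of 3 tasks per core, and the 128 MiB target partition size should all be tuned for your own workload.

```python
import math


def suggest_num_partitions(
    total_cores: int,
    data_size_bytes: int,
    tasks_per_core: int = 3,                         # assumed rule of thumb
    target_partition_bytes: int = 128 * 1024 ** 2,   # assumed target: 128 MiB
) -> int:
    """Take the larger of a core-based and a size-based partition count."""
    by_cores = total_cores * tasks_per_core
    by_size = math.ceil(data_size_bytes / target_partition_bytes)
    return max(1, by_cores, by_size)


GIB = 1024 ** 3

# Hypothetical 16-core cluster.
small_job = suggest_num_partitions(total_cores=16, data_size_bytes=1 * GIB)
large_job = suggest_num_partitions(total_cores=16, data_size_bytes=100 * GIB)

print("1 GiB of input   ->", small_job, "partitions")
print("100 GiB of input ->", large_job, "partitions")
```

For small inputs the core count dominates; for large inputs the partition size does. The resulting number is what you would pass to a call such as repartition.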
There are also cases in which it's not clear which repartitioning key would distribute the data evenly. In such cases, methods like salting can be used: add a new "fake" key and use it alongside the current key for a better distribution of data. Here's an example:
- Expand the small RDD with a Cartesian product against the values 1 to N, which multiplies its number of entries by N, and add a random value from 1 to N to each row of the large RDD. The original key combined with this value forms the new join key.
- Join the RDDs on the new join key, which will now be distributed better thanks to the random salt.
- Remove the random fake key from the join key to get the final result of the join.
In the example above, the fake key in the lookup dataset will be a Cartesian product (1-N), and for the main dataset, it will be a random key (1-N) on each row, N being the level of distribution.
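The salting steps above can be sketched in plain Python as follows. This is a minimal illustration on in-memory lists, not Spark code: the helper names, the sample data, and the choice of N = 8 are all hypothetical.

```python
import random


def salt_main(records, n, rng):
    """Main dataset: attach a random salt in 1..N to every row's key."""
    return [((key, rng.randint(1, n)), value) for key, value in records]


def salt_lookup(lookup, n):
    """Lookup dataset: replicate each row once per salt value 1..N
    (the Cartesian product of the rows with 1..N)."""
    return [((key, salt), value) for key, value in lookup for salt in range(1, n + 1)]


def join(left, right):
    """Plain inner join on the key, via a dictionary built from the right side."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


def unsalt(joined):
    """Drop the fake key to recover the original join key."""
    return [(key, values) for (key, _salt), values in joined]


# Hypothetical skewed data: "key-a" dominates the main dataset.
main = [("key-a", i) for i in range(1000)] + [("key-b", 1000)]
lookup = [("key-a", "A"), ("key-b", "B")]

n = 8                    # level of distribution (assumed)
rng = random.Random(42)  # fixed seed so the sketch is repeatable

salted_main = salt_main(main, n, rng)
salted_result = unsalt(join(salted_main, salt_lookup(lookup, n)))
plain_result = join(main, lookup)

# The hot key is now spread over several distinct join keys.
hot_join_keys = {key for key, _ in salted_main if key[0] == "key-a"}

print("distinct join keys for key-a after salting:", len(hot_join_keys))
print("same rows as the unsalted join:", sorted(salted_result) == sorted(plain_result))
```

The trade-off is that the lookup dataset grows N times, so salting pays off when the lookup side is small and the main side is heavily skewed.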
Published at DZone with permission of Lokesh Poojari Gangadharaiah, DZone MVB. See the original article here.