4 Common Reasons for FetchFailed Exception in Apache Spark
This article lists the four most common reasons for a FetchFailed exception in Apache Spark.
Shuffle operations are the backbone of almost all Spark jobs that perform data aggregation, joins, or data restructuring. During a shuffle operation (without the support of an external shuffle service), data is moved across the nodes of the cluster in a two-step process:
a) Shuffle Write: Shuffle map tasks write the data to be shuffled to a disk file, arranged according to the shuffle reduce tasks that will consume it. The chunk of shuffle data that one shuffle map task writes for one shuffle reduce task is called a shuffle block. Each shuffle map task also informs the driver about the shuffle data it has written.
b) Shuffle Read: Shuffle reduce tasks query the driver for the locations of their shuffle blocks. They then establish connections with the executors hosting those blocks and start fetching them. Once a block is fetched, it is available for further computation in the reduce task.
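The two steps above can be pictured with a toy, single-process sketch. It is only an illustration of the idea, not how Spark implements a shuffle: the function names and the `driver_registry` dictionary are hypothetical, and the registry holds the blocks themselves, whereas in Spark the driver tracks only their locations. Integer keys are used so that the hash-based assignment is deterministic.

```python
from collections import defaultdict


def shuffle_write(map_task_id, records, num_reduce_tasks):
    """Group one map task's records into blocks, one block per reduce task."""
    blocks = defaultdict(list)
    for key, value in records:
        reduce_id = hash(key) % num_reduce_tasks  # which reduce task owns this key
        blocks[(map_task_id, reduce_id)].append((key, value))
    return dict(blocks)


def shuffle_read(reduce_id, registry):
    """Collect every block that was written for the given reduce task."""
    fetched = []
    for (map_task_id, rid), block in sorted(registry.items()):
        if rid == reduce_id:
            fetched.extend(block)
    return fetched


# Two map tasks, each holding a few (key, value) records.
map_inputs = {
    0: [(1, "a"), (2, "b"), (3, "c")],
    1: [(1, "d"), (4, "e")],
}
NUM_REDUCE_TASKS = 2

# Shuffle write: every map task registers the blocks it produced.
driver_registry = {}
for map_task_id, records in map_inputs.items():
    driver_registry.update(shuffle_write(map_task_id, records, NUM_REDUCE_TASKS))

# Shuffle read: every reduce task fetches the blocks meant for it.
reduce_inputs = {
    r: shuffle_read(r, driver_registry) for r in range(NUM_REDUCE_TASKS)
}
```

Here each `(map_task_id, reduce_id)` entry plays the role of one shuffle block. A failed fetch of any such block is what surfaces as a FetchFailed Exception in the reduce task.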
Although the two-step process sounds simple, it is operationally intensive because it involves data sorting, disk writes and reads, and network transfers. The reliability of a shuffle operation is therefore always in question, and the most visible evidence of this is the commonly encountered ‘FetchFailed Exception’. Most Spark developers spend considerable time troubleshooting this exception: first they look for its root cause, and then they apply the appropriate fix.
A FetchFailed Exception, reported in a shuffle reduce task, indicates a failure to read one or more shuffle blocks from the hosting executors. Debugging a FetchFailed Exception is quite challenging because it can occur for multiple reasons. Identifying the actual reason is important, since it determines which fix will overcome the exception.
Having troubleshot hundreds of Spark jobs in recent times, I have found that a FetchFailed Exception is mainly caused by one of the following:
- Out of Heap memory on Executors
- Low Memory Overhead on Executors
- Shuffle block greater than 2 GB
- Network timeout
Let's understand each of these reasons in detail:
1. ‘Out of Heap memory on an Executor’: This reason indicates that the FetchFailed Exception occurred because an executor hosting the corresponding shuffle blocks crashed with a Java ‘Out of memory’ error. An ‘Out of memory’ error can occur when there is a shortage of heap space on the executor, or when the garbage collector of the hosting executor spends more time on garbage collection than on useful work. A common cause of the heap shortage is inappropriate partitioning on either or both sides (map and reduce) of the shuffle operation. (To know more about Spark partitioning tuning, you could refer to my recently published book, “Guide to Spark Partitioning: Spark Partitioning Explained in Depth”.)
To confirm that ‘Out of Heap memory’ is the reason behind a FetchFailed Exception, check the hosting executor details (hostname/IP address/port) mentioned in the exception. With those details in hand, look for the following task failures against the hosting executor:
- ‘ExecutorLostFailure’ due to Exit code 143
- ‘ExecutorLostFailure’ due to Executor Heartbeat timed out.
These task failures against the hosting executor indicate that the executor hosting the shuffle blocks was killed due to a Java ‘Out of memory’ error. You can also confirm these failures explicitly in the executor logs. Since the hosting executor was killed, the shuffle blocks it hosted can no longer be fetched, which can result in FetchFailed Exceptions in one or more shuffle reduce tasks.
2. ‘Low memory overhead on an Executor’: This reason indicates that a FetchFailed Exception occurred because an executor hosting the corresponding shuffle blocks crashed due to ‘Low memory overhead’. A ‘Low memory overhead’ error occurs when the physical RAM footprint of an executor exceeds its designated physical memory limit. This can happen when the executor heap memory is heavily utilized and there is also significant demand for off-heap memory.
To confirm this reason, check the hosting executor details (hostname/IP address/port) mentioned in the FetchFailed Exception. With those details in hand, look for the following task failure against the hosting executor:
- ‘ExecutorLostFailure, # GB of # GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead’
The above task failure against a hosting executor indicates that the executor hosting the shuffle blocks was killed for exceeding its designated physical memory limit. Again, since the hosting executor was killed, the shuffle blocks it hosted can no longer be fetched, which can result in FetchFailed Exceptions in one or more shuffle reduce tasks.
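To make the physical memory limit concrete, here is a rough arithmetic sketch. It assumes the documented default for the overhead setting, which is 10% of the executor memory with a floor of 384 MB, and it assumes the limit is simply executor memory plus overhead. Some Spark versions add further components to the container request, which this sketch ignores. The function names are hypothetical.

```python
MIN_OVERHEAD_MB = 384    # assumed floor of the default overhead
OVERHEAD_FACTOR = 0.10   # assumed default fraction of executor memory


def default_overhead_mb(executor_memory_mb):
    """Default memory overhead when none is configured explicitly."""
    return max(MIN_OVERHEAD_MB, int(executor_memory_mb * OVERHEAD_FACTOR))


def container_limit_mb(executor_memory_mb, overhead_mb=None):
    """Approximate physical memory limit: heap plus overhead (simplified)."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


# An 8 GB heap leaves only about 819 MB of headroom by default.
limit_default = container_limit_mb(8192)

# Raising the overhead to 2 GB widens the headroom for off-heap usage.
limit_boosted = container_limit_mb(8192, overhead_mb=2048)
```

If the process footprint (heap plus off-heap buffers, thread stacks, and so on) grows beyond this limit, the cluster manager kills the executor, and the shuffle blocks it hosted are lost with it.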
3. ‘Shuffle block greater than 2 GB’: A FetchFailed Exception mentioning ‘Too large frame’, ‘Frame size exceeding’, or ‘Size exceeds Integer.MAX_VALUE’ as the error cause indicates that the corresponding shuffle reduce task was trying to fetch a shuffle block greater than 2 GB. This is mainly due to the Integer.MAX_VALUE (2 GB) limit on the data structure (ByteBuffer) used to store a shuffle block in memory.
You can handle the error by tuning the partitioning strategy on either or both sides of the shuffle operation. However, starting with Spark release 2.4, this particular cause is largely addressed.
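A back-of-the-envelope check can show whether a partitioning choice risks oversized blocks. The sketch below assumes the shuffle data is spread evenly over all blocks, which real, skewed data often violates, so treat the result as a lower bound on the largest block. The function names and the example sizes are hypothetical.

```python
import math

MAX_BLOCK_BYTES = 2**31 - 1  # Integer.MAX_VALUE, roughly 2 GB


def avg_block_bytes(total_shuffle_bytes, num_map_tasks, num_reduce_tasks):
    """Average shuffle block size, assuming perfectly even distribution."""
    return total_shuffle_bytes / (num_map_tasks * num_reduce_tasks)


def min_reduce_partitions(total_shuffle_bytes, num_map_tasks, target_block_bytes):
    """Smallest reduce-side partition count that keeps the average block
    at or below the target size."""
    return math.ceil(total_shuffle_bytes / (num_map_tasks * target_block_bytes))


total = 4 * 2**40  # 4 TiB of shuffle data (made-up figure)

# 100 map tasks x 10 reduce tasks: average block is about 4.4 GB, over the limit.
too_large = avg_block_bytes(total, 100, 10) > MAX_BLOCK_BYTES

# Reduce partitions needed to bring the average block down to 256 MiB.
needed = min_reduce_partitions(total, 100, 256 * 2**20)
```

Increasing the partition count on either side shrinks the average block. Skewed keys can still produce individual blocks far above the average, so the estimate does not replace inspecting the actual shuffle sizes.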
4. ‘Network Timeout’: Fetching of shuffle blocks is generally retried a configurable number of times (spark.shuffle.io.maxRetries) at configurable intervals (spark.shuffle.io.retryWait). When all the retries are exhausted while fetching a shuffle block from its hosting executor, a FetchFailed Exception is raised in the shuffle reduce task. Such FetchFailed Exceptions are usually placed in the ‘Network Timeout’ category and are difficult to correlate. They may arise due to network issues, or because the executors hosting the corresponding shuffle blocks are overwhelmed.
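The retry behavior can be pictured with a simplified sketch. It is not Spark's fetcher: the `fetch_with_retries` function and the `FetchFailedError` class are hypothetical stand-ins. The defaults of 3 retries and a 5 second wait mirror the documented defaults of the two settings, under which the retries add at most 15 seconds of waiting. The `sleep` parameter is injectable so the example runs without actually waiting.

```python
import time


class FetchFailedError(Exception):
    """Hypothetical stand-in for the failure reported to the reduce task."""


def fetch_with_retries(fetch_block, max_retries=3, retry_wait_s=5.0,
                       sleep=time.sleep):
    """Try a fetch once, then retry up to max_retries times on I/O errors."""
    last_error = None
    for attempt in range(max_retries + 1):  # initial attempt plus the retries
        try:
            return fetch_block()
        except OSError as err:
            last_error = err
            if attempt < max_retries:
                sleep(retry_wait_s)  # wait before the next retry
    raise FetchFailedError(
        f"gave up after {max_retries} retries"
    ) from last_error


def always_unreachable():
    """Simulates an executor that never answers."""
    raise ConnectionError("connection timed out")


waits = []  # records the requested waits instead of really sleeping
try:
    fetch_with_retries(always_unreachable, sleep=waits.append)
    outcome = "fetched"
except FetchFailedError:
    outcome = "fetch failed"
```

With an overloaded but still alive executor, a larger retry count or wait can give the fetch time to succeed. With a dead executor, every retry fails and the exception is only delayed.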
I am planning to cover the possible fixes for each cause in an upcoming article. If you urgently need a fix for a FetchFailed Exception, you can drop me a message.
Published at DZone with permission of Ajay Gupta. See the original article here.
Opinions expressed by DZone contributors are their own.