5 Surprising Truths About Scaling Apache Spark
Strategies for optimizing Apache Spark performance by addressing core bottlenecks like data shuffling, join inefficiencies, and excessive data scanning.
Eleven o’clock on a Friday night. The cursor blinks beside a progress bar that has not moved in thirty-nine minutes, and your key workflow is still stuck mid-execution. Then red text floods the screen: Out of Memory (OOM) or No space left on device. The reflex is to add more compute immediately. In a distributed architecture, however, scaling up frequently drags performance down while inflating cost. The quiet realization follows: more hardware does not fix a broken data flow.
I have spent ten years as a cloud data architect, and explaining Spark’s lazy evaluation has become routine. The Catalyst Optimizer excels at producing efficient execution plans, but hidden costs can linger unseen until massive datasets arrive and expose them. Excellence in data engineering goes beyond syntax; it requires understanding what the system actually does at runtime. The consequences reach both team effectiveness and the financial bottom line.
Takeaway 1: The Invisible Tax of the "Shuffle"
When a Spark job runs slowly, look first at the Exchange operator, commonly called a shuffle. During this phase, data moves between partitions (and usually between machines) so that operations like join() or groupByKey() can bring all records with the same key together. Shuffling is often unavoidable, but it is also a frequent cause of major slowdowns: instead of computing in parallel, tasks sit waiting for data transfers, and a workload that should scale becomes limited by network speed. The bottleneck is in data movement, not in your logic.
A single shuffle incurs costs along three distinct vectors, each with its own resource demand:
- Disk I/O: Shuffles write a massive number of intermediate files to each executor’s local disk. Exhausting this space is a leading cause of job failure.
- Network I/O: This is where the math kills your performance. The number of shuffle blocks that must be fetched over the network grows with Mappers × Reducers, so on a large cluster the number of connections and fetch requests explodes and can saturate your bandwidth.
- CPU/memory load: Sorting and merging shuffled data demands significant CPU and memory on every executor before the next stage can begin, straining node capacity.
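The Mappers × Reducers point is easy to underestimate, so here is the back-of-the-envelope arithmetic as a tiny Python sketch. The task counts are hypothetical, and the model is simplified: it counts logical shuffle blocks (one segment of each map task’s output per reduce partition) rather than physical TCP connections, which Spark pools and reuses between executors.

```python
def shuffle_blocks(map_tasks: int, reduce_partitions: int) -> int:
    """Each map task produces one block per reduce partition: M x R in total."""
    return map_tasks * reduce_partitions


# Hypothetical job: 2,000 input splits shuffled into 2,000 partitions
total = shuffle_blocks(2_000, 2_000)
print(f"{total:,} shuffle blocks")  # 4,000,000 shuffle blocks

# Cutting the reducer count to 200 shrinks the block count tenfold
smaller = shuffle_blocks(2_000, 200)
print(f"{smaller:,} shuffle blocks")  # 400,000 shuffle blocks
```

Because the growth is multiplicative, doubling the cluster and doubling both task counts quadruples the number of blocks each shuffle has to move.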
"Shuffle is one of the most substantial factors in degraded performance of your Spark application. While storing the intermediate data, it can exhaust space on the executor's local disk, causing the job to fail."
Takeaway 2: Broadcast Small Tables, Don't Shuffle Big Ones
When joining two large tables, Spark typically uses a sort-merge join: it shuffles both sides by join key, sorts the records within each partition, and then merges matching records in a single sequential pass. The strategy is stable at high volume, but that robustness costs a full shuffle plus a sort of both inputs.
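To make the sort-then-merge idea concrete, here is a single-machine toy version in plain Python. It illustrates the algorithm only and is not Spark’s implementation: the shuffle step is skipped (both inputs are assumed to already hold the same key range, as they would after the exchange), and the tables are hypothetical lists of (key, value) tuples.

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs by sorting, then merging."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    result, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row that shares the key (handles duplicates)
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                for r in range(j, j_end):
                    result.append((lk, left[i][1], right[r][1]))
                i += 1
            j = j_end
    return result


orders = [(3, "order-c"), (1, "order-a"), (2, "order-b"), (3, "order-d")]
customers = [(2, "bob"), (3, "cara"), (4, "dan")]
print(sort_merge_join(orders, customers))
# [(2, 'order-b', 'bob'), (3, 'order-c', 'cara'), (3, 'order-d', 'cara')]
```

The merge pass itself is cheap and linear; in Spark, the expense lies in the shuffle and sort that must happen before it.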
The real speed gains come from a Broadcast Hash Join. Instead of shuffling both sides, Spark sends a full copy of the small table to every executor, and each executor joins its local partitions of the large table without moving them across the network. Automatic broadcasting is controlled by spark.sql.autoBroadcastJoinThreshold, which defaults to 10MB. On current cloud hardware that limit is often conservative, and raising it can help: broadcasting a 100MB lookup table to every executor avoids shuffling the large table entirely, so execution finishes faster because local computation replaces network communication. What matters most is keeping the threshold aligned with the memory actually available.
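Broadcasting has a catch: the small table is first collected to the driver. Whether Spark does this for you or you collect() the data yourself before broadcasting it, you risk crashing the entire application if the table exceeds the driver’s memory or the spark.driver.maxResultSize limit.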
```python
from pyspark.sql.functions import broadcast
# Force the Catalyst Optimizer to broadcast df_small, bypassing the shuffle of df_large
df_joined = df_large.join(broadcast(df_small), "join_key")
```
Takeaway 3: Spark Has Gained Self-Correcting Behavior Through AQE
Spark 3.0 introduced Adaptive Query Execution (AQE). Previously, the execution plan was fixed before the job started and stayed unchanged regardless of what the data actually looked like. With AQE, Spark re-optimizes the remaining plan at stage boundaries using runtime statistics from completed shuffle stages, so decisions can change when the evidence contradicts the initial estimates, and some manual misconfigurations are quietly corrected on the fly.
On AWS, this matters a great deal. AQE is enabled by default in AWS Glue 4.0, but on Glue 3.0 and earlier EMR releases you typically have to set spark.sql.adaptive.enabled to true yourself.
Three core improvements emerge from AQE:
- Coalescing small shuffle partitions: When a shuffle produces many tiny partitions, AQE merges them into fewer, reasonably sized ones at runtime, so the cluster no longer schedules piles of nearly empty tasks and scheduling overhead drops.
- Switching join strategies: A table expected to be large may shrink significantly after filtering. If its runtime size turns out to be small enough, AQE replaces the planned sort-merge join with a broadcast hash join, without manual intervention.
- Handling skewed joins: A single oversized partition can hold up an entire stage. AQE detects skewed partitions in sort-merge joins and splits them into smaller chunks, so the rest of the work does not wait on one straggler task.
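Two of these runtime decisions boil down to simple size arithmetic. The pure-Python sketch below is a simplification of the idea, not Spark’s code. It packs contiguous shuffle partitions up to a target size (the role played by spark.sql.adaptive.advisoryPartitionSizeInBytes), and it flags a partition as skewed when it is both several times larger than the median and above an absolute threshold (the roles of spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes). All sizes and settings are hypothetical values in MB.

```python
from statistics import median


def coalesce_partitions(sizes, target):
    """Greedily merge contiguous partitions so each group stays near `target`."""
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


def skewed_partitions(sizes, factor, threshold):
    """Indices that exceed both `factor` x the median and `threshold`."""
    med = median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * med and s > threshold]


def split_sizes(size, target):
    """Split one oversized partition into roughly equal chunks near `target`."""
    chunks = max(1, -(-size // target))  # ceiling division
    base, extra = divmod(size, chunks)
    return [base + 1 if i < extra else base for i in range(chunks)]


# Hypothetical shuffle output sizes in MB
tiny = [1, 2, 0, 0, 30, 5, 1]
print(coalesce_partitions(tiny, target=32))  # [[0, 1, 2, 3], [4], [5, 6]]

skewed = [10, 12, 11, 9, 400]
hot = skewed_partitions(skewed, factor=5, threshold=100)
print(hot, split_sizes(skewed[hot[0]], target=64))  # [4] [58, 57, 57, 57, 57, 57, 57]
```

Seven tiny partitions become three tasks, and the single 400MB partition becomes seven tasks that can run in parallel instead of one straggler.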
Takeaway 4: You’re Likely Reading Too Much Data
The best advice often lands quietly: not touching data at all speeds things up more than any tool could. That quiet truth has a name: scan optimization.
Columnar formats like Parquet and ORC enable predicate pushdown: because they store summary statistics (such as per-column min/max values) for each block of rows, Spark consults that metadata first and skips blocks that cannot match the filter. Partition pruning applies the same idea one level up. When directories follow a logical split, for example by date components such as year/month/day, Spark skips the irrelevant directories entirely. A dataset may span hundreds of partitions, yet a well-structured layout lets a query read only the single partition it needs, and most unnecessary disk access disappears.
"Reduce the amount of data the Spark cluster needs to process... We can filter the source data by skipping some partitions if they do not satisfy our condition. The right condition can significantly speed up reading and retrieving the needed data."
Takeaway 5: Managed Runtimes Can Run About Twice as Fast as Open Source
The performance gap between basic open-source Spark and tuned cloud runtimes such as Amazon EMR 7.12 can be substantial. Measured results indicate that large-scale merge operations complete up to 2.08x faster on the managed platform than on standard Spark 3.5.6. These gains appeared under specific conditions involving substantial data volumes, roughly 3 terabytes.
The speedup stems from low-level backend optimizations that community releases do not include by default, and it becomes visible mainly at scale rather than in small test runs.
This speedup isn't magic; it’s a result of systematic write-pipeline optimizations that aren't available in the base project:
- Metadata-only deletes: For partition-level deletes, EMR updates metadata rather than rewriting physical files, leaving the underlying data files untouched.
- Bloom filter joins: These probabilistic structures filter out non-matching keys early, reducing the amount of data read during merge operations.
- Parallel file write-out: Optimized parallelism when writing to S3 reduces the time executors spend in "Wait" states, so tasks finish without lingering.
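EMR’s implementation is internal, but the underlying data structure is simple. Below is a toy Bloom filter in plain Python; it illustrates the concept and is not EMR’s code, and the key names are hypothetical. It shows the property that makes Bloom filter joins safe: a key that was added is never rejected, while most non-matching keys are filtered out before the expensive merge step.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: k hash positions per key in a fixed-size bit array."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: str):
        # Derive k positions by hashing the key with k different seeds
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present"
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


# Build the filter from the (smaller) set of keys being merged in
updates = ["order-1", "order-7", "order-42"]
bf = BloomFilter(num_bits=1024, num_hashes=3)
for key in updates:
    bf.add(key)

# Probe side: only rows whose keys might match survive to the merge
target_keys = [f"order-{i}" for i in range(1000)]
candidates = [k for k in target_keys if bf.might_contain(k)]
print(len(candidates), "of", len(target_keys), "rows kept")
```

False positives are possible, so the surviving candidates still go through the real join; the filter only removes rows that cannot match.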
In business terms, the roughly doubled speed translated into about a 70 percent gain in EC2 usage efficiency. Faster runtimes do more than improve the system; they lower the recurring cost of cloud infrastructure.
Conclusion: Beyond the "Lazy" Evaluation
Opinions expressed by DZone contributors are their own.