
Debugging a Spark Driver Out of Memory (OOM) Issue With Large JSON Data Processing

This article draws on real-world debugging experience and aims to provide insights into Spark's memory management challenges.

By Raju Ansari · Oct. 29, 25 · Analysis

As a data engineer, I recently encountered a challenging scenario that highlighted the complexity of Apache Spark's memory management and internal processing. Despite working with what seemed like a moderate dataset (25 GB), I hit a driver Out of Memory (OOM) error that halted my data replication job.

In this article, I will discuss Spark's internal processing and memory management, and how understanding them can help us build a resilient data replication solution.

Scenario

Let’s start with the code snippet that caused the OOM error.

Scala
 
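// Assumes an active SparkSession `spark`; allFiles, DDBRecord, Record.fromRow,
// the key names, and the custom aggregator are defined elsewhere in the job.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import spark.implicits._ // assumed source of the Encoder for DDBRecord and the key tuple

// Read all data files
val df = spark.read.json(allFiles: _*)

// Create the DDBRecord dataset: a typed view of every row
val recordsDS: Dataset[DDBRecord] = df.map(
  row => Record.fromRow(row, dynamoDBPartitionKey, Option(dynamoDBSortKey)))

// Aggregate per (partition key, sort key) using the custom aggregator
val aggregatedDS = recordsDS
  .groupByKey(r => (r.ddbPartitionKey.orNull, r.ddbSortKey.orNull))
  .agg(aggregator.toColumn.name("result"))

// Convert the transformed dataset back to a DataFrame with the original schema
val resultDf = aggregatedDS.map(r => r._2.toRow)(RowEncoder.apply(df.schema))

// isEmpty is the first action, so the whole plan is executed here (and the driver OOMs here)
if (resultDf.isEmpty) {
  return
}
// Apply the net changes to the Apache Iceberg target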
//apply the net changes in the Apache Iceberg target


Our data files contain DynamoDB export data, in which each row typically contains Metadata, Keys, OldImage, and NewImage. Our goal is to apply the net changes to the Apache Iceberg target. The code above determines the net changes by tracking the oldest and newest images in the data files: if an item was updated multiple times, we keep only its oldest image (OldImage) and its latest image (NewImage). We use a custom model class (DDBRecord) and a custom aggregator for this task.
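The net-change rule itself can be illustrated without Spark. The following is a minimal plain-Scala sketch, not the author's DDBRecord model or custom aggregator: ChangeEvent, NetChange, and NetChanges are hypothetical names, the partition and sort keys are collapsed into a single string, images are plain strings, and writeTs is an assumed write timestamp used for ordering.

```scala
// Hypothetical model of one DynamoDB change record (not the author's DDBRecord).
final case class ChangeEvent(
    key: String,              // stands in for the (partition key, sort key) pair
    writeTs: Long,            // assumed write timestamp used for ordering
    oldImage: Option[String], // item before the change (None for an insert)
    newImage: Option[String]  // item after the change (None for a delete)
)

// Net change for one key: OldImage of the earliest write, NewImage of the latest write.
final case class NetChange(key: String, oldImage: Option[String], newImage: Option[String])

object NetChanges {
  def compute(events: Seq[ChangeEvent]): Map[String, NetChange] =
    events.groupBy(_.key).map { case (key, evs) =>
      val earliest = evs.minBy(_.writeTs) // first change seen for this key
      val latest   = evs.maxBy(_.writeTs) // last change seen for this key
      key -> NetChange(key, earliest.oldImage, latest.newImage)
    }
}
```

For a key that is inserted at writeTs 1 and then updated at 2 and 3, the sketch returns a NetChange with no old image and the image written at 3, so the whole history collapses into a single insert of the latest version.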

Here are additional details about the data and infrastructure.

  • Dataset: 25 GB of DynamoDB exported JSON data
  • Cluster: 9-machine Glue Spark cluster (G.8X)
  • Cluster Configuration:
    • 32 vCPU per machine
    • 128 GB memory per machine

The Unexpected Challenge

Surprisingly, a simple isEmpty() call on the transformed DataFrame made the Spark driver throw an Out of Memory error almost immediately. This was unexpected, given the seemingly modest data size and the powerful cluster configuration.

Spark defers the execution of transformations until an action is called. In this scenario, execution starts when the action resultDf.isEmpty is triggered, and the driver throws the OOM error while executing this action.
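Spark's deferred execution has a close analogue in Scala's lazy iterators. The sketch below is only an analogy using the standard library, with no Spark involved, and LazyPipeline is a hypothetical name: defining the map step runs nothing, and the work happens only when a terminal operation pulls a value, much as resultDf.isEmpty triggers the whole Spark plan.

```scala
// Analogy only: a lazy Iterator standing in for Spark's deferred transformations.
final class LazyPipeline(size: Int) {
  // Counts how many times the "transformation" has actually run.
  var evaluations: Int = 0

  private def transform(x: Int): Int = {
    evaluations += 1
    x * 2
  }

  // Defining the pipeline records the map step but runs nothing yet,
  // similar to calling map or groupByKey on a Spark Dataset.
  private val pipeline: Iterator[Int] = Iterator.range(0, size).map(transform)

  // The "action": materializing at most one element forces evaluation, similar in
  // spirit to Dataset.isEmpty, which only needs one row. An Iterator is single-use.
  def isEmptyAction(): Boolean = pipeline.take(1).toList.isEmpty
}
```

After construction, evaluations is 0; a single isEmptyAction() call raises it to 1, because only the first element had to be computed.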

Dive Deep

Below is a typical stack trace for this failure: the driver runs low on memory and eventually throws an OOM error while executing the action. As the trace shows, Spark calls the explainString method to generate a plain-text representation of the whole plan, which it uses for Spark events and logging. In open-source Spark 3.1 and later, the format of this text is controlled by the spark.sql.ui.explainMode configuration (simple, extended, codegen, cost, or formatted; the default is formatted).

Java
 
Caused by: java.lang.OutOfMemoryError
    at java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161)
    at java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:125)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:141)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:203)
    at scala.collection.immutable.Stream.addString(Stream.scala:691)
    at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:377)
    at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:376)
    at scala.collection.immutable.Stream.mkString(Stream.scala:760)
    at org.apache.spark.sql.catalyst.util.package$.truncatedString(package.scala:179)
    at org.apache.spark.sql.catalyst.expressions.Expression.toString(Expression.scala:307)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.sql.catalyst.expressions.If.toString(conditionalExpressions.scala:105)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:213)
    at org.apache.spark.sql.catalyst.trees.TreeNode.formatArg(TreeNode.scala:918)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$formatArg$1(TreeNode.scala:911)
    at scala.collection.immutable.List.map(List.scala:297)
    at org.apache.spark.sql.catalyst.trees.TreeNode.formatArg(TreeNode.scala:911)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$argString$1(TreeNode.scala:931)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.AbstractIterator.addString(Iterator.scala:1431)
    at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:377)
    at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:376)
    at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:379)
    at scala.collection.AbstractIterator.mkString(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.trees.TreeNode.argString(TreeNode.scala:949)
    at org.apache.spark.sql.catalyst.trees.TreeNode.simpleString(TreeNode.scala:956)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.simpleString(QueryPlan.scala:402)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:404)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:1070)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:1098)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:1098)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:1098)
    at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:991)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657)
    at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:284)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
    at org.apache.spark.sql.Dataset.isEmpty(Dataset.scala:3201)
    ... 16 more


But why does Spark explain the plan during execution?

Spark generates a string representation of the plan for logging, the Spark UI, and debugging. Spark has an internal event bus to which this information is sent so that other components can act on it; for example, the Spark UI listens to the event bus and stores the events it needs for rendering. Whenever an action is invoked, Spark materializes the logical plan and determines the corresponding physical plan for execution.

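In the Spark source, this happens in SQLExecution.withNewExecutionId: before running the query, it calls explainString on the query execution and posts the resulting text to the event bus (in open-source Spark 3.x, as the physicalPlanDescription field of a SparkListenerSQLExecutionStart event).

Figure: the explainString call in Spark's SQLExecution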

Why would a plan string representation cause OOM?

Digging further, we found that the generated plan text spans multiple pages. Two factors contribute to this massive plan: the size of the schema and the Dataset created from the DataFrame.

In our implementation, we create a Dataset from a DataFrame to produce the net changes. A Spark Dataset is a distributed collection of data that combines the benefits of RDDs and DataFrames. Because a Dataset carries both a schema and static types, Spark adds a null-safety check for every field when converting rows into JVM objects. For nested attributes, these checks are applied recursively, which turns the whole plan into a giant string. Below is a typical fragment of the plan, in which the Keys attribute of the DataFrame is converted into its Dataset representation. The nested if (isnull(...)) null else ... checks are what make the plan string representation so much bigger.

createexternalrow(if (isnull(Keys#786)) null else createexternalrow(if (Keys#786.isNullAt) null else if (isnull(Keys#786.uuid)) null else createexternalrow(if (Keys#786.uuid.isNullAt) null else Keys#786.uuid.S.toString, StructField(S,StringType,true)), StructField(uuid,StructType(StructField(S,StringType,true)),true))
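To see why nesting inflates the text, here is a simplified model in plain Scala. It is not Spark's actual code generation: the Field, Leaf, Struct, and PlanStringModel names are hypothetical, and every leaf is treated as a string. It renders a deserializer-like string in the same shape as the fragment above, with one null check per struct level and a StructField description of each child's full type.

```scala
// Hypothetical, simplified schema model (not Spark's StructType).
sealed trait Field { def name: String }
final case class Leaf(name: String) extends Field
final case class Struct(name: String, children: List[Field]) extends Field

object PlanStringModel {
  // Type description, shaped like StructType(StructField(...)) in the plan text.
  def typeString(f: Field): String = f match {
    case Leaf(_) => "StringType"
    case Struct(_, cs) =>
      cs.map(c => s"StructField(${c.name},${typeString(c)},true)")
        .mkString("StructType(", ",", ")")
  }

  // Deserializer-like text: one null check per struct level, and each level
  // repeats the full type description of its children.
  def render(path: String, f: Field): String = f match {
    case Leaf(_) => s"$path.toString"
    case Struct(_, cs) =>
      val args   = cs.map(c => render(s"$path.${c.name}", c))
      val fields = cs.map(c => s"StructField(${c.name},${typeString(c)},true)")
      s"if (isnull($path)) null else createexternalrow(${(args ++ fields).mkString(", ")})"
  }

  // A chain of `depth` nested structs ending in a single string leaf.
  def nested(depth: Int): Field =
    (1 to depth).foldLeft[Field](Leaf("S"))((child, _) => Struct("n", List(child)))
}
```

For example, render("Keys", nested(1)) yields if (isnull(Keys)) null else createexternalrow(Keys.S.toString, StructField(S,StringType,true)). In this model, doubling the nesting depth more than doubles the text length, because every level repeats the type description of its whole subtree.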

Generating the plan itself is fine. However, building and holding its plain-text representation is expensive and causes memory issues when the source schema itself is big. A Spark Jira issue reports the same OOM problem that we faced.

Spark has two properties that limit plan string generation: spark.sql.debug.maxToStringFields and spark.sql.maxPlanStringLength. These properties only affect formatting or truncate the output; they don’t control the traversal itself, so Spark still walks the entire plan.
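For reference, here is a sketch of setting those limits, together with the UI explain mode (spark.sql.ui.explainMode in open-source Spark 3.1+), on a running session. The values are illustrative assumptions rather than recommendations, and PlanStringLimits and configure are hypothetical names. As described above, these settings cap the size of the output; they do not reduce the work of traversing the plan.

```scala
import org.apache.spark.sql.SparkSession

object PlanStringLimits {
  // Hypothetical helper: applies plan-string limits to an existing session.
  def configure(spark: SparkSession): Unit = {
    // Truncate plan strings beyond about 8 KB (illustrative value).
    spark.conf.set("spark.sql.maxPlanStringLength", "8k")
    // Cap how many fields are printed for sequence-like plan arguments (25 is the default).
    spark.conf.set("spark.sql.debug.maxToStringFields", "25")
    // Plan format posted for the SQL UI: simple, extended, codegen, cost, or formatted.
    spark.conf.set("spark.sql.ui.explainMode", "simple")
  }
}
```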

Deep Dive Conclusion: The plan's text representation is massive because of the Dataset's internal representation and the way Spark traverses the plan to generate it. This consumes a significant amount of driver memory and, in some cases, causes an OOM error.
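One way to check this on your own pipeline is to measure the plan text directly and compare alternative implementations. The sketch below is a hypothetical diagnostic (PlanSize, planLength, compare, and the Item case class are illustrative names, not part of the original job). It relies on QueryExecution.explainString and ExplainMode from org.apache.spark.sql.execution, developer-facing APIs in Spark 3.x. Plan size is driven by the schema and transformations rather than the row count, and the helper builds the same string Spark would, so it costs as much driver memory as Spark's own rendering.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.ExplainMode

// Hypothetical record type, used only to compare a typed plan with an untyped one.
final case class Item(id: String, value: Long)

object PlanSize {
  // Character count of the plan text for an explain mode such as
  // "simple", "extended", "codegen", "cost", or "formatted".
  def planLength(df: DataFrame, mode: String = "extended"): Int =
    df.queryExecution.explainString(ExplainMode.fromString(mode)).length

  // The same doubling logic expressed untyped (DataFrame) and typed (Dataset.map).
  def compare(spark: SparkSession): (Int, Int) = {
    import spark.implicits._
    val df      = Seq(("a", 1L), ("b", 2L)).toDF("id", "value")
    val untyped = df.select($"id", ($"value" * 2).as("value"))
    val typed   = df.as[Item].map(i => i.copy(value = i.value * 2)).toDF()
    (planLength(untyped), planLength(typed))
  }
}
```

Even for this two-column schema, the typed version adds serializer and deserializer expressions to the plan, which is the same effect that grew large for the nested DynamoDB schema.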

What Is the Fix?

Our investigation indicates that the current implementation creates a Dataset from a DataFrame, and that this Dataset's internal representation becomes massive when Spark renders the plan as plain text. We have a couple of options to reduce the memory footprint and address the issue.

We need the plan for debugging purposes, but not at the cost of breaking the job. Ideally, we would turn off plan-string generation to address the low-memory issue; however, Spark does not offer this option. A pull request proposes adding an Off mode.

Instead, we optimized our implementation so that it no longer creates a Dataset, because the Dataset's internal representation is what inflates the plan string. Below is the equivalent code that computes the net changes using DataFrame operations only. Essentially, we group records by key and, using the write timestamp (_write_ts) as the ordering, pick the earliest OldImage and the latest NewImage.

Scala
 
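// Instead of using a Dataset, perform the aggregation on the DataFrame only.
// keyColumns (a Seq[Column]) and the _write_ts ordering column come from our job.
// The Scala min_by/max_by functions require Spark 3.3 or later.
import org.apache.spark.sql.functions.{col, max_by, min_by}

val df = spark.read.json(allFiles: _*)

val netChangesDf = df
  .groupBy(keyColumns: _*)
  .agg(
    min_by(col("OldImage"), col("_write_ts")).as("_earliest_old"), // OldImage of the earliest write
    max_by(col("NewImage"), col("_write_ts")).as("_latest_new"),   // NewImage of the latest write
    max_by(col("Keys"), col("_write_ts")).as("_final_keys"),
    max_by(col("Metadata"), col("_write_ts")).as("_final_metadata")
  )

// Check the aggregated result, not the raw input, before writing
if (netChangesDf.isEmpty) {
  return
}
// Proceed with the rest (apply the net changes to the Iceberg target)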


My local testing clearly shows significant improvement in memory consumption after the fix.

The image below shows the object size during execution. 

Object size during execution

After the fix, the object size has reduced drastically.

Object size has reduced drastically
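If your Spark version predates the Scala min_by/max_by functions (they were added to the Scala functions API in Spark 3.3, while the SQL functions of the same names exist since Spark 3.0), the same DataFrame-only aggregation can be written with SQL expressions through expr. This is a minimal sketch: NetChangesSql and netChanges are hypothetical names, and the key columns are passed as plain column names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr}

object NetChangesSql {
  // Same idea as the DataFrame-only fix, using SQL expressions instead of
  // the Scala min_by/max_by functions.
  def netChanges(df: DataFrame, keyCols: Seq[String]): DataFrame =
    df.groupBy(keyCols.map(col): _*)
      .agg(
        expr("min_by(OldImage, _write_ts)").as("_earliest_old"), // OldImage of the earliest write
        expr("max_by(NewImage, _write_ts)").as("_latest_new")    // NewImage of the latest write
      )
}
```

In the usage below, pk is a hypothetical single key column and the images are plain strings; in the real export data, OldImage and NewImage are nested structs.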


Quick Rules of Thumb for OOM

  • Driver → User Memory risk:
    • Anything that pulls results back to the driver (collect, show, or take with a large n).
    • Anything that expands metadata (schema explosion, file listing).
  • Executor → Unified Memory risk:
    • Anything that redistributes data (join, groupBy, shuffle).
    • Caching or broadcast of large datasets.
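For the first driver-side bullet, the usual mitigation is to bound what comes back to the driver. The helper names below (DriverSafeReads and its methods) are hypothetical; the Dataset calls are standard: collect() materializes every row on the driver, take(n) returns at most n rows, and toLocalIterator() fetches one partition at a time, so the driver needs roughly enough memory for the largest partition rather than the whole result.

```scala
import org.apache.spark.sql.{DataFrame, Row}

object DriverSafeReads {
  // Pulls every row into driver memory: risky for large results.
  def everything(df: DataFrame): Array[Row] = df.collect()

  // Brings back at most n rows.
  def preview(df: DataFrame, n: Int = 10): Array[Row] = df.take(n)

  // Streams rows to the driver one partition at a time instead of all at once.
  def streamed(df: DataFrame): java.util.Iterator[Row] = df.toLocalIterator()
}
```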

The table below helps you narrow down the investigation.

| Aspect | Driver | Executor |
|---|---|---|
| Schema, logical/physical plans | Lives in User Memory | Not relevant |
| File metadata (S3 file listing, partition info) | User Memory | Not relevant |
| Task scheduling metadata | User Memory | Not relevant |
| Row collection (collect(), count(), isEmpty()) | Results land in User Memory | Executors compute them |
| Shuffle buffers, joins, aggregations | Minimal at driver (control plane only) | Unified Memory |
| Caching (persist, broadcast) | Broadcasts take Storage Memory | Cached data in executors takes Storage Memory |
| UDF execution | Rare (only if driver executes an action locally) | Executors run UDFs in Execution/User Memory |
| Typical OOM cause | Schema explosion, collect(), file listing, huge plan | Shuffle/join spill, skew, insufficient execution memory |


Conclusion

Memory management in Spark is nuanced, and even seemingly simple operations can lead to unexpected memory challenges. I recommend monitoring memory usage and applying the appropriate configuration and code improvements to build a reliable data processing solution.


Opinions expressed by DZone contributors are their own.
