Handling Concurrent Data Loads in Delta Tables
Learn to handle Delta Lake concurrency issues with retries, exponential backoff, partitioning, Auto-Optimize, MERGE, and monitoring techniques.
Ensuring Reliable Concurrent Writes With Retrying Options
Delta Lake is a resilient storage layer that offers ACID transactions, schema enforcement, and data versioning. However, concurrent writes create contention when multiple processes attempt to write, update, or delete the same data at the same time. This article presents a structured retry mechanism with exponential backoff for handling concurrency conflicts in Delta tables.
Concurrent Write Issues in Delta Tables
Concurrency failures occur when multiple processes simultaneously attempt to write to the same Delta table. The common failure scenarios are as follows:
- ConcurrentAppendException – Thrown when a concurrent operation appends files that conflict with data the current transaction has read, for example two jobs appending to the same partition.
- ConcurrentDeleteReadException – Thrown when the current transaction reads data that another process has deleted.
- ConcurrentDeleteDeleteException – Thrown when two processes attempt to delete the same data simultaneously.
Handling these failures requires a robust retry mechanism so that writes complete successfully without sacrificing consistency.
Proposed Retry Mechanism for Delta Table Writes
A streaming-write retry procedure with exponential backoff mitigates concurrent write failures.
The following Python code implements the process:
from time import sleep
import math

from delta.exceptions import (
    ConcurrentAppendException,
    ConcurrentDeleteReadException,
    ConcurrentDeleteDeleteException,
)

def streaming_write_with_concurrent_retry(
    stream, max_attempts=3, indefinite=False, table=None, path=None
):
    """
    Handles concurrent write operations to a Delta table or path by retrying
    the operation when specific concurrency exceptions are raised.

    :param stream: The DataStreamWriter to start.
    :param max_attempts: The maximum number of retry attempts. Default is 3.
    :param indefinite: If True, keep retrying indefinitely. Default is False.
    :param table: The Delta table to write to.
    :param path: The path to write to.
    :return: The result of query.awaitTermination().
    """
    attempt = 0  # Initialize attempt counter
    while True:
        try:
            # Start the streaming query against a table or a path
            if table:
                query = stream.toTable(table)
            elif path:
                query = stream.start(path)
            else:
                query = stream.start()
            # Block until the streaming query terminates
            return query.awaitTermination()
        # Handle Delta concurrency conflicts
        except (
            ConcurrentAppendException,
            ConcurrentDeleteReadException,
            ConcurrentDeleteDeleteException,
        ) as e:
            attempt += 1
            # If retries are bounded and exhausted, re-raise the exception
            if not indefinite and attempt >= max_attempts:
                raise e from None
            # Exponential backoff, capped at 120 seconds
            sleep_time = min(120, math.pow(2, attempt))
            # Log the retry attempt
            print(
                f"Retrying {attempt}/{max_attempts if not indefinite else '∞'} "
                f"after {sleep_time} seconds due to {type(e).__name__}"
            )
            sleep(sleep_time)
Retry Strategy Explanation
The retry mechanism follows an exponential backoff policy:
1. Exception Identification
The function catches ConcurrentAppendException, ConcurrentDeleteReadException, and ConcurrentDeleteDeleteException.
2. Retry Attempts and Limitations
The function retries a maximum of max_attempts times before it fails. The indefinite=True parameter allows infinite retries until success.
3. Exponential Backoff Calculation
Backoff formula: sleep_time = min(120, 2^attempt). This ensures retry wait times grow exponentially but are capped at a maximum of 120 seconds.
Example of retry wait times:
- Attempt 1 → Wait 2 seconds
- Attempt 2 → Wait 4 seconds
- Attempt 3 → Wait 8 seconds
- (up to a maximum of 120 seconds)
Resuming the Stream
Once a retry succeeds, query.awaitTermination() allows the streaming job to continue running.
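As a usage sketch (the source path, checkpoint location, and target path below are hypothetical), the function wraps an ordinary Structured Streaming write:

# Build a DataStreamWriter for a Delta-to-Delta stream
stream = (
    spark.readStream.format("delta")
    .load("/mnt/delta/source_table")  # hypothetical source path
    .writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/delta/_checkpoints/target")  # hypothetical
)

# Retry up to 5 times with exponential backoff on concurrency conflicts
streaming_write_with_concurrent_retry(stream, max_attempts=5, path="/mnt/delta/target")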
Alternative Strategies for Delta Table Concurrency Handling
Besides retry-based conflict resolution, Delta Lake offers additional techniques:
1. Optimistic Concurrency Control (OCC)
Delta Lake uses optimistic concurrency control: it checks for conflicts at commit time. Commits that do not logically conflict are retried transparently; genuinely conflicting operations fail with the exceptions described above.
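For batch writes, the same backoff pattern can complement Delta's built-in commit retries. A minimal sketch, assuming df and the target path already exist:

import math
from time import sleep

from delta.exceptions import ConcurrentAppendException

def batch_write_with_retry(df, path, max_attempts=3):
    # Retry a batch append when the optimistic commit detects a conflict
    for attempt in range(1, max_attempts + 1):
        try:
            df.write.format("delta").mode("append").save(path)
            return
        except ConcurrentAppendException:
            if attempt == max_attempts:
                raise
            # Exponential backoff, capped at 120 seconds
            sleep(min(120, math.pow(2, attempt)))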
2. Partitioning Data for Isolation
Target write operations at separate partitions so that concurrent jobs do not touch the same files:
df.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "date = '2025-02-17'") \
    .save("/mnt/delta/table")
This limits updates to a single partition, reducing contention.
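replaceWhere isolates writes most effectively when the table is partitioned on the filter column. A minimal sketch of creating such a layout, assuming the DataFrame has a date column:

# Partition the table by `date` so that jobs targeting different dates
# write to disjoint sets of files
df.write.format("delta") \
    .partitionBy("date") \
    .mode("overwrite") \
    .save("/mnt/delta/table")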
3. Streaming and Auto-Optimize
Enable Auto-Optimize and Auto-Compact:
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
)
This config change reduces small files and improves concurrent write performance.
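The same properties can also be set from PySpark by issuing the statement through the SQL interface:

# Apply the Auto-Optimize properties from a notebook or job
spark.sql("""
    ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")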
4. Merge-Based Upserts
Instead of direct inserts, use MERGE to handle conflicts:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/table_name")

(
    delta_table.alias("target")
    .merge(df.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(set={"value": "source.value"})
    .whenNotMatchedInsertAll()
    .execute()
)
This approach handles insert and update conflicts at the row level within a single atomic operation.
Monitoring and Debugging Concurrency Issues
1. Check data load transaction history:
DESCRIBE HISTORY delta.`/mnt/delta/table_name`;
2. Inspect table properties that affect concurrency:
SHOW TBLPROPERTIES delta.`/mnt/delta/table_name`;
3. Enable the change data feed (CDF) for change tracking:
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES ('delta.enableChangeDataFeed' = true);
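Once CDF is enabled, both the table history and the change feed can be read from PySpark; a minimal sketch (the starting version is an arbitrary assumption):

from delta.tables import DeltaTable

# Review recent commits, including operations that may have conflicted
history = DeltaTable.forPath(spark, "/mnt/delta/table_name").history()
history.select("version", "timestamp", "operation").show()

# Read row-level changes captured by the change data feed
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # assumed starting point
    .load("/mnt/delta/table_name")
)
changes.show()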
Conclusion
Delta tables allow concurrent writes with ACID guarantees, but conflicts are possible.
- A retry-based strategy with exponential backoff helps mitigate concurrency issues.
- Partitioning, MERGE, and Auto-Optimize also improve concurrent write performance.
- Tracking mechanisms such as DESCRIBE HISTORY and CDF help trace conflicts.
By adhering to these best practices, you can perform concurrent Delta table writes efficiently while maintaining data integrity and optimizing performance.