Handling Concurrent Data Loads in Delta Tables

Learn to handle Delta Lake concurrency issues with retries, exponential backoff, partitioning, Auto-Optimize, MERGE, and monitoring techniques.

By munikrishnaiah sundararamaiah · Mar. 12, 25 · Analysis

Ensuring Reliable Concurrent Writes With Retrying Options

Delta Lake is a storage layer that provides ACID transactions, schema enforcement, and data versioning. Concurrent writes can still fail, however, when several processes try to write, update, or delete the same data at the same time. This article presents a structured retry mechanism with exponential backoff for handling such conflicts in Delta tables.

Delta Table Concurrent Writes Issues

Concurrency failures occur when multiple processes simultaneously attempt to write to the same Delta table. The common failure scenarios are as follows:

  • ConcurrentAppendException: a concurrent operation added files to the same partition (or anywhere in an unpartitioned table) that your operation reads.
  • ConcurrentDeleteReadException: a concurrent operation deleted a file that your operation read.
  • ConcurrentDeleteDeleteException: a concurrent operation deleted a file that your operation also deletes.

Because these failures are usually transient, a retry mechanism lets the write succeed once the conflicting transaction has finished, without compromising consistency.

Proposed Retry Mechanism for Delta Table Writes

The function below retries a streaming write with exponential backoff whenever it hits one of these concurrency exceptions.

The following Python code describes the process:

Python
 
import math
from time import sleep

from delta.exceptions import (
    ConcurrentAppendException,
    ConcurrentDeleteReadException,
    ConcurrentDeleteDeleteException,
)

def streaming_write_with_concurrent_retry(
    stream, max_attempts=3, indefinite=False, table=None, path=None
):
    """
    Handles concurrent write operations to a Delta table or path by retrying the operation
    in case of specific concurrent exceptions.

    :param stream: The data stream to be written.
    :param max_attempts: The maximum number of attempts, including the first one. Default is 3.
    :param indefinite: If True, will keep retrying indefinitely. Default is False.
    :param table: The Delta table to write to.
    :param path: The path to write to.
    :return: The result of writer.awaitTermination().
    """

    attempt = 0  # Initialize attempt counter

    while True:
        try:
            # Choose the writer based on whether table or path is provided
            if table:
                # DataStreamWriter.toTable() (Spark 3.1+) starts the query and returns it
                writer = stream.toTable(table)
            elif path:
                writer = stream.start(path)
            else:
                writer = stream.start()

            # Attempt to write and wait for termination
            return writer.awaitTermination()

        # Handle concurrent exceptions
        except (
            ConcurrentAppendException,
            ConcurrentDeleteReadException,
            ConcurrentDeleteDeleteException,
        ) as e:

            # Increment attempt counter
            attempt += 1

            # If indefinite is False and attempts have reached max_attempts, raise the exception
            if not indefinite and attempt >= max_attempts:
                raise  # re-raise the caught exception with its original traceback

            # Calculate sleep time using exponential backoff strategy
            sleep_time = min(120, math.pow(2, attempt))

            # Log the retry attempt
            print(f"Retrying {attempt}/{max_attempts if not indefinite else '∞'} after {sleep_time} seconds due to {type(e).__name__}")

            # Sleep for the calculated time before retrying
            sleep(sleep_time)
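
The same loop can be factored into a small helper that wraps any callable, which makes it usable for batch writes as well as streams. The following is a minimal sketch, not part of Delta Lake: `retry_with_backoff` and its `retry_on` and `sleep_fn` parameters are hypothetical names introduced for illustration. In a real job, `retry_on` would be a tuple of the three Delta exception classes imported above.

```python
import time


def retry_with_backoff(operation, retry_on, max_attempts=3, cap_seconds=120,
                       sleep_fn=time.sleep):
    """Call operation() until it succeeds or max_attempts calls have failed.

    retry_on is a tuple of exception classes that should trigger a retry;
    any other exception propagates immediately.
    """
    attempt = 0
    while True:
        try:
            return operation()
        except retry_on:
            attempt += 1
            if attempt >= max_attempts:
                raise  # out of attempts: surface the original error
            # Same schedule as above: 2, 4, 8, ... capped at cap_seconds
            sleep_fn(min(cap_seconds, 2 ** attempt))
```

Passing `sleep_fn` explicitly is a design choice that keeps the helper easy to unit test, because a test can record the requested waits instead of actually sleeping.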


Retry Strategy Explanation

The retry logic works as follows:

1. Exception Identification

The function catches ConcurrentAppendException, ConcurrentDeleteReadException, and ConcurrentDeleteDeleteException.

2. Retry Attempts and Limitations

The function makes at most max_attempts attempts in total: once the attempt counter reaches max_attempts, the exception is re-raised instead of retried. Setting indefinite=True removes the limit and retries until the write succeeds.

3. Exponential Backoff Calculation

Backoff formula: sleep_time = min(120, 2^attempt). This ensures retry wait times grow exponentially but are capped at a maximum of 120 seconds.

Example of retry wait times:

  • Attempt 1 → Wait 2 seconds
  • Attempt 2 → Wait 4 seconds
  • Attempt 3 → Wait 8 seconds
  • (up to a maximum of 120 seconds)
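The schedule above can be reproduced with a few lines of Python. The sketch below also includes an optional jittered variant, which is not part of the original function: randomizing the wait is a common way to keep several writers from retrying at the same instant. The function names are hypothetical.

```python
import random


def backoff_seconds(attempt, cap=120):
    """Wait time before retry number `attempt` (1-based): min(cap, 2^attempt)."""
    return min(cap, 2 ** attempt)


def jittered_backoff_seconds(attempt, cap=120, rng=random):
    """Hypothetical variant: a random wait between 0 and the capped value."""
    return rng.uniform(0, backoff_seconds(attempt, cap))


schedule = [backoff_seconds(n) for n in range(1, 9)]
print(schedule)  # [2, 4, 8, 16, 32, 64, 120, 120]
```

The cap takes effect from the seventh retry onward, because 2^7 = 128 already exceeds 120.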

Resuming the Stream

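Each retry starts a new streaming query, and writer.awaitTermination() then blocks until that query stops or fails. If the writer is configured with a checkpointLocation, the restarted query resumes from the last committed offsets instead of reprocessing the stream from the beginning.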
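For a restarted query to pick up where the previous one left off, the writer passed in as `stream` needs a checkpoint location. A minimal sketch of building such a writer follows; `build_delta_stream_writer` and the example paths are hypothetical, and `df` is assumed to be a streaming DataFrame (for example from `spark.readStream`).

```python
def build_delta_stream_writer(df, checkpoint_path):
    """Return a DataStreamWriter for Delta with a checkpoint configured.

    The returned writer is not started here; the retry function starts it.
    """
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
    )
```

It could then be used as `streaming_write_with_concurrent_retry(build_delta_stream_writer(events_df, "/mnt/checkpoints/events"), path="/mnt/delta/events")`, where `events_df` and both paths are placeholders.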

Alternative Strategies for Delta Table Concurrency Handling

Besides retry-based conflict resolution, Delta Lake offers additional techniques:

1. Optimistic Concurrency Control (OCC)

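Delta Lake validates each transaction against the commits that landed since it read the table. If those commits do not conflict with it, the transaction is committed on top of the newer version automatically; if they do, it fails with one of the exceptions listed above.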
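The validate-then-commit idea can be illustrated with a toy in-memory model. This is only a sketch and not Delta Lake's implementation: `ToyTable` is a hypothetical class, and tracking conflicts per partition is a simplification of the file-level checks Delta Lake performs.

```python
class ToyTable:
    """Toy model of optimistic concurrency; not Delta Lake's implementation."""

    def __init__(self):
        self.commits = []  # each commit is the set of partitions it changed

    def begin(self):
        # A transaction remembers the table version it read from.
        return len(self.commits)

    def commit(self, read_version, partitions):
        # Validate against every commit that landed after our snapshot.
        for other in self.commits[read_version:]:
            overlap = other & set(partitions)
            if overlap:
                raise RuntimeError(f"conflict on {sorted(overlap)}")
        self.commits.append(set(partitions))
        return len(self.commits)  # new table version


table = ToyTable()
v_a = table.begin()  # writer A reads version 0
v_b = table.begin()  # writer B reads version 0
table.commit(v_a, {"date=2025-02-17"})
table.commit(v_b, {"date=2025-02-18"})  # disjoint partitions: commits as version 2
try:
    table.commit(v_b, {"date=2025-02-17"})  # overlaps writer A's commit: rejected
except RuntimeError as err:
    print(err)  # conflict on ['date=2025-02-17']
```

No locks are taken at any point: writers proceed independently and only the final commit step decides whether the work can be kept.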

2. Partitioning Data for Isolation

Direct concurrent writers to different partitions so that they do not touch the same files.

Python
 
df.write.format("delta").mode("overwrite").option("replaceWhere", "date = '2025-02-17'").save("/mnt/delta/table")


Assuming the table is partitioned by date, this limits the overwrite to a single partition and reduces contention with writers working on other dates.
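When several jobs each own one day of data, the predicate can be generated rather than hard-coded. The helper names below are hypothetical, and the sketch assumes a table partitioned by a `date` column and a DataFrame that contains only rows for the given day.

```python
from datetime import date


def partition_predicate(day, column="date"):
    """Build a replaceWhere predicate that targets exactly one date partition."""
    return f"{column} = '{day.isoformat()}'"


def overwrite_partition(df, path, day):
    """Overwrite only the partition for `day` in the Delta table at `path`."""
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", partition_predicate(day))
        .save(path)
    )


print(partition_predicate(date(2025, 2, 17)))  # date = '2025-02-17'
```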

3. Streaming and Auto-Optimize 

Enable Auto-Optimize and Auto-Compact:

SQL
 
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
)


These table properties reduce the number of small files the table accumulates, which helps write performance under concurrent load.

4. Merge-Based Upserts

Instead of direct inserts, use MERGE to handle conflicts:

Python
 
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/table_name")

delta_table.alias("target").merge(
    df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={"target.value": "source.value"}).whenNotMatchedInsertAll().execute()


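MERGE matches rows on the join key, so existing rows are updated and new rows are inserted instead of being appended as duplicates. Concurrent MERGE operations that touch the same files can still conflict, so the retry logic above remains relevant.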
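One reason MERGE pairs well with retries is that an upsert keyed on `id` gives the same final state when it is applied twice, whereas a repeated append would duplicate rows. The toy function below models that behavior on plain dictionaries; it is an illustration only and does not use the Delta Lake API.

```python
def merge_upsert(target, source):
    """Toy model of MERGE on id: target and source map id -> value."""
    updated, inserted = [], []
    result = dict(target)
    for key, value in source.items():
        # Matched ids are updated, unmatched ids are inserted.
        (updated if key in result else inserted).append(key)
        result[key] = value
    return result, updated, inserted


target = {1: "a", 2: "b"}
source = {2: "B", 3: "c"}

first, updated, inserted = merge_upsert(target, source)
print(first, updated, inserted)  # {1: 'a', 2: 'B', 3: 'c'} [2] [3]

# Re-applying the same source (as a retry would) leaves the data unchanged.
second, _, _ = merge_upsert(first, source)
print(second == first)  # True
```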

Monitoring and Debugging Concurrency Issues

1. Check data load transaction history:

SQL
 
DESCRIBE HISTORY delta.`/mnt/delta/table_name`;


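2. Review the table properties (Delta Lake uses optimistic concurrency rather than locks, so this shows the table's configuration, not lock state):

SQL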
 
SHOW TBLPROPERTIES delta.`/mnt/delta/table_name`;


3. Enable the change data feed (CDF) for change tracking:

SQL
 
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES ('delta.enableChangeDataFeed' = true);
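
The transaction history from step 1 can also be inspected from Python. The sketch below assumes the history rows have already been converted to dictionaries, for example with `[r.asDict() for r in DeltaTable.forPath(spark, path).history(50).collect()]`; `summarize_history` is a hypothetical helper and the sample rows are made up for illustration.

```python
from collections import Counter


def summarize_history(rows):
    """Count commits per operation type from Delta history rows (as dicts)."""
    return Counter(row["operation"] for row in rows)


# Illustrative rows only; real history rows carry many more columns.
sample = [
    {"version": 3, "operation": "MERGE"},
    {"version": 2, "operation": "WRITE"},
    {"version": 1, "operation": "WRITE"},
]
print(summarize_history(sample).most_common())  # [('WRITE', 2), ('MERGE', 1)]
```

A summary like this shows which kinds of operations are committing against the table, which is a useful starting point when working out which jobs are likely to collide.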


Conclusion

Delta tables allow concurrent writes with ACID guarantees, but conflicts are possible.

  • A retry-based strategy with exponential backoff helps mitigate concurrency issues.
  • Partitioning, MERGE, and Auto-Optimize also improve concurrent write performance.
  • Tracking mechanisms such as DESCRIBE HISTORY and CDF help trace which operations conflicted.

Applied together, these practices let concurrent jobs write to the same Delta table reliably while maintaining data integrity and good performance.
