Ensuring Data Consistency in Distributed Systems With the Transactional Outbox Pattern

Ensure data consistency during dual-writes by saving outgoing events to the database alongside primary data, then using a worker to relay them to a message broker.

By Henry Yu · Apr. 07, 26 · Analysis
In modern distributed systems, it is common to face the "dual-write" problem. This occurs when a service needs to perform two actions as one logical step: mutate state in a database and publish an event to a message broker (like RabbitMQ, Apache Kafka, or a Pub/Sub system) to notify other services.

Because these two operations target different systems over the network, they cannot share a single, traditional ACID transaction. If one operation succeeds and the other fails, the system is left in an inconsistent state.

To solve this, we can use the Transactional Outbox pattern. In this article, we will explore the dual-write problem, see why traditional error handling falls short, and implement a robust Transactional Outbox solution using Python, FastAPI, MongoDB (via the Beanie ODM), and Dapr for publish-subscribe messaging.

The Dual-Write Problem: A Naive Approach

Imagine we are building a user registration service. When a user signs up, we need to save their profile to our MongoDB database and publish a user-created event so that the email service can send a welcome email.

The most intuitive, albeit naive, approach is to call both functions sequentially.

Python
 
from fastapi import APIRouter
from models import User
# The async client lives in dapr.aio.clients; dapr.clients.DaprClient is synchronous
from dapr.aio.clients import DaprClient
import json

router = APIRouter()

@router.post("/users")
async def create_user(name: str, email: str):
    # 1. Save to the database
    new_user = User(name=name, email=email)
    await new_user.insert()

    # 2. Publish to the message broker
    async with DaprClient() as dapr:
        await dapr.publish_event(
            pubsub_name="mypubsub",
            topic_name="user-created",
            data=json.dumps({"id": str(new_user.id), "email": email}),
            data_content_type="application/json",
        )

    return {"message": "User created successfully"}


At first glance, this seems fine. However, what happens if the application crashes immediately after the database insert but before the Dapr client publishes the event? Or what if the message broker is temporarily unreachable because of a network partition?

The user is saved in the database, but the event is never fired. The user will never receive their welcome email, and any other downstream services relying on that event will remain completely unaware of the new user. Our distributed system is now inconsistent.
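
The failure window can be reproduced without any infrastructure. The following is a minimal sketch that uses plain lists as stand-ins for the database and the broker; naive_create_user, CrashAfterInsert, and the crash_before_publish flag are hypothetical names introduced only for this illustration.

```python
class CrashAfterInsert(Exception):
    """Stands in for a process crash or broker outage (hypothetical)."""


def naive_create_user(db: list, broker: list, email: str, crash_before_publish: bool) -> None:
    # Step 1: the database write succeeds and is durable.
    db.append({"email": email})

    # Simulated failure in the window between the two writes.
    if crash_before_publish:
        raise CrashAfterInsert("process died before publishing")

    # Step 2: the event is published only if execution gets this far.
    broker.append({"topic": "user-created", "email": email})


db, broker = [], []
try:
    naive_create_user(db, broker, "ada@example.com", crash_before_publish=True)
except CrashAfterInsert:
    pass

# The user record exists, but no event was ever published.
print(len(db), len(broker))  # prints: 1 0
```

Nothing in the handler itself can close this window, because the crash happens between two independent writes.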

The False Comfort of Exception Handling

A common reaction to the naive approach is to wrap the publish step in a try...except block and attempt to roll back the database transaction if the publish fails.

Python
 
from fastapi import APIRouter, HTTPException
from models import User
# The async client lives in dapr.aio.clients; dapr.clients.DaprClient is synchronous
from dapr.aio.clients import DaprClient
import json

router = APIRouter()

@router.post("/users")
async def create_user(name: str, email: str):
    new_user = User(name=name, email=email)
    await new_user.insert()

    try:
        async with DaprClient() as dapr:
            await dapr.publish_event(
                pubsub_name="mypubsub",
                topic_name="user-created",
                data=json.dumps({"id": str(new_user.id), "email": email}),
                data_content_type="application/json",
            )
    except Exception as e:
        # Attempt to roll back the database write (this call can fail too)
        await new_user.delete()
        raise HTTPException(
            status_code=500, detail="Failed to create user and publish event."
        ) from e

    return {"message": "User created successfully"}


While this looks safer, it is still fundamentally flawed, because the compensating delete is itself an unreliable network call. If the publish_event call fails, we try to execute new_user.delete(). But what if the database server becomes unreachable at that exact moment? The delete operation fails, the rollback never happens, and we are back to square one: the user exists in the database, but no event was published. The reverse can also happen: if the publish call times out after the broker has already accepted the message, we delete a user whose event is already in flight.

We cannot achieve guaranteed consistency across two disparate network resources using sequential, synchronous calls.
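
To make the failed rollback concrete, here is a small simulation under stated assumptions: FakeDb is a hypothetical in-memory stand-in for the database, and the publish function simulates an outage that takes down both the broker and the database connection at the same moment.

```python
class FakeDb:
    """In-memory stand-in for a database whose availability can be toggled."""

    def __init__(self):
        self.rows = {}
        self.available = True

    def insert(self, key, row):
        if not self.available:
            raise ConnectionError("database unreachable")
        self.rows[key] = row

    def delete(self, key):
        if not self.available:
            raise ConnectionError("database unreachable")
        self.rows.pop(key, None)


def create_user_with_rollback(db, publish, email):
    db.insert(email, {"email": email})
    try:
        publish({"topic": "user-created", "email": email})
    except Exception:
        # Compensating action: itself a network call that can fail.
        db.delete(email)
        raise


def publish_during_outage(db):
    def publish(event):
        # Simulate a wider outage: the broker fails and the database drops too.
        db.available = False
        raise TimeoutError("broker unreachable")
    return publish


db = FakeDb()
try:
    create_user_with_rollback(db, publish_during_outage(db), "ada@example.com")
except ConnectionError:
    outcome = "rollback failed"
else:
    outcome = "ok"

# The user is still stored even though no event was published.
print(outcome, list(db.rows))  # prints: rollback failed ['ada@example.com']
```

The exception handler ran, but the compensating delete failed, so the orphaned user record remains.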

Enter the Transactional Outbox Pattern

The Transactional Outbox pattern guarantees eventual consistency. Instead of trying to write to the database and the message broker synchronously, we save the database entity and the message intended for the broker within the exact same database transaction.

Because we are targeting a single database, we can rely on its internal atomic properties. In relational databases, this is typically done by inserting a record into a users table and a record into an outbox table within one SQL transaction.
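
As a rough illustration of the relational variant, the sketch below uses Python's built-in sqlite3 module. The table layout and the register_user helper are assumptions made for this example, not a prescribed schema. The second call deliberately fails on the outbox insert to show that the user insert is rolled back with it.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT NOT NULL, payload TEXT NOT NULL)"
)


def register_user(conn, email, topic="user-created"):
    # "with conn" commits if the block succeeds and rolls back if it raises.
    with conn:
        cur = conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        payload = json.dumps({"id": cur.lastrowid, "email": email})
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)", (topic, payload)
        )


register_user(conn, "ada@example.com")

try:
    # topic=None violates NOT NULL on outbox, after the users insert succeeded.
    register_user(conn, "bob@example.com", topic=None)
except sqlite3.IntegrityError:
    pass

users_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
outbox_count = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(users_count, outbox_count)  # prints: 1 1
```

Both tables stay in step: the failed registration left neither a user row nor an outbox row behind.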

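With MongoDB, single-document operations are atomic by default. This makes a document database a natural fit for this pattern: we can embed the outbox message directly inside the main document as an optional property, so the entity and its pending event are written in one atomic operation without needing a multi-document transaction.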

Updating the Beanie Model

Let us update our Beanie Object Document Mapper (ODM) model to include an embedded outbox message. We will also introduce a MongoDB partial index. A partial index matters here because we only want to index documents that actually have pending messages. Since the worker clears each message after publishing it, the index stays small and memory-efficient no matter how large the users collection grows.

Python
 
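from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field
from beanie import Document
from pymongo import IndexModel, ASCENDING

def utc_now() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    return datetime.now(timezone.utc)

class OutboxMessage(BaseModel):
    topic: str
    payload: dict
    created_at: datetime = Field(default_factory=utc_now)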

class User(Document):
    name: str
    email: str
    outbox_message: Optional[OutboxMessage] = None

    class Settings:
        name = "users"
        # Create a partial index to quickly find documents with pending messages
        indexes = [
            IndexModel(
                [("outbox_message.created_at", ASCENDING)],
                name="pending_outbox_messages_idx",
                partialFilterExpression={"outbox_message": {"$type": "object"}}
            )
        ]


The Atomic Write Endpoint

Now, we can update our FastAPI endpoint. We no longer interact with Dapr during the user request. Instead, we generate the payload, embed it in the user document, and save it.

Python
 
from fastapi import APIRouter
from models import User, OutboxMessage

router = APIRouter()

@router.post("/users")
async def create_user(name: str, email: str):
    # Construct the message payload
    payload = {"email": email, "action": "registration"}
    outbox = OutboxMessage(topic="user-created", payload=payload)
    
    # Attach the outbox message to the user document
    new_user = User(
        name=name, 
        email=email, 
        outbox_message=outbox
    )
    
    # This single-document insert is entirely atomic.
    # It either fully succeeds (saving the user and the message) or fully fails.
    await new_user.insert()
    
    return {"message": "User created successfully"}


Because MongoDB guarantees atomicity at the document level, a partial write is no longer possible: either both the user and the message are persisted, or neither is.

The Message Relay Worker

The second half of the Transactional Outbox pattern is the message relay (or worker). This is a separate background process or async task that continuously polls the database for pending outbox messages, publishes them to the message broker, and then removes them from the database.

Because the query filter matches the index's partialFilterExpression and the sort uses the indexed field, MongoDB can serve this query from the partial index instead of scanning the entire users collection.

Python
 
import asyncio
import json
import logging
# The async client lives in dapr.aio.clients; dapr.clients.grpc.client is synchronous
from dapr.aio.clients import DaprClient
from models import User

logger = logging.getLogger(__name__)

async def process_outbox_messages():
    """
    Background worker that relays messages from the database to Dapr.
    """
    # Instantiate Dapr client outside the loop for connection reuse
    async with DaprClient() as dapr:
        while True:
            try:
                # Find the oldest document that contains an outbox_message
                user_doc = await User.find(
                    {"outbox_message": {"$type": "object"}}
                ).sort("outbox_message.created_at").first_or_none()

                if user_doc and user_doc.outbox_message:
                    msg = user_doc.outbox_message
                    
                    # 1. Publish the event via Dapr
                    await dapr.publish_event(
                        pubsub_name="mypubsub",
                        topic_name=msg.topic,
                        data=json.dumps(msg.payload),
                        data_content_type="application/json",
                    )
                    
                    # 2. Remove the message from the document
                    # Using $unset ensures this update is atomic
                    await user_doc.update({"$unset": {"outbox_message": ""}})
                    
                else:
                    # No pending messages; sleep briefly
                    await asyncio.sleep(2)
                    
            except Exception:
                # Log the error with its traceback. The message remains in the DB and will be retried.
                logger.exception("Error processing outbox message")
                await asyncio.sleep(5)


Idempotency

It is crucial to understand that the Transactional Outbox pattern guarantees at-least-once delivery.

Consider a scenario where the worker successfully publishes the message to Dapr, but the worker process crashes right before it executes the $unset database update. When the worker restarts, it will read the exact same outbox message from the database and publish it to Dapr a second time.
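
This crash scenario can be sketched with an in-memory document. The relay_once function and the crash_before_unset flag are hypothetical names for this illustration; relay_once mirrors the publish-then-clear order of the worker but is not the worker itself.

```python
class WorkerCrash(Exception):
    """Stands in for the relay process dying (hypothetical)."""


def relay_once(doc, published, crash_before_unset=False):
    """Run one relay pass over a single document.

    Returns True if a pending message was published and cleared.
    """
    message = doc.get("outbox_message")
    if message is None:
        return False

    # 1. Publish first.
    published.append(message["payload"])

    if crash_before_unset:
        raise WorkerCrash("died before clearing the outbox")

    # 2. Then clear the pending message.
    del doc["outbox_message"]
    return True


doc = {
    "email": "ada@example.com",
    "outbox_message": {"topic": "user-created", "payload": {"email": "ada@example.com"}},
}
published = []

try:
    relay_once(doc, published, crash_before_unset=True)
except WorkerCrash:
    pass

relay_once(doc, published)  # after restart: the message is still pending
relay_once(doc, published)  # nothing left to publish

print(len(published))  # prints: 2
```

The same payload reaches the broker twice, which is exactly what at-least-once delivery permits.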

Because of this, any downstream services consuming these messages must be designed to be idempotent. They must be able to safely process the same user-created event multiple times without causing unintended side effects, such as sending duplicate welcome emails.
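
One common way to achieve this is to deduplicate on a stable event identifier. The sketch below assumes each event carries an event_id field, which the payload in this article does not include, so treat that field and the WelcomeEmailConsumer class as hypothetical. The in-memory set stands in for a durable store, such as a collection with a unique index.

```python
class WelcomeEmailConsumer:
    """Processes user-created events at most once per event_id."""

    def __init__(self):
        # Assumption: a real service would persist processed IDs durably.
        self.processed_ids = set()
        self.sent = []

    def handle(self, event: dict) -> bool:
        event_id = event["event_id"]
        if event_id in self.processed_ids:
            # Duplicate delivery: acknowledge it without repeating the side effect.
            return False
        self.sent.append(event["email"])  # stands in for sending the email
        self.processed_ids.add(event_id)
        return True


consumer = WelcomeEmailConsumer()
event = {"event_id": "evt-1", "email": "ada@example.com"}

first = consumer.handle(event)
second = consumer.handle(event)  # redelivered by the relay after a crash

print(first, second, len(consumer.sent))  # prints: True False 1
```

In a real consumer, recording the ID and performing the side effect should themselves be made atomic or safely repeatable; this sketch only shows the deduplication check.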

Conclusion

Handling distributed transactions is one of the most complex challenges in microservice architectures. Relying on sequential operations and basic exception handling will inevitably lead to data inconsistencies and phantom bugs that are difficult to trace.

By leveraging the Transactional Outbox pattern, we shift the responsibility of consistency to the database's atomic guarantees. Combining Python, Beanie, and a message broker abstraction like Dapr provides a reliable way to ensure that your database state and your distributed events eventually converge, provided that consumers handle duplicate events idempotently.
