Ensuring Data Consistency in Distributed Systems With the Transactional Outbox Pattern
Ensure data consistency during dual-writes by saving outgoing events to the database alongside primary data, then using a worker to relay them to a message broker.
In modern distributed systems, it is common to face the "dual-write" problem. This occurs when a service needs to perform two actions as a single logical unit: mutate state in a database and publish an event to a message broker (like RabbitMQ, Apache Kafka, or a Pub/Sub system) to notify other services.
Because these two operations target different systems over the network, they cannot share a single, traditional ACID transaction. If one operation succeeds and the other fails, the system is left in an inconsistent state.
To solve this, we can use the Transactional Outbox pattern. In this article, we will explore the dual-write problem, see why traditional error handling falls short, and implement a robust Transactional Outbox solution using Python, FastAPI, MongoDB (via the Beanie ODM), and Dapr for publish-subscribe messaging.
The Dual-Write Problem: A Naive Approach
Imagine we are building a user registration service. When a user signs up, we need to save their profile to our MongoDB database and publish a user-created event so that the email service can send a welcome email.
The most intuitive, albeit naive, approach is to call both functions sequentially.
```python
import json

from dapr.aio.clients import DaprClient  # async client; the sync client cannot be awaited
from fastapi import APIRouter

from models import User

router = APIRouter()


@router.post("/users")
async def create_user(name: str, email: str):
    # 1. Save to the database
    new_user = User(name=name, email=email)
    await new_user.insert()

    # 2. Publish to the message broker
    async with DaprClient() as dapr:
        await dapr.publish_event(
            pubsub_name="mypubsub",
            topic_name="user-created",
            data=json.dumps({"id": str(new_user.id), "email": email}),
            data_content_type="application/json",
        )

    return {"message": "User created successfully"}
```
At first glance, this seems fine. However, what happens if the application crashes right after the database insert but before the Dapr client publishes the event? Or what if the message broker is temporarily unreachable because of a network partition?
The user is saved in the database, but the event is never fired. The user will never receive their welcome email, and any other downstream services relying on that event will remain completely unaware of the new user. Our distributed system is now inconsistent.
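The failure is easy to reproduce without any infrastructure. The following is a minimal sketch that uses in-memory stand-ins: the `FakeBroker` class, the `naive_create_user` function, and the plain list acting as a database are hypothetical names for illustration and are not part of Dapr or Beanie.

```python
class FakeBroker:
    """In-memory stand-in for a message broker (hypothetical, for illustration)."""

    def __init__(self, fail=False):
        self.fail = fail
        self.published = []

    def publish(self, topic, payload):
        if self.fail:
            raise ConnectionError("broker unreachable")
        self.published.append((topic, payload))


def naive_create_user(db, broker, email):
    """Dual write: two independent steps with nothing tying them together."""
    db.append({"email": email})                       # step 1: the write is committed
    broker.publish("user-created", {"email": email})  # step 2: may raise


db = []                          # stand-in for the users collection
broker = FakeBroker(fail=True)   # simulate a broker outage

try:
    naive_create_user(db, broker, "ada@example.com")
except ConnectionError:
    pass  # the request fails, but step 1 has already happened

print(len(db), len(broker.published))  # 1 0
```

The stored user count and the published event count disagree, which is exactly the inconsistent state described above.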
The False Comfort of Exception Handling
A common reaction to the naive approach is to wrap the publish step in a try...except block and attempt to roll back the database transaction if the publish fails.
```python
import json

from dapr.aio.clients import DaprClient  # async client; the sync client cannot be awaited
from fastapi import APIRouter, HTTPException

from models import User

router = APIRouter()


@router.post("/users")
async def create_user(name: str, email: str):
    new_user = User(name=name, email=email)
    await new_user.insert()

    try:
        async with DaprClient() as dapr:
            await dapr.publish_event(
                pubsub_name="mypubsub",
                topic_name="user-created",
                data=json.dumps({"id": str(new_user.id), "email": email}),
                data_content_type="application/json",
            )
    except Exception as e:
        # Attempt to roll back the database write (this call can fail too)
        await new_user.delete()
        raise HTTPException(
            status_code=500,
            detail="Failed to create user and publish event.",
        ) from e

    return {"message": "User created successfully"}
```
While this looks safer, it is still fundamentally flawed. Network calls are inherently unreliable. If the publish_event call fails, we try to execute new_user.delete(). But what if the database server becomes unreachable at that exact moment? The delete operation fails, the exception propagates without a successful rollback, and we are back to square one: the user exists in the database, but no event was published. The except block also does nothing for the original crash scenario: if the process dies between the insert and the publish, no handler ever runs.
We cannot achieve guaranteed consistency across two disparate network resources using sequential, synchronous calls.
Enter the Transactional Outbox Pattern
The Transactional Outbox pattern guarantees eventual consistency. Instead of trying to write to the database and the message broker synchronously, we save the database entity and the message intended for the broker within the exact same database transaction.
Because we are targeting a single database, we can rely on its internal atomic properties. In relational databases, this is typically done by inserting a record into a users table and a record into an outbox table within one SQL transaction.
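As a rough illustration of the relational variant, the sketch below uses Python's built-in sqlite3 module. The table layout, the `create_user` helper, and its `topic` parameter are assumptions made for this example and are not taken from any particular framework. The second call deliberately fails on the outbox insert to show that the user row is rolled back with it.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT NOT NULL, payload TEXT)"
)


def create_user(conn, name, email, topic="user-created"):
    """Insert the user row and its outbox row in one transaction."""
    # `with conn` commits if the block succeeds and rolls back if it raises.
    with conn:
        cur = conn.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)", (name, email)
        )
        payload = json.dumps({"id": cur.lastrowid, "email": email})
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)", (topic, payload)
        )


create_user(conn, "Ada", "ada@example.com")

# Force the outbox insert to fail (NULL topic violates NOT NULL).
# The user insert that preceded it in the same transaction is rolled back.
try:
    create_user(conn, "Bob", "bob@example.com", topic=None)
except sqlite3.IntegrityError:
    pass

user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
outbox_count = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(user_count, outbox_count)  # 1 1
```

Either both rows exist or neither does, so a relay process reading the outbox table never sees an event for a user that was not saved, and never misses an event for a user that was.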
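With MongoDB, single-document operations are atomic by default. This makes document databases a natural fit for the pattern: we can embed the outbox message directly inside the main document as an optional property, so no multi-document transaction is needed. The trade-off of this embedded design is that each document holds at most one pending message at a time.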
Updating the Beanie Model
Let us update our Beanie Object Document Mapper (ODM) model to include an embedded outbox message. We will also introduce a MongoDB partial index. A partial index is crucial here because we only want to index documents that actually have pending messages. Since messages are removed once they are relayed, the index covers only the current backlog rather than the whole collection, which keeps it small and cheap to maintain.
```python
from datetime import datetime, timezone
from typing import Optional

from beanie import Document
from pydantic import BaseModel, Field
from pymongo import ASCENDING, IndexModel


class OutboxMessage(BaseModel):
    topic: str
    payload: dict
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class User(Document):
    name: str
    email: str
    outbox_message: Optional[OutboxMessage] = None

    class Settings:
        name = "users"
        # Create a partial index to quickly find documents with pending messages
        indexes = [
            IndexModel(
                [("outbox_message.created_at", ASCENDING)],
                name="pending_outbox_messages_idx",
                partialFilterExpression={"outbox_message": {"$type": "object"}},
            )
        ]
```
The Atomic Write Endpoint
Now, we can update our FastAPI endpoint. We no longer interact with Dapr during the user request. Instead, we generate the payload, embed it in the user document, and save it.
```python
from fastapi import APIRouter

from models import OutboxMessage, User

router = APIRouter()


@router.post("/users")
async def create_user(name: str, email: str):
    # Construct the message payload
    payload = {"email": email, "action": "registration"}
    outbox = OutboxMessage(topic="user-created", payload=payload)

    # Attach the outbox message to the user document
    new_user = User(
        name=name,
        email=email,
        outbox_message=outbox,
    )

    # This single-document insert is entirely atomic.
    # It either fully succeeds (saving the user and the message) or fully fails.
    await new_user.insert()

    return {"message": "User created successfully"}
```
Because MongoDB guarantees atomicity at the document level, we eliminate the risk of a partial write: the user and the message are persisted together or not at all. The event itself has not been published yet; that is the job of the relay described next.
The Message Relay Worker
The second half of the Transactional Outbox pattern is the message relay (or worker). This is a separate background process or async task that continuously polls the database for pending outbox messages, publishes them to the message broker, and then removes them from the database.
Because the worker's query filter matches the index's partialFilterExpression, MongoDB can answer it from the partial index instead of scanning the entire users collection.
```python
import asyncio
import json
import logging

from dapr.aio.clients import DaprClient  # async client, so publish_event can be awaited

from models import User

logger = logging.getLogger(__name__)


async def process_outbox_messages():
    """
    Background worker that relays messages from the database to Dapr.
    """
    # Instantiate Dapr client outside the loop for connection reuse
    async with DaprClient() as dapr:
        while True:
            try:
                # Find the oldest document that contains an outbox_message
                user_doc = await User.find(
                    {"outbox_message": {"$type": "object"}}
                ).sort("+outbox_message.created_at").first_or_none()

                if user_doc and user_doc.outbox_message:
                    msg = user_doc.outbox_message

                    # 1. Publish the event via Dapr
                    await dapr.publish_event(
                        pubsub_name="mypubsub",
                        topic_name=msg.topic,
                        data=json.dumps(msg.payload),
                        data_content_type="application/json",
                    )

                    # 2. Remove the message from the document
                    # $unset is a single-document update, so it is atomic
                    await user_doc.update({"$unset": {"outbox_message": ""}})
                else:
                    # No pending messages; sleep briefly
                    await asyncio.sleep(2)
            except Exception:
                # Log with traceback. The message remains in the DB and will be retried.
                logger.exception("Error processing outbox message")
                await asyncio.sleep(5)
```
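The ordering inside the loop (publish first, clear second) is what makes retries safe. The sketch below isolates that logic with plain dictionaries standing in for MongoDB documents. The `relay_once` and `flaky_publish` functions are hypothetical helpers written for this illustration; they are not part of the worker above or of any library.

```python
def relay_once(docs, publish):
    """Relay the oldest pending message; return True if one was cleared."""
    pending = [d for d in docs if d.get("outbox_message")]
    if not pending:
        return False
    doc = min(pending, key=lambda d: d["outbox_message"]["created_at"])
    msg = doc["outbox_message"]
    publish(msg["topic"], msg["payload"])  # may raise; the message then stays pending
    doc["outbox_message"] = None           # cleared only after a successful publish
    return True


sent = []
attempts = {"count": 0}


def flaky_publish(topic, payload):
    """Hypothetical publisher that fails on its first call only."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ConnectionError("broker unreachable")
    sent.append((topic, payload))


docs = [
    {
        "email": "ada@example.com",
        "outbox_message": {
            "topic": "user-created",
            "payload": {"email": "ada@example.com"},
            "created_at": 1,
        },
    }
]

try:
    relay_once(docs, flaky_publish)
except ConnectionError:
    pass  # first attempt fails

still_pending = docs[0]["outbox_message"] is not None  # message survived the failure
retried = relay_once(docs, flaky_publish)               # second attempt succeeds
```

After the failed attempt the message is still attached to the document, and the retry delivers it and clears it. Reversing the two steps would turn a broker outage into a lost event.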
Idempotency
It is crucial to understand that the Transactional Outbox pattern guarantees at-least-once delivery.
Consider a scenario where the worker successfully publishes the message to Dapr, but the worker process crashes right before it executes the $unset database update. When the worker restarts, it will read the exact same outbox message from the database and publish it to Dapr a second time.
Because of this, any downstream services consuming these messages must be designed to be idempotent. They must be able to safely process the same user-created event multiple times without causing unintended side effects, such as sending duplicate welcome emails.
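One common way to make a consumer idempotent is to record the identifier of every event it has handled and skip any it has seen before. The sketch below assumes each event carries a unique `event_id` field, which the payloads shown in this article do not include, so the producer would need to add one. The `processed_events` table and the `handle_user_created` function are likewise hypothetical, and a list stands in for the real email sender.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")

emails_sent = []  # stand-in for a real email-sending side effect


def handle_user_created(conn, event):
    """Process the event once per event_id; return False for a duplicate."""
    try:
        # The PRIMARY KEY rejects a second insert of the same event_id.
        # If sending raises, the insert is rolled back and redelivery can retry.
        with conn:
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)",
                (event["event_id"],),
            )
            emails_sent.append(event["email"])
    except sqlite3.IntegrityError:
        return False  # already processed; skip the side effect
    return True


event = {"event_id": "evt-1", "email": "ada@example.com"}
first = handle_user_created(conn, event)
second = handle_user_created(conn, event)  # simulated redelivery

print(first, second, len(emails_sent))  # True False 1
```

The deduplication check and the record of completion live in the same transaction, so a redelivered event is a no-op. Sending a real email is not transactional, so a crash between the send and the commit can still produce a rare duplicate; the table narrows that window rather than closing it completely.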
Conclusion
Handling distributed transactions is one of the most complex challenges in microservice architectures. Relying on sequential operations and basic exception handling will inevitably lead to data inconsistencies and phantom bugs that are difficult to trace.
By leveraging the Transactional Outbox pattern, we shift the responsibility of consistency to the database's atomic guarantees. Combining Python, Beanie, and a message broker abstraction like Dapr provides a reliable way to ensure that your database state and your distributed events eventually converge, provided that consumers handle duplicate deliveries idempotently.