Distributed Task Queue With Python asyncio + Redis (A Celery Replacement)
This article discusses how to build a lightweight, distributed task queue using Python asyncio and Redis as a simpler alternative to Celery for I/O-bound workloads.
Join the DZone community and get the full member experience.
Join For FreeCelery has been the de facto standard for background task processing in Python for over a decade. It’s powerful, battle-tested, and feature-rich, but it also comes with significant complexity: brokers, result backends, worker pools, configuration overhead, serialization quirks, and sometimes opaque debugging. With the rise of asyncio, high-performance Redis clients, and modern Python runtimes, many teams are asking a simple question:
Do we really need Celery for every background job use case?
In this article, we’ll build a lightweight distributed task queue using Python's asyncio and Redis that can serve as a practical Celery replacement for many real-world workloads.
You’ll learn:
- When Celery is overkill
- How a distributed async task queue actually works
- How to build one using asyncio and Redis
- A real-time example: processing user events asynchronously
- Tradeoffs, limitations, and when to use (or not use) this approach
When You Might Not Need Celery
Celery excels at:
- CPU-bound tasks
- Complex workflows and chords
- Long-running batch jobs
- Heavy retry policies and task routing
However, many modern systems have workloads like:
- Sending emails or notifications
- Processing webhooks
- Enriching events
- Calling third-party APIs
- Lightweight data transformations
These tasks are often I/O-bound, short-lived, high-throughput, or latency-sensitive. For these cases, an async-first task queue can be:
- Faster
- Easier to reason about
- Simpler to deploy
- More observable
Architecture Overview
Our async task queue consists of four core components:
- Producer – Pushes tasks to Redis
- Redis – Acts as a distributed message broker
- Async workers – Consume tasks using asyncio
- Task registry – Maps task names to Python functions
We’ll use:
- asyncio for concurrency
- redis-py (async) for Redis access
- JSON for task serialization
Real-Time Example: User Event Processing
Imagine a SaaS application where users perform actions like signing up, logging in, or upgrading plans. Each event triggers background tasks:
- Send a welcome email
- Update analytics
- Notify downstream systems
And we don’t want to block API requests, and we don’t need heavy Celery workflows. This is a perfect use case for an async task queue.
Step 1: Defining the Task Format
Each task will include:
- id: Unique task ID
- name: Task name
- payload: Task data
- timestamp: Creation time
import uuid
import time
import json
def create_task(name, payload):
return {
"id": str(uuid.uuid4()),
"name": name,
"payload": payload,
"timestamp": time.time(),
}
Step 2: Redis Queue Implementation
We’ll use a Redis list as our queue.
- RPUSH -> enqueue
- BLPOP -> blocking dequeue
Redis client setup:
import redis.asyncio as redis
redis_client = redis.Redis(
host="localhost",
port=6379,
decode_responses=True,
)
Enqueue a task (Producer):
async def enqueue_task(queue_name, task):
await redis_client.rpush(queue_name, json.dumps(task))
Step 3: Task Registry
Instead of Celery decorators, we’ll use a simple registry.
TASK_REGISTRY = {}
def task(name):
def decorator(fn):
TASK_REGISTRY[name] = fn
return fn
return decorator
This gives us:
- Explicit task registration
- Clear ownership
- No magic imports
Step 4: Defining Async Tasks
Now, let’s define real background tasks.
Example 1: Sending a welcome email
import asyncio
@task("send_welcome_email")
async def send_welcome_email(payload):
email = payload["email"]
await asyncio.sleep(1) # simulate I/O
print(f"Welcome email sent to {email}")
Example 2: Tracking an analytics event
@task("track_analytics")
async def track_analytics(payload):
event = payload["event"]
user_id = payload["user_id"]
await asyncio.sleep(0.5)
print(f"Tracked event '{event}' for user {user_id}")
Step 5: Building the Async Worker
The worker:
- Blocks on Redis
- Deserializes tasks
- Dispatches to async functions
- Handles errors gracefully
async def worker(queue_name):
print("Worker started...")
while True:
_, raw_task = await redis_client.blpop(queue_name)
task = json.loads(raw_task)
task_name = task["name"]
payload = task["payload"]
handler = TASK_REGISTRY.get(task_name)
if not handler:
print(f"Unknown task: {task_name}")
continue
try:
await handler(payload)
except Exception as e:
print(f"Task {task['id']} failed: {e}")
Step 6: Running Multiple Workers (Concurrency)
We can scale horizontally with multiple processes and vertically with async concurrency.
async def start_workers(queue_name, concurrency=10):
tasks = [
asyncio.create_task(worker(queue_name))
for _ in range(concurrency)
]
await asyncio.gather(*tasks)
Run it:
if __name__ == "__main__":
asyncio.run(start_workers("task_queue", concurrency=5))
Step 7: Producing Tasks From an API
Now let’s simulate a real API request.
async def user_signup(email, user_id):
await enqueue_task(
"task_queue",
create_task(
"send_welcome_email",
{"email": email},
),
)
await enqueue_task(
"task_queue",
create_task(
"track_analytics",
{
"event": "signup",
"user_id": user_id,
},
),
)
From your FastAPI or Flask async route:
@app.post("/signup")
async def signup(user: User):
await user_signup(user.email, user.id)
return {"status": "ok"}
Your API returns instantly while background work happens asynchronously.
Reliability Considerations
Celery provides:
- Retries
- Acknowledgments
- Result backends
We can add these incrementally.
Simple Retry Strategy:
async def execute_with_retry(handler, payload, retries=3):
for attempt in range(retries):
try:
await handler(payload)
return
except Exception as e:
if attempt == retries - 1:
raise
await asyncio.sleep(2 ** attempt)
Observability and Debugging
Unlike Celery, everything is explicit. You can easily add:
- Structured logging
- OpenTelemetry spans
- Prometheus metrics
Example:
import time
start = time.time()
await handler(payload)
duration = time.time() - start
print(f"Task {task_name} took {duration:.2f}s")
Tradeoffs vs. Celery
What You Gain
- Simpler architecture
- Async-native
- Easier debugging
- Lower operational overhead
What You Lose
- Built-in task routing
- Advanced workflows
- Native result backend
- Ecosystem plugins
When to Use This Pattern
Use asyncio + Redis task queues if:
- Tasks are I/O-bound
- You already run Redis
- You want minimal infrastructure
- You value clarity over abstraction
Avoid it if:
- You need complex workflows
- You run CPU-heavy jobs
- You rely heavily on Celery primitives
Conclusion
Unlike CPU-bound workloads, most background jobs in SaaS platforms are I/O-heavy: sending HTTP requests, reading from object storage, publishing events, or writing logs. Async workers allow thousands of these tasks to be processed concurrently using a single event loop, dramatically reducing resource usage. This is where an asyncio + Redis approach shines.
Celery is not obsolete, but it’s no longer the default answer. Modern Python gives us the tools to build lean, async-first distributed systems with far less complexity. By combining asyncio and Redis, you can create a production-ready task queue that’s fast, debuggable, and easy to operate.
If you’ve ever felt Celery was too heavy for your workload, this approach might be exactly what you need.
Opinions expressed by DZone contributors are their own.
Comments