Distributed Task Queue With Python asyncio + Redis (A Celery Replacement)

This article discusses how to build a lightweight, distributed task queue using Python asyncio and Redis as a simpler alternative to Celery for I/O-bound workloads.

By Surya Bhaskar Reddy Karri · Feb. 03, 26 · Analysis

Celery has been the de facto standard for background task processing in Python for over a decade. It’s powerful, battle-tested, and feature-rich, but it also comes with significant complexity: brokers, result backends, worker pools, configuration overhead, serialization quirks, and sometimes opaque debugging. With the rise of asyncio, high-performance Redis clients, and modern Python runtimes, many teams are asking a simple question:

Do we really need Celery for every background job use case?

In this article, we’ll build a lightweight distributed task queue using Python's asyncio and Redis that can serve as a practical Celery replacement for many real-world workloads.

You’ll learn:

  • When Celery is overkill
  • How a distributed async task queue actually works
  • How to build one using asyncio and Redis
  • A real-time example: processing user events asynchronously
  • Tradeoffs, limitations, and when to use (or not use) this approach

When You Might Not Need Celery

Celery excels at:

  • CPU-bound tasks
  • Complex workflows and chords
  • Long-running batch jobs
  • Heavy retry policies and task routing

However, many modern systems have workloads like:

  • Sending emails or notifications
  • Processing webhooks
  • Enriching events
  • Calling third-party APIs
  • Lightweight data transformations

These tasks are often I/O-bound, short-lived, high-throughput, or latency-sensitive. For these cases, an async-first task queue can be:

  • Faster
  • Easier to reason about
  • Simpler to deploy
  • More observable

Architecture Overview

Our async task queue consists of four core components:

  1. Producer – Pushes tasks to Redis
  2. Redis – Acts as a distributed message broker
  3. Async workers – Consume tasks using asyncio
  4. Task registry – Maps task names to Python functions

We’ll use:

  • asyncio for concurrency
  • redis-py’s asyncio client (redis.asyncio) for Redis access
  • JSON for task serialization
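
Before Redis enters the picture, the four components can be sketched inside a single process. The block below is only an illustration of how the pieces relate, not the implementation built in the following steps: an asyncio.Queue stands in for the Redis list, and the names REGISTRY, producer, worker_loop, and greet are hypothetical.

```python
import asyncio
import json

REGISTRY = {}  # task registry: task name -> coroutine function (hypothetical name)


async def greet(payload):
    """Example task; returns a value so the demo can show a result."""
    return f"hello {payload['who']}"


REGISTRY["greet"] = greet


async def producer(queue, name, payload):
    # Producer: serialize the task to JSON and push it onto the queue.
    await queue.put(json.dumps({"name": name, "payload": payload}))


async def worker_loop(queue, results):
    # Worker: pop a message, look up the handler by name, and await it.
    while True:
        raw = await queue.get()
        message = json.loads(raw)
        handler = REGISTRY[message["name"]]
        results.append(await handler(message["payload"]))
        queue.task_done()


async def demo():
    queue = asyncio.Queue()  # stand-in for the Redis list (assumption)
    results = []
    consumer = asyncio.create_task(worker_loop(queue, results))
    await producer(queue, "greet", {"who": "ada"})
    await producer(queue, "greet", {"who": "linus"})
    await queue.join()   # wait until every queued message is processed
    consumer.cancel()    # stop the endless worker loop
    return results


print(asyncio.run(demo()))
```

Swapping the in-memory queue for a Redis list is what lets producers and workers live in different processes or on different machines.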

Real-Time Example: User Event Processing

Imagine a SaaS application where users perform actions like signing up, logging in, or upgrading plans. Each event triggers background tasks:

  • Send a welcome email
  • Update analytics
  • Notify downstream systems

We don’t want to block API requests, and we don’t need heavy Celery workflows. This is a perfect use case for an async task queue.

Step 1: Defining the Task Format

Each task will include:

  • id: Unique task ID
  • name: Task name
  • payload: Task data
  • timestamp: Creation time
Python
 
import uuid
import time
import json

def create_task(name, payload):
    return {
        "id": str(uuid.uuid4()),
        "name": name,
        "payload": payload,
        "timestamp": time.time(),
    }
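
To see what a task looks like once it is serialized, here is a small round trip through JSON. It repeats create_task so the snippet runs on its own; the email address is a made-up example value.

```python
import json
import time
import uuid


def create_task(name, payload):
    # Same shape as the task format described above.
    return {
        "id": str(uuid.uuid4()),
        "name": name,
        "payload": payload,
        "timestamp": time.time(),
    }


task = create_task("send_welcome_email", {"email": "user@example.com"})
wire = json.dumps(task)        # the string that would be stored in Redis
restored = json.loads(wire)    # what a worker sees after decoding

print(wire)
print(restored == task)
```

Because the format is JSON, payload values have to be JSON-serializable. A datetime object, for example, makes json.dumps raise TypeError unless it is converted to a string or a number first.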


Step 2: Redis Queue Implementation

We’ll use a Redis list as our queue.

  • RPUSH -> enqueue
  • BLPOP -> blocking dequeue
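
RPUSH appends to the tail of the list and BLPOP removes from the head, so together they give first-in, first-out ordering. The sketch below imitates that behavior with collections.deque purely for illustration. It does not talk to Redis, and rpush and lpop are hypothetical helper names.

```python
from collections import deque

queue = deque()  # in-memory stand-in for a Redis list (assumption)


def rpush(item):
    # Like RPUSH: add to the right-hand end (tail) of the list.
    queue.append(item)


def lpop():
    # Like LPOP: remove from the left-hand end (head); None when empty.
    # BLPOP differs in that it blocks until an item arrives or a timeout expires.
    return queue.popleft() if queue else None


for name in ["first", "second", "third"]:
    rpush(name)

order = [lpop(), lpop(), lpop()]
print(order)
```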

Redis client setup:

Python
 
import redis.asyncio as redis

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    decode_responses=True,
)


Enqueue a task (Producer):

Python
 
async def enqueue_task(queue_name, task):
    await redis_client.rpush(queue_name, json.dumps(task))


Step 3: Task Registry

Instead of Celery decorators, we’ll use a simple registry.

Python
 
TASK_REGISTRY = {}

def task(name):
    def decorator(fn):
        TASK_REGISTRY[name] = fn
        return fn
    return decorator


This gives us:

  • Explicit task registration
  • Clear ownership
  • No magic imports
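
One thing a plain dictionary will not do is warn you when two functions claim the same task name. A possible variation, shown here as a sketch rather than as part of the queue built in this article, is to reject duplicates at registration time. The ping task is a hypothetical example.

```python
TASK_REGISTRY = {}


def task(name):
    def decorator(fn):
        # Hypothetical safeguard: refuse to overwrite an existing task name.
        if name in TASK_REGISTRY:
            raise ValueError(f"Task name already registered: {name}")
        TASK_REGISTRY[name] = fn
        return fn
    return decorator


@task("ping")
async def ping(payload):
    return "pong"


print(sorted(TASK_REGISTRY))
print(TASK_REGISTRY["ping"] is ping)
```

The decorator returns the original function unchanged, so a registered task can still be imported and awaited directly.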

Step 4: Defining Async Tasks

Now, let’s define real background tasks.

Example 1: Sending a welcome email

Python
 
import asyncio

@task("send_welcome_email")
async def send_welcome_email(payload):
    email = payload["email"]
    await asyncio.sleep(1)  # simulate I/O
    print(f"Welcome email sent to {email}")


Example 2: Tracking an analytics event

Python
 
@task("track_analytics")
async def track_analytics(payload):
    event = payload["event"]
    user_id = payload["user_id"]
    await asyncio.sleep(0.5)
    print(f"Tracked event '{event}' for user {user_id}")
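
Since handlers are ordinary coroutines, they can be exercised without Redis or a running worker. The following sketch runs one handler directly and captures what it prints. run_and_capture is a hypothetical helper, and the sleep is shortened so the check finishes quickly.

```python
import asyncio
import contextlib
import io


async def send_welcome_email(payload):
    email = payload["email"]
    await asyncio.sleep(0.01)  # shortened stand-in for real I/O
    print(f"Welcome email sent to {email}")


def run_and_capture(handler, payload):
    """Run one handler to completion and return whatever it printed."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        asyncio.run(handler(payload))
    return buffer.getvalue()


output = run_and_capture(send_welcome_email, {"email": "user@example.com"})
print(output, end="")
```

A payload without the "email" key raises KeyError inside the handler, which is the kind of failure the worker in the next step has to survive.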


Step 5: Building the Async Worker

The worker:

  • Blocks on Redis
  • Deserializes tasks
  • Dispatches to async functions
  • Handles errors gracefully
Python
 
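async def worker(queue_name):
    print("Worker started...")
    while True:
        # BLPOP blocks until a task is available and returns (queue, value).
        _, raw_task = await redis_client.blpop(queue_name)

        try:
            task = json.loads(raw_task)
            task_name = task["name"]
            payload = task["payload"]
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            # A malformed message must not crash the worker loop.
            print(f"Skipping malformed task: {e}")
            continue

        handler = TASK_REGISTRY.get(task_name)

        if not handler:
            print(f"Unknown task: {task_name}")
            continue

        try:
            await handler(payload)
        except Exception as e:
            print(f"Task {task.get('id')} failed: {e}")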
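
The dispatch logic is easier to test when it is separated from the Redis loop. The sketch below pulls it into a hypothetical process_one function that takes the raw JSON string and reports what happened. The double and explode tasks exist only for the demonstration.

```python
import asyncio
import json

TASK_REGISTRY = {}


async def double(payload):
    return payload["n"] * 2


async def explode(payload):
    raise RuntimeError("boom")


TASK_REGISTRY.update({"double": double, "explode": explode})


async def process_one(raw_task):
    """Decode and run a single task; return a (status, detail) tuple."""
    try:
        task = json.loads(raw_task)
        handler = TASK_REGISTRY.get(task["name"])
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        return ("malformed", repr(exc))
    if handler is None:
        return ("unknown", task["name"])
    try:
        return ("ok", await handler(task["payload"]))
    except Exception as exc:  # keep the worker alive on handler errors
        return ("failed", str(exc))


good = json.dumps({"name": "double", "payload": {"n": 21}})
print(asyncio.run(process_one(good)))
print(asyncio.run(process_one("not json"))[0])
```

A worker loop built this way only needs to read from Redis and pass each string to process_one, logging or counting the returned status.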


Step 6: Running Multiple Workers (Concurrency)

We can scale horizontally with multiple processes and vertically with async concurrency.

Python
 
async def start_workers(queue_name, concurrency=10):
    tasks = [
        asyncio.create_task(worker(queue_name))
        for _ in range(concurrency)
    ]
    await asyncio.gather(*tasks)


Run it:

Python
 
if __name__ == "__main__":
    asyncio.run(start_workers("task_queue", concurrency=5))
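
The concurrency argument caps how many tasks one process handles at the same time. To make that visible without Redis, the sketch below runs dummy jobs through an in-memory asyncio.Queue and records the highest number of jobs in flight at once. measure_peak is a hypothetical helper, not part of the queue itself.

```python
import asyncio


async def measure_peak(concurrency, jobs):
    """Run `jobs` dummy tasks across `concurrency` workers; return the peak in-flight count."""
    queue = asyncio.Queue()  # stand-in for the Redis list (assumption)
    for n in range(jobs):
        queue.put_nowait(n)

    in_flight = 0
    peak = 0

    async def worker():
        nonlocal in_flight, peak
        while True:
            await queue.get()
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated I/O; other workers run meanwhile
            in_flight -= 1
            queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    await queue.join()                  # wait for all jobs to finish
    for w in workers:
        w.cancel()                      # stop the endless worker loops
    await asyncio.gather(*workers, return_exceptions=True)
    return peak


print(asyncio.run(measure_peak(concurrency=3, jobs=9)))
```

Raising the concurrency helps only while tasks spend their time waiting on I/O. A handler that blocks the event loop stalls every worker in the process.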


Step 7: Producing Tasks From an API

Now let’s simulate a real API request.

Python
 
async def user_signup(email, user_id):
    await enqueue_task(
        "task_queue",
        create_task(
            "send_welcome_email",
            {"email": email},
        ),
    )

    await enqueue_task(
        "task_queue",
        create_task(
            "track_analytics",
            {
                "event": "signup",
                "user_id": user_id,
            },
        ),
    )


From a FastAPI route (the app object and the User model are assumed to be defined elsewhere in your application):

Python
 
@app.post("/signup")
async def signup(user: User):
    await user_signup(user.email, user.id)
    return {"status": "ok"}


Your API returns as soon as the tasks are enqueued, while the background work happens asynchronously in the workers.

Reliability Considerations

Celery provides:

  • Retries
  • Acknowledgments
  • Result backends

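We can add these incrementally. Keep in mind that BLPOP removes a task from the list as soon as a worker receives it, so a worker that crashes mid-task loses that task unless acknowledgments are added.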
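
Acknowledgments are the piece that plain BLPOP cannot provide. A common approach is the reliable-queue pattern: move each task atomically into a processing list (Redis offers LMOVE and BLMOVE for this since version 6.2) and remove it only after the handler succeeds. The sketch below imitates the idea with in-memory deques so that it runs without Redis. The names claim, ack, and recover are hypothetical.

```python
from collections import deque

pending = deque(["task-1", "task-2"])   # stand-in for the main queue list
processing = deque()                     # stand-in for the processing list


def claim():
    # Move the oldest pending task into `processing` (the LMOVE step).
    if not pending:
        return None
    item = pending.popleft()
    processing.append(item)
    return item


def ack(item):
    # Handler succeeded: drop the task from `processing` (the LREM step).
    processing.remove(item)


def recover():
    # After a crash, push unacknowledged tasks back for another attempt,
    # keeping their original order at the front of the queue.
    while processing:
        pending.appendleft(processing.pop())


first = claim()
ack(first)            # task-1 finished normally
second = claim()      # task-2 is claimed, then the worker "crashes" before ack
recover()
print(list(pending), list(processing))
```

This gives at-least-once delivery: a recovered task may run twice, so handlers should be safe to repeat.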

Simple Retry Strategy:

Python
 
async def execute_with_retry(handler, payload, retries=3):
    for attempt in range(retries):
        try:
            await handler(payload)
            return
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
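
To watch the retry loop in action, we can feed it a handler that fails twice and then succeeds. The version below mirrors the loop above with two changes made for the demonstration: it returns the handler's result, and it takes a hypothetical base_delay parameter so the backoff can be shortened. The flaky handler is also made up.

```python
import asyncio


async def execute_with_retry(handler, payload, retries=3, base_delay=1.0):
    # `base_delay` is a hypothetical knob; delays are base_delay * 2 ** attempt.
    for attempt in range(retries):
        try:
            return await handler(payload)
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)


calls = []


async def flaky(payload):
    # Fails twice, then succeeds, to exercise the retry path.
    calls.append(payload)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "delivered"


result = asyncio.run(execute_with_retry(flaky, {"id": 1}, base_delay=0.01))
print(result, len(calls))
```

In the worker, the call to handler(payload) would be replaced by execute_with_retry(handler, payload), leaving the surrounding try/except to log the final failure.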


Observability and Debugging

Unlike Celery, everything is explicit. You can easily add:

  • Structured logging
  • OpenTelemetry spans
  • Prometheus metrics

Example:

Python
 
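import time

# Inside the worker loop, around the handler call:
start = time.perf_counter()  # monotonic clock, better suited to measuring durations
await handler(payload)
duration = time.perf_counter() - start
print(f"Task {task_name} took {duration:.2f}s")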
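
The same idea can be wrapped in a small helper that emits one structured record per task. This is a sketch under assumptions: timed and its record fields are hypothetical, and printing JSON stands in for whatever logging or metrics library you use.

```python
import asyncio
import json
import time


async def timed(task_name, handler, payload):
    """Await a handler and return a dict describing how the run went."""
    start = time.perf_counter()
    status = "ok"
    try:
        await handler(payload)
    except Exception:
        # The sketch only records the failure; a real worker may want to re-raise.
        status = "error"
    record = {
        "task": task_name,
        "status": status,
        "duration_s": round(time.perf_counter() - start, 4),
    }
    print(json.dumps(record))  # one JSON object per line is easy to parse later
    return record


async def slow(payload):
    await asyncio.sleep(0.02)


record = asyncio.run(timed("slow", slow, {}))
```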


Tradeoffs vs. Celery

What You Gain

  • Simpler architecture
  • Async-native
  • Easier debugging
  • Lower operational overhead

What You Lose

  • Built-in task routing
  • Advanced workflows
  • Native result backend
  • Ecosystem plugins

When to Use This Pattern

Use asyncio + Redis task queues if:

  • Tasks are I/O-bound
  • You already run Redis
  • You want minimal infrastructure
  • You value clarity over abstraction

Avoid it if:

  • You need complex workflows
  • You run CPU-heavy jobs
  • You rely heavily on Celery primitives

Conclusion

Most background jobs in SaaS platforms are I/O-heavy rather than CPU-bound: sending HTTP requests, reading from object storage, publishing events, or writing logs. Async workers let a single event loop keep thousands of these tasks in flight at once, which uses far fewer resources than dedicating a process or thread to each one. This is where an asyncio + Redis approach shines.

Celery is not obsolete, but it’s no longer the default answer. Modern Python gives us the tools to build lean, async-first distributed systems with far less complexity. By combining asyncio and Redis, you can create a production-ready task queue that’s fast, debuggable, and easy to operate.

If you’ve ever felt Celery was too heavy for your workload, this approach might be exactly what you need.
