
Customizing Celery with Task Arguments


Celery is an awesome distributed asynchronous task system for Python. It's great out of the box, but a couple of times I have needed to customize it. Specifically, I want to be able to define behavior based on new apply_async arguments. Also, it would be nice to be able to pass state to the worker tasks.

First, you can subclass the main Celery application class to tell it to use a custom Task class.

import socket

from celery import Celery, Task
from kombu.exceptions import InconsistencyError

class MyCelery(Celery):
    """ Subclass of a Celery application class that uses a custom Task type """
    task_cls = 'myapp.mymodule:MyTask'

In your Task class, you can override apply_async (which is also called from delay), as well as __call__, which wraps around the actual task body.

class MyTask(Task):

    abstract = True

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, **options):
        """ invoked either directly or via .delay() to fork a task from the main process """

        # parse any custom task options from the .delay() or .apply_async() calls
        safe = options.pop('safe', False)  # safely trap errors talking to celery broker

        options['headers'] = options.get('headers', {})
        options['headers'].update({
            'safe': safe,
        })

        try:
            return super(MyTask, self).apply_async(
                args, kwargs, task_id, producer, link, link_error, **options)
        except (InconsistencyError, socket.error):
            # InconsistencyError == cannot find the celery queue
            # socket.error == cannot talk to the queue server at all
            if not safe:
                raise

    def __call__(self, *args, **kwargs):
        """ execute the task body on the remote worker """
        safe = self.get_header('safe')
        try:
            return super(MyTask, self).__call__(*args, **kwargs)
        except Exception:
            if not safe:
                raise

    def get_header(self, key, default=None):
        return (self.request.headers or {}).get(key, default)

In this example, I'm introducing an optional safe argument to apply_async, which traps and ignores specific exceptions raised while trying to send the task to the broker. It also piggybacks on the Celery task headers to pass the flag to the worker process, where any exception thrown by the task body is likewise ignored.
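To see the header plumbing in isolation, here is a minimal, broker-free sketch of the same pattern. The helper names forward_options and run_body are illustrative only (they are not part of Celery); one stands in for the caller side of apply_async, the other for the worker side of __call__.

```python
def forward_options(safe=False, **options):
    """ mimic MyTask.apply_async: merge the custom 'safe' flag into the headers """
    options['headers'] = options.get('headers', {})
    options['headers'].update({'safe': safe})
    return options


def run_body(headers, body):
    """ mimic MyTask.__call__: trap any exception when 'safe' was requested """
    try:
        return body()
    except Exception:
        if not headers.get('safe', False):
            raise
```

Calling forward_options(safe=True) produces an options dict whose headers carry the flag alongside any normal options, and run_body then consults that flag before deciding whether to re-raise.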


