Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Customizing Celery with Task Arguments

DZone's Guide to

Customizing Celery with Task Arguments

Celery is an awesome distributed asynchronous task system for Python. It's great out of the box, but a couple of times I have needed to customize it. Here's how.

· Integration Zone ·
Free Resource

The State of API Integration 2018: Get Cloud Elements’ report for the most comprehensive breakdown of the API integration industry’s past, present, and future.

Celery is an awesome distributed asynchronous task system for Python. It's great out of the box, but a couple of times I have needed to customize it. Specifically, I want to be able to define behavior based on a new apply_sync arguments. Also, it would be nice to be able to pass state to the worker tasks.

First, you can subclass the main Celery class to define a custom Task class.

import socket

from celery import Celery, Task
from kombu.exceptions import InconsistencyError


class MyCelery(Celery):
    """ Subclass of a Celery application class that uses a custom Task type """
    task_cls = 'myapp.mymodule:MyTask'

In your Task class, you can override apply_async (which is also called from delay), as well as __call__, which wraps around the actual task body.

class MyTask(Task):

    abstract = True

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, **options):
        """ invoked either directly or via .delay() to fork a task from the main process """

        # parse any custom task options from the .delay() or .apply_async() calls
        safe = options.pop('safe', False)  # safely trap errors talking to celery broker

        options['headers'] = options.get('headers', {})
        options['headers'].update({
            'safe': safe,
        })

        try:
            return super(MyTask, self).apply_async(
                args, kwargs, task_id, producer, link, link_error, **options)
        except (InconsistencyError, socket.error) as e:
            # InconsistencyError == cannot find the celery queue
            # socket.error == cannot talk to the queue server at all
            if not safe:
                raise

    def __call__(self, *args, **kwargs):
        """ execute the task body on the remote worker """
        safe = self.get_header('safe')
        try:
            return super(NWTask, self).__call__(*args, **kwargs)
        except Exception:
            if not safe:
                raise

    def get_header(self, key, default=None):
        return (self.request.headers or {}).get(key, default)

In this example, I'm introducing an optional safe argument to apply_async, which traps and ignores specific exceptions trying to fork the task. It also piggy backs on the celery task headers to pass itself to the worker process, where it ignores any exception thrown by the task itself.


Your API is not enough. Learn why (and how) leading SaaS providers are turning their products into platforms with API integration in the ebook, Build Platforms, Not Products from Cloud Elements.

Topics:
python ,celery ,cutomize celery ,task arguments ,apply_async

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}