
Customizing Celery with Task Arguments


Celery is an awesome distributed asynchronous task system for Python. It's great out of the box, but a couple of times I have needed to customize it. Specifically, I want to be able to define behavior based on new apply_async arguments. Also, it would be nice to be able to pass state to the worker tasks.

First, subclass the main Celery application class and point its task_cls attribute at a custom Task class.

import socket

from celery import Celery, Task
from kombu.exceptions import InconsistencyError


class MyCelery(Celery):
    """ Subclass of a Celery application class that uses a custom Task type """
    task_cls = 'myapp.mymodule:MyTask'

In your Task class, you can override apply_async (which is also called from delay), as well as __call__, which wraps around the actual task body.

class MyTask(Task):

    abstract = True

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, **options):
        """ invoked either directly or via .delay() to fork a task from the main process """

        # parse any custom task options from the .delay() or .apply_async() calls
        safe = options.pop('safe', False)  # safely trap errors talking to celery broker

        options['headers'] = options.get('headers', {})
        options['headers'].update({
            'safe': safe,
        })

        try:
            return super(MyTask, self).apply_async(
                args, kwargs, task_id, producer, link, link_error, **options)
        except (InconsistencyError, socket.error):
            # InconsistencyError: the celery queue cannot be found
            # socket.error: cannot talk to the queue server at all
            if not safe:
                raise

    def __call__(self, *args, **kwargs):
        """ execute the task body on the remote worker """
        safe = self.get_header('safe')
        try:
            return super(MyTask, self).__call__(*args, **kwargs)
        except Exception:
            if not safe:
                raise

    def get_header(self, key, default=None):
        return (self.request.headers or {}).get(key, default)

In this example, I'm introducing an optional safe argument to apply_async, which traps and ignores specific exceptions raised while trying to fork the task. It also piggybacks on the Celery task headers to pass the flag to the worker process, where any exception thrown by the task body is likewise ignored.
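To make the header round trip concrete, here is a standalone sketch (no Celery required) of the same plumbing: pack_options mirrors what apply_async above does with the custom safe flag, and read_header mirrors get_header on the worker side. The function names are illustrative, not part of the Celery API.

```python
def pack_options(**options):
    """ Mirror the custom-option handling in MyTask.apply_async. """
    # pop the custom flag so Celery never sees an unknown option
    safe = options.pop('safe', False)
    options['headers'] = options.get('headers', {})
    options['headers'].update({'safe': safe})
    return options

def read_header(headers, key, default=None):
    """ Mirror MyTask.get_header on the worker side. """
    return (headers or {}).get(key, default)

opts = pack_options(safe=True, countdown=5)
print(read_header(opts['headers'], 'safe'))  # True: the flag reached the headers
print('safe' in opts)                        # False: only the header carries it
```

With this shape, callers can write something like task.apply_async(args=(...,), safe=True), while plain .delay() calls keep the default of safe=False.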




Published at DZone with permission of Chase Seibert, DZone MVB. See the original article here.

