Adam Johnson


Common Issues Using Celery (And Other Task Queues)

2020-02-03

Here are some issues I’ve seen crop up several times in Django projects using Celery. They probably apply to other task queues too; I simply haven’t used those as much.

1. Enqueueing Data Rather Than References

If you duplicate data from your database in your task arguments, it can go stale in the queue before the task executes. I can’t describe this more completely than Celery’s documentation on task state. See the description and example there.

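This is harder to do accidentally on Celery since version 4, which changed the default serializer from Pickle to JSON: a model instance can be pickled into a task argument, but it can’t be serialized to JSON. (If you’re not sure which serializer you’re using, check your task_serializer setting.)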

But it is still possible to enqueue data rather than references. For example, imagine you enqueue an email with a 1 minute delay, using the user’s email address in the task arguments rather than their ID. If the user changes their email address before the task runs, the email gets sent to the wrong address.
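As a minimal sketch of the difference, the plain-Python stand-in below uses a dict for the user table and a list for the queue. Both are assumptions for illustration, not Celery or Django APIs, and the function names are hypothetical. With Celery, the second style corresponds to a task that takes a user ID and looks up the address when it runs.

```python
# Stand-ins for illustration only: a dict as the "users table" and a list as
# the "queue". Neither is a Celery or Django API.
users = {1: "old@example.com"}
queue = []
sent_to = []


def send_email_to_address(address):
    # Uses the address captured at enqueue time, which may be stale.
    sent_to.append(address)


def send_email_to_user(user_id):
    # Looks up the current address at execution time.
    sent_to.append(users[user_id])


# Enqueue one task of each style.
queue.append((send_email_to_address, users[1]))
queue.append((send_email_to_user, 1))

# The user changes their address while the tasks wait in the queue.
users[1] = "new@example.com"

# A "worker" now runs the queued tasks in order.
for func, arg in queue:
    func(arg)

print(sent_to)
```

The first task delivers to the stale address it was given; the second picks up the change because it only carried a reference.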

2. Enqueueing Tasks Within Database Transactions

Although you might think Celery is for executing tasks “later/eventually,” it can execute them quickly too! If you enqueue a task from within a database transaction, it might execute before the database commits the transaction. This can mean that the data your task needs to access, such as a new model instance, isn’t visible, and it raises a DoesNotExist exception.

The solution is to enqueue your tasks after the transaction commits, using Django’s transaction.on_commit. This is well described in both the Django documentation and the Celery documentation.
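In a view, this usually looks something like transaction.on_commit(lambda: send_welcome_email.delay(user.id)), where send_welcome_email is a hypothetical task. To show the ordering this buys you, here is a toy model of commit hooks in plain Python. ToyTransaction is an invented class for illustration and is not how Django implements it.

```python
class ToyTransaction:
    """Toy model of commit hooks; not Django's implementation."""

    def __init__(self, log):
        self.log = log
        self.callbacks = []

    def on_commit(self, callback):
        # Remember the callback instead of running it straight away.
        self.callbacks.append(callback)

    def __enter__(self):
        self.log.append("begin")
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # Rolled back: the callbacks are dropped, nothing is enqueued.
            self.log.append("rollback")
            return False
        self.log.append("commit")
        for callback in self.callbacks:
            callback()
        return False


# Successful transaction: the "enqueue" happens only after the commit.
log = []
with ToyTransaction(log) as tx:
    log.append("insert user row")
    tx.on_commit(lambda: log.append("enqueue task"))

# Failed transaction: the "enqueue" never happens.
rolled_back = []
try:
    with ToyTransaction(rolled_back) as tx:
        tx.on_commit(lambda: rolled_back.append("enqueue task"))
        raise ValueError("something went wrong")
except ValueError:
    pass
```

The task can no longer run before its data is visible, and a rolled-back transaction doesn’t leave behind a task pointing at rows that never existed.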

If you’re not using transactions in your views, you can also end up with tasks executing before all the data they expect is there. I recommend using transactions in your views - the easiest way is to use Django’s ATOMIC_REQUESTS setting.

Update (2020-02-03): Simon Charette pointed out on Reddit that stale data problems can occur with tasks reading from database replicas too. His suggested solution is to filter by an `updated_at` field on your model, to be sure the replica has returned a recent enough version of the row. If you're using MySQL you can also do this by checking that the global transaction ID has been applied on the replica.

3. Not Using Database Transactions in Tasks

While Django makes it easy to use database transactions in your views with ATOMIC_REQUESTS, you’re a bit on your own for other code paths. This includes Celery tasks. If you don’t wrap your tasks with transaction.atomic(), or use it inside your task body, you may have data integrity problems.

It’s worth auditing your tasks to find where you should use transaction.atomic(). You could even add a project-specific wrapper for Celery’s @shared_task that adds @atomic to your tasks.
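One possible shape for such a wrapper is sketched below. The names make_atomic_task and atomic_task are hypothetical. The two decorators are passed in as parameters so the sketch runs without Celery or Django installed; in a real project you would pass celery.shared_task and django.db.transaction.atomic instead of the stand-ins.

```python
def make_atomic_task(shared_task, atomic):
    """Build a decorator that registers a task whose body runs atomically.

    In a real project the arguments would be celery.shared_task and
    django.db.transaction.atomic (an assumption of this sketch).
    """

    def atomic_task(**task_options):
        def decorator(func):
            # Wrap the body in the transaction first, then register the task.
            return shared_task(**task_options)(atomic(func))

        return decorator

    return atomic_task


# --- Stand-ins for illustration only (not Celery or Django code) ---
events = []


def fake_atomic(func):
    def wrapper(*args, **kwargs):
        events.append("begin")
        try:
            result = func(*args, **kwargs)
        except Exception:
            events.append("rollback")
            raise
        events.append("commit")
        return result

    return wrapper


def fake_shared_task(**options):
    def register(func):
        func.task_options = options
        return func

    return register


atomic_task = make_atomic_task(fake_shared_task, fake_atomic)


@atomic_task(acks_late=True)
def rename_widget(widget_id, name):
    events.append(f"rename {widget_id} to {name}")
    return widget_id


rename_widget(1, "sprocket")
```

Wrapping the whole task body is the blunt option. Tasks that call external services mid-way may be better served by using transaction.atomic() around only the database work.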

4. Default “Unfair” Task Distribution

By default, the Celery worker will send batches of tasks to its worker processes, where they are re-queued in memory. This is intended to increase throughput, as worker processes don’t need to wait for tasks from your broker. But it does mean a long-running task in a process holds up faster tasks queued behind it, even when other worker processes are free to run them.

Taylor Hughes’ Celery tips post has a great diagram demonstrating this visually in tip #2.

In my experience, this default behaviour has never been desirable. It’s very common for projects to have tasks with vastly different running times, leading to this blocking.

You can disable it by running celery worker with -O fair. The Celery documentation on “Prefork pool prefetch settings” has a better explanation.

5. Using a Long countdown or an eta in the Far Future

Celery provides the eta and countdown arguments to task enqueues. These let you schedule tasks for later execution.

Unfortunately the way these work is not built into brokers. These delayed tasks will end up at the head of your queue, in front of later non-delayed tasks. The Celery worker process fetches the delayed tasks and “puts them aside” in memory, then fetches the non-delayed tasks.

With many such tasks, the Celery worker process will use a lot of memory to hold them. A restarted worker process also needs to re-fetch all the delayed tasks at the head of the queue. I’ve seen a case where there were enough delayed tasks that a restarted worker took several minutes to start doing actual work!

If you need to delay tasks for more than a few minutes, you should avoid eta and countdown. It’s better to add a field to your model instances with a time to be acted on, and use a periodic task to enqueue the actual execution.
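Here is a rough sketch of that pattern in plain Python. The Reminder class stands in for a model with a hypothetical send_at field, and enqueue is whatever puts a task on your queue (with Celery, something like a task’s .delay method). All the names are assumptions for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Reminder:
    # Hypothetical model: "send_at" is the time to act on, and "enqueued"
    # stops the periodic task from picking the same row twice.
    id: int
    send_at: datetime
    enqueued: bool = False


def enqueue_due_reminders(reminders, now, enqueue):
    """Body of the periodic task: enqueue everything that is due."""
    due_ids = []
    for reminder in reminders:
        if not reminder.enqueued and reminder.send_at <= now:
            enqueue(reminder.id)  # pass a reference, not the data
            reminder.enqueued = True
            due_ids.append(reminder.id)
    return due_ids


start = datetime(2020, 2, 3, 12, 0)
reminders = [
    Reminder(id=1, send_at=start + timedelta(minutes=5)),
    Reminder(id=2, send_at=start + timedelta(days=7)),
]
queue = []

# Two runs of the periodic task, ten minutes apart.
first_run = enqueue_due_reminders(reminders, start + timedelta(minutes=10), queue.append)
second_run = enqueue_due_reminders(reminders, start + timedelta(minutes=20), queue.append)
```

The reminder due in a week stays in the database, not in the broker or in worker memory, until a run of the periodic task finds it due.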

6. ACKS Behaviour

Celery’s default behaviour is to acknowledge tasks immediately, removing them from your broker’s queue. If they are interrupted, for example by a random server crash, Celery won’t retry the task.

This is good in the case that your task is not idempotent (repeatable without problems). But it’s not good for dealing with transient errors, such as your database connection dropping. In this case, work goes missing, since Celery removed it from the queue before attempting it.

The opposite behaviour, “acks late,” acknowledges tasks only after successful completion. This is the encouraged behaviour of many other queuing systems, such as SQS.

Celery covers this in its documentation in the FAQ “Should I use retry or acks_late?”. It’s a nuanced issue, but I do think the default “acks early” behaviour tends to be counter-intuitive.

I recommend setting task_acks_late = True as the default in your Celery configuration and thinking through which mode is appropriate for each task. You can override it for individual tasks by passing acks_late to the @shared_task decorator.
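To make the trade-off concrete, here is a toy simulation of the two modes. It is plain Python, not Celery: WorkerCrashed is an invented exception standing in for the worker process dying mid-task, and the loop is a simplified model of a broker redelivering unacknowledged messages.

```python
from collections import deque


class WorkerCrashed(Exception):
    """Stand-in for the worker process dying mid-task."""


def run_until_empty(messages, handler, acks_late):
    """Toy broker loop; returns (completed, lost) message lists."""
    queue = deque(messages)
    completed, lost = [], []
    while queue:
        message = queue.popleft()
        try:
            handler(message)
        except WorkerCrashed:
            if acks_late:
                # Never acknowledged, so the broker delivers it again.
                queue.append(message)
            else:
                # Acknowledged before running, so the work is gone.
                lost.append(message)
        else:
            completed.append(message)
    return completed, lost


def make_flaky_handler():
    """Handler that "crashes" the first time it sees each message."""
    seen = set()

    def handler(message):
        if message not in seen:
            seen.add(message)
            raise WorkerCrashed(message)

    return handler


early = run_until_empty(["a", "b"], make_flaky_handler(), acks_late=False)
late = run_until_empty(["a", "b"], make_flaky_handler(), acks_late=True)
```

With early acks both messages are lost. With late acks both complete, but each one is started twice, which is why late acks only suit tasks that are safe to repeat.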

7. Not Retrying Missed Work

Tasks might crash for any number of reasons, many of which are out of your control. For example, if your database server crashes, Celery might fail to execute tasks, and raise a “connection failed” error.

The easiest way to fix this is with a second periodic “sweeper task” that scans for and requeues missed work. The recent AWS article Avoiding insurmountable queue backlogs revealed they call such tasks “anti-entropy sweepers.” (That article is a fantastic read, filled with advice on working with queues at scale.)

8. Changing Task Signatures in Backwards-Incompatible Ways

Imagine we had a task function like this:

from celery import shared_task


@shared_task
def frobnicate(widget_id: int):
    ...

And we changed it to add a new required argument frazzle:

@shared_task
def frobnicate(widget_id: int, frazzle: bool):
    ...

When we deploy this change, any enqueued tasks from the first version of the code will fail with:

TypeError: frobnicate() missing 1 required positional argument: 'frazzle'

You should treat task function signatures with the same consideration as database migrations.

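If you’re adding a new argument, give it a default first - similar to adding database columns as nullable. After that version of the code is live and the tasks enqueued by the old version have drained from the queue, you can remove the default.

Similarly, if you’re removing an argument, give it a default first and stop passing it at the call sites. Once that version is live and the older tasks have drained, you can remove the argument itself.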

And if you’re removing a task, remove the call sites first, then the task itself.
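One way to check a change by hand is to ask whether the new signature still accepts the arguments the old code enqueued. The sketch below does this with the standard library’s inspect module on plain functions. The accepts helper and the two function names are hypothetical, and this is not a Celery feature.

```python
import inspect


def frobnicate_breaking(widget_id: int, frazzle: bool):
    ...


def frobnicate_compatible(widget_id: int, frazzle: bool = False):
    ...


def accepts(func, args, kwargs):
    """Return True if func can be called with the queued args and kwargs."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError:
        return False
    return True


# Arguments as they were enqueued by the old version of the code.
old_args, old_kwargs = (1,), {}

breaking_ok = accepts(frobnicate_breaking, old_args, old_kwargs)
compatible_ok = accepts(frobnicate_compatible, old_args, old_kwargs)
```

Here breaking_ok is False and compatible_ok is True: the version with a default can still run tasks that are already sitting in the queue.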

I’m not aware of any tooling to help enforce this, though in my previous position at YPlan we had some homegrown Django system checks. These made sure we thought through every change we made to tasks and their arguments.

Fin

I hope this helps you enjoy Celery, or whatever task queue you use, more,

—Adam



Tags: celery, django