Common Issues Using Celery (And Other Task Queues)2020-02-03
Here are some issues I’ve seen crop up several times in Django projects using Celery. They probably apply with other task queues, I simply haven’t used them so much.
If you duplicate data from your database in your task arguments, it can go stale in the queue before the task executes. I can’t describe this more completely than Celery’s documentation on task state. See the description and example there.
This is not so easy to do accidentally on Celery since version 4, which changed the default serializer from Pickle to JSON. (If you’re not sure what serializer you’re using, check your settings.)
But, it is still possible to enqueue data rather than references. For example, imagine you enqueue an email with a 1 minute delay, using the user’s email address in the task arguments, rather than ID. If the user changes their email address before the ask runs, the email gets sent to the wrong address.
Although you might think Celery is for executing tasks “later/eventually,” it can execute them quickly too!
If you enqueue a task from within a database transaction, it might execute before the database commits the transaction.
This can mean that the data your task needs to access, such as a new model instance, isn’t visible, and it raises a
If you’re not using transactions in your views, you can also end up with tasks executing before all the data they expect is there.
I recommend using transactions in your views - the easiest way is to use Django’s
While Django makes it easy to use database transactions in your views with
ATOMIC_REQUESTS, you’re a bit on your own for other code paths.
This includes Celery tasks.
If you don’t wrap your tasks with
transaction.atomic(), or use it inside your task body, you may have data integrity problems.
It’s worth auditing your tasks to find where you should use
You could even add a project-specific wrapper for Celery’s
@shared_task that adds
@atomic to your tasks.
By default, the Celery worker will send batches of tasks to its worker processes where they are re-queued in-memory. This is intended to increase throughput as worker processes don’t need to wait for tasks from your broker. But it does mean a long running task in a process holds up faster tasks queued behind it, even when other worker processes are free to run them.
Taylor Hughes’ Celery tips post has a great diagram demonstrating this visually in tip #2.
In my experience, this default behaviour has never been desirable. It’s very common for projects to have tasks with vastly different running times, leading to this blocking.
You can disable it by running
celery worker with
The Celery documentation on “Prefork pool prefetch settings” has a better explanation.
Celery provides the
countdown arguments to task enqueues.
These let you schedule tasks for later execution.
Unfortunately the way these work is not built into brokers. These delayed tasks will end up at the head of your queue, in front of later non-delayed tasks. The Celery worker process fetches the delayed tasks and “puts them aside” in memory, then fetches the non-delayed tasks.
With many such tasks, the Celery worker process will use a lot of memory to hold these tasks. Restarting the worker process will also need to re-fetch all the delayed tasks at the head of the queue. I’ve seen a case where there were enough delayed tasks that restarting took several minutes to start doing actual work!
If you need to delay tasks for more than a few minutes, you should avoid
It’s better to add a field to your model instances with a time to be acted on, and use a periodic task to enqueue the actual execution.
Celery’s default behaviour to acknowledge tasks immediately, removing them from your broker’s queue. If they are interrupted, for example by a random server crash, Celery won’t retry the task.
This is good in the case that your task is not idempotent (repeatable without problems). But it’s not good for dealing with random errors, such as your database connection dropping randomly. In this case, work goes missing, since Celery removed it from the queue before attempting it.
The opposite behaviour, “acks late,” acknowledges tasks only after successful completion. This is the encouraged behaviour of many other queuing systems, such as SQS.
Celery covers this in its documentation in the FAQ “Should I use retry or acks_late?”. It’s a nuanced issue, but I do think the default “acks early” behaviour tends to be counter-intuitive.
I recommend setting
acks_late = True as the default in your Celery configuration and thinking through which mode is appropriate for each task.
You can reconfigure it on a per task function by passing
acks_late to the
Tasks might crash for any number of reasons, many of which are out of your control. For example, if your database server crashes, Celery might fail to execute tasks, and raise a “connection failed” error.
The easiest way to fix this is with a second periodic “sweeper task” that scans and repeats/requeues missed work. The recent AWS article Avoiding insurmountable queue backlogs revealed they call such tasks “anti-entropy sweepers.” (That article is a fantastic read, filled with advice on working with queues at scale.)
Imagine we had a task function like this:
@shared_task def frobnicate(widget_id: int): ...
And we changed it to add a new required argument
@shared_task def frobnicate(widget_id: int, frazzle: bool): ...
When we deploy this change, any enqueued tasks from the first version of the code will fail with:
TypeError: frobnicate() missing 1 required positional argument: 'frazzle'
You should treat task function signatures with the same consideration as database migrations.
If you’re adding a new argument, give it a default first - similar to adding database columns as nullable. After that version of the code is live, you can remove the default.
Similarly if you’re removing an argument, give it a default first.
And if you’re removing a task, remove the call sites first, then the task itself.
I’m not aware of any tooling to help enforce this, though in my previous position at YPlan we had some homegrown Django system checks. These made sure we thought through every change we made to tasks and their arguments.
📙👉Speed Up Your Django Tests👈📙
One summary email a week, no spam, I pinky promise.
- Celery, Rabbits, and Warrens
- Working Around Memory Leaks in Your Django Application
- Getting a Django Application to 100% Test Coverage
Tags: celery, django
© 2020 All rights reserved.