We want Rover’s website and mobile apps to be fast for our users. One way we try to keep everything speedy is by offloading as much work as we can to asynchronous tasks which are processed outside our web application’s request/response cycle. For example, Rover’s messaging system allows our owners and sitters to communicate through email, text, calling, and in-app messaging and is mostly handled through asynchronous tasks.
We use Celery to define and process our tasks, backed by RabbitMQ as our task queue. Celery is quite powerful, but as Rover has scaled we’ve encountered some difficulties managing its complexity. Many of our tasks trigger other tasks; a complete workflow may involve dozens of tasks triggered from all over our codebase. Each of these tasks emits specific logging information (along with Celery’s own internal logging), which we aggregate in a single searchable location.
However, these log records are interspersed among all of our application’s other logging, which is quite extensive. We use a pool of workers, so the log entries won’t be in any kind of sensible order. Worse, there’s no way to associate log entries from one task to another in the same workflow. We can see what happens in a specific task, but we can’t see where that task came from, or if other tasks in the same workflow were successful. Isolating all of the log records for a particular workflow is like digging through our logging haystack for a single needle.
A Concrete Example
Let’s put this in the context of a real problem our developers face. Say we have an API endpoint in our application for creating new messages:
POST /api/message/
This endpoint creates the message in our database, then triggers an asynchronous task to notify the message recipient. The message recipient should be notified on any channels they have enabled via additional asynchronous tasks. Our full workflow is as follows:
POST /api/message/
  |
  |--> Task send_notification
         |
         |--> Task send_push
         |
         |--> Task send_email
         |
         |--> Task send_text_message
Imagine a user reports that they did not receive a text for a particular message. How could we figure out what happened? Did the POST request contain invalid data? Did the send_text_message task fail? Celery’s task logging outputs:
[INFO] Task app.tasks.send_text_message[2118507e-726c-4f99-90db-758889139148] succeeded in 0.413722962607s: None
That is, it gives us the task name, the task UUID, whether or not it succeeded, some timing info, and the return value. But how can we match up this log record with the original request that triggered this task? The task UUID is not connected to anything outside the task, so there’s no way to search our aggregated logs for the original POST request. If something went wrong elsewhere in the workflow, we have no way of debugging the issue without pinpointing exactly where it happened.
Finding the Needle
We need a way to efficiently search through our trove of log records to find only those associated with the workflow we’re interested in. More precisely, we want a single string we can grep for that is unique to each workflow.
Let’s start with the request. Our DNS provider assigns each request that arrives at our application a unique identifier called the Request ID. It looks like abc123efg456-ABC, and it’s included as a request header (so it’s logged by our infrastructure along with other useful attributes of the request).
This seems like a good starting point. If we could attach this identifier to all the log records emitted by our Celery tasks, we could search for this unique identifier and find every log entry from every task that was triggered by this request. Obviously we don’t want to have to manually include the request ID every time we log something in our tasks. Furthermore, we want this ID to show up inside Celery’s own internal logging, because those messages include useful information about the state of the task. So how do we accomplish this?
Tying the Thread
Notice the UUID in the Celery log entry above. This is the Task ID. By default it’s set to a random UUID for each task. This is required for the task queue to function so that tasks can be uniquely identified. Celery’s apply_async method allows you to specify the task ID when you publish a task, so what if we changed it to something more meaningful than a random string? We can’t just set it to our request ID because it must be unique for every task. What we can do is prepend the request ID to a random UUID to form the task ID. So our task IDs have this form:
abc123efg456-ABC_2118507e-726c-4f99-90db-758889139148
If every task ID took the form <Request ID>_<UUID>, we could simply search for the request ID and get all the log records from every task associated with that request, as well as the logs for that request!
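As a minimal sketch of that format, the two helpers below build and split such an ID. The names make_task_id and split_task_id are hypothetical, and the sketch assumes the underscore separator never appears inside a request ID.

```python
import uuid

SEPARATOR = "_"  # assumption: never appears inside a request ID


def make_task_id(request_id):
    """Return '<Request ID>_<UUID>': unique per task, searchable by request."""
    return "{}{}{}".format(request_id, SEPARATOR, uuid.uuid4())


def split_task_id(task_id):
    """Return (request_id, uuid_part); request_id is None for a plain UUID."""
    request_id, sep, uuid_part = task_id.rpartition(SEPARATOR)
    if not sep:
        # No separator found, so this is an ordinary Celery task ID.
        return None, task_id
    return request_id, uuid_part


task_id = make_task_id("abc123efg456-ABC")
request_id, uuid_part = split_task_id(task_id)
print(task_id)
```

Splitting from the right keeps the helper tolerant of request IDs that contain hyphens, since the UUID part never contains an underscore.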
Implementation
We certainly don’t want to have to change all our task invocation code to include this special task ID as a keyword argument, nor do we want our developers to have to remember to include it every time they publish a task. So we’ll take advantage of Celery’s ability to use a custom task class to do this work for us.
Specifically, our custom class can override the behavior of apply_async, through which all asynchronous task invocations pass, to use a task ID of our desired format. An incomplete implementation might look like:
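from uuid import uuid4

from celery import Task


class CustomTaskBase(Task):
    def apply_async(self, *args, **kwargs):
        # Prefix a fresh UUID with the request ID so each task ID stays unique.
        # get_request_id() is our own helper, not part of Celery.
        task_id = '{}_{}'.format(get_request_id(), uuid4())
        return super(CustomTaskBase, self).apply_async(
            *args, task_id=task_id, **kwargs
        )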
Some interesting things to note about this code:
- Some of Celery’s canvas components will pass a specific task ID into apply_async on the invocation of the last task in a chain. You’ll want to handle this.
- Task retries will also pass a task ID into apply_async. Your task queue likely expects this ID not to change between retries of the same task; you’ll want to keep the same task ID if a particular apply_async invocation is for a task retry.
- Tread lightly playing around with Celery’s internals like this. Celery is quite complex and small changes can have unintended consequences. While we’ve been using this approach successfully in production for a while now, deploying it was challenging!
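One conservative way to respect the first two notes is to generate an ID only when the caller did not supply one. The sketch below is an assumption about how that could look, not the production code: RequestIdTaskMixin, FakeTask, and DemoTask are hypothetical names, get_request_id is a stand-in that returns a fixed value, and FakeTask replaces celery.Task only so the example runs without a broker. Note that with this choice, tasks whose IDs are assigned by canvas primitives keep their plain UUIDs.

```python
import uuid


def get_request_id():
    # Hypothetical stand-in; the real lookup depends on your framework.
    return "abc123efg456-ABC"


class RequestIdTaskMixin(object):
    """Intended to sit ahead of celery.Task in the base classes."""

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        if task_id is None:
            # Fresh publish: build '<Request ID>_<UUID>'.
            task_id = "{}_{}".format(get_request_id(), uuid.uuid4())
        # Otherwise a retry or canvas primitive supplied the ID; keep it as is.
        return super(RequestIdTaskMixin, self).apply_async(
            args, kwargs, task_id=task_id, **options
        )


class FakeTask(object):
    """Stand-in for celery.Task that just echoes the task ID it was given."""

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        return task_id


class DemoTask(RequestIdTaskMixin, FakeTask):
    pass


fresh_id = DemoTask().apply_async(args=(1,))
retried_id = DemoTask().apply_async(args=(1,), task_id=fresh_id)
```

In a real project the class would presumably be declared as class CustomTaskBase(RequestIdTaskMixin, Task), with Task imported from celery.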
Bonus: Passing Around the Request ID
How does get_request_id() work? If you’re using Flask or Django, you can store the request ID in request local storage (via threading.local or Flask’s g) and retrieve it here. But that will only work for tasks that are published directly from the request. What about tasks that are published within other tasks? We want to get a complete view of the entire workflow, so we’ll want to pass down the request ID to every task in the workflow.
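For the request side, a minimal threading.local sketch could look like the following. The names set_request_id and clear_request_id, and the "no-request" default, are assumptions for illustration; in practice a middleware or request hook would call set_request_id with the value of the incoming header.

```python
import threading

_local = threading.local()  # each thread sees its own attributes


def set_request_id(request_id):
    """Call at the start of a request, e.g. from a middleware."""
    _local.request_id = request_id


def get_request_id(default="no-request"):
    """Return the ID stored by the current thread, or a default."""
    return getattr(_local, "request_id", default)


def clear_request_id():
    """Call when the request finishes so the ID does not leak to the next one."""
    try:
        del _local.request_id
    except AttributeError:
        pass


set_request_id("abc123efg456-ABC")

# A different thread does not see the ID stored by the main thread.
seen_in_other_thread = []
worker = threading.Thread(
    target=lambda: seen_in_other_thread.append(get_request_id())
)
worker.start()
worker.join()
```

Clearing the value at the end of each request matters when threads are reused across requests.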
Luckily you can get a reference to the currently executing task via Celery’s current_task proxy. The current task has a request attribute where you can store arbitrary data in a safe way, regardless of what kind of concurrency your workers use. With a task_prerun signal handler that extracts the request ID from your task ID and stores it in current_task.request, you should be able to propagate the request ID to all tasks within a workflow, no matter how complex. I’ll leave that as an exercise for the reader.
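One possible shape for that handler is sketched here, under a few assumptions: the attribute name request_id on task.request is hypothetical, the underscore separator is assumed not to occur in request IDs, and a SimpleNamespace stands in for a real task so the example runs without a worker.

```python
from types import SimpleNamespace


def request_id_from_task_id(task_id):
    """Return the '<Request ID>' prefix, or None for a plain UUID task ID."""
    request_id, sep, _ = task_id.rpartition("_")
    return request_id if sep else None


def store_request_id(sender=None, task_id=None, task=None, **extra):
    """Accepts the keyword arguments that Celery's task_prerun signal sends."""
    # 'request_id' is a hypothetical attribute name chosen for this sketch.
    task.request.request_id = request_id_from_task_id(task_id)


# In a real worker the handler would be registered with:
#   from celery.signals import task_prerun
#   task_prerun.connect(store_request_id)

fake_task = SimpleNamespace(request=SimpleNamespace())
store_request_id(
    task_id="abc123efg456-ABC_2118507e-726c-4f99-90db-758889139148",
    task=fake_task,
)
print(fake_task.request.request_id)
```

get_request_id() could then check the current task's request for this attribute before falling back to request local storage, so that tasks published from inside other tasks inherit the same prefix.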
Once you have the request ID available like this, you should be sure to include it inside all of your log records, whether you’re serving a request or inside task code. The easiest way to do this is with a logging filter.
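A logging filter along these lines might look like the sketch below. RequestIdFilter is a hypothetical name, the lambda stands in for whatever get_request_id() does in your application, and the log format is only an example.

```python
import io
import logging


class RequestIdFilter(logging.Filter):
    """Attach a request_id attribute to every record passing through."""

    def __init__(self, get_request_id):
        super(RequestIdFilter, self).__init__()
        self._get_request_id = get_request_id

    def filter(self, record):
        # Fall back to '-' so the formatter never hits a missing attribute.
        record.request_id = self._get_request_id() or "-"
        return True  # never drop the record, only annotate it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("[%(levelname)s] [%(request_id)s] %(message)s")
)
# The lambda is a stand-in for the application's get_request_id().
handler.addFilter(RequestIdFilter(lambda: "abc123efg456-ABC"))

logger = logging.getLogger("request_id_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

logger.warning("User has text notifications disabled!")
output = stream.getvalue()
```

Attaching the filter to the handler rather than to a single logger means records from any logger routed through that handler are annotated.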
Putting it All Together
Let’s finish with our example. A user complains that they did not receive a text for a message. You track down the request that created the message in your nginx logs:
55.55.555.55 "POST /api/message/ HTTP/1.1" 200 X-Request-ID=XYZ
Noticing the X-Request-ID header (which I’ve shortened for readability), you search for this string since it’s now contained in all the log entries. You see something like this (I’ve also shortened the UUIDs for readability):
55.55.555.55 "POST /api/message/ HTTP/1.1" 200 X-Request-ID=XYZ
...
[INFO] Received task: app.tasks.send_notification[XYZ_123]
[INFO] Task app.tasks.send_notification[XYZ_123] succeeded in 0.3s: None
...
[INFO] Received task: app.tasks.send_push[XYZ_456]
[INFO] Task app.tasks.send_push[XYZ_456] succeeded in 0.7s: None
...
[INFO] Received task: app.tasks.send_email[XYZ_789]
[INFO] Task app.tasks.send_email[XYZ_789] succeeded in 0.5s: None
...
[INFO] Received task: app.tasks.send_text_message[XYZ_246]
[WARNING] [XYZ_246] User has text notifications disabled!
[INFO] Task app.tasks.send_text_message[XYZ_246] succeeded in 0.2s: None
Aha! Now the problem is obvious. You kindly inform the user to enable text notifications in their preferences. Problem solved!
With a few small creative changes we’ve dramatically improved the usefulness of our logs and reduced the time it takes to track down issues across complicated workflows. If you make heavy use of Celery and are having trouble wrangling your logging, unifying your workflows with a single string is a simple solution.
If you want to come up with creative solutions to interesting problems like this, we’re hiring!