Async Backend


The Cadasta asynchronous system is written to provide the means to easily schedule long-running data operations to take place outside of the standard request-response lifecycle of a webserver. The intention is for this system to provide the framework for Cadasta to easily develop asynchronous microservices.

This system is built with Celery, using Amazon SQS as the message broker and the Cadasta Platform database as the result backend.

Architecture

The Cadasta asynchronous system is designed so that both the scheduled tasks and the task results can be tracked by the central Cadasta Platform.

Below is a diagram describing the general flow of a task within our infrastructure:

Async System Architecture Diagram

Going from left to right, we can see the path a task takes from scheduling to execution to storage. Some important points:

  • We send all tasks through a Topic Exchange.
  • All tasks are assigned a routing_key based on their task name.
  • Queues register with the Topic Exchange, stating that they should receive any message with a routing_key matching a given pattern. As currently implemented, we have a Platform Queue that subscribes to all messages (routing_key='#'), while service-specific queues subscribe to messages matching their queue name. For example, our queue to handle data-export tasks is named export and subscribes to routing_key='export'. Any task with a name starting with export. (eg export.project) is assigned routing_key='export' before being sent to the Topic Exchange, and the message is then routed to both the export queue and the Platform Queue. It's critical that the message make its way to both the Platform Queue (so that it can be logged in our system) and the service-specific queue (so that it can actually be executed). A sketch of this topology follows this list.
  • Service-specific queues are monitored by our worker services. They retrieve messages from their respective queues, execute the task, and store the results of their tasks in the Result Table of our Platform Database (for more information, see Tracking Task Results). Optionally, tasks can schedule subsequent tasks, placing new tasks back into the Topic Exchange.
  • The Platform Queue is monitored by our sync-tasks service. This retrieves the scheduled tasks and inserts them into the Task Table of our Platform Database (for more information, see Tracking Scheduled Tasks).
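Below is a minimal sketch of the topology described above, using Kombu (the messaging library underlying Celery). The exchange and queue names here are illustrative, not necessarily the exact names used in production.

from kombu import Exchange, Queue

task_exchange = Exchange('task_exchange', type='topic')

task_queues = [
    # The Platform Queue receives every message ('#' matches all keys).
    Queue('platform.fifo', task_exchange, routing_key='#'),
    # Service-specific queues subscribe to their own routing key.
    Queue('export', task_exchange, routing_key='export'),
]

def route_task(name, args, kwargs, options, task=None, **kw):
    # Derive the routing_key from the task name's prefix, eg
    # 'export.project' -> 'export'.
    return {'exchange': task_exchange, 'routing_key': name.split('.')[0]}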

Tracking Task Results

Task results are inserted by each worker into the Platform DB. For this reason, it is important that each worker have network access and valid credentials for the Platform DB. The Result Table has a one-to-one relation to the Task Table via the task_id column. This relation should not be enforced via a database constraint, as it is possible for a task's result to be entered into the DB before the sync-tasks service enters the task into the Task Table.
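To make that relationship concrete, here is a hypothetical Django model sketch of the Result Table; the field names are assumptions for illustration only:

from django.db import models

class TaskResult(models.Model):
    # Unique, but deliberately NOT a ForeignKey to the Task Table: a result
    # may arrive before sync-tasks has recorded the corresponding task.
    task_id = models.CharField(max_length=155, unique=True)
    status = models.CharField(max_length=50)
    result = models.TextField(null=True)
    date_done = models.DateTimeField(auto_now=True)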

Tracking Scheduled Tasks

To keep our system aware of all tasks being scheduled, the Cadasta Platform has a process running to consume task messages off of a task-monitor queue (i.e. the Platform Queue) and insert those messages into our database. To support this design, all task producers (including worker nodes) must publish their task messages to both the normal destination queues and the task-monitor queue. This is achieved by registering all queues with a Topic Exchange, setting the task-monitor queue to subscribe to all messages sent to the exchange, and setting standard work queues to subscribe to messages with a matching routing_key. Because the Cadasta Platform is designed to work with Amazon SQS, and the SQS transport only keeps exchange/queue declarations in memory, each message producer (including workers that may schedule tasks) must set this up within its own configuration (for more information, see Worker Configuration).
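As an illustration, each producer's Celery configuration might declare this topology along the following lines, reusing the task_queues and route_task sketch from above (app is assumed to be a Celery instance; setting names follow Celery 4's lowercase style):

# Every producer carries the full topology in its own config, since SQS
# keeps no durable exchange/queue declarations.
app.conf.task_queues = task_queues
app.conf.task_routes = (route_task,)
app.conf.task_default_exchange = 'task_exchange'
app.conf.task_default_exchange_type = 'topic'
app.conf.broker_url = 'sqs://'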

Creating Services

Worker Configuration

As described above, it is critical that every worker is configured exactly as expected for the system to work properly. This includes ensuring that the worker is configured with a Topic Exchange, that it is aware of the Platform Queue and any other queues into which it may send work, and that it is using the correct result backend. Ideally, we want it to be quick/easy/cheap for developers to spin up new services that work with our async system. For these reasons, we've created the Cadasta-WorkerToolbox library. This library manages the ways in which Celery is configured, offers a set of functional tests to ensure that Celery is configured correctly, and offers some convenience utilities to assist with common needs for worker services.
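With the toolbox, a worker's setup might be as small as the following sketch. The import path and the Config argument name are assumptions here; see the Cadasta-WorkerToolbox README for the actual interface.

from celery import Celery
from cadasta.workertoolbox.conf import Config  # assumed import path

app = Celery()
# Assumed argument name; configures queues, exchange, and result backend
# according to platform conventions.
app.config_from_object(Config(queue_name='export'))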

Creating Tasks

Data Access

It is strongly recommended that all data access between worker services and the Cadasta Platform occur through the Cadasta Platform API. This has two main benefits: it acts as a sniff-test to ensure that our public API offers the functionality required of a consumer, and it avoids excessive coupling between the inner workings of disparate services. To facilitate this data access, the Cadasta Platform supports authentication via a temporary permissions-scoped token. This allows the Cadasta Platform to generate a string that permits certain RESTful actions on its API on behalf of a user.
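For illustration, a worker might use such a token roughly as follows. The endpoint path and the authorization header scheme are assumptions, not a documented contract:

import requests

def fetch_project(org_slug, project_slug, token):
    # Hypothetical read of project data via the Platform API, authorized
    # with a temporary permissions-scoped token passed along with the task.
    resp = requests.get(
        'https://platform.cadasta.org/api/v1/organizations/'
        '{}/projects/{}/'.format(org_slug, project_slug),
        headers={'Authorization': 'Token {}'.format(token)},
    )
    resp.raise_for_status()
    return resp.json()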

Reporting Task Results

Being that a task can schedule sub-tasks and that every task uses the ResultBackend to store its output, how do we separate task results that are intended to be displayed to a user from task results that are for internal use only? The solution is to set the is_result flag on a task to true. This is done in the worker's codebase, as the task producer is not (and should not be) aware of whether a scheduled task will farm out its work to many sub-tasks.

A contrived example:

# Worker
from celery import Celery

app = Celery('math')  # assumes a properly configured Celery app

@app.task(name='math.square_op')
def square_op(value):
    """ Square a single integer """
    return value ** 2

@app.task(name='math.square')
def square(*values):
    """ Square any number of integers """
    for value in values:
        # is_result marks each sub-task's output as user-facing (a Cadasta
        # convention, not a standard Celery option).
        square_op.apply_async(args=(value,), is_result=True)
    return "Scheduled subtasks"

# Cadasta Platform
>>> result = square.delay(1, 2, 3, 4, 5)
>>> task = BackgroundTask.objects.get(task_id=result.id)
>>> print(task.overall_results)
[1, 4, 9, 16, 25]

NOTE: Be aware that this is a convention only within the Cadasta codebase; it is not documented in the official Celery documentation.

NOTE: Being that the Cadasta Platform does not know whether a task will have subtasks, it should not set is_result. However, it is the role of the task producer to set is_result, which is at odds with the previous statement. Together, these two principles mean that a task without subtasks can never have is_result set, and the database ResultBackend does not support sending arbitrary data (unless it's embedded in the defined columns). Currently, the only resolution is for a task without subtasks to schedule a pass-through report_results task with its results, setting is_result=True on that task (sketched below). Possible future solutions:

  1. Assume that if a task has no sub-tasks, the overall_results property should return just that task's results.
  2. Have the task send a message to the Platform Queue that will update the Task Table with is_result=True for that task.
  3. Keep the is_result data in the WorkerToolbox.
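A sketch of that pass-through convention (do_export is a hypothetical helper standing in for the real work):

@app.task(name='export.report_results')
def report_results(results):
    """ Pass-through task whose stored result is the user-facing output """
    return results

@app.task(name='export.project')
def export_project(project_id):
    url = do_export(project_id)  # hypothetical helper doing the real work
    # This task has no subtasks, so it reports its output through a
    # pass-through task flagged with is_result=True.
    report_results.apply_async(args=([url],), is_result=True)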

Formatting Task Results

The Cadasta Platform will use the Task Result values from a set of tasks for a number of purposes. For example, Cadasta's project export tasks return URLs to files on S3 to deliver to end users for download. It is recommended that Cadasta adopt an agreed-upon format for Task Results, allowing the results from any service to be parsed the same way. The format is currently designed as a JSON object intended to hold different output types. The only type used so far is 'links', which holds one or many URLs to resources (the text field is an optional label for the link):

{
    "links": [
        {
            "text": "Asynchronous Architecture",
            "url": "https://raw.githubusercontent.com/wiki/Cadasta/cadasta-platform/img/async-backend-task-flow-chart.png"
        }
    ]
}

This would be used to generate a link on our frontend such as:

<a href="https://raw.githubusercontent.com/wiki/Cadasta/cadasta-platform/img/async-backend-task-flow-chart.png">Asynchronous Architecture</a>

The goal is for the result object format to expand to suit our needs in the future (for example, adding a 'msg' result detail seems likely for passing human-readable text back to users). To assist in creating and structuring the result objects, it may make sense to add tooling (perhaps using a schema library like schema or jsonschema) into the WorkerToolbox at some point.
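One possible shape for such tooling, using jsonschema to validate result objects against the 'links' format shown above (the schema contents are illustrative):

import jsonschema

RESULT_SCHEMA = {
    'type': 'object',
    'properties': {
        'links': {
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'text': {'type': 'string'},
                    'url': {'type': 'string'},
                },
                'required': ['url'],
            },
        },
    },
}

# Raises jsonschema.ValidationError if the result object is malformed.
jsonschema.validate(
    {'links': [{'text': 'Example', 'url': 'https://example.com'}]},
    RESULT_SCHEMA,
)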

Performance

In an effort to create a highly performant distributed system, developers are strongly encouraged to become familiar with Celery's Canvas functionality and its tips on Performance and Strategies (particularly the section on Granularity). A good rule of thumb: if a task is doing work in a for loop, consider whether that work should be performed in sub-tasks (however, if all subtasks would require a common resource, such as an XLS file into which their output is to be inserted, it's likely better to keep the work as a single task).

When scheduling sub-tasks, it is important to pass the parent task's callbacks/errbacks on to the final sub-task. The Cadasta-WorkerToolbox library provides a helper utility to do just that. NOTE: At the time of writing, chords with single tasks don't respect callbacks/errbacks (celery/celery#3317, celery/celery#3709, celery/celery#3597).
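For illustration, a generic Celery canvas pattern for fanning work out to sub-tasks, reusing the math tasks from earlier (collect is a hypothetical aggregation task, not part of the Cadasta codebase):

from celery import chord

@app.task(name='math.collect')
def collect(results):
    """ Aggregate the results of all square_op sub-tasks """
    return results

# The chord callback runs once every sub-task has completed.
result = chord(square_op.s(v) for v in [1, 2, 3])(collect.s())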

RPC

The Async Backend supports synchronous RPC-style calls. To make use of them, schedule a task as normal and call .get() on its returned AsyncResult object. This blocks until the task's results are available and then returns them. Naturally, avoid launching synchronous subtasks from within a task; waiting on a sub-task's result inside a task risks deadlocking the worker pool.
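For example, using square_op as defined earlier:

result = square_op.delay(4)      # returns an AsyncResult immediately
value = result.get(timeout=30)   # blocks until the result is stored; 16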
