enq - a task queuing system for a/i

enq is a distributed (partitioned, not HA) DAG task queue: in it, tasks are processing units with one output and zero or more inputs, which can be references to the output of other tasks (thus building the DAG).

In this partitioned architecture, each shard runs its own standalone queue engine. The service supports executing tasks on specific shards, with the shard chosen by the client, presumably so that it is aligned with the data the task should be processing. Task IDs are global and embed the shard ID, so that references to a task can be resolved to the right backend.

Desired properties:

  • a proper fully functional task entity with max_retries, an absolute deadline, logs, etc (logs are somewhat TODO)
  • a JSON input/output data type for genericity
  • the ability to run worker code in any language
    • this requires all communication to be mediated by a well-defined API
    • every task is also a future with a specific ID: clients can ask for its state and have a consistent reference to it throughout its lifetime
    • a task implementation (the executor) is just an external program, to which we pass the JSON arguments via stdin and from which we read JSON output via stdout
  • the ability to enqueue large numbers of tasks (for batch-like operations, forklifts, etc)
  • a concept of task dependencies (a DAG), to support multi-step use cases such as Data Takeout
  • the ability to manually inspect and cancel jobs (TODO)
  • the ability to specify that tasks need to run close to the data, i.e. on a specific shard (the queuing system does not need to know about the data layout, it's fine if the caller does it)
  • high availability. This is hard, as queuing systems tend to make write-heavy use of databases. An option that suits our use case is to make the system fully partitioned: every task gets assigned to a shard, and if a shard is down we wait for it to come back. This might be acceptable if we consider that most tasks on the system will eventually be data-related anyway (as opposed to a generic batch processing platform). In this case, we can run one queue engine on each shard, solving the engine HA problem. The challenge becomes how to handle cross-shard state transitions: we'd have to design not only the proper RPC API, but a mechanism that can tolerate (temporary) failures at that stage.

Usage

The service has two main components: the engine (server), and the worker. You should, in most cases, co-locate a shard's worker with that shard's engine.

Both components can be started using the enq command-line tool, with either the server or worker command. Check their specific --help outputs for further configuration options.

The components of the service talk gRPC to each other, and can authenticate each other via mutual TLS, a shared authentication token, or both.

The worker will handle tasks by running external processes found in the /etc/enq/worker.d/ directory (or wherever you set the --dir option to), named after task methods. Such processes are passed task arguments as a JSON-encoded list of objects on standard input, and are expected to return a JSON-encoded result on standard output.

The current queue engine implementation uses SQLite, so do not expect high throughput: emphasis has been placed on correctness first.

Quickstart

Single shard

Let's start with the simplest possible scenario: a single shard, with identifier 1. Let's also assume that a task method to, say, process some user data is defined by a script in /etc/enq/worker.d/process-user-data - this will take a user ID as argument and perform some operation on the data associated with that account. The details are unimportant for now; let's create a simple script, just for testing purposes, that prints the received user ID to stderr:

#!/bin/sh
jq '.[].uid' - >&2
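
Assuming you saved the snippet above as process-user-data.sh (an arbitrary name used only for this example), install it into the worker directory under the task method's name and make it executable:

cp process-user-data.sh /etc/enq/worker.d/process-user-data
chmod +x /etc/enq/worker.d/process-user-data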

We need to start the queue engine and tell it which database to use:

# enq server --addr :3733 --shard 1 --db test.db

Then we can start the worker and have it talk to the engine:

enq worker --server localhost:3733

Run each of these commands in its own terminal as they are daemons that do not detach into the background.

Let's now create a task to submit to the queue. We want to run the process-user-data handler with a JSON argument of {"uid":1234}. The value_json attribute of arguments must be base64-encoded, so we can obtain the desired value with:

$ echo -n '{"uid":1234}' | base64
eyJ1aWQiOjEyMzR9
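
To double-check the encoding, decoding it back should print the original JSON:

$ echo eyJ1aWQiOjEyMzR9 | base64 -d
{"uid":1234}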

The task submission input takes the form of a JSON-encoded list of TaskSpec protos:

[{
    "task_id": {
        "shard": "1",
        "id": "mytask"
    },
    "method": "process-user-data",
    "args": [
        {
            "type": "VALUE",
            "value_json": "eyJ1aWQiOjEyMzR9"
        }
    ]
}]

Note that we are providing a "mock" task ID: the only important part there is the shard, as the unique ID will be replaced with an automatically generated one by the server. In this case, we want to execute our task on shard 1, so that's what we put in the task_id, while we put something generic in the id attribute. Mock task IDs are important when you need to reference a task from another one, to describe a tree of tasks, which we'll look into later.

Save the above to task.json. Now it's possible to submit this task to the queue:

enq submit --server localhost:3733 < task.json

The worker logs should show the process-user-data script being executed with an input of [{"uid": 1234}].
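
Because enq submit reads the task list from standard input, larger batches (one of the stated design goals) can be generated programmatically. Here is a hedged sketch, reusing the same process-user-data handler, that builds one task per user ID with jq (the id values are mock IDs and will be replaced by the server):

# Build a batch of 10 tasks (uids 1000-1009) and pipe it straight to enq submit.
seq 1000 1009 | jq -cn '[inputs | {
    task_id: {shard: "1", id: ("u" + tostring)},
    method: "process-user-data",
    args: [{type: "VALUE", value_json: ({uid: .} | tojson | @base64)}]
}]' | enq submit --server localhost:3733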

Task trees

With the same server setup as above, let's consider a more complex process: we now have two task handlers:

  • save-user-data, which collects the data for a specific user and saves it to the Cloud, returning an ephemeral link to download it;
#!/bin/sh
uid=$(jq '.[].uid' -)
echo "{\"link\": \"https://download/for/$uid\"}"
  • send-mail, which takes a link as input and sends an email to the user so they can download their data.
#!/bin/sh
link=$(jq '.[].link' -)
echo "sending mail with link $link" >&2

We can tell enq that the input of the second task depends on the output of the first one, thus building a very small task tree:

[{
    "task_id": {
        "shard": "1",
        "id": "save"
    },
    "method": "save-user-data",
    "args": [
        {
            "type": "VALUE",
            "value_json": "eyJ1aWQiOjEyMzR9"
        }
    ]
}, {
    "task_id": {
        "shard": "1",
        "id": "email"
    },
    "method": "send-mail",
    "args": [
        {
            "type": "TASKREF",
            "task_id": {
                "shard": "1",
                "id": "save"
            }
        }
    ]
}]

We are using an argument of type TASKREF to tell enq that the argument of the second task is the output of the first one. The engine will ensure the correct order of execution. Save the above to tree.json and submit it with:

enq submit --server localhost:3733 < tree.json

Again, worker logs should show the two tasks being executed in sequence.

Multiple shards

We can now consider a two-server scenario, with data shards 1 and 2. For simplicity we're going to run them both on the same host.

The two engines need to know how to talk to each other, so we need to tell them some rules for finding other shards. There are two ways to do this:

  • a static map, with a list of shard -> addr entries;
  • a dynamic pattern, where the peer's DNS address is derived by substituting the peer shard ID into a string template.

We can start the various components with the following commands (again, run each one in a separate terminal):

enq server --db test1.db --shard 1 --addr :3733 \
    --topology static:1=localhost:3733,2=localhost:3734
enq server --db test2.db --shard 2 --addr :3734 \
    --topology static:1=localhost:3733,2=localhost:3734
enq worker --id worker1 --server localhost:3733
enq worker --id worker2 --server localhost:3734

Suppose that we now want to save user data from all shards:

[{
    "task_id": {
        "shard": "1",
        "id": "save"
    },
    "method": "save-user-data",
    "args": [
        {
            "type": "VALUE",
            "value_json": "eyJ1aWQiOjEyMzR9"
        }
    ]
}, {
    "task_id": {
        "shard": "2",
        "id": "save"
    },
    "method": "save-user-data",
    "args": [
        {
            "type": "VALUE",
            "value_json": "eyJ1aWQiOjEyMzR9"
        }
    ]
}, {
    "task_id": {
        "shard": "1",
        "id": "email"
    },
    "method": "send-mail",
    "args": [
        {
            "type": "TASKREF",
            "task_id": {
                "shard": "1",
                "id": "save"
            }
        },
        {
            "type": "TASKREF",
            "task_id": {
                "shard": "2",
                "id": "save"
            }
        }
    ]
}]

The task 1/email now depends on both save tasks, one of which runs on a different shard.

You can send the submission request to any instance of the engine. Save the above to tree2.json and submit it with:

enq submit --server localhost:3734 < tree2.json

All tasks should execute in order on the expected workers.

Internals and development notes

Data model
  • tasks can be schedulable, i.e. with no pending dependencies, or pending, i.e. waiting on a dependency. This lets the engine avoid recalculating dependencies at every poll: it can simply "pop" tasks off the top of the queue knowing they can be run.

  • a task is an entry in a database. The primary key for schedulable tasks consists of a timestamp and a random id (somewhere we must keep a map of task ID -> this key)

  • the timestamp in the key allows us to schedule tasks in the future, which is necessary to support delayed retries: on poll, we can just scan the schedulable queue up to the current timestamp.

  • with respect to the DAG:

    • tasks have a list of (positional) arguments
    • tasks can have fixed (static) task arguments, i.e. plain values
    • tasks can have one or more upstream dependencies as arguments, i.e. other tasks
    • every task can have a single downstream dependency, i.e. its output can only be consumed by one other task
    • the task graph is static, tasks can't create other tasks dynamically
  • tasks need to store:

    • their global ID
    • the task state:
      • PENDING (waiting on dependencies)
      • SCHEDULABLE
      • RUNNING
      • FAILED (final)
      • SUCCESS (final)
    • a signature of their arguments, whether static values or references to other tasks (list of upstream dependencies)
    • a reference to their downstream task, so we know which task is "unblocked" when we receive a successful result from a remote node

Other partitioned services:

  • log storage, keyed by global task ID
  • result storage, keyed by its own unique result ID
Task state machine

We need to understand how and where to implement the task state machine, which is complicated by dependencies, error handling, and cancellation. The global state machine is advanced at every transition (i.e. whenever a worker reports a task execution status), and it should produce a consistent database at every step, as opposed to having the queue logic itself do the work. On a task state transition:

  • if the task state is already CANCEL, throw away the result
  • if the report status is RETRIABLE ERROR, and the task hasn't reached its max_retries or its absolute deadline, add it back to the queue with a timestamp sometime in the future.
  • if the report status is PERMANENT ERROR:
    • set the task state to FAILED
    • on the downstream task - SetFailed(downstream_id):
      • climb up the tree of upstream dependencies (excluding the original task) -- this can be done more efficiently on a single shard, instead of using RPC recursion -- and for each one of them:
        • set the task state to CANCEL - SetCancel(downstream_upstream_id)
      • set the task state to FAILED, and repeat the steps above for its own downstream task
  • if the report status is SUCCESS:
    • set the task state to SUCCESS
    • on the downstream task - ReportSuccess(id, downstream_id):
      • save the result of the task for the downstream task to consume (returns a result ID, stored on this shard)
      • count the number of remaining upstream task arguments that are not in SUCCESS state
      • if the downstream task has no more pending dependencies, move it to the schedulable queue

For cross-shard state transitions, every operation on a different task is potentially an RPC, and we need to handle the case when it fails temporarily. If we can isolate the local state changes outlined above in a transaction, we can abort the transaction and retry it later: we would then maintain a queue of incoming state transitions, and periodically attempt to apply them. As a consequence, the remote task state change operations need to be idempotent.

With this approach, task cancellation for large DAGs can result in a significant number of RPC calls from the transaction controller.

In the RPC API:

  • TaskStateTransition is a first-class object
  • internal SetFailed()/SetCancel() RPCs for the failure states (FAILED/CANCEL) identified earlier
  • internal ReportSuccess() RPC to notify a downstream task that one of its arguments has successfully completed, and perhaps schedule it for execution

To improve robustness, we store the task results on the downstream shard, so that at de-queuing time the argument lookup is local.

De-queuing
  • pick the item from the top of the schedulable queue and:
    • fetch results from any task arguments with a SUCCESS state and add them to the argument list
    • dispatch the task and its arguments to the worker
  • uses a leasing scheme to detect unresponsive workers
    • workers make keepalive calls when working on tasks
    • a cleaner process periodically checks for expired leases and puts the task back into the schedulable queue (with a future timestamp)
Tables

We have, in theory, a few tables that store the tasks themselves, separated primarily to simplify indexing and for performance reasons (how often they are queried vs. how large they are):

  • schedulable, with tasks in state SCHEDULABLE
  • pending, with tasks in state PENDING
  • done, with tasks in a final state (FAILED or SUCCESS)
  • leases, with tasks in state RUNNING

A task can appear in only one of these at any given time.
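
As a purely illustrative sketch (the real schema is not documented here, so the column names below are made up), polling the schedulable queue as described above could boil down to a query like:

# Hypothetical columns: run_at (scheduling timestamp, epoch seconds) and task_id.
sqlite3 test.db "SELECT task_id FROM schedulable
                 WHERE run_at <= strftime('%s','now')
                 ORDER BY run_at LIMIT 10;"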
