luci: go.chromium.org/luci/server/tq Index | Files | Directories

package tq

import "go.chromium.org/luci/server/tq"

Package tq provides a task queue implementation on top of Cloud Tasks.

It exposes a high-level API that operates with proto messages and hides gory details such as serialization, routing, authentication, etc.

Enabling it on Appengine

All default options should just work, except you still need to setup the sweeper cron and the sweeper queue.

In cron.yaml:

- url: /internal/tasks/c/sweep
  schedule: every 1 minutes

In queue.yaml:

- name: tq-sweep
  rate: 500/s

Using it with Spanner

You need to Create a table in your database:

CREATE TABLE TQReminders (
  ID STRING(MAX) NOT NULL,
  FreshUntil TIMESTAMP NOT NULL,
  Payload BYTES(102400) NOT NULL,
) PRIMARY KEY (ID ASC);

Index

Package Files

default.go dispatcher.go distributed.go doc.go inproc.go module.go submitter.go testing.go

Constants

const TraceContextHeader = "X-Luci-Tq-Trace-Context"

TraceContextHeader is name of a header that contains the trace context of a span that produced the task.

It is always set regardless of InheritTraceContext setting. This header is read only by Dispatcher itself and exists mostly for FYI purposes.

Variables

var Retry = errors.BoolTag{Key: errors.NewTagKey("the task should be retried")}

Retry is an error tag used to indicate that the handler wants the task to be redelivered later.

See Handler doc for more details.

func AddTask Uses

func AddTask(ctx context.Context, task *Task) error

AddTask is a shortcut for Default.AddTask.

func MustAddTask Uses

func MustAddTask(ctx context.Context, task *Task)

MustAddTask is like AddTask, but panics on errors.

Mostly useful for AddTask calls inside a Spanner transaction, where they essentially just call span.BufferWrite (i.e. make no RPCs) and thus can fail only if arguments are bad (in which case it is OK to panic).

func NewModule Uses

func NewModule(opts *ModuleOptions) module.Module

NewModule returns a server module that sets up a TQ dispatcher.

func NewModuleFromFlags Uses

func NewModuleFromFlags() module.Module

NewModuleFromFlags is a variant of NewModule that initializes options through command line flags.

Calling this function registers flags in flag.CommandLine. They are usually parsed in server.Main(...).

func Sweep Uses

func Sweep(ctx context.Context) error

Sweep is a shortcut for Default.Sweep.

func TestingContext Uses

func TestingContext(ctx context.Context, d *Dispatcher) (context.Context, *tqtesting.Scheduler)

TestingContext creates a scheduler that executes tasks through the given dispatcher (or Default one if nil) and puts it into the context as Submitter, so AddTask calls eventually submit tasks into this scheduler.

The end result is that tasks submitted using such context end up in the returned Scheduler (allowing them to be examined), and when the Scheduler delivers them, they result in calls to corresponding handlers registered in the Dispatcher.

func UseSubmitter Uses

func UseSubmitter(ctx context.Context, s Submitter) context.Context

UseSubmitter puts an arbitrary submitter in the context.

It will be used by Dispatcher's AddTask to submit Cloud Tasks.

func ValidateNamespace Uses

func ValidateNamespace(n string) error

ValidateNamespace returns an error if `n` is not a valid namespace name.

An empty string is a valid namespace (denoting the default namespace). Other valid namespaces must start with an ASCII letter or '_', contain only ASCII letters, digits or '_', and be less than 50 chars in length.

type CloudSubmitter Uses

type CloudSubmitter struct {
    // contains filtered or unexported fields
}

CloudSubmitter implements Submitter on top of Google Cloud APIs.

func NewCloudSubmitter Uses

func NewCloudSubmitter(ctx context.Context, creds credentials.PerRPCCredentials) (*CloudSubmitter, error)

NewCloudSubmitter creates a new submitter.

func (*CloudSubmitter) Close Uses

func (s *CloudSubmitter) Close()

Close closes the submitter.

func (*CloudSubmitter) Submit Uses

func (s *CloudSubmitter) Submit(ctx context.Context, p *reminder.Payload) (err error)

Submit creates a task, returning a gRPC status.

type CustomPayload Uses

type CustomPayload struct {
    Method      string            // e.g. "GET" or "POST", Cloud Tasks only
    RelativeURI string            // an URI relative to the task's target host, Cloud Tasks only
    Meta        map[string]string // HTTP headers or message attributes to attach
    Body        []byte            // serialized body of the request
}

CustomPayload is returned by TaskClass's Custom, see its doc.

type Dispatcher Uses

type Dispatcher struct {
    // Sweeper knows how to sweep transactional tasks reminders.
    //
    // If not set, Sweep calls will fail.
    Sweeper Sweeper

    // Namespace is a namespace for tasks that use DeduplicationKey.
    //
    // This is needed if two otherwise independent deployments share a single
    // Cloud Tasks instance.
    //
    // Used only for Cloud Tasks tasks. Doesn't affect PubSub tasks.
    //
    // Must be valid per ValidateNamespace. Default is "".
    Namespace string

    // GAE is true when running on Appengine.
    //
    // It alters how tasks are submitted and how incoming HTTP requests are
    // authenticated.
    GAE bool

    // NoAuth can be used to disable authentication on HTTP endpoints.
    //
    // This is useful when running in development mode on localhost or in tests.
    NoAuth bool

    // CloudProject is ID of a project to use to construct full resource names.
    //
    // If not set, "default" will be used, which is pretty useless outside of
    // tests.
    CloudProject string

    // CloudRegion is a ID of a region to use to construct full resource names.
    //
    // If not set, "default" will be used, which is pretty useless outside of
    // tests.
    CloudRegion string

    // DefaultRoutingPrefix is a URL prefix for produced Cloud Tasks.
    //
    // Used only for Cloud Tasks tasks whose TaskClass doesn't provide some custom
    // RoutingPrefix. Doesn't affect PubSub tasks.
    //
    // Default is "/internal/tasks/t/". It means generated Cloud Tasks by will
    // have target URL "/internal/tasks/t/<generated-per-task-suffix>".
    //
    // A non-default value may be valuable if you host multiple dispatchers in
    // a single process. This is a niche use case.
    DefaultRoutingPrefix string

    // DefaultTargetHost is a hostname to dispatch Cloud Tasks to by default.
    //
    // Individual Cloud Tasks task classes may override it with their own specific
    // host. Doesn't affect PubSub tasks.
    //
    // On GAE defaults to the GAE application itself. Elsewhere defaults to
    // "127.0.0.1", which is pretty useless outside of tests.
    DefaultTargetHost string

    // PushAs is a service account email to be used for generating OIDC tokens.
    //
    // Used only for Cloud Tasks tasks. Doesn't affect PubSub tasks.
    //
    // The service account must be within the same project. The server account
    // must have "iam.serviceAccounts.actAs" permission for PushAs account.
    //
    // Optional on GAE when submitting tasks targeting GAE. Elsewhere defaults to
    // "default@example.com", which is pretty useless outside of tests.
    PushAs string

    // AuthorizedPushers is a list of service account emails to accept pushes from
    // in addition to PushAs.
    //
    // This is handy when migrating from one PushAs account to another, or when
    // submitting tasks from one service, but handing them in another.
    //
    // Optional.
    AuthorizedPushers []string

    // SweepInitiationLaunchers is a list of service account emails authorized to
    // launch sweeps via the exposed HTTP endpoint.
    SweepInitiationLaunchers []string
    // contains filtered or unexported fields
}

Dispatcher is a registry of task classes that knows how serialize and route them.

There's rarely a need to manually create instances of Dispatcher outside of Dispatcher's own tests. You should generally use the global Default dispatcher which is configured by the tq server module. Methods of the default dispatcher (such as RegisterTaskClass and AddTask) are also available as lop-level functions, prefer to use them.

The dispatcher needs a way to submit tasks to Cloud Tasks or Cloud PubSub. This is the job of Submitter. It lives in the context, so that it can be mocked in tests. In production contexts (setup when using the tq server module), the submitter is initialized to be CloudSubmitter. Tests will need to provide their own submitter (usually via TestingContext).

TODO(vadimsh): Support consuming PubSub tasks, not just producing them.

var Default Dispatcher

Default is a dispatcher installed into the server when using NewModule or NewModuleFromFlags.

The module takes care of configuring this dispatcher based on the server environment and module's options.

You still need to register your tasks in it using RegisterTaskClass.

func (*Dispatcher) AddTask Uses

func (d *Dispatcher) AddTask(ctx context.Context, task *Task) (err error)

AddTask submits a task for later execution.

The task payload type should match some registered TaskClass. Its ID will be used to identify the task class in the serialized Cloud Tasks task body.

At some later time, in some other process, the dispatcher will invoke a handler attached to the corresponding TaskClass, based on its ID extracted from the task body.

If the given context is transactional, inherits the transaction if allowed according to the TaskClass's Kind. A transactional task will eventually be submitted to Cloud Tasks if and only if the transaction successfully commits. This requires a sweeper instance to be running somewhere, see ModuleOptions. Note that a failure to submit the task to Cloud Tasks will not abort the transaction.

If the task has a DeduplicationKey and there already was a recent task with the same TaskClass ID and DeduplicationKey, silently ignores the added task. This works only outside of transactions. Using DeduplicationKey with transactional tasks results in an error.

Annotates retriable errors with transient.Tag.

func (*Dispatcher) InstallSweepRoute Uses

func (d *Dispatcher) InstallSweepRoute(r *router.Router, path string)

InstallSweepRoute installs a route that initiates a sweep.

It may be called periodically (e.g. by Cloud Scheduler) to launch sweeps.

func (*Dispatcher) InstallTasksRoutes Uses

func (d *Dispatcher) InstallTasksRoutes(r *router.Router, prefix string)

InstallTasksRoutes installs tasks HTTP routes under the given prefix.

The exposed HTTP endpoints are called by Cloud Tasks service when it is time to execute a task.

func (*Dispatcher) RegisterTaskClass Uses

func (d *Dispatcher) RegisterTaskClass(cls TaskClass) TaskClassRef

RegisterTaskClass tells the dispatcher how to route and handle tasks of some particular type.

Intended to be called during process startup. Panics if there's already a registered task class with the same ID or Prototype.

func (*Dispatcher) Sweep Uses

func (d *Dispatcher) Sweep(ctx context.Context) error

Sweep initiates a sweep of transactional tasks reminders.

It must be called periodically (e.g. once per minute) somewhere in the fleet.

type DistributedSweeperOptions Uses

type DistributedSweeperOptions struct {
    // SweepShards defines how many jobs to produce in each Sweep.
    //
    // Default is 16.
    SweepShards int

    // TasksPerScan caps maximum number of tasks that a sweep job will process.
    //
    // Defaults to 2048.
    TasksPerScan int

    // SecondaryScanShards caps the sharding of additional sweep scans to be
    // performed if the initial scan didn't cover the whole assigned partition.
    // In practice, this matters only when database is slow or there is a huge
    // backlog.
    //
    // Defaults to 16.
    SecondaryScanShards int

    // LessorID identifies an implementation of a system that manages leases on
    // subranges of the database.
    //
    // Default is the same ID as the database implementation ID.
    LessorID string

    // SweepTaskQueue is a Cloud Tasks queue name to use for sweep jobs.
    //
    // Can be in short or full form. See Queue in TaskClass for details. The queue
    // should be configured to allow at least 10 QPS.
    //
    // Default is "tq-sweep".
    TaskQueue string

    // SweepTaskPrefix is a URL prefix to use for sweep jobs.
    //
    // There should be a Dispatcher instance somewhere that is configured to
    // receive such tasks (via non-default ServingPrefix). This is useful if
    // you want to limit what processes process the sweeps.
    //
    // Default is "/internal/tasks".
    TaskPrefix string

    // TaskHost is a hostname to dispatch sweep jobs to.
    //
    // Default is "", meaning to use whatever is configured as default in
    // the Dispatcher.
    TaskHost string
}

DistributedSweeperOptions is configuration for the process of "sweeping" of transactional tasks reminders performed in a distributed manner using Cloud Tasks service itself to distribute work.

The sweeping process ensures all transactionally committed tasks will have a corresponding Cloud Tasks task created. It periodically scans the database for "reminder" records created whenever a task is created as part of a transaction. A reminder older than a certain age likely indicates that the corresponding AddTask call crashed right after the transaction before it had a chance to create Cloud Tasks task. For each such old reminder, the sweeping will idempotently create a Cloud Task and delete the record in the database.

DistributedSweeperOptions tune some of parameters of this process. Roughly:

1. Sweep() call in Dispatcher creates SweepShards jobs that each scan
   a portion of the database for old reminders.
2. Each such job is allowed to process no more than TasksPerScan reminder
   records. This caps its runtime and memory usage. TasksPerScan should be
   small enough so that all sweeping jobs finish before the next Sweep()
   call, but large enough so that it makes meaningful progress.
3. If a sweeping job detects there's more than TasksPerScan items it needs
   to cover, it launches SecondaryScanShards follow-up jobs that cover the
   remaining items. This should be happening in rare circumstances, only if
   the database is slow or has a large backlog.

type ExecutionInfo Uses

type ExecutionInfo struct {
    // ExecutionCount is 0 on a first delivery attempt and increased by 1 for each
    // failed attempt.
    ExecutionCount int
    // contains filtered or unexported fields
}

ExecutionInfo is parsed from incoming task's metadata.

It is accessible from within task handlers via TaskExecutionInfo(ctx).

func TaskExecutionInfo Uses

func TaskExecutionInfo(ctx context.Context) *ExecutionInfo

TaskExecutionInfo returns information about the currently executing task.

Returns nil if called not from a task handler.

type Handler Uses

type Handler func(ctx context.Context, payload proto.Message) error

Handler is called to handle one enqueued task.

If the returned error is tagged with Retry tag, the request finishes with HTTP status 429, indicating to the backend that it should attempt to execute the task later (which it may or may not do, depending on retry config). Same happens if the error is transient (i.e. tagged with the transient.Tag), except the request finishes with HTTP status 500. This difference allows to distinguish "expected" retry requests (errors tagged with Retry) from "unexpected" ones (errors tagged with transient.Tag).

Retry tag should be used **only** if the handler is fully aware of the retry semantics and it **explicitly** wants the task to be retried because it can't be processed right now and the handler expects that the retry may help.

For a contrived example, if the handler can process the task only after 2 PM, but it is 01:55 PM now, the handler should return an error tagged with Retry to indicate this. On the other hand, if the handler failed to process the task due to an RPC timeout or some other exceptional transient situation, it should return an error tagged with transient.Tag.

Note that it is OK (and often desirable) to tag an error with both Retry and transient.Tag. Such errors propagate through the call stack as transient, until they reach Dispatcher, which treats them as retriable.

An untagged error (or success) marks the task as "done", it won't be retried.

type InProcSweeperOptions Uses

type InProcSweeperOptions struct {
    // SweepShards defines how many concurrent sweeping jobs to run.
    //
    // Default is 16.
    SweepShards int

    // TasksPerScan caps maximum number of tasks that a sweep job will process.
    //
    // Defaults to 2048.
    TasksPerScan int

    // SecondaryScanShards caps the sharding of additional sweep scans to be
    // performed if the initial scan didn't cover the whole assigned partition.
    // In practice, this matters only when database is slow or there is a huge
    // backlog.
    //
    // Defaults to 16.
    SecondaryScanShards int

    // SubmitBatchSize limits a single of a single processed batch.
    //
    // When processing a batch, the sweeper loads bodies of all tasks in
    // the batch, thus this setting directly affects memory usage. There will
    // be at most SubmitBatchSize*SubmitConcurrentBatches task bodies worked-on at
    // any moment in time.
    //
    // Default is 512.
    SubmitBatchSize int

    // SubmitConcurrentBatches limits how many submit batches can be worked on
    // concurrently.
    //
    // Default is 8.
    SubmitConcurrentBatches int
}

InProcSweeperOptions is configuration for the process of "sweeping" of transactional tasks reminders performed centrally in the current process.

type ModuleOptions Uses

type ModuleOptions struct {
    // Dispatcher is a dispatcher to use.
    //
    // Default is the global Default instance.
    Dispatcher *Dispatcher

    // CloudProject is ID of a project to use to construct full queue names.
    //
    // Default is the project the server is running in.
    CloudProject string

    // CloudRegion is a ID of a region to use to construct full queue names.
    //
    // Default is the region the server is running in.
    CloudRegion string

    // Namespace is a namespace for tasks that use DeduplicationKey.
    //
    // This is needed if two otherwise independent deployments share a single
    // Cloud Tasks instance.
    //
    // Default is "".
    Namespace string

    // DefaultTargetHost is a hostname to dispatch Cloud Tasks to by default.
    //
    // Individual task classes may override it with their own specific host.
    //
    // On GAE defaults to the GAE application itself. Elsewhere has no default:
    // if the dispatcher can't figure out where to send the task, the task
    // submission fails.
    DefaultTargetHost string

    // PushAs is a service account email to be used for generating OIDC tokens.
    //
    // The service account must be within the same project. The server account
    // must have "iam.serviceAccounts.actAs" permission for `PushAs` account.
    //
    // Default is the server's own account.
    PushAs string

    // AuthorizedPushers is a list of service account emails to accept pushes from
    // in addition to PushAs.
    //
    // This is handy when migrating from one PushAs account to another, or when
    // submitting tasks from one service, but handing them in another.
    //
    // Optional.
    AuthorizedPushers []string

    // ServingPrefix is a URL path prefix to serve registered task handlers from.
    //
    // POSTs to a URL under this prefix (regardless which one) will be treated
    // as Cloud Tasks pushes.
    //
    // Default is "/internal/tasks". If set to literal "-", no routes will be
    // registered at all.
    ServingPrefix string

    // SweepMode defines how to perform sweeps of the transaction tasks reminders.
    //
    // This process is necessary to make sure all transactionally submitted tasks
    // eventually execute, even if Cloud Tasks RPCs fail. When enqueueing a task
    // the client transactionally commits a special "reminder" record, which
    // indicates an intent to submit a Cloud Task. If the subsequent Cloud Tasks
    // RPC fails (or the process crashes before attempting it), the reminder
    // record is discovered by the sweep process and used to ensure the task is
    // eventually submitted.
    //
    // There are two stages: the sweep initiation and the actual processing.
    //
    // The initiation should happen periodically and centrally: no mater how many
    // replicas of the process are running, there needs to be only one sweep
    // initiator. But it doesn't have to be the same process each time. Also
    // multiple concurrent initiations are not catastrophic, though they impose
    // huge overhead and should be avoided.
    //
    // Two ways to do sweep initiations are:
    //   * Based on a periodic external signal such as a Cloud Scheduler job or
    //     GAE cron handler. See SweepInitiationEndpoint and
    //     SweepInitiationLaunchers.
    //   * Based on a timer inside some *single* primary process. For example
    //     on Kubernetes this may be a single pod Deployment, or a zero-indexed
    //     replica in a StatefulSet. See Sweep().
    //
    // Once the initiation happens, there are two ways to process the sweep (and
    // this is what SweepMode defines):
    //   * "inproc" - do all the processing right inside the replica that
    //     performed the initiation. This has scalability and reliability limits,
    //     but it doesn't require any additional infrastructure setup and has
    //     somewhat better observability.
    //   * "distributed" - use Cloud Tasks itself to distribute the work across
    //     many replicas. This requires some configuration. See SweepTaskQueue,
    //     SweepTaskPrefix and SweepTargetHost.
    //
    // Default is "distributed" mode.
    SweepMode string

    // SweepInitiationEndpoint is a URL path that can be hit to initiate a sweep.
    //
    // GET requests to this endpoint (if they have proper authentication headers)
    // will initiate sweeps. If SweepMode is "inproc" the sweep will happen in
    // the same process that handled the request.
    //
    // On GAE default is "/internal/tasks/c/sweep". On non-GAE it is "-", meaning
    // the endpoint is not exposed. When not using the endpoint there should be
    // some single process somewhere that calls Sweep() to periodically initiate
    // sweeps.
    SweepInitiationEndpoint string

    // SweepInitiationLaunchers is a list of service account emails authorized to
    // launch sweeps via SweepInitiationEndpoint.
    //
    // Additionally on GAE the Appengine service itself is always authorized to
    // launch sweeps via cron or task queues.
    //
    // Default is the server's own account.
    SweepInitiationLaunchers []string

    // SweepTaskQueue is a Cloud Tasks queue name to use to distribute sweep
    // subtasks when running in "distributed" SweepMode.
    //
    // Can be in short or full form. See Queue in TaskClass for details. The queue
    // should be configured to allow at least 10 QPS.
    //
    // Default is "tq-sweep".
    SweepTaskQueue string

    // SweepTaskPrefix is a URL prefix to use for sweep subtasks when running
    // in "distributed" SweepMode.
    //
    // There should be a Dispatcher instance somewhere that is configured to
    // receive such tasks (via non-default ServingPrefix). This is useful if
    // you want to limit what processes process the sweeps.
    //
    // If unset defaults to the value of ServingPrefix.
    SweepTaskPrefix string

    // SweepTargetHost is a hostname to dispatch sweep subtasks to when running
    // in "distributed" SweepMode.
    //
    // This usually should be DefaultTargetHost, but it may be different if you
    // want to route sweep subtasks somewhere else.
    //
    // If unset defaults to the value of DefaultTargetHost.
    SweepTargetHost string

    // SweepShards defines how many subtasks are submitted when initiating
    // a sweep.
    //
    // It is safe to change it any time. Default is 16.
    SweepShards int
}

ModuleOptions contain configuration of the TQ server module.

It will be used to initialize Default dispatcher.

func (*ModuleOptions) Register Uses

func (o *ModuleOptions) Register(f *flag.FlagSet)

Register registers the command line flags.

Mutates `o` by populating defaults.

type Submitter Uses

type Submitter interface {
    // Submit submits a task, returning a gRPC status.
    //
    // AlreadyExists status indicates the task with request name already exists.
    // Other statuses are handled using their usual semantics.
    //
    // Will be called from multiple goroutines at once.
    Submit(ctx context.Context, p *reminder.Payload) error
}

Submitter is used by Dispatcher to submit tasks.

It lives in the context, so that it can be mocked in tests. In production contexts (setup when using the tq server module), the submitter is initialized to be CloudSubmitter. Tests will need to provide their own submitter (usually via TestingContext).

Note that currently Submitter can only be implemented by structs in server/tq package, since its signature depends on an internal reminder.Payload type.

type Sweeper Uses

type Sweeper interface {
    // contains filtered or unexported methods
}

Sweeper knows how sweep transaction tasks reminders.

func NewDistributedSweeper Uses

func NewDistributedSweeper(disp *Dispatcher, opts DistributedSweeperOptions) Sweeper

NewDistributedSweeper creates a sweeper that distributes and executes sweeping tasks through the given dispatcher.

func NewInProcSweeper Uses

func NewInProcSweeper(opts InProcSweeperOptions) Sweeper

NewInProcSweeper creates a sweeper that performs sweeping in the current process whenever Sweep is called.

type Task Uses

type Task struct {
    // Payload is task's payload as well as indicator of its class.
    //
    // Its type will be used to find a matching registered TaskClass which defines
    // how to route and handle the task.
    Payload proto.Message

    // DeduplicationKey is optional unique key used to derive name of the task.
    //
    // If a task of a given class with a given key has already been enqueued
    // recently (within ~1h), this task will be silently ignored.
    //
    // Because there is an extra lookup cost to identify duplicate task names,
    // enqueues of named tasks have significantly increased latency.
    //
    // Can be used only with Cloud Tasks tasks, since PubSub doesn't support
    // deduplication during enqueuing.
    //
    // Named tasks can only be used outside of transactions.
    DeduplicationKey string

    // Title is optional string that identifies the task in server logs.
    //
    // For Cloud Tasks it will also show up as a suffix in task handler URL. It
    // exists exclusively to simplify reading server logs. It serves no other
    // purpose! In particular, it is *not* a task name.
    //
    // Handlers won't ever see it. Pass all information through the payload.
    Title string

    // Delay specifies the duration the Cloud Tasks service must wait before
    // attempting to execute the task.
    //
    // Can be used only with Cloud Tasks tasks. Either Delay or ETA may be set,
    // but not both.
    Delay time.Duration

    // ETA specifies the earliest time a task may be executed.
    //
    // Can be used only with Cloud Tasks tasks. Either Delay or ETA may be set,
    // but not both.
    ETA time.Time
}

Task contains task body and metadata.

type TaskClass Uses

type TaskClass struct {
    // ID is unique identifier of this class of tasks.
    //
    // Must match `[a-zA-Z0-9_\-.]{1,100}`.
    //
    // It is used to decide how to deserialize and route the task. Changing IDs of
    // existing task classes is a disruptive operation, make sure the queue is
    // drained first. The dispatcher will permanently fail all Cloud Tasks with
    // unrecognized class IDs.
    //
    // Required.
    ID  string

    // Prototype identifies a proto message type of a task payload.
    //
    // Used for its type information only. In particular it is used by AddTask
    // to discover what TaskClass matches the added task. There should be
    // one-to-one correspondence between proto message types and task classes.
    //
    // It is safe to arbitrarily change this type as long as JSONPB encoding of
    // the previous type can be decoded using the new type. The dispatcher will
    // permanently fail Cloud Tasks with bodies it can't deserialize.
    //
    // Required.
    Prototype proto.Message

    // Kind indicates whether the task requires a transaction to be enqueued.
    //
    // Note that using transactional tasks requires setting up a sweeper first,
    // see ModuleOptions.
    //
    // Default is NonTransactional which means that tasks can be enqueued only
    // outside of transactions.
    Kind TaskKind

    // Queue is a name of Cloud Tasks queue to use for the tasks.
    //
    // If set, indicates the task should be submitted through Cloud Tasks API.
    // The queue must exist already in this case. Can't be set together with
    // Topic.
    //
    // It can either be a short name like "default" or a full name like
    // "projects/<project>/locations/<region>/queues/<name>". If it is a full
    // name, it must have the above format or RegisterTaskClass would panic.
    //
    // If it is a short queue name, the full queue name will be constructed using
    // dispatcher's CloudProject and CloudRegion if they are set.
    Queue string

    // Topic is a name of PubSub topic to use for the tasks.
    //
    // If set, indicates the task should be submitted through Cloud PubSub API.
    // The topic must exist already in this case. Can't be set together with
    // Queue.
    //
    // It can either be a short name like "tasks" or a full name like
    // "projects/<project>/topics/<name>". If it is a full name, it must have the
    // above format or RegisterTaskClass would panic.
    Topic string

    // RoutingPrefix is a URL prefix for produced Cloud Tasks.
    //
    // Can only be used for Cloud Tasks task (i.e. only if Queue is also set).
    //
    // Default is dispatcher's DefaultRoutingPrefix which itself defaults to
    // "/internal/tasks/t/". It means generated Cloud Tasks by default will have
    // target URL "/internal/tasks/t/<generated-per-task-suffix>".
    //
    // A non-default value can be used to route Cloud Tasks tasks of a particular
    // class to particular processes, assuming the load balancer is configured
    // accordingly.
    RoutingPrefix string

    // TargetHost is a hostname to dispatch Cloud Tasks to.
    //
    // Can only be used for Cloud Tasks task (i.e. only if Queue is also set).
    //
    // If unset, will use dispatcher's DefaultTargetHost.
    TargetHost string

    // Quiet, if set, instructs the dispatcher not to log bodies of tasks.
    Quiet bool

    // InheritTraceContext, if set, makes the task handler trace span be a child
    // of the span that called AddTask.
    //
    // Ignored for PubSub tasks currently, since there's no easy way to put
    // the trace context header into PubSub request headers.
    //
    // Use it only for "one-off" tasks. Using it for deep chains of tasks usually
    // leads to messy complicated traces.
    InheritTraceContext bool

    // Custom, if given, will be called to generate a custom payload from the
    // task's proto payload.
    //
    // Useful for interoperability with existing code that doesn't use dispatcher
    // or if the tasks are meant to be consumed in some custom way. You'll need to
    // setup the consumer manually, the Dispatcher doesn't know how to handle
    // tasks with custom payload.
    //
    // For Cloud Tasks tasks it is possible to customize HTTP method, relative
    // URI, headers and the request body this way. Other properties of the task
    // (such as the target host, the queue, the task name, authentication headers)
    // are not customizable.
    //
    // For PubSub tasks it is possible to customize only task's body and
    // attributes (via CustomPayload.Meta). Other fields in CustomPayload are
    // ignored.
    //
    // Receives the exact same context as passed to AddTask. If returns nil
    // result, the task will be submitted as usual.
    Custom func(ctx context.Context, m proto.Message) (*CustomPayload, error)

    // Handler will be called by the dispatcher to execute the tasks.
    //
    // The handler will receive the task's payload as a proto message of the exact
    // same type as the type of Prototype. See Handler doc for more info.
    //
    // Populating this field is equivalent to calling AttachHandler after
    // registering the class. It may be left nil if the current process just wants
    // to submit tasks, but not handle them. Some other process would need to
    // attach the handler then to be able to process tasks.
    //
    // The dispatcher will permanently fail tasks if it can't find a handler for
    // them.
    Handler Handler
}

TaskClass defines how to treat tasks of a specific proto message type.

It assigns some stable ID to a proto message kind and also defines how tasks of this kind should be submitted and routed.

The are two backends for tasks: Cloud Tasks and Cloud PubSub. Which one to use for a particular task class is defined via mutually exclusive Queue and Topic fields.

Refer to Google Cloud documentation for all semantic differences between Cloud Tasks and Cloud PubSub. One important difference is that Cloud PubSub tasks can't be deduplicated and thus the handler must expect to receive duplicates.

type TaskClassRef Uses

type TaskClassRef interface {
    // AttachHandler sets a handler which will be called by the dispatcher to
    // execute the tasks.
    //
    // The handler will receive the task's payload as a proto message of the exact
    // same type as the type of TaskClass's Prototype. See Handler doc for more
    // info.
    //
    // Panics if the class has already a handler attached.
    AttachHandler(h Handler)
}

TaskClassRef represents a TaskClass registered in a Dispatcher.

func RegisterTaskClass Uses

func RegisterTaskClass(t TaskClass) TaskClassRef

RegisterTaskClass is a shortcut for Default.RegisterTaskClass.

type TaskKind Uses

type TaskKind int

TaskKind describes how a task class interoperates with transactions.

const (
    // NonTransactional is a task kind for tasks that must be enqueued outside
    // of a transaction.
    NonTransactional TaskKind = 0

    // Transactional is a task kind for tasks that must be enqueued only from
    // a transaction.
    //
    // Using transactional tasks requires setting up a sweeper first, see
    // ModuleOptions.
    Transactional TaskKind = 1

    // FollowsContext is a task kind for tasks that are enqueue transactionally
    // if the context is transactional or non-transactionally otherwise.
    //
    // Using transactional tasks requires setting up a sweeper first, see
    // ModuleOptions.
    FollowsContext TaskKind = 2
)

Directories

PathSynopsis
internal
internal/dbPackage db defines common database interface.
internal/lessorPackage lessor defines common lessor interface.
internal/metrics
internal/partitionPackage partition encapsulates partitioning and querying large keyspace which can't be expressed even as uint64.
internal/reminderPackage reminder holds Reminder to avoid circular dependencies.
internal/sweep
internal/testutilPackage testutil provides fakes for testing TQ guts.
internal/tqpb
internal/worksetPackage workset contains a synchronized work queue implementation used by inproc sweeper.
tqtestingPackage tqtesting contains helpers for running server/tq in tests and on localhost.
txn/datastoreDatastore contains Transactional Enqueue support for Cloud Datastore.
txn/spannerSpanner contains Transactional Enqueue support for Cloud Spanner.

Package tq imports 53 packages (graph) and is imported by 4 packages. Updated 2020-09-30. Refresh now. Tools for package owners.