package tasks

import ""

Package tasks implements asynchronous invocation processing.


Package files: dispatcher.go, finalization.go, tasks.go


var AllTypes = []Type{BQExport, TryFinalizeInvocation}

AllTypes is a slice of all known types of tasks.

var ErrConflict = fmt.Errorf("the task is already leased")

ErrConflict is returned by Lease if the task does not exist or is already leased.

var FinalizationTasks = tq.RegisterTaskClass(tq.TaskClass{
    ID:                  "try-finalize-inv",
    Prototype:           &taskspb.TryFinalizeInvocation{},
    Kind:                tq.Transactional,
    InheritTraceContext: true,
    Queue:               "finalizer",
    RoutingPrefix:       "/internal/tasks/finalizer",
})

FinalizationTasks describes how to route finalization tasks.

The handler is implemented in internal/services/finalizer.

var PermanentFailure = errors.BoolTag{
    Key: errors.NewTagKey("permanent failure to process invocation task"),
}

PermanentFailure, when set on an error, indicates that the error cannot be resolved by a retry. Such a task is doomed.

var UseFinalizationTQ = experiments.Register("rdb-use-tq-finalization")

UseFinalizationTQ experiment enables using server/tq for finalization tasks.

func Delete

func Delete(ctx context.Context, typ Type, id string) error

Delete deletes a task.

func Enqueue

func Enqueue(typ Type, taskID string, invID invocations.ID, payload interface{}, processAfter time.Time) *spanner.Mutation

Enqueue inserts one row into InvocationTasks.

func EnqueueBQExport

func EnqueueBQExport(invID invocations.ID, payload *pb.BigQueryExport, processAfter time.Time) *spanner.Mutation

EnqueueBQExport inserts one row into InvocationTasks for a BigQuery export task.

func Lease

func Lease(ctx context.Context, typ Type, id string, duration time.Duration) (invID invocations.ID, payload []byte, err error)

Lease leases an invocation task. If the task does not exist or is already leased, it returns ErrConflict.

func Peek

func Peek(ctx context.Context, typ Type, f func(id string) error) error

Peek calls f on available tasks of a given type.

func StartInvocationFinalization

func StartInvocationFinalization(ctx context.Context, id invocations.ID)

StartInvocationFinalization changes invocation state to FINALIZING and enqueues a TryFinalizeInvocation task.

The caller is responsible for ensuring that the invocation is active.

TODO(nodir): this package is not a great place for this function, but there is no better package at the moment. Keep it here for now, but consider a new package as the code base grows.

type Dispatcher

type Dispatcher struct {
    // How often to query for tasks. Defaults to 1m.
    QueryInterval time.Duration

    // How long to lease a task for. Defaults to 1m.
    LeaseDuration time.Duration

    // Number of tasks to process concurrently. Defaults to GOMAXPROCS.
    Workers int
}

Dispatcher queries for available tasks and dispatches them to goroutines.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context, taskType Type, fn TaskFunc)

Run queries tasks and dispatches them to goroutines until ctx is canceled. Errors are logged.

type TaskFunc

type TaskFunc func(ctx context.Context, invID invocations.ID, payload []byte) error

TaskFunc can execute a task. If the returned error is tagged with PermanentFailure, then the failed task is deleted.
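The distinction between retryable and permanent failures can be illustrated with the standard library alone. In this sketch, `errPermanent` is a stand-in for the PermanentFailure tag (the real tag is a LUCI `errors.BoolTag`, which is not used here), and `taskFunc`, `handle`, `exportTask`, and the `outcome` values are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errPermanent is a stdlib stand-in for the PermanentFailure tag.
var errPermanent = errors.New("permanent failure to process invocation task")

// taskFunc mirrors the shape of TaskFunc with the context and invocation ID dropped.
type taskFunc func(payload []byte) error

// outcome describes how a caller might classify the result of a task.
type outcome string

const (
	succeeded outcome = "succeeded"
	doomed    outcome = "doomed: delete the task"
	retry     outcome = "transient: retry later"
)

// handle runs fn and classifies its error. Only a permanent failure marks
// the task as doomed; any other error leaves it eligible for a retry.
func handle(fn taskFunc, payload []byte) outcome {
	err := fn(payload)
	switch {
	case err == nil:
		return succeeded
	case errors.Is(err, errPermanent):
		return doomed
	default:
		return retry
	}
}

// exportTask is a hypothetical task body: an empty payload can never
// succeed, while a payload starting with '!' simulates a transient outage.
func exportTask(payload []byte) error {
	if len(payload) == 0 {
		return fmt.Errorf("empty payload: %w", errPermanent)
	}
	if payload[0] == '!' {
		return errors.New("backend unavailable")
	}
	return nil
}

func main() {
	fmt.Println(handle(exportTask, []byte("ok")))
	fmt.Println(handle(exportTask, nil))
	fmt.Println(handle(exportTask, []byte("!")))
}
```

Wrapping with `%w` keeps the original message while still letting the caller detect the permanent marker, which loosely parallels tagging an error without replacing it.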

type Type

type Type string

Type is a value for the InvocationTasks.TaskType column. It defines what a task does.

const (
    // BQExport is a type of task that exports an invocation to BigQuery.
    // The task payload is a binary-encoded BigQueryExport message.
    BQExport Type = "bq_export"

    // TryFinalizeInvocation is a type of task that tries to finalize an
    // invocation. No payload.
    TryFinalizeInvocation Type = "finalize"
)

Types of invocation tasks. Used as the value of the InvocationTasks.TaskType column.

func (Type) Key

func (t Type) Key(taskID string) spanner.Key

Key returns a Spanner key for the InvocationTasks row.



