luci: go.chromium.org/luci/resultdb/internal/tasks Index | Files

package tasks

import "go.chromium.org/luci/resultdb/internal/tasks"

Package tasks implements asynchronous invocation processing.

Index

Package Files

dispatcher.go finalization.go tasks.go

Variables

var AllTypes = []Type{BQExport, TryFinalizeInvocation}

AllTypes is a slice of all known types of tasks.

var ErrConflict = fmt.Errorf("the task is already leased")

ErrConflict is returned by Lease if the task does not exist or is already leased.

var PermanentFailure = errors.BoolTag{
    Key: errors.NewTagKey("permanent failure to process invocation task"),
}

PermanentFailure set in an error indicates that the err is not resolvable by a retry. Such task is doomed.

func Delete Uses

func Delete(ctx context.Context, typ Type, id string) error

Delete deletes a task.

func Enqueue Uses

func Enqueue(typ Type, taskID string, invID invocations.ID, payload interface{}, processAfter time.Time) *spanner.Mutation

Enqueue inserts one row to InvocationTasks.

func EnqueueBQExport Uses

func EnqueueBQExport(invID invocations.ID, payload *pb.BigQueryExport, processAfter time.Time) *spanner.Mutation

EnqueueBQExport inserts one row to InvocationTasks for a bq export task.

func Lease Uses

func Lease(ctx context.Context, typ Type, id string, duration time.Duration) (invID invocations.ID, payload []byte, err error)

Lease leases an invocation task. If the task does not exist or is already leased, returns ErrConflict.

func Peek Uses

func Peek(ctx context.Context, typ Type, f func(id string) error) error

Peek calls f on available tasks of a given type.

func StartInvocationFinalization Uses

func StartInvocationFinalization(ctx context.Context, txn *spanner.ReadWriteTransaction, id invocations.ID) error

StartInvocationFinalization changes invocation state to FINALIZING and enqueues a TryFinalizeInvocation task.

The caller is responsible for ensuring that the invocation is active.

TODO(nodir): this package is not a great place for this function, but there is no better package at the moment. Keep it here for now, but consider a new package as the code base grows.

type Dispatcher Uses

type Dispatcher struct {
    // How often to query for tasks. Defaults to 1m.
    QueryInterval time.Duration

    // How long to lease a task for. Defaults to 1m.
    LeaseDuration time.Duration

    // Number of tasks to process concurrently. Defaults to GOMAXPROCS.
    Workers int
}

Dispatcher queries for available tasks and dispatches them to goroutines.

func (*Dispatcher) Run Uses

func (d *Dispatcher) Run(ctx context.Context, taskType Type, fn TaskFunc)

Run queries tasks and dispatches them to goroutines until ctx is canceled. Logs errors.

type TaskFunc Uses

type TaskFunc func(ctx context.Context, invID invocations.ID, payload []byte) error

TaskFunc can execute a task. If the returned error is tagged with PermanentFailure, then the failed task is deleted.

type Type Uses

type Type string

Type is a value for InvocationTasks.TaskType column. It defines what a task does.

const (
    // BQExport is a type of task that exports an invocation to BigQuery.
    // The task payload is binary-encoded BigQueryExport message.
    BQExport Type = "bq_export"

    // TryFinalizeInvocation is a type of task that tries to finalize an
    // invocation. No payload.
    TryFinalizeInvocation Type = "finalize"
)

Types of invocation tasks. Used as InvocationTasks.TaskType column value.

func (Type) Key Uses

func (t Type) Key(taskID string) spanner.Key

Key returns a Spanner key for the InvocationTasks row.

Package tasks imports 18 packages (graph) and is imported by 1 packages. Updated 2020-07-09. Refresh now. Tools for package owners.