tasks

package
v0.0.0-...-b3c40c2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2023 License: MPL-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// StatePending - initial state of a task
	StatePending = "PENDING"
	// StateReceived - when task is received by a worker
	StateReceived = "RECEIVED"
	// StateStarted - when the worker starts processing the task
	StateStarted = "STARTED"
	// StateRetry - when failed task has been scheduled for retry
	StateRetry = "RETRY"
	// StateSuccess - when the task is processed successfully
	StateSuccess = "SUCCESS"
	// StateFailure - when processing of the task fails
	StateFailure = "FAILURE"
)

Variables

View Source
var (
	// ErrTaskMustBeFunc ...
	ErrTaskMustBeFunc = errors.New("Task must be a func type")
	// ErrTaskReturnsNoValue ...
	ErrTaskReturnsNoValue = errors.New("Task must return at least a single value")
	// ErrLastReturnValueMustBeError ..
	ErrLastReturnValueMustBeError = errors.New("Last return value of a task must be error")
)
View Source
var ErrTaskPanicked = errors.New("Invoking task caused a panic")

ErrTaskPanicked ...

Functions

func HumanReadableResults

func HumanReadableResults(results []reflect.Value) string

HumanReadableResults ...

func IsContextType

func IsContextType(t reflect.Type) bool

IsContextType checks to see if the type is a context.Context

func ReflectTaskResults

func ReflectTaskResults(taskResults []*TaskResult) ([]reflect.Value, error)

ReflectTaskResults ...

func ReflectValue

func ReflectValue(valueType string, value interface{}) (reflect.Value, error)

ReflectValue converts interface{} to reflect.Value based on string type

func ValidateTask

func ValidateTask(task interface{}) error

ValidateTask validates task function using reflection and makes sure it has a proper signature. Functions used as tasks must return at least a single value and the last return type must be error

Types

type Arg

type Arg struct {
	Name  string      `bson:"name"`
	Type  string      `bson:"type"`
	Value interface{} `bson:"value"`
}

Arg represents a single argument passed to invocation fo a task

type Chain

type Chain struct {
	Tasks []*Signature
}

Chain creates a chain of tasks to be executed one after another

func NewChain

func NewChain(signatures ...*Signature) (*Chain, error)

NewChain creates a new chain of tasks to be processed one by one, passing results unless task signatures are set to be immutable

type Chord

type Chord struct {
	Group    *Group
	Callback *Signature
}

Chord adds an optional callback to the group to be executed after all tasks in the group finished

func NewChord

func NewChord(group *Group, callback *Signature) (*Chord, error)

NewChord creates a new chord (a group of tasks with a single callback to be executed after all tasks in the group has completed)

type ErrRetryTaskLater

type ErrRetryTaskLater struct {
	// contains filtered or unexported fields
}

ErrRetryTaskLater ...

func NewErrRetryTaskLater

func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater

NewErrRetryTaskLater returns new ErrRetryTaskLater instance

func (ErrRetryTaskLater) Error

func (e ErrRetryTaskLater) Error() string

Error implements the error interface

func (ErrRetryTaskLater) RetryIn

func (e ErrRetryTaskLater) RetryIn() time.Duration

RetryIn returns time.Duration from now when task should be retried

type ErrUnsupportedType

type ErrUnsupportedType struct {
	// contains filtered or unexported fields
}

ErrUnsupportedType ...

func NewErrUnsupportedType

func NewErrUnsupportedType(valueType string) ErrUnsupportedType

NewErrUnsupportedType returns new ErrUnsupportedType

func (ErrUnsupportedType) Error

func (e ErrUnsupportedType) Error() string

Error method so we implement the error interface

type Group

type Group struct {
	GroupUUID string
	Tasks     []*Signature
}

Group creates a set of tasks to be executed in parallel

func NewGroup

func NewGroup(signatures ...*Signature) (*Group, error)

NewGroup creates a new group of tasks to be processed in parallel

func (*Group) GetUUIDs

func (group *Group) GetUUIDs() []string

GetUUIDs returns slice of task UUIDS

type GroupMeta

type GroupMeta struct {
	GroupUUID      string    `bson:"_id"`
	TaskUUIDs      []string  `bson:"task_uuids"`
	ChordTriggered bool      `bson:"chord_triggered"`
	Lock           bool      `bson:"lock"`
	CreatedAt      time.Time `bson:"created_at"`
	TTL            int64     `bson:"ttl,omitempty"`
}

GroupMeta stores useful metadata about tasks within the same group E.g. UUIDs of all tasks which are used in order to check if all tasks completed successfully or not and thus whether to trigger chord callback

type Headers

type Headers map[string]interface{}

Headers represents the headers which should be used to direct the task

func (Headers) ForeachKey

func (h Headers) ForeachKey(handler func(key, val string) error) error

ForeachKey on Headers implements opentracing.TextMapReader for trace propagation. It is essentially the same as the opentracing.TextMapReader implementation except for the added casting from interface{} to string.

func (Headers) Set

func (h Headers) Set(key, val string)

Set on Headers implements opentracing.TextMapWriter for trace propagation

type Retriable

type Retriable interface {
	RetryIn() time.Duration
}

Retriable is interface that retriable errors should implement

type Signature

type Signature struct {
	UUID           string
	Name           string
	RoutingKey     string
	ETA            *time.Time
	GroupUUID      string
	GroupTaskCount int
	Args           []Arg
	Headers        Headers
	Priority       uint8
	Immutable      bool
	RetryCount     int
	RetryTimeout   int
	OnSuccess      []*Signature
	OnError        []*Signature
	ChordCallback  *Signature
	//MessageGroupId for Broker, e.g. SQS
	BrokerMessageGroupId string
	//ReceiptHandle of SQS Message
	SQSReceiptHandle string
	// StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq,
	// and don't want machinery to delete from source queue
	StopTaskDeletionOnError bool
	// IgnoreWhenTaskNotRegistered auto removes the request when there is no handeler available
	// When this is true a task with no handler will be ignored and not placed back in the queue
	IgnoreWhenTaskNotRegistered bool
}

Signature represents a single task invocation

func CopySignature

func CopySignature(signature *Signature) *Signature

func CopySignatures

func CopySignatures(signatures ...*Signature) []*Signature

func NewSignature

func NewSignature(name string, args []Arg) (*Signature, error)

NewSignature creates a new task signature

func SignatureFromContext

func SignatureFromContext(ctx context.Context) *Signature

SignatureFromContext gets the signature from the context

type Task

type Task struct {
	TaskFunc   reflect.Value
	UseContext bool
	Context    context.Context
	Args       []reflect.Value
}

Task wraps a signature and methods used to reflect task arguments and return values after invoking the task

func New

func New(taskFunc interface{}, args []Arg) (*Task, error)

New tries to use reflection to convert the function and arguments into a reflect.Value and prepare it for invocation

func NewWithSignature

func NewWithSignature(taskFunc interface{}, signature *Signature) (*Task, error)

NewWithSignature is the same as New but injects the signature

func (*Task) Call

func (t *Task) Call() (taskResults []*TaskResult, err error)

Call attempts to call the task with the supplied arguments.

`err` is set in the return value in two cases:

  1. The reflected function invocation panics (e.g. due to a mismatched argument list).
  2. The task func itself returns a non-nil error.

func (*Task) ReflectArgs

func (t *Task) ReflectArgs(args []Arg) error

ReflectArgs converts []TaskArg to []reflect.Value

type TaskResult

type TaskResult struct {
	Type  string      `bson:"type"`
	Value interface{} `bson:"value"`
}

TaskResult represents an actual return value of a processed task

type TaskState

type TaskState struct {
	TaskUUID  string        `bson:"_id"`
	TaskName  string        `bson:"task_name"`
	State     string        `bson:"state"`
	Results   []*TaskResult `bson:"results"`
	Error     string        `bson:"error"`
	CreatedAt time.Time     `bson:"created_at"`
	TTL       int64         `bson:"ttl,omitempty"`
}

TaskState represents a state of a task

func NewFailureTaskState

func NewFailureTaskState(signature *Signature, err string) *TaskState

NewFailureTaskState ...

func NewPendingTaskState

func NewPendingTaskState(signature *Signature) *TaskState

NewPendingTaskState ...

func NewReceivedTaskState

func NewReceivedTaskState(signature *Signature) *TaskState

NewReceivedTaskState ...

func NewRetryTaskState

func NewRetryTaskState(signature *Signature) *TaskState

NewRetryTaskState ...

func NewStartedTaskState

func NewStartedTaskState(signature *Signature) *TaskState

NewStartedTaskState ...

func NewSuccessTaskState

func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState

NewSuccessTaskState ...

func (*TaskState) IsCompleted

func (taskState *TaskState) IsCompleted() bool

IsCompleted returns true if state is SUCCESS or FAILURE, i.e. the task has finished processing and either succeeded or failed.

func (*TaskState) IsFailure

func (taskState *TaskState) IsFailure() bool

IsFailure returns true if state is FAILURE

func (*TaskState) IsSuccess

func (taskState *TaskState) IsSuccess() bool

IsSuccess returns true if state is SUCCESS

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL