tasks

package
v4.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2024 License: AGPL-3.0 Imports: 35 Imported by: 0

Documentation

Overview

Package tasks provides workers that effectively run the instances of the scheduled jobs.

Index

Constants

View Source
const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
View Source
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)

Variables

View Source
var (
	PubSub *pubsub.PubSub
)

Functions

func UnSubWithFlush added in v4.2.2

func UnSubWithFlush(ch chan interface{}, topics ...string)

UnSubWithFlush wraps PubSub.Unsub with a select to make sure all messages are consumed before unsubscribing.

Types

type ContextJobParametersKey

type ContextJobParametersKey struct{}

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher orchestrates the jobs by dispatching work to available workers.

func NewDispatcher

func NewDispatcher(rootCtx context.Context, maxWorkers int, job *jobs.Job, tags map[string]string) *Dispatcher

NewDispatcher creates and initialises a new Dispatcher with this amount of workers.

func (*Dispatcher) Queue added in v4.2.4

func (d *Dispatcher) Queue() chan RunnerFunc

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run simply starts the N workers of this dispacher.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop sends a quit signal to all workers and the main dispatcher

type ReconnectingClient

type ReconnectingClient struct {
	// contains filtered or unexported fields
}

func NewTaskReconnectingClient

func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient

func (*ReconnectingClient) StartListening

func (s *ReconnectingClient) StartListening(tasksChan chan interface{})

func (*ReconnectingClient) Stop

func (s *ReconnectingClient) Stop()

type Runnable

type Runnable struct {
	*jobs.Action
	Task           *Task
	Message        *jobs.ActionMessage
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
	// contains filtered or unexported fields
}

Runnable represents the runnable instance of a given task

func NewRunnable

func NewRunnable(ctx context.Context, parent *Runnable, queue chan RunnerFunc, action *jobs.Action, message *jobs.ActionMessage, indexTag ...int) Runnable

NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.

func RootRunnable

func RootRunnable(ctx context.Context, task *Task) Runnable

func (*Runnable) Add added in v4.0.5

func (r *Runnable) Add(i int)

func (*Runnable) AsRunnerFuncRun added in v4.0.5

func (r *Runnable) AsRunnerFuncRun() RunnerFunc

func (*Runnable) Dispatch

func (r *Runnable) Dispatch(input *jobs.ActionMessage, aa []*jobs.Action, queue chan RunnerFunc)

Dispatch gets next Runnable list from action and enqueues them to the Queue Done channel should be working correctly with chained actions

func (*Runnable) Done added in v4.0.5

func (r *Runnable) Done()

func (*Runnable) RunAction

func (r *Runnable) RunAction(queue chan RunnerFunc)

RunAction creates an action and calls Dispatch

func (*Runnable) SetupCollector added in v4.0.5

func (r *Runnable) SetupCollector(parentCtx context.Context, mergeAction *jobs.Action, queue chan RunnerFunc)

type RunnerFunc added in v4.0.5

type RunnerFunc func(queue chan RunnerFunc)

type Subscriber

type Subscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriber handles incoming events, applies selectors if any and generates all ActionMessage to trigger actions

func NewSubscriber

func NewSubscriber(parentContext context.Context) *Subscriber

NewSubscriber creates a multiplexer for tasks managements and messages by maintaining a map of dispatcher, one for each job definition.

func (*Subscriber) Init

func (s *Subscriber) Init(ctx context.Context) error

Init subscriber with current list of jobs from Jobs service

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop closes internal EventsBatcher

type Task

type Task struct {
	*jobs.Job
	common.RuntimeHolder
	// contains filtered or unexported fields
}

func NewTaskFromEvent

func NewTaskFromEvent(runtime, ctx context.Context, job *jobs.Job, event interface{}) *Task

NewTaskFromEvent creates a task based on incoming job and event

func (*Task) Add

func (t *Task) Add(delta int)

Add increments task internal retain counter

func (*Task) CleanUp added in v4.0.1

func (t *Task) CleanUp()

CleanUp is triggered after a task has no more subroutines running.

func (*Task) Clone added in v4.2.2

func (t *Task) Clone() *jobs.Task

Clone creates a protobuf clone of this task

func (*Task) Done

func (t *Task) Done(delta int)

Done decrements task internal retain counter - When reaching 0, it triggers the CleanUp operation

func (*Task) GetRunUUID

func (t *Task) GetRunUUID() string

GetRunUUID returns the task internal run UUID

func (*Task) GetRunnableChannels

func (t *Task) GetRunnableChannels(runnableCtx context.Context, controllable bool) (*actions.RunnableChannels, chan bool)

GetRunnableChannels prepares a set of data channels for action actual Run method.

func (*Task) Queue added in v4.0.1

func (t *Task) Queue(queue ...chan RunnerFunc)

Queue send this new task to the dispatcher queue. If a second queue is passed, it may differ from main input queue, so it is used for children queuing

func (*Task) Save

func (t *Task) Save()

func (*Task) SaveStatus added in v4.4.0

func (t *Task) SaveStatus(runnableContext context.Context, runnableStatus jobs.TaskStatus)

SaveStatus publish task to PubSub topic, including Runnable context if passed

func (*Task) SetControllable

func (t *Task) SetControllable(canPause bool)

SetControllable flags task as being able to be stopped or paused

func (*Task) SetEndTime

func (t *Task) SetEndTime(ti time.Time)

SetEndTime updates end time

func (*Task) SetError added in v4.0.1

func (t *Task) SetError(e error, appendLog bool)

SetError set task in error globally

func (*Task) SetHasProgress

func (t *Task) SetHasProgress()

SetHasProgress flags task as providing progress information

func (*Task) SetProgress

func (t *Task) SetProgress(progress float32)

SetProgress updates task internal progress

func (*Task) SetStartTime

func (t *Task) SetStartTime(ti time.Time)

SetStartTime updates start time

func (*Task) SetStatus

func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)

SetStatus updates task internal status

type TaskStatusUpdate added in v4.4.0

type TaskStatusUpdate struct {
	*jobs.Task
	RunnableContext context.Context
	RunnableStatus  jobs.TaskStatus
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents the worker that executes the jobs.

func NewWorker

func NewWorker(workerPool chan chan RunnerFunc, requeue chan RunnerFunc, activeChan chan int, tags map[string]string) Worker

NewWorker creates and configures a new worker.

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it.

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.

Directories

Path Synopsis
Package grpc provides a gRPC service to effectively run task instances on multiple workers.
Package grpc provides a gRPC service to effectively run task instances on multiple workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL