tasks

package module
v0.0.0-...-984cbf5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunSteps

func RunSteps(
	startStep uint32,
	steps []StepFunc,
	saveState func(uint32) error,
) error

func StartRunners

func StartRunners(
	ctx context.Context,
	taskStorage storage.Storage,
	registry *Registry,
	runnerMetricsRegistry metrics.Registry,
	config *tasks_config.TasksConfig,
	host string,
) error

Types

type Controller

// Controller starts/stops task runners.
// Instances are obtained from NewController.
type Controller interface {
	// StartRunners returns an error on failure.
	// NOTE(review): presumably launches the same runners as the package-level
	// StartRunners function, using the arguments given to NewController - confirm.
	StartRunners() error

	// HealthChangedCallback takes the new health status as a boolean.
	// NOTE(review): presumably used to start/stop runners when health flips;
	// the meaning of true vs. false is not visible here - confirm.
	HealthChangedCallback(status bool)
}

Controller starts/stops task runners.

func NewController

func NewController(
	ctx context.Context,
	taskStorage storage.Storage,
	registry *Registry,
	runnerMetricsRegistry metrics.Registry,
	config *tasks_config.TasksConfig,
	host string,
) Controller

type ExecutionContext

// ExecutionContext is the per-task handle passed to Task.Run and Task.Cancel
// and accepted by Scheduler.WaitTask.
type ExecutionContext interface {
	// NOTE(review): presumably persists the current task state (cf. Task.Save)
	// - confirm against the implementation.
	SaveState(ctx context.Context) error

	// Like SaveState, but additionally takes a callback that receives a
	// context and a *persistence.Transaction.
	// NOTE(review): presumably the callback runs inside the same transaction
	// that saves the state - confirm.
	SaveStateWithCallback(
		ctx context.Context,
		callback func(context.Context, *persistence.Transaction) error,
	) error

	// Returns the task type as a string.
	// NOTE(review): presumably the same taskType string used with Registry and
	// Scheduler.ScheduleTask - confirm.
	GetTaskType() string

	// Returns the task id as a string.
	GetTaskID() string

	// Dependencies are automatically added by Scheduler.WaitTask.
	AddTaskDependency(ctx context.Context, taskID string) error

	// Takes the estimated duration; returns nothing, so failures (if any)
	// are not reported to the caller.
	SetEstimate(estimatedDuration time.Duration)

	// Events are int64 values, the same type accepted by Scheduler.SendEvent.
	// NOTE(review): presumably reports whether the given event has been sent
	// to this task - confirm.
	HasEvent(ctx context.Context, event int64) bool

	// Takes a callback with the same shape as SaveStateWithCallback's.
	// NOTE(review): presumably marks the task finished and runs the callback
	// in the same transaction - confirm.
	FinishWithCallback(
		ctx context.Context,
		callback func(context.Context, *persistence.Transaction) error,
	) error
}

type Registry

// Registry is created with NewRegistry. Its methods accept task constructors
// keyed by task type (Register, RegisterForExecution), return a Task for a
// given type (NewTask), and list task types for execution
// (TaskTypesForExecution).
// NOTE(review): the difference between Register and RegisterForExecution is
// not visible here - confirm against the implementation.
type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *Registry

func (*Registry) NewTask

func (r *Registry) NewTask(taskType string) (Task, error)

func (*Registry) Register

func (r *Registry) Register(
	taskType string,
	newTask func() Task,
) error

func (*Registry) RegisterForExecution

func (r *Registry) RegisterForExecution(
	taskType string,
	newTask func() Task,
) error

func (*Registry) TaskTypesForExecution

func (r *Registry) TaskTypesForExecution() []string

type Scheduler

// Scheduler is the client-facing API for scheduling, cancelling, waiting on,
// and querying tasks. Instances are obtained from NewScheduler.
type Scheduler interface {
	// Requires "idempotency-key" header in ctx metadata.
	// Returns id of the task.
	ScheduleTask(
		ctx context.Context,
		taskType string,
		description string,
		request proto.Message,
		cloudID string,
		folderID string,
	) (string, error)

	// Requires "idempotency-key" header in ctx metadata.
	// The task will be executed by a worker in certain zone.
	// Returns id of the task.
	// Note the parameter order: zoneID sits between description and request,
	// so the argument list is not simply ScheduleTask's with one appended.
	ScheduleZonalTask(
		ctx context.Context,
		taskType string,
		description string,
		zoneID string,
		request proto.Message,
		cloudID string,
		folderID string,
	) (string, error)

	// Takes a task type and a TaskSchedule. Has no return value, so
	// scheduling failures cannot be reported to the caller.
	// NOTE(review): presumably starts background scheduling bound to ctx -
	// confirm.
	ScheduleRegularTasks(
		ctx context.Context,
		taskType string,
		schedule TaskSchedule,
	)

	// Marks task as cancelled.
	// Returns true if it's already cancelling (or cancelled).
	// Returns false if it has successfully finished.
	CancelTask(ctx context.Context, taskID string) (bool, error)

	// Interrupts execution if waited task has no result yet.
	// Execution will be resumed when result is ready.
	// Returns task response.
	// Returns error if task is cancelling.
	WaitTask(
		ctx context.Context,
		execCtx ExecutionContext,
		taskID string,
	) (proto.Message, error)

	// Synchronously waits until any of the given tasks is finished.
	// Returns finished tasks ids.
	// Returns error if any of the given tasks finishes with error.
	WaitAnyTasks(ctx context.Context, taskIDs []string) ([]string, error)

	// Synchronously waits until task is finished successfully or cancelled.
	WaitTaskEnded(ctx context.Context, taskID string) error

	// Returns a proto message for the given task id.
	// NOTE(review): presumably delegates to Task.GetMetadata - confirm.
	GetTaskMetadata(ctx context.Context, taskID string) (proto.Message, error)

	// Events are int64 values, the same type checked by
	// ExecutionContext.HasEvent.
	SendEvent(ctx context.Context, taskID string, event int64) error

	// TODO: Does it belong here?
	GetOperation(ctx context.Context, taskID string) (*operation.Operation, error)

	// Used in tests.
	// Synchronously waits for task response.
	// NOTE(review): behaviour when timeout elapses is not visible here -
	// presumably an error is returned; confirm.
	WaitTaskSync(
		ctx context.Context,
		taskID string,
		timeout time.Duration,
	) (proto.Message, error)

	// Schedules no-op task. Used for testing.
	// Returns a string and an error, like ScheduleTask.
	ScheduleBlankTask(ctx context.Context) (string, error)
}

func NewScheduler

func NewScheduler(
	ctx context.Context,
	registry *Registry,
	storage tasks_storage.Storage,
	config *tasks_config.TasksConfig,
	metricsRegistry metrics.Registry,
) (Scheduler, error)

type StepFunc

// StepFunc is the element type of the steps slice accepted by RunSteps.
// It receives a pointer to a bool named done and returns an error.
// NOTE(review): presumably a step sets *done to signal that the remaining
// steps should not run - confirm against RunSteps.
type StepFunc func(done *bool) error

type Task

// Task is the interface a task implementation must satisfy. Constructors
// returning a Task are passed to Registry.Register and
// Registry.RegisterForExecution, and Registry.NewTask returns one for a
// given task type.
type Task interface {
	// Serialize task state.
	Save() ([]byte, error)

	// Deserialize task state.
	// Takes two byte slices: the request and the state.
	// NOTE(review): presumably state is the output of an earlier Save call -
	// confirm.
	Load(request []byte, state []byte) error

	// Synchronously run the task.
	// At the end it's expected to respond to GetRequest.
	// NOTE(review): no GetRequest method is declared in this interface; this
	// probably refers to GetResponse - confirm.
	Run(ctx context.Context, execCtx ExecutionContext) error

	// Synchronously cancel the task.
	Cancel(ctx context.Context, execCtx ExecutionContext) error

	// Returns a proto message for the given task id.
	// NOTE(review): presumably backs Scheduler.GetTaskMetadata - confirm.
	GetMetadata(ctx context.Context, taskID string) (proto.Message, error)

	// It only makes sense after Run has completed successfully.
	// But in that case it must not return nil.
	GetResponse() proto.Message
}

type TaskSchedule

// TaskSchedule is the schedule argument of Scheduler.ScheduleRegularTasks.
// It holds either a fixed interval or a daily crontab-style time of day.
type TaskSchedule struct {
	// Scheduling interval; ignored when UseCrontab is set.
	ScheduleInterval time.Duration
	// NOTE(review): presumably an upper bound on the number of tasks of the
	// scheduled type that may be in flight at once - confirm.
	MaxTasksInflight int

	// Crontab params.
	// Schedules task every day - only 'hour' and 'min' are supported.
	UseCrontab bool // If set, ScheduleInterval is ignored.
	Hour       int  // (0 - 23)
	Min        int  // (0 - 59)
}

Directories

Path Synopsis
acceptance_tests
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL