storage

package
v0.0.0-...-984cbf5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateYDBTables

func CreateYDBTables(
	ctx context.Context,
	config *tasks_config.TasksConfig,
	db *persistence.YDBClient,
	dropUnusedColumns bool,
) error

func DropYDBTables

func DropYDBTables(
	ctx context.Context,
	config *tasks_config.TasksConfig,
	db *persistence.YDBClient,
) error

func HasResult

func HasResult(status TaskStatus) bool

func IsCancelling

func IsCancelling(status TaskStatus) bool

func IsEnded

func IsEnded(status TaskStatus) bool

func TaskStatusToString

func TaskStatusToString(status TaskStatus) string

Types

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

func NewMetadata

func NewMetadata(vals map[string]string) Metadata

func (*Metadata) DeepCopy

func (m *Metadata) DeepCopy() Metadata

func (*Metadata) Put

func (m *Metadata) Put(key string, val string)

func (*Metadata) Remove

func (m *Metadata) Remove(key string)

func (*Metadata) Vals

func (m *Metadata) Vals() map[string]string

type Node

type Node struct {
	Host string
	// Timestamp of the last heartbeat.
	LastHeartbeat time.Time
	// Number of inflight tasks reported during the last heartbeat.
	InflightTaskCount uint32
}

Registered node that sends heartbeats.

type Storage

type Storage interface {
	// Attempt to register new task in the storage. TaskState.ID is ignored.
	// Returns generated task id for newly created task.
	// Returns existing task id if the task with the same idempotency key and
	// the same account id already exists.
	CreateTask(ctx context.Context, state TaskState) (string, error)

	// Creates periodically scheduled task.
	CreateRegularTasks(
		ctx context.Context,
		state TaskState,
		schedule TaskSchedule,
	) error

	GetTask(ctx context.Context, taskID string) (TaskState, error)

	GetTaskByIdempotencyKey(
		ctx context.Context,
		idempotencyKey string,
		accountID string,
	) (TaskState, error)

	// Used in task execution workflow.
	ListTasksReadyToRun(
		ctx context.Context,
		limit uint64,
		taskTypeWhitelist []string,
	) ([]TaskInfo, error)

	// Used in task execution workflow.
	ListTasksReadyToCancel(
		ctx context.Context,
		limit uint64,
		taskTypeWhitelist []string,
	) ([]TaskInfo, error)

	// Lists tasks that are currently running but don't make any progress for
	// some time.
	ListTasksStallingWhileRunning(
		ctx context.Context,
		excludingHostname string,
		limit uint64,
		taskTypeWhitelist []string,
	) ([]TaskInfo, error)

	// Lists tasks that are currently cancelling but don't make any progress for
	// some time.
	ListTasksStallingWhileCancelling(
		ctx context.Context,
		excludingHostname string,
		limit uint64,
		taskTypeWhitelist []string,
	) ([]TaskInfo, error)

	// Used for SRE tools.
	ListTasksRunning(ctx context.Context, limit uint64) ([]TaskInfo, error)
	ListTasksCancelling(ctx context.Context, limit uint64) ([]TaskInfo, error)
	ListFailedTasks(ctx context.Context, since time.Time) ([]string, error)
	ListSlowTasks(ctx context.Context, since time.Time, estimateMiss time.Duration) ([]string, error)

	// Fails with WrongGenerationError, if generationID does not match.
	LockTaskToRun(
		ctx context.Context,
		taskInfo TaskInfo,
		at time.Time,
		hostname string,
		runner string,
	) (TaskState, error)

	// Fails with WrongGenerationError, if generationID does not match.
	LockTaskToCancel(
		ctx context.Context,
		taskInfo TaskInfo,
		at time.Time,
		hostname string,
		runner string,
	) (TaskState, error)

	// Mark task for cancellation.
	// Returns true if it's already cancelling (or cancelled)
	// Returns false if it has successfully finished.
	MarkForCancellation(ctx context.Context, taskID string, at time.Time) (bool, error)

	// This fails with WrongGenerationError, if generationID does not match.
	// In callback you could perform custom transaction and it will be coupled
	// with current task's updating.
	UpdateTaskWithCallback(
		ctx context.Context,
		state TaskState,
		callback func(context.Context, *persistence.Transaction) error,
	) (TaskState, error)

	// This fails with WrongGenerationError, if generationID does not match.
	UpdateTask(ctx context.Context, state TaskState) (TaskState, error)

	SendEvent(ctx context.Context, taskID string, event int64) error

	// Used for garbage collecting of ended and outdated tasks.
	ClearEndedTasks(ctx context.Context, endedBefore time.Time, limit int) error

	// NOTE: used for SRE operations only.
	// Forcefully finishes task by setting "finished" status.
	ForceFinishTask(ctx context.Context, taskID string) error

	// NOTE: used for SRE operations only.
	// Pauses task execution until ResumeTask is called.
	PauseTask(ctx context.Context, taskID string) error

	// NOTE: used for SRE operations only.
	ResumeTask(ctx context.Context, taskID string) error

	// Update last heartbeat column.
	Heartbeat(ctx context.Context, host string, ts time.Time, inflightTaskCount uint32) error

	// Fetch the nodes that have recently send heartbeats.
	GetAliveNodes(ctx context.Context) ([]Node, error)

	// Fetch the node.
	// This method is used in testing, but can be safely used if needed.
	GetNode(ctx context.Context, host string) (Node, error)
}

func NewStorage

func NewStorage(
	config *tasks_config.TasksConfig,
	metricsRegistry metrics.Registry,
	db *persistence.YDBClient,
) (Storage, error)

type StringSet

type StringSet struct {
	// contains filtered or unexported fields
}

func NewStringSet

func NewStringSet(vals ...string) StringSet

func (*StringSet) Add

func (s *StringSet) Add(val string)

func (*StringSet) DeepCopy

func (s *StringSet) DeepCopy() StringSet

func (*StringSet) Has

func (s *StringSet) Has(val string) bool

func (*StringSet) List

func (s *StringSet) List() []string

func (*StringSet) Remove

func (s *StringSet) Remove(val string)

func (*StringSet) Size

func (s *StringSet) Size() int

func (*StringSet) Vals

func (s *StringSet) Vals() map[string]struct{}

type TaskInfo

type TaskInfo struct {
	ID           string
	GenerationID uint64
	TaskType     string
}

type TaskSchedule

type TaskSchedule struct {
	ScheduleInterval time.Duration
	MaxTasksInflight int

	// Crontab params.
	// Schedules task every day - only 'hour' and 'min' are supported.
	UseCrontab bool // If set, ScheduleInterval is ignored.
	Hour       int  // (0 - 23)
	Min        int  // (0 - 59)
}

type TaskState

type TaskState struct {
	ID                  string
	IdempotencyKey      string
	AccountID           string
	TaskType            string
	Regular             bool
	Description         string
	StorageFolder       string
	CreatedAt           time.Time
	CreatedBy           string
	ModifiedAt          time.Time
	GenerationID        uint64
	Status              TaskStatus
	ErrorCode           grpc_codes.Code
	ErrorMessage        string
	ErrorDetails        *errors.ErrorDetails
	ErrorSilent         bool
	RetriableErrorCount uint64
	Request             []byte
	State               []byte
	Metadata            Metadata
	Dependencies        StringSet
	ChangedStateAt      time.Time
	EndedAt             time.Time
	LastHost            string
	LastRunner          string
	ZoneID              string
	CloudID             string
	FolderID            string
	EstimatedTime       time.Time
	PanicCount          uint64
	Events              []int64
	// contains filtered or unexported fields
}

This is mapped into a DB row. If you change this struct, make sure to update the mapping code.

func (*TaskState) DeepCopy

func (s *TaskState) DeepCopy() TaskState

func (*TaskState) SetError

func (s *TaskState) SetError(e error)

type TaskStatus

type TaskStatus uint32
const (
	TaskStatusReadyToRun      TaskStatus = iota
	TaskStatusWaitingToRun    TaskStatus = iota
	TaskStatusRunning         TaskStatus = iota
	TaskStatusFinished        TaskStatus = iota
	TaskStatusReadyToCancel   TaskStatus = iota
	TaskStatusWaitingToCancel TaskStatus = iota
	TaskStatusCancelling      TaskStatus = iota
	TaskStatusCancelled       TaskStatus = iota
)

NOTE: These values are stored in DB, do not shuffle them around.

func (*TaskStatus) UnmarshalYDB

func (s *TaskStatus) UnmarshalYDB(res persistence.RawValue) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL