package sched
v0.0.0-...-90deddd
Warning

This package is not in the latest version of its module.

Published: Oct 18, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package sched implements task scheduling for Reflow.

A unit of work is encapsulated by a Task, and is submitted to the scheduler. Multiple tasks may be submitted simultaneously (and may lead to better packing). Tasks are packed into a set of Reflow allocs; these are dynamically sourced from the provided cluster as needed. The scheduler attempts to pack tasks into as few allocs as possible so that they may be reclaimed when they become idle.
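Packing tasks into as few allocs as possible is a form of bin packing. The sketch below shows one classic heuristic, first-fit decreasing, over a single memory dimension. It is an illustration only: Reflow's actual packing logic is not documented here, and `packFFD` and the integer task sizes are hypothetical.

```go
package main

import "sort"

// packFFD is an illustrative first-fit-decreasing packer (an assumption,
// not Reflow's algorithm). Each task is a memory size; each bin stands in
// for an alloc of the given capacity. It returns the task sizes per bin.
// A task larger than capacity is given a bin of its own, sized to fit it.
func packFFD(tasks []int, capacity int) [][]int {
	sorted := append([]int(nil), tasks...)
	// Largest first: big tasks are the hardest to place, so place them early.
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))

	var bins [][]int
	var free []int // remaining capacity of each bin
	for _, t := range sorted {
		placed := false
		for i := range bins {
			if free[i] >= t {
				bins[i] = append(bins[i], t)
				free[i] -= t
				placed = true
				break
			}
		}
		if !placed {
			size := capacity
			if t > size {
				size = t
			}
			bins = append(bins, []int{t})
			free = append(free, size-t)
		}
	}
	return bins
}
```

Placing the largest tasks first tends to leave fewer partially filled bins, which matches the stated goal: fewer, fuller allocs are easier to reclaim once they go idle.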

The scheduler is given a Repository from which dependent objects are downloaded and to which results are uploaded. This repository is typically also the Reflow cache. A task fails if the necessary objects cannot be fetched, or if uploading fails.

If an alloc's keepalive fails, its running tasks are marked as lost and rescheduled.
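A minimal sketch of that recovery path, using a hypothetical `task` type and `handleKeepalive` helper rather than the package's own types: when a keepalive reports an error, every task still assigned to the alloc is marked lost, its attempt count is bumped, and it is returned to the pending queue. The real scheduler's bookkeeping (for example via `TaskLost` and `Task.Reset`) is not shown here.

```go
package main

import "errors"

// state loosely mirrors TaskState for this sketch only.
type state int

const (
	stateRunning state = iota
	stateLost
)

// task is a hypothetical stand-in for sched.Task.
type task struct {
	name    string
	state   state
	attempt int
}

// errKeepaliveFailed is a hypothetical keepalive error.
var errKeepaliveFailed = errors.New("keepalive failed")

// handleKeepalive inspects the result of an alloc's keepalive. On failure
// it marks each running task lost, increments its attempt count, and
// appends it to pending so it can be rescheduled elsewhere. The alloc's
// own task list is cleared because the alloc is treated as dead.
func handleKeepalive(err error, running, pending []*task) (stillRunning, newPending []*task) {
	if err == nil {
		return running, pending
	}
	for _, t := range running {
		t.state = stateLost
		t.attempt++
		pending = append(pending, t)
	}
	return nil, pending
}
```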

Constants

const ExpVarScheduler = "scheduler"

ExpVarScheduler is the prefix of the scheduler stats exported name.

Variables

This section is empty.

Functions

func GetTaskStatsId

func GetTaskStatsId(task *Task) string

Types

type AllocStats

type AllocStats struct {
	sync.Mutex `json:"-"`
	AllocStatsData
}

AllocStats holds the per-alloc stats and is used to update them.

func (*AllocStats) AssignTask

func (a *AllocStats) AssignTask(task *Task)

AssignTask makes an alloc<->task association.

func (*AllocStats) Copy

func (a *AllocStats) Copy() AllocStatsData

Copy returns an immutable snapshot of AllocStats.

func (*AllocStats) MarkDead

func (a *AllocStats) MarkDead()

MarkDead marks an alloc dead.

func (*AllocStats) RemoveTask

func (a *AllocStats) RemoveTask(task *Task)

RemoveTask removes the alloc<->task association.

type AllocStatsData

type AllocStatsData struct {
	// Resources is the currently available resources.
	reflow.Resources
	// Dead indicates if this alloc is dead.
	Dead bool
	// TaskIDs is the list of tasks running in this alloc.
	TaskIDs map[string]int
}

AllocStatsData is a snapshot of the per-alloc stats.
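The split between a mutex-guarded AllocStats and a plain AllocStatsData value suggests the usual "locked struct plus copyable snapshot" pattern. The sketch below illustrates that pattern with hypothetical lowercase types; a single `Mem` field replaces `reflow.Resources`, and storing each task's reserved memory as the `TaskIDs` map value is an assumption made so that RemoveTask can return it.

```go
package main

import "sync"

// allocStatsData is a simplified stand-in for AllocStatsData.
type allocStatsData struct {
	Mem     int            // available memory (stands in for reflow.Resources)
	Dead    bool           // whether the alloc is dead
	TaskIDs map[string]int // task ID -> memory reserved (assumed meaning)
}

// allocStats guards allocStatsData with an embedded mutex, like AllocStats.
type allocStats struct {
	sync.Mutex
	allocStatsData
}

// AssignTask records the task and subtracts its memory from what is available.
func (a *allocStats) AssignTask(id string, mem int) {
	a.Lock()
	defer a.Unlock()
	if a.TaskIDs == nil {
		a.TaskIDs = make(map[string]int)
	}
	a.TaskIDs[id] = mem
	a.Mem -= mem
}

// RemoveTask drops the association and returns the task's memory.
func (a *allocStats) RemoveTask(id string) {
	a.Lock()
	defer a.Unlock()
	if mem, ok := a.TaskIDs[id]; ok {
		a.Mem += mem
		delete(a.TaskIDs, id)
	}
}

// MarkDead marks the alloc dead.
func (a *allocStats) MarkDead() {
	a.Lock()
	defer a.Unlock()
	a.Dead = true
}

// Copy returns a snapshot that shares no mutable state with a: the map is
// cloned so later assignments do not show up in the snapshot.
func (a *allocStats) Copy() allocStatsData {
	a.Lock()
	defer a.Unlock()
	d := a.allocStatsData
	d.TaskIDs = make(map[string]int, len(a.TaskIDs))
	for k, v := range a.TaskIDs {
		d.TaskIDs[k] = v
	}
	return d
}
```

Cloning the map is what makes the snapshot safe to read without holding the lock; copying the struct alone would still share the underlying map.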

type Cluster

type Cluster interface {
	// Allocate returns an alloc with at least req.Min resources, or an
	// error. The requirement's width is used as a hint to size allocs
	// efficiently.
	Allocate(ctx context.Context, req reflow.Requirements, labels pool.Labels) (pool.Alloc, error)

	// CanAllocate returns whether this cluster can allocate the given amount of resources.
	CanAllocate(reflow.Resources) (bool, error)
}

Cluster is the scheduler's cluster interface.

type OverallStats

type OverallStats struct {
	// TotalAllocs is the total number of allocs in the system (live or dead).
	TotalAllocs int64
	// TotalTasks is the total number of tasks (pending, running or completed).
	TotalTasks int64
}

OverallStats is the overall scheduler stats.

type Scheduler

type Scheduler struct {
	// Transferer is used to manage data movement between
	// allocs and the scheduler's repository.
	Transferer reflow.Transferer
	// Mux is used to manage direct data transfers between blob stores (if supported)
	Mux blob.Mux
	// Cluster provides dynamic allocation of allocs.
	Cluster Cluster
	// Log logs scheduler actions.
	Log *log.Logger
	// TaskDB is the task reporting db.
	TaskDB taskdb.TaskDB

	// MaxPendingAllocs is the maximum number of outstanding
	// alloc requests.
	MaxPendingAllocs int
	// MaxAllocIdleTime is the time after which an idle alloc is
	// collected.
	MaxAllocIdleTime time.Duration

	// DrainTimeout is the duration to wait to see if more tasks have been submitted
	// so that we can combine the requirements of those tasks together to make larger allocs.
	DrainTimeout time.Duration

	// MinAlloc is the smallest resource allocation that is made by
	// the scheduler.
	MinAlloc reflow.Resources

	// Labels is the set of labels applied to newly created allocs.
	Labels pool.Labels

	// Stats is the scheduler stats.
	Stats *Stats
	// contains filtered or unexported fields
}

A Scheduler is responsible for managing a set of tasks and allocs, assigning (and reassigning) tasks to appropriate allocs. Scheduler can manage large numbers of tasks and allocs efficiently.

func New

func New() *Scheduler

New returns a new Scheduler instance. The caller may customize its parameters before starting scheduling by invoking Scheduler.Do.

func (*Scheduler) Do

func (s *Scheduler) Do(ctx context.Context) error

Do commences scheduling. The scheduler runs until the provided context is canceled, after which the context error is returned.

func (*Scheduler) ExportStats

func (s *Scheduler) ExportStats()

ExportStats exports scheduler stats as expvars.

func (*Scheduler) Submit

func (s *Scheduler) Submit(tasks ...*Task)

Submit adds a set of tasks to the scheduler's todo list. The provided tasks are managed by the scheduler after this call. The scheduler manages a task until it reaches the TaskDone state.

type Stats

type Stats struct {
	// Mutex protects all the data members.
	sync.Mutex `json:"-"`
	// OverallStats has the overall scheduler stats.
	OverallStats
	// Allocs has all the alloc stats, including dead ones.
	Allocs map[string]*AllocStats
	// Tasks has all the task state and stats, including completed/error tasks.
	Tasks map[string]*TaskStats
}

Stats has all the scheduler stats, including alloc/task states and stats. It is thread safe and can be used to update stats.

func (*Stats) AddAlloc

func (s *Stats) AddAlloc(alloc *alloc)

AddAlloc adds an alloc to the stats.

func (*Stats) AddTasks

func (s *Stats) AddTasks(tasks []*Task)

AddTasks adds the tasks to the stats.

func (*Stats) AssignTask

func (s *Stats) AssignTask(task *Task, alloc *alloc)

AssignTask assigns a task to an alloc.

func (*Stats) GetStats

func (s *Stats) GetStats() StatsData

GetStats returns a snapshot of the scheduler stats.

func (*Stats) MarkAllocDead

func (s *Stats) MarkAllocDead(alloc *alloc)

MarkAllocDead marks an alloc dead.

func (*Stats) Publish

func (s *Stats) Publish()

Publish publishes the stats as a Go expvar.

func (*Stats) PublishPrefix

func (s *Stats) PublishPrefix(prefix string)

PublishPrefix publishes the stats as a Go expvar with the given prefix.

func (*Stats) ReturnTask

func (s *Stats) ReturnTask(task *Task, alloc *alloc)

ReturnTask removes a task from the stats before returning it.

type StatsData

type StatsData struct {
	// OverallStats has the overall scheduler stats.
	OverallStats
	// Allocs has all the alloc stats, including dead ones.
	Allocs map[string]AllocStatsData
	// Tasks has all the task state and stats, including completed/error tasks.
	Tasks map[string]TaskStatsData
}

StatsData is an immutable snapshot of Stats, usually obtained by calling Stats.GetStats().

type Task

type Task struct {
	// Config is the task's exec config, which is passed on to the
	// alloc after scheduling.
	Config reflow.ExecConfig
	// Repository is the repository for this task, from which dependent
	// objects are downloaded and to which result objects are uploaded.
	Repository reflow.Repository
	// Log receives any status log messages during task scheduling
	// and execution.
	Log *log.Logger

	// Err stores any task scheduling error. If Err != nil while the
	// task is TaskDone, then the task failed to run.
	Err error
	// Result stores the Reflow result returned by a successful
	// execution.
	Result reflow.Result

	// RunInfo stores log/inspect information from the exec.
	RunInfo reflow.ExecRunInfo

	// Exec is the exec which is running (or ran) the task. Exec is
	// set by the scheduler before the task enters TaskRunning state.
	Exec reflow.Exec

	// Priority is the task priority. Lower numbers indicate higher priority.
	// Higher priority tasks will get scheduled before any lower priority tasks.
	Priority int

	// PostUseChecksum indicates whether input filesets are checksummed after use.
	PostUseChecksum bool

	// ExpectedDuration is the duration the task is expected to take. It is
	// used only as a hint by the scheduler for better scheduling.
	ExpectedDuration time.Duration

	// RunID that created this task.
	RunID taskdb.RunID
	// FlowID is the digest (flow.Digest) of the flow for which this task was created.
	FlowID digest.Digest

	// TaskDB is where the task row for this task is recorded. It is set by the scheduler only after the task has been attempted.
	TaskDB taskdb.TaskDB
	// contains filtered or unexported fields
}

Task represents a schedulable unit of work. Tasks are submitted to a scheduler which in turn schedules them onto allocs. After submission, all coordination is performed through the task struct.
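The Priority field states that lower numbers mean higher priority and are scheduled first. The sketch below shows one way to order pending work by that rule with the standard `container/heap` package, breaking ties by submission order. The `item` and `pq` types are hypothetical; this illustrates the documented rule, not the scheduler's internal queue.

```go
package main

import "container/heap"

// item is a hypothetical pending task.
type item struct {
	name     string
	priority int // lower value = higher priority, as for Task.Priority
	seq      int // submission order, used to break ties
}

// pq implements heap.Interface as a min-heap on (priority, seq).
type pq []item

func (q pq) Len() int { return len(q) }
func (q pq) Less(i, j int) bool {
	if q[i].priority != q[j].priority {
		return q[i].priority < q[j].priority
	}
	return q[i].seq < q[j].seq
}
func (q pq) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q *pq) Push(x any)   { *q = append(*q, x.(item)) }
func (q *pq) Pop() any {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

// order returns task names in the order they would be dequeued.
func order(items []item) []string {
	q := pq(append([]item(nil), items...))
	heap.Init(&q)
	var out []string
	for q.Len() > 0 {
		out = append(out, heap.Pop(&q).(item).name)
	}
	return out
}
```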

func NewTask

func NewTask() *Task

NewTask returns a new, initialized task. The Task may be populated and then submitted to the scheduler.

func (*Task) Attempt

func (t *Task) Attempt() int

Attempt returns the task's current attempt index (zero-based).

func (*Task) ID

func (t *Task) ID() taskdb.TaskID

ID is the identifier for this task. ID is only set when the scheduler attempts the task. Attempts to read the ID before it is set will panic.

func (*Task) Init

func (t *Task) Init()

Init initializes the task's ID.

func (*Task) Reset

func (t *Task) Reset()

Reset resets the task. That is, it:
- resets the task's state to `TaskInit`,
- assigns a new ID to the task, and
- increments its attempt count.

func (*Task) Set

func (t *Task) Set(state TaskState)

Set sets the task's state to the given state.

func (*Task) State

func (t *Task) State() TaskState

State returns the task's current state.

func (*Task) Wait

func (t *Task) Wait(ctx context.Context, state TaskState) error

Wait returns after the task's state is at least the provided state. Wait returns an error if the context was canceled while waiting.

type TaskSet

type TaskSet map[*Task]bool

TaskSet is a set of tasks.

func NewTaskSet

func NewTaskSet(tasks ...*Task) TaskSet

NewTaskSet returns a TaskSet containing the given tasks.

func (TaskSet) Len

func (s TaskSet) Len() int

Len returns the number of tasks in the set.

func (TaskSet) RemoveAll

func (s TaskSet) RemoveAll(tasks ...*Task)

RemoveAll removes the given tasks from the set.

func (TaskSet) Slice

func (s TaskSet) Slice() []*Task

Slice returns a slice containing the tasks in the set.
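Since TaskSet is declared as `map[*Task]bool`, its methods can be thin wrappers over Go map operations. The sketch below mirrors them with a hypothetical lowercase `task` type. Membership is by pointer, so two distinct tasks with identical fields are distinct members, and in this sketch Slice returns tasks in unspecified map-iteration order.

```go
package main

// task is a hypothetical stand-in for sched.Task.
type task struct{ name string }

// taskSet mirrors TaskSet: a set of tasks keyed by pointer identity.
type taskSet map[*task]bool

// newTaskSet returns a set containing the given tasks; duplicates collapse.
func newTaskSet(tasks ...*task) taskSet {
	s := make(taskSet, len(tasks))
	for _, t := range tasks {
		s[t] = true
	}
	return s
}

// Len returns the number of tasks in the set.
func (s taskSet) Len() int { return len(s) }

// RemoveAll removes the given tasks; removing a non-member is a no-op.
func (s taskSet) RemoveAll(tasks ...*task) {
	for _, t := range tasks {
		delete(s, t)
	}
}

// Slice returns the members in unspecified (map iteration) order.
func (s taskSet) Slice() []*task {
	out := make([]*task, 0, len(s))
	for t := range s {
		out = append(out, t)
	}
	return out
}
```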

type TaskState

type TaskState int

TaskState enumerates the possible states of a task.

const (
	// TaskInit is the initial state of a Task. No work has yet been done.
	TaskInit TaskState = iota
	// TaskStaging indicates that the task is currently staging input
	// data.
	TaskStaging
	// TaskRunning indicates the task is currently executing.
	TaskRunning
	// TaskLost indicates that the task has transiently failed and will be
	// retried by the scheduler.
	TaskLost
	// TaskDone indicates the task has completed.
	TaskDone
)

func (TaskState) String

func (s TaskState) String() string

type TaskStats

type TaskStats struct {
	// Mutex protects TaskStatsData.
	sync.Mutex `json:"-"`
	// TaskStatsData are the task stats.
	TaskStatsData
}

TaskStats holds the per-task info and stats and is used to update them.

func (*TaskStats) Copy

func (t *TaskStats) Copy() TaskStatsData

Copy returns an immutable snapshot of TaskStats.

func (*TaskStats) Update

func (t *TaskStats) Update(task *Task)

Update updates the task's state and error, if any.

type TaskStatsData

type TaskStatsData struct {
	// Ident is the exec identifier of this task.
	Ident string
	// Type is the type of exec.
	Type string
	// State is the current state of the task.
	State int
	// Error, if not nil, is the task error.
	Error error
	// RunID is the run the task belongs to.
	RunID string
	// FlowID is the flow corresponding to this task.
	FlowID string
}

TaskStatsData is a snapshot of the task stats.
