scheduler

package
v0.0.0-...-e6aa6c5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package scheduler implements a runtime scheduler for cff with support for interdependent jobs.

To use the scheduler, build one with Config.New, providing the desired maximum number of goroutines.

cfg := scheduler.Config{Concurrency: 4}
sched := cfg.New()

With a scheduler available, enqueue jobs into it with the Enqueue method.

j1 := sched.Enqueue(ctx, Job{..})

The scheduler will begin running this job as soon as a worker is available.

Enqueue returns a reference to the scheduled job. Use this reference in other Enqueue calls to specify dependencies for jobs.

j3 := sched.Enqueue(ctx, Job{
	...,
	Dependencies: []*scheduler.ScheduledJob{j1, j2},
})

j3 will not be run until j1 and j2 have finished successfully.

Dependencies must be enqueued before jobs that depend on them. This adds the burden of dependency order resolution on the caller.

After enqueuing all jobs, await completion with Scheduler.Wait. This is comparable to WaitGroup.Wait().

err := sched.Wait(ctx)

If any of the enqueued jobs failed, the remaining jobs will be aborted and Wait will return the error. This may be changed by setting Config.ContinueOnError.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Concurrency is the number of concurrent workers to schedule tasks to.
	//
	// Defaults to max(GOMAXPROCS, 4).
	Concurrency int

	// Emitter provides a hook into the state of the scheduler.
	Emitter Emitter

	// StateFlushFrequency is how often the scheduler will emit metrics with the
	// emitter.
	//
	// Defaults to 100 milliseconds.
	StateFlushFrequency time.Duration

	// ContinueOnError, if true when a job fails, directs the scheduler to
	// record its failure, invalidate all jobs that depend on the failed job,
	// and keep running.
	ContinueOnError bool
}

Config stores parameters the scheduler should run with and is the entry point for running the scheduler.

func (Config) New

func (c Config) New() *Scheduler

New starts a scheduler with a fixed number of goroutines. goroutines.

Enqueue jobs into the returned scheduler using the Enqueue method, and wait for the result with Wait.

type Emitter

type Emitter interface {
	Emit(State)
}

Emitter emits the state of the cff scheduler.

type Job

type Job struct {
	// Run executes the job and returns the error it encountered, if any.
	Run func(context.Context) error

	// Dependencies are previously enqueued jobs that must run before this
	// job.
	Dependencies []*ScheduledJob
}

Job is an independent executable unit meant to be executed by the scheduler.

type ScheduledJob

type ScheduledJob struct {
	// contains filtered or unexported fields
}

ScheduledJob is a job that has been scheduled for execution by the scheduler.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler schedules jobs for a cff flow or parallel.

func (*Scheduler) Enqueue

func (s *Scheduler) Enqueue(ctx context.Context, j Job) *ScheduledJob

Enqueue queues up a job for execution with the scheduler. The returned object may be used as a dependency for other jobs.

Enqueue will panic if called after calling Wait.

func (*Scheduler) Wait

func (s *Scheduler) Wait(ctx context.Context) error

Wait waits for all scheduled jobs to finish by default it returns the first error encountered, if any.

If ContinueOnError was specified, the returned error will combine all errors from jobs failures.

No new jobs may be enqueued once Wait is called.

type State

type State struct {
	// Pending is total number of jobs, including jobs being executed and
	// waiting to be executed.
	Pending int
	// Ready is number of jobs to be executed but awaiting a free worker.
	// If this number is consistently high, increase the concurrency for this
	// flow.
	Ready int
	// Waiting is number of jobs waiting for other jobs to be finished before
	// being scheduled. If this number is consistently high, your flow has a
	// task that bottlenecks its performance, consider analyzing and
	// restructuring dependencies.
	Waiting int

	// IdleWorkers reports the number of workers that don't have any jobs to run.
	// If this is consistently high, decrease the concurrency for this flow.
	IdleWorkers int

	// Concurrency is the number of workers the scheduler can process tasks
	// with.
	Concurrency int
}

State describes the status of jobs managed by the cff scheduler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL