jobs

package
v0.0.0-...-aa29b24
Published: Sep 23, 2023
License: GPL-3.0
Imports: 13
Imported by: 0

Documentation

Constants

const (
	JobStatusPending = iota
	JobStatusRunning
	JobStatusDone
)

Enum for possible job states as tracked by the StateManager.

Variables

This section is empty.

Functions

func GetID

func GetID(ctx context.Context) string

GetID returns the job ID when called from RunContext.

Types

type CronJobStatus

type CronJobStatus struct {
	Name      string
	Schedule  string
	Prev      time.Time
	Next      time.Time
	LastError string
}

CronJobStatus represents the status of a job, either scheduled, running, or terminated in the past.

type ExclusiveLockManager

type ExclusiveLockManager struct {
	// contains filtered or unexported fields
}

ExclusiveLockManager keeps an exclusive lock of running jobs by Key, and can apply an overrun policy to them: upon starting a new job, when a previous job with the same key exists, it can evict the previous job or abort starting the new one.

func NewExclusiveLockManager

func NewExclusiveLockManager() *ExclusiveLockManager

NewExclusiveLockManager returns a new ExclusiveLockManager.

func (*ExclusiveLockManager) WithExclusiveLock

func (m *ExclusiveLockManager) WithExclusiveLock(j Job, lockKey string, killAndRun bool) Job

WithExclusiveLock wraps a Job with an exclusive lock, so that no more than one job with this key may be running at any given time. The killAndRun flag selects the desired overrun policy when a second task is started.

type Job

type Job interface {
	ID() string
	RunContext(context.Context) error
	Cancel()
	Wait() error
}

Job is a task that can be run and canceled. It carries a string that identifies successive instances of the same task, which differ only in their execution time (so keys don't have to be unique). It's basically a glorified goroutine wrapper with a cancelable Context.

func AsyncGroup

func AsyncGroup(jobs []Job) Job

AsyncGroup runs all the given jobs asynchronously, and waits for all of them to terminate. It fails if any return an error.

func JobFunc

func JobFunc(fn func(context.Context) error) Job

JobFunc creates a new cancelable Job that wraps a function call.

func SyncGroup

func SyncGroup(jobs []Job) Job

SyncGroup runs all the given jobs synchronously, aborting on the first error.

func WithCancel

func WithCancel(j Job) Job

WithCancel wraps a job with a Context that can be canceled by calling the Cancel() method. It also implements its own Wait() method, so you don't have to.

func WithID

func WithID(j Job) Job

WithID gives a job a random unique ID.
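The ID format is not documented on this page. As an illustration only, a random ID could be generated like this, assuming 8 random bytes rendered as hex (randomID is a hypothetical name):

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// randomID returns 16 hex characters backed by 8 random bytes.
// The length and encoding are assumptions made for this sketch.
func randomID() (string, error) {
	var b [8]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return hex.EncodeToString(b[:]), nil
}

func main() {
	id, err := randomID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```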

func WithTimeout

func WithTimeout(j Job, timeout time.Duration) Job

WithTimeout wraps a job with a timeout, after which the job is canceled.

type JobGeneratorFunc

type JobGeneratorFunc func() Job

JobGeneratorFunc is a function that returns a new Job.

type JobStatus

type JobStatus int

JobStatus is an enum representing the state of a job.

func (JobStatus) String

func (s JobStatus) String() string

String returns a text representation of the job state.
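For readers unfamiliar with the pattern, an iota enum with a String method typically looks like the sketch below. The label strings are hypothetical; the page does not list the text the real method returns. The sketch also gives the constants the JobStatus type, whereas the const block above declares them untyped.

```go
package main

import "fmt"

type JobStatus int

const (
	JobStatusPending JobStatus = iota
	JobStatusRunning
	JobStatusDone
)

// String implements fmt.Stringer. The labels are assumptions.
func (s JobStatus) String() string {
	switch s {
	case JobStatusPending:
		return "PENDING"
	case JobStatusRunning:
		return "RUNNING"
	case JobStatusDone:
		return "DONE"
	default:
		// Convert to int first to avoid recursing into String.
		return fmt.Sprintf("JobStatus(%d)", int(s))
	}
}

func main() {
	fmt.Println(JobStatusPending, JobStatusRunning, JobStatusDone)
}
```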

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager can limit the number of jobs by user-defined tag. Each tag corresponds to a separate queue, limited to a certain number of parallel tasks. By default (or for unknown queues) there is no limit.

func NewQueueManager

func NewQueueManager(spec *QueueSpec) *QueueManager

NewQueueManager returns a new QueueManager with the provided configuration.

func (*QueueManager) WithQueue

func (m *QueueManager) WithQueue(j Job) Job

WithQueue wraps a job with a concurrency limit controller.
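One conventional way to cap concurrency is a buffered channel used as a counting semaphore. The sketch below shows that technique for a single queue; it is not taken from the package, and limiter, wrap, and peakConcurrency are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// task is a simplified stand-in for Job.RunContext.
type task func(context.Context) error

// limiter allows at most cap(sem) tasks to run at the same time.
type limiter struct{ sem chan struct{} }

func newLimiter(n int) *limiter { return &limiter{sem: make(chan struct{}, n)} }

// wrap makes t wait for a free slot, or give up if ctx is canceled.
func (l *limiter) wrap(t task) task {
	return func(ctx context.Context) error {
		select {
		case l.sem <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}
		defer func() { <-l.sem }()
		return t(ctx)
	}
}

// peakConcurrency runs n wrapped tasks and reports the highest number
// observed running simultaneously.
func peakConcurrency(limit, n int) int {
	l := newLimiter(limit)
	var mu sync.Mutex
	running, peak := 0, 0
	t := l.wrap(func(context.Context) error {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(5 * time.Millisecond)
		mu.Lock()
		running--
		mu.Unlock()
		return nil
	})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = t(context.Background())
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(2, 6) <= 2)
}
```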

type QueueSpec

type QueueSpec struct {
	Concurrency int `yaml:"concurrency"`
}

QueueSpec describes the configuration of named queues.

type Schedule

type Schedule struct {
	// contains filtered or unexported fields
}

A Schedule configures a Scheduler with job generators.

func NewSchedule

func NewSchedule(ctx context.Context, hostSeed int64) *Schedule

NewSchedule creates a new Schedule. The context passed to this function is the one that all scheduled jobs will be using, so use it for global cancellation, or just pass context.Background().

func (*Schedule) Add

func (s *Schedule) Add(name, schedStr string, jobFn JobGeneratorFunc) error

Add adds a task to the schedule.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

A Scheduler triggers Jobs on a periodic schedule. It uses job generators (functions that return Jobs) to create new jobs at the desired time.

The standard cron syntax (documentation available at https://github.com/robfig/cron) is extended with the syntax:

@random_every <duration>

where 'duration' specifies a time.Duration as recognized by time.ParseDuration. This results in the job running at a time with a random offset within the given period. The offset stays constant over time because the random seed it's generated from is saved in a file.

func NewScheduler

func NewScheduler() *Scheduler

NewScheduler creates a new Scheduler.

func (*Scheduler) GetStatus

func (s *Scheduler) GetStatus() []CronJobStatus

GetStatus returns the current status of the scheduled jobs.

func (*Scheduler) RunNow

func (s *Scheduler) RunNow()

RunNow starts all jobs right now, regardless of their schedule.

func (*Scheduler) SetSchedule

func (s *Scheduler) SetSchedule(schedule *Schedule)

SetSchedule replaces the current schedule with a new one.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the scheduler. Jobs that are already running are not affected.

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager adds a state and ID to jobs and keeps track of them after they have run. This is basically a way to keep track of running goroutines at the level of granularity that we desire.

It has no practical effect on the jobs themselves; it only provides the user with debugging and auditing information.

func NewStateManager

func NewStateManager() *StateManager

NewStateManager creates a new StateManager.

func (*StateManager) GetStatus

func (m *StateManager) GetStatus() ([]Status, []Status, []Status)

GetStatus returns three lists of Status objects, representing respectively pending jobs (waiting to run), running jobs, and completed jobs (ordered by decreasing timestamp).

func (*StateManager) WithStatus

func (m *StateManager) WithStatus(j Job, name string) Job

WithStatus tracks a job through its lifetime. The name is used for display purposes and need not be unique.

type Status

type Status struct {
	ID          string
	Name        string
	Status      JobStatus
	StartedAt   time.Time
	CompletedAt time.Time
	Err         error
	Job         Job
}

Status holds information on the current state of a job.
