job

package
v0.0.0-...-f6d2f20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 11 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Cell = cell.Module(
	"jobs",
	"Managed background goroutines and timers",
	cell.Provide(
		newRegistry,
	),
)

Cell provides job.Registry which constructs job.Group's. Job groups automate a lot of the logic involved with lifecycle management of goroutines within a Hive Cell. Providing a context that is canceled on shutdown and making sure multiple goroutines properly shutdown takes a lot of boilerplate. Job groups make it easy to queue, spawn, and collect jobs with minimal boilerplate. The registry maintains references to all groups which will allow us to add automatic metrics collection and/or status reporting in the future.

Functions

func NewTrigger

func NewTrigger() *trigger

NewTrigger creates a new trigger, which can be used to trigger a timer job.

func WithLogger

func WithLogger(logger *slog.Logger) groupOpt

WithLogger replaces the default logger with the given logger, useful if you want to add certain fields to the logs created by the group/jobs.

func WithMetrics

func WithMetrics(metrics Metrics) groupOpt

func WithPprofLabels

func WithPprofLabels(pprofLabels pprof.LabelSet) groupOpt

WithPprofLabels adds pprof labels which will be added to the goroutines spawned for the jobs and thus included in the pprof profiles.

func WithRetry

func WithRetry(times int, backoff RetryBackoff) jobOneShotOpt

WithRetry option configures a one shot job to retry `times` amount of times. On each retry attempt the rate limiter is waited upon before making another attempt. If `times` is <0, then the job is retried forever.

func WithShutdown

func WithShutdown() jobOneShotOpt

WithShutdown option configures a one shot job to shutdown the whole hive if the job returns an error. If the WithRetry option is also configured, all retries must be exhausted before we trigger the shutdown.

func WithTrigger

func WithTrigger(trig Trigger) timerOpt

WithTrigger option allows a user to specify a trigger, which if triggered will invoke the function of a timer before the configured interval has expired.

Types

type ConstantBackoff

type ConstantBackoff time.Duration

func (ConstantBackoff) Wait

func (d ConstantBackoff) Wait() time.Duration

type ExponentialBackoff

type ExponentialBackoff struct {
	Min time.Duration
	Max time.Duration
	// contains filtered or unexported fields
}

func (*ExponentialBackoff) Wait

func (e *ExponentialBackoff) Wait() time.Duration

type Group

type Group interface {
	// Add append the job. If the group has not yet been started the job is queued, otherwise it is started
	// immediately.
	Add(...Job)

	// Scoped creates a scoped group, jobs added to this scoped group will appear as a sub-scope in the health reporter
	Scoped(name string) ScopedGroup

	// HookInterface is implemented to Start and Stop the jobs in the group.
	cell.HookInterface
}

Group aims to streamline the management of work within a cell. Group implements cell.HookInterface and takes care of proper start and stop behavior as expected by hive. A group allows you to add multiple types of jobs which different kinds of logic. No matter the job type, the function provided to is always called with a context which is bound to the lifecycle of the cell.

type Job

type Job interface {
	// contains filtered or unexported methods
}

Job in an interface that describes a unit of work which can be added to a Group. This interface contains unexported methods and thus can only be implemented by functions in this package such as OneShot, Timer, or Observer.

func Observer

func Observer[T any](name string, fn ObserverFunc[T], observable stream.Observable[T], opts ...observerOpt[T]) Job

AddObserver adds an observer job to the group. Observer jobs invoke the given `fn` for each item observed on `observable`. If the `observable` completes, the job stops. The context given to the observable is also canceled once the group stops.

func OneShot

func OneShot(name string, fn OneShotFunc, opts ...jobOneShotOpt) Job

OneShot creates a "one shot" job which can be added to a Group. The function passed to a one shot job is invoked once at startup. It can live for the entire lifetime of the group or exit early depending on its task. If it returns an error, it can optionally be retried if the WithRetry option. If retries are not configured or all retries failed as well, a shutdown of the hive can be triggered by specifying the WithShutdown option.

The given function is expected to exit as soon as the context given to it expires, this is especially important for blocking or long running jobs.

func Timer

func Timer(name string, fn TimerFunc, interval time.Duration, opts ...timerOpt) Job

Timer creates a timer job which can be added to a Group. Timer jobs invoke the given function at the specified interval. Timer jobs are particularly useful to implement periodic syncs and cleanup actions. Timer jobs can optionally be triggered by an external Trigger with the WithTrigger option. This trigger can for example be passed between cells or between jobs in the same cell to allow for an additional invocation of the function.

The interval between invocations is counted from the start of the last invocation. If the `fn` takes longer than the interval, its next invocation is not delayed. The `fn` is expected to stop as soon as the context passed to it expires. This is especially important for long running functions. The signal created by a Trigger is coalesced so multiple calls to trigger before the invocation takes place can result in just a single invocation.

type Metrics

type Metrics interface {
	JobError(name string, err error)
	OneShotRunDuration(name string, duration time.Duration)
	TimerRunDuration(name string, duration time.Duration)
	ObserverRunDuration(name string, duration time.Duration)
}

type NopMetrics

type NopMetrics struct{}

func (NopMetrics) JobError

func (NopMetrics) JobError(name string, err error)

JobError implements Metrics.

func (NopMetrics) ObserverRunDuration

func (NopMetrics) ObserverRunDuration(name string, duration time.Duration)

ObserverRunDuration implements Metrics.

func (NopMetrics) OneShotRunDuration

func (NopMetrics) OneShotRunDuration(name string, duration time.Duration)

OneShotRunDuration implements Metrics.

func (NopMetrics) TimerRunDuration

func (NopMetrics) TimerRunDuration(name string, duration time.Duration)

TimerRunDuration implements Metrics.

type ObserverFunc

type ObserverFunc[T any] func(ctx context.Context, event T) error

ObserverFunc is the func type invoked by observer jobs. A ObserverFunc is expected to return as soon as ctx is canceled.

type OneShotFunc

type OneShotFunc func(ctx context.Context, health cell.Health) error

OneShotFunc is the function type which is invoked by a one shot job. The given function is expected to exit as soon as the context given to it expires, this is especially important for blocking or long running jobs.

type Registry

type Registry interface {
	// NewGroup creates a new group of jobs which can be started and stopped together as part of the cells lifecycle.
	// The provided scope is used to report health status of the jobs. A `cell.Scope` can be obtained via injection
	// an object with the correct scope is provided by the closest `cell.Module`.
	NewGroup(health cell.Health, opts ...groupOpt) Group
}

A Registry creates Groups, it maintains references to these groups for the purposes of collecting information centralized like metrics.

type RetryBackoff

type RetryBackoff interface {
	Wait() time.Duration
}

type ScopedGroup

type ScopedGroup interface {
	Add(jobs ...Job)
}

type TimerFunc

type TimerFunc func(ctx context.Context) error

TimerFunc is the func type invoked by a timer job. A TimerFunc is expected to return as soon as the ctx expires.

type Trigger

type Trigger interface {
	Trigger()
	// contains filtered or unexported methods
}

Trigger which can be used to trigger a timer job, trigger events are coalesced.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL