gardener: Index | Files

package flow

import ""

Package flow provides utilities to construct a directed acyclic computational graph that is then executed and monitored with maximum parallelism.


Package Files

flow.go graph.go taskfn.go taskid.go


var (
    // ContextWithTimeout is context.WithTimeout. Exposed for testing.
    ContextWithTimeout = context.WithTimeout

func Causes Uses

func Causes(err error) *multierror.Error

Causes reports the causes of all Task errors of the given Flow error.

func Errors Uses

func Errors(err error) *multierror.Error

Errors reports all wrapped Task errors of the given Flow error.

func WasCanceled Uses

func WasCanceled(err error) bool

WasCanceled determines whether the given flow error was caused by cancellation.

type ErrorCleaner Uses

type ErrorCleaner func(context.Context, string)

ErrorCleaner is called when a task which errored during the previous reconciliation phase completes with success

type Flow Uses

type Flow struct {
    // contains filtered or unexported fields

Flow is a validated executable Graph.

func (*Flow) Len Uses

func (f *Flow) Len() int

Len retrieves the amount of tasks in a Flow.

func (*Flow) Name Uses

func (f *Flow) Name() string

Name retrieves the name of a flow.

func (*Flow) Run Uses

func (f *Flow) Run(opts Opts) error

Run starts an execution of a Flow. It blocks until the Flow has finished and returns the error, if any.

type Graph Uses

type Graph struct {
    // contains filtered or unexported fields

Graph is a builder for a Flow.

func NewGraph Uses

func NewGraph(name string) *Graph

NewGraph returns a new Graph with the given name.

func (*Graph) Add Uses

func (g *Graph) Add(task Task) TaskID

Add adds the given Task to the graph. This panics if - There is already a Task present with the same name - One of the dependencies of the Task is not present

func (*Graph) Compile Uses

func (g *Graph) Compile() *Flow

Compile compiles the graph into an executable Flow.

func (*Graph) Name Uses

func (g *Graph) Name() string

Name returns the name of a graph.

type Opts Uses

type Opts struct {
    Logger           logrus.FieldLogger
    ProgressReporter func(ctx context.Context, stats *Stats)
    ErrorCleaner     func(ctx context.Context, taskID string)
    ErrorContext     *utilerrors.ErrorContext
    Context          context.Context

Opts are options for a Flow execution. If they are not set, they are left blank and don't affect the Flow.

type ProgressReporter Uses

type ProgressReporter func(context.Context, *Stats)

ProgressReporter is continuously called on progress in a flow.

type RecoverFn Uses

type RecoverFn func(ctx context.Context, err error) error

RecoverFn is a function that can recover an error.

type Stats Uses

type Stats struct {
    All       TaskIDs
    Succeeded TaskIDs
    Failed    TaskIDs
    Running   TaskIDs
    Pending   TaskIDs

Stats are the statistics of a Flow execution.

func InitialStats Uses

func InitialStats(all TaskIDs) *Stats

InitialStats creates a new Stats object with the given set of initial TaskIDs. The initial TaskIDs are added to all TaskIDs as well as to the pending ones.

func (*Stats) Copy Uses

func (s *Stats) Copy() *Stats

Copy deeply copies a Stats object.

func (*Stats) ProgressPercent Uses

func (s *Stats) ProgressPercent() int32

ProgressPercent retrieves the progress of a Flow execution in percent.

type Task Uses

type Task struct {
    Name         string
    Fn           TaskFn
    Dependencies TaskIDs

Task is a unit of work. It has a name, a payload function and a set of dependencies. A is only started once all its dependencies have been completed successfully.

func (*Task) Spec Uses

func (t *Task) Spec() *TaskSpec

Spec returns the TaskSpec of a task.

type TaskFn Uses

type TaskFn func(ctx context.Context) error

TaskFn is a payload function of a task.

var EmptyTaskFn TaskFn = func(ctx context.Context) error { return nil }

EmptyTaskFn is a TaskFn that does nothing (returns nil).

func Parallel Uses

func Parallel(fns ...TaskFn) TaskFn

Parallel runs the given TaskFns in parallel, collecting their errors in a multierror.

func ParallelExitOnError Uses

func ParallelExitOnError(fns ...TaskFn) TaskFn

ParallelExitOnError runs the given TaskFns in parallel and stops execution as soon as one TaskFn returns an error.

func Sequential Uses

func Sequential(fns ...TaskFn) TaskFn

Sequential runs the given TaskFns sequentially.

func SimpleTaskFn Uses

func SimpleTaskFn(f func() error) TaskFn

SimpleTaskFn converts the given function to a TaskFn, disrespecting any context.Context it is being given. deprecated: Only used during transition period. Do not use for new functions.

func (TaskFn) DoIf Uses

func (t TaskFn) DoIf(condition bool) TaskFn

DoIf returns a TaskFn that will be executed if the condition is true when it is called. Otherwise, it will do nothing when called.

func (TaskFn) Recover Uses

func (t TaskFn) Recover(recoverFn RecoverFn) TaskFn

Recover creates a new TaskFn that recovers an error with the given RecoverFn.

func (TaskFn) Retry Uses

func (t TaskFn) Retry(interval time.Duration) TaskFn

Retry returns a TaskFn that is retried until the timeout is reached. Deprecated: Retry handling should be done in the function itself, if necessary.

func (TaskFn) RetryUntilTimeout Uses

func (t TaskFn) RetryUntilTimeout(interval, timeout time.Duration) TaskFn

RetryUntilTimeout returns a TaskFn that is retried until the timeout is reached.

func (TaskFn) SkipIf Uses

func (t TaskFn) SkipIf(condition bool) TaskFn

SkipIf returns a TaskFn that does nothing if the condition is true, otherwise the function will be executed once called.

func (TaskFn) Timeout Uses

func (t TaskFn) Timeout(timeout time.Duration) TaskFn

Timeout returns a TaskFn that is bound to a context which times out.

func (TaskFn) ToRecoverFn Uses

func (t TaskFn) ToRecoverFn() RecoverFn

ToRecoverFn converts the TaskFn to a RecoverFn that ignores the incoming error.

type TaskID Uses

type TaskID string

TaskID is an id of a task.

func (TaskID) TaskIDs Uses

func (t TaskID) TaskIDs() []TaskID

TaskIDs retrieves this TaskID as a singleton slice.

type TaskIDSlice Uses

type TaskIDSlice []TaskID

TaskIDSlice is a slice of TaskIDs.

func (TaskIDSlice) Len Uses

func (t TaskIDSlice) Len() int

func (TaskIDSlice) Less Uses

func (t TaskIDSlice) Less(i1, i2 int) bool

func (TaskIDSlice) Swap Uses

func (t TaskIDSlice) Swap(i1, i2 int)

func (TaskIDSlice) TaskIDs Uses

func (t TaskIDSlice) TaskIDs() []TaskID

TaskIDs returns this as a slice of TaskIDs.

type TaskIDer Uses

type TaskIDer interface {
    // TaskIDs reports all TaskIDs of this TaskIDer.
    TaskIDs() []TaskID

TaskIDer can produce a slice of TaskIDs. Default implementations of this are TaskIDs, TaskID and TaskIDSlice

type TaskIDs Uses

type TaskIDs map[TaskID]struct{}

TaskIDs is a set of TaskID.

func NewTaskIDs Uses

func NewTaskIDs(ids ...TaskIDer) TaskIDs

NewTaskIDs returns a new set of TaskIDs initialized to contain all TaskIDs of the given TaskIDers.

func (TaskIDs) Copy Uses

func (t TaskIDs) Copy() TaskIDs

Copy makes a deep copy of this TaskIDs.

func (TaskIDs) Delete Uses

func (t TaskIDs) Delete(iders ...TaskIDer) TaskIDs

Delete deletes the TaskIDs of all TaskIDers from this TaskIDs.

func (TaskIDs) Has Uses

func (t TaskIDs) Has(id TaskID) bool

Has checks if the given TaskID is present in this set.

func (TaskIDs) Insert Uses

func (t TaskIDs) Insert(iders ...TaskIDer) TaskIDs

Insert inserts the TaskIDs of all TaskIDers into this TaskIDs.

func (TaskIDs) InsertIf Uses

func (t TaskIDs) InsertIf(condition bool, iders ...TaskIDer) TaskIDs

InsertIf inserts the TaskIDs of all TaskIDers into this TaskIDs if the given condition evaluates to true.

func (TaskIDs) Len Uses

func (t TaskIDs) Len() int

Len returns the amount of TaskIDs this contains.

func (TaskIDs) List Uses

func (t TaskIDs) List() TaskIDSlice

List returns the elements of this in an ordered slice.

func (TaskIDs) StringList Uses

func (t TaskIDs) StringList() []string

StringList returns the elements of this in an ordered string slice.

func (TaskIDs) TaskIDs Uses

func (t TaskIDs) TaskIDs() []TaskID

TaskIDs retrieves all TaskIDs as an unsorted slice.

func (TaskIDs) UnsortedList Uses

func (t TaskIDs) UnsortedList() TaskIDSlice

UnsortedList returns the elements of this in an unordered slice.

func (TaskIDs) UnsortedStringList Uses

func (t TaskIDs) UnsortedStringList() []string

UnsortedStringList returns the elements of this in an unordered string slice.

type TaskSpec Uses

type TaskSpec struct {
    Fn           TaskFn
    Dependencies TaskIDs

TaskSpec is functional body of a Task, consisting only of the payload function and the dependencies of the Task.

type Tasks Uses

type Tasks map[TaskID]*TaskSpec

Tasks is a mapping from TaskID to TaskSpec.

Package flow imports 11 packages (graph) and is imported by 13 packages. Updated 2020-08-13. Refresh now. Tools for package owners.