flow

package
v0.0.0-...-be4dc66
Warning

This package is not in the latest version of its module.

Published: Mar 7, 2019 | License: Apache-2.0, BSD-2-Clause, MIT, + 1 more | Imports: 9 | Imported by: 0

Documentation

Overview

Package flow provides utilities to construct a directed acyclic computational graph that is then executed and monitored with maximum parallelism.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Causes

func Causes(err error) *multierror.Error

Causes reports the causes of all Task errors of the given Flow error.

func Errors

func Errors(err error) *multierror.Error

Errors reports all wrapped Task errors of the given Flow error.

func WasCanceled

func WasCanceled(err error) bool

WasCanceled determines whether the given flow error was caused by cancellation.

Types

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow is a validated executable Graph.

func (*Flow) Len

func (f *Flow) Len() int

Len returns the number of tasks in a Flow.

func (*Flow) Name

func (f *Flow) Name() string

Name retrieves the name of a flow.

func (*Flow) Run

func (f *Flow) Run(opts Opts) error

Run starts an execution of a Flow. It blocks until the Flow has finished and returns the error, if any.

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph is a builder for a Flow.

func NewGraph

func NewGraph(name string) *Graph

NewGraph returns a new Graph with the given name.

func (*Graph) Add

func (g *Graph) Add(task Task) TaskID

Add adds the given Task to the graph. It panics if:
- a Task with the same name is already present
- one of the dependencies of the Task is not present

func (*Graph) Compile

func (g *Graph) Compile() *Flow

Compile compiles the graph into an executable Flow.

func (*Graph) Name

func (g *Graph) Name() string

Name returns the name of a graph.

type Opts

type Opts struct {
	Logger           logrus.FieldLogger
	ProgressReporter func(stats *Stats)
	Context          context.Context
}

Opts are the options for a Flow execution. Fields that are not set are left blank and do not affect the Flow.

type ProgressReporter

type ProgressReporter func(*Stats)

ProgressReporter is continuously called on progress in a flow.

type RecoverFn

type RecoverFn func(ctx context.Context, err error) error

RecoverFn is a function that can recover an error.

type Stats

type Stats struct {
	All       TaskIDs
	Succeeded TaskIDs
	Failed    TaskIDs
	Running   TaskIDs
	Pending   TaskIDs
}

Stats are the statistics of a Flow execution.

func InitialStats

func InitialStats(all TaskIDs) *Stats

InitialStats creates a new Stats object with the given set of initial TaskIDs. The initial TaskIDs are added to all TaskIDs as well as to the pending ones.

func (*Stats) Copy

func (s *Stats) Copy() *Stats

Copy deeply copies a Stats object.

func (*Stats) ProgressPercent

func (s *Stats) ProgressPercent() int

ProgressPercent retrieves the progress of a Flow execution in percent.
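The following sketch illustrates how the five sets in Stats relate to a progress figure. It uses hypothetical lowercase stand-ins (`idSet`, `stats`, `initialStats`, `progressPercent`) rather than the package's own types, and it assumes that progress is the share of succeeded tasks among all tasks, rounded down, with an empty flow counting as 100%. The package may compute the value differently.

```go
package main

import "fmt"

// idSet is a hypothetical stand-in for the documented TaskIDs set type.
type idSet map[string]struct{}

// stats mirrors the documented fields of Stats.
type stats struct {
	All, Succeeded, Failed, Running, Pending idSet
}

func newSet(ids ...string) idSet {
	s := make(idSet, len(ids))
	for _, id := range ids {
		s[id] = struct{}{}
	}
	return s
}

// initialStats follows the documented behaviour of InitialStats: the given
// IDs are added to both the All set and the Pending set.
func initialStats(ids ...string) *stats {
	return &stats{
		All:       newSet(ids...),
		Succeeded: idSet{},
		Failed:    idSet{},
		Running:   idSet{},
		Pending:   newSet(ids...),
	}
}

// progressPercent is an assumed formula: succeeded / all, rounded down.
// An empty flow is treated as fully complete.
func (s *stats) progressPercent() int {
	if len(s.All) == 0 {
		return 100
	}
	return 100 * len(s.Succeeded) / len(s.All)
}

func main() {
	s := initialStats("a", "b", "c", "d")
	fmt.Println(s.progressPercent()) // 0

	// Move task "a" from pending to succeeded.
	delete(s.Pending, "a")
	s.Succeeded["a"] = struct{}{}
	fmt.Println(s.progressPercent()) // 25
}
```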

type Task

type Task struct {
	Name         string
	Fn           TaskFn
	Dependencies TaskIDs
}

Task is a unit of work. It has a name, a payload function and a set of dependencies. A Task is only started once all its dependencies have completed successfully.

func (*Task) Spec

func (t *Task) Spec() *TaskSpec

Spec returns the TaskSpec of a task.

type TaskFn

type TaskFn func(ctx context.Context) error

TaskFn is a payload function of a task.

var EmptyTaskFn TaskFn = func(ctx context.Context) error { return nil }

EmptyTaskFn is a TaskFn that does nothing (returns nil).

func SimpleTaskFn

func SimpleTaskFn(f func() error) TaskFn

SimpleTaskFn converts the given function to a TaskFn that ignores any context.Context it is given.

Deprecated: Only used during the transition period. Do not use for new functions.

func (TaskFn) DoIf

func (t TaskFn) DoIf(condition bool) TaskFn

DoIf returns a TaskFn that will be executed if the condition is true when it is called. Otherwise, it will do nothing when called.

func (TaskFn) Recover

func (t TaskFn) Recover(recoverFn RecoverFn) TaskFn

Recover creates a new TaskFn that recovers an error with the given RecoverFn.

func (TaskFn) RecoverTimeout

func (t TaskFn) RecoverTimeout(recoverFn RecoverFn) TaskFn

RecoverTimeout creates a new TaskFn that recovers an error that satisfies `utils.IsTimedOut` with the given RecoverFn.

func (TaskFn) Retry

func (t TaskFn) Retry(interval time.Duration) TaskFn

Retry returns a TaskFn that is retried at the given interval until the timeout is reached. Unlike RetryUntilTimeout, it takes no timeout parameter of its own.

func (TaskFn) RetryUntilTimeout

func (t TaskFn) RetryUntilTimeout(interval, timeout time.Duration) TaskFn

RetryUntilTimeout returns a TaskFn that is retried until the timeout is reached.

func (TaskFn) SkipIf

func (t TaskFn) SkipIf(condition bool) TaskFn

SkipIf returns a TaskFn that does nothing if the condition is true, otherwise the function will be executed once called.

func (TaskFn) ToRecoverFn

func (t TaskFn) ToRecoverFn() RecoverFn

ToRecoverFn converts the TaskFn to a RecoverFn that ignores the incoming error.

type TaskID

type TaskID string

TaskID is an id of a task.

func (TaskID) TaskIDs

func (t TaskID) TaskIDs() []TaskID

TaskIDs retrieves this TaskID as a singleton slice.

type TaskIDSlice

type TaskIDSlice []TaskID

TaskIDSlice is a slice of TaskIDs.

func (TaskIDSlice) Len

func (t TaskIDSlice) Len() int

func (TaskIDSlice) Less

func (t TaskIDSlice) Less(i1, i2 int) bool

func (TaskIDSlice) Swap

func (t TaskIDSlice) Swap(i1, i2 int)

func (TaskIDSlice) TaskIDs

func (t TaskIDSlice) TaskIDs() []TaskID

TaskIDs returns this as a slice of TaskIDs.

type TaskIDer

type TaskIDer interface {
	// TaskIDs reports all TaskIDs of this TaskIDer.
	TaskIDs() []TaskID
}

TaskIDer can produce a slice of TaskIDs. The default implementations are TaskIDs, TaskID and TaskIDSlice.

type TaskIDs

type TaskIDs map[TaskID]struct{}

TaskIDs is a set of TaskID.

func NewTaskIDs

func NewTaskIDs(ids ...TaskIDer) TaskIDs

NewTaskIDs returns a new set of TaskIDs initialized to contain all TaskIDs of the given TaskIDers.

func (TaskIDs) Copy

func (t TaskIDs) Copy() TaskIDs

Copy makes a deep copy of this TaskIDs.

func (TaskIDs) Delete

func (t TaskIDs) Delete(iders ...TaskIDer)

Delete deletes the TaskIDs of all TaskIDers from this TaskIDs.

func (TaskIDs) Has

func (t TaskIDs) Has(id TaskID) bool

Has checks if the given TaskID is present in this set.

func (TaskIDs) Insert

func (t TaskIDs) Insert(iders ...TaskIDer)

Insert inserts the TaskIDs of all TaskIDers into this TaskIDs.

func (TaskIDs) Len

func (t TaskIDs) Len() int

Len returns the number of TaskIDs in this set.

func (TaskIDs) List

func (t TaskIDs) List() TaskIDSlice

List returns the elements of this in an ordered slice.

func (TaskIDs) StringList

func (t TaskIDs) StringList() []string

StringList returns the elements of this in an ordered string slice.

func (TaskIDs) TaskIDs

func (t TaskIDs) TaskIDs() []TaskID

TaskIDs retrieves all TaskIDs as an unsorted slice.

func (TaskIDs) UnsortedList

func (t TaskIDs) UnsortedList() TaskIDSlice

UnsortedList returns the elements of this in an unordered slice.

func (TaskIDs) UnsortedStringList

func (t TaskIDs) UnsortedStringList() []string

UnsortedStringList returns the elements of this in an unordered string slice.
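The set type and the TaskIDer interface work together: anything that can report its IDs can be inserted into a set, including another set. The sketch below rebuilds that idea with hypothetical lowercase stand-ins (`taskID`, `taskIDs`, `taskIDer`). It assumes that the ordered List variant sorts lexicographically, which the documentation above does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// taskID and taskIDs mirror the documented TaskID and TaskIDs types.
type taskID string
type taskIDs map[taskID]struct{}

// taskIDer mirrors the documented TaskIDer interface.
type taskIDer interface {
	ids() []taskID
}

// A single ID reports itself as a singleton slice.
func (t taskID) ids() []taskID { return []taskID{t} }

// A set reports its members in unspecified (map iteration) order.
func (t taskIDs) ids() []taskID {
	out := make([]taskID, 0, len(t))
	for id := range t {
		out = append(out, id)
	}
	return out
}

func newTaskIDs(iders ...taskIDer) taskIDs {
	s := taskIDs{}
	s.insert(iders...)
	return s
}

func (t taskIDs) insert(iders ...taskIDer) {
	for _, ider := range iders {
		for _, id := range ider.ids() {
			t[id] = struct{}{}
		}
	}
}

func (t taskIDs) has(id taskID) bool {
	_, ok := t[id]
	return ok
}

// list returns the members sorted lexicographically (an assumption about
// what "ordered" means for List).
func (t taskIDs) list() []taskID {
	out := t.ids()
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	a := newTaskIDs(taskID("deploy"), taskID("build"))
	b := newTaskIDs(a, taskID("test")) // a set is itself a taskIDer
	fmt.Println(b.list())                        // [build deploy test]
	fmt.Println(b.has("build"), b.has("lint")) // true false
}
```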

type TaskSpec

type TaskSpec struct {
	Fn           TaskFn
	Dependencies TaskIDs
}

TaskSpec is the functional body of a Task, consisting only of the payload function and the dependencies of the Task.

type Tasks

type Tasks map[TaskID]*TaskSpec

Tasks is a mapping from TaskID to TaskSpec.
