goflow

package
v1.1.1-0...-89f4513
Published: Mar 26, 2021 License: MIT Imports: 2 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	BLANK_MODIFIER = func(data []byte) ([]byte, error) { return data, nil }
)
var (
	// Execution specifies that an edge does not forward data
	// but only denotes an execution direction
	Execution = InvokeEdge()
)

Functions

This section is empty.

Types

type BranchOption

type BranchOption func(*BranchOptions)

func Aggregator

func Aggregator(aggregator sdk.Aggregator) BranchOption

Aggregator aggregates all outputs into one

func Forwarder

func Forwarder(forwarder sdk.Forwarder) BranchOption

Forwarder encodes the request as needed by the child vertex. By default, the data is forwarded as is.

func InvokeEdge

func InvokeEdge() BranchOption

InvokeEdge denotes an edge that does not forward data but only provides an execution flow.

type BranchOptions

type BranchOptions struct {
	// contains filtered or unexported fields
}

BranchOptions holds the options for branching in a DAG.
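The `BranchOption func(*BranchOptions)` shape is the functional-options pattern. The sketch below illustrates that pattern with local stand-ins only: `branchOptions`, its fields, `withForwarder`, and `executionOnly` are hypothetical names, since the real struct's fields are unexported and not documented on this page.

```go
package main

import (
	"bytes"
	"fmt"
)

// branchOptions is a local stand-in for BranchOptions. The real struct's
// fields are unexported, so both fields here are assumptions.
type branchOptions struct {
	forwarder func([]byte) []byte // transforms data sent to the child vertex
	noData    bool                // true when the edge only orders execution
}

// branchOption mirrors the documented shape func(*BranchOptions).
type branchOption func(*branchOptions)

// withForwarder is a hypothetical option that installs a forwarder.
func withForwarder(f func([]byte) []byte) branchOption {
	return func(o *branchOptions) { o.forwarder = f }
}

// executionOnly is a hypothetical option for an edge that carries no data.
func executionOnly() branchOption {
	return func(o *branchOptions) {
		o.noData = true
		o.forwarder = nil
	}
}

// applyBranchOptions folds the options, in order, into a fresh struct.
func applyBranchOptions(opts ...branchOption) branchOptions {
	var o branchOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyBranchOptions(withForwarder(bytes.ToUpper))
	fmt.Printf("%s\n", o.forwarder([]byte("payload")))

	// Later options win: the edge ends up execution-only.
	e := applyBranchOptions(withForwarder(bytes.ToUpper), executionOnly())
	fmt.Println(e.noData, e.forwarder == nil)
}
```

Because each option is just a function, callers can pass any number of them in any order, and new options can be added without changing method signatures such as `Edge` or `Node`.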

type Context

type Context sdk.Context

type Dag

type Dag struct {
	// contains filtered or unexported fields
}

func NewDag

func NewDag() *Dag

NewDag creates a new dag independently of a pipeline.

func (*Dag) Append

func (currentDag *Dag) Append(dag *Dag)

Append generalizes a separate dag by appending its properties into the current dag. The provided dag should be mutually exclusive.

func (*Dag) ConditionalBranch

func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition,
	options ...BranchOption) (conditiondags map[string]*Dag)

ConditionalBranch composes multiple dags as a sub-dag, each of which executes when its condition is matched. It returns the set of dags keyed by the conditions passed.
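To make the conditional dispatch concrete, here is a minimal standalone sketch. It does not use this package: the `condition` type and `runConditional` helper are hypothetical, and the assumption that a condition returns the names of the branches to run is not confirmed by this page (the real `sdk.Condition` may differ).

```go
package main

import (
	"bytes"
	"fmt"
)

// condition is a hypothetical stand-in: it inspects the vertex output and
// returns the names of the branches that should run.
type condition func(data []byte) []string

// runConditional executes only the branches named by cond and returns their
// outputs keyed by branch name. Names without a registered branch are ignored.
func runConditional(data []byte, cond condition, branches map[string]func([]byte) []byte) map[string][]byte {
	out := make(map[string][]byte)
	for _, name := range cond(data) {
		if branch, ok := branches[name]; ok {
			out[name] = branch(data)
		}
	}
	return out
}

func main() {
	branches := map[string]func([]byte) []byte{
		"small": bytes.ToLower,
		"large": bytes.ToUpper,
	}
	// Pick a branch based on payload size.
	bySize := func(b []byte) []string {
		if len(b) > 4 {
			return []string{"large"}
		}
		return []string{"small"}
	}
	out := runConditional([]byte("Hello"), bySize, branches)
	fmt.Printf("%q\n", out["large"])
}
```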

func (*Dag) Edge

func (currentDag *Dag) Edge(from, to string, opts ...BranchOption)

Edge adds a directed edge between two vertices as <from>-><to>
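As an illustration of what vertices and directed edges imply for execution order, the following standalone sketch builds a tiny graph and orders it with Kahn's algorithm. It is not this package's implementation: `dag`, `node`, `edge`, and `order` are hypothetical local names, and how the real `Dag` schedules vertices is not described on this page.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// dag is a local stand-in that stores adjacency lists and in-degrees.
type dag struct {
	children map[string][]string
	indegree map[string]int
}

func newDag() *dag {
	return &dag{children: map[string][]string{}, indegree: map[string]int{}}
}

// node registers a vertex by id if it is not already known.
func (d *dag) node(id string) {
	if _, ok := d.indegree[id]; !ok {
		d.indegree[id] = 0
	}
}

// edge adds a directed edge from -> to, registering both vertices.
func (d *dag) edge(from, to string) {
	d.node(from)
	d.node(to)
	d.children[from] = append(d.children[from], to)
	d.indegree[to]++
}

// order returns a topological order (Kahn's algorithm) or an error if the
// graph contains a cycle.
func (d *dag) order() ([]string, error) {
	indeg := make(map[string]int, len(d.indegree))
	var ready []string
	for id, n := range d.indegree {
		indeg[id] = n
		if n == 0 {
			ready = append(ready, id)
		}
	}
	sort.Strings(ready) // map iteration is random; sort for a stable result
	var out []string
	for len(ready) > 0 {
		id := ready[0]
		ready = ready[1:]
		out = append(out, id)
		for _, c := range d.children[id] {
			indeg[c]--
			if indeg[c] == 0 {
				ready = append(ready, c)
			}
		}
	}
	if len(out) != len(d.indegree) {
		return nil, errors.New("cycle detected")
	}
	return out, nil
}

func main() {
	d := newDag()
	d.edge("a", "b")
	d.edge("a", "c")
	d.edge("b", "d")
	d.edge("c", "d")
	fmt.Println(d.order())
}
```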

func (*Dag) ForEachBranch

func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)

ForEachBranch composes a sub-dag that executes for each value. It returns the sub-dag that will be executed for each value.
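The fan-out and fan-in idea behind a for-each branch can be sketched without this package. Everything below is an assumption for illustration: `splitWords` plays the role of the foreach function, `joinSorted` the role of an aggregator, and `runForEach` the role of the sub-dag runner. The real `sdk.ForEach` and `sdk.Aggregator` signatures are not shown on this page and may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// splitWords plays the role of the foreach function: it splits the input
// into keyed items, one per whitespace-separated word.
func splitWords(data []byte) map[string][]byte {
	items := make(map[string][]byte)
	for i, w := range strings.Fields(string(data)) {
		items[strconv.Itoa(i)] = []byte(w)
	}
	return items
}

// joinSorted plays the role of an aggregator: it merges all per-item results
// into one payload, visiting keys in sorted (lexicographic) order.
func joinSorted(results map[string][]byte) ([]byte, error) {
	keys := make([]string, 0, len(results))
	for k := range results {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([][]byte, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, results[k])
	}
	return bytes.Join(parts, []byte(",")), nil
}

// runForEach applies body to every item produced by split, then aggregates.
func runForEach(
	data []byte,
	split func([]byte) map[string][]byte,
	body func([]byte) []byte,
	aggregate func(map[string][]byte) ([]byte, error),
) ([]byte, error) {
	results := make(map[string][]byte)
	for key, item := range split(data) {
		results[key] = body(item)
	}
	return aggregate(results)
}

func main() {
	out, err := runForEach([]byte("one two three"), splitWords, bytes.ToUpper, joinSorted)
	fmt.Printf("%s %v\n", out, err)
}
```

The items are processed sequentially here for simplicity; a real executor could run them concurrently, which is why the aggregator receives results keyed by item rather than in a fixed order.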

func (*Dag) Node

func (currentDag *Dag) Node(vertex string, options ...BranchOption) *Node

Node adds a new vertex by id

func (*Dag) SubDag

func (currentDag *Dag) SubDag(vertex string, dag *Dag)

SubDag composes a separate dag as a node.

type DataStore

type DataStore sdk.DataStore

type FuncErrorHandler

type FuncErrorHandler func(error) error

FuncErrorHandler is the error handler for the OnFailure() option.

type Modifier

type Modifier func([]byte, map[string][]string) ([]byte, error)

Modifier is the function definition for a Modify() call.
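A function matching the `Modifier` signature takes the payload plus a map of option values and returns the new payload or an error. The sketch below redeclares that signature locally as `modifier` so it runs on its own; `addPrefix`, `upper`, `runAll`, and the `"prefix"` option key are hypothetical names chosen for illustration.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// modifier mirrors the documented Modifier signature.
type modifier func(data []byte, options map[string][]string) ([]byte, error)

// addPrefix is a hypothetical workload: it prepends the first value of the
// "prefix" option and fails if that option is absent.
func addPrefix(data []byte, options map[string][]string) ([]byte, error) {
	vals := options["prefix"]
	if len(vals) == 0 {
		return nil, errors.New("missing prefix option")
	}
	return append([]byte(vals[0]), data...), nil
}

// upper is a hypothetical workload that ignores its options.
func upper(data []byte, _ map[string][]string) ([]byte, error) {
	return bytes.ToUpper(data), nil
}

// runAll applies the modifiers in order and stops at the first error.
func runAll(data []byte, options map[string][]string, mods ...modifier) ([]byte, error) {
	var err error
	for _, m := range mods {
		if data, err = m(data, options); err != nil {
			return nil, err
		}
	}
	return data, nil
}

func main() {
	opts := map[string][]string{"prefix": {"go"}}
	out, err := runAll([]byte("flow"), opts, addPrefix, upper)
	fmt.Printf("%s %v\n", out, err)
}
```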

type Node

type Node struct {
	// contains filtered or unexported fields
}

func (*Node) AddOperation

func (node *Node) AddOperation(operation sdk.Operation) *Node

AddOperation adds an Operation to the given vertex

func (*Node) Apply

func (node *Node) Apply(id string, workload Modifier, opts ...Option) *Node

Apply adds a new workload to the given vertex

type Option

type Option func(*Options)

func OnFailure

func OnFailure(handler FuncErrorHandler) Option

OnFailure specifies a function failure handler.

func WorkloadOption

func WorkloadOption(key string, value ...string) Option

WorkloadOption specifies an option parameter for a workload.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds the options for operation execution.
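The two option constructors above follow the same functional-options pattern as `BranchOption`. The sketch below shows one way such options could be collected and used when running a workload. It uses only local stand-ins: the `options` struct fields, `buildOptions`, and `run` are hypothetical, and the choice to have a repeated key replace earlier values is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// funcErrorHandler mirrors the documented FuncErrorHandler signature.
type funcErrorHandler func(error) error

// options is a local stand-in for Options; both fields are assumptions.
type options struct {
	params         map[string][]string
	failureHandler funcErrorHandler
}

// option mirrors the documented shape func(*Options).
type option func(*options)

// onFailure registers a handler that is consulted when the workload fails.
func onFailure(h funcErrorHandler) option {
	return func(o *options) { o.failureHandler = h }
}

// workloadOption stores one or more values under key. In this sketch a
// repeated key replaces the earlier values.
func workloadOption(key string, value ...string) option {
	return func(o *options) {
		if o.params == nil {
			o.params = make(map[string][]string)
		}
		o.params[key] = value
	}
}

func buildOptions(opts ...option) options {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

var errBoom = errors.New("boom")

// run executes the workload with the collected parameters and routes any
// error through the failure handler, which may replace or suppress it.
func run(workload func(params map[string][]string) error, opts ...option) error {
	o := buildOptions(opts...)
	err := workload(o.params)
	if err != nil && o.failureHandler != nil {
		return o.failureHandler(err)
	}
	return err
}

func main() {
	err := run(
		func(params map[string][]string) error {
			fmt.Println("method:", params["method"])
			return errBoom
		},
		workloadOption("method", "post"),
		onFailure(func(err error) error { return fmt.Errorf("handled: %w", err) }),
	)
	fmt.Println(err)
}
```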

type ServiceOperation

type ServiceOperation struct {
	Id      string              // ID
	Mod     Modifier            // Modifier
	Options map[string][]string // The options passed as input to the workload

	FailureHandler FuncErrorHandler // The Failure handler of the operation
}

func (*ServiceOperation) Encode

func (operation *ServiceOperation) Encode() []byte

func (*ServiceOperation) Execute

func (operation *ServiceOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)

func (*ServiceOperation) GetId

func (operation *ServiceOperation) GetId() string

func (*ServiceOperation) GetOptions

func (operation *ServiceOperation) GetOptions() map[string][]string

func (*ServiceOperation) GetProperties

func (operation *ServiceOperation) GetProperties() map[string][]string

type StateStore

type StateStore sdk.StateStore

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

func GetWorkflow

func GetWorkflow(pipeline *sdk.Pipeline) *Workflow

GetWorkflow initiates a flow with a pipeline

func (*Workflow) Dag

func (flow *Workflow) Dag() *Dag

Dag provides the workflow dag object

func (*Workflow) Finally

func (flow *Workflow) Finally(handler sdk.PipelineHandler)

Finally sets an execution finish handler routine. It is called once the execution has finished, with a state of either Success or Failure.
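The relationship between a failure handler and a finish handler can be sketched as follows. This is a standalone illustration, not the package's executor: `execute`, the handler signatures, and the state constants are hypothetical, and the real `sdk.PipelineHandler` and `sdk.PipelineErrorHandler` types are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	stateSuccess = "Success"
	stateFailure = "Failure"
)

var errStep = errors.New("step failed")

// execute runs steps in order and stops at the first error. onFailure (may be
// nil) is called for that error; finally (may be nil) is always called exactly
// once with the final state.
func execute(steps []func() error, onFailure func(error), finally func(state string)) error {
	state := stateSuccess
	defer func() {
		if finally != nil {
			finally(state)
		}
	}()
	for _, step := range steps {
		if err := step(); err != nil {
			state = stateFailure
			if onFailure != nil {
				onFailure(err)
			}
			return err
		}
	}
	return nil
}

func main() {
	ok := func() error { return nil }
	bad := func() error { return errStep }
	report := func(state string) { fmt.Println("finished with state:", state) }

	_ = execute([]func() error{ok, ok}, nil, report)
	_ = execute([]func() error{ok, bad, ok},
		func(err error) { fmt.Println("failure handler:", err) }, report)
}
```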

func (*Workflow) GetPipeline

func (flow *Workflow) GetPipeline() *sdk.Pipeline

GetPipeline exposes the underlying pipeline object

func (*Workflow) OnFailure

func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler)

OnFailure sets a failure handler routine for the pipeline

func (*Workflow) SetDag

func (flow *Workflow) SetDag(dag *Dag)

SetDag applies a predefined dag, overriding the default dag

func (*Workflow) SyncNode

func (flow *Workflow) SyncNode(options ...BranchOption) *Node

SyncNode adds a new vertex named Sync
