Documentation ¶
Index ¶
- Variables
- type BranchOption
- type BranchOptions
- type Context
- type Dag
- func (currentDag *Dag) Append(dag *Dag)
- func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition, ...) (conditiondags map[string]*Dag)
- func (currentDag *Dag) Edge(from, to string, opts ...BranchOption)
- func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)
- func (currentDag *Dag) Node(vertex string, options ...BranchOption) *Node
- func (currentDag *Dag) SubDag(vertex string, dag *Dag)
- type DataStore
- type FuncErrorHandler
- type Modifier
- type Node
- type Option
- type Options
- type ServiceOperation
- func (operation *ServiceOperation) Encode() []byte
- func (operation *ServiceOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)
- func (operation *ServiceOperation) GetId() string
- func (operation *ServiceOperation) GetOptions() map[string][]string
- func (operation *ServiceOperation) GetProperties() map[string][]string
- type StateStore
- type Workflow
- func (flow *Workflow) Dag() *Dag
- func (flow *Workflow) Finally(handler sdk.PipelineHandler)
- func (flow *Workflow) GetPipeline() *sdk.Pipeline
- func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler)
- func (flow *Workflow) SetDag(dag *Dag)
- func (flow *Workflow) SyncNode(options ...BranchOption) *Node
Constants ¶
This section is empty.
Variables ¶
var (
BLANK_MODIFIER = func(data []byte) ([]byte, error) { return data, nil }
)
var (
// Execution specifies that an edge doesn't forward data
// but rather only indicates an execution direction
Execution = InvokeEdge()
)
Functions ¶
This section is empty.
Types ¶
type BranchOption ¶
type BranchOption func(*BranchOptions)
func Aggregator ¶
func Aggregator(aggregator sdk.Aggregator) BranchOption
Aggregator aggregates all outputs into one
func Forwarder ¶
func Forwarder(forwarder sdk.Forwarder) BranchOption
Forwarder encodes the request based on the needs of the children vertex; by default the data gets forwarded as it is
func InvokeEdge ¶
func InvokeEdge() BranchOption
InvokeEdge denotes an edge that doesn't forward data, but rather provides only an execution flow
type BranchOptions ¶
type BranchOptions struct {
// contains filtered or unexported fields
}
BranchOptions options for branching in DAG
type Dag ¶
type Dag struct {
// contains filtered or unexported fields
}
func (*Dag) Append ¶
Append generalizes a separate dag by appending its properties into the current dag. The provided dag should be mutually exclusive
func (*Dag) ConditionalBranch ¶
func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition, options ...BranchOption) (conditiondags map[string]*Dag)
ConditionalBranch composes multiple dags as a sub-dag which executes for the conditions matched, and returns the set of dags based on the conditions passed
func (*Dag) Edge ¶
func (currentDag *Dag) Edge(from, to string, opts ...BranchOption)
Edge adds a directed edge between two vertices as <from>-><to>
func (*Dag) ForEachBranch ¶
func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)
ForEachBranch composes a sub-dag which executes for each value. It returns the sub-dag that will be executed for each value
type FuncErrorHandler ¶
FuncErrorHandler is the error handler for OnFailure() options
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
func (*Node) AddOperation ¶
AddOperation adds an Operation to the given vertex
type Option ¶
type Option func(*Options)
func OnFailure ¶
func OnFailure(handler FuncErrorHandler) Option
OnFailure specifies a function failure handler
func WorkloadOption ¶
WorkloadOption specifies an option parameter in a workload
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options options for operation execution
type ServiceOperation ¶
type ServiceOperation struct {
Id string // ID
Mod Modifier // Modifier
Options map[string][]string // The option as an input to workload
FailureHandler FuncErrorHandler // The Failure handler of the operation
}
func (*ServiceOperation) Encode ¶
func (operation *ServiceOperation) Encode() []byte
func (*ServiceOperation) Execute ¶
func (operation *ServiceOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)
func (*ServiceOperation) GetId ¶
func (operation *ServiceOperation) GetId() string
func (*ServiceOperation) GetOptions ¶
func (operation *ServiceOperation) GetOptions() map[string][]string
func (*ServiceOperation) GetProperties ¶
func (operation *ServiceOperation) GetProperties() map[string][]string
type StateStore ¶
type StateStore sdk.StateStore
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
func GetWorkflow ¶
GetWorkflow initiates a flow with a pipeline
func (*Workflow) Finally ¶
func (flow *Workflow) Finally(handler sdk.PipelineHandler)
Finally sets an execution finish handler routine; it will be called once the execution has finished, with state either Success or Failure
func (*Workflow) GetPipeline ¶
GetPipeline exposes the underlying pipeline object
func (*Workflow) OnFailure ¶
func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler)
OnFailure sets a failure handler routine for the pipeline
func (*Workflow) SyncNode ¶
func (flow *Workflow) SyncNode(options ...BranchOption) *Node
SyncNode adds a new vertex named Sync