Documentation ¶
Index ¶
- Constants
- Variables
- type Context
- func (context *Context) Del(key string) error
- func (context *Context) Get(key string) (interface{}, error)
- func (context *Context) GetBool(key string) (bool, error)
- func (context *Context) GetBytes(key string) ([]byte, error)
- func (context *Context) GetInt(key string) (int, error)
- func (context *Context) GetNode() string
- func (context *Context) GetRequestId() string
- func (context *Context) GetString(key string) (string, error)
- func (context *Context) Set(key string, data interface{}) error
- type DagFlow
- func (this *DagFlow) AddCallback(vertex string, url string, opts ...Option)
- func (this *DagFlow) AddConditionalDags(vertex string, subdags map[string]*DagFlow, condition Option) error
- func (this *DagFlow) AddEdge(from, to string, opts ...Option) error
- func (this *DagFlow) AddForEachDag(vertex string, dag *DagFlow, foreach Option) error
- func (this *DagFlow) AddFunction(vertex string, function string, opts ...Option)
- func (this *DagFlow) AddModifier(vertex string, mod sdk.Modifier)
- func (this *DagFlow) AddSubDag(vertex string, dag *DagFlow) error
- func (this *DagFlow) AddVertex(vertex string, opts ...Option)
- func (this *DagFlow) AppendDag(dag *DagFlow) error
- type DataStore
- type Option
- func Aggregator(aggregator sdk.Aggregator) Option
- func Condition(condition sdk.Condition, aggregator sdk.Aggregator) Option
- func ExecutionOnly() Option
- func ForEach(foreach sdk.ForEach, aggregator sdk.Aggregator) Option
- func Forwarder(forwarder sdk.Forwarder) Option
- func Header(key, value string) Option
- func OnFailure(handler sdk.FuncErrorHandler) Option
- func OnReponse(handler sdk.RespHandler) Option
- func Query(key string, value ...string) Option
- func SyncCall() Option
- type Options
- type StateStore
- type Workflow
- func (flow *Workflow) Apply(function string, opts ...Option) *Workflow
- func (flow *Workflow) Callback(url string, opts ...Option) *Workflow
- func (flow *Workflow) ExecuteDag(dag *DagFlow) error
- func (flow *Workflow) Finally(handler sdk.PipelineHandler) *Workflow
- func (flow *Workflow) GetPipeline() *sdk.Pipeline
- func (flow *Workflow) Modify(mod sdk.Modifier) *Workflow
- func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler) *Workflow
Constants ¶
const ( // StateSuccess denotes success state StateSuccess = "success" // StateFailure denotes failure state StateFailure = "failure" // StateOngoing denotes onging satte StateOngoing = "ongoing" )
Variables ¶
var ( // Sync can be used instead of SyncCall Sync = SyncCall() // Execution specify a edge doesn't forwards a data // but rather mention a execution direction Execution = ExecutionOnly() )
var (
INVAL_OPTION = fmt.Errorf("invalid option specified")
)
Functions ¶
This section is empty.
Types ¶
type Context ¶
type Context struct { Query url.Values // provides request Query State string // state of the request Name string // name of the faas-flow NodeInput map[string][]byte // stores inputs form each node // contains filtered or unexported fields }
Context execution context and execution state
func CreateContext ¶
CreateContext create request context (used by template)
func (*Context) GetRequestId ¶
GetRequestId returns the request id
type DagFlow ¶
type DagFlow struct {
// contains filtered or unexported fields
}
func (*DagFlow) AddCallback ¶
AddCallback adds a new callback to the given vertex if vertex already exist it will be appended at the end, if not new vertex will be created
func (*DagFlow) AddConditionalDags ¶
func (this *DagFlow) AddConditionalDags(vertex string, subdags map[string]*DagFlow, condition Option) error
AddConditionalDag composites multiple seperate dag as a subdag which executes for a conditions matched If vertex already exist it will override the existing definition, If not new vertex will be created. When a vertex is dag, operations are ommited. condition: Condition Option
func (*DagFlow) AddForEachDag ¶
AddForEachDag composites a seperate dag as a subdag which executes for each value If vertex already exist it will override the existing definition, If not new vertex will be created. When a vertex is dag, operations are ommited. foreach: ForEach Option
func (*DagFlow) AddFunction ¶
AddFunction adds a new function to the given vertex if vertex already exist it will be appended at the end, if not new vertex will be created
func (*DagFlow) AddModifier ¶
AddModifier adds a new modifier to the given vertex if vertex already exist it will be appended at the end, if not new vertex will be created
func (*DagFlow) AddSubDag ¶
AddSubDag composites a seperate dag as a subdag to the given vertex. If vertex already exist it will override the existing definition, If not new vertex will be created. When a vertex is dag, operations are ommited.
type DataStore ¶
type DataStore interface { // Configure the DaraStore with flow name and request ID Configure(flowName string, requestId string) // Initialize the DataStore (called only once in a request span) Init() error // Set store a value for key, in failure returns error Set(key string, value string) error // Get retrives a value by key, if failure returns error Get(key string) (string, error) // Del delets a value by a key Del(key string) error // Cleanup all the resorces in DataStore Cleanup() error }
DataStore for Storing Data
type Option ¶
type Option func(*Options)
func Aggregator ¶
func Aggregator(aggregator sdk.Aggregator) Option
Aggregator aggregates multiple inputs to a node into one
func Condition ¶
func Condition(condition sdk.Condition, aggregator sdk.Aggregator) Option
Condition denotes the corresponding subdags will be executed for each condition matched aggregator aggregates all dags outputs into one
func ExecutionOnly ¶
func ExecutionOnly() Option
ExecutionOnly denotes a edge doesn't forwards a data, but rather provides only an execution direction
func ForEach ¶
func ForEach(foreach sdk.ForEach, aggregator sdk.Aggregator) Option
ForEach denotes the vertex will be executed in parralel for each value returned. aggregator aggregates all outputs into one
func OnFailure ¶
func OnFailure(handler sdk.FuncErrorHandler) Option
OnFailure Specify a function failure handler
func OnReponse ¶
func OnReponse(handler sdk.RespHandler) Option
OnResponse Specify a resp handler callback for function
type StateStore ¶
type StateStore interface { // Configure the StateStore with flow name and request ID Configure(flowName string, requestId string) // Initialize the StateStore (called only once in a request span) Init() error // create Vertexes for request // creates a map[<vertexId>]<Indegree Completion Count> Create(vertexs []string) error // Increment Vertex Indegree Completion // synchronously increment map[<vertexId>] Indegree Completion Count by 1 and return updated count IncrementCounter(vertex string) (int, error) // Set state of pipeline SetState(state bool) error // Get State of pipeline GetState() (bool, error) // Cleanup all the resorces in StateStore (called only once in a request span) Cleanup() error }
StateStore for saving execution state
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
func NewFaasflow ¶
NewFaasflow initiates a flow with a pipeline
func (*Workflow) Apply ¶
Apply apply a function with given name and options default call is async, provide Sync option to call synchronously
func (*Workflow) Callback ¶
Callback register a callback url as a part of pipeline definition One or more callback function can be placed for sending either partial pipeline data or after the pipeline completion
func (*Workflow) ExecuteDag ¶
ExecuteDag apply a predefined dag All operation inside dag are async returns error is dag is not valid Note: If executing dag chain gets overridden
func (*Workflow) Finally ¶
func (flow *Workflow) Finally(handler sdk.PipelineHandler) *Workflow
Finally sets an execution finish handler routine it will be called once the execution has finished with state either Success/Failure
func (*Workflow) GetPipeline ¶
GetPipeline expose the underlying pipeline object