faas-flow: github.com/s8sg/faas-flow/sdk

package sdk

import "github.com/s8sg/faas-flow/sdk"


Package Files

context.go dag.go definition.go operation.go pipeline.go types.go


const (
    // StateSuccess denotes success state
    StateSuccess = "success"
    // StateFailure denotes failure state
    StateFailure = "failure"
    // StateOngoing denotes ongoing state
    StateOngoing = "ongoing"
const (
    DEPTH_SAME      = 0


var (
    ERR_NO_VERTEX = fmt.Errorf("dag has no vertex set")
    // ERR_CYCLIC denotes that dag has a cycle
    ERR_CYCLIC = fmt.Errorf("dag has cyclic dependency")
    // ERR_DUPLICATE_EDGE denotes that a dag edge is duplicate
    ERR_DUPLICATE_EDGE = fmt.Errorf("edge redefined")
    // ERR_DUPLICATE_VERTEX denotes that a dag edge is duplicate
    ERR_DUPLICATE_VERTEX = fmt.Errorf("vertex redefined")
    // ERR_MULTIPLE_START denotes that a dag has more than one start point
    ERR_MULTIPLE_START = fmt.Errorf("only one start vertex is allowed")
    // ERR_RECURSIVE_DEP denotes that dag has a recursive dependecy
    ERR_RECURSIVE_DEP = fmt.Errorf("dag has recursive dependency")
    // Default forwarder
    DefaultForwarder = func(data []byte) []byte { return data }

func GetPipelineDefinition Uses

func GetPipelineDefinition(pipeline *Pipeline) string

GetPipelineDefinition generate pipeline DAG defintion as a json

type Aggregator Uses

type Aggregator func(map[string][]byte) ([]byte, error)

Aggregator definition for the data aggregator of nodes

type BlankOperation Uses

type BlankOperation struct {

func (*BlankOperation) Encode Uses

func (ops *BlankOperation) Encode() []byte

func (*BlankOperation) Execute Uses

func (ops *BlankOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)

func (*BlankOperation) GetId Uses

func (ops *BlankOperation) GetId() string

func (*BlankOperation) GetProperties Uses

func (ops *BlankOperation) GetProperties() map[string][]string

type Condition Uses

type Condition func([]byte) []string

Condition definition for the condition function

type Context Uses

type Context struct {
    Query url.Values // provides request Query
    State string     // state of the request
    Name  string     // name of the faas-flow

    NodeInput map[string][]byte // stores inputs form each node
    // contains filtered or unexported fields

Context execution context and execution state

func CreateContext Uses

func CreateContext(id string, node string, name string,
    dstore DataStore) *Context

CreateContext create request context (used by template)

func (*Context) Del Uses

func (context *Context) Del(key string) error

Del deletes a value from the context using DataStore

func (*Context) Get Uses

func (context *Context) Get(key string) (interface{}, error)

Get retrieve a value from the context using DataStore

func (*Context) GetBool Uses

func (context *Context) GetBool(key string) bool

GetBool retrieve a boolean value from the context using DataStore

func (*Context) GetBytes Uses

func (context *Context) GetBytes(key string) []byte

GetBytes retrieve a byte array from the context using DataStore

func (*Context) GetInt Uses

func (context *Context) GetInt(key string) int

GetInt retrieve a integer value from the context using DataStore

func (*Context) GetNode Uses

func (context *Context) GetNode() string

GetPhase return the node no

func (*Context) GetRequestId Uses

func (context *Context) GetRequestId() string

GetRequestId returns the request id

func (*Context) GetString Uses

func (context *Context) GetString(key string) string

GetString retrieve a string value from the context using DataStore

func (*Context) Set Uses

func (context *Context) Set(key string, data interface{}) error

Set put a value in the context using DataStore

type Dag Uses

type Dag struct {
    Id string
    // contains filtered or unexported fields

Dag The whole dag

func NewDag Uses

func NewDag() *Dag

NewDag Creates a Dag

func (*Dag) AddEdge Uses

func (this *Dag) AddEdge(from, to string) error

AddEdge add a directed edge as (from)->(to) If vertex doesn't exists creates them

func (*Dag) AddVertex Uses

func (this *Dag) AddVertex(id string, operations []Operation) *Node

AddVertex create a vertex with id and operations

func (*Dag) Append Uses

func (this *Dag) Append(dag *Dag) error

Append appends another dag into an existing dag Its a way to define and reuse subdags append causes disconnected dag which must be linked with edge in order to execute

func (*Dag) GetEndNode Uses

func (this *Dag) GetEndNode() *Node

GetEndNode gets the end node

func (*Dag) GetInitialNode Uses

func (this *Dag) GetInitialNode() *Node

GetInitialNode gets the initial node

func (*Dag) GetNode Uses

func (this *Dag) GetNode(id string) *Node

GetNode get a node by Id

func (*Dag) GetNodes Uses

func (this *Dag) GetNodes(dynamicOption string) []string

GetNodes returns a list of nodes (including subdags) belong to the dag

func (*Dag) GetParentNode Uses

func (this *Dag) GetParentNode() *Node

GetParentNode returns parent node for a subdag

func (*Dag) HasBranch Uses

func (this *Dag) HasBranch() bool

HasBranch check if dag or its subdags has branch

func (*Dag) HasEdge Uses

func (this *Dag) HasEdge() bool

HasEdge check if dag or its subdags has edge

func (*Dag) IsExecutionFlow Uses

func (this *Dag) IsExecutionFlow() bool

IsExecutionFlow check if a dag doesn't use intermediate data

func (*Dag) Validate Uses

func (this *Dag) Validate() error

Validate validates a dag and all subdag as per faas-flow dag requirments A validated graph has only one initialNode and one EndNode set if a graph has more than one endnode, a seperate endnode gets added

type DagExporter Uses

type DagExporter struct {
    Id               string                   `json:"id"`
    StartNode        string                   `json:"start-node"`
    EndNode          string                   `json:"end-node"`
    HasBranch        bool                     `json:"has-branch"`
    HasEdge          bool                     `json:"has-edge"`
    ExecutionOnlyDag bool                     `json:"exec-only-dag"`
    Nodes            map[string]*NodeExporter `json:"nodes"`

    IsValid         bool   `json:"is-valid"`
    ValidationError string `json:"validation-error,omitempty"`

type DataStore Uses

type DataStore interface {
    // Configure the DaraStore with flow name and request ID
    Configure(flowName string, requestId string)
    // Initialize the DataStore (called only once in a request span)
    Init() error
    // Set store a value for key, in failure returns error
    Set(key string, value []byte) error
    // Get retrieves a value by key, if failure returns error
    Get(key string) ([]byte, error)
    // Del deletes a value by a key
    Del(key string) error
    // Cleanup all the resources in DataStore
    Cleanup() error

DataStore for Storing Data

type EventHandler Uses

type EventHandler interface {
    // Configure the EventHandler with flow name and request ID
    Configure(flowName string, requestId string)
    // Initialize an EventHandler (called only once in a request span)
    Init() error
    // ReportRequestStart report a start of request
    ReportRequestStart(requestId string)
    // ReportRequestEnd reports an end of request
    ReportRequestEnd(requestId string)
    // ReportRequestFailure reports a failure of a request with error
    ReportRequestFailure(requestId string, err error)
    // ReportExecutionForward report that an execution is forwarded
    ReportExecutionForward(nodeId string, requestId string)
    // ReportExecutionContinuation report that an execution is being continued
    ReportExecutionContinuation(requestId string)
    // ReportNodeStart report a start of a Node execution
    ReportNodeStart(nodeId string, requestId string)
    // ReportNodeStart report an end of a node execution
    ReportNodeEnd(nodeId string, requestId string)
    // ReportNodeFailure report a Node execution failure with error
    ReportNodeFailure(nodeId string, requestId string, err error)
    // ReportOperationStart reports start of an operation
    ReportOperationStart(operationId string, nodeId string, requestId string)
    // ReportOperationEnd reports an end of an operation
    ReportOperationEnd(operationId string, nodeId string, requestId string)
    // ReportOperationFailure reports failure of an operation with error
    ReportOperationFailure(operationId string, nodeId string, requestId string, err error)
    // Flush flush the reports

EventHandler handle flow events

type ForEach Uses

type ForEach func([]byte) map[string][]byte

ForEach definition for the foreach function

type Forwarder Uses

type Forwarder func([]byte) []byte

Forwarder definition for the data forwarder of nodes

type Logger Uses

type Logger interface {
    // Configure configure a logger with flowname and requestID
    Configure(flowName string, requestId string)
    // Init initialize a logger
    Init() error
    // Log logs a flow log
    Log(str string)

Logger logs the flow logs

type Node Uses

type Node struct {
    Id string // The id of the vertex
    // contains filtered or unexported fields

Node The vertex

func (*Node) AddAggregator Uses

func (this *Node) AddAggregator(aggregator Aggregator)

AddAggregator add a aggregator to a node

func (*Node) AddCondition Uses

func (this *Node) AddCondition(condition Condition)

AddCondition add a condition to a node

func (*Node) AddConditionalDag Uses

func (this *Node) AddConditionalDag(condition string, dag *Dag)

AddConditionalDag adds conditional dag to node

func (*Node) AddForEach Uses

func (this *Node) AddForEach(foreach ForEach)

AddForEach add a aggregator to a node

func (*Node) AddForEachDag Uses

func (this *Node) AddForEachDag(subDag *Dag) error

AddForEachDag adds a foreach subdag to the node

func (*Node) AddForwarder Uses

func (this *Node) AddForwarder(children string, forwarder Forwarder)

AddForwarder adds a forwarder for a specific children

func (*Node) AddOperation Uses

func (this *Node) AddOperation(operation Operation)

AddOperation adds an operation

func (*Node) AddSubAggregator Uses

func (this *Node) AddSubAggregator(aggregator Aggregator)

AddSubAggregator add a foreach aggregator to a node

func (*Node) AddSubDag Uses

func (this *Node) AddSubDag(subDag *Dag) error

AddSubDag adds a subdag to the node

func (*Node) Children Uses

func (this *Node) Children() []*Node

Children get all children node for a node

func (*Node) Dependency Uses

func (this *Node) Dependency() []*Node

Dependency get all dependency node for a node

func (*Node) Dynamic Uses

func (this *Node) Dynamic() bool

Dynamic checks if the node is dynamic

func (*Node) DynamicIndegree Uses

func (this *Node) DynamicIndegree() int

DynamicIndegree returns the no of dynamic input in a node

func (*Node) GetAggregator Uses

func (this *Node) GetAggregator() Aggregator

GetAggregator get a aggregator from a node

func (*Node) GetAllConditionalDags Uses

func (this *Node) GetAllConditionalDags() map[string]*Dag

GetAllConditionalDags get all the subdags for all conditions

func (*Node) GetCondition Uses

func (this *Node) GetCondition() Condition

GetCondition get the condition function

func (*Node) GetConditionalDag Uses

func (this *Node) GetConditionalDag(condition string) *Dag

GetConditionalDag get the sundag for a specific condition

func (*Node) GetForEach Uses

func (this *Node) GetForEach() ForEach

GetForEach get the foreach function

func (*Node) GetForwarder Uses

func (this *Node) GetForwarder(children string) Forwarder

GetForwarder gets a forwarder for a children

func (*Node) GetSubAggregator Uses

func (this *Node) GetSubAggregator() Aggregator

GetSubAggregator gets the subaggregator for condition and foreach

func (*Node) GetUniqueId Uses

func (this *Node) GetUniqueId() string

GetUniqueId returns a unique ID of the node

func (*Node) Indegree Uses

func (this *Node) Indegree() int

Indegree returns the no of input in a node

func (*Node) Operations Uses

func (this *Node) Operations() []Operation

Value provides the ordered list of functions for a node

func (*Node) Outdegree Uses

func (this *Node) Outdegree() int

Outdegree returns the no of output in a node

func (*Node) ParentDag Uses

func (this *Node) ParentDag() *Dag

ParentDag returns the parent dag of the node

func (*Node) SubDag Uses

func (this *Node) SubDag() *Dag

SubDag returns the subdag added in a node

type NodeExporter Uses

type NodeExporter struct {
    Id       string `json:"id"`
    Index    int    `json:"node-index"`
    UniqueId string `json:"unique-id"` // required to fetch intermediate data and state

    IsDynamic        bool `json:"is-dynamic"`
    IsCondition      bool `json:"is-condition"`
    IsForeach        bool `json:"is-foreach"`
    HasAggregator    bool `json:"has-aggregator"`
    HasSubAggregator bool `json:"has-sub-aggregator"`
    HasSubDag        bool `json:"has-subdag"`
    InDegree         int  `json:"in-degree"`
    OutDegree        int  `json:"out-degree"`

    SubDag          *DagExporter            `json:"sub-dag,omitempty"`
    ForeachDag      *DagExporter            `json:"foreach-dag,omitempty"`
    ConditionalDags map[string]*DagExporter `json:"conditional-dags,omitempty"`
    DynamicExecOnly bool                    `json:"dynamic-exec-only"`
    Operations      []*OperationExporter    `json:"operations,omitempty"`

    Children         []string        `json:"childrens,omitempty"`
    ChildrenExecOnly map[string]bool `json:"child-exec-only"`

type Operation Uses

type Operation interface {
    GetId() string
    Encode() []byte
    GetProperties() map[string][]string
    // Execute executes an operation, executor can pass configuration
    Execute([]byte, map[string]interface{}) ([]byte, error)

type OperationExporter Uses

type OperationExporter struct {
    Name       string              `json:"name"`
    Properties map[string][]string `json:"properties"`

type Pipeline Uses

type Pipeline struct {
    Dag *Dag `json:"-"` // Dag that will be executed

    ExecutionPosition map[string]string `json:"pipeline-execution-position"` // Denotes the node that is executing now
    ExecutionDepth    int               `json:"pipeline-execution-depth"`    // Denotes the depth of subgraph its executing

    CurrentDynamicOption map[string]string `json:"pipeline-dynamic-option"` // Denotes the current dynamic option mapped against the dynamic Node UQ id

    FailureHandler PipelineErrorHandler `json:"-"`
    Finally        PipelineHandler      `json:"-"`

func CreatePipeline Uses

func CreatePipeline() *Pipeline

CreatePipeline creates a faasflow pipeline

func (*Pipeline) ApplyState Uses

func (pipeline *Pipeline) ApplyState(state string)

ApplyState apply a state to a pipeline by from encoded JSON pipeline

func (*Pipeline) CountNodes Uses

func (pipeline *Pipeline) CountNodes() int

CountNodes counts the no of node added in the Pipeline Dag. It doesn't count subdags node

func (*Pipeline) GetAllNodesUniqueId Uses

func (pipeline *Pipeline) GetAllNodesUniqueId() []string

GetAllNodesId returns a recursive list of all nodes that belongs to the pipeline

func (*Pipeline) GetCurrentNodeDag Uses

func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag)

GetCurrentNodeDag returns the current node and current dag based on execution position

func (*Pipeline) GetInitialNodeId Uses

func (pipeline *Pipeline) GetInitialNodeId() string

GetInitialNodeId Get the very first node of the pipeline

func (*Pipeline) GetNodeExecutionUniqueId Uses

func (pipeline *Pipeline) GetNodeExecutionUniqueId(node *Node) string

GetNodeExecutionUniqueId provide a ID that is unique in an execution

func (*Pipeline) GetState Uses

func (pipeline *Pipeline) GetState() string

GetState get a state of a pipeline by encoding in JSON

func (*Pipeline) SetDag Uses

func (pipeline *Pipeline) SetDag(dag *Dag)

SetDag overrides the default dag

func (*Pipeline) UpdatePipelineExecutionPosition Uses

func (pipeline *Pipeline) UpdatePipelineExecutionPosition(depthAdjustment int, vertex string)

UpdatePipelineExecutionPosition updates pipeline execution position specified depthAdjustment and vertex denotes how the ExecutionPosition must be altered

type PipelineErrorHandler Uses

type PipelineErrorHandler func(error) ([]byte, error)

PipelineErrorHandler the error handler OnFailure() registration on pipeline

type PipelineHandler Uses

type PipelineHandler func(string)

PipelineHandler definition for the Finally() registration on pipeline

type StateStore Uses

type StateStore interface {
    // Configure the StateStore with flow name and request ID
    Configure(flowName string, requestId string)
    // Initialize the StateStore (called only once in a request span)
    Init() error
    // Set a value (override existing, or create one)
    Set(key string, value string) error
    // Get a value
    Get(key string) (string, error)
    // Compare and Update a value
    Update(key string, oldValue string, newValue string) error
    // Cleanup all the resources in StateStore (called only once in a request span)
    Cleanup() error

StateStore for saving execution state



