faasflow

package module v0.6.0

Warning: This package is not in the latest version of its module.

Published: May 18, 2020 License: MIT Imports: 9 Imported by: 0

README

Faas-flow - Function Composition for OpenFaaS


  • Pure: FaaS with OpenFaaS
  • Fast: built with Go
  • Secured: with HMAC
  • Stateless: by design
  • Tracing: with open-tracing
  • Available: as faas-flow template

Faas-flow tower visualizes and monitors flow functions.

Overview

Faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about the internals.

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
    flow.SyncNode().Apply("Func1").Apply("Func2")
    return
}

After building and deploying, it will give you an OpenFaaS function that orchestrates calling Func2 with the output of Func1.

Use Cases

As a function composition layer, Faas-flow provides the backbone for building complex solutions and promotes automation.

Data Processing Pipeline

Faas-flow can orchestrate a pipeline of long- and short-running functions performing ETL jobs, without orchestrating them manually or maintaining a separate application. Faas-flow ensures the execution order of several functions, whether they run in parallel or dynamically, and provides rich constructs to aggregate results while maintaining the intermediate data.

Application Orchestration Workflow

Functions are great for isolating certain functionalities of an application. However, one still needs to call the functions, write workflow logic, and handle parallel processing and retries on failures. Using Faas-flow, you can combine multiple OpenFaaS functions with little code, while your workflow scales up and down automatically to handle the load.

Function Reusability

Faas-flow allows you to write functions that focus on solving one problem without having to worry about the next. It keeps functions loosely coupled from the business logic, promoting reusability. You can write a stateless function and use it across multiple applications, while Faas-flow maintains the execution state of each workflow per request.

Pipeline Definition

By supplying a number of pipeline operators, complex compositions can be achieved with little work.

The following pipelines can be built with small but powerful code:

SYNC Chain

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
        flow.SyncNode().Apply("func1").Apply("func2").
                Modify(func(data []byte) ([]byte, error) {
                        // do something 
                        return data, nil
                })
        return
}

ASYNC Chain

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
        dag := flow.Dag()
        dag.Node("n1").Apply("func1")
        dag.Node("n2").Apply("func2").
                Modify(func(data []byte) ([]byte, error) {
                        // do something
                        return data, nil
                })
        dag.Node("n3").Apply("func4")
        dag.Edge("n1", "n2")
        dag.Edge("n2", "n3")
        return
}

PARALLEL Branching

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
        dag := flow.Dag()
        dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
                // do something
                return data, nil
        })
        dag.Node("n2").Apply("func1")
        dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
                // do something
                return data, nil
        })
        dag.Node("n4", faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
                // aggregate branch result data["n2"] and data["n3"]
                return []byte(""), nil
        }))

        dag.Edge("n1", "n2")
        dag.Edge("n1", "n3")
        dag.Edge("n2", "n4")
        dag.Edge("n3", "n4")
        return
}
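The PARALLEL example fans n1 out to n2 and n3 and joins them in the n4 aggregator. A rough in-process model of that fan-out/fan-in, assuming the branches are local functions instead of OpenFaaS calls (`fanOut` is a hypothetical helper, not faasflow code), could look like this:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

type Modifier func([]byte) ([]byte, error)

// Aggregator has the same shape as the function passed to faasflow.Aggregator
// above: it receives every branch result keyed by node ID.
type Aggregator func(map[string][]byte) ([]byte, error)

// fanOut runs every branch concurrently on the same input, waits for all of
// them, then hands the keyed results to the aggregator.
func fanOut(input []byte, branches map[string]Modifier, agg Aggregator) ([]byte, error) {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string][]byte, len(branches))
		errs    []error
	)
	for id, fn := range branches {
		wg.Add(1)
		go func(id string, fn Modifier) {
			defer wg.Done()
			out, err := fn(input) // branches must not mutate the shared input
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs = append(errs, fmt.Errorf("branch %s: %w", id, err))
				return
			}
			results[id] = out
		}(id, fn)
	}
	wg.Wait()
	if len(errs) > 0 {
		return nil, errs[0]
	}
	return agg(results)
}

func main() {
	branches := map[string]Modifier{
		"n2": func(d []byte) ([]byte, error) { return []byte(strings.ToUpper(string(d))), nil },
		"n3": func(d []byte) ([]byte, error) { return []byte(strings.Repeat(string(d), 2)), nil },
	}
	// Join results in key order so the output is deterministic.
	agg := func(data map[string][]byte) ([]byte, error) {
		keys := make([]string, 0, len(data))
		for k := range data {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		parts := make([]string, 0, len(keys))
		for _, k := range keys {
			parts = append(parts, k+"="+string(data[k]))
		}
		return []byte(strings.Join(parts, ",")), nil
	}
	out, err := fanOut([]byte("ab"), branches, agg)
	fmt.Println(string(out), err) // n2=AB,n3=abab <nil>
}
```

Here the branches are goroutines in one process; in Faas-flow each branch runs as its own node execution.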

DYNAMIC Branching

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
        dag := flow.Dag()
        dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
                // do something
                return data, nil
        })
        conditionalDags := dag.ConditionalBranch("C",
                []string{"c1", "c2"}, // possible conditions
                func(response []byte) []string {
                        // for each returned condition the corresponding branch will execute
                        // this function executes in the runtime of condition C
                        return []string{"c1", "c2"}
                },
                faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
                        // aggregate all dynamic branches results
                        return []byte(""), nil
                }),
        )

        conditionalDags["c2"].Node("n1").Apply("func1").Modify(func(data []byte) ([]byte, error) {
                // do something
                return data, nil
        })
        foreachDag := conditionalDags["c1"].ForEachBranch("F",
                func(data []byte) map[string][]byte {
                        // for each returned key in the hashmap a new branch will be executed
                        // this function executes in the runtime of foreach F
                        return map[string][]byte{"f1": data, "f2": data}
                },
                faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
                        // aggregate all dynamic branches results
                        return []byte(""), nil
                }),
        )
        foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
                // do something
                return data, nil
        })
        dag.Node("n2")
        dag.Edge("n1", "C")
        dag.Edge("C", "n2")
        return
}
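To make the ConditionalBranch and ForEachBranch semantics concrete, here is a sequential, in-process model: the condition function picks which named branches run, a foreach split yields one branch per key, and each set of keyed results goes to an aggregator. `conditional`, `forEach` and `joinSorted` are hypothetical helpers, not faasflow APIs.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type Modifier func([]byte) ([]byte, error)

// forEach models ForEachBranch: split returns one entry per dynamic branch,
// each entry runs through the same sub-flow, and the results are aggregated.
func forEach(input []byte, split func([]byte) map[string][]byte, sub Modifier,
	agg func(map[string][]byte) ([]byte, error)) ([]byte, error) {
	results := make(map[string][]byte)
	for key, item := range split(input) {
		out, err := sub(item)
		if err != nil {
			return nil, fmt.Errorf("branch %s: %w", key, err)
		}
		results[key] = out
	}
	return agg(results)
}

// conditional models ConditionalBranch: cond selects which named branches run.
func conditional(input []byte, cond func([]byte) []string, branches map[string]Modifier,
	agg func(map[string][]byte) ([]byte, error)) ([]byte, error) {
	results := make(map[string][]byte)
	for _, name := range cond(input) {
		branch, ok := branches[name]
		if !ok {
			return nil, fmt.Errorf("unknown condition %q", name)
		}
		out, err := branch(input)
		if err != nil {
			return nil, err
		}
		results[name] = out
	}
	return agg(results)
}

// joinSorted concatenates results in key order for a deterministic output.
func joinSorted(data map[string][]byte) ([]byte, error) {
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+":"+string(data[k]))
	}
	return []byte(strings.Join(parts, " ")), nil
}

func main() {
	upper := func(d []byte) ([]byte, error) { return []byte(strings.ToUpper(string(d))), nil }
	// Split comma-separated input into one branch per item (f1, f2, ...).
	split := func(d []byte) map[string][]byte {
		m := make(map[string][]byte)
		for i, s := range strings.Split(string(d), ",") {
			m[fmt.Sprintf("f%d", i+1)] = []byte(s)
		}
		return m
	}
	branches := map[string]Modifier{
		"c1": func(d []byte) ([]byte, error) { return forEach(d, split, upper, joinSorted) },
		"c2": upper,
	}
	cond := func(d []byte) []string { return []string{"c1"} }
	out, err := conditional([]byte("a,b"), cond, branches, joinSorted)
	fmt.Println(string(out), err) // c1:f1:A f2:B <nil>
}
```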

Full implementations of the above examples are available here.

Faas-flow Design

The current design is based on the following goals:

  1. Leverage the OpenFaaS platform
  2. Not to violate the notions of function
  3. Provide flexibility, scalability, and adaptability

Just a function like any other

Faas-flow is deployed and provisioned just like any other OpenFaaS function, which lets Faas-flow take advantage of the rich functionality available on OpenFaaS. Faas-flow provides an OpenFaaS template (faas-flow), and just like any other OpenFaaS function it can be deployed with faas-cli.

Adapter pattern for zero instrumentation in code

A Faas-flow function follows the adapter pattern. Here the adaptees are the functions and the adapter is the flow. For each node execution, Faas-flow handles the calls to the functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is separated from the functions and implemented in the adapter. Compositions need no code instrumentation, making functions completely independent of the details of the compositions.

Aggregate pattern as chaining

Aggregation of separate function calls is done as chaining. Multiple functions can be called from a single node, with their order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forwards it to the next nodes. Faas-flow fuses the adapter pattern and the aggregate pattern to support more complex use cases.

Event driven iteration

OpenFaaS uses NATS for event delivery, and Faas-flow leverages the OpenFaaS platform. Node execution in Faas-flow is triggered by the completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes have completed. The event carries the execution state and identifies the next node to execute. With events, Faas-flow asynchronously carries on the execution of nodes by iterating itself over and over until all nodes are executed.
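The iteration can be pictured as a queue of completion events. In this hypothetical model a slice stands in for event delivery through the gateway, and each event names the next node and carries its input. It only follows linear successors; nodes with several predecessors need the coordination described in the next part.

```go
package main

import "fmt"

// event is a hypothetical completion event: the node to run next and the
// data carried forward from the node that just finished.
type event struct {
	node string
	data string
}

// runEventLoop pops the next event, executes that node, and emits one event
// per successor, repeating until no events remain. Every node reachable via
// edges must have an entry in exec.
func runEventLoop(start event, exec map[string]func(string) string, edges map[string][]string) []string {
	var trace []string
	queue := []event{start}
	for len(queue) > 0 {
		ev := queue[0]
		queue = queue[1:]
		out := exec[ev.node](ev.data)
		trace = append(trace, ev.node+"="+out)
		for _, next := range edges[ev.node] {
			queue = append(queue, event{node: next, data: out})
		}
	}
	return trace
}

func main() {
	exec := map[string]func(string) string{
		"n1": func(s string) string { return s + "1" },
		"n2": func(s string) string { return s + "2" },
		"n3": func(s string) string { return s + "3" },
	}
	edges := map[string][]string{"n1": {"n2"}, "n2": {"n3"}}
	fmt.Println(runEventLoop(event{node: "n1", data: "x"}, exec, edges))
	// [n1=x1 n2=x12 n3=x123]
}
```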

3rd party KV store for coordination

When executing branches, one node can depend on more than one predecessor node. In that scenario, the completion event is generated by coordinating the earlier nodes. As in any distributed system, the coordination is achieved via a centralized service. Faas-flow keeps the logic of the coordination controller inside the Faas-flow implementation and lets the user use any external synchronous KV store by implementing StateStore.

3rd party Storage for intermediate data

Results from function execution and intermediate data can be handled by the user manually. Faas-flow also provides a data store for intermediate results: it automatically initializes, stores, retrieves and removes data between nodes, which fits data processing applications well. Faas-flow keeps the logic of the storage controller inside the Faas-flow implementation and lets the user use any external object storage by implementing DataStore.

Faas-flow design is not fixed and like any good design, it is evolving. Please contribute to make it better.

Getting Started

This example implements a very simple flow that greets the caller.

Get template

Pull faas-flow template with the faas-cli

faas template pull https://github.com/s8sg/faas-flow

Create new flow function

Create a new function using faas-flow template

faas new greet --lang faas-flow

Edit stack

Edit function stack file greet.yml

  greet:
    lang: faas-flow
    handler: ./greet
    image: greet:latest
    environment:
      read_timeout: 120 # A value larger than `max` of all execution times of Nodes
      write_timeout: 120 # A value larger than `max` of all execution times of Nodes
      exec_timeout: 0 # disable exec timeout
      write_debug: true
      combine_output: false
    environment_file:
      - flow.yml

Add configuration

Add a separate file flow.yml with faas-flow related configuration.

environment:
  gateway: "gateway:8080" # The address of the OpenFaaS gateway; Faas-flow uses this to forward completion events
  # gateway: "gateway.openfaas:8080" # For K8s
  enable_tracing: false # tracing allows tracing internal node execution with opentracing
  enable_hmac: true # hmac adds an extra layer of security by validating the event source
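Setting enable_hmac makes Faas-flow validate the source of forwarded events. The exact header name, hash and secret handling Faas-flow uses are not described here, so the following is only a generic sketch of HMAC-SHA256 signing and constant-time verification with Go's crypto/hmac; `sign`, `verify` and the secret value are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns a hex-encoded HMAC-SHA256 of payload under secret.
// SHA-256 and hex encoding are assumptions chosen for illustration.
func sign(secret, payload []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the MAC and compares it in constant time, rejecting
// events whose payload or signature was altered.
func verify(secret, payload []byte, signature string) bool {
	expected, err := hex.DecodeString(signature)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hmac.Equal(mac.Sum(nil), expected)
}

func main() {
	secret := []byte("example-secret") // hypothetical shared secret
	event := []byte(`{"node":"n2"}`)
	sig := sign(secret, event)
	fmt.Println(verify(secret, event, sig))                   // true
	fmt.Println(verify(secret, []byte(`{"node":"n3"}`), sig)) // false
}
```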

Edit function definition

Edit greet/handler.go and update Define():

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
        flow.SyncNode().
                Modify(func(data []byte) ([]byte, error) {
                        result := "Hello " + string(data)
                        return []byte(result), nil
                })
        return nil
}

Build and Deploy

Build and deploy the flow function with faas-cli:

faas build -f greet.yml
faas deploy -f greet.yml

This function will generate one synchronous node:

Modify("name") -> Hello name

All calls will be performed in one single execution of the flow function, and the result will be returned to the caller.

Note: For a flow that has more than one node, Faas-flow doesn't return any response. External storage or a callback can be used to retrieve an async result.

Invoke

echo "Adam" | faas invoke greet

Deploy FaaS-Flow Infra

FaaS-Flow infra allows you to set up the components needed to run more advanced workflows.

Deploy in Kubernetes
Deploy in Swarm

Request Tracking by ID

For each new request, faas-flow generates a unique request ID for the flow. The same ID is used when logging:

2018/08/13 07:51:59 [Request `bdojh7oi7u6bl8te4r0g`] Created
2018/08/13 07:52:03 [Request `bdojh7oi7u6bl8te4r0g`] Received

The assigned request ID is set on the response header X-Faas-Flow-Reqid. One may provide a custom request ID by setting X-Faas-Flow-Reqid in the request header.

Request Tracing by Open-Tracing

Request tracing can be retrieved from trace_server once enabled. Tracing is the best way to monitor flows and execution status of each node for each request

Edit flow.yml

Enable tracing and add trace server as:

      enable_tracing: true
      trace_server: "jaegertracing:5775"
      # trace_server: "jaegertracing.faas-flow-infra:5775" # use this for Kubernetes

Start the Trace Server

jaeger (opentracing-1.x) is the tracing backend
Quick start with jaegertracing: https://www.jaegertracing.io/docs/1.8/getting-started/

Use faas-flow-tower

Retrieve the request ID from the X-Faas-Flow-Reqid header of the response.

Below is an example of tracing information for example-branching-in-Faas-flow in Faas-flow-tower

Use of Callback

To receive the result of a long-running FaaSFlow request, you can specify the X-Faas-Flow-Callback-Url request header. FaaSFlow will invoke the callback URL with the final result, with the request ID set as X-Faas-Flow-Reqid in the request header. X-Callback-Url from OpenFaaS is not supported in FaaSFlow.

Pause, Resume or Stop Request

A request in faas-flow has three states:

  1. Running
  2. Paused
  3. Stopped

Faas-flow doesn't keep the state of a finished request.

To pause a running request:

faas invoke <workflow_name> --query pause-flow=<request_id>

To resume a paused request:

faas invoke <workflow_name> --query resume-flow=<request_id>

To stop an active (paused/running) request:

faas invoke <workflow_name> --query stop-flow=<request_id>
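faas invoke sends the request through the OpenFaaS gateway, so the same controls can presumably be issued by any HTTP client that calls the flow with the matching query parameter. A sketch of building those URLs, assuming the flow is reachable at <gateway>/function/<workflow_name> as OpenFaaS functions normally are; `controlURL` is hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// controlURL builds the gateway URL for a pause/resume/stop request.
// action must be one of the three query keys used with faas invoke above.
func controlURL(gateway, workflow, action, requestID string) (string, error) {
	switch action {
	case "pause-flow", "resume-flow", "stop-flow":
	default:
		return "", fmt.Errorf("unsupported action %q", action)
	}
	u, err := url.Parse(gateway)
	if err != nil {
		return "", err
	}
	u.Path = "/function/" + workflow
	u.RawQuery = url.Values{action: {requestID}}.Encode()
	return u.String(), nil
}

func main() {
	u, err := controlURL("http://gateway:8080", "greet", "pause-flow", "bdojh7oi7u6bl8te4r0g")
	fmt.Println(u, err)
	// http://gateway:8080/function/greet?pause-flow=bdojh7oi7u6bl8te4r0g <nil>
}
```

The resulting URL would then be sent with POST, the method faas invoke uses by default.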

Use of context

Context can be used inside the definition for different use cases. Context provides various information, such as:
  • HttpQuery to retrieve the original request queries
  • State to get the flow state
  • Node to get the current node
Along with that, it wraps the DataStore to store data.

Store data in context with DataStore

Context uses DataStore to store and retrieve data. Users can do the same by calling Get(), Set(), and Del() on the context:

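     flow.SyncNode().
          Modify(func(data []byte) ([]byte, error) {
               // parse data and set a value to be used later
               // (requires encoding/json; the "sha" field is a hypothetical request shape)
               var req struct {
                    Sha string `json:"sha"`
               }
               if err := json.Unmarshal(data, &req); err != nil {
                    return nil, err
               }
               context.Set("commitsha", req.Sha)
               return data, nil
          }).
          Apply("myfunc").
          Modify(func(data []byte) ([]byte, error) {
               // retrieve the data that was set in the context
               commitsha, _ := context.GetString("commitsha")
               _ = commitsha // use the value
               return data, nil
          })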
Getting the HTTP Query of a Workflow:

The HTTP query of a flow request can be retrieved from the context using context.Query:

    flow.SyncNode().
         Apply("myfunc", faasflow.Query("auth-token", context.Query.Get("token"))). // pass as a function query
         Modify(func(data []byte) ([]byte, error) {
              token := context.Query.Get("token") // get the query inside a modifier
              _ = token
              return data, nil
         })
Use of request context:

Node, request ID and State are provided by the context:

   currentNode := context.GetNode()
   requestId := context.GetRequestId()
   state := context.State

For more details, check the Faas-flow GoDoc.

External StateStore for coordination controller

Any DAG that has a branch needs coordination for node completion events, and the request execution state also needs to be maintained. Faas-flow implements a coordination controller and a request state store that allow the user to use any external synchronous KV store. Users can define a custom state store with the StateStore interface:

type StateStore interface {
        // Configure the StateStore with flow name and request ID
        Configure(flowName string, requestId string)
        // Initialize the StateStore (called only once in a request span)
        Init() error
        // Set a value (override existing, or create one)
        Set(key string, value string) error
        // Get a value
        Get(key string) (string, error)
        // Compare and Update a value
        Update(key string, oldValue string, newValue string) error
        // Cleanup all the resources in StateStore (called only once in a request span)
        Cleanup() error
}

A StateStore can be implemented with any KV store that provides synchronization. The implemented StateStore can be set with DefineStateStore() in function/handler.go:

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
        consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
        return consulss, err
}

StateStore is mandatory for FaaSFlow to operate.

Available state-stores:

External DataStore for storage controller

Faas-flow uses the DataStore to store partially completed data between nodes and request context data. Faas-flow implements a storage controller to handle storage that allows the user to use any external object-store. User can define custom data-store with DataStore interface.

type DataStore interface {
        // Configure the DataStore with flow name and request ID
        Configure(flowName string, requestId string)
        // Initialize the DataStore (called only once in a request span)
        Init() error
        // Set stores a value for a key; returns an error on failure
        Set(key string, value string) error
        // Get retrieves a value by key; returns an error on failure
        Get(key string) (string, error)
        // Del deletes a value by key
        Del(key string) error
        // Cleanup all the resources in DataStore
        Cleanup() error
}

A DataStore can be implemented and set by the user with DefineDataStore() in function/handler.go:

// DefineDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
        // initialize minio DataStore
        miniods, err := minioDataStore.InitFromEnv()
        return miniods, err
}
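Similarly, a hypothetical in-memory DataStore (`memDataStore`) can be written against the interface above for unit tests. It namespaces keys by flow name and request ID via Configure, but because it lives in a single process it is no substitute for external object storage such as the minio example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// DataStore copies the interface documented above so the sketch compiles on its own.
type DataStore interface {
	Configure(flowName string, requestId string)
	Init() error
	Set(key string, value string) error
	Get(key string) (string, error)
	Del(key string) error
	Cleanup() error
}

// memDataStore keeps values in a map keyed by "<flow>/<request>/<key>".
type memDataStore struct {
	mu     sync.Mutex
	prefix string
	data   map[string]string
}

func (d *memDataStore) Configure(flowName, requestId string) {
	d.prefix = flowName + "/" + requestId + "/"
}

func (d *memDataStore) Init() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.data == nil {
		d.data = make(map[string]string)
	}
	return nil
}

func (d *memDataStore) Set(key, value string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.data[d.prefix+key] = value
	return nil
}

func (d *memDataStore) Get(key string) (string, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	v, ok := d.data[d.prefix+key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

func (d *memDataStore) Del(key string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.data, d.prefix+key)
	return nil
}

// Cleanup removes only the keys that belong to this flow and request.
func (d *memDataStore) Cleanup() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	for k := range d.data {
		if strings.HasPrefix(k, d.prefix) {
			delete(d.data, k)
		}
	}
	return nil
}

func main() {
	var ds DataStore = &memDataStore{}
	ds.Configure("greet", "req-1")
	_ = ds.Init()
	_ = ds.Set("n1-result", "Hello Adam")
	v, err := ds.Get("n1-result")
	fmt.Println(v, err) // Hello Adam <nil>
}
```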

DataStore is only needed for DAGs that store intermediate data.

Available data-stores:

Cleanup with Finally()

Finally provides an efficient way to perform post-execution steps of the flow. If specified, Finally() is invoked on both failure and success of the flow. A Finally method can be set as:

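func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
     // Define flow
     flow.SyncNode().Modify(func(data []byte) ([]byte, error) {
          // parse data and set a value to be used later
          // (requires encoding/json; the "sha" field is a hypothetical request shape)
          var req struct {
               Sha string `json:"sha"`
          }
          if err := json.Unmarshal(data, &req); err != nil {
               return nil, err
          }
          context.Set("commitsha", req.Sha)
          return data, nil
     }).
     Apply("myfunc").Modify(func(data []byte) ([]byte, error) {
          // retrieve the data in a different node from the context
          commitsha, _ := context.GetString("commitsha")
          _ = commitsha
          return data, nil
     })
     flow.OnFailure(func(err error) {
          // failure handler
     })
     flow.Finally(func() {
          // delete the state resource
          context.Del("commitsha")
     })
     return nil
}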

Contribute:

Issue/Suggestion: Create an issue at Faas-flow-issue.
ReviewPR/Implement: Create a Pull Request at Faas-flow-pr.

Join Faasflow Slack for more

Documentation


Constants

This section is empty.

Variables

var (
	BLANK_MODIFIER = func(data []byte) ([]byte, error) { return data, nil }
)
var (
	// Execution specifies an edge that doesn't forward data
	// but rather indicates an execution direction
	Execution = InvokeEdge()
)

Functions

This section is empty.

Types

type BranchOption added in v0.4.0

type BranchOption func(*BranchOptions)

func Aggregator

func Aggregator(aggregator sdk.Aggregator) BranchOption

Aggregator aggregates all outputs into one

func Forwarder

func Forwarder(forwarder sdk.Forwarder) BranchOption

Forwarder encodes the request based on the needs of the child vertex; by default the data is forwarded as-is

func InvokeEdge added in v0.4.0

func InvokeEdge() BranchOption

InvokeEdge denotes an edge that doesn't forward data, but rather provides only an execution flow

type BranchOptions added in v0.4.0

type BranchOptions struct {
	// contains filtered or unexported fields
}

BranchOptions options for branching in DAG

type Context

type Context sdk.Context

type Dag added in v0.4.0

type Dag struct {
	// contains filtered or unexported fields
}

func NewDag added in v0.4.0

func NewDag() *Dag

NewDag creates a new dag separately from pipeline

func (*Dag) Append added in v0.4.0

func (this *Dag) Append(dag *Dag)

Append generalizes a separate dag by appending its properties into the current dag. The provided dag should be mutually exclusive

func (*Dag) ConditionalBranch added in v0.4.0

func (this *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition,
	options ...BranchOption) (conditiondags map[string]*Dag)

ConditionalBranch composites multiple dags as a sub-dag that executes for the matched conditions, and returns the set of dags keyed by the conditions passed

func (*Dag) Edge added in v0.4.0

func (this *Dag) Edge(from, to string, opts ...BranchOption)

Edge adds a directed edge between two vertices as <from>-><to>

func (*Dag) ForEachBranch added in v0.4.0

func (this *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)

ForEachBranch composites a sub-dag that executes for each value. It returns the sub-dag that will be executed for each value

func (*Dag) Node added in v0.4.0

func (this *Dag) Node(vertex string, options ...BranchOption) *Node

Node adds a new vertex by id

func (*Dag) SubDag added in v0.4.0

func (this *Dag) SubDag(vertex string, dag *Dag)

SubDag composites a separate dag as a node.

type DataStore

type DataStore sdk.DataStore

type FaasOperation added in v0.5.0

type FaasOperation struct {
	// FaasOperations
	Function       string   // The name of the function
	HttpRequestUrl string   // HttpRequest Url
	Mod            Modifier // Modifier

	// Optional Options
	Header map[string]string   // The HTTP call header
	Param  map[string][]string // The Parameter in Query string

	FailureHandler FuncErrorHandler // The Failure handler of the operation
	Requesthandler ReqHandler       // The http request handler of the operation
	OnResphandler  RespHandler      // The http Resp handler of the operation
}

func (*FaasOperation) Encode added in v0.5.0

func (operation *FaasOperation) Encode() []byte

func (*FaasOperation) Execute added in v0.5.0

func (operation *FaasOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)

func (*FaasOperation) GetHeaders added in v0.5.0

func (operation *FaasOperation) GetHeaders() map[string]string

func (*FaasOperation) GetId added in v0.5.0

func (operation *FaasOperation) GetId() string

func (*FaasOperation) GetParams added in v0.5.0

func (operation *FaasOperation) GetParams() map[string][]string

func (*FaasOperation) GetProperties added in v0.5.0

func (operation *FaasOperation) GetProperties() map[string][]string

type FuncErrorHandler added in v0.5.0

type FuncErrorHandler func(error) error

FuncErrorHandler the error handler for OnFailure() options

type Modifier added in v0.5.0

type Modifier func([]byte) ([]byte, error)

Modifier definition for Modify() call

type Node added in v0.4.0

type Node struct {
	// contains filtered or unexported fields
}

func (*Node) AddOperation added in v0.5.0

func (node *Node) AddOperation(operation sdk.Operation) *Node

AddOperation adds an Operation to the given vertex

func (*Node) Apply added in v0.4.0

func (node *Node) Apply(function string, opts ...Option) *Node

Apply adds a new function to the given vertex

func (*Node) Modify added in v0.4.0

func (node *Node) Modify(mod Modifier) *Node

Modify adds a new modifier to the given vertex

func (*Node) Request added in v0.6.0

func (node *Node) Request(url string, opts ...Option) *Node

Request adds a new http Request to the given vertex

type Option

type Option func(*Options)

func Header

func Header(key, value string) Option

Header Specify a header in a http call

func OnFailure

func OnFailure(handler FuncErrorHandler) Option

OnFailure Specify a function failure handler

func OnReponse

func OnReponse(handler RespHandler) Option

OnResponse Specify a response handler for function and callback

func Query

func Query(key string, value ...string) Option

Query Specify a query parameter in a http call

func RequestHandler added in v0.4.0

func RequestHandler(handler ReqHandler) Option

RequestHandler Specify a request handler for function and callback request

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options options for operation execution

type ReqHandler added in v0.5.0

type ReqHandler func(*http.Request)

ReqHandler definition for the RequestHandler() option on an operation

type RespHandler added in v0.5.0

type RespHandler func(*http.Response) ([]byte, error)

RespHandler definition for OnResponse() option on operation

type StateStore

type StateStore sdk.StateStore

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

func GetWorkflow added in v0.4.0

func GetWorkflow(pipeline *sdk.Pipeline) *Workflow

GetWorkflow initiates a flow with a pipeline

func (*Workflow) Dag added in v0.4.0

func (flow *Workflow) Dag() *Dag

Dag provides the workflow dag object

func (*Workflow) Finally

func (flow *Workflow) Finally(handler sdk.PipelineHandler)

Finally sets an execution finish handler routine; it will be called once the execution has finished, with state either Success or Failure

func (*Workflow) GetPipeline

func (flow *Workflow) GetPipeline() *sdk.Pipeline

GetPipeline exposes the underlying pipeline object

func (*Workflow) OnFailure

func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler)

OnFailure sets a failure handler routine for the pipeline

func (*Workflow) SetDag added in v0.4.0

func (flow *Workflow) SetDag(dag *Dag)

SetDag applies a predefined dag and overrides the default dag

func (*Workflow) SyncNode added in v0.4.0

func (flow *Workflow) SyncNode(options ...BranchOption) *Node

SyncNode adds a new vertex named Sync
