
package module
v0.2.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2018 License: MIT Imports: 4 Imported by: 0


Faas-flow - Function Composition for Openfaas

Build Status GoDoc OpenTracing Badge OpenFaaS

  • Pure              FaaS with openfaas
  • Fast               build with go
  • Secured        with HMAC
  • Stateless      by design (DAG needs external StateStore and DataStore)
  • Tracing         with open-tracing
  • Available       as faas-flow template

FYI: Faasflow is into conceptual state and API which may change and under active development


faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals.

import faasflow "github.com/s8sg/faas-flow"

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
  flow.Apply("yourFunc1", faasflow.Sync).
       Apply("yourFunc2", faasflow.Sync)

After building and deploying, it will give you a function that orchestrates calling yourFunc2 with the output of yourFunc1

Pipeline Definition

By supplying a number of pipeline operators, complex compostion can be achieved with little work: alt overview

The above pipeline can be achieved with little, but powerfull code:


func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
     flow.Apply("func1", faasflow.Sync).
          Apply("func2", faasflow.Sync).
	  Modify(func(data []byte) ([]byte, error) {
	  	// Do something
		return data, nil
     return nil


func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

        Modify(func(data []byte) ([]byte, error) {
	        // Do something
               	return data
        Callback("storage.io/bucket?id=3345612358265349126&file=" + context.Query.Get("filename")).
        OnFailure(func(err error) {
              // failure handler
        Finally(func(state string) {
              // cleanup code
	return nil


func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

     dag := faasflow.CreateDag()
     dag.AddModifier("mod1", func(data []byte) ([]byte, error) {
     		// do something
		return data, nil
     dag.AddFunction("func1", "function_1_name")
     dag.AddFunction("func2", "function_2_name")
     dag.AddModifier("mod2", func(data []byte) ([]byte, error) {
     		// do something
		return data, nil
     // To Serialize multiple input the dag need be defined with a Aggregator
     dag.AddVertex("callback", faasflow.Aggregator(func(inputs map[string][]byte) ([]byte, error) {
				          mod2Data := inputs["mod2"]
					  func2Data := inputs["func2"]
				          // Serialize input for callback
					  return data, nil
     dag.AddCallback("callback", "storage.io/bucket?id=3345612358265349126&file=" + context.Query.Get("filename"))

     dag.AddEdge("mod1", "func1")
     dag.AddEdge("mod1", "func2")
     dag.AddEdge("func1", "mod2")
     dag.AddEdge("func2", "callback")
     dag.AddEdge("mod2", "callback")
     return nil

func DefineStateStore() (faasflow.StateStore, error) {
        // use consul StateStore
        consulss, err := consulStateStore.GetConsulStateStore()
        return consulss, err

func DefineDataStore() (faasflow.DataStore, error) {
        // use minio DataStore
        miniods, err := minioDataStore.InitFromEnv()
        return miniods, err

Sync or Async

Faasflow supports sync and async function call. By default all call are async. To call a function in Sync, faas-flow provide option faasflow.Sync:

 flow.Apply("function", faasflow.Sync)

If all calls are Sync, pipeline will have one Node (Vertex) and return the result to the caller alt single node

One or more Async function call results a pipeline to have multiple Nodes (Vertex) as a chain alt multi node

If pipeline is created as a dag, the pipeline will have multiple Nodes(Vertex) alt multi node dag

Acronyms description
Pipeline Definition User define the flow as a pipeline by implementing the template Handle(). For a given flow the definition is always the same
Function A FaaS Function. A function is applied to flow by calling flow.Apply(funcName, Sync) or flow.Apply(funcName). By Default function call are async
Modifier A inline function. A inline modifier function is applied as flow.Modify(func(data []byte) ([]byte, error) { return data, nil } )
Callback A URL that will be called with the final/partial result. flow.Callback(url)
Handler A Failure handler registered as flow.OnFailure(func(err error){}). If registered it is called if an error occured
Finally A Cleanup handler registered as flow.Finally(func(){}). If registered it is called at the end if state is StateFailure otherwise StateSuccess
Node A vertex that represent a segment of a pipeline definiton which consist of one or more call to Operation. A pipeline definition has one or more nodes. Async call results in a new node in a chain. A dag is a composition of multiple nodes
Context Request context has the state of request. It abstracts the StateHandler and provide API to manage state of the request. Interface StateHandler{} can be set by user to use 3rd party storage to manage state.


Faasflow runs four major steps to define and run the pipeline alt internal

Step description
Build Workflow Identify a request and build a flow. A incoming request could be a partially finished pipeline or a fresh raw request. For a partial request faas-flow parse and understand the state of the pipeline from the incoming request
Get Definition FaasWorkflow create simple pipeline-definition with one or multiple nodes based on the flow defined at Define() function in handler.go. A pipeline-definition consist of multiple nodes. Each Node includes one or more Function Call, Modifier or Callback. Always a single node is executed in a single invokation of the flow. A same flow always outputs to same pipeline definition, which allows faas-flow to be completly stateless
Execute Execute executes a Node by calling the Modifier, Functions or Callback based on how user defines the pipeline. Only one Node gets executed at a single execution of faas-flow function.
Repeat Or Response If pipeline is not yet completed, FaasWorkflow forwards the remaining pipeline with partial execution state and the partial result to the same flow function via gateway. If the pipeline has only one node or completed faas-flow returns the output to the gateway otherwise it returns empty



Getting Started

Get the faas-flow template with faas-cli
faas-cli template pull https://github.com/s8sg/faas-flow
Create a new func with faas-flow template
faas-cli new test-flow --lang faas-flow
Edit the test-flow.yml
    lang: faas-flow
    handler: ./test-flow
    image: test-flow:latest
      read_timeout: 120
      write_timeout: 120
      write_debug: true
      combine_output: false
      - flow.yml

read_timeout : A value larger than max node execution time.
write_timeout : A value larger than max node execution time.
write_debug: It enables the debug msg in logs.
combine_output : It allows debug msg to be excluded from output.

Add flow.yml with faas-flow configuration

To make the stack.yml look clean we can create a seperate flow.yml with faas-flow related configuration.

  workflow_name: "test-flow"
  gateway: "gateway:8080"
  enable_tracing: false
  trace_server: ""
  enable_hmac: false

workflow_name : The name of the flow function. Faasflow use this to forward partial request.
gateway : We need to tell faas-flow the address of openfaas gateway. All calls are made via gateway

             # swarm
             gateway: "gateway:8080"
             # k8
             gateway: "gateway.openfaas:8080"

enable_tracing : It enables the opentracing for requests and their nodes.
trace_server : The address of opentracing backend jaeger.
enable_hmac : Enable hmac to add extra layer of security for partial request forward.

Edit the test-flow/handler.go
    flow.Apply("yourFunc1", Header("method","post")).
        Modify(func(data []byte) ([]byte, error) {
                // Check, update/customize data, replay data ...   
                return []byte(fmt.Sprintf("{ \"data\" : \"%s\" }", string(data))), nil
        }).Apply("yourFunc2", Header("method","post")).
                 Header("method", "post"), Query("authtoken", os.Getenv(token)))

This function will generate two nodes as:

Node 1 :    
Node 2:    
Build and Deploy the test-flow


faas-cli build -f test-flow.yml


faas-cli deploy -f test-flow.yml
cat data | faas-cli invoke --async -f test-flow.yml test-flow
Function submitted asynchronously.
Convert with Sync function

Edit the atfunction/handler.go``

    flow.Apply("yourFunc1", Header("method","post"), faasflow.Sync).
        Modify(func(data []byte) ([]byte, error) {
                // Check, update/customize data, replay data ...   
                return []byte(fmt.Sprintf("{ \"data\" : \"%s\" }", string(data))), nil                
        }).Apply("yourFunc2", Header("method", "post"), faasflow.Sync)

This function will generate one node as:

Node 1 :    
Invoke (Sync)
cat data | faas-cli invoke -f test-flow.yml test-flow > updated_data

Request Tracking by ID

Request can be tracked from the log by RequestId. For each new Request a unique RequestId is generated.

2018/08/13 07:51:59 [request `bdojh7oi7u6bl8te4r0g`] Created
2018/08/13 07:52:03 [Request `bdojh7oi7u6bl8te4r0g`] Received

Request Tracing by Open-Tracing

Request tracing can be enabled by providing by specifying

      enable_tracing: true
      trace_server: "jaegertracing:5775"
Start The Trace Server

jaeger (opentracing-1.x) used for traceing
To start the trace server we run jaegertracing/all-in-one as a service.

docker service rm jaegertracing
docker pull jaegertracing/all-in-one:latest
docker service create --constraint="node.role==manager" --detach=true \
        --network func_functions --name jaegertracing -p 5775:5775/udp -p 16686:16686 \

Below is an example of tracing for an async request with 3 Nodes

alt multi node

Using request context

Request context provide verious function such as:
DataStore to store data,
HttpQuery to retrivbe request query,
State* to get flow state,
Node to get current node etc.

Manage Data Accross Node with DataStore

The main state in faas-flow chain is the execution-position (next-Node) and the partially completed data.
Apart from that faas-flow allow user to define state with DataStore interface.

 type DataStore interface {
        // Configure the DaraStore with flow name and request ID
        Configure(flowName string, requestId string)
        // Initialize the DataStore (called only once in a request span)
        Init() error
        // Set store a value for key, in failure returns error
        Set(key string, value string) error
        // Get retrives a value by key, if failure returns error
        Get(key string) (string, error)
        // Del delets a value by a key
        Del(key string) error
        // Cleanup all the resorces in DataStore
        Cleanup() error

Data Store can be implemented and set by user at the DefineDataStore() at function/handler.go:

// ProvideDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
        // initialize minio DataStore
        miniods, err := minioDataStore.InitFromEnv()
        return miniods, err

Once a DataStore is set it can be used by calling Get() and Set() from context:

     flow.Modify(func(data []byte) {
	  // parse data and set to be used later
          // json.Unmarshal(&req, data)
          context.Set("commitsha", req.Sha)
     Modify(func(data []byte) {
          // retrived the data that was set in the context
          commitsha, _ = context.GetString("commitsha")
          // use the query

Default requestEmbedDataStore:
By default faas-flow template use requestEmbedDataStore which embed the state data along with the request for the next node. For bigger values it is recommended to pass it with custom DataStore.

Once DataStore is overridden, all call to Set(), Get() and del() will call the provided DataStore

Use DataStore to store intermediate result

By default partially completed data gets forwarded along with the async request. When using external DataStore it can be saved and retrived from the DataStore if the flag intermediate_storage is set. Default is false

   intermediate_storage: true

Due to nats 1mb storage limitation, async call may fail. In such scenario using intermediate_storage is recommended

Manage State of Pipeline in a DAG with StateStore

In a faas-flow DAG execution faas-flow state is not only depends on the execution position, as the DAG execution happens on a shared state, a 3rd party Synchoronous KV store can be used as a StateStore StateStore provides the below interface:

type StateStore interface {
        // Configure the StateStore with flow name and request ID
        Configure(flowName string, requestId string)
        // Initialize the StateStore (called only once in a request span)
        Init() error
        // create Vertexes for request
        // creates a map[<vertexId>]<Indegree Completion Count>
        Create(vertexs []string) error
        // Increment Vertex Indegree Completion
        // synchronously increment map[<vertexId>] Indegree Completion Count by 1 and return updated count
        IncrementCounter(vertex string) (int, error)
        // Set state of pipeline
        SetState(state bool) error
        // Get State of pipeline
        GetState() (bool, error)
        // Cleanup all the resorces in StateStore (called only once in a request span)
        Cleanup() error

A StateStore can be implemented with any KV Store that provides Synchronization. The implemented StateStore can be set with DefineStateStore() at function/handler.go:

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
        consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
        return consulss, err
Geting Http Query to Workflow:

Http Query to flow can be used from context as

    flow.Apply("myfunc", Query("auth-token", context.Query.Get("token"))). // pass as a function query
     	  Modify(func(data []byte) {
          	token = context.Query.Get("token") // get query inside modifier
Other from context:

Node, requestId, State is provided by the context

   currentNode := context.GetNode()
   requestId := context.GetRequestId()
   state := context.State

for more details check `faas-flow-GoDoc

Cleanup with Finally()

Finally provides a way to cleanup context and other resources and do post completion work of the pipeline. A Finally method can be used on flow as:

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
     // initialize my custom DataStore as myDataStore
     // Define flow
     flow.Modify(func(data []byte) {
	  // parse data and set to be used later
          // json.Unmarshal(&req, data)
          context.Set("commitsha", req.Sha)
     Modify(func(data []byte) {
          // retrived the data in different node from context
          commitsha, _ = context.GetString("commitsha")
     }).Finally(func() {
          // delete the state resource


Issue/Suggestion Create an issue at faas-flow-issue.
ReviewPR/Implement Create Pull Request at faas-flow-pr.




View Source
const (
	// StateSuccess denotes success state
	StateSuccess = "success"
	// StateFailure denotes failure state
	StateFailure = "failure"
	// StateOngoing denotes onging satte
	StateOngoing = "ongoing"


View Source
var (
	// Sync can be used instead of SyncCall
	Sync = SyncCall()
	// Execution specify a edge doesn't forwards a data
	// but rather mention a execution direction
	Execution = ExecutionOnly()
View Source
var (
	INVAL_OPTION = fmt.Errorf("invalid option specified")


This section is empty.


type Context

type Context struct {
	Query url.Values // provides request Query
	State string     // state of the request
	Name  string     // name of the faas-flow

	NodeInput map[string][]byte // stores inputs form each node
	// contains filtered or unexported fields

Context execution context and execution state

func CreateContext

func CreateContext(id string, node string, name string, dstore DataStore) *Context

CreateContext create request context (used by template)

func (*Context) Del

func (context *Context) Del(key string) error

Del deletes a value from the context using DataStore

func (*Context) Get

func (context *Context) Get(key string) (interface{}, error)

Get retrive a value from the context using DataStore

func (*Context) GetBool

func (context *Context) GetBool(key string) (bool, error)

GetBool retrive a boolean value from the context using DataStore

func (*Context) GetBytes

func (context *Context) GetBytes(key string) ([]byte, error)

GetBytes retrive a byte array from the context using DataStore

func (*Context) GetInt

func (context *Context) GetInt(key string) (int, error)

GetInt retrive a integer value from the context using DataStore

func (*Context) GetNode

func (context *Context) GetNode() string

GetPhase return the node no

func (*Context) GetRequestId

func (context *Context) GetRequestId() string

GetRequestId returns the request id

func (*Context) GetString

func (context *Context) GetString(key string) (string, error)

GetString retrive a string value from the context using DataStore

func (*Context) Set

func (context *Context) Set(key string, data interface{}) error

Set put a value in the context using DataStore

type DagFlow

type DagFlow struct {
	// contains filtered or unexported fields

func CreateDag

func CreateDag() *DagFlow

CreateDag creates a new dag definition

func (*DagFlow) AddCallback

func (this *DagFlow) AddCallback(vertex string, url string, opts ...Option)

AddCallback adds a new callback to the given vertex if vertex already exist it will be appended at the end, if not new vertex will be created

func (*DagFlow) AddConditionalDags

func (this *DagFlow) AddConditionalDags(vertex string, subdags map[string]*DagFlow, condition Option) error

AddConditionalDag composites multiple seperate dag as a subdag which executes for a conditions matched If vertex already exist it will override the existing definition, If not new vertex will be created. When a vertex is dag, operations are ommited. condition: Condition Option

func (*DagFlow) AddEdge

func (this *DagFlow) AddEdge(from, to string, opts ...Option) error

AddEdge adds a directed edge between two vertex as <from>-><to>

func (*DagFlow) AddForEachDag

func (this *DagFlow) AddForEachDag(vertex string, dag *DagFlow, foreach Option) error

AddForEachDag composites a seperate dag as a subdag which executes for each value If vertex already exist it will override the existing definition, If not new vertex will be created. When a vertex is dag, operations are ommited. foreach: ForEach Option

func (*DagFlow) AddFunction

func (this *DagFlow) AddFunction(vertex string, function string, opts ...Option)

AddFunction adds a new function to the given vertex if vertex already exist it will be appended at the end, if not new vertex will be created

func (*DagFlow) AddModifier

func (this *DagFlow) AddModifier(vertex string, mod sdk.Modifier)

AddModifier adds a new modifier to the given vertex if vertex already exist it will be appended at the end, if not new vertex will be created

func (*DagFlow) AddSubDag

func (this *DagFlow) AddSubDag(vertex string, dag *DagFlow) error

AddSubDag composites a seperate dag as a subdag to the given vertex. If vertex already exist it will override the existing definition, If not new vertex will be created. When a vertex is dag, operations are ommited.

func (*DagFlow) AddVertex

func (this *DagFlow) AddVertex(vertex string, opts ...Option)

AddVertex add a new vertex by id If exist overrides the vertex settings. Allowed option: Aggregator, ForEach

func (*DagFlow) AppendDag

func (this *DagFlow) AppendDag(dag *DagFlow) error

AppendDag generalizes a seperate dag by appending its properties into current dag. Provided dag should be mutually exclusive

type DataStore

type DataStore interface {
	// Configure the DaraStore with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize the DataStore (called only once in a request span)
	Init() error
	// Set store a value for key, in failure returns error
	Set(key string, value string) error
	// Get retrives a value by key, if failure returns error
	Get(key string) (string, error)
	// Del delets a value by a key
	Del(key string) error
	// Cleanup all the resorces in DataStore
	Cleanup() error

DataStore for Storing Data

type Option

type Option func(*Options)

func Aggregator

func Aggregator(aggregator sdk.Aggregator) Option

Aggregator aggregates multiple inputs to a node into one

func Condition

func Condition(condition sdk.Condition, aggregator sdk.Aggregator) Option

Condition denotes the corresponding subdags will be executed for each condition matched aggregator aggregates all dags outputs into one

func ExecutionOnly

func ExecutionOnly() Option

ExecutionOnly denotes a edge doesn't forwards a data, but rather provides only an execution direction

func ForEach

func ForEach(foreach sdk.ForEach, aggregator sdk.Aggregator) Option

ForEach denotes the vertex will be executed in parralel for each value returned. aggregator aggregates all outputs into one

func Forwarder

func Forwarder(forwarder sdk.Forwarder) Option

Forwarder encodes request based on need for children vertex

func Header(key, value string) Option

Header Specify a header in a http call

func OnFailure

func OnFailure(handler sdk.FuncErrorHandler) Option

OnFailure Specify a function failure handler

func OnReponse

func OnReponse(handler sdk.RespHandler) Option

OnResponse Specify a resp handler callback for function

func Query

func Query(key string, value ...string) Option

Query Specify a query parameter in a http call

func SyncCall

func SyncCall() Option

SyncCall Set sync flag, denotes a call to be in sync

type Options

type Options struct {
	// contains filtered or unexported fields

type StateStore

type StateStore interface {
	// Configure the StateStore with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize the StateStore (called only once in a request span)
	Init() error
	// create Vertexes for request
	// creates a map[<vertexId>]<Indegree Completion Count>
	Create(vertexs []string) error
	// Increment Vertex Indegree Completion
	// synchronously increment map[<vertexId>] Indegree Completion Count by 1 and return updated count
	IncrementCounter(vertex string) (int, error)
	// Set state of pipeline
	SetState(state bool) error
	// Get State of pipeline
	GetState() (bool, error)
	// Cleanup all the resorces in StateStore (called only once in a request span)
	Cleanup() error

StateStore for saving execution state

type Workflow

type Workflow struct {
	// contains filtered or unexported fields

func NewFaasflow

func NewFaasflow(name string) *Workflow

NewFaasflow initiates a flow with a pipeline

func (*Workflow) Apply

func (flow *Workflow) Apply(function string, opts ...Option) *Workflow

Apply apply a function with given name and options default call is async, provide Sync option to call synchronously

func (*Workflow) Callback

func (flow *Workflow) Callback(url string, opts ...Option) *Workflow

Callback register a callback url as a part of pipeline definition One or more callback function can be placed for sending either partial pipeline data or after the pipeline completion

func (*Workflow) ExecuteDag

func (flow *Workflow) ExecuteDag(dag *DagFlow) error

ExecuteDag apply a predefined dag All operation inside dag are async returns error is dag is not valid Note: If executing dag chain gets overridden

func (*Workflow) Finally

func (flow *Workflow) Finally(handler sdk.PipelineHandler) *Workflow

Finally sets an execution finish handler routine it will be called once the execution has finished with state either Success/Failure

func (*Workflow) GetPipeline

func (flow *Workflow) GetPipeline() *sdk.Pipeline

GetPipeline expose the underlying pipeline object

func (*Workflow) Modify

func (flow *Workflow) Modify(mod sdk.Modifier) *Workflow

Modify allows to apply inline callback function the callback fucntion prototype is func([]byte) ([]byte, error)

func (*Workflow) OnFailure

func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler) *Workflow

OnFailure set a failure handler routine for the pipeline

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL