flow

v0.0.0-...-6943f8c
Published: Jan 3, 2019 License: Apache-2.0 Imports: 23 Imported by: 0

README

Serverless Workflows with Go

Easily create serverless workflows directly in Go with the power of Fn Flow.

Quick Intro

Import this library into your Go function, then build and deploy it onto Fn. Flows use fdk-go to handle interaction with Fn. Below is an example flow:

package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	fdk "github.com/fnproject/fdk-go"
	flows "github.com/fnproject/flow-lib-go"
)

func init() {
	// Actions must be registered before they can be used in a flow stage.
	flows.RegisterAction(strings.ToUpper)
	flows.RegisterAction(strings.ToLower)
}

func main() {
	fdk.Handle(flows.WithFlow(
		fdk.HandlerFunc(func(ctx context.Context, r io.Reader, w io.Writer) {
			cf := flows.CurrentFlow().CompletedValue("foo")
			valueCh, errorCh := cf.ThenApply(strings.ToUpper).ThenApply(strings.ToLower).Get()
			// Wait for the result, a failure, or a one-minute timeout.
			select {
			case value := <-valueCh:
				fmt.Fprintf(w, "Flow succeeded with value %v", value)
			case err := <-errorCh:
				fmt.Fprintf(w, "Flow failed with error %v", err)
			case <-time.After(time.Minute):
				fmt.Fprintf(w, "Timed out!")
			}
		}),
	))
}

Where do I go from here?

A variety of example use cases is provided in the examples directory.

FAQs

How are values serialized?

Go's gob serialization mechanism is used to encode/decode values for communication with the completer.

What kinds of values can be serialized?

Booleans, strings, structs, arrays and slices are supported. Functions, closures and channels are not.
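
To make the serialization rules concrete, here is a minimal sketch that uses only the standard encoding/gob package, independent of this library. Order and roundTrip are hypothetical names introduced for illustration. It round-trips a struct and shows that a top-level function value is rejected by the encoder.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Order is a hypothetical payload type used only for this illustration.
// Only exported fields are transmitted by gob.
type Order struct {
	ID    string
	Items []string
	Paid  bool
}

// roundTrip gob-encodes v into a buffer and decodes the bytes into out,
// which must be a pointer to a compatible type.
func roundTrip(v interface{}, out interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return err
	}
	return gob.NewDecoder(&buf).Decode(out)
}

func main() {
	in := Order{ID: "a1", Items: []string{"x", "y"}, Paid: true}
	var out Order
	if err := roundTrip(in, &out); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Printf("%+v\n", out)

	// A function value at the top level cannot be gob-encoded,
	// so Encode returns an error before anything is decoded.
	var sink func()
	err := roundTrip(func() {}, &sink)
	fmt.Println("encoding a func failed:", err != nil)
}
```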

How are continuations serialized?

Since Go does not support serializing closures/functions due to its statically compiled nature, they are in fact not serialized at all. Go functions implementing a continuation need to be explicitly registered by calling flows.RegisterAction(actionFunction), typically inside the handler's init function. Registering an action assigns it a unique and stable key that can be serialized and used to look up a pointer to the function during a continuation invocation.

Why do actions need to be registered?

See above.

Can I use closures or method receivers in my continuations?

No. Only plain functions are supported as continuation actions, since they are stateless. No state will be serialized with a continuation. Although possible, invoking a method receiver is not currently supported inside continuations.

How does error-handling work?

Go allows functions to return error types in addition to a result via its support for multiple return values. If a continuation function returns a (non-nil) error as its second return value, its error message will be serialized and form the failed value of that stage.

If a panic occurs while invoking the continuation function, the panic value is captured and the stage fails with that value.
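
The two failure paths described above can be sketched with reflection and recover. This is an illustrative approximation, not the library's code: callAction, half and boom are hypothetical names, and representing the failed value as a plain interface{} is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// callAction invokes fn with a single argument. A non-nil error in the
// second return position becomes the failure (as its message), and a
// panic becomes the failure carrying the panic value.
func callAction(fn interface{}, arg interface{}) (result interface{}, failure interface{}) {
	defer func() {
		if r := recover(); r != nil {
			result, failure = nil, r
		}
	}()
	out := reflect.ValueOf(fn).Call([]reflect.Value{reflect.ValueOf(arg)})
	if len(out) == 0 {
		return nil, nil
	}
	if len(out) == 2 {
		if err, ok := out[1].Interface().(error); ok && err != nil {
			return nil, err.Error()
		}
	}
	return out[0].Interface(), nil
}

// half is a hypothetical action that fails with an error on odd input.
func half(n int) (int, error) {
	if n%2 != 0 {
		return 0, errors.New("odd input")
	}
	return n / 2, nil
}

// boom is a hypothetical action that always panics.
func boom(s string) string {
	panic("boom: " + s)
}

func main() {
	fmt.Println(callAction(half, 4))
	fmt.Println(callAction(half, 3))
	fmt.Println(callAction(boom, "x"))
}
```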

Can I invoke other fn functions?

Yes. Call flows.CurrentFlow().InvokeFunction("your_function_id", req), where the function ID is a value like 01CQV4NEGMNG8G00GZJ0000002 that can be resolved with the following command:

fn inspect function your_app your_function | grep fnproject.io/fn/invokeEndpoint

See here for a full example.

Documentation

Index

Constants

const (
	// protocol headers. beware, since we're using go http.Header, casing is sensitive
	HeaderPrefix  = "Fnproject-"
	FlowIDHeader  = HeaderPrefix + "Flowid"
	StageIDHeader = HeaderPrefix + "Stageid"

	ContentTypeHeader = "Content-Type"

	JSONMediaHeader         = "application/json"
	GobMediaHeader          = "application/x-gob"
	OctetStreamMediaHeader  = "application/octet-stream"
	MaxContinuationArgCount = 2
)

Variables

This section is empty.

Functions

func Debug

func Debug(withDebug bool)

Debug enables internal library debugging

func Log

func Log(msg string)

Log to stderr when Debug mode is enabled

func RegisterAction

func RegisterAction(actionFunc interface{})

RegisterAction registers a Go function so it can be used as an action in a flow stage

func UseHTTPClient

func UseHTTPClient(client *http.Client)

UseHTTPClient allows the default HTTP client to be overridden for calls to the flow service. This function must be called prior to flows.WithFlow to take effect (e.g. from an init function)

func WithFlow

func WithFlow(fn fdk.Handler) fdk.Handler

Types

type ErrorResult

type ErrorResult struct {
	Error string `json:"error"`
}

Errors cannot be encoded using gob, so only the message is extracted and encoded as JSON.

func (*ErrorResult) Err

func (e *ErrorResult) Err() error

type Flow

type Flow interface {
	InvokeFunction(functionID string, arg *HTTPRequest) FlowFuture
	Supply(action interface{}) FlowFuture
	Delay(duration time.Duration) FlowFuture
	CompletedValue(value interface{}) FlowFuture // value can be an error
	EmptyFuture() FlowFuture
	AllOf(futures ...FlowFuture) FlowFuture
	AnyOf(futures ...FlowFuture) FlowFuture
}

func CurrentFlow

func CurrentFlow() Flow

type FlowFuture

type FlowFuture interface {
	Get() (chan interface{}, chan error)
	// Get result as the given type. E.g. for use with ThenCompose
	GetType(t reflect.Type) (chan interface{}, chan error)
	ThenApply(action interface{}) FlowFuture
	ThenCompose(action interface{}) FlowFuture
	ThenCombine(other FlowFuture, action interface{}) FlowFuture
	WhenComplete(action interface{}) FlowFuture
	ThenAccept(action interface{}) FlowFuture
	AcceptEither(other FlowFuture, action interface{}) FlowFuture
	ApplyToEither(other FlowFuture, action interface{}) FlowFuture
	ThenAcceptBoth(other FlowFuture, action interface{}) FlowFuture
	ThenRun(action interface{}) FlowFuture
	Handle(action interface{}) FlowFuture
	Exceptionally(action interface{}) FlowFuture
	ExceptionallyCompose(action interface{}) FlowFuture
	Complete(value interface{}) bool
}

type HTTPRequest

type HTTPRequest struct {
	Headers http.Header
	Method  string
	Body    []byte
}

type HTTPResponse

type HTTPResponse struct {
	StatusCode int32
	Headers    http.Header
	Body       []byte
}

type InvokeStageRequest

type InvokeStageRequest struct {
	FlowID  string                          `json:"flow_id,omitempty"`
	StageID string                          `json:"stage_id,omitempty"`
	Closure *models.ModelBlobDatum          `json:"closure,omitempty"`
	Args    []*models.ModelCompletionResult `json:"args,omitempty"`
}

models incoming request API (not auto-generated from swagger!)

type InvokeStageResponse

type InvokeStageResponse struct {
	Result *models.ModelCompletionResult `json:"result,omitempty"`
}

Directories

Path Synopsis
examples
