flow

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2017 License: Apache-2.0 Imports: 6 Imported by: 3

README

Initial flow config

flow1ConfigStr := `
	context {
		provider = LocalContextProvider
		options = {}
	}

	runner {
	
		type = PipeTaskRunner
		
		options = {
			singleton = false

			anko {
				singleton = true
				dir = "/anko_script"
			}

			goja {
				singleton = true
				dir = "/goja_script"
				timelimit = 180s
			}

			otto {
				dir = "/otto_script"
				timelimit = 5s
			}
		}
	}

	steps {
		order = [A,B,C]
		A.handler = anko
		B.handler = goja
		C.handler = otto
	}
`

Create flow

Import flow and handlers

import (
	"fmt"
	"github.com/gogap/flow"

	_ "github.com/gogap/flow-contrib/handler/anko"
	_ "github.com/gogap/flow-contrib/handler/goja"
	_ "github.com/gogap/flow-contrib/handler/otto"
)
var f *flow.Flow
	f, err = flow.NewFlow("flow1",
		flow.ConfigString(flow1ConfigStr),
	)

Example Script

/anko_script/flow1/A.ank

println("print from ank")
val,exist=ctx.Get("hello")
println("context value",val,ctx.ID())

/goja_script/flow1/B.js

console.log("B.js")

/otto_script/flow1/C.js

var lyrics = [
  {line: 1, words: "I'm a lumberjack and I'm okay"},
  {line: 2, words: "I sleep all night and I work all day"},
  {line: 3, words: "He's a lumberjack and he's okay"},
  {line: 4, words: "He sleeps all night and he works all day"}
];

v=_.chain(lyrics)
  .map(function(line) { return line.words.split(' '); })
  .flatten()
  .reduce(function(counts, word) {
    counts[word] = (counts[word] || 0) + 1;
    return counts;
  }, {})
  .value();


console.log( JSON.stringify(v))

Run flow

task := f.NewTask()

task.Context().Set("hello", "world")

task.Run()

Output:

print from ank
context value world c13e3850-d1e7-4cc1-bd04-90f9db791dba

2017/09/09 17:00:31 B.js

{"He":1,"He's":1,"I":2,"I'm":2,"a":2,"all":4,"and":4,"day":2,"he":1,"he's":1,"lumberjack":2,"night":2,"okay":2,"sleep":1,"sleeps":1,"work":1,"works":1}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextProviders

func ContextProviders() []string

func Handlers

func Handlers() []string

func RegisterContextProvider

func RegisterContextProvider(name string, newContextProviderFunc NewContextProviderFunc)

func RegisterHandler

func RegisterHandler(name string, newHandlerFunc NewHandlerFunc)

func RegisterRunner

func RegisterRunner(name string, newRunnerFunc NewRunnerFunc)

func Runners

func Runners() []string

Types

type Context

type Context interface {
	ID() string

	Get(key string) (value interface{}, exist bool)
	Set(key string, value interface{}) Context
	Delete(key string) Context
	Flush()

	Keys() []string

	GetAll() map[string]interface{}
}

type ContextProvider

type ContextProvider interface {
	NewContext(conf config.Configuration) Context
}

func NewContextProvider

func NewContextProvider(name string, conf config.Configuration) (contextProvider ContextProvider, err error)

func NewLocalContextProvider

func NewLocalContextProvider(conf config.Configuration) (provider ContextProvider, err error)

type ErrorHandler

type ErrorHandler interface {
	Handle(err error, step *Step, ctx Context) bool
}

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func NewFlow

func NewFlow(name string, opts ...FlowOption) (f *Flow, err error)

func (*Flow) Name

func (p *Flow) Name() string

func (*Flow) NewTask

func (p *Flow) NewTask() *Task

func (*Flow) Setup

func (p *Flow) Setup(steps []Step) *Flow

type FlowOption

type FlowOption func(*Flow) error

func Config

func Config(conf config.Configuration) FlowOption

func ConfigFile

func ConfigFile(filename string) FlowOption

func ConfigString

func ConfigString(str string) FlowOption

type Handler

type Handler interface {
	Handle(step Step, ctx Context) error
}

func NewHandler

func NewHandler(name string, conf config.Configuration) (handler Handler, err error)

type LocalContext

type LocalContext struct {
	// contains filtered or unexported fields
}

func (*LocalContext) Delete

func (p *LocalContext) Delete(key string) Context

func (*LocalContext) Flush

func (p *LocalContext) Flush()

func (*LocalContext) Get

func (p *LocalContext) Get(key string) (value interface{}, exist bool)

func (LocalContext) GetAll

func (p LocalContext) GetAll() map[string]interface{}

func (*LocalContext) ID

func (p *LocalContext) ID() string

func (LocalContext) Keys

func (p LocalContext) Keys() []string

func (*LocalContext) Set

func (p *LocalContext) Set(key string, value interface{}) Context

type LocalContextProvider

type LocalContextProvider struct {
}

func (*LocalContextProvider) NewContext

func (p *LocalContextProvider) NewContext(conf config.Configuration) Context

type NewContextProviderFunc

type NewContextProviderFunc func(conf config.Configuration) (contextProvider ContextProvider, err error)

type NewHandlerFunc

type NewHandlerFunc func(conf config.Configuration) (handler Handler, err error)

type NewRunnerFunc

type NewRunnerFunc func(conf config.Configuration) (runner TaskRunner, err error)

type PipeTaskRunner

type PipeTaskRunner struct {
	// contains filtered or unexported fields
}

func (*PipeTaskRunner) Name

func (p *PipeTaskRunner) Name() string

func (*PipeTaskRunner) Run

func (p *PipeTaskRunner) Run(task *Task)

func (*PipeTaskRunner) SetErrorHandler

func (p *PipeTaskRunner) SetErrorHandler(handler ErrorHandler)

type Step

type Step struct {
	// contains filtered or unexported fields
}

func NewStep

func NewStep(flowName, stepName, handlerName string, conf config.Configuration) Step

func (*Step) Flow

func (p *Step) Flow() string

func (*Step) Handler

func (p *Step) Handler() string

func (*Step) Name

func (p *Step) Name() string

func (*Step) String

func (p *Step) String() string

func (*Step) TaskID

func (p *Step) TaskID() string

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(flo *Flow, ctx Context) *Task

func (*Task) Context

func (p *Task) Context() Context

func (*Task) Errors

func (p *Task) Errors() []error

func (*Task) Flow

func (p *Task) Flow() string

func (*Task) ID

func (p *Task) ID() string

func (*Task) LatestError

func (p *Task) LatestError() error

func (*Task) Run

func (p *Task) Run() (err error)

func (*Task) Status

func (p *Task) Status() TaskStatus

func (*Task) String

func (p *Task) String() string

type TaskRunner

type TaskRunner interface {
	Run(task *Task)
	SetErrorHandler(handler ErrorHandler)
}

func NewPipeTaskRunner

func NewPipeTaskRunner(conf config.Configuration) (runner TaskRunner, err error)

func NewRunner

func NewRunner(name string, conf config.Configuration) (runner TaskRunner, err error)

type TaskStatus

type TaskStatus int
var (
	TaskStatusReady   TaskStatus = 1
	TaskStatusPending TaskStatus = 2
	TaskStatusRunning TaskStatus = 3
	TaskStatusDone    TaskStatus = 4
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL