flow

package module
v0.0.0-...-d71f89c
Published: Jun 4, 2017 License: Apache-2.0 Imports: 3 Imported by: 0

README

go-flow

A cancellable concurrency pattern for the Go programming language

Goroutines and channels make concurrent programming accessible in Go. However, it is not easy for a beginner to write bug-free fan-out/fan-in goroutines. When dealing with a complex flow net in particular, making it cancellable is not straightforward. Consider the following situations:

There are 5 ways to exit from a goroutine job:

  1. Successful return, with or without result(s)
  2. Expected error return
  3. Unexpected panic / error
  4. The job times out
  5. The job is cancelled from another goroutine

There are 2 ways to deal with a panic / error:

  1. quit the whole process whenever there is a panic
  2. cancel only the problematic goroutine branch (including its sub-goroutines)

This package aims to abstract the flow net and provide a panic-free solution.
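
To make the five exit paths concrete, here is a minimal sketch that uses only the standard library. It is not go-flow's implementation: runJob, panicError and outcomes are hypothetical names introduced for illustration, and the sketch assumes a Go version that provides errors.As and errors.Is (Go 1.13+).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// panicError wraps a recovered panic value so that callers can tell a panic
// apart from an ordinary error return. Hypothetical type, not part of go-flow.
type panicError struct{ value interface{} }

func (p *panicError) Error() string { return fmt.Sprintf("panic: %v", p.value) }

// runJob is a hypothetical helper. It runs job in its own goroutine and
// reports which of the five exit paths was taken. It does not stop the job
// goroutine itself; the job has to observe ctx (or finish) on its own.
func runJob(ctx context.Context, job func() error) string {
	errc := make(chan error, 1) // buffered: the goroutine can always deliver its result
	go func() {
		defer func() {
			if r := recover(); r != nil {
				errc <- &panicError{value: r}
			}
		}()
		errc <- job()
	}()

	select {
	case err := <-errc:
		var pe *panicError
		switch {
		case err == nil:
			return "success"
		case errors.As(err, &pe):
			return "panic"
		default:
			return "error"
		}
	case <-ctx.Done():
		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
			return "timeout"
		}
		return "cancelled"
	}
}

// outcomes exercises each exit path once and returns the results in order.
func outcomes() []string {
	release := make(chan struct{})
	defer close(release) // lets the two blocked jobs finish afterwards
	blocked := func() error { <-release; return nil }

	bg := context.Background()
	timed, stop := context.WithTimeout(bg, 10*time.Millisecond)
	defer stop()
	cancelled, cancel := context.WithCancel(bg)
	cancel() // cancelled before the job can finish

	return []string{
		runJob(bg, func() error { return nil }),
		runJob(bg, func() error { return errors.New("expected failure") }),
		runJob(bg, func() error { panic("boom") }),
		runJob(timed, blocked),
		runJob(cancelled, blocked),
	}
}

func main() {
	fmt.Println(outcomes()) // prints [success error panic timeout cancelled]
}
```

Recovering inside the goroutine corresponds to the second action in the list above: the panic is turned into a value that the caller can act on instead of taking down the whole process.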

GOAL

  • The user defines the flow net
  • The task either completes as a whole or fails as a whole
  • The user is notified whether the job panicked, returned an error, or succeeded
  • The user can define a timeout for the whole task

How-To

Note: this package requires Go 1.7+. With an older Go version, you need to import the "golang.org/x/net/context" package manually.

<1> define a flow net, together with a timeout duration

flow := NewFlow(1 * time.Millisecond)

<2> define a super start node (required), a super sink node (required), and several internal nodes (optional). Note that every node must have a unique name tag, which the flow control uses to identify it.

start := flow.InitStart("Start")
A := flow.InitNode("A")
B := flow.InitNode("B")
C := flow.InitSink("C")

<3> define an action for each node. The function signature is func() error. Each node can have multiple input and output channels, all of type chan interface{}. Use node.From(nodeName) to get the input channel coming from a given node, and node.To(nodeName) to get the output channel going to a given node.

start.Tk = func() error {
	start.To("A") <- 1
	start.To("B") <- "2"
	return nil
}
A.Tk = func() error {
	a := <-A.From("Start")
	A.To("C") <- a
	return nil
}
B.Tk = func() error {
	b := <-B.From("Start")
	switch s := b.(type) {
	case string:
		B.To("C") <- s // forward only string values
	}
	return nil
}
C.Tk = func() error {
	a := <-C.From("A")
	b := <-C.From("B")
	_, _ = a, b // do something with a and b
	C.ToSink() <- true // indicate job is done
	return nil
}

<4> connect the dots

flow.Connect(start, A)
flow.Connect(start, B)
flow.Connect(A, C)
flow.Connect(B, C)

<5> run the flow

flow.Run()

<6> clean up the flow after using it (optional)

flow.Cleanup()
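
For comparison, the diamond-shaped net built in steps <1> to <6> (Start fans out to A and B, which fan in to C) can be wired by hand with plain channels. The sketch below uses only the standard library; diamond and the channel variable names are illustrative and not part of this package.

```go
package main

import "fmt"

// diamond wires Start -> A, Start -> B, A -> C and B -> C by hand and
// returns what the sink node C observed.
func diamond() string {
	startToA := make(chan interface{})
	startToB := make(chan interface{})
	aToC := make(chan interface{})
	bToC := make(chan interface{})
	done := make(chan string)

	// Start: fan out one value to each branch.
	go func() {
		startToA <- 1
		startToB <- "2"
	}()
	// A: forward the value unchanged.
	go func() { aToC <- <-startToA }()
	// B: forward the value only if it is a string.
	go func() {
		if s, ok := (<-startToB).(string); ok {
			bToC <- s
		}
	}()
	// C: fan in both branches, then signal completion.
	go func() {
		a, b := <-aToC, <-bToC
		done <- fmt.Sprintf("a=%v b=%v", a, b)
	}()

	return <-done
}

func main() {
	fmt.Println(diamond()) // prints a=1 b=2
}
```

This hand-wired version has no timeout, no cancellation and no panic handling. If B ever received a non-string value, C would block forever, which is the kind of failure the flow net is meant to guard against.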

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Background

type Background struct {
	Ctx context.Context // background context
	Err chan<- error    // write-only error channel
}

Background is the flow's background. It holds the flow's timed context and a global error channel.

type Flow

type Flow struct {
	Conn  map[string]chan interface{} // edge name -> channel
	Nodes map[string]*Node            // node name -> node
	Start *Node                       // super start node, not in Nodes map
	TOut  time.Duration               // timeout definition
	Done  chan interface{}            // done channel, named as "Done"
}

A flow consists of: a map of all connections / edges; a map of all nodes; a super start node, which acts as the flow's switch; a timeout duration, which sets the maximum running time of the flow; and a Done channel, which reports whether the flow completed by carrying a bool value.

func NewFlow

func NewFlow(time time.Duration) *Flow

Init a new flow with a timeout value.

func (*Flow) Cleanup

func (fn *Flow) Cleanup()

Clean up all opened channels to save resources.

func (*Flow) Connect

func (fn *Flow) Connect(n1, n2 *Node)

Connect two nodes in the flow. Channel will be named as {n1.Name}2{n2.Name} to indicate the direction.
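
The naming rule can be sketched as follows. edgeName and connect are hypothetical helpers, the map mirrors the shape of Flow.Conn, and the buffer size of 1 is an assumption; the package's own Connect may differ in all of these details.

```go
package main

import "fmt"

// edgeName follows the rule quoted above: sender name, the digit 2, receiver name.
func edgeName(from, to string) string { return from + "2" + to }

// connect registers a channel for the edge from -> to and returns its name.
// Hypothetical helper; the buffer size is an assumption made for this sketch.
func connect(conn map[string]chan interface{}, from, to string) string {
	name := edgeName(from, to)
	conn[name] = make(chan interface{}, 1)
	return name
}

func main() {
	conn := make(map[string]chan interface{})
	edges := [][2]string{{"Start", "A"}, {"Start", "B"}, {"A", "C"}, {"B", "C"}}
	for _, e := range edges {
		fmt.Println(connect(conn, e[0], e[1])) // Start2A, Start2B, A2C, B2C
	}
}
```

With plain concatenation like this, node names that themselves contain the digit 2 can collide: the pairs ("A", "2B") and ("A2", "B") both produce A22B. Whether the package guards against this is not stated here, so avoiding 2 in node names is the cautious choice.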

func (*Flow) InitNode

func (fn *Flow) InitNode(name string) *Node

Init a normal node. It will be added to the flow.Nodes map.

func (*Flow) InitSink

func (fn *Flow) InitSink(name string) *Node

Init a sink node. It will be added to the flow.Nodes map. This is a special node, which has an output pointing to the flow's Done channel (a.k.a. the super sink node).

func (*Flow) InitStart

func (fn *Flow) InitStart(name string) *Node

Init the super start node. It will be stored as flow.Start.

func (*Flow) Run

func (fn *Flow) Run() error

Run the flow. It returns an error when the task is incomplete, when it times out, or when an error occurs at any flow node. Otherwise, it returns nil to indicate that the flow completed successfully.
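
One way to arrive at these return values is a single select over the Done channel, the error channel and the timed context. The sketch below is an assumption about the general shape, not the package's actual code: wait, describe and errIncomplete are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errIncomplete is a hypothetical sentinel for a flow that reported false on Done.
var errIncomplete = errors.New("flow incomplete")

// wait blocks until the flow reports completion, a node reports an error,
// or the context expires, whichever happens first.
func wait(ctx context.Context, done <-chan interface{}, errc <-chan error) error {
	select {
	case v := <-done:
		if ok, isBool := v.(bool); isBool && ok {
			return nil
		}
		return errIncomplete
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// describe prepares one scenario, runs wait against it and names the result.
func describe(doneVal interface{}, nodeErr error) string {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	done := make(chan interface{}, 1)
	errc := make(chan error, 1)
	if doneVal != nil {
		done <- doneVal
	}
	if nodeErr != nil {
		errc <- nodeErr
	}
	err := wait(ctx, done, errc)
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	default:
		return err.Error()
	}
}

func main() {
	fmt.Println(describe(true, nil))                        // done
	fmt.Println(describe(false, nil))                       // flow incomplete
	fmt.Println(describe(nil, errors.New("node B failed"))) // node B failed
	fmt.Println(describe(nil, nil))                         // timeout
}
```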

type Node

type Node struct {
	Name string       // node name, cannot be empty
	Ps   *Ports       // node's interfaces
	Tk   func() error // node's job
	Bg   *Background  // background, cannot be nil
}

Each node in the flow must have a unique name, which is used to label the connections / edges. The main task is defined in the Tk function, which uses the Ps ports for input and output. To keep the task under the flow's control, the user should pass in the flow's background.

func NewNode

func NewNode(name string) *Node

Return a new node with the specified unique name, which must not be an empty string. It also initializes the node's ports. Note that 1) the user should set the node's background and task function before using it; 2) "SUPER_SINK_NODE" is a special reserved name.

func (*Node) From

func (n *Node) From(sender string) <-chan interface{}

Return the input channel by its sender node's name

func (*Node) Run

func (n *Node) Run()

Run the task defined in the Node.Tk function. Normally it is called by flow.Run() when the node is part of a flow.

func (*Node) To

func (n *Node) To(receiver string) chan<- interface{}

Return the output channel by its receiver node's name

func (*Node) ToSink

func (n *Node) ToSink() chan<- interface{}

Return a channel for super sink node

type Ports

type Ports struct {
	In  map[string]<-chan interface{} // channel name -> read-only channel
	Out map[string]chan<- interface{} // channel name -> write-only channel
}

Each node can have multiple input and output ports. They are connected via channels with interface{} type.
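
The directional channel types in Ports mean that one bidirectional channel can be handed to the sender as write-only and to the receiver as read-only. A minimal sketch of that idea, with hypothetical lower-case names (ports, newPorts, link, roundTrip) standing in for whatever the package does internally:

```go
package main

import "fmt"

// ports mirrors the shape of the Ports struct for illustration only.
type ports struct {
	in  map[string]<-chan interface{} // channel name -> read-only channel
	out map[string]chan<- interface{} // channel name -> write-only channel
}

func newPorts() *ports {
	return &ports{
		in:  make(map[string]<-chan interface{}),
		out: make(map[string]chan<- interface{}),
	}
}

// link creates one bidirectional channel and gives each side only the
// direction it needs. The buffer size of 1 is an assumption.
func link(sender, receiver *ports, name string) {
	ch := make(chan interface{}, 1)
	sender.out[name] = ch  // implicitly converted to chan<- interface{}
	receiver.in[name] = ch // implicitly converted to <-chan interface{}
}

// roundTrip sends v through a freshly linked pair of ports and returns
// what the receiving side reads.
func roundTrip(v interface{}) interface{} {
	a, b := newPorts(), newPorts()
	link(a, b, "A2B")
	a.out["A2B"] <- v
	return <-b.in["A2B"]
}

func main() {
	fmt.Println(roundTrip(42)) // prints 42
}
```

Because the maps store directional types, a node that tries to receive from one of its output ports, or send on one of its input ports, fails at compile time rather than at run time.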
