Documentation ¶
Overview ¶
Package flow provides support for very basic FBP (flow-based programming) / pipelines. Each handler represents an async stage that consumes data from its input channel and publishes results to its output channel. Each handler runs in a separate goroutine.
The user must implement Handler functions and add them to the Flow. Each handler usually creates an output channel, reads from the input channel, processes data and sends results to the output channel. The processing sequence is defined by the order of those handlers.
Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.
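As a rough illustration of what a parallel decorator involves, this sketch starts several worker goroutines that share one input channel and closes the shared output channel only after every worker has returned. It uses plain int channels rather than the package's Handler type. The names parallel and sumDoubled are hypothetical, and this is one common way to build such a wrapper, not how flow.Parallel is implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// parallel is a hypothetical worker-pool sketch: `workers` goroutines read from
// the same input channel and write to one shared output channel, which is
// closed only after every worker has returned.
func parallel(ctx context.Context, workers int, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				select {
				case out <- fn(v):
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// closing out is safe only once no worker can write to it anymore
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumDoubled sends 1..n through parallel doublers and sums the results.
func sumDoubled(n, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			select {
			case in <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	sum := 0
	for v := range parallel(ctx, workers, in, func(x int) int { return x * 2 }) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumDoubled(100, 10)) // 10100
}
```

Results arrive in no particular order, which is why the sketch only checks an order-independent sum.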
FanOut allows passing multiple handlers in broadcast mode, i.e., each handler gets every input record. Outputs from these handlers are merged into a single output channel.
A processing error is detected as a non-nil error returned from the user's handler func. Such an error gracefully interrupts all other running handlers and doesn't leave any goroutine running or leaking.
Each Flow object can be executed only once.
Each handler has to treat context cancellation as a termination signal.
Index ¶
- Constants
- func CID(ctx context.Context) int
- func Recv(ctx context.Context, ch chan interface{}) (interface{}, error)
- func Send(ctx context.Context, ch chan interface{}, e interface{}) error
- type Flow
- func (f *Flow) Add(handlers ...Handler) *Flow
- func (f *Flow) Channel() chan interface{}
- func (f *Flow) FanOut(handler Handler, handlers ...Handler) Handler
- func (f *Flow) Go() *Flow
- func (f *Flow) Metrics() *Metrics
- func (f *Flow) Parallel(concurrent int, handler Handler) Handler
- func (f *Flow) Wait() error
- type Handler
- type Metrics
- type Option
Examples ¶
Constants ¶
const CidContextKey contextKey = "cid"
CidContextKey is used as the concurrentID key in ctx. It doesn't do anything magical; it just represents a special-case metric set internally by the flow. Flow sets cid to indicate which of the parallel handlers is processing a data subset.
const MetricsContextKey contextKey = "metrics"
MetricsContextKey is used as the metrics key in ctx.
Variables ¶
This section is empty.
Functions ¶
func CID ¶
func CID(ctx context.Context) int
CID returns the concurrentID set by the parallel wrapper. The only use for cid is to allow some indication/logging.
Types ¶
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow is an object holding the list of all runnable functions and a common context.
Example (FanOut) ¶
fanOut illustrates the use of a Flow for a fan-out pipeline that runs the same input through multiple handlers.
f := New() // create new empty Flow
f.Add( // add handlers. Note: handlers can be added directly in New

	// generate 100 ones.
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 100) // example of non-async handler
		for i := 1; i <= 100; i++ {
			out <- 1
		}
		close(out)      // each handler has to close out channel
		return out, nil // no runnable function for non-async handler
	},

	// fan out 100 ones and fork processing for odd and even numbers.
	// each input number is passed to both handlers; output channels from both handlers are merged together.
	f.FanOut(
		// first handler picks odd numbers only and multiplies them by 2
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 == 0 { // skip even numbers
						continue
					}
					f.Metrics().Inc("passed odd") // increment user-defined metric "passed odd"
					select {
					case out <- val * 2: // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		},
		// second handler picks even numbers only and multiplies them by 3
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 != 0 { // skip odd numbers
						continue
					}
					f.Metrics().Inc("passed even") // increment user-defined metric "passed even"
					select {
					case out <- val * 3: // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		},
	),

	// sum all numbers
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 1)
		runFn = func() error {
			defer close(out)
			sum := 0
			// same loop as above, but illustrates a different way of reading from a channel
			for {
				select {
				case val, ok := <-in:
					if !ok {
						out <- sum // send result
						return nil
					}
					sum += val.(int)
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return out, runFn
	},
)
f.Go() // activate flow

// wait for all handlers to complete
if err := f.Wait(); err == nil {
	fmt.Printf("all done, result=%v, odd=%d, even=%d", <-f.Channel(),
		f.Metrics().Get("passed odd"), f.Metrics().Get("passed even"))
}
Output:
Example (Flow) ¶
flow illustrates the use of a Flow for a concurrent pipeline that runs each handler in a separate goroutine.
f := New() // create new empty Flow
f.Add( // add handlers. Note: handlers can be added directly in New

	// generate 100 initial values.
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		// example of non-async handler, Add executes it right away, prior to Go call
		out = make(chan interface{}, 100)
		for i := 1; i <= 100; i++ {
			out <- i
		}
		close(out)      // each handler has to close out channel
		return out, nil // no runnable function for non-async handler
	},

	// pick odd numbers only and multiply them by a random number
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}) // each handler makes its out channel
		runFn = func() error {       // async handler returns runnable func
			defer close(out) // handler should close out channel
			for e := range in {
				val := e.(int)
				if val%2 == 0 {
					continue
				}
				f.Metrics().Inc("passed") // increment user-defined metric "passed"

				// send result to the next stage with the flow.Send helper, which also checks for cancellation
				if err := Send(ctx, out, val*rand.Int()); err != nil { //nolint
					return err
				}
			}
			return nil
		}
		return out, runFn
	},

	// sum all numbers
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 1)
		runFn = func() error {
			defer close(out)
			sum := 0
			for {
				select {
				case e, more := <-in:
					if !more {
						out <- sum // send result
						return nil
					}
					val := e.(int)
					sum += val
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return out, runFn
	},
)
f.Go() // activate flow

// wait for all handlers to complete
if err := f.Wait(); err == nil {
	fmt.Printf("all done, result=%v, passed=%d", <-f.Channel(), f.Metrics().Get("passed"))
}
Output:
Example (Parallel) ¶
parallel illustrates the use of a Flow for a concurrent pipeline that runs some handlers in parallel.
f := New() // create new empty Flow

// make flow with mixed single and parallel handlers
f.Add(
	// generate 100 initial values in single handler
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 100) // example of non-async handler
		for i := 1; i <= 100; i++ {
			out <- i
		}
		close(out)      // each handler has to close out channel
		return out, nil // no runnable function for non-async handler
	},

	// multiply all numbers by a random number in 10 parallel handlers
	f.Parallel(10, func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}) // async handler makes its out channel
		runFn = func() error {
			defer close(out) // handler should close out channel
			for e := range in {
				val := e.(int)
				select { // send result to the next stage
				case out <- val * rand.Int(): //nolint
				case <-ctx.Done(): // check for cancellation
					return ctx.Err()
				}
			}
			return nil
		}
		return out, runFn
	}),

	// sum all numbers
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 1) // buffered, so the result can be read after Wait
		runFn = func() error {
			defer close(out)
			sum := 0
			for e := range in {
				val := e.(int)
				sum += val
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
			}
			out <- sum // send result
			return nil
		}
		return out, runFn
	},
)
f.Go() // activate flow

// wait for all handlers to complete
if err := f.Wait(); err == nil {
	fmt.Printf("all done, result=%v", <-f.Channel())
}
Output:
func New ¶
New creates a flow object with a context and a common errgroup. This errgroup is used to schedule and cancel all handlers. options defines non-default parameters.
func (*Flow) Add ¶
func (f *Flow) Add(handlers ...Handler) *Flow
Add adds one or more handlers. Each handler is linked to the previous one, and the order of handlers defines the sequence of stages in the flow. Add can be called multiple times.
func (*Flow) Channel ¶
func (f *Flow) Channel() chan interface{}
Channel returns the last (final) channel in the flow. Consumers usually don't need this channel, but it can be used to return some final result(s).
func (*Flow) FanOut ¶
func (f *Flow) FanOut(handler Handler, handlers ...Handler) Handler
FanOut runs all handlers against a common input channel, and their results go to a common output channel. Each record is broadcast to every handler, and each handler may process it in a different way.
func (*Flow) Go ¶
func (f *Flow) Go() *Flow
Go activates the flow. It should be called exactly once, after all handlers are added; subsequent calls are ignored.
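The documentation doesn't say how repeated Go calls are ignored. One common way to get that behavior is sync.Once. The hypothetical runner type below (not flow.Flow) has a chainable Go method whose start logic runs only on the first call.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical type: sync.Once guarantees the start logic runs a
// single time, and later calls to Go return without doing anything.
type runner struct {
	once    sync.Once
	started int
}

// Go returns the receiver so calls can be chained, like Flow.Go.
func (r *runner) Go() *runner {
	r.once.Do(func() {
		r.started++ // real code would launch the handler goroutines here
	})
	return r
}

func main() {
	r := &runner{}
	r.Go().Go().Go()
	fmt.Println(r.started) // 1
}
```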
type Handler ¶
type Handler func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error)
Handler defines the function type used for flow stages; implementations of Handler are provided by the user. Each handler returns the new out(put) channel and a runnable function fn. fn will be executed in a separate goroutine. fn is thread-safe and may have mutable state. It lives for the whole lifetime of the flow and usually implements a read->process->write cycle. If fn returns a non-nil error, it indicates a critical failure and stops all handlers in the flow by canceling their context.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics collects and increments arbitrary int counters and can be used to safely count things in a flow. Flow creates an empty metrics object and puts it into the context. Consumers (user-provided handlers) can retrieve it directly from the context by doing

	metrics := ctx.Value(flow.MetricsContextKey).(*flow.Metrics)

The provided GetMetrics does exactly the same thing. Flow also has a helper method Metrics() to retrieve metrics from ctx.
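Metrics' fields are unexported, so its internals aren't shown here. This is a minimal sketch, assuming a mutex-guarded map. The names counters, newCounters, ctxKey, metricsKey and countConcurrently are hypothetical. Inc and Get mirror the method names used in the examples, and the ctx.Value type assertion mirrors the retrieval pattern just described.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// counters is a hypothetical mutex-guarded map of named int counters.
type counters struct {
	mu   sync.Mutex
	data map[string]int
}

func newCounters() *counters { return &counters{data: map[string]int{}} }

// Inc increments the named counter; safe for concurrent use.
func (c *counters) Inc(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key]++
}

// Get returns the current value of the named counter (0 if never incremented).
func (c *counters) Get(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.data[key]
}

type ctxKey string

const metricsKey ctxKey = "metrics" // hypothetical stand-in for MetricsContextKey

// countConcurrently stores counters in ctx; each of `workers` goroutines
// retrieves it via a type assertion and increments "seen" `per` times.
func countConcurrently(workers, per int) int {
	ctx := context.WithValue(context.Background(), metricsKey, newCounters())
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m := ctx.Value(metricsKey).(*counters)
			for i := 0; i < per; i++ {
				m.Inc("seen")
			}
		}()
	}
	wg.Wait()
	return ctx.Value(metricsKey).(*counters).Get("seen")
}

func main() {
	fmt.Println(countConcurrently(10, 100)) // 1000
}
```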
func NewMetrics ¶
func NewMetrics() *Metrics
NewMetrics makes a thread-safe map to collect any counts/metrics.