flow

v1.3.1

Warning: this package is not in the latest version of its module.

Published: Oct 10, 2020 License: MIT Imports: 8 Imported by: 0

README

Flow - FBP / pipelines / workers pool

Package flow provides support for very basic FBP / pipelines. It helps to structure multistage processing as a set of independent handlers communicating via channels. The typical use case is ETL (extract, transform, load) processing. Package flow doesn't introduce any high-level abstraction and keeps everything in the hands of the user.

Package pool provides a simplified version of flow suitable for single-handler flows.

Details about flow package

  • Each handler represents an async stage. It consumes data from an input channel and publishes results to an output channel.
  • Each handler runs in a separate goroutine.
  • The user must implement Handler functions and add them to the Flow.
  • Each handler usually creates an output channel, reads from the input channel, processes data, sends results to the output channel and closes the output channel.
  • The processing sequence is determined by the order of those handlers.
  • Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.
  • FanOut runs multiple handlers in broadcast mode, i.e., each handler gets every input record. Outputs from these handlers are merged into a single output channel.
  • A processing error is detected as a non-nil error returned from the user's handler func. Such an error gracefully interrupts all other running handlers and won't leave any goroutine running/leaking.
  • Each Flow object can be executed only once.
  • Handler should handle context cancellation as a termination signal.
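
To make the handler contract concrete, here is a stdlib-only sketch of how such a chain could be driven: each stage's out channel becomes the next stage's in, every non-nil runFn runs in its own goroutine, and the first error cancels a shared context. Handler is restated locally, and chain, gen, double, sumAll and runDemo are hypothetical names. The real Flow uses an errgroup and adds metrics, Parallel and FanOut, so treat this as an illustration of the idea, not its implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Handler restates the shape of flow.Handler so this sketch compiles on its own.
type Handler func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error)

// chain links handlers in order (each stage's out becomes the next stage's in),
// runs every non-nil runFn in its own goroutine and lets the first error cancel
// the shared context. Hypothetical stand-in, not the flow implementation.
func chain(ctx context.Context, handlers ...Handler) (chan interface{}, func() error) {
	ctx, cancel := context.WithCancel(ctx)
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
		ch       chan interface{} // the first stage gets a nil input channel
	)
	for _, h := range handlers {
		out, runFn := h(ctx, ch)
		ch = out
		if runFn == nil { // non-async stage: its work is already done
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := runFn(); err != nil {
				once.Do(func() { firstErr = err; cancel() })
			}
		}()
	}
	wait := func() error { wg.Wait(); cancel(); return firstErr }
	return ch, wait
}

// gen is a non-async stage: it fills a buffered channel up front and closes it.
func gen(n int) Handler {
	return func(ctx context.Context, _ chan interface{}) (chan interface{}, func() error) {
		out := make(chan interface{}, n)
		for i := 1; i <= n; i++ {
			out <- i
		}
		close(out)
		return out, nil
	}
}

// double is an async stage: read, process, send, and close out when done.
func double(ctx context.Context, in chan interface{}) (chan interface{}, func() error) {
	out := make(chan interface{})
	return out, func() error {
		defer close(out)
		for e := range in {
			select {
			case out <- e.(int) * 2:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// sumAll is the final stage; its buffered out holds the single result.
func sumAll(ctx context.Context, in chan interface{}) (chan interface{}, func() error) {
	out := make(chan interface{}, 1)
	return out, func() error {
		defer close(out)
		total := 0
		for e := range in {
			total += e.(int)
		}
		out <- total
		return nil
	}
}

// runDemo wires gen(10) -> double -> sumAll and returns the final value.
func runDemo() (int, error) {
	res, wait := chain(context.Background(), gen(10), double, sumAll)
	if err := wait(); err != nil {
		return 0, err
	}
	return (<-res).(int), nil
}

func main() {
	fmt.Println(runDemo()) // 110 <nil>
}
```

The final stage uses a buffered out channel so that waiting for completion before reading the result cannot block, mirroring the examples below.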

Install and update

go get -u github.com/go-pkgz/flow

Example of the flow's handler

import (
	"bufio"
	"context"
	"io"

	"github.com/go-pkgz/flow"
	"github.com/pkg/errors"
)

// ReaderHandler creates a flow.Handler reading strings (lines) from any io.Reader
func ReaderHandler(reader io.Reader) flow.Handler {
	return func(ctx context.Context, ch chan interface{}) (chan interface{}, func() error) {
		metrics := flow.GetMetrics(ctx) // metrics counts records read under the "read" key

		readerCh := make(chan interface{}, 1000)
		readerFn := func() error {
			defer close(readerCh) // the handler closes its own output channel

			scanner := bufio.NewScanner(reader)
			for scanner.Scan() {
				select {
				case readerCh <- scanner.Text():
					metrics.Inc("read")
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			// errors.Wrap returns nil for a nil error, so a clean EOF returns nil
			return errors.Wrap(scanner.Err(), "scanner failed")
		}
		return readerCh, readerFn
	}
}

Usage of the flow package

For a complete example, see the example in the repository.

// flow illustrates the use of a Flow for a concurrent pipeline running each handler in a separate goroutine.
func ExampleFlow_flow() {

	f := New() // create new empty Flow
	f.Add(     // add handlers; Add can be called multiple times

		// first handler, generate 100 initial values.
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 100) // example of non-async handler
			for i := 1; i <= 100; i++ {
				out <- i
			}
			close(out)      // each handler has to close out channel
			return out, nil // no runnable function for non-async handler
		},

		// second handler - picks odd numbers only and multiplies them by a random number
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 == 0 {
						continue
					}
					f.Metrics().Inc("passed") // increment user-defined metric "passed"

					// send result to the next stage with flow.Send helper. Also checks for cancellation
					if err := Send(ctx, out, val*rand.Int()); err != nil {
						return err
					}
				}
				return nil
			}
			return out, runFn
		},

		// final handler - sum all numbers
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 1)
			runFn = func() error {
				defer close(out)
				sum := 0
				for {
					select {
					case e, more := <-in:
						if !more {
							out <- sum //send result
							return nil
						}
						val := e.(int)
						sum += val

					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return out, runFn
		},
	)

	f.Go() // activate flow

	// wait for all handlers to complete
	if err := f.Wait(); err == nil {
		fmt.Printf("all done, result=%v, passed=%d", <-f.Channel(), f.Metrics().Get("passed"))
	}
}

// parallel illustrates the use of a Flow for a concurrent pipeline running some handlers in parallel.
func ExampleFlow_parallel() {

	f := New() // create new empty Flow

	// make flow with mixed singles and parallel handlers and activate
	f.Add(

		// generate 100 initial values in single handler
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 100) // example of non-async handler
			for i := 1; i <= 100; i++ {
				out <- i
			}
			close(out)      // each handler has to close out channel
			return out, nil // no runnable function for non-async handler
		},

		// multiply all numbers by random values in 10 parallel handlers
		f.Parallel(10, func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					select {
					case out <- val * rand.Int(): // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		}),

		// sum all numbers
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 1) // buffered, so sending the final sum won't block
			runFn = func() error {
				defer close(out)
				sum := 0
				for e := range in {
					val := e.(int)
					sum += val
					select {
					case <-ctx.Done():
						return ctx.Err()
					default:
					}
				}
				out <- sum // send result
				return nil
			}
			return out, runFn
		},
	)

	f.Go() // activate flow

	// wait for all handlers to complete
	if err := f.Wait(); err == nil {
		fmt.Printf("all done, result=%v", <-f.Channel())
	}
}

Details about pool package

Package pool provides a thin implementation of a workers pool. In addition to the default "run a func in multiple goroutines" mode, it also provides optional support for chunked workers. In this mode each key, detected by a user-provided func, is guaranteed to be processed by the same worker. This mode is needed for stateful flows where each set of input records has to be processed sequentially and some state has to be kept. The pool also lets you define the batch size. This is a simple performance optimization: input requests are collected into a buffer and sent to the worker channel in batches (slices) instead of once per Submit call.
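
A minimal sketch of the chunking idea, assuming the chunk key is mapped to a worker with an FNV-1a hash modulo the worker count (the pool's actual routing function is not described here). workerFor and countPerKey are hypothetical names. Because every record with the same key lands on the same worker, each worker can keep per-key state in its own map without locking.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// workerFor maps a chunk key to a worker index; the same key always gets the same index.
func workerFor(key string, workers int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // hash writes never return an error
	return int(h.Sum32() % uint32(workers))
}

// countPerKey routes each record to the worker owning its key. Each worker
// keeps its own map, so the per-key counters need no locking.
func countPerKey(records []string, workers int) map[string]int {
	chans := make([]chan string, workers)
	states := make([]map[string]int, workers)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan string, 1)
		states[i] = map[string]int{}
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for key := range chans[id] {
				states[id][key]++ // only worker id touches states[id]
			}
		}(i)
	}
	for _, r := range records {
		chans[workerFor(r, workers)] <- r
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()

	// merge: a key has exactly one owner, so no key is split across workers
	result := map[string]int{}
	for _, st := range states {
		for k, v := range st {
			result[k] += v
		}
	}
	return result
}

func main() {
	fmt.Println(countPerKey([]string{"a", "b", "a", "c", "a", "b"}, 3)) // map[a:3 b:2 c:1]
}
```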

Options:

  • ChunkFn - a function returning the string that identifies the chunk
  • Batch - sets the batch size (default 1)
  • ChanResSize - sets the size of the output buffered channel (default 1)
  • ChanWorkerSize - sets the size of the workers' buffered channel (default 1)
  • ContinueOnError - lets workers continue after an error occurs
  • OnCompletion - sets a callback (for each worker) called on successful completion
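
The Batch option can be pictured as a small buffer in front of the worker channel. This sketch uses a hypothetical batcher type that collects submitted values and sends them as slices of up to size items, flushing any partial batch at the end. It illustrates the idea behind Batch, not the pool's code.

```go
package main

import "fmt"

// batcher is a hypothetical helper illustrating the Batch option idea:
// submitted values are buffered and forwarded as slices of up to size items.
type batcher struct {
	size int
	buf  []interface{}
	out  chan []interface{}
}

func newBatcher(size int, out chan []interface{}) *batcher {
	if size < 1 {
		size = 1 // a batch size of 1 means one send per submitted value
	}
	return &batcher{size: size, out: out}
}

// submit buffers v and sends the buffer once it is full.
func (b *batcher) submit(v interface{}) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.size {
		b.out <- b.buf
		b.buf = make([]interface{}, 0, b.size) // the sent slice now belongs to the receiver
	}
}

// flush sends any partial batch and closes the output channel.
func (b *batcher) flush() {
	if len(b.buf) > 0 {
		b.out <- b.buf
		b.buf = nil
	}
	close(b.out)
}

// batchSizes submits n values with the given batch size and reports the size of each batch received.
func batchSizes(n, size int) []int {
	out := make(chan []interface{})
	b := newBatcher(size, out)
	go func() {
		for i := 0; i < n; i++ {
			b.submit(i)
		}
		b.flush()
	}()
	var sizes []int
	for batch := range out {
		sizes = append(sizes, len(batch))
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(10, 4)) // [4 4 2]
}
```

Sending one slice instead of several single values reduces the number of channel operations, which is the performance point the text makes.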

worker function

The worker function is provided by the user and runs in multiple workers (goroutines) concurrently. It has the following type:

type workerFn func(ctx context.Context, inp interface{}, sender SenderFn, store WorkerStore) error

It takes the inp parameter, does the job and optionally sends result(s) with SenderFn to the common results channel. An error terminates all workers unless ContinueOnError is set.

Note: workerFn can be stateful; it can collect anything it needs and send zero or more results by calling SenderFn any number of times.
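
To illustrate the zero-or-more-results contract, here is a stdlib-only sketch with hypothetical senderFn, workerFn and runWorkers names. The WorkerStore argument is omitted, the sender signature is simplified to func(val interface{}) error, and there is no ContinueOnError: the first worker error cancels the run. It shows the shape of the idea, not the pool's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// senderFn and workerFn are hypothetical, simplified stand-ins for the pool's types.
type senderFn func(val interface{}) error
type workerFn func(ctx context.Context, inp interface{}, send senderFn) error

// splitWords sends zero or more results per input: one per word.
func splitWords(ctx context.Context, inp interface{}, send senderFn) error {
	s, ok := inp.(string)
	if !ok {
		return errors.New("incorrect input type")
	}
	for _, w := range strings.Fields(s) {
		if err := send(w); err != nil {
			return err
		}
	}
	return nil
}

// runWorkers feeds inputs to n concurrent workers and collects every sent value,
// sorted for stable output. The first worker error cancels the run.
func runWorkers(n int, fn workerFn, inputs []interface{}) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var (
		mu      sync.Mutex
		results []string
		once    sync.Once
		runErr  error
		wg      sync.WaitGroup
	)
	send := func(v interface{}) error {
		if err := ctx.Err(); err != nil {
			return err // stop accepting results after cancellation
		}
		mu.Lock()
		defer mu.Unlock()
		results = append(results, fmt.Sprint(v))
		return nil
	}
	in := make(chan interface{})
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				if err := fn(ctx, v, send); err != nil {
					once.Do(func() { runErr = err; cancel() })
					return
				}
			}
		}()
	}
feed:
	for _, v := range inputs {
		select {
		case in <- v:
		case <-ctx.Done():
			break feed
		}
	}
	close(in)
	wg.Wait()
	sort.Strings(results)
	return results, runErr
}

func main() {
	res, err := runWorkers(3, splitWords, []interface{}{"a b", "c", "", "d e f"})
	fmt.Println(res, err) // [a b c d e f] <nil>
}
```

The empty string input produces no results at all, while "d e f" produces three, which is the "zero or more" behavior described above.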

worker store

Each worker gets a WorkerStore, which can be used as thread-safe per-worker storage for any intermediate results.

type WorkerStore interface {
	Set(key string, val interface{})
	Get(key string) (interface{}, bool)
	GetInt(key string) int
	GetFloat(key string) float64
	GetString(key string) string
	GetBool(key string) bool
	Keys() []string
	Delete(key string)
}

Alternatively, state can be kept outside of the workers as a slice of values and accessed by worker ID.
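
A sketch of that alternative, assuming each worker knows its own ID: the state lives in a slice with one slot per worker, and because only worker id ever writes slot id, no mutex is needed. sumPerWorker is a hypothetical name. Inputs go to whichever worker is free, so only the combined total is deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// sumPerWorker keeps one state slot per worker, indexed by worker ID.
// Only worker id writes state[id]; wg.Wait() orders those writes before the final read.
func sumPerWorker(workers int, inputs []int) (perWorker []int, total int) {
	state := make([]int, workers) // external state, one entry per worker
	in := make(chan int)
	var wg sync.WaitGroup
	for id := 0; id < workers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for v := range in {
				state[id] += v
			}
		}(id)
	}
	for _, v := range inputs {
		in <- v
	}
	close(in)
	wg.Wait()
	for _, s := range state {
		total += s
	}
	return state, total
}

func main() {
	per, total := sumPerWorker(4, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
	fmt.Println(len(per), total) // 4 55
}
```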

usage
    p := pool.New(8, func(ctx context.Context, v interface{}, sendFn pool.SenderFn, ws pool.WorkerStore) error {
        // worker function gets input v, processes it and sends result(s) with sendFn
        input, ok := v.(string) // in this case it gets a string as input
        if !ok {
            return errors.New("incorrect input type")
        }
        // do something with input
        // ...
        _ = input

        counter := ws.GetInt("something") // access per-worker var

        sendFn("foo", nil) // send "foo" and nil error
        sendFn("bar", nil) // send "bar" and nil error
        pool.Metrics(ctx).Inc("counter")
        ws.Set("something", counter+1) // keep per-worker things
        return nil
    })

    ctx := context.TODO()
    cursor, err := p.Go(ctx) // start all workers in 8 goroutines and get back the results cursor
    if err != nil {
        return err
    }

    // submit values (producer side)
    go func() {
        p.Submit("something")
        p.Submit("something else")
        p.Close() // indicates completion of all inputs
    }()

    var v interface{}
    for cursor.Next(ctx, &v) {
        log.Print(v) // print value
    }

    if cursor.Err() != nil { // error happened
        return cursor.Err()
    }

    // alternatively, read all results from the cursor (response channel) at once
    res, err := cursor.All(ctx)

    // metrics are the same as for flow
    metrics := pool.Metrics()
    log.Print(metrics.Get("counter"))

Documentation

Overview

Package flow provides support for very basic FBP / pipelines. Each handler represents an async stage consuming data from the input channel and publishing results to the output channel. Each handler runs in a separate goroutine.

The user must implement Handler functions and add them to the Flow. Each handler usually creates an output channel, reads from the input channel, processes data and sends results to the output channel. The processing sequence is defined by the order of those handlers.

Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.

FanOut runs multiple handlers in broadcast mode, i.e., each handler gets every input record. Outputs from these handlers are merged and combined into a single output channel.

A processing error is detected as a non-nil error returned from the user's handler func. Such an error gracefully interrupts all other running handlers and won't leave any goroutine running/leaking.

Each Flow object can be executed only once.

Handler has to handle context cancellation as a termination signal.

Constants

const CidContextKey contextKey = "cid"

CidContextKey is used as the concurrentID key in ctx. It doesn't do anything magical; it just represents a special-case metric set by flow internally. Flow sets cid to indicate which of the parallel handlers is processing a data subset.

const MetricsContextKey contextKey = "metrics"

MetricsContextKey is used as the metrics key in ctx.

Functions

func CID

func CID(ctx context.Context) int

CID returns the concurrentID set by the parallel wrapper. The only use for cid is to allow some indication/logging.
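
CID relies on a value stored in the context. The sketch below shows the standard pattern, assuming an unexported key type similar to the documented contextKey. withCID, cidFrom and cidsFor are hypothetical helpers, and numbering workers from 1 and returning 0 when no ID is set are choices of this sketch, not documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported key type, so these values can't collide with keys
// set by other packages (the same idea as flow's contextKey).
type ctxKey string

const cidKey ctxKey = "cid"

// withCID returns a child context carrying a worker's concurrent ID.
func withCID(ctx context.Context, id int) context.Context {
	return context.WithValue(ctx, cidKey, id)
}

// cidFrom reads the ID back; this sketch returns 0 when none was set.
func cidFrom(ctx context.Context) int {
	if id, ok := ctx.Value(cidKey).(int); ok {
		return id
	}
	return 0
}

// cidsFor gives n simulated parallel workers their own context and returns
// the IDs each worker reads back, e.g. for log prefixes.
func cidsFor(n int) []int {
	base := context.Background()
	ids := make([]int, 0, n)
	for i := 1; i <= n; i++ {
		ids = append(ids, cidFrom(withCID(base, i)))
	}
	return ids
}

func main() {
	// prints [1 2 3]
	fmt.Println(cidsFor(3))
	// prints 0: no ID in a plain context
	fmt.Println(cidFrom(context.Background()))
}
```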

func Recv

func Recv(ctx context.Context, ch chan interface{}) (interface{}, error)

Recv gets an entry from the channel or returns an error if the context is canceled. It is a shortcut for the read-or-fail-on-cancel logic most handlers implement.

func Send

func Send(ctx context.Context, ch chan interface{}, e interface{}) error

Send sends an entry to the channel or returns an error if the context is canceled. It is a shortcut for the send-or-fail-on-cancel logic most handlers implement.
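
Send and Recv wrap the select-on-ctx.Done() pattern that the examples write out by hand. A sketch of that pattern with lowercase send and recv, hypothetical re-implementations following the documented signatures; the library source may differ, for instance in how a closed channel is reported.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// send writes e to ch unless ctx is canceled first (sketch of the documented Send contract).
func send(ctx context.Context, ch chan interface{}, e interface{}) error {
	select {
	case ch <- e:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// recv reads from ch unless ctx is canceled first (sketch of the documented Recv contract).
// In this sketch a closed channel yields (nil, nil); the library may behave differently.
func recv(ctx context.Context, ch chan interface{}) (interface{}, error) {
	select {
	case v := <-ch:
		return v, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// blockedSend tries to send on a channel nobody reads; the timeout unblocks it.
func blockedSend() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	return send(ctx, make(chan interface{}), "never delivered")
}

// roundTrip sends v through a buffered channel and reads it back.
func roundTrip(v interface{}) (interface{}, error) {
	ctx := context.Background()
	ch := make(chan interface{}, 1)
	if err := send(ctx, ch, v); err != nil {
		return nil, err
	}
	return recv(ctx, ch)
}

func main() {
	fmt.Println(blockedSend())   // context deadline exceeded
	fmt.Println(roundTrip("hi")) // hi <nil>
}
```

Without the ctx.Done() case, blockedSend would block forever, which is exactly the goroutine leak these helpers guard against.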

Types

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow is an object with the list of all runnable functions and a common context.

Example (FanOut)

fanOut illustrates the use of a Flow for a fan-out pipeline running the same input through multiple handlers.

f := New() // create new empty Flow

f.Add( // add handlers; Add can be called multiple times

	// generate 100 ones.
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 100) // example of non-async handler
		for i := 1; i <= 100; i++ {
			out <- 1
		}
		close(out)      // each handler has to close out channel
		return out, nil // no runnable function for non-async handler
	},

	// fan out the 100 ones and fork processing for odd and even numbers.
	// each input number is passed to both handlers; output channels from both handlers are merged together.
	f.FanOut(

		// first handler picks odd numbers only and multiplies them by 2
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 == 0 { // skip even numbers
						continue
					}
					f.Metrics().Inc("passed odd") // increment user-defined metric "passed odd"
					select {
					case out <- val * 2: // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		},

		// second handler picks even numbers only and multiplies them by 3
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 != 0 { // skip odd numbers
						continue
					}
					f.Metrics().Inc("passed even") // increment user-defined metric "passed even"
					select {
					case out <- val * 3: // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		},
	),

	// sum all numbers
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 1)
		runFn = func() error {
			defer close(out)
			sum := 0
			// same loop as above, illustrates different way of reading from channel.
			for {
				select {
				case val, ok := <-in:
					if !ok {
						out <- sum // send result
						return nil
					}
					sum += val.(int)
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return out, runFn
	},
)

f.Go() // activate flow

// wait for all handlers to complete
if err := f.Wait(); err == nil {
	fmt.Printf("all done, result=%v, odd=%d, even=%d",
		<-f.Channel(), f.Metrics().Get("passed odd"), f.Metrics().Get("passed even"))
}

Example (Flow)

flow illustrates the use of a Flow for a concurrent pipeline running each handler in a separate goroutine.

f := New() // create new empty Flow
f.Add(     // add handlers; Add can be called multiple times

	// generate 100 initial values.
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		// example of non-async handler, Add executes it right away, prior to Go call
		out = make(chan interface{}, 100)
		for i := 1; i <= 100; i++ {
			out <- i
		}
		close(out)      // each handler has to close out channel
		return out, nil // no runnable function for non-async handler
	},

	// pick odd numbers only and multiply them by a random number
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}) // each handler makes its out channel
		runFn = func() error {       // async handler returns runnable func
			defer close(out) // handler should close out channel
			for e := range in {
				val := e.(int)
				if val%2 == 0 {
					continue
				}
				f.Metrics().Inc("passed") // increment user-defined metric "passed"

				// send result to the next stage with flow.Send helper. Also, checks for cancellation
				if err := Send(ctx, out, val*rand.Int()); err != nil { //nolint
					return err
				}
			}
			return nil
		}
		return out, runFn
	},

	// sum all numbers
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 1)
		runFn = func() error {
			defer close(out)
			sum := 0
			for {
				select {
				case e, more := <-in:
					if !more {
						out <- sum // send result
						return nil
					}
					val := e.(int)
					sum += val

				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return out, runFn
	},
)

f.Go() // activate flow

// wait for all handlers to complete
if err := f.Wait(); err == nil {
	fmt.Printf("all done, result=%v, passed=%d", <-f.Channel(), f.Metrics().Get("passed"))
}

Example (Parallel)

parallel illustrates the use of a Flow for a concurrent pipeline running some handlers in parallel.

f := New() // create new empty Flow

// make flow with mixed singles and parallel handlers and activate
f.Add(

	// generate 100 initial values in single handler
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 100) // example of non-async handler
		for i := 1; i <= 100; i++ {
			out <- i
		}
		close(out)      // each handler has to close out channel
		return out, nil // no runnable function for non-async handler
	},

	// multiply all numbers by random values in 10 parallel handlers
	f.Parallel(10, func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}) // async handler makes its out channel
		runFn = func() error {
			defer close(out) // handler should close out channel
			for e := range in {
				val := e.(int)
				select {
				// send result to the next stage
				case out <- val * rand.Int(): //nolint
				case <-ctx.Done(): // check for cancellation
					return ctx.Err()
				}
			}
			return nil
		}
		return out, runFn
	}),

	// sum all numbers
	func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
		out = make(chan interface{}, 1) // buffered, so sending the final sum won't block
		runFn = func() error {
			defer close(out)
			sum := 0
			for e := range in {
				val := e.(int)
				sum += val
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
			}
			out <- sum // send result
			return nil
		}
		return out, runFn
	},
)

f.Go() // activate flow

// wait for all handlers to complete
if err := f.Wait(); err == nil {
	fmt.Printf("all done, result=%v", <-f.Channel())
}

func New

func New(options ...Option) *Flow

New creates a flow object with a context and a common errgroup. This errgroup is used to schedule and cancel all handlers. Options define non-default parameters.

func (*Flow) Add

func (f *Flow) Add(handlers ...Handler) *Flow

Add adds one or more handlers. Each is linked to the previous one, and the order of handlers defines the sequence of stages in the flow. Add can be called multiple times.

func (*Flow) Channel

func (f *Flow) Channel() chan interface{}

Channel returns the last (final) channel in the flow. Consumers usually don't need this channel, but it can be used to return some final result(s).

func (*Flow) FanOut

func (f *Flow) FanOut(handler Handler, handlers ...Handler) Handler

FanOut runs all handlers against a common input channel, and their results go to a common output channel. This broadcasts each record to multiple handlers, and each may process it in a different way.
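
A stdlib-only sketch of that broadcast-then-merge shape, using plain int channels and funcs instead of Handler values. fanOut and sumFanOut are hypothetical names, and the sketch skips context cancellation. Each input value is copied to every worker's channel, and one goroutine closes the merged output after all workers finish.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut broadcasts every value from in to each worker func and merges all outputs into one channel.
func fanOut(in <-chan int, workers ...func(int) int) <-chan int {
	out := make(chan int)
	ins := make([]chan int, len(workers))
	var wg sync.WaitGroup
	for i, w := range workers {
		ins[i] = make(chan int, 1) // small per-worker buffer, size chosen arbitrarily
		wg.Add(1)
		go func(src <-chan int, w func(int) int) {
			defer wg.Done()
			for v := range src {
				out <- w(v)
			}
		}(ins[i], w)
	}
	// broadcaster: copy each input value to every worker's channel
	go func() {
		for v := range in {
			for _, ch := range ins {
				ch <- v
			}
		}
		for _, ch := range ins {
			close(ch)
		}
	}()
	// fan-in: close out once every worker has finished
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumFanOut feeds 1..n through two handlers (x*2 and x*3) and sums the merged output.
func sumFanOut(n int) (sum, count int) {
	in := make(chan int)
	go func() {
		for i := 1; i <= n; i++ {
			in <- i
		}
		close(in)
	}()
	for v := range fanOut(in, func(x int) int { return x * 2 }, func(x int) int { return x * 3 }) {
		sum += v
		count++
	}
	return sum, count
}

func main() {
	fmt.Println(sumFanOut(10)) // 275 20
}
```

Every input is seen by both handlers, so 10 inputs yield 20 outputs. Because the broadcaster waits on every worker channel, the slowest handler sets the pace; larger buffers, comparable in spirit to what the FanOutSize option configures, can absorb short bursts.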

func (*Flow) Go

func (f *Flow) Go() *Flow

Go activates the flow. It should be called exactly once, after all handlers are added; subsequent calls are ignored.

func (*Flow) Metrics

func (f *Flow) Metrics() *Metrics

Metrics returns all user-defined counters from the context.

func (*Flow) Parallel

func (f *Flow) Parallel(concurrent int, handler Handler) Handler

Parallel is a decorator that converts (adapts) a single handler into a concurrently executed (parallel) handler. It first makes multiple handlers, registers all of them as workers on the common input channel, and then merges their output channels into a single out channel (fan-in).
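
For contrast with FanOut, a sketch of the Parallel idea: n copies of one worker share a single input channel, so each value is handled by exactly one copy, and their outputs are merged (fan-in). parallel and squareSum are hypothetical names; plain int funcs stand in for Handler values and cancellation is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// parallel runs n copies of worker on one shared input channel and merges
// their outputs into a single channel.
func parallel(n int, in <-chan int, worker func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in { // all copies compete for the same input
				out <- worker(v)
			}
		}()
	}
	go func() {
		wg.Wait() // every copy is done, so nothing else will write to out
		close(out)
	}()
	return out
}

// squareSum squares 1..n with 4 parallel workers and returns the sum and count.
func squareSum(n int) (sum, count int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	for v := range parallel(4, in, func(x int) int { return x * x }) {
		sum += v
		count++
	}
	return sum, count
}

func main() {
	fmt.Println(squareSum(10)) // 385 10
}
```

Output order is not preserved across workers, which is why the example only relies on the sum and count.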

func (*Flow) Wait

func (f *Flow) Wait() error

Wait waits for completion and returns an error if any happened in the handlers.

type Handler

type Handler func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error)

Handler defines the function type used for flow stages; implementations are provided by the user. Each handler returns a new out(put) channel and a runnable fn function. fn is executed in a separate goroutine. fn is thread-safe and may have mutable state. It lives for the whole flow lifetime and usually implements a read->process->write cycle. A non-async handler may do its work immediately and return a nil fn. If fn returns a non-nil error, this indicates a critical failure: the context is canceled and all handlers in the flow stop.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics collects and increments any int numbers and can be used to safely count things in the flow. Flow creates an empty metrics object and puts it into the context. Consumers (user-provided handlers) can retrieve it directly from the context with metrics := ctx.Value(flow.MetricsContextKey).(*flow.Metrics). The provided GetMetrics does exactly the same thing. Flow also has a helper method, Metrics(), to retrieve metrics from ctx.

func GetMetrics

func GetMetrics(ctx context.Context) *Metrics

GetMetrics returns the metrics from the context.

func NewMetrics

func NewMetrics() *Metrics

NewMetrics makes a thread-safe map to collect any counts/metrics.

func (*Metrics) Add

func (m *Metrics) Add(key string, delta int) int

Add increments the value for the given key by delta and returns the new value.

func (*Metrics) Get

func (m *Metrics) Get(key string) int

Get returns value for given key

func (*Metrics) Inc

func (m *Metrics) Inc(key string) int

Inc increments value for given key by one

func (*Metrics) Set

func (m *Metrics) Set(key string, val int)

Set value for given key

func (*Metrics) String

func (m *Metrics) String() string

String returns a sorted key:val string representation of the metrics and adds the duration.
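
A sketch of a type with the behavior described for Metrics: a mutex-guarded map of int counters with Add, Inc, Get, Set and a sorted String. The lowercase metrics type is hypothetical; its String format is this sketch's own, it leaves out the duration the real String adds, and having Inc return the new value is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// metrics is a hypothetical mutex-guarded map of int counters.
type metrics struct {
	mu   sync.Mutex
	vals map[string]int
}

func newMetrics() *metrics { return &metrics{vals: map[string]int{}} }

// Add increments key by delta and returns the new value.
func (m *metrics) Add(key string, delta int) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.vals[key] += delta
	return m.vals[key]
}

// Inc increments key by one (returning the new value is this sketch's choice).
func (m *metrics) Inc(key string) int { return m.Add(key, 1) }

// Get returns the value for key, 0 if unset.
func (m *metrics) Get(key string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.vals[key]
}

// Set overwrites the value for key.
func (m *metrics) Set(key string, val int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.vals[key] = val
}

// String renders key:val pairs sorted by key.
func (m *metrics) String() string {
	m.mu.Lock()
	defer m.mu.Unlock()
	keys := make([]string, 0, len(m.vals))
	for k := range m.vals {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, len(keys))
	for i, k := range keys {
		parts[i] = fmt.Sprintf("%s:%d", k, m.vals[k])
	}
	return "[" + strings.Join(parts, ", ") + "]"
}

// countConcurrently has 8 goroutines each call Inc("passed") 1000 times.
func countConcurrently() *metrics {
	m := newMetrics()
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				m.Inc("passed")
			}
		}()
	}
	wg.Wait()
	m.Set("read", 5)
	return m
}

func main() {
	fmt.Println(countConcurrently()) // [passed:8000, read:5]
}
```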

type Option

type Option func(f *Flow)

Option func type

func Context

func Context(ctx context.Context) Option

Context functional option defines initial context. Can be used to add some values to context or set timeouts/deadlines.

func FanOutSize

func FanOutSize(size int) Option

FanOutSize functional option defines size of fanout buffers.

func Input

func Input(ch chan interface{}) Option

Input functional option defines the input channel for the first handler in the chain. It can be used to connect multiple flows together or to seed a flow from the outside with some external data.
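
Context, FanOutSize and Input follow the functional-options pattern: Option is func(f *Flow), and New applies each option to the Flow it builds. A self-contained sketch of that pattern with a hypothetical miniFlow type; its fields and default values are made up for illustration and are not taken from the flow package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// miniFlow and option are hypothetical types that only illustrate the
// functional-options pattern behind Context, FanOutSize and Input.
type miniFlow struct {
	ctx        context.Context
	fanOutSize int
	input      chan interface{}
}

type option func(f *miniFlow)

func withContext(ctx context.Context) option { return func(f *miniFlow) { f.ctx = ctx } }

func withFanOutSize(size int) option { return func(f *miniFlow) { f.fanOutSize = size } }

func withInput(ch chan interface{}) option { return func(f *miniFlow) { f.input = ch } }

// newMiniFlow sets defaults first, then applies each option in order.
// The default values are arbitrary choices for this sketch.
func newMiniFlow(opts ...option) *miniFlow {
	f := &miniFlow{ctx: context.Background(), fanOutSize: 100}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

// configured builds one miniFlow with defaults and one with all three options set.
func configured() (defaultSize, customSize int, hasDeadline, hasInput bool) {
	d := newMiniFlow()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	c := newMiniFlow(withContext(ctx), withFanOutSize(10), withInput(make(chan interface{})))
	_, hasDeadline = c.ctx.Deadline()
	return d.fanOutSize, c.fanOutSize, hasDeadline, c.input != nil
}

func main() {
	fmt.Println(configured()) // 100 10 true true
}
```

Options that are not passed leave the defaults untouched, which is how New can take only the non-default parameters.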

Directories

Path Synopsis
Package pool implements simplified, single-stage flow.