Documentation ¶
Overview ¶
Package pipe - an utility to create streamable workers
As sometimes we are bound to IO blocks this will help to create workers to stream data
Example ¶
package main import ( "fmt" "log" "sort" "github.com/stdiopt/pipe" ) func main() { origin := pipe.NewProc( pipe.WithFunc(func(_ pipe.Consumer, ints pipe.Sender) error { for i := 0; i < 10; i++ { if err := ints.Send(i); err != nil { return err } } return nil }), ) evenodd := pipe.NewProc( pipe.WithWorkers(4), pipe.WithSource(0, origin), pipe.WithFunc(func(c pipe.Consumer, odds, evens pipe.Sender) error { return c.Consume(func(vv interface{}) error { v := vv.(int) var err error if v&1 == 0 { err = evens.Send(v) } else { err = odds.Send(v) } return err }) }), ) res := []int{} pipe.NewProc( pipe.WithBuffer(10), pipe.WithSource(0, evenodd), pipe.WithSource(1, evenodd), pipe.WithFunc(func(c pipe.Consumer) error { return c.Consume(func(vv interface{}) error { v := vv.(int) res = append(res, v) return nil }) }), ) if err := origin.Run(); err != nil { log.Fatal(err) } sort.Ints(res) fmt.Println(res) }
Output: [0 1 2 3 4 5 6 7 8 9]
Index ¶
- func DumpDOT(p *Proc) string
- type Consumer
- type ConsumerFunc
- type ConsumerMiddleware
- type Message
- type Proc
- type ProcFunc
- func Group(fns ...ProcFunc) ProcFunc
- func WithBuffer(n int) ProcFunc
- func WithConsumerMiddleware(mws ...ConsumerMiddleware) ProcFunc
- func WithFunc(fn interface{}) ProcFunc
- func WithName(n string) ProcFunc
- func WithNamedSource(n string, source ...*Proc) ProcFunc
- func WithNamedTarget(k string, targets ...*Proc) ProcFunc
- func WithOutputs(o ...string) ProcFunc
- func WithSource(n int, source ...*Proc) ProcFunc
- func WithTarget(k int, targets ...*Proc) ProcFunc
- func WithWorkers(n int) ProcFunc
- type Sender
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Consumer ¶
type Consumer interface { // Context returns the current consumer context Context() context.Context // Consume will call the fn for every value received, // fn must be a func with a signature like `func(T)error` where T is any type Consume(fn interface{}) error }
Consumer provides methods to consume a stream
type ConsumerFunc ¶ added in v0.1.0
ConsumerFunc type of base function for the consumer
type ConsumerMiddleware ¶ added in v0.1.0
type ConsumerMiddleware func(fn ConsumerFunc) ConsumerFunc
ConsumerMiddleware func type to build consumers middleware
func BackoffConsumer ¶ added in v0.1.0
func BackoffConsumer(max time.Duration, factor float64) ConsumerMiddleware
BackoffConsumer consumer middleware that will retry at an exponential time until it reaches the maximum duration
func RetryConsumer ¶ added in v0.1.0
func RetryConsumer(tries int) ConsumerMiddleware
RetryConsumer consumer middleware that will retry consumer up to 'tries' on error
type Message ¶ added in v0.1.0
type Message interface { Origin() *Proc Value() interface{} }
Message is the type that flows along a line, with the current value and origin
type Proc ¶
type Proc struct {
// contains filtered or unexported fields
}
Proc is a pipeline processor, should be created with 'NewProc' and must have a func option
func NewProc ¶
NewProc is used to create a Proc
p := pipe.NewProc( pipe.WithBuffer(8), pipe.WithWorkers(10), pipe.WithFunc(func(c pipe.Consumer, s1, s2 pipe.Sender) error { ... }), )
func (*Proc) Link ¶
Link send output to specified procs, 'k' can be an int or string if it is a string it will query params by name declared in 'Output' option
func (*Proc) RunWithContext ¶
RunWithContext starts processors with the given context, if the context is canceled all workers should stop
type ProcFunc ¶
type ProcFunc func(p *Proc)
ProcFunc is used by the Options funcs
func WithBuffer ¶ added in v0.1.0
WithBuffer sets the receive channel buffer
func WithConsumerMiddleware ¶ added in v0.1.0
func WithConsumerMiddleware(mws ...ConsumerMiddleware) ProcFunc
WithConsumerMiddleware sets a ConsumerMiddleware to be used while consuming data.
func WithFunc ¶ added in v0.1.0
func WithFunc(fn interface{}) ProcFunc
WithFunc sets the proc Function option as function must have a consumer and optionally 1 or more senders
func WithNamedSource ¶ added in v0.1.0
WithNamedSource will link this proc to the sources by the outputs identified by name s.
func WithNamedTarget ¶ added in v0.1.0
WithNamedTarget will link this proc output identified by name to targets.
func WithOutputs ¶ added in v0.1.0
WithOutputs describes the proc outputs to be used by linkers the name index must match the Func signature of senders
func WithSource ¶ added in v0.1.0
WithSource will link this proc to the sources by the outputs identified by index n.
func WithTarget ¶ added in v0.1.0
WithTarget will link this proc output identified by k to targets.
func WithWorkers ¶ added in v0.1.0
WithWorkers sets the proc workers