pipe

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2021 License: MIT Imports: 12 Imported by: 0

README

Pipes - an utility to create streamable workers

As sometimes we are bound to IO blocks this will help to create workers to stream data

A Proc:

  • only have one consumer and can consume from several sources
  • can have more than one sender and each sender can send to several consumers
  • should not have a cyclic links

Example:

func main() {
	// Create an origin that produces data, consumes from an DB/API or etc
	origin := pipe.NewProc(
		pipe.WithFunc(func(_ pipe.Consumer, ints pipe.Sender) error {
			for i := 0; i < 10; i++ {
				if err := ints.Send(i); err !=nil {
					return err
				}
			}
			return nil
		}),
	)

	evenodd := pipe.NewProc(
		pipe.WithWorkers(4),        // use 4 go routines
		pipe.WithSource(0, origin), // consumes output 0 from origin
		pipe.WithFunc(func(c pipe.Consumer, odds, evens pipe.Sender) error {
			return c.Consume(func(vv interface{}) error {
				v := c.Value().(int)
				target := odds
				if v&1 == 0 {
					target = evens
				}
				return target.Send(v)
			}
			return nil
		}),
	)

	res := []int{}
	// consumes data produced by evenodd and write it to result slice
	// could be an API endpoint/file/db
	pipe.NewProc(
		pipe.WithBuffer(10),          // buffer size of the consumer
		pipe.WithSource(0, evenodd),  // consumes output 0 (odds) from evenodd
		pipe.WithSource(1, evenodd),  // consumes output 1 (evens) from evenodd
		pipe.WithFunc(func(c pipe.Consumer) error {
			return c.Consume(func(vv interface{}) error {
				v := c.Value().(string) // we expect strings
				res = append(res, v)
				return nil
			})
		}),
	)

	// Run will start the procs binded to `origin` and wait until all finishes
	// if an error is returned in any proc func the context will be canceled
	// and the first error will be returned here
	if err := origin.Run(); err != nil {
		log.Fatal(err)
	}
}

Documentation

Overview

Package pipe - an utility to create streamable workers

As sometimes we are bound to IO blocks this will help to create workers to stream data

Example
package main

import (
	"fmt"
	"log"
	"sort"

	"github.com/stdiopt/pipe"
)

func main() {
	origin := pipe.NewProc(
		pipe.WithFunc(func(_ pipe.Consumer, ints pipe.Sender) error {
			for i := 0; i < 10; i++ {
				if err := ints.Send(i); err != nil {
					return err
				}
			}
			return nil
		}),
	)

	evenodd := pipe.NewProc(
		pipe.WithWorkers(4),
		pipe.WithSource(0, origin),
		pipe.WithFunc(func(c pipe.Consumer, odds, evens pipe.Sender) error {
			return c.Consume(func(vv interface{}) error {
				v := vv.(int)
				var err error
				if v&1 == 0 {
					err = evens.Send(v)
				} else {
					err = odds.Send(v)
				}
				return err
			})
		}),
	)

	res := []int{}
	pipe.NewProc(
		pipe.WithBuffer(10),
		pipe.WithSource(0, evenodd),
		pipe.WithSource(1, evenodd),
		pipe.WithFunc(func(c pipe.Consumer) error {
			return c.Consume(func(vv interface{}) error {
				v := vv.(int)
				res = append(res, v)
				return nil
			})
		}),
	)
	if err := origin.Run(); err != nil {
		log.Fatal(err)
	}
	sort.Ints(res)
	fmt.Println(res)

}
Output:

[0 1 2 3 4 5 6 7 8 9]

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DumpDOT added in v0.0.3

func DumpDOT(p *Proc) string

DumpDOT a proc line in graphviz dot language format

Types

type Consumer

type Consumer interface {
	// Context returns the current consumer context
	Context() context.Context

	// Consume will call the fn for every value received,
	// fn must be a func with a signature like `func(T)error` where T is any type
	Consume(fn interface{}) error
}

Consumer provides methods to consume a stream

type ConsumerFunc added in v0.1.0

type ConsumerFunc func(v Message) error

ConsumerFunc type of base function for the consumer

type ConsumerMiddleware added in v0.1.0

type ConsumerMiddleware func(fn ConsumerFunc) ConsumerFunc

ConsumerMiddleware func type to build consumers middleware

func BackoffConsumer added in v0.1.0

func BackoffConsumer(max time.Duration, factor float64) ConsumerMiddleware

BackoffConsumer consumer middleware that will retry at an exponential time until it reaches the maximum duration

func RetryConsumer added in v0.1.0

func RetryConsumer(tries int) ConsumerMiddleware

RetryConsumer consumer middleware that will retry consumer up to 'tries' on error

type Message added in v0.1.0

type Message interface {
	Origin() *Proc
	Value() interface{}
}

Message is the type that flows along a line, with the current value and origin

type Proc

type Proc struct {
	// contains filtered or unexported fields
}

Proc is a pipeline processor, should be created with 'NewProc' and must have a func option

func NewProc

func NewProc(opt ...ProcFunc) *Proc

NewProc is used to create a Proc

p := pipe.NewProc(
	pipe.WithBuffer(8),
	pipe.WithWorkers(10),
	pipe.WithFunc(func(c pipe.Consumer, s1, s2 pipe.Sender) error {
		...
	}),
)
func (p *Proc) Link(k interface{}, t ...*Proc)

Link send output to specified procs, 'k' can be an int or string if it is a string it will query params by name declared in 'Output' option

func (*Proc) Run

func (p *Proc) Run() error

Run will start processors sequentially and blocks until all completed

func (*Proc) RunWithContext

func (p *Proc) RunWithContext(ctx context.Context) error

RunWithContext starts processors with the given context, if the context is canceled all workers should stop

func (*Proc) String

func (p *Proc) String() string

type ProcFunc

type ProcFunc func(p *Proc)

ProcFunc is used by the Options funcs

func Group added in v0.0.4

func Group(fns ...ProcFunc) ProcFunc

Group groups options in one ProcFunc

func WithBuffer added in v0.1.0

func WithBuffer(n int) ProcFunc

WithBuffer sets the receive channel buffer

func WithConsumerMiddleware added in v0.1.0

func WithConsumerMiddleware(mws ...ConsumerMiddleware) ProcFunc

WithConsumerMiddleware sets a ConsumerMiddleware to be used while consuming data.

func WithFunc added in v0.1.0

func WithFunc(fn interface{}) ProcFunc

WithFunc sets the proc Function option as function must have a consumer and optionally 1 or more senders

func WithName added in v0.1.0

func WithName(n string) ProcFunc

WithName sets optional proc name for easier debugging

func WithNamedSource added in v0.1.0

func WithNamedSource(n string, source ...*Proc) ProcFunc

WithNamedSource will link this proc to the sources by the outputs identified by name s.

func WithNamedTarget added in v0.1.0

func WithNamedTarget(k string, targets ...*Proc) ProcFunc

WithNamedTarget will link this proc output identified by name to targets.

func WithOutputs added in v0.1.0

func WithOutputs(o ...string) ProcFunc

WithOutputs describes the proc outputs to be used by linkers the name index must match the Func signature of senders

func WithSource added in v0.1.0

func WithSource(n int, source ...*Proc) ProcFunc

WithSource will link this proc to the sources by the outputs identified by index n.

func WithTarget added in v0.1.0

func WithTarget(k int, targets ...*Proc) ProcFunc

WithTarget will link this proc output identified by k to targets.

func WithWorkers added in v0.1.0

func WithWorkers(n int) ProcFunc

WithWorkers sets the proc workers

type Sender

type Sender interface {
	// Send a value
	Send(v interface{}) error
}

Sender a channel writer wrapper

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL