Documentation ¶
Overview ¶
Package gopipe is an abstraction of Go channels as a pipeline composed of multiple pipe joints.
Installation:
import "github.com/urjitbhatia/gopipe"
Index ¶
- type Junction
- type Pipe
- type Pipeline
- func (p *Pipeline) AddJunction(j Junction)
- func (p *Pipeline) AddPipe(pipe Pipe) *Pipeline
- func (p *Pipeline) AttachSink(out chan interface{})
- func (p *Pipeline) AttachSinkFanOut(chanfan map[string]chan interface{}, unrouted chan interface{}, ...)
- func (p *Pipeline) AttachSource(source chan interface{})
- func (p *Pipeline) AttachTap(tapOut chan interface{})
- func (p *Pipeline) Close()
- func (p *Pipeline) Dequeue() interface{}
- func (p *Pipeline) DequeueTimeout(t time.Duration) interface{}
- func (p *Pipeline) Enqueue(item interface{})
- func (p *Pipeline) String()
- type RoutingFunc
Types ¶
type Junction ¶
type Junction struct {
	DebugMode bool // Option to log pipeline state transitions, false by default
	// contains filtered or unexported fields
}
Junction joins one or more pipelines together
Example ¶
ExampleJunction shows how to create a junction and route data across the junction to other pipelines
max := 4
p := NewBufferedPipeline(max)

rf := RoutingFunc(func(in interface{}) interface{} {
	if i, _ := in.(int); i > 2 {
		return "big"
	}
	return "small"
})

j := NewJunction(rf)
dp := NewPipeline(doublingPipe{})
sp := NewPipeline(subtractingPipe{})

// If input is "small" send to doublingPipeline
// If input is "big" send to subtractingPipeline
j.AddPipeline("small", dp).AddPipeline("big", sp)
p.AddJunction(j)

for i := range intGenerator(max) {
	p.Enqueue(i)
}

fmt.Println("Small pipeline got: ", dp.Dequeue())
fmt.Println("Small pipeline got: ", dp.Dequeue())
fmt.Println("Small pipeline got: ", dp.Dequeue())
fmt.Println("Big pipeline got: ", sp.Dequeue())

Output:
Small pipeline got: 0
Small pipeline got: 2
Small pipeline got: 4
Big pipeline got: 2
func (*Junction) AddPipeline ¶
AddPipeline adds a pipeline to a junction. Items for which the junction's routing function returns the given key are routed to the given pipeline.
type Pipe ¶
type Pipe interface {
Process(in chan interface{}, out chan interface{})
}
Pipe is a single component that processes items. Pipes can be composed to form a pipeline
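The examples on this page refer to doublingPipe, subtractingPipe and intGenerator without defining them. The sketch below shows one way such helpers might look, so that the Pipe contract is concrete. It redeclares the Pipe interface locally instead of importing gopipe, and runPipes is a hypothetical stand-in for the wiring a Pipeline performs. Whether a Pipe is expected to close its out channel is an assumption made here so the hand-wired chain can terminate.

```go
package main

import "fmt"

// Pipe mirrors the interface documented above (redeclared so this file compiles on its own).
type Pipe interface {
	Process(in chan interface{}, out chan interface{})
}

// doublingPipe is a hypothetical Pipe that doubles every int it receives.
type doublingPipe struct{}

func (doublingPipe) Process(in chan interface{}, out chan interface{}) {
	for item := range in {
		if n, ok := item.(int); ok {
			out <- n * 2
		}
	}
	close(out) // assumption: the pipe closes out once in is drained
}

// subtractingPipe is a hypothetical Pipe that subtracts 1 from every int it receives.
type subtractingPipe struct{}

func (subtractingPipe) Process(in chan interface{}, out chan interface{}) {
	for item := range in {
		if n, ok := item.(int); ok {
			out <- n - 1
		}
	}
	close(out) // assumption: the pipe closes out once in is drained
}

// intGenerator is a hypothetical source that emits 0..max-1 and then closes.
func intGenerator(max int) chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for i := 0; i < max; i++ {
			out <- i
		}
	}()
	return out
}

// runPipes chains the pipes by hand, each in its own goroutine, and
// collects the ints that come out of the last one. It is not part of gopipe.
func runPipes(source chan interface{}, pipes ...Pipe) []int {
	in := source
	for _, p := range pipes {
		out := make(chan interface{})
		go p.Process(in, out)
		in = out
	}
	var results []int
	for v := range in {
		if n, ok := v.(int); ok {
			results = append(results, n)
		}
	}
	return results
}

func main() {
	fmt.Println(runPipes(intGenerator(4), doublingPipe{}, subtractingPipe{}))
	// prints: [-1 1 3 5]
}
```

Each stage runs in a single goroutine, so items leave the chain in the order they entered it.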
type Pipeline ¶
type Pipeline struct {
	DebugMode bool // Option to log pipeline state transitions, false by default
	// contains filtered or unexported fields
}
Pipeline connects multiple pipes in order. The head chan receives incoming items and the tail chan sends out items that the pipeline has finished processing.
Example ¶
max := 4
dp := doublingPipe{}
sp := subtractingPipe{}

pipeline := NewPipeline(dp, sp)
pipeinput := intGenerator(max)
pipeline.AttachSource(pipeinput)

for i := 0; i < max; i++ {
	fmt.Printf("value is: %d\n", pipeline.Dequeue())
}

Output:
value is: -1
value is: 1
value is: 3
value is: 5
func NewBufferedPipeline ¶
NewBufferedPipeline creates a Pipeline with channel buffers set to the given size. Buffering can increase processing speed, because a sender does not have to wait for the next stage to be ready. In most cases, NewPipeline should be tried first.
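The effect of a channel buffer can be seen with plain Go channels. The sketch below does not use gopipe; trySend and acceptedWithoutReader are hypothetical helpers that count how many sends complete while nothing is reading.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan interface{}, item interface{}) bool {
	select {
	case ch <- item:
		return true
	default:
		return false
	}
}

// acceptedWithoutReader counts how many of the attempted sends succeed
// when no goroutine is receiving from ch.
func acceptedWithoutReader(ch chan interface{}, attempts int) int {
	accepted := 0
	for i := 0; i < attempts; i++ {
		if trySend(ch, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println("unbuffered:", acceptedWithoutReader(make(chan interface{}), 5))
	// prints: unbuffered: 0
	fmt.Println("buffer of 3:", acceptedWithoutReader(make(chan interface{}, 3), 5))
	// prints: buffer of 3: 3
}
```

An unbuffered channel accepts nothing until a receiver is ready, while a buffered one absorbs items up to its capacity. That slack is what lets a fast stage keep working while a slower one catches up.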
func NewPipeline ¶
NewPipeline takes multiple pipes in order and connects them to form a pipeline. The Enqueue and Dequeue methods act as the pipeline's source and sink. If debugLog is true, state transitions are logged to stdout.
func (*Pipeline) AddJunction ¶
AddJunction attaches the given Junction to this pipeline and immediately starts routing items to that junction.
func (*Pipeline) AddPipe ¶
AddPipe attaches a pipe to the end of the pipeline. This will immediately start routing items to this newly attached pipe
func (*Pipeline) AttachSink ¶
func (p *Pipeline) AttachSink(out chan interface{})
AttachSink takes a terminating channel and dequeues the messages from the pipeline onto that channel.
func (*Pipeline) AttachSinkFanOut ¶
func (p *Pipeline) AttachSinkFanOut(chanfan map[string]chan interface{}, unrouted chan interface{}, routingFunc func(interface{}) (string, error))
AttachSinkFanOut redirects outgoing items to the appropriate channel based on the routing function provided. Items that cannot be routed are pushed onto the unrouted channel: if the routing function returns a routing key that has no associated channel in chanfan, or if it returns an error, the item goes to unrouted. Clients of the library must handle the unrouted chan properly. If nothing is listening on that chan, the pipeline will block as soon as an unroutable item is put on it (or, for a buffered chan, once its buffer is full).
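The routing rules described for AttachSinkFanOut can be sketched with plain channels. This is not the library's implementation: fanOut and routeByParity are hypothetical, and only the parameter shapes (chanfan, unrouted, routingFunc) are taken from the signature above.

```go
package main

import (
	"errors"
	"fmt"
)

// fanOut is a hypothetical stand-in for the fan-out behaviour: it sends each
// item to the channel registered for its routing key, and to unrouted when
// the routing function fails or the key has no channel.
func fanOut(in chan interface{}, chanfan map[string]chan interface{},
	unrouted chan interface{}, routingFunc func(interface{}) (string, error)) {
	for item := range in {
		key, err := routingFunc(item)
		dest, ok := chanfan[key]
		if err != nil || !ok {
			unrouted <- item // blocks if nobody reads and the buffer is full
			continue
		}
		dest <- item
	}
}

// routeByParity is a hypothetical routing function for this example.
func routeByParity(item interface{}) (string, error) {
	n, ok := item.(int)
	if !ok {
		return "", errors.New("not an int")
	}
	if n < 0 {
		return "negative", nil // no channel is registered for this key
	}
	if n%2 == 0 {
		return "even", nil
	}
	return "odd", nil
}

func main() {
	in := make(chan interface{}, 4)
	evens := make(chan interface{}, 4)
	odds := make(chan interface{}, 4)
	unrouted := make(chan interface{}, 4)
	chanfan := map[string]chan interface{}{"even": evens, "odd": odds}

	for _, v := range []interface{}{1, 2, "x", -3} {
		in <- v
	}
	close(in)
	fanOut(in, chanfan, unrouted, routeByParity)

	fmt.Println(len(evens), len(odds), len(unrouted))
	// prints: 1 1 2
}
```

The channels here are buffered so the example can run in a single goroutine. With an unbuffered unrouted channel and no reader, the first unroutable item would stall fanOut, which is the blocking hazard the description warns about.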
func (*Pipeline) AttachSource ¶
func (p *Pipeline) AttachSource(source chan interface{})
AttachSource accepts the source channel as the entry point to the pipeline
func (*Pipeline) AttachTap ¶
func (p *Pipeline) AttachTap(tapOut chan interface{})
AttachTap adds a Tap to the pipeline.
func (*Pipeline) Close ¶
func (p *Pipeline) Close()
Close makes sure that the pipeline accepts no further messages. If a goroutine or method is still enqueuing items after Close is called, it will panic, because Go does not allow sending on a closed channel.
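The panic mentioned here is standard Go behaviour for a send on a closed channel. The sketch below reproduces it with a plain channel rather than a Pipeline; safeEnqueue is a hypothetical helper that turns the panic into an error so it can be observed.

```go
package main

import "fmt"

// safeEnqueue is a hypothetical helper: it sends item on ch and converts the
// runtime panic raised by a send on a closed channel into an error.
func safeEnqueue(ch chan interface{}, item interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("enqueue failed: %v", r)
		}
	}()
	ch <- item
	return nil
}

func main() {
	ch := make(chan interface{}, 1)
	fmt.Println(safeEnqueue(ch, 1)) // send succeeds while the channel is open
	close(ch)
	fmt.Println(safeEnqueue(ch, 2)) // send after close is reported as an error
}
```

Recovering is shown only to make the failure visible. It is usually cleaner to make sure every producer has stopped before the pipeline is closed.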
func (*Pipeline) Dequeue ¶
func (p *Pipeline) Dequeue() interface{}
Dequeue will block till an item is available and then dequeue from the pipeline.
func (*Pipeline) DequeueTimeout ¶
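func (p *Pipeline) DequeueTimeout(t time.Duration) interface{}
DequeueTimeout will block until an item is available or the duration t has elapsed, whichever comes first, and then dequeue from the pipeline.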
type RoutingFunc ¶
type RoutingFunc func(in interface{}) interface{}
RoutingFunc takes an item flowing through the pipeline and maps it to a routing key. The Junction uses that key to route the item to the matching pipeline attached to it.