gopipe

package module
v2.0.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2019 License: MIT Imports: 3 Imported by: 0

README

gopipe

A stream-filter like pipeline primitive for go

Build Status GoDoc

Gopipe exposes a simple interface that your "Pipe" must implement:

/*
A single pipe component that processes items. Pipes can be composed to form a pipeline
*/
type Pipe interface {
	Process(in chan interface{}, out chan interface{})
}

Any such pipe can then be combined into a pipeline like:

// Make a pipeline
pipeline := gopipe.NewPipeline(
  jsonUnmarshalPipe,
  redisWriterPipe,
  logWriterPipe
)

// Or Make a Buffered Pipeline
// This allows up to bufSize elements to queue at *Each Pipe stage
bufSize := 10
// Buffersize 10 throughout the pipe
bufP := gopipe.NewBufferedPipeline(bufSize, redisWriterPipe, logWriterPipe)

// Attach some source
jobs := make(chan interface{})
pipeline.AttachSource(jobs)

// Attach Sink
processedJobs := make(chan interface{})
pipeline.AttachSink(processedJobs)

// Or Enqueue from somewhere (Block if the pipeline has no capacity)
pipeline.Enqueue("foo")

// And Dequeue (Blocks if nothing is flowing)
bar := pipeline.Dequeue()

// Dequeue with timeout
baz := pipeline.DequeueTimeout(10 * time.Millisecond)

Complex pipelining

You can also create a "routing" junction and attach other Pipelines downstream.


// Create a RoutingFunc func(interface{}) interface{}
routingFn := RoutingFunc(func(val interface{}) interface{} {
  if val > 10 && val < 100 {
    return "smallishNumber"
  } else if val >= 100 {
    return "biggishNumber"
  }
  return "eh!", errors.New("dwarfnumber")
})

// Create a junction
j := NewJunction(routingFn)
j.AddPipeline("smallishNumber", NewPipeline(smallNumPipe)).AddPipeline("biggishNumber", NewPipeline(bigNumPipe)

// Now attach the junction - as soon as this is attached, data will start flowing
pipeline.AddJunction(j)

Example Pipe:

This is a pipe that takes in integers and doubles them. If the input is invalid, it effectively "filters" it from going down the pipeline. In a more complex scenario, you can update the incoming structs with error flags etc and might still want to propagate it dowstream.

To filter, simply don't put it on the out chan.

type doublingPipe struct{}

func (dp doublingPipe) Process(in chan interface{}, out chan interface{}) {
	for {
		select {
		case item, more := <-in:
			if !more {
				log.Println("Pipe-in closed")
				close(out)
				return
			}
			if intval, ok := item.(int); ok {
				out <- intval * 2
			} else {
			  log.Println("not ok - filtering...")
			}
		}
	}
}

More Examples:

Documentation

Overview

Package gopipe is an abstraction of go channels as a pipeline composed of multiple pipe joints

Installation:

import "github.com/urjitbhatia/gopipe"

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Junction

type Junction struct {
	DebugMode bool // Option to log pipeline state transitions, false by default
	// contains filtered or unexported fields
}

Junction joins one or more pipelines together

Example

ExampleJunction shows how to create a junction and route data across the junction to other pipelines

max := 4
p := NewBufferedPipeline(max)
rf := RoutingFunc(func(in interface{}) interface{} {
	if i, _ := in.(int); i > 2 {
		return "big"
	}
	return "small"
})
j := NewJunction(rf)

dp := NewPipeline(doublingPipe{})
sp := NewPipeline(subtractingPipe{})

// If input is "small" send to doublingPipeline
// If input is "big" send to subtractingPipeline
j.AddPipeline("small", dp).AddPipeline("big", sp)
p.AddJunction(j)

for i := range intGenerator(max) {
	p.Enqueue(i)
}

fmt.Println("Small pipeline got: ", dp.Dequeue())
fmt.Println("Small pipeline got: ", dp.Dequeue())
fmt.Println("Small pipeline got: ", dp.Dequeue())
fmt.Println("Big pipeline got: ", sp.Dequeue())
Output:

Small pipeline got:  0
Small pipeline got:  2
Small pipeline got:  4
Big pipeline got:  2

func NewJunction

func NewJunction(rFunc RoutingFunc) Junction

NewJunction creates a new Junction

func (*Junction) AddPipeline

func (j *Junction) AddPipeline(key interface{}, p *Pipeline) *Junction

AddPipeline adds a pipeline to a junction. Items that output the given key when fed into the routing function for this junction are routed the given pipeline

type Pipe

type Pipe interface {
	Process(in chan interface{}, out chan interface{})
}

Pipe is a single component that processes items. Pipes can be composed to form a pipeline

type Pipeline

type Pipeline struct {
	DebugMode bool // Option to log pipeline state transitions, false by default
	// contains filtered or unexported fields
}

Pipeline connects multiple pipes in order. The head chan receives incoming items and tail chan send out items that the pipeline has finished processing.

Example
max := 4
dp := doublingPipe{}
sp := subtractingPipe{}
pipeline := NewPipeline(dp, sp)

pipeinput := intGenerator(max)
pipeline.AttachSource(pipeinput)

for i := 0; i < max; i += 1 {
	fmt.Printf("value is: %d\n", pipeline.Dequeue())
}
Output:

value is: -1
value is: 1
value is: 3
value is: 5

func NewBufferedPipeline

func NewBufferedPipeline(s int, pipes ...Pipe) *Pipeline

NewBufferedPipeline creates a Pipeline with channel buffers set to the given size. This is useful in increasing processing speed. NewPipeline should mostly always be tried first.

func NewPipeline

func NewPipeline(pipes ...Pipe) *Pipeline

NewPipeline takes multiple pipes in-order and connects them to form a pipeline. Enqueue and Dequeue methods are used to attach source/sink to the pipeline. If debugLog is true, logs state transitions to stdout.

func (*Pipeline) AddJunction

func (p *Pipeline) AddJunction(j Junction)

AddJunction creates a new Junction to this pipeline and immediately start routing to that junction

func (*Pipeline) AddPipe

func (p *Pipeline) AddPipe(pipe Pipe) *Pipeline

AddPipe attaches a pipe to the end of the pipeline. This will immediately start routing items to this newly attached pipe

func (*Pipeline) AttachSink

func (p *Pipeline) AttachSink(out chan interface{})

AttachSink takes a terminating channel and dequeues the messages from the pipeline onto that channel.

func (*Pipeline) AttachSinkFanOut

func (p *Pipeline) AttachSinkFanOut(chanfan map[string]chan interface{}, unrouted chan interface{}, routingFunc func(interface{}) (string, error))

AttachSinkFanOut redirects outgoing items to the appropriate channel based on the routing function provided. Returns a channel where unrouted items are pushed. If the routing function returns a routing key that does not have an associated channel provided, the item will be routed to the unrouted channel. Items encountering errors on routing are also put on the unrouted channel. Clients of the library should handle unrouted chan properly - if nothing is listening on that chan, operations will block if an unroutable item is put on the channel (or until its buffer is full)

func (*Pipeline) AttachSource

func (p *Pipeline) AttachSource(source chan interface{})

AttachSource accepts the source channel as the entry point to the pipeline

func (*Pipeline) AttachTap

func (p *Pipeline) AttachTap(tapOut chan interface{})

AttachTap adds a Tap to the pipeline.

func (*Pipeline) Close

func (p *Pipeline) Close()

Close makes sure that the pipeline accepts no further messages. If the go routine/method writing to the pipeline is still Enqueuing, it will cause a panic - can't write to a closed channel

func (*Pipeline) Dequeue

func (p *Pipeline) Dequeue() interface{}

Dequeue will block till an item is available and then dequeue from the pipeline.

func (*Pipeline) DequeueTimeout

func (p *Pipeline) DequeueTimeout(t time.Duration) interface{}

DequeueTimeout will block till an item is available and then dequeue from the pipeline.

func (*Pipeline) Enqueue

func (p *Pipeline) Enqueue(item interface{})

Enqueue takes an item one at a time and adds it to the start of the pipeline. Use AttachSource to attach a chan of incoming items to the pipeline. If the pipeline is blocked, this is going to be a blocking operation

func (*Pipeline) String

func (p *Pipeline) String()

String prints a helpful debug state of the pipeline

type RoutingFunc

type RoutingFunc func(in interface{}) interface{}

RoutingFunc takes in an item flowing through the pipeline and maps it to another item. The output item is used to then route data flowing through the Junction to the right pipeline attached to it

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL