go-pipeline: github.com/sbogacz/go-pipeline Index | Examples | Files

package pipeline

import "github.com/sbogacz/go-pipeline"

Package pipeline is intended to provide some helpful scaffolding for situations in which there are multiple data flows, or when multiple channels are used to operate on data. It's a library to facilitate patterns of "channel-middleware". The Flow example should cover most of the functionality in the package.



Package Files

definitions.go doc.go ratelimiter.go

func Combine Uses

func Combine(ins ...chan interface{}) chan interface{}

Combine takes a variable number of channels and combines their output into a single channel that can still be used with an operator.

func Split Uses

func Split(in chan interface{}, n int) []chan interface{}

Split takes a single input channel and broadcasts each item it receives to every one of the n output channels it returns, so that each downstream handler gets its own channel.

type Flow Uses

type Flow []Operator

Flow is a slice of Operators that can be applied in sequence


input0 := make(chan interface{})
input1 := make(chan interface{})

// multiplier takes an int and returns an Operator which multiplies the
// input by the given int
multiplier := func(x int) pipeline.Operator {
    return pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
        for m := range in {
            n := m.(int)
            out <- (int(n) * x)

// ifEven is an operator which filters out odds and passes evens through
ifEven := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
    for m := range in {
        n := m.(int)
        if n%2 == 0 {
            out <- n

// ifOdd is an operator which filters out evens and passes odds through
ifOdd := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
    for m := range in {
        n := m.(int)
        if n%2 == 1 {
            out <- n

// summer is an operator which aggregates input integers, and outputs the
// total once the input channel closes
summer := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
    total := 0
    for m := range in {
        n := m.(int)
        total += n
    out <- total

// for every odd, multiply times two, and add the results
oddFlow := pipeline.NewFlow(ifOdd, multiplier(2), summer)
out0 := oddFlow.Run(input0)

// for every even, multiply times three and add the results
evenFlow := pipeline.NewFlow(ifEven, multiplier(3), summer)
out1 := evenFlow.Run(input1)

// use the Combine helper to merge the output of out0 and out1 into
// a single output channel out
out := summer.Run(pipeline.Combine(out0, out1))

go func() {
    for i := 0; i < 10; i++ {
        input0 <- i
        input1 <- i
total := <-out
// 1 + 3 + 5 + 7 + 9 = 25... * 2 = 50
// 2 + 4 + 6 + 8 = 20 * 3 = 60
fmt.Printf("The total is %d\n", total) // Should total 110


The total is 110


// Create a new intermediary type to operate on.
// A tuple of the word and the number of times it occurred.
type tuple struct {
    token string
    count int

// wordCount is an operator that takes in strings (words) and emits a tuple
// of (word, 1)
wordCount := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
    for word := range in {
        out <- tuple{word.(string), 1}

// countAggregator takes in tuples and aggregates their counts. Outputs
// the word and count output as a string.
countAggregator := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
    counts := make(map[string]int)
    for t := range in {
        counts[t.(tuple).token] += t.(tuple).count
    for word, count := range counts {
        out <- fmt.Sprintf("%s appears %d times", word, count)

// Launch the word count Flow
input := make(chan interface{})
wordCountFlow := pipeline.NewFlow(wordCount, countAggregator)
output := wordCountFlow.Run(input)

// Feed in the input document
document := "the quick fox jumps over the lazy brown dog fox fox"
for _, word := range strings.Split(document, " ") {
    input <- word
// Signal that we are done submitting input

// Read the output
for result := range output {

func NewFlow Uses

func NewFlow(ops ...Operator) Flow

NewFlow is syntactic sugar to create a Flow

func (Flow) Run Uses

func (f Flow) Run(in chan interface{}) chan interface{}

Run takes an input channel and runs the operators in the slice in order. This makes Flow implement the Runner interface

type Operator Uses

type Operator func(<-chan interface{}, chan interface{})

Operator aliases a function that takes one input channel and one output channel

func RateLimiter Uses

func RateLimiter(l *rate.Limiter) Operator

RateLimiter returns a new Operator that only lets items through at the rate of the given `rate.Limiter`. Passing the limiter in allows you to share it across multiple instances of this Operator.


// create new rate limiter allowing 10 ops/sec
limiter := rate.NewLimiter(10, 1)

in := make(chan interface{}, 21)

// create a RateLimiter operator and run it on input channel
out := pipeline.RateLimiter(limiter).Run(in)
startTime := time.Now()
for i := 0; i < 21; i++ {
    in <- i

for range out {
}   // this is just to flush the output channel
// should have taken about 2 seconds
fmt.Printf("After rate limiting, this took %d", int(time.Now().Sub(startTime).Seconds()))


After rate limiting, this took 2

func (Operator) Run Uses

func (o Operator) Run(in <-chan interface{}) chan interface{}

Run takes an input channel, applies the operator to it, and returns the channel that the operator writes its output to. This makes the Operator implement the Runner interface.

type Runner Uses

type Runner interface {
    Run(<-chan interface{}) chan interface{}

The Runner interface exposes a Run function that takes in a chan interface{} and outputs to a chan interface{}.

Package pipeline imports 3 packages (graph). Updated 2018-08-23. Refresh now. Tools for package owners. This is an inactive package (no imports and no commits in at least two years).