pipeline

v0.0.0-...-afdaf45
Published: Jan 14, 2017 License: MIT Imports: 3 Imported by: 0

README

go-pipeline

go-pipeline is a Go library that provides "middleware"-like functionality for channels. It can lead to cleaner code when several inputs share the same processing flow.

Runner

The Runner interface provides much of the functionality in the package. It's defined as

type Runner interface {
	Run(<-chan interface{}) chan interface{}
}

The two implementing types provided are Operator and Flow, where the latter is just a collection of Operators, and both provide a Run method.

Example:

// multiplier returns an Operator that multiplies each incoming int by x.
func multiplier(x int) Operator {
	return Operator(func(in <-chan interface{}, out chan interface{}) {
		for m := range in {
			n := m.(int)
			out <- n * x
		}
	})
}
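
To see how such an Operator might be driven end to end, here is a minimal, self-contained sketch. It does not import the package: the local Operator type and its Run method are assumptions modeled on the documented signatures (in particular, that Run starts the function in a goroutine and closes the output channel when it returns), and multiplyAll is a hypothetical helper added for illustration.

```go
package main

import "fmt"

// Operator is a local stand-in that mirrors the documented function type.
type Operator func(in <-chan interface{}, out chan interface{})

// Run is an assumed implementation, not the library's source: it runs the
// operator in its own goroutine and closes out once the operator returns.
func (o Operator) Run(in <-chan interface{}) chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		o(in, out)
	}()
	return out
}

// multiplier returns an Operator that multiplies each incoming int by x.
func multiplier(x int) Operator {
	return func(in <-chan interface{}, out chan interface{}) {
		for m := range in {
			out <- m.(int) * x
		}
	}
}

// multiplyAll (hypothetical helper) feeds nums through multiplier(x) and
// collects the results in order.
func multiplyAll(x int, nums ...int) []int {
	in := make(chan interface{})
	out := multiplier(x).Run(in)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	var results []int
	for v := range out {
		results = append(results, v.(int))
	}
	return results
}

func main() {
	fmt.Println(multiplyAll(3, 1, 2, 3)) // prints [3 6 9]
}
```

Closing the input channel is what ends the operator's range loop, which in turn lets this sketch's Run close the output channel so the consumer's loop terminates.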

Rate Limiter

This package also provides a RateLimiter function, which takes a rate limiter from the "golang.org/x/time/rate" package and returns an Operator that passes items through no faster than that limiter allows.

Examples

More examples of how to use the pipeline package can be found in the test files.

Contributing

If you'd like to contribute to this project, make sure that you run go vet and golint before submitting a pull request. If you are adding a feature or fixing a bug, please also add a test case to verify the new functionality or fix.

Documentation

Overview

Package pipeline is intended to provide some helpful scaffolding for situations in which there are multiple data flows, or when multiple channels are used to operate on data. It's a library to facilitate "channel-middleware" patterns. The Flow example should cover most of the functionality in the package.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Combine

func Combine(ins ...chan interface{}) chan interface{}

Combine takes a variable number of channels and merges their output into a single channel, which can still be used as the input to an Operator.
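
A fan-in helper of this shape is commonly written with one forwarding goroutine per input and a sync.WaitGroup to decide when to close the merged channel. The sketch below illustrates that pattern under the assumption that Combine behaves this way; combine and sumCombined are hypothetical names, and this is not the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// combine (hypothetical) merges any number of channels into one. The merged
// channel is closed once every input channel has been closed and drained.
func combine(ins ...chan interface{}) chan interface{} {
	out := make(chan interface{})
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(c chan interface{}) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumCombined (hypothetical) sends values on two channels and sums
// everything that arrives on the merged channel.
func sumCombined() int {
	a := make(chan interface{})
	b := make(chan interface{})
	go func() {
		defer close(a)
		a <- 1
		a <- 2
	}()
	go func() {
		defer close(b)
		b <- 10
		b <- 20
	}()
	total := 0
	for v := range combine(a, b) {
		total += v.(int)
	}
	return total
}

func main() {
	fmt.Println(sumCombined()) // prints 33
}
```

The order in which items from different inputs arrive on the merged channel is not deterministic, so downstream operators should not rely on it.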

func Split

func Split(in chan interface{}, n int) []chan interface{}

Split takes a single input channel and broadcasts each item to each of the n output channels it returns, one per handler.
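
Broadcasting can be sketched as a single goroutine that copies every item to each output channel in turn. The example below assumes that n is the number of output channels and that they are closed when the input closes; split and broadcastSums are hypothetical names, and this is not the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// split (hypothetical) sends every item read from in to each of the n
// returned channels, then closes them all once in is closed.
func split(in chan interface{}, n int) []chan interface{} {
	outs := make([]chan interface{}, n)
	for i := range outs {
		outs[i] = make(chan interface{})
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // blocks until this consumer is ready
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return outs
}

// broadcastSums (hypothetical) sends 1, 2, 3 through split and returns the
// sum that each of the n consumers observed.
func broadcastSums(n int) []int {
	in := make(chan interface{})
	outs := split(in, n)
	sums := make([]int, n)
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, c chan interface{}) {
			defer wg.Done()
			for v := range c {
				sums[i] += v.(int)
			}
		}(i, out)
	}
	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in)
	wg.Wait()
	return sums
}

func main() {
	fmt.Println(broadcastSums(2)) // prints [6 6]
}
```

With unbuffered outputs, as in this sketch, one slow consumer stalls delivery to all the others, so every returned channel needs an active reader.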

Types

type Flow

type Flow []Operator

Flow is a slice of Operators that can be applied in sequence.

Example
package main

import (
	"fmt"

	pipeline "github.com/sbogacz/go-pipeline"
)

func main() {
	input0 := make(chan interface{})
	input1 := make(chan interface{})

	// multiplier takes an int and returns an Operator which multiplies the
	// input by the given int
	multiplier := func(x int) pipeline.Operator {
		return pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
			for m := range in {
				n := m.(int)
				out <- n * x
			}
		})
	}

	// ifEven is an operator which filters out odds and passes evens through
	ifEven := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
		for m := range in {
			n := m.(int)
			if n%2 == 0 {
				out <- n
			}
		}
	})

	// ifOdd is an operator which filters out evens and passes odds through
	ifOdd := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
		for m := range in {
			n := m.(int)
			if n%2 == 1 {
				out <- n
			}
		}
	})

	// summer is an operator which aggregates input integers, and outputs the
	// total once the input channel closes
	summer := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
		total := 0
		for m := range in {
			n := m.(int)
			total += n
		}
		out <- total
	})

	// for every odd, multiply by two, and add the results
	oddFlow := pipeline.NewFlow(ifOdd, multiplier(2), summer)
	out0 := oddFlow.Run(input0)

	// for every even, multiply by three, and add the results
	evenFlow := pipeline.NewFlow(ifEven, multiplier(3), summer)
	out1 := evenFlow.Run(input1)

	// use the Combine helper to merge the output of out0 and out1 into
	// a single output channel out
	out := summer.Run(pipeline.Combine(out0, out1))

	go func() {
		for i := 0; i < 10; i++ {
			input0 <- i
			input1 <- i
		}
		close(input0)
		close(input1)
	}()
	total := <-out
	// odds:  (1 + 3 + 5 + 7 + 9) * 2 = 25 * 2 = 50
	// evens: (0 + 2 + 4 + 6 + 8) * 3 = 20 * 3 = 60
	fmt.Printf("The total is %d\n", total) // Should total 110
}
Output:

The total is 110
Example (WordCount)
package main

import (
	"fmt"
	"strings"

	pipeline "github.com/sbogacz/go-pipeline"
)

func main() {
	// Create a new intermediary type to operate on.
	// A tuple of the word and the number of times it occurred.
	type tuple struct {
		token string
		count int
	}

	// wordCount is an operator that takes in strings (words) and emits a tuple
	// of (word, 1)
	wordCount := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
		for word := range in {
			out <- tuple{word.(string), 1}
		}
	})

	// countAggregator takes in tuples and aggregates their counts. Outputs
	// the word and count output as a string.
	countAggregator := pipeline.Operator(func(in <-chan interface{}, out chan interface{}) {
		counts := make(map[string]int)
		for t := range in {
			counts[t.(tuple).token] += t.(tuple).count
		}
		for word, count := range counts {
			out <- fmt.Sprintf("%s appears %d times", word, count)
		}
	})

	// Launch the word count Flow
	input := make(chan interface{})
	wordCountFlow := pipeline.NewFlow(wordCount, countAggregator)
	output := wordCountFlow.Run(input)

	// Feed in the input document
	document := "the quick fox jumps over the lazy brown dog fox fox"
	for _, word := range strings.Split(document, " ") {
		input <- word
	}
	// Signal that we are done submitting input
	close(input)

	// Read the output
	for result := range output {
		fmt.Println(result)
	}
}
Output:

func NewFlow

func NewFlow(ops ...Operator) Flow

NewFlow is syntactic sugar for creating a Flow from a list of Operators.

func (Flow) Run

func (f Flow) Run(in chan interface{}) chan interface{}

Run takes an input channel and runs the operators in the slice in order, feeding the output of each operator to the next. This makes Flow implement the Runner interface.
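
Chaining can be sketched as a loop that feeds each operator's output channel into the next operator. The example below is an assumption about how such a Run could be written, using local stand-in types rather than the package itself; doubledSum is a hypothetical helper.

```go
package main

import "fmt"

// Operator and Flow are local stand-ins that mirror the documented types.
type Operator func(in <-chan interface{}, out chan interface{})

type Flow []Operator

// Run (assumed implementation) starts the operator in a goroutine and
// closes its output channel when the operator returns.
func (o Operator) Run(in <-chan interface{}) chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		o(in, out)
	}()
	return out
}

// Run (assumed implementation) wires the operators together in order: the
// output of each one becomes the input of the next.
func (f Flow) Run(in chan interface{}) chan interface{} {
	out := in
	for _, op := range f {
		out = op.Run(out)
	}
	return out
}

// doubledSum (hypothetical) doubles every number and then sums the results.
func doubledSum(nums ...int) int {
	double := Operator(func(in <-chan interface{}, out chan interface{}) {
		for m := range in {
			out <- m.(int) * 2
		}
	})
	sum := Operator(func(in <-chan interface{}, out chan interface{}) {
		total := 0
		for m := range in {
			total += m.(int)
		}
		out <- total // emitted only after the input channel closes
	})

	in := make(chan interface{})
	out := Flow{double, sum}.Run(in)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	return (<-out).(int)
}

func main() {
	fmt.Println(doubledSum(1, 2, 3, 4)) // prints 20
}
```

Because each stage closes its output when its input closes, closing the first input channel shuts the whole chain down stage by stage in this sketch.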

type Operator

type Operator func(<-chan interface{}, chan interface{})

Operator is a function type that takes one receive-only input channel and one output channel.

func RateLimiter

func RateLimiter(l *rate.Limiter) Operator

RateLimiter returns a new Operator that only lets items through at the rate of the given `rate.Limiter`. Passing the limiter in allows you to share it across multiple instances of this Operator.

Example
package main

import (
	"fmt"
	"time"

	pipeline "github.com/sbogacz/go-pipeline"

	"golang.org/x/time/rate"
)

func main() {
	// create new rate limiter allowing 10 ops/sec
	limiter := rate.NewLimiter(10, 1)

	in := make(chan interface{}, 21)

	// create a RateLimiter operator and run it on input channel
	out := pipeline.RateLimiter(limiter).Run(in)
	startTime := time.Now()
	for i := 0; i < 21; i++ {
		in <- i
	}
	close(in)

	for range out {
	} // drain the output channel
	// 21 items at 10/sec with a burst of 1: the first passes immediately and
	// the remaining 20 take about 2 seconds
	fmt.Printf("After rate limiting, this took %d", int(time.Since(startTime).Seconds()))
}
Output:

After rate limiting, this took 2

func (Operator) Run

func (o Operator) Run(in <-chan interface{}) chan interface{}

Run takes an input channel, applies the Operator to it, and returns the Operator's output channel. This makes Operator implement the Runner interface.

type Runner

type Runner interface {
	Run(<-chan interface{}) chan interface{}
}

The Runner interface exposes a Run method that takes in a chan interface{} and outputs to a chan interface{}.
