pipeline

package module
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: MIT Imports: 5 Imported by: 0

README

Pipeline

Generic vent-sink pipeline

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Drain

func Drain(ctx context.Context, ch <-chan Item) error

func Sequence

func Sequence(ctx context.Context, n int) <-chan Item

Sequence returns a channel fed with the integer sequence [0, n). If n <= 0, the sequence is infinite.

func Unwrap

func Unwrap(item Item) (interface{}, error)

Types

type Item

// Item is the unit of data carried on the channels that connect pipeline
// stages. It pairs a payload with an error.
type Item interface {
	// Obj returns the item's payload.
	// NOTE(review): presumably the obj value passed to Wrap — confirm against the implementation.
	Obj() interface{}
	// Err returns the error carried by the item.
	// NOTE(review): presumably the err value passed to Wrap — confirm against the implementation.
	Err() error
}

func UnwrapChunk added in v0.2.0

func UnwrapChunk(item Item) ([]Item, error)

func Wrap

func Wrap(obj interface{}, err error) Item

type Pipeline

type Pipeline []Stage
Example
// Derive a cancellable context; cancelling it on return signals everything
// that was handed ctx to stop.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Number of items fed into the pipeline (Sequence emits [0, count)).
count := 20

// f1 issues one HTTP GET per incoming item and passes the response on.
// NOTE(review): srv is declared outside this snippet — presumably an
// httptest server; confirm.
f1 := func(ctx context.Context, item Item) (Item, error) {
	x, err := Unwrap(item)
	if err != nil {
		return nil, err
	}
	i := x.(int)
	// Bind the request to ctx so that cancelling the pipeline also aborts
	// an in-flight request instead of letting it run to completion.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL+"/get", nil)
	if err != nil {
		return nil, err
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		// A transport failure is attached to the item itself rather than
		// returned as the transform's error.
		return Wrap(nil, err), nil
	}
	// Drain and close the body so the underlying connection can be reused;
	// later code only reads response metadata, never the body.
	_, _ = io.Copy(io.Discard, res.Body)
	_ = res.Body.Close()
	log.Printf("request %d completed\n", i)
	return Wrap(res, nil), nil
}

// f2 logs the status of each response and forwards the item unchanged.
f2 := func(ctx context.Context, item Item) (Item, error) {
	x, err := Unwrap(item)
	if err != nil {
		return nil, err
	}
	res := x.(*http.Response)
	// NOTE(review): f1 requests srv.URL+"/get" without a "page" query
	// parameter, so this most likely logs an empty page value — confirm
	// whether f1 was meant to send ?page=<i>.
	log.Printf("response %s: %d\n", res.Request.URL.Query().Get("page"), res.StatusCode)
	return item, nil
}

// Two stages: f1 with fanout 3, then f2 with fanout 1 and an output buffer
// of count items.
p := Pipeline{
	Transform(f1, 3),
	TransformBuffered(f2, 1, count),
}

results := p.Run(ctx, Sequence(ctx, count))

// Consume the pipeline's output; the example treats any error as fatal.
if err := Drain(ctx, results); err != nil {
	panic(err)
}
Output:

func (Pipeline) Run

func (p Pipeline) Run(ctx context.Context, inch <-chan Item) (outch <-chan Item)

type Stage

type Stage func(ctx context.Context, inch <-chan Item) (outch <-chan Item)

Stage is a generic pipeline stage that operates at the channel level: it consumes items from an input channel and returns an output channel.

func Chunk added in v0.2.0

func Chunk(size int) Stage

Chunk returns a new pipeline stage which groups incoming items into sized chunks.

func ChunkWait added in v0.2.0

func ChunkWait(size int, d time.Duration) Stage

ChunkWait returns a new stage which groups incoming items into sized chunks. Non-empty partial chunks will be returned after the timeout.

func Flatten added in v0.2.0

func Flatten() Stage

Flatten returns a new stage which splits incoming item chunks into individual items.

func Transform added in v0.2.0

func Transform(f TransformFunc, fanout int) Stage

Transform returns a new stage that executes a concurrent 1:1 transformation.

func TransformBuffered added in v0.2.0

func TransformBuffered(f TransformFunc, fanout, buffer int) Stage

TransformBuffered returns a new stage that executes a concurrent 1:1 transformation with output buffering.

type TransformFunc added in v0.2.0

type TransformFunc func(ctx context.Context, item Item) (Item, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL