Documentation ¶
Types ¶
type Item ¶
type Item interface {
	Obj() interface{}
	Err() error
}
func UnwrapChunk ¶ added in v0.2.0
type Pipeline ¶
type Pipeline []Stage
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

count := 20

f1 := func(ctx context.Context, item Item) (Item, error) {
	x, err := Unwrap(item)
	if err != nil {
		return nil, err
	}
	i := x.(int)

	req, err := http.NewRequest(http.MethodGet, srv.URL+"/get", nil)
	if err != nil {
		return nil, err
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return Wrap(nil, err), nil
	}
	_, _ = io.Copy(io.Discard, res.Body)
	_ = res.Body.Close()

	log.Printf("request %d completed\n", i)
	return Wrap(res, nil), nil
}

f2 := func(ctx context.Context, item Item) (Item, error) {
	x, err := Unwrap(item)
	if err != nil {
		return nil, err
	}
	res := x.(*http.Response)
	log.Printf("response %s: %d\n", res.Request.URL.Query().Get("page"), res.StatusCode)
	return item, nil
}

p := append(Pipeline{}, Transform(f1, 3), TransformBuffered(f2, 1, count))
results := p.Run(ctx, Sequence(ctx, count))

err := Drain(ctx, results)
if err != nil {
	panic(err)
}
type Stage ¶
Stage is a generic pipeline stage that operates at the channel level.
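To make "channel level" concrete, the sketch below models a stage as a function from an input channel to an output channel, and a pipeline as a slice of such stages run in order. The `stage` and `pipeline` types, `mapStage`, `sequence`, and `runDemo` are all hypothetical and use plain ints; the package's actual Stage signature is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// stage is a hypothetical channel-level stage over ints (an assumption).
type stage func(ctx context.Context, in <-chan int) <-chan int

// pipeline chains stages, feeding each stage's output into the next.
type pipeline []stage

func (p pipeline) run(ctx context.Context, in <-chan int) <-chan int {
	for _, s := range p {
		in = s(ctx, in)
	}
	return in
}

// mapStage builds a stage that applies f to every item it receives.
func mapStage(f func(int) int) stage {
	return func(ctx context.Context, in <-chan int) <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			for v := range in {
				select {
				case out <- f(v):
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// sequence emits 0..n-1 and then closes its channel.
func sequence(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// runDemo pushes 0..n-1 through "add one" and then "double".
func runDemo(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p := pipeline{
		mapStage(func(v int) int { return v + 1 }),
		mapStage(func(v int) int { return v * 2 }),
	}
	var got []int
	for v := range p.run(ctx, sequence(ctx, n)) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runDemo(4)) // [2 4 6 8]
}
```

Because each stage only sees channels, stages can be combined freely regardless of what they do internally.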
func Chunk ¶ added in v0.2.0
Chunk returns a new pipeline stage which groups incoming items into chunks of a given size.
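The grouping behaviour can be sketched with plain channels. The helper below, `chunkInts`, is hypothetical and works on ints rather than the package's items. It also assumes that a trailing partial chunk is emitted when the input closes, which the documentation above does not state.

```go
package main

import (
	"context"
	"fmt"
)

// chunkInts is a hypothetical helper, not the package's Chunk.
// It groups incoming ints into slices of at most size elements.
func chunkInts(ctx context.Context, in <-chan int, size int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		buf := make([]int, 0, size)
		// flush sends the buffered chunk, if any; it reports false on cancellation.
		flush := func() bool {
			if len(buf) == 0 {
				return true
			}
			select {
			case out <- buf:
				buf = make([]int, 0, size)
				return true
			case <-ctx.Done():
				return false
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // assumption: emit the trailing partial chunk
					return
				}
				buf = append(buf, v)
				if len(buf) == size && !flush() {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// collect feeds values through chunkInts and gathers the chunks.
func collect(size int, values ...int) [][]int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var chunks [][]int
	for c := range chunkInts(ctx, in, size) {
		chunks = append(chunks, c)
	}
	return chunks
}

func main() {
	fmt.Println(collect(3, 1, 2, 3, 4, 5, 6, 7)) // [[1 2 3] [4 5 6] [7]]
}
```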
func ChunkWait ¶ added in v0.2.0
ChunkWait returns a new stage which groups incoming items into chunks of a given size. A non-empty partial chunk is emitted once the timeout elapses.
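One way to combine a size limit with a timeout is a `select` over the input and a timer channel that is only armed while a chunk is pending. The sketch below is an illustration under assumptions: `chunkWait` and `demo` are hypothetical names, the helper works on ints, it starts the timer when the first item of a chunk arrives, and it omits context handling for brevity. The package's ChunkWait may differ on all of these points.

```go
package main

import (
	"fmt"
	"time"
)

// chunkWait is a hypothetical helper, not the package's ChunkWait.
// A chunk is emitted when it reaches size items or when wait has
// elapsed since its first item arrived, whichever happens first.
func chunkWait(in <-chan int, size int, wait time.Duration) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		var buf []int
		// timeout is nil while buf is empty, so its select case never fires.
		var timeout <-chan time.Time
		flush := func() {
			if len(buf) > 0 {
				out <- buf
				buf = nil
			}
			timeout = nil
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush()
					return
				}
				if len(buf) == 0 {
					timeout = time.After(wait) // arm the timer for this chunk
				}
				buf = append(buf, v)
				if len(buf) >= size {
					flush()
				}
			case <-timeout:
				flush()
			}
		}
	}()
	return out
}

// demo sends three items with size 2 and leaves the input open, so the
// third item can only be emitted by the timeout.
func demo() (first, second []int, open bool) {
	in := make(chan int)
	out := chunkWait(in, 2, 20*time.Millisecond)
	go func() {
		in <- 1
		in <- 2
		in <- 3
	}()
	first = <-out  // full chunk
	second = <-out // partial chunk released by the timeout
	close(in)
	_, open = <-out // out is closed once the input is closed
	return first, second, open
}

func main() {
	fmt.Println(demo()) // [1 2] [3] false
}
```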
func Flatten ¶ added in v0.2.0
func Flatten() Stage
Flatten returns a new stage which splits incoming item chunks into individual items.
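Flattening is the inverse of chunking: each incoming slice is re-emitted element by element. A minimal sketch over ints follows; `flattenInts` and `flattenAll` are hypothetical helpers, not the package's Flatten.

```go
package main

import "fmt"

// flattenInts is a hypothetical helper, not the package's Flatten.
// It forwards every element of every incoming chunk, preserving order.
func flattenInts(in <-chan []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for chunk := range in {
			for _, v := range chunk {
				out <- v
			}
		}
	}()
	return out
}

// flattenAll feeds the given chunks through flattenInts and collects the items.
func flattenAll(chunks ...[]int) []int {
	in := make(chan []int)
	go func() {
		defer close(in)
		for _, c := range chunks {
			in <- c
		}
	}()
	var got []int
	for v := range flattenInts(in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(flattenAll([]int{1, 2}, []int{3}, []int{4, 5})) // [1 2 3 4 5]
}
```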
func Transform ¶ added in v0.2.0
func Transform(f TransformFunc, fanout int) Stage
Transform returns a new stage executing concurrent 1:1 transformation.
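A concurrent 1:1 transformation is commonly built as a fixed number of workers reading from one shared input channel and writing to one shared output channel. The sketch below assumes that `fanout` means the worker count and that output order is not preserved; neither is confirmed by the documentation above. `transformInts` and `squares` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// transformInts is a hypothetical helper, not the package's Transform.
// It starts fanout workers that each apply f to items read from in.
func transformInts(ctx context.Context, in <-chan int, f func(int) int, fanout int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(fanout)
	for w := 0; w < fanout; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				select {
				case out <- f(v):
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Close out only after every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squares runs 1..n through transformInts and returns the sorted results,
// since workers may finish in any order.
func squares(n, fanout int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	var got []int
	for v := range transformInts(ctx, in, func(v int) int { return v * v }, fanout) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(squares(5, 3)) // [1 4 9 16 25]
}
```

In a sketch like this, an output-buffered variant would only need `make(chan int, buffer)` in place of the unbuffered channel, so that workers can run ahead of a slow consumer.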
func TransformBuffered ¶ added in v0.2.0
func TransformBuffered(f TransformFunc, fanout, buffer int) Stage
TransformBuffered returns a new stage executing concurrent 1:1 transformation with output buffering.