pipe

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2023 License: MIT Imports: 9 Imported by: 2

Documentation

Overview

Package defines Pipe which is a type of a task. A Pipe works between two tasks, and usually applies any transformations to elements.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchOp added in v0.3.0

type BatchOp[S any] struct {
	// contains filtered or unexported fields
}

A Pipe operator that makes a fixed size of batches.

func Batch added in v0.3.0

func Batch[S any](size int) *BatchOp[S]

Create a Batch operator.

func (*BatchOp) AsPipe added in v0.3.0

func (op *BatchOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*BatchOp) AsTask added in v0.3.0

func (op *BatchOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

type FanoutAggregateFn

type FanoutAggregateFn[I, T any] func(context.Context, []I, chan<- T) error

A function to aggregate results from downstream tasks, and send outputs to the passed output channel. It is ensured that all of the passed elements are created from the same input, and the order of results is the same as the order of registered tasks.

type FanoutMapFn

type FanoutMapFn[I, T any] func(context.Context, []I) (T, error)

A function to aggregate results from downstream tasks, and return an output. This is a variation of FanoutAggregateFn that emits only one output.

type FanoutOp

type FanoutOp[S, I, T any] struct {
	// contains filtered or unexported fields
}

A Pipe task that has multiple downstream tasks, and aggregates those results. Each input to this operator is sent to all its downstreams and processed by them, and those results will be passed to this operator's aggregate function. This operator emits elements that the aggregate function returns.

func Fanout

func Fanout[S, I, T any](aggFn FanoutAggregateFn[I, T]) *FanoutOp[S, I, T]

Create a fanout operator from an aggregate function.

func FanoutWithMap

func FanoutWithMap[S, I, T any](mapFn FanoutMapFn[I, T]) *FanoutOp[S, I, T]

Create a fanout operator from a map function.

func (*FanoutOp[S, I, T]) Add

func (op *FanoutOp[S, I, T]) Add(t task.Task[S, I], inBuffer, outBuffer int)

Register a task as a downstream of the fanout operator.

func (*FanoutOp) AsPipe added in v0.3.0

func (op *FanoutOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*FanoutOp) AsTask

func (op *FanoutOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

type FlattenSliceOp added in v0.4.0

type FlattenSliceOp[S any] struct {
	// contains filtered or unexported fields
}

A Pipe operator that receives slices and emits those elements one by one.

func FlattenSlice added in v0.4.0

func FlattenSlice[S any]() *FlattenSliceOp[S]

Create a FlattenSlice operator.

func (*FlattenSliceOp) AsPipe added in v0.4.0

func (op *FlattenSliceOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*FlattenSliceOp) AsTask added in v0.4.0

func (op *FlattenSliceOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

type MapFn

type MapFn[S, T any] func(context.Context, S) (T, error)

A function that defines the behavior of a map operator.

type MapOp

type MapOp[S, T any] struct {
	// contains filtered or unexported fields
}

A Pipe task that processes an element and emits a corresponding output.

func Map

func Map[S, T any](fn MapFn[S, T]) *MapOp[S, T]

Create a map operator from a MapFn.

func MapWithCache

func MapWithCache[S, T, K, V any](fn MapFn[S, T], sp cache.Spec[S, T, K, V]) *MapOp[S, T]

Create a map operator with cache. The caching behavior is defined by the provided cache.Spec.

func (*MapOp) AsPipe added in v0.3.0

func (op *MapOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*MapOp) AsTask

func (op *MapOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

func (*MapOp) Concurrent

func (op *MapOp) Concurrent(concurrency int, opts ...task.Option) Pipe[S, T]

Create a concurrent Pipe from multiple operators that have the same behavior.

Each input is processed whenever it is possible. For this reason, the concurrent Pipe doesn't preserve the order.

func (*MapOp[S, T]) ConcurrentPreservingOrder added in v0.7.0

func (op *MapOp[S, T]) ConcurrentPreservingOrder(concurrency int, opts ...task.Option) Pipe[S, T]

Create a concurrent Pipe to apply the map operator.

Unlike a Pipe created with Concurrent, a concurrent Pipe created with this ConcurrentPreservingOrder, preserves the order of elements.

type Pipe

type Pipe[S, T any] task.Task[S, T]

A task that is used as an intermediate process of a data pipeline.

A Pipe usually receives elements from an upstream task via an input channel, process them, and feeds them to a downstream task.

func Concurrent

func Concurrent[S, T any](ps []Pipe[S, T], opts ...task.Option) Pipe[S, T]

Create a Pipe from multiple Pipes. The passed Pipes will run concurrently, and those outputs will be merged as outputs of the created Pipe.

Each input is processed whenever it is possible. For this reason, the concurrent Pipe doesn't preserve the order.

func ConcurrentFromFn

func ConcurrentFromFn[S, T any](fn PipeFn[S, T], concurrency int, opts ...task.Option) Pipe[S, T]

Create a Pipe to run the provided PipeFn concurrently. This is a shorthand to create a concurrent Pipe from Pipes with the same function.

func FromFn

func FromFn[S any, T any](fn PipeFn[S, T], opts ...task.Option) Pipe[S, T]

Build a Pipe with a PipeFn.

type PipeFn

type PipeFn[S, T any] func(ctx context.Context, in <-chan S, out chan<- T) error

A function that defines a Pipe's behavior. This function should receive elements from the passed input channel, process them, and pass the results to the passed output channel. Please note that this function should not close the passed channels because pipe.FromFn automatically closes the output channel and closing the input channel is the upstream task's responsibility. The whole pipeline will be aborted when the returned error is not nil.

type SelectOp added in v0.4.0

type SelectOp[S any] struct {
	// contains filtered or unexported fields
}

A Pipe operator that emits only elements that the passed predicate function returns true.

func Select added in v0.4.0

func Select[S any](predicate func(S) bool) *SelectOp[S]

Create a Select operator.

func (*SelectOp) AsPipe added in v0.4.0

func (op *SelectOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*SelectOp) AsTask added in v0.4.0

func (op *SelectOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

type TakeOp added in v0.4.0

type TakeOp[S any] struct {
	// contains filtered or unexported fields
}

A Pipe operator that emits only N elements from its upstream task.

func Take added in v0.4.0

func Take[S any](n int) *TakeOp[S]

Create a Take operator.

func (*TakeOp) AsPipe added in v0.4.0

func (op *TakeOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*TakeOp) AsTask added in v0.4.0

func (op *TakeOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

type TapFn

type TapFn[S any] func(context.Context, S) error

A function that defines the behavior of a tap operator.

type TapOp

type TapOp[S any] struct {
	MapOp[S, S]
	// contains filtered or unexported fields
}

A Pipe task that receives an element and emits the same element without any processing. This can be used to make a side effect with an input element, for example, logging elements for debug.

func Tap

func Tap[S any](fn TapFn[S]) *TapOp[S]

Create a tap operator from a TapFn.

func (*TapOp) AsPipe added in v0.3.0

func (op *TapOp) AsPipe(opts ...task.Option) Pipe[S, T]

Convert the operator into a Pipe task.

func (*TapOp) AsTask

func (op *TapOp) AsTask(opts ...task.Option) task.Task[S, T]

Convert the operator into a task.

func (*TapOp) Concurrent added in v0.4.0

func (op *TapOp) Concurrent(concurrency int, opts ...task.Option) Pipe[S, T]

Create a concurrent Pipe from multiple operators that have the same behavior.

Each input is processed whenever it is possible. For this reason, the concurrent Pipe doesn't preserve the order.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL