channels

Version: v0.0.0-...-eaa1fe6
Published: May 12, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

README

Channels

Yet another Go channels library.

DISCLAIMER: This is not an officially supported Google product.

No, really, this is not.

This is code that happens to be owned by Google for legal reasons.

But what is this, really?

This is my take on channel operator functions.

I tried to re-implement RxJS in Go, but instead of using observables and propagating changes immediately, I wrote the operators to all work in parallel.

This works basically like a bash pipeline, with channels as the pipes and funcs as programs.

Note that every operator creates at least one goroutine, so it might not be the most efficient solution out there, but I wanted to experiment with the concept.
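The pipeline idea can be sketched with the standard library alone. The snippet below does not use this package: `operator`, `mapOp`, `filterOp`, `source` and `runPipeline` are hypothetical local stand-ins that only mirror the shape of the documented `Operator` type, with one goroutine per stage.

```go
package main

import "fmt"

// operator mirrors the shape of the documented Operator type:
// it takes a receive-only channel and returns another one.
type operator[I, O any] func(<-chan I) <-chan O

// mapOp is a local stand-in for a Map-like stage.
func mapOp[I, O any](project func(I) O) operator[I, O] {
	return func(in <-chan I) <-chan O {
		out := make(chan O)
		go func() { // one goroutine per stage, like one process per pipe segment
			defer close(out)
			for v := range in {
				out <- project(v)
			}
		}()
		return out
	}
}

// filterOp is a local stand-in for a Filter-like stage.
func filterOp[T any](keep func(T) bool) operator[T, T] {
	return func(in <-chan T) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for v := range in {
				if keep(v) {
					out <- v
				}
			}
		}()
		return out
	}
}

// source emits the given values and then closes its channel.
func source[T any](vals ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// runPipeline wires source | filter(even) | map(square) and collects the output.
func runPipeline() []int {
	even := filterOp(func(n int) bool { return n%2 == 0 })
	square := mapOp(func(n int) int { return n * n })
	var got []int
	for v := range square(even(source(1, 2, 3, 4, 5, 6))) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runPipeline()) // [4 16 36]
}
```

Each stage blocks on its unbuffered output until the next stage reads, so back-pressure propagates upstream much like a full pipe does in a shell pipeline.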

We have slices and maps packages in the stdlib, so it seemed appropriate to try and make a channels one.

Documentation

Overview

Package channels implements some channels functions and utilities.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AbortOnErr

func AbortOnErr(errs <-chan error, cancelParent func()) error

AbortOnErr returns the first non-nil error encountered (cancelling the parent) and discards the others. If no error is encountered, it returns nil.
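A rough stdlib-only sketch of the behaviour described above, not the package's code. `firstErr` and `demoFirstErr` are hypothetical names, and the sketch assumes the function keeps reading until `errs` is closed so that producers never block on a send.

```go
package main

import (
	"errors"
	"fmt"
)

// firstErr has the same shape as AbortOnErr: it remembers the first
// non-nil error, cancels the parent once, and discards everything else.
func firstErr(errs <-chan error, cancelParent func()) error {
	var first error
	for err := range errs {
		if err != nil && first == nil {
			first = err
			if cancelParent != nil {
				cancelParent()
			}
		}
		// Later errors are read and dropped so senders can finish.
	}
	return first
}

// demoFirstErr feeds a nil and two real errors through firstErr.
func demoFirstErr() (cancelled bool, err error) {
	errs := make(chan error)
	go func() {
		defer close(errs)
		errs <- nil
		errs <- errors.New("boom")
		errs <- errors.New("ignored")
	}()
	err = firstErr(errs, func() { cancelled = true })
	return cancelled, err
}

func main() {
	cancelled, err := demoFirstErr()
	fmt.Println(cancelled, err) // true boom
}
```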

func CombineFactories

func CombineFactories[I, T, O any](fa func() Operator[I, T], fb func() Operator[T, O]) func() Operator[I, O]

CombineFactories is like Combine, but it works on operator constructors instead of operators.

func EndScope

func EndScope(errs ErrorPipe, err Error) error

func ErrorScope

func ErrorScope(cancel func()) (ErrorPipe, Error)

ErrorScope is a helper to simplify error handling.

func ToFirst

func ToFirst[T any](in <-chan T, predicate func(T) bool, cancelParent func()) (t T, ok bool)

ToFirst returns the first value emitted and then discards the rest of the input.

func ToFirstErr

func ToFirstErr(errs chan error, cancelParent func()) <-chan error

ToFirstErr emits the first non-nil error.

func ToSlice

func ToSlice[T any](in <-chan T) []T

ToSlice collects all inputs in a slice.

func ToSliceParallel

func ToSliceParallel[T any](in <-chan T) <-chan []T

ToSliceParallel is like ToSlice, but emits the collected slice when the input is closed.

func ToSlices

func ToSlices[I, L any](i <-chan I, l <-chan L) ([]I, []L)

ToSlices collects two channels into two slices, in parallel.

Types

type Clock

type Clock interface {
	After(time.Duration) <-chan time.Time
}

Clock is an interface to abstract over the time package.
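One reason to abstract over the time package is deterministic testing. The sketch below redeclares the interface locally and pairs it with a hypothetical `fakeClock` whose `After` channel fires only when the test says so. `waitOrTimeout` and `demoClock` are illustrative helpers and are not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is redeclared locally with the documented method set.
type Clock interface {
	After(time.Duration) <-chan time.Time
}

// fakeClock ignores the requested duration: its channel only
// receives a value when fire is called.
type fakeClock struct{ c chan time.Time }

func newFakeClock() *fakeClock { return &fakeClock{c: make(chan time.Time, 1)} }

func (f *fakeClock) After(time.Duration) <-chan time.Time { return f.c }

func (f *fakeClock) fire() { f.c <- time.Unix(0, 0) }

// waitOrTimeout reports whether in produced a value before the clock fired.
func waitOrTimeout(c Clock, in <-chan int, d time.Duration) (int, bool) {
	select {
	case v := <-in:
		return v, true
	case <-c.After(d):
		return 0, false
	}
}

// demoClock exercises both branches without sleeping.
func demoClock() (timedOut, gotValue bool) {
	fc := newFakeClock()
	fc.fire() // pretend an hour has passed
	_, ok := waitOrTimeout(fc, make(chan int), time.Hour)
	timedOut = !ok

	ready := make(chan int, 1)
	ready <- 7
	v, ok := waitOrTimeout(newFakeClock(), ready, time.Hour)
	gotValue = ok && v == 7
	return timedOut, gotValue
}

func main() {
	fmt.Println(demoClock()) // true true
}
```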

type Error

type Error = <-chan error

type ErrorPipe

type ErrorPipe = chan Error

type ErrorSink

type ErrorSink = chan<- Error

type ErrorSource

type ErrorSource = <-chan Error

type FanInOperator

type FanInOperator[I, O any] func(...<-chan I) <-chan O

func Race

func Race[T any](cancels ...func()) FanInOperator[T, T]

Race races the inputs and becomes a clone of the first one that emits, cancelling all the others.

type FanOutOperator

type FanOutOperator[I, O any] func(<-chan I) []<-chan O

type Operator

type Operator[I, O any] func(<-chan I) <-chan O

func At

func At[T any](index int, cancelParent func()) Operator[T, T]

At emits the item at the provided index (if any) and concludes.

func AtWithDefault

func AtWithDefault[T any](index int, deflt T, cancelParent func()) Operator[T, T]

AtWithDefault is like At, but if the input concludes too early, a default value is emitted.

func Audit

func Audit[T, D any](emit <-chan D) Operator[T, T]

Audit can be used to observe the last emitted value for the input emitter. When emit emits, the most recent value for the input will be forwarded to the output.

Note that this might cause repeated items to be emitted, for example if emit fires twice between input emissions. If this behavior is not desired, please use Sample instead.

func Buffer

func Buffer[T, D any](emit <-chan D) Operator[T, []T]

Buffer buffers elements from the input until the emitter emits, emitting a slice of the values seen between two emissions (or the start and the first emission).

Emitted slices preserve the order of receiving.

If the emitter emits when the buffer is empty, nothing is emitted.

If the emitter or the input is closed when there are still elements in the buffer, those elements are emitted immediately and the output is closed.

func BufferChan

func BufferChan[T any](count int) Operator[T, T]

BufferChan returns a buffered channel that copies the input emissions. If a non-positive count is passed, an arbitrary value is used.

func BufferCount

func BufferCount[T any](count int) Operator[T, []T]

BufferCount collects elements from the input and emits them in batches of length equal to count. If there are still elements in the buffer when the input is closed, one last, shorter, batch is emitted. Any count value smaller than 1 is treated as 1.
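The batching rule can be illustrated with a small stdlib-only sketch. `bufferCount` and `demoBufferCount` are hypothetical local names that follow the description above. This is not the package's implementation.

```go
package main

import "fmt"

// bufferCount groups input values into slices of length count and
// flushes a shorter final batch when the input closes.
func bufferCount[T any](count int) func(<-chan T) <-chan []T {
	if count < 1 {
		count = 1 // values below 1 are treated as 1, as documented
	}
	return func(in <-chan T) <-chan []T {
		out := make(chan []T)
		go func() {
			defer close(out)
			batch := make([]T, 0, count)
			for v := range in {
				batch = append(batch, v)
				if len(batch) == count {
					out <- batch
					batch = make([]T, 0, count) // fresh slice: the old one now belongs to the reader
				}
			}
			if len(batch) > 0 {
				out <- batch
			}
		}()
		return out
	}
}

// demoBufferCount batches the values 1..5 in groups of two.
func demoBufferCount() [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var got [][]int
	for b := range bufferCount[int](2)(in) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(demoBufferCount()) // [[1 2] [3 4] [5]]
}
```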

func BufferTime

func BufferTime[T any](tickerFactory func(time.Duration) Ticker, duration time.Duration) Operator[T, []T]

BufferTime is like Buffer, but it uses a ticker to decide when to emit. If tickerFactory is nil, the real time is used.

func BufferTimeOrCount

func BufferTimeOrCount[T any](tickerFactory func(time.Duration) Ticker, duration time.Duration, maxCount int) Operator[T, []T]

BufferTimeOrCount buffers input for at most maxCount elements and at most duration time. If tickerFactory is nil, the real time is used.

func BufferToggle

func BufferToggle[T, I1, I2 any](openings <-chan I1, closings <-chan I2) Operator[T, []T]

BufferToggle buffers elements between an emission of openings and an emission of closings. Two consecutive emissions of openings or closings are ignored. If the buffer is open when any of the inputs ends, the leftovers are emitted without an emission of closings. Elements between an emission of closings and an emission of openings are ignored.

func CollectErrs

func CollectErrs(cancel func()) Operator[Error, error]

CollectErrs processes all the given errors and returns a collection of them, or nil. If a cancel func is passed it will be called on the first non-nil error.

func Combine

func Combine[I, T, O any](a Operator[I, T], b Operator[T, O]) Operator[I, O]

Combine returns an operator that is the combination of the two input ones. When using Combine keep in mind that every operator usually spawns at least one goroutine, so very long combination chains might end up being expensive to run.

Also consider that re-using operators might cause unexpected behaviors as some operators preserve state, so for these cases consider CombineFactories or re-calling Combine with freshly built operators every time the combined result is needed.
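Since an operator is a function from channel to channel, combining two of them amounts to function composition. A minimal sketch under that assumption follows. `operator`, `combine`, `mapOp` and `demoCombine` are hypothetical local names rather than this package's code.

```go
package main

import (
	"fmt"
	"strconv"
)

// operator mirrors the documented Operator shape.
type operator[I, O any] func(<-chan I) <-chan O

// combine feeds the output of a into b.
func combine[I, T, O any](a operator[I, T], b operator[T, O]) operator[I, O] {
	return func(in <-chan I) <-chan O { return b(a(in)) }
}

// mapOp is a stateless helper stage; each call to the returned operator
// spawns one goroutine.
func mapOp[I, O any](project func(I) O) operator[I, O] {
	return func(in <-chan I) <-chan O {
		out := make(chan O)
		go func() {
			defer close(out)
			for v := range in {
				out <- project(v)
			}
		}()
		return out
	}
}

// demoCombine doubles each number and then formats it as a string.
func demoCombine() []string {
	double := mapOp(func(n int) int { return n * 2 })
	format := mapOp(strconv.Itoa)
	pipeline := combine(double, format) // two stages, so two goroutines per run

	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()
	var got []string
	for s := range pipeline(in) {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(demoCombine()) // [2 4 6]
}
```

The stages here keep no state between runs, so `pipeline` could be reused safely. An operator that captured state in its closure would share that state across runs, which is the situation the note on freshly built operators warns about.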

func Concat

func Concat[T any]() Operator[<-chan T, T]

Concat emits all values from all inner inputs, one after the other, exhausting the previous ones before consuming the next.

func ConcatMap

func ConcatMap[I, O any](project func(in I) <-chan O) Operator[I, O]

ConcatMap calls Map(project) and then concats the resulting emitters to a single one.

func Count

func Count[D any]() Operator[D, int]

Count emits the item count of the input.

func DefaultIfEmpty

func DefaultIfEmpty[T any](deflt T) Operator[T, T]

DefaultIfEmpty emits the default value if and only if the input emitter did not emit before being closed.

func Distinct

func Distinct[T comparable]() Operator[T, T]

Distinct forwards the input to the output for values that have not been seen yet. Note: this needs to keep a set of all seen values in memory, so it might use a lot of memory.

func DistinctKey

func DistinctKey[T any, K comparable](keyer func(T any) K) Operator[T, T]

DistinctKey is like Distinct but it accepts a keyer function to compare elements.

func DistinctUntilChanged

func DistinctUntilChanged[T comparable]() Operator[T, T]

DistinctUntilChanged is like DistinctUntilChangedFunc but it works on comparable items.

func DistinctUntilChangedEqualer

func DistinctUntilChangedEqualer[T interface{ Equal(T) bool }]() Operator[T, T]

DistinctUntilChangedEqualer is like DistinctUntilChangedFunc, but it compares items that have an Equal(T) bool method on them.

func DistinctUntilChangedFunc

func DistinctUntilChangedFunc[T any](equals func(T, T) bool) Operator[T, T]

DistinctUntilChangedFunc forwards the input to the output, with the exception of identical consecutive values, which are discarded. In other words this behaves like slices.Compact, but for channels.
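To make the slices.Compact analogy concrete, here is a stdlib-only sketch of the same rule applied to a channel. `distinctUntilChangedFunc` and `demoDistinct` are hypothetical local names. The package's own implementation may differ.

```go
package main

import "fmt"

// distinctUntilChangedFunc forwards a value only when it differs from the
// value received immediately before it, according to equals.
func distinctUntilChangedFunc[T any](equals func(T, T) bool) func(<-chan T) <-chan T {
	return func(in <-chan T) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			var prev T
			first := true
			for v := range in {
				if first || !equals(prev, v) {
					out <- v
				}
				prev, first = v, false
			}
		}()
		return out
	}
}

// demoDistinct compacts runs of equal integers.
func demoDistinct() []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range []int{1, 1, 2, 2, 2, 3, 1} {
			in <- v
		}
	}()
	same := func(a, b int) bool { return a == b }
	var got []int
	for v := range distinctUntilChangedFunc(same)(in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoDistinct()) // [1 2 3 1]
}
```

The trailing 1 is emitted again because only consecutive duplicates are dropped. Dropping it as well would require the memory-hungry behaviour described for Distinct.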

func DistinctUntilKeyChanged

func DistinctUntilKeyChanged[T any, K comparable](key func(T) K) Operator[T, T]

DistinctUntilKeyChanged is like DistinctUntilChangedFunc but uses a key function to match consecutive values.

func Every

func Every[T any](predicate func(T) bool, cancelParent func()) Operator[T, bool]

Every returns whether all values emitted by the input match the predicate. If no value is emitted, it returns true. Every cancels and discards the parent as soon as the first non-matching value is encountered.

func ExhaustMap

func ExhaustMap[I, O any](project func(in I) <-chan O) Operator[I, O]

ExhaustMap projects inputs to inner emitters and forwards all inner emissions to the output.

If the input emits while an inner emitter is being consumed, the value is discarded. In summary: this operator prioritizes inputs from the projected emitters over the input, making the input lossy.

This is like SwitchMap, but the inner emitters are prioritized.

func Filter

func Filter[T any](predicate func(T) bool) Operator[T, T]

Filter conditionally forwards inputs to the output. Values that the predicate returns true for are forwarded.

func FilterCancel

func FilterCancel[T any](predicate func(T) (emit, last bool), cancelParent func()) Operator[T, T]

FilterCancel is like Filter, but it can report when the last emission happens, which will cause the output channel to be closed and the input to be drained.

func FindIndex

func FindIndex[T any](predicate func(T) bool, cancelParent func()) Operator[T, int]

FindIndex emits the index of the first input that matches the predicate, and then it ends.

func First

func First[T any](predicate func(T) bool, cancelParent func()) Operator[T, T]

First emits the first item that the predicate returns true for and exits. If predicate is nil, the first element from the input is forwarded to the output.

func IgnoreElements

func IgnoreElements[D any]() Operator[D, D]

IgnoreElements never emits, and it concludes when the input ends.

func IsEmpty

func IsEmpty[D any](cancelParent func()) Operator[D, bool]

IsEmpty emits exactly once to report whether the input is empty or not.

func Last

func Last[T any](predicate func(T) bool) Operator[T, T]

Last only emits the last element from the input that the predicate returned true for (if any) when the input is closed. If the predicate is nil, the last element is emitted.

func Map

func Map[I, O any](project func(in I) O) Operator[I, O]

Map projects inputs to outputs.

func MapCancel

func MapCancel[I, O any](project func(in I) (projected O, ok bool), cancelParent func()) Operator[I, O]

MapCancel projects inputs to outputs until the first not-ok value is reached; it then cancels the parent and drains the input. The not-ok value returned is discarded.

func MapFilter

func MapFilter[I, O any](project func(in I) (projected O, emit bool)) Operator[I, O]

MapFilter conditionally projects inputs to outputs. If the project func returns false as a second return value, that emission is skipped.

func MapFilterCancel

func MapFilterCancel[I, O any](project func(in I) (projected O, emit, ok bool), cancelParent func()) Operator[I, O]

MapFilterCancel conditionally projects inputs to outputs until the first not-ok value is reached; it then cancels the parent and drains the input. It is like a combination of MapFilter and MapCancel.

func MapFilterCancelTeardown

func MapFilterCancelTeardown[I, O any](
	project func(in I) (projected O, emit, ok bool),
	teardown func(last I, emitted bool) (lastItem O, shouldEmitLast bool),
	cancelParent func(),
) Operator[I, O]

MapFilterCancelTeardown conditionally projects inputs to outputs until the last value is reached; it then cancels the parent, drains the input and runs the teardown routine. It is like a combination of MapFilter, MapCancel and Teardown.

func Max

func Max[T constraints.Ordered]() Operator[T, T]

Max emits the max element of the input.

func Merge

func Merge[T any]() Operator[<-chan T, T]

Merge concurrently reads all inner inputs and emits them. Order is not preserved.

func MergeMap

func MergeMap[I, O any](maxParallelism int, project func(in I) <-chan O) Operator[I, O]

MergeMap projects the input emissions to inner emitters that are then merged in the output. Order of the output is not guaranteed to be related to the order of the input.

func Min

func Min[T constraints.Ordered]() Operator[T, T]

Min emits the min element of the input.

func PairWise

func PairWise[T any]() Operator[T, [2]T]

PairWise emits pairs of successive emissions. This means all values are emitted twice, once as the second item of a pair and then as the first item of the next pair (the older value is the one at index 0, as if it were a sliding window of length 2). The only exceptions are the first and last items, which are only emitted once.
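The sliding window of length 2 can be sketched as follows, using only the standard library. `pairWise` and `demoPairWise` are hypothetical local names written from the description above. They are not taken from the package.

```go
package main

import "fmt"

// pairWise emits [previous, current] for every input value after the first.
func pairWise[T any]() func(<-chan T) <-chan [2]T {
	return func(in <-chan T) <-chan [2]T {
		out := make(chan [2]T)
		go func() {
			defer close(out)
			prev, ok := <-in
			if !ok {
				return // empty input: nothing to pair
			}
			for cur := range in {
				out <- [2]T{prev, cur} // the older value sits at index 0
				prev = cur
			}
		}()
		return out
	}
}

// demoPairWise pairs up the values 1..4.
func demoPairWise() [][2]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 4; i++ {
			in <- i
		}
	}()
	var got [][2]int
	for p := range pairWise[int]()(in) {
		got = append(got, p)
	}
	return got
}

func main() {
	fmt.Println(demoPairWise()) // [[1 2] [2 3] [3 4]]
}
```

In the output, 2 and 3 appear twice, while 1 and 4 (the first and last items) appear once.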

func ParallelMap

func ParallelMap[I, O any](maxParallelism int, project func(in I) O) Operator[I, O]

ParallelMap is like Map but runs the project function in parallel. Output order is not guaranteed to be related to input order. If maxParallelism is 0 or negative, the number of CPUs is used.
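A common way to get this behaviour is a fixed pool of workers reading from the same input channel. The sketch below takes that approach as an assumption. `parallelMap` and `demoParallelMap` are hypothetical local names and the package may be implemented differently.

```go
package main

import (
	"fmt"
	"runtime"
	"sort"
	"sync"
)

// parallelMap runs project on up to maxParallelism inputs at once.
func parallelMap[I, O any](maxParallelism int, project func(I) O) func(<-chan I) <-chan O {
	if maxParallelism <= 0 {
		maxParallelism = runtime.NumCPU()
	}
	return func(in <-chan I) <-chan O {
		out := make(chan O)
		var wg sync.WaitGroup
		wg.Add(maxParallelism)
		for i := 0; i < maxParallelism; i++ {
			go func() {
				defer wg.Done()
				for v := range in { // workers compete for inputs
					out <- project(v)
				}
			}()
		}
		go func() {
			wg.Wait() // close the output only after every worker is done
			close(out)
		}()
		return out
	}
}

// demoParallelMap squares 1..5 with three workers and sorts the result,
// because the arrival order is not deterministic.
func demoParallelMap() []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var got []int
	for v := range parallelMap(3, func(n int) int { return n * n })(in) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(demoParallelMap()) // [1 4 9 16 25]
}
```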

func ParallelMapCancel

func ParallelMapCancel[I, O any](maxParallelism int, project func(context.Context, I) (o O, ok bool), cancelParent func()) Operator[I, O]

ParallelMapCancel is like ParallelMap, but allows the project function to abort operations.

func ParallelMapStable

func ParallelMapStable[I, O any](maxParallelism, maxWindow int, project func(in I) O) Operator[I, O]

ParallelMapStable is like ParallelMap, but it guarantees that the output is in the same order as the input. The maxWindow parameter is used to determine how large the sorting window should be. In other words: if the input at index N is still processing when the item at index N+maxWindow is read, ParallelMapStable will wait for item N to be done processing. Using a maxWindow smaller than maxParallelism is not useful, as maxParallelism will never be achieved. If maxParallelism is 0 or negative, the number of CPUs is used instead. If maxWindow is 0 or negative, a default is used.

func Reduce

func Reduce[I, O any](project func(accum O, in I) O, seed O) Operator[I, O]

Reduce is like ReduceAcc, but the accumulator is used as the output value.

func ReduceAcc

func ReduceAcc[I, O, A any](project func(accum A, in I) (newAccum A, out O), seed A) Operator[I, O]

ReduceAcc scans the input calling the project function with the previous value for the accumulator and the current value for the input. Once the input is closed, it emits the last output value, or nothing if the input never emitted.

func ReduceFunc

func ReduceFunc[I, O any](project func(accum O, in I) O, seed func(in I) O) Operator[I, O]

ReduceFunc calls the seed func to compute the first accumulator value, then scans the input successively calling project. Once the input is closed, it emits the last output value, or nothing if the input never emitted.

func ReduceFuncAcc

func ReduceFuncAcc[I, O, A any](project func(accum A, in I) (A, O), seed func(in I) (A, O)) Operator[I, O]

ReduceFuncAcc calls the seed func to compute the first accumulator value, then scans the input successively calling project. Once the input is closed, it emits the last output value, or nothing if the input never emitted.

func Sample

func Sample[T, D any](emit <-chan D) Operator[T, T]

Sample emits the latest value from the input when the provided emitter emits. If no emission from the input has happened between consecutive requests to emit, they are ignored. In other words, no input elements are emitted more than once. If this behavior is not desired, please use Audit instead.

func Scan

func Scan[I, O any](project func(accum O, cur I) O, seed O) Operator[I, O]

Scan is like Map, but the project function is called with the last value emitted and the current value from the input. The first iteration will be called with seed.
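A running sum is the usual illustration of this kind of operator. The stdlib-only sketch below assumes that the seed itself is not emitted and that one output is produced per input. `scan` and `demoScan` are hypothetical local names and not the package's code.

```go
package main

import "fmt"

// scan emits project(accum, cur) for every input, feeding each result
// back in as the next accumulator. The first call receives seed.
func scan[I, O any](project func(accum O, cur I) O, seed O) func(<-chan I) <-chan O {
	return func(in <-chan I) <-chan O {
		out := make(chan O)
		go func() {
			defer close(out)
			acc := seed
			for v := range in {
				acc = project(acc, v)
				out <- acc
			}
		}()
		return out
	}
}

// demoScan computes the running sum of 1..4.
func demoScan() []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 4; i++ {
			in <- i
		}
	}()
	sum := func(acc, cur int) int { return acc + cur }
	var got []int
	for v := range scan(sum, 0)(in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoScan()) // [1 3 6 10]
}
```

A Reduce-style operator would run the same loop but send only the final accumulator once the input closes.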

func ScanAccum

func ScanAccum[I, O, A any](project func(accum A, cur I) (nextAccum A, o O), seed A) Operator[I, O]

ScanAccum is like Scan, but the accumulator is kept separate from the return value, and they can be of different types.

func ScanPreamble

func ScanPreamble[I, O, A any](project func(accum A, in I) (A, O), seedFn func(in I) (A, O)) Operator[I, O]

ScanPreamble is like ScanAccum, but instead of taking a seed value it computes the seed and the first emission by calling seedFn func on the first input value.

func Skip

func Skip[T any](count int) Operator[T, T]

Skip discards the first count emissions from the input, then it becomes a copy of the input.

func SkipLast

func SkipLast[T any](count int) Operator[T, T]

SkipLast skips the last count items from the input. Since it needs to keep a buffer, SkipLast will wait until count items are emitted or the input is closed before emitting the first value, and then it becomes a delayed copy of the input that will not emit the last count items.
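The buffering described above can be sketched with a fixed-size window: a value is released only once count newer values have arrived behind it. `skipLast` and `demoSkipLast` are hypothetical local names, and the handling of non-positive counts is an assumption of this sketch.

```go
package main

import "fmt"

// skipLast holds back the most recent count values, so the final count
// values are never emitted.
func skipLast[T any](count int) func(<-chan T) <-chan T {
	return func(in <-chan T) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			if count <= 0 { // assumption: nothing to skip, plain copy
				for v := range in {
					out <- v
				}
				return
			}
			buf := make([]T, 0, count)
			for v := range in {
				if len(buf) < count {
					buf = append(buf, v) // still filling the window
					continue
				}
				out <- buf[0]      // oldest value is now safe to emit
				copy(buf, buf[1:]) // shift the window left by one
				buf[count-1] = v
			}
			// Whatever remains in buf is the skipped tail.
		}()
		return out
	}
}

// demoSkipLast drops the last two of the values 1..5.
func demoSkipLast() []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var got []int
	for v := range skipLast[int](2)(in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoSkipLast()) // [1 2 3]
}
```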

func StartWith

func StartWith[T any](initial T) Operator[T, T]

StartWith emits the initial value and then it becomes a copy of the input.

func SwitchMap

func SwitchMap[I, O any](ctx context.Context, project func(ctx context.Context, in I) <-chan O) Operator[I, O]

SwitchMap projects the inputs to inner emitters and copies them to the output until the input emits a new value. When a new input value comes in, the last inner emitter's context is cancelled and its values are discarded in favor of the ones emitted by the most recent inner emitter. This is like ExhaustMap but the input is prioritized.

func Take

func Take[T any](count int, cancelParent func()) Operator[T, T]

Take forwards the first count items of the input to the output, then it ends.
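Many operators on this page accept a cancelParent func. The sketch below shows one plausible reading of that pattern, using Take as the example: after the last wanted value, the operator closes its output, asks upstream to stop, and drains what is left so the producer is never stuck on a send. `take`, `counter` and `demoTake` are hypothetical local names, and wiring cancelParent to a context.CancelFunc is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// counter is an endless source that stops when ctx is cancelled.
func counter(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// take forwards at most count values, then cancels and drains the parent.
func take[T any](count int, cancelParent func()) func(<-chan T) <-chan T {
	return func(in <-chan T) <-chan T {
		out := make(chan T)
		go func() {
			for emitted := 0; emitted < count; emitted++ {
				v, ok := <-in
				if !ok {
					break // input ended early
				}
				out <- v
			}
			close(out) // downstream can finish right away
			if cancelParent != nil {
				cancelParent()
			}
			for range in { // drain so the producer can exit
			}
		}()
		return out
	}
}

// demoTake reads three values from an otherwise infinite source.
func demoTake() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var got []int
	for v := range take[int](3, cancel)(counter(ctx)) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoTake()) // [0 1 2]
}
```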

func TakeUntil

func TakeUntil[T, D any](emit <-chan D, cancelParent func()) Operator[T, T]

TakeUntil forwards the input to the output until the first emission (or closure) of emit, then it ends.

func Tap

func Tap[T any](observer func(T)) Operator[T, T]

Tap calls the observer for every value emitted by the input and forwards that value to the output once observer returns.

func Teardown

func Teardown[T any](deferred func(last T, emitted bool) (additional T, emitAdditional bool)) Operator[T, T]

Teardown returns an emitter that is a copy of the input, but it calls deferred at the end of the input with the last emitted value, or with the zero value and emitted set to false. The deferred func can optionally return a value to emit as a last value.

func Timeout

func Timeout[T any](c Clock, maxDuration time.Duration, cancelParent func()) Operator[T, T]

Timeout creates a copy of the input unless the input takes more than maxDuration to emit the first value, in which case the output is closed.

func Window

func Window[T, D any](emit <-chan D) Operator[T, <-chan T]

Window maps the input to a series of emitters that just copy the input values. Every time emit emits, a new inner emitter that copies the input values is emitted.

In other words, this is like Buffer, but instead of emitting slices of values it emits emitters of these values.

Consecutive emissions of emit with no input values in between do not cause empty emitters to be generated.

func WindowCount

func WindowCount[T any](count int) Operator[T, <-chan T]

WindowCount is like Window but it starts a new inner emitter every count emissions received from the input. The last inner emitter might emit fewer than count items.

func WithLatestFrom

func WithLatestFrom[I, L any](other <-chan L, cancelOther func()) Operator[I, Pair[I, L]]

WithLatestFrom starts emitting when both the input and the other emitter have emitted at least once. It then emits every time the input emits, pairing that value with the latest coming from the other one. If the other emitter is closed, its last emitted value will still be used until the input is closed. If the other emitter never emits, the returned emitter will also never emit, and it will end when the input ends.

type Pair

type Pair[A, B any] struct {
	A A
	B B
}

type ParallelOperator

type ParallelOperator[I, O any] func([]<-chan I) []<-chan O

type RealClock

type RealClock struct{}

func NewRealClock

func NewRealClock() RealClock

func (RealClock) After

func (RealClock) After(d time.Duration) <-chan time.Time

type RealTicker

type RealTicker struct {
	// contains filtered or unexported fields
}

func (RealTicker) Chan

func (r RealTicker) Chan() <-chan time.Time

func (RealTicker) Stop

func (r RealTicker) Stop()

type SinkOperator

type SinkOperator[T any] func(<-chan T)

func Collect

func Collect[T any](consume func(T) (ok bool), cancelParent func()) SinkOperator[T]

Collect calls the consume func once for every input item. If the consume function returns false, it cancels the parent, discards the input and exits.

func Discard

func Discard[D any]() SinkOperator[D]

Discard spawns a goroutine that consumes the input and returns immediately. If a nil input is passed, it's ignored.

func ToFileLines

func ToFileLines[I any](path string, errs ErrorSink, cancelParent func()) SinkOperator[I]

ToFileLines truncates or creates the file at the specified path and writes all inputs to it as lines (using fmt.Fprintln).

func Wait

func Wait[D any]() SinkOperator[D]

Wait blocks until the input is closed.

type SourceOperator

type SourceOperator[T any] func() <-chan T

func FromFileLines

func FromFileLines(path string, errs ErrorSink) SourceOperator[string]

FromFileLines is like FromReaderLines but for files.

func FromFunc

func FromFunc[T any](generator func(index int) (t T, ok bool)) SourceOperator[T]

FromFunc uses the provided generator as a source for data, and emits all values until ok becomes false (the element returned when ok is false is not emitted).
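A generator-driven source can be sketched in a few lines. `fromFunc` and `demoFromFunc` are hypothetical local names, and starting the index at 0 is an assumption that the description above does not confirm.

```go
package main

import "fmt"

// fromFunc calls generator with increasing indexes and emits each value
// until generator reports ok == false. That last value is not emitted.
func fromFunc[T any](generator func(index int) (T, bool)) func() <-chan T {
	return func() <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for i := 0; ; i++ { // assumption: indexes start at 0
				v, ok := generator(i)
				if !ok {
					return
				}
				out <- v
			}
		}()
		return out
	}
}

// demoFromFunc emits the squares of the indexes 0..3.
func demoFromFunc() []int {
	squares := fromFunc(func(i int) (int, bool) { return i * i, i < 4 })
	var got []int
	for v := range squares() {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoFromFunc()) // [0 1 4 9]
}
```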

func FromRange

func FromRange(start, end int) SourceOperator[int]

FromRange emits all values between start (included) and end (excluded).

func FromReaderLines

func FromReaderLines(r io.Reader, errs chan<- error) SourceOperator[string]

FromReaderLines emits all of the lines of the reader. If an error is encountered it's emitted on the errs chan, otherwise a nil one is emitted when the read is finished.

func FromSlice

func FromSlice[T any](s []T) SourceOperator[T]

FromSlice emits all values in s and ends.

func FromTicker

func FromTicker(ctx context.Context, tickerFactory func(time.Duration) Ticker, duration time.Duration, max int) SourceOperator[time.Time]

FromTicker emits the current time at most max times, separated by a wait of duration. If the context is cancelled it stops. If tickerFactory is nil, the real time is used.

type SplitOperator

type SplitOperator[I, O, P any] func(<-chan I) (<-chan O, <-chan P)

func Partition

func Partition[T any](condition func(t T) bool) SplitOperator[T, T, T]

Partition emits values that match the condition on the first output (then) and the values that don't on the second output (elze).
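With two unbuffered outputs, the consumer has to read both sides concurrently, otherwise the splitter stalls on whichever side nobody is reading. The sketch below shows that usage. `partition` and `demoPartition` are hypothetical local names that follow the description above. They are not the package's code.

```go
package main

import "fmt"

// partition routes matching values to the first output and the rest to
// the second output.
func partition[T any](condition func(T) bool) func(<-chan T) (<-chan T, <-chan T) {
	return func(in <-chan T) (<-chan T, <-chan T) {
		then, elze := make(chan T), make(chan T)
		go func() {
			defer close(then)
			defer close(elze)
			for v := range in {
				if condition(v) {
					then <- v
				} else {
					elze <- v
				}
			}
		}()
		return then, elze
	}
}

// demoPartition splits 1..6 into even and odd values.
func demoPartition() (even, odd []int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()
	evens, odds := partition(func(n int) bool { return n%2 == 0 })(in)

	done := make(chan struct{})
	go func() { // read the second output concurrently
		defer close(done)
		for v := range odds {
			odd = append(odd, v)
		}
	}()
	for v := range evens {
		even = append(even, v)
	}
	<-done
	return even, odd
}

func main() {
	fmt.Println(demoPartition()) // [2 4 6] [1 3 5]
}
```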

func Tee

func Tee[T any]() SplitOperator[T, T, T]

Tee duplicates the input to the two outputs. Tee guarantees that input scanning only proceeds if both copies of the previous input have been received.
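One way to obtain that guarantee is to send each value to both outputs, in whichever order the readers become ready, before reading the next input. The sketch below does this with a select over channels that are set to nil once served. `tee` and `demoTee` are hypothetical local names and the package may work differently.

```go
package main

import "fmt"

// tee copies every input value to both outputs and only reads the next
// input once both copies have been received.
func tee[T any]() func(<-chan T) (<-chan T, <-chan T) {
	return func(in <-chan T) (<-chan T, <-chan T) {
		a, b := make(chan T), make(chan T)
		go func() {
			defer close(a)
			defer close(b)
			for v := range in {
				// A send on a nil channel is never selected, so clearing a
				// side after it took v delivers v exactly once per output.
				sendA, sendB := a, b
				for sendA != nil || sendB != nil {
					select {
					case sendA <- v:
						sendA = nil
					case sendB <- v:
						sendB = nil
					}
				}
			}
		}()
		return a, b
	}
}

// demoTee reads both copies of 1..3 concurrently.
func demoTee() (left, right []int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()
	l, r := tee[int]()(in)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range r {
			right = append(right, v)
		}
	}()
	for v := range l {
		left = append(left, v)
	}
	<-done
	return left, right
}

func main() {
	fmt.Println(demoTee()) // [1 2 3] [1 2 3]
}
```

A slow reader on one side therefore slows down the other side too, which is the price of the lockstep guarantee.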

type Ticker

type Ticker interface {
	Stop()
	Chan() <-chan time.Time
}

Ticker is an interface to abstract over a time.Ticker.

func NewRealTicker

func NewRealTicker(d time.Duration) Ticker

type Triplet

type Triplet[A, B, C any] struct {
	A A
	B B
	C C
}

type ZipOperator

type ZipOperator[A, B, O any] func(<-chan A, <-chan B) <-chan O

func CombineLatest

func CombineLatest[A, B any]() ZipOperator[A, B, Pair[A, B]]

CombineLatest emits pairs of the last values emitted by the given inputs.

That means that if input a emits twice and input b emits once, the two subsequent emitted pairs will have the same value for B, but different ones for A.

Emissions only start after both inputs have emitted at least once; all emissions before then for the input that emitted first are discarded except for the last value.

CombineLatest ends when both inputs end.

func ForkJoin

func ForkJoin[A, B any]() ZipOperator[A, B, Pair[A, B]]

func Zip

func Zip[A, B any](cancelA, cancelB func()) ZipOperator[A, B, Pair[A, B]]

Zip emits pairs of values from the inputs. Once a value is received from one input, it waits for the other input to emit one before reading more from the same input.

In other words: every value from both inputs is guaranteed to be emitted exactly once in exactly one pair until one input is closed.

If one input is closed, the other is cancelled and discarded.

If both inputs have the same length, the second one to be closed might get cancelled right after its last emission, which should be a no-op.
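The lockstep pairing and the cancel-and-discard step can be sketched with the standard library. `pair`, `zip` and `demoZip` are hypothetical local names, and reading the first input before the second is a simplification of this sketch, not a documented property of the package.

```go
package main

import "fmt"

// pair mirrors the documented Pair type.
type pair[A, B any] struct {
	A A
	B B
}

// zip pairs values one-to-one. When one input closes, the other one is
// cancelled and drained.
func zip[A, B any](cancelA, cancelB func()) func(<-chan A, <-chan B) <-chan pair[A, B] {
	return func(as <-chan A, bs <-chan B) <-chan pair[A, B] {
		out := make(chan pair[A, B])
		go func() {
			defer close(out)
			for {
				a, ok := <-as
				if !ok {
					if cancelB != nil {
						cancelB()
					}
					for range bs { // discard the leftovers
					}
					return
				}
				b, ok := <-bs
				if !ok {
					if cancelA != nil {
						cancelA()
					}
					for range as {
					}
					return
				}
				out <- pair[A, B]{A: a, B: b}
			}
		}()
		return out
	}
}

// demoZip zips three numbers with five letters.
func demoZip() (pairs []pair[int, string], cancelledB bool) {
	as, bs := make(chan int), make(chan string)
	go func() {
		defer close(as)
		for i := 1; i <= 3; i++ {
			as <- i
		}
	}()
	go func() {
		defer close(bs)
		for _, s := range []string{"a", "b", "c", "d", "e"} {
			bs <- s
		}
	}()
	op := zip[int, string](nil, func() { cancelledB = true })
	for p := range op(as, bs) {
		pairs = append(pairs, p)
	}
	return pairs, cancelledB
}

func main() {
	fmt.Println(demoZip()) // [{1 a} {2 b} {3 c}] true
}
```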

Directories

Path Synopsis
examples
