Documentation ¶
Index ¶
- Constants
- func FilterWithError[T any](size int, filter func(T) (bool, error), in <-chan T) (ChanPull[T], ChanPull[error])
- func MapWithError[T any, N any](size int, mp func(T) (N, error), in <-chan T) (ChanPull[N], ChanPull[error])
- func Reduce[T any, Acc any](reduce func(T, Acc) Acc, acc Acc, in <-chan T) Acc
- func Router[T any, N comparable](size int, matches []N, compare func(T) N, in <-chan T) ([]ChanPull[T], ChanPull[T])
- func Sink[T any](sink func(T), in <-chan T)
- func SinkWithErrorSink[T any](sink func(T) error, errSink func(error), in <-chan T)
- func SourceWithError[T any](repeat, size int, source func() (T, error)) (ChanPull[T], ChanPull[error])
- func TapWithError[T any](size int, tap func(T) error, in <-chan T) (ChanPull[T], ChanPull[error])
- type Chan
- func (c Chan[T]) ChanPull() ChanPull[T]
- func (c Chan[T]) ChanPush() ChanPush[T]
- func (c Chan[T]) ChanPushPull() (ChanPush[T], ChanPull[T])
- func (c Chan[T]) Close()
- func (c Chan[T]) Distribute(size, count int, choose func(T) int) []ChanPull[T]
- func (c Chan[T]) Drain()
- func (c Chan[T]) FanOut(count, size int) []ChanPull[T]
- func (c Chan[T]) Filter(size int, filter func(T) bool) ChanPull[T]
- func (c Chan[T]) FilterWithError(size int, filter func(T) (bool, error)) (ChanPull[T], ChanPull[error])
- func (c Chan[T]) FilterWithErrorSink(size int, filter func(T) (bool, error), sink func(error)) ChanPull[T]
- func (c Chan[T]) Map(size int, mp func(T) any) ChanPull[any]
- func (c Chan[T]) MapWithError(size int, mp func(T) (any, error)) (ChanPull[any], ChanPull[error])
- func (c Chan[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(error)) ChanPull[any]
- func (c Chan[T]) Pull() T
- func (c Chan[T]) PullSafe() (t T, ok bool)
- func (c Chan[T]) Push(t T)
- func (c Chan[T]) Reduce(reduce func(T, any) any, acc any) any
- func (c Chan[T]) ReduceAndEmit(reduce func(T, any) any, acc any, in <-chan T) ChanPull[any]
- func (c Chan[T]) RoundRobin(size, count int) []ChanPull[T]
- func (c Chan[T]) Sink(sink func(T))
- func (c Chan[T]) SinkWithError(size int, sink func(T) error) ChanPull[error]
- func (c Chan[T]) SinkWithErrorSink(sink func(T) error, errSink func(error))
- func (c Chan[T]) Tap(size int, tap func(T)) ChanPull[T]
- func (c Chan[T]) TapWithError(size int, tap func(T) error) (ChanPull[T], ChanPull[error])
- func (c Chan[T]) TapWithErrorSink(size int, tap func(T) error, sink func(error)) ChanPull[T]
- func (c Chan[T]) TryPull() (t T, ok bool)
- func (c Chan[T]) TryPush(t T) (ok bool)
- func (c Chan[T]) Wait()
- func (c Chan[T]) Window(size int, window time.Duration, reduce func(T, any) any, acc func() any) ChanPull[any]
- type ChanPull
- func Distribute[T any](size, count int, choose func(T) int, in <-chan T) []ChanPull[T]
- func FanIn[T any](size int, ins ...<-chan T) ChanPull[T]
- func FanOut[T any](count, size int, in <-chan T) []ChanPull[T]
- func Filter[T any](size int, filter func(T) bool, in <-chan T) ChanPull[T]
- func FilterWithErrorSink[T any](size int, filter func(T) (bool, error), sink func(error), in <-chan T) ChanPull[T]
- func Map[T any, N any](size int, mp func(T) N, in <-chan T) ChanPull[N]
- func MapWithErrorSink[T any, N any](size int, mp func(T) (N, error), sink func(error), in <-chan T) ChanPull[N]
- func ReduceAndEmit[T any, Acc any](reduce func(T, Acc) Acc, acc Acc, in <-chan T) ChanPull[Acc]
- func RoundRobin[T any](size, count int, in <-chan T) []ChanPull[T]
- func RouterWithSink[T any, N comparable](size int, matches []N, compare func(T) N, sink func(T), in <-chan T) []ChanPull[T]
- func SinkWithError[T any](size int, sink func(T) error, in <-chan T) ChanPull[error]
- func Source[T any](repeat, size int, source func() T) ChanPull[T]
- func SourceWithErrorSink[T any](repeat, size int, source func() (T, error), sink func(error)) ChanPull[T]
- func Tap[T any](size int, tap func(T), in <-chan T) ChanPull[T]
- func TapWithErrorSink[T any](size int, tap func(T) error, sink func(error), in <-chan T) ChanPull[T]
- func Window[T any, Acc any](size int, window time.Duration, reduce func(T, Acc) Acc, acc func() Acc, ...) ChanPull[Acc]
- func (c ChanPull[T]) Distribute(size, count int, choose func(T) int) []ChanPull[T]
- func (c ChanPull[T]) Drain()
- func (c ChanPull[T]) FanOut(count, size int) []ChanPull[T]
- func (c ChanPull[T]) Filter(size int, filter func(T) bool) ChanPull[T]
- func (c ChanPull[T]) FilterWithError(size int, filter func(T) (bool, error)) (ChanPull[T], ChanPull[error])
- func (c ChanPull[T]) FilterWithErrorSink(size int, filter func(T) (bool, error), sink func(error)) ChanPull[T]
- func (c ChanPull[T]) Map(size int, mp func(T) any) ChanPull[any]
- func (c ChanPull[T]) MapWithError(size int, mp func(T) (any, error)) (ChanPull[any], ChanPull[error])
- func (c ChanPull[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(error)) ChanPull[any]
- func (c ChanPull[T]) Pull() T
- func (c ChanPull[T]) PullSafe() (t T, ok bool)
- func (c ChanPull[T]) Reduce(reduce func(T, any) any, acc any) any
- func (c ChanPull[T]) ReduceAndEmit(reduce func(T, any) any, acc any, in <-chan T) ChanPull[any]
- func (c ChanPull[T]) RoundRobin(size, count int) []ChanPull[T]
- func (c ChanPull[T]) Sink(sink func(T))
- func (c ChanPull[T]) SinkWithError(size int, sink func(T) error) ChanPull[error]
- func (c ChanPull[T]) SinkWithErrorSink(sink func(T) error, errSink func(error))
- func (c ChanPull[T]) Tap(size int, tap func(T)) ChanPull[T]
- func (c ChanPull[T]) TapWithError(size int, tap func(T) error) (ChanPull[T], ChanPull[error])
- func (c ChanPull[T]) TapWithErrorSink(size int, tap func(T) error, sink func(error)) ChanPull[T]
- func (c ChanPull[T]) TryPull() (t T, ok bool)
- func (c ChanPull[T]) Wait()
- func (c ChanPull[T]) Window(size int, window time.Duration, reduce func(T, any) any, acc func() any) ChanPull[any]
- type ChanPush
Constants ¶
const RepeatForever = -1
Variables ¶
This section is empty.
Functions ¶
func FilterWithError ¶
func MapWithError ¶
func Router ¶
func Router[T any, N comparable](size int, matches []N, compare func(T) N, in <-chan T) ([]ChanPull[T], ChanPull[T])
func SinkWithErrorSink ¶
func SourceWithError ¶
Types ¶
type Chan ¶
type Chan[T any] chan T
func New ¶
New returns a new Chan with the given type T. This is essentially a chan T and can be used the same way as an ordinary Go channel with the normal channel syntax. However, this variant has methods for functional operations common to the use and lifecycle of channels.
Passing a len of 0 will create an unbuffered channel.
func (Chan[T]) ChanPull ¶
ChanPull is a zero-cost conversion of Chan[T] to its ChanPull[T] variant.
func (Chan[T]) ChanPush ¶
ChanPush is a zero-cost conversion of Chan[T] to its ChanPush[T] variant.
func (Chan[T]) ChanPushPull ¶
ChanPushPull is a zero-cost conversion of Chan[T] to its ChanPush[T] and ChanPull[T] variants.
func (Chan[T]) Close ¶
func (c Chan[T]) Close()
Close closes the channel. Any attempts to push to a closed channel will panic. Closing an already closed channel will return immediately.
func (Chan[T]) Distribute ¶
func (Chan[T]) Drain ¶
func (c Chan[T]) Drain()
Drain is a blocking operation that iterates over the channel discarding values until the channel is closed and no further elements remain. This returns immediately if the channel is closed or nil.
func (Chan[T]) FilterWithError ¶
func (Chan[T]) FilterWithErrorSink ¶
func (Chan[T]) Map ¶
Map returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Map` function directly.
func (Chan[T]) MapWithError ¶
MapWithError returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithError` function directly.
func (Chan[T]) MapWithErrorSink ¶
func (c Chan[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(error)) ChanPull[any]
MapWithErrorSink returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithErrorSink` function directly.
func (Chan[T]) Pull ¶
func (c Chan[T]) Pull() T
Pull is a blocking operation that pulls a T from the channel if available. This blocks while no T is available. If the channel is closed and empty, or nil, this will return the zero value of T.
func (Chan[T]) PullSafe ¶
PullSafe is a blocking operation that pulls a T from the channel if available. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.
func (Chan[T]) Push ¶
func (c Chan[T]) Push(t T)
Push is a blocking operation that pushes a T onto the channel. This blocks while the channel is full or nil. This will panic if the channel is closed.
func (Chan[T]) Reduce ¶
Reduce returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Reduce` function directly.
func (Chan[T]) ReduceAndEmit ¶
ReduceAndEmit returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `ReduceAndEmit` function directly.
func (Chan[T]) RoundRobin ¶
func (Chan[T]) SinkWithError ¶
func (Chan[T]) SinkWithErrorSink ¶
func (Chan[T]) TapWithError ¶
func (Chan[T]) TapWithErrorSink ¶
func (Chan[T]) TryPull ¶
TryPull is a non-blocking operation that attempts to pull a T from the channel. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.
func (Chan[T]) TryPush ¶
TryPush is a non-blocking operation that attempts to push a T onto the channel. This returns true if the T was successfully pushed, false if the channel was blocked or nil. It is exceedingly unlikely that you will ever successfully push onto an unbuffered channel because this requires the scheduler to have a receiving goroutine ready.
func (Chan[T]) Wait ¶
func (c Chan[T]) Wait()
Wait is a blocking operation that waits for a value to be returned from the channel. If the channel is closed or nil this will immediately return.
func (Chan[T]) Window ¶
func (c Chan[T]) Window(size int, window time.Duration, reduce func(T, any) any, acc func() any) ChanPull[any]
Window returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Window` function directly.
type ChanPull ¶
type ChanPull[T any] <-chan T
func Distribute ¶
func FanIn ¶
FanIn is a non-blocking operation that creates len(ins) goroutines and forwards each T read onto the returned pull-only channel of specified size. Each goroutine will exit after its assigned input channel is closed and emptied. The last goroutine will close the returned pull-only channel to signal completion of processing.
func FilterWithErrorSink ¶
func MapWithErrorSink ¶
func ReduceAndEmit ¶
func RoundRobin ¶
func RouterWithSink ¶
func RouterWithSink[T any, N comparable](size int, matches []N, compare func(T) N, sink func(T), in <-chan T) []ChanPull[T]
func SinkWithError ¶
func SourceWithErrorSink ¶
func TapWithErrorSink ¶
func Window ¶
func Window[T any, Acc any](size int, window time.Duration, reduce func(T, Acc) Acc, acc func() Acc, in <-chan T) ChanPull[Acc]
TODO: Decide whether Window's reduce func should also take a time.Time argument, which would be passed the "tick" from the ticker so that it can be used internally when structuring the Acc being emitted.
func (ChanPull[T]) Distribute ¶
func (ChanPull[T]) Drain ¶
func (c ChanPull[T]) Drain()
Drain is a blocking operation that iterates over the channel discarding values until the channel is closed and no further elements remain. This returns immediately if the channel is closed or nil.
func (ChanPull[T]) FilterWithError ¶
func (ChanPull[T]) FilterWithErrorSink ¶
func (ChanPull[T]) Map ¶
Map returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Map` function directly.
func (ChanPull[T]) MapWithError ¶
func (c ChanPull[T]) MapWithError(size int, mp func(T) (any, error)) (ChanPull[any], ChanPull[error])
MapWithError returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithError` function directly.
func (ChanPull[T]) MapWithErrorSink ¶
func (c ChanPull[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(error)) ChanPull[any]
MapWithErrorSink returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithErrorSink` function directly.
func (ChanPull[T]) Pull ¶
func (c ChanPull[T]) Pull() T
Pull is a blocking operation that pulls a T from the channel if available. This blocks while no T is available. If the channel is closed and empty, or nil, this will return the zero value of T.
func (ChanPull[T]) PullSafe ¶
PullSafe is a blocking operation that pulls a T from the channel if available. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.
func (ChanPull[T]) Reduce ¶
Reduce returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Reduce` function directly.
func (ChanPull[T]) ReduceAndEmit ¶
ReduceAndEmit returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `ReduceAndEmit` function directly.
func (ChanPull[T]) RoundRobin ¶
func (ChanPull[T]) SinkWithError ¶
func (ChanPull[T]) SinkWithErrorSink ¶
func (ChanPull[T]) TapWithError ¶
func (ChanPull[T]) TapWithErrorSink ¶
func (ChanPull[T]) TryPull ¶
TryPull is a non-blocking operation that attempts to pull a T from the channel. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.
func (ChanPull[T]) Wait ¶
func (c ChanPull[T]) Wait()
Wait is a blocking operation that waits for a value to be returned from the channel. If the channel is closed or nil this will immediately return.
func (ChanPull[T]) Window ¶
func (c ChanPull[T]) Window(size int, window time.Duration, reduce func(T, any) any, acc func() any) ChanPull[any]
Window returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Window` function directly.
type ChanPush ¶
type ChanPush[T any] chan<- T
func (ChanPush[T]) Close ¶
func (c ChanPush[T]) Close()
Close closes the channel. Any attempts to push to a closed channel will panic. Closing an already closed channel will return immediately.
func (ChanPush[T]) Push ¶
func (c ChanPush[T]) Push(t T)
Push is a blocking operation that pushes a T onto the channel. This blocks while the channel is full or nil. This will panic if the channel is closed.
func (ChanPush[T]) TryPush ¶
TryPush is a non-blocking operation that attempts to push a T onto the channel. This returns true if the T was successfully pushed, false if the channel was blocked or nil. It is exceedingly unlikely that you will ever successfully push onto an unbuffered channel because this requires the scheduler to have a receiving goroutine ready.