pipes

package module
v0.0.0-...-2b134be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2022 License: MIT Imports: 2 Imported by: 0

README

Pipes - Type Safe Functional Channels

This library makes use of the newly released Go Generics in v1.18 to enable the melding of functional programming paradigms, and common Go function call patterns, all over top of Go's excellent channels implementation in a type safe manner. Additionally, this library manages the lifecycle of channels and goroutines in a clear and easily reasoned manner, alleviating the details of implementation from the library user.

Dependencies

This library currently only imports the standard library and has no 3rd party dependencies. The current goal is to maintain this as long as possible. However, I am not opposed to importing 3rd party libraries for testing purposes or if a good case is made for the main libraries and it is not more appropriate for the implementation to exist in another repository. This is intended to be a straightforward implementation only at this time.

Versioning

This library is a work in progress and will be released as v0.0.1 once:

  • A stable-ish API exists in at least the pipes package.
  • 80-100% test coverage of at least the pipes package.
  • 80-100% test coverage of all known "gotcha" issues with channels.
  • General documentation is available if not explicitly completed.
  • Documentation of channel "gotcha" issues with channels.

Usage

Future home of README.md library usage documentation... I'm getting to it. For now follow the soon to be completed first pass of the core pipes package.

Examples

This library includes an examples folder containing functional, if sometimes contrived, example implementations of library functionality.

Contributors and the Curious

Contributions are welcome at this time, but inclusion will be selective in scope to the near term core goals of the project until a more stable version is released and we can look at next stages if even appropriate.

Decisions to make (RFC)

Feel free to open an issue to discuss one of the below if one does not already exist. This should provide an opportunity to a documented discussion of the issue more in depth at the time it becomes a requested feature.
  1. If our functionality is passed a nil chan we should "gracefully" skip launching a worker for it, thus preventing accidental blocked reads for no actual useful reason. This should allow for channel lifecycle management as expected.

    • The Chan[T], ChanPull[T], and ChanPush[T] should continue to implement the same functionality as expected from standard channel usage in Go on their associated API methods.
    • This behavior however does not make any effort to warn the library user that the channel was nil and that this is likely a bug in their implementation. We should decide whether we should implement this behavior with or without the ability to indicate this issue to the library user.
  2. Parameter ordering on methods and functions should be made consistent and friendly to the library user.

    • Nothing fancy going on here. We just want to make things nice to work with generally.
  3. Does any of our implementation need to be moved into an internal package?

    • Generally speaking I want to make every package a public API and not make use of internal. However this may be a better solution to gathering common worker code patterns as these may only be interesting to any library contributors.

Decisions made

Feel free to open an issue to provide an alternate implementation or just a discussion of the issue more in depth if one does not already exist. This should provide an opportunity to a documented discussion of the issue more in depth at the time it becomes a requested feature.
  1. Some functionality on Chan[T] and ChanPull[T] cannot be implemented in a type safe manner currently due to the lack of support for parameterized methods. The Functional based implementation should be used instead to ensure type safe implementations.

    References:

    Affected methods at time of writing are below but may not represent a full list:

    • Map: currently implemented on Chan[T] and ChanPull[T] works with any as it's mutated parameter. Use the Map function instead if type safety is needed.
    • MapWithError: currently implemented on Chan[T] and ChanPull[T] works with any as it's mutated parameter. Use the MapWithError function instead if type safety is needed.
    • MapWithErrorSink: currently implemented on Chan[T] and ChanPull[T] works with any as it's mutated parameter. Use the MapWithErrorSink function instead if type safety is needed.
    • Reduce: currently implemented on Chan[T] and ChanPull[T] works with any as it's accumulator parameter. Use the Reduce function instead if type safety is needed.
    • ReduceAndEmit: currently implemented on Chan[T] and ChanPull[T] works with any as it's accumulator parameter. Use the ReduceAndEmit function instead if type safety is needed.
    • Window: currently implemented on Chan[T] and ChanPull[T] works with any as it's accumulator parameter. Use the Window function instead if type safety is needed.
    • Router: currently NOT implemented on any Chan*[T] as we cannot easily make use of the comparable constraint. Use the Router function instead.
    • RouterWithSink: currently NOT implemented on any Chan*[T] as we cannot easily make use of the comparable constraint. Use the RouterWithSink function instead.
  2. FanIn will not be able to be used on the Chan[T] and ChanPush[T] types as FanIn as implemented currently will always close the out channel. This complicates the reasoning of the channel lifecycle when used from the perspective of Chan[T] and ChanPush[T].

Documentation

Index

Constants

View Source
const RepeatForever = -1

Variables

This section is empty.

Functions

func FilterWithError

func FilterWithError[T any](size int, filter func(T) (bool, error), in <-chan T) (ChanPull[T], ChanPull[error])

func MapWithError

func MapWithError[T any, N any](size int, mp func(T) (N, error), in <-chan T) (ChanPull[N], ChanPull[error])

func Reduce

func Reduce[T any, Acc any](reduce func(T, Acc) Acc, acc Acc, in <-chan T) Acc

func Router

func Router[T any, N comparable](size int, matches []N, compare func(T) N, in <-chan T) ([]ChanPull[T], ChanPull[T])

func Sink

func Sink[T any](sink func(T), in <-chan T)

func SinkWithErrorSink

func SinkWithErrorSink[T any](sink func(T) error, errSink func(error), in <-chan T)

func SourceWithError

func SourceWithError[T any](repeat, size int, source func() (T, error)) (ChanPull[T], ChanPull[error])

func TapWithError

func TapWithError[T any](size int, tap func(T) error, in <-chan T) (ChanPull[T], ChanPull[error])

Types

type Chan

type Chan[T any] chan T

func New

func New[T any](len int) Chan[T]

New returns a new Chan with the given type T. This is essentially a chan T and can be used the same way as one would use a channel in Go under normal syntax usages. However this variant has methods for functional operations common to the use and lifecycle of channels.

Passing a len of 0 will create an unbuffered channel.

func (Chan[T]) ChanPull

func (c Chan[T]) ChanPull() ChanPull[T]

ChanPull is a zero cost conversion of Chan[T] to it's ChanPull[T] variant.

func (Chan[T]) ChanPush

func (c Chan[T]) ChanPush() ChanPush[T]

ChanPush is a zero cost conversion of Chan[T] to it's ChanPush[T] variant.

func (Chan[T]) ChanPushPull

func (c Chan[T]) ChanPushPull() (ChanPush[T], ChanPull[T])

ChanPull is a zero cost conversion of Chan[T] to it's ChanPush[T] and ChanPull[T] variants.

func (Chan[T]) Close

func (c Chan[T]) Close()

Close closes the channel. Any attempts to push to a closed channel will panic. Closing an already closed channel will return immeadiately.

func (Chan[T]) Distribute

func (c Chan[T]) Distribute(size, count int, choose func(T) int) []ChanPull[T]

func (Chan[T]) Drain

func (c Chan[T]) Drain()

Drain is a blocking operation that iterates over the channel discarding values until the channel is closed and no further elements remain. This returns immeadiately if the channel is closed or nil.

func (Chan[T]) FanOut

func (c Chan[T]) FanOut(count, size int) []ChanPull[T]

func (Chan[T]) Filter

func (c Chan[T]) Filter(size int, filter func(T) bool) ChanPull[T]

func (Chan[T]) FilterWithError

func (c Chan[T]) FilterWithError(size int, filter func(T) (bool, error)) (ChanPull[T], ChanPull[error])

func (Chan[T]) FilterWithErrorSink

func (c Chan[T]) FilterWithErrorSink(size int, filter func(T) (bool, error), sink func(error)) ChanPull[T]

func (Chan[T]) Map

func (c Chan[T]) Map(size int, mp func(T) any) ChanPull[any]

Map returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Map` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (Chan[T]) MapWithError

func (c Chan[T]) MapWithError(size int, mp func(T) (any, error)) (ChanPull[any], ChanPull[error])

MapWithError returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithError` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (Chan[T]) MapWithErrorSink

func (c Chan[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(error)) ChanPull[any]

MapWithErrorSink returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithErrorSink` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (Chan[T]) Pull

func (c Chan[T]) Pull() T

Pull is a blocking operation that pulls a T from the channel if available. This blocks while no T is available. If the channel is closed and empty, or nil, this will return a zero version of the T type.

func (Chan[T]) PullSafe

func (c Chan[T]) PullSafe() (t T, ok bool)

PullSafe is a blocking operation that pulls a T from the channel if available. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.

func (Chan[T]) Push

func (c Chan[T]) Push(t T)

Push is a blocking operation that pushes a T onto the channel. This blocks while the channel is full or nil. This will panic if the channel is closed.

func (Chan[T]) Reduce

func (c Chan[T]) Reduce(reduce func(T, any) any, acc any) any

Reduce returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Reduce` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (Chan[T]) ReduceAndEmit

func (c Chan[T]) ReduceAndEmit(reduce func(T, any) any, acc any, in <-chan T) ChanPull[any]

ReduceAndEmit returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `ReduceAndEmit` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (Chan[T]) RoundRobin

func (c Chan[T]) RoundRobin(size, count int) []ChanPull[T]

func (Chan[T]) Sink

func (c Chan[T]) Sink(sink func(T))

func (Chan[T]) SinkWithError

func (c Chan[T]) SinkWithError(size int, sink func(T) error) ChanPull[error]

func (Chan[T]) SinkWithErrorSink

func (c Chan[T]) SinkWithErrorSink(sink func(T) error, errSink func(error))

func (Chan[T]) Tap

func (c Chan[T]) Tap(size int, tap func(T)) ChanPull[T]

func (Chan[T]) TapWithError

func (c Chan[T]) TapWithError(size int, tap func(T) error) (ChanPull[T], ChanPull[error])

func (Chan[T]) TapWithErrorSink

func (c Chan[T]) TapWithErrorSink(size int, tap func(T) error, sink func(error)) ChanPull[T]

func (Chan[T]) TryPull

func (c Chan[T]) TryPull() (t T, ok bool)

TryPull is a non-blocking operation that attempts to pull a T from the channel. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.

func (Chan[T]) TryPush

func (c Chan[T]) TryPush(t T) (ok bool)

TryPush is a non-blocking operation that attempts to push a T onto the channel. This returns true if the T was successfully pushed, false if the channel was blocked or nil. It is exceedingly unlikely that you will ever successfully push onto an unbuffered channel as this requires the scheduler to have a recieveing goroutine ready.

func (Chan[T]) Wait

func (c Chan[T]) Wait()

Wait is a blocking operation that waits for a value to be returned from the channel. If the channel is closed or nil this will immeadiately return.

func (Chan[T]) Window

func (c Chan[T]) Window(size int, window time.Duration, reduce func(T, any) any, acc func() any) ChanPull[any]

Window returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Window` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

type ChanPull

type ChanPull[T any] <-chan T

func Distribute

func Distribute[T any](size, count int, choose func(T) int, in <-chan T) []ChanPull[T]

func FanIn

func FanIn[T any](size int, ins ...<-chan T) ChanPull[T]

FanIn is a non-blocking operation that creates len(ins) goroutines and forwards each T read onto the returned push only channel of specified size. Each goroutine will exit after it's assigned input channel is closed and emptied. The last goroutine will close the returned pull only channel to signal completion of processing.

func FanOut

func FanOut[T any](count, size int, in <-chan T) []ChanPull[T]

func Filter

func Filter[T any](size int, filter func(T) bool, in <-chan T) ChanPull[T]

func FilterWithErrorSink

func FilterWithErrorSink[T any](size int, filter func(T) (bool, error), sink func(error), in <-chan T) ChanPull[T]

func Map

func Map[T any, N any](size int, mp func(T) N, in <-chan T) ChanPull[N]

func MapWithErrorSink

func MapWithErrorSink[T any, N any](size int, mp func(T) (N, error), sink func(error), in <-chan T) ChanPull[N]

func ReduceAndEmit

func ReduceAndEmit[T any, Acc any](reduce func(T, Acc) Acc, acc Acc, in <-chan T) ChanPull[Acc]

func RoundRobin

func RoundRobin[T any](size, count int, in <-chan T) []ChanPull[T]

func RouterWithSink

func RouterWithSink[T any, N comparable](size int, matches []N, compare func(T) N, sink func(T), in <-chan T) []ChanPull[T]

func SinkWithError

func SinkWithError[T any](size int, sink func(T) error, in <-chan T) ChanPull[error]

func Source

func Source[T any](repeat, size int, source func() T) ChanPull[T]

func SourceWithErrorSink

func SourceWithErrorSink[T any](repeat, size int, source func() (T, error), sink func(error)) ChanPull[T]

func Tap

func Tap[T any](size int, tap func(T), in <-chan T) ChanPull[T]

func TapWithErrorSink

func TapWithErrorSink[T any](size int, tap func(T) error, sink func(error), in <-chan T) ChanPull[T]

func Window

func Window[T any, Acc any](size int, window time.Duration, reduce func(T, Acc) Acc, acc func() Acc, in <-chan T) ChanPull[Acc]

TODO: Decide if Window's reduce func should take a time.Time object as well and will be passed the "tick" from the ticker for use internally for structuring the Acc being emitted.

func (ChanPull[T]) Distribute

func (c ChanPull[T]) Distribute(size, count int, choose func(T) int) []ChanPull[T]

func (ChanPull[T]) Drain

func (c ChanPull[T]) Drain()

Drain is a blocking operation that iterates over the channel discarding values until the channel is closed and no further elements remain. This returns immeadiately if the channel is closed or nil.

func (ChanPull[T]) FanOut

func (c ChanPull[T]) FanOut(count, size int) []ChanPull[T]

func (ChanPull[T]) Filter

func (c ChanPull[T]) Filter(size int, filter func(T) bool) ChanPull[T]

func (ChanPull[T]) FilterWithError

func (c ChanPull[T]) FilterWithError(size int, filter func(T) (bool, error)) (ChanPull[T], ChanPull[error])

func (ChanPull[T]) FilterWithErrorSink

func (c ChanPull[T]) FilterWithErrorSink(size int, filter func(T) (bool, error), sink func(error)) ChanPull[T]

func (ChanPull[T]) Map

func (c ChanPull[T]) Map(size int, mp func(T) any) ChanPull[any]

Map returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Map` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (ChanPull[T]) MapWithError

func (c ChanPull[T]) MapWithError(size int, mp func(T) (any, error)) (ChanPull[any], ChanPull[error])

MapWithError returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithError` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (ChanPull[T]) MapWithErrorSink

func (c ChanPull[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(error)) ChanPull[any]

MapWithErrorSink returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `MapWithErrorSink` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (ChanPull[T]) Pull

func (c ChanPull[T]) Pull() T

Pull is a blocking operation that pulls a T from the channel if available. This blocks while no T is available. If the channel is closed and empty, or nil, this will return a zero version of the T type.

func (ChanPull[T]) PullSafe

func (c ChanPull[T]) PullSafe() (t T, ok bool)

PullSafe is a blocking operation that pulls a T from the channel if available. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.

func (ChanPull[T]) Reduce

func (c ChanPull[T]) Reduce(reduce func(T, any) any, acc any) any

Reduce returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Reduce` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (ChanPull[T]) ReduceAndEmit

func (c ChanPull[T]) ReduceAndEmit(reduce func(T, any) any, acc any, in <-chan T) ChanPull[any]

ReduceAndEmit returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `ReduceAndEmit` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

func (ChanPull[T]) RoundRobin

func (c ChanPull[T]) RoundRobin(size, count int) []ChanPull[T]

func (ChanPull[T]) Sink

func (c ChanPull[T]) Sink(sink func(T))

func (ChanPull[T]) SinkWithError

func (c ChanPull[T]) SinkWithError(size int, sink func(T) error) ChanPull[error]

func (ChanPull[T]) SinkWithErrorSink

func (c ChanPull[T]) SinkWithErrorSink(sink func(T) error, errSink func(error))

func (ChanPull[T]) Tap

func (c ChanPull[T]) Tap(size int, tap func(T)) ChanPull[T]

func (ChanPull[T]) TapWithError

func (c ChanPull[T]) TapWithError(size int, tap func(T) error) (ChanPull[T], ChanPull[error])

func (ChanPull[T]) TapWithErrorSink

func (c ChanPull[T]) TapWithErrorSink(size int, tap func(T) error, sink func(error)) ChanPull[T]

func (ChanPull[T]) TryPull

func (c ChanPull[T]) TryPull() (t T, ok bool)

TryPull is a non-blocking operation that attempts to pull a T from the channel. This returns true if the T returned is valid, false if the channel is closed and empty, or nil.

func (ChanPull[T]) Wait

func (c ChanPull[T]) Wait()

Wait is a blocking operation that waits for a value to be returned from the channel. If the channel is closed or nil this will immeadiately return.

func (ChanPull[T]) Window

func (c ChanPull[T]) Window(size int, window time.Duration, reduce func(T, any) any, acc func() any) ChanPull[any]

Window returns any as the type we transform to here due to generics not supporting method parameterization. If you need type safety here use the `Window` function directly.

ref: https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#No-parameterized-methods

type ChanPush

type ChanPush[T any] chan<- T

func (ChanPush[T]) Close

func (c ChanPush[T]) Close()

Close closes the channel. Any attempts to push to a closed channel will panic. Closing an already closed channel will return immeadiately.

func (ChanPush[T]) Push

func (c ChanPush[T]) Push(t T)

Push is a blocking operation that pushes a T onto the channel. This blocks while the channel is full or nil. This will panic if the channel is closed.

func (ChanPush[T]) TryPush

func (c ChanPush[T]) TryPush(t T) (ok bool)

TryPush is a non-blocking operation that attempts to push a T onto the channel. This returns true if the T was successfully pushed, false if the channel was blocked or nil. It is exceedingly unlikely that you will ever successfully push onto an unbuffered channel as this requires the scheduler to have a recieveing goroutine ready.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL