Documentation ¶
Overview ¶
Package stream provides a way to construct data processing streams from smaller pieces. The design is inspired by fs2 Scala library.
Index ¶
- Variables
- func AppendToSlice[A any](stm Stream[A], start []A) io.IO[[]A]
- func ChunkN[A any](n int) func(sa Stream[A]) Stream[[]A]
- func ChunksResize[A any](newSize int) func(stm Stream[[]A]) Stream[[]A]
- func Collect[A any](stm Stream[A], collector func(A) error) io.IO[fun.Unit]
- func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit]
- func FanOut[A any, B any](stm Stream[A], handlers ...func(Stream[A]) io.IO[B]) io.IO[[]B]
- func FanOutOld[A any, B any](stm Stream[A], handlers ...func(Stream[A]) io.IO[B]) io.IO[[]B]
- func FoldLeft[A any, B any](stm Stream[A], zero B, combine func(B, A) B) io.IO[B]
- func FoldLeftEval[A any, B any](stm Stream[A], zero B, combine func(B, A) io.IO[B]) io.IO[B]
- func ForEach[A any](stm Stream[A], collector func(A)) io.IO[fun.Unit]
- func FromBackpressureChannel[A any, B any](bc BackpressureChannel[A], f func(Stream[A]) io.IO[B]) io.IO[B]
- func Head[A any](stm Stream[A]) io.IO[A]
- func HeadAndTail[A any](stm Stream[A]) io.IO[fun.Pair[A, Stream[A]]]
- func JoinManyFibers[A any](capacity int) io.IO[Pipe[io.Fiber[A], io.GoResult[A]]]
- func Last[A any](stm Stream[A]) io.IO[A]
- func LazyFinishedStepResult[A any]() io.IO[StepResult[A]]
- func NewPool[A any](size int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]
- func NewPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]
- func NewUnorderedPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]
- func Partition[A any, C any, D any](stm Stream[A], predicate func(A) bool, trueHandler Collector[A, C], ...) io.IO[fun.Pair[C, D]]
- func PipeToPairOfChannels[A any, B any](pipe Pipe[A, B]) io.IO[fun.Pair[chan<- A, <-chan B]]
- func StreamFold[A any, B any](stm Stream[A], onFinish func() io.IO[B], ...) io.IO[B]
- func TakeAndTail[A any](stm Stream[A], n int, prefix []A) io.IO[fun.Pair[[]A, Stream[A]]]
- func ToBackPressureChannels[A any](stm Stream[A], channels ...BackpressureChannel[A]) io.IO[fun.Unit]
- func ToChannel[A any](stm Stream[A], ch chan<- A) io.IO[fun.Unit]
- func ToChannels[A any](stm Stream[A], channels ...chan<- A) io.IO[fun.Unit]
- func ToChunks[A any](size int) func(stm Stream[A]) Stream[[]A]
- func ToSlice[A any](stm Stream[A]) io.IO[[]A]
- type BackpressureChannel
- func (bc BackpressureChannel[A]) Close() (err error)
- func (bc BackpressureChannel[A]) CloseReceiverNormally()
- func (bc BackpressureChannel[A]) CloseReceiverWithError(err error)
- func (bc BackpressureChannel[A]) HappyPathReceive() Stream[A]
- func (bc BackpressureChannel[A]) RequestOneItem() StreamEvent[A]
- func (bc BackpressureChannel[A]) Send(sea StreamEvent[A]) (isFinished bool, err error)
- func (bc BackpressureChannel[A]) SendError(err error) (bool, error)
- func (bc BackpressureChannel[A]) SendValue(a A) (bool, error)
- type Collector
- type Pipe
- func ChannelBufferPipe[A any](size int) Pipe[A, A]
- func ConcatPipes[A any, B any, C any](pipe1 Pipe[A, B], pipe2 Pipe[B, C]) Pipe[A, C]
- func FlatMapPipe[A any, B any](f func(a A) Stream[B]) Pipe[A, B]
- func MapPipe[A any, B any](f func(a A) B) Pipe[A, B]
- func PairOfChannelsToPipe[A any, B any](input chan A, output chan B) Pipe[A, B]
- type Sink
- type StepResult
- type Stream
- func AddSeparatorAfterEachElement[A any](stm Stream[A], sep A) Stream[A]
- func AndThen[A any](stm1 Stream[A], stm2 Stream[A]) Stream[A]
- func AndThenLazy[A any](stm1 Stream[A], stm2 func() Stream[A]) Stream[A]
- func Drop[A any](stm Stream[A], n int) Stream[A]
- func DropWhile[A any](stm Stream[A], predicate func(A) bool) Stream[A]
- func Emit[A any](a A) Stream[A]
- func EmitMany[A any](as ...A) Stream[A]
- func Empty[A any]() Stream[A]
- func EmptyUnit() Stream[fun.Unit]
- func Eval[A any](ioa io.IO[A]) Stream[A]
- func Fail[A any](err error) Stream[A]
- func Filter[A any](stm Stream[A], predicate func(A) bool) Stream[A]
- func FilterNot[A any](stm Stream[A], predicate func(A) bool) Stream[A]
- func FlatMap[A any, B any](stm Stream[A], f func(a A) Stream[B]) Stream[B]
- func Flatten[A any](stm Stream[Stream[A]]) Stream[A]
- func FoldToGoResult[A any](stm Stream[A]) Stream[io.GoResult[A]]
- func FromChannel[A any](ch <-chan A) Stream[A]
- func FromSideEffectfulFunction[A any](f func() (A, error)) Stream[A]
- func FromSlice[A any](as []A) Stream[A]
- func FromStepResult[A any](iosr io.IO[StepResult[A]]) Stream[A]
- func Generate[A any, S any](zero S, f func(s S) (S, A)) Stream[A]
- func GroupBy[A any, K comparable](stm Stream[A], key func(A) K) Stream[fun.Pair[K, []A]]
- func GroupByEval[A any, K comparable](stm Stream[A], keyIO func(A) io.IO[K]) Stream[fun.Pair[K, []A]]
- func Len[A any](sa Stream[A]) Stream[int]
- func Lift[A any](a A) Stream[A]
- func LiftMany[A any](as ...A) Stream[A]
- func Map[A any, B any](stm Stream[A], f func(a A) B) Stream[B]
- func MapEval[A any, B any](stm Stream[A], f func(a A) io.IO[B]) Stream[B]
- func Nats() Stream[int]
- func Repeat[A any](stm Stream[A]) Stream[A]
- func SideEval[A any](stm Stream[A], iounit func(A) io.IOUnit) Stream[A]
- func StateFlatMap[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]]) Stream[B]
- func StateFlatMapWithFinish[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]], ...) Stream[B]
- func StateFlatMapWithFinishAndFailureHandling[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]], ...) Stream[B]
- func Sum[A fun.Number](sa Stream[A]) Stream[A]
- func Take[A any](stm Stream[A], n int) Stream[A]
- func TakeWhile[A any](stm Stream[A], predicate func(A) bool) Stream[A]
- func Through[A any, B any](stm Stream[A], pipe Pipe[A, B]) Stream[B]
- func ThroughExecutionContext[A any](sa Stream[io.IO[A]], ec io.ExecutionContext, capacity int) Stream[A]
- func ThroughExecutionContextUnordered[A any](sa Stream[io.IO[A]], ec io.ExecutionContext, capacity int) Stream[A]
- func ThroughPipeEval[A any, B any](stm Stream[A], pipeIO io.IO[Pipe[A, B]]) Stream[B]
- func ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit]
- func ToStreamEvent[A any](stm Stream[A]) Stream[StreamEvent[A]]
- func Unfold[A any](zero A, f func(A) A) Stream[A]
- func UnfoldGoResult[A any](stm Stream[io.GoResult[A]], onFailure func(err error) Stream[A]) Stream[A]
- func Wrapf[A any](stm Stream[A], format string, args ...interface{}) Stream[A]
- func ZipWithIndex[A any](as Stream[A]) Stream[fun.Pair[int, A]]
- type StreamEvent
Constants ¶
This section is empty.
Variables ¶
var ErrHeadOfEmptyStream = errors.New("head of empty stream")
ErrHeadOfEmptyStream - an error that is returned when someone attempts to retrieve the head of an empty stream.
Functions ¶
func AppendToSlice ¶
AppendToSlice executes the stream and appends it's results to the slice.
func ChunksResize ¶ added in v0.2.9
ChunksResize rebuffers chunks to the given size.
func Collect ¶ added in v0.0.10
Collect collects all element from the stream and for each element invokes the provided function
func FanOut ¶ added in v0.2.6
FanOut distributes the same element to all handlers. Stream failure is also distribured to all handlers.
func FanOutOld ¶ added in v0.3.0
FanOut distributes the same element to all handlers. Stream failure is also distribured to all handlers.
func FoldLeftEval ¶ added in v0.2.8
FoldLeftEval aggregates stream in a more simple way than StateFlatMap. It takes `zero` as the initial accumulator value and then combines one element from the stream with the accumulator. It continuous to do so until there are no more elements in the stream. Finally, it yields the accumulator value. (In case the stream was empty, `zero` is yielded.)
func FromBackpressureChannel ¶ added in v0.3.0
func FromBackpressureChannel[A any, B any](bc BackpressureChannel[A], f func(Stream[A]) io.IO[B]) io.IO[B]
FromBackpressureChannel forms a stream[A] that will be consumed by `f`. The result of `f` will be used to report back failures and finish signals. this is intended to be run in
func Head ¶ added in v0.0.2
Head returns the first element of the stream. It'll fail if the stream is empty.
func HeadAndTail ¶ added in v0.3.5
HeadAndTail returns the very first element of the stream and the rest of the stream.
func JoinManyFibers ¶ added in v0.3.4
JoinManyFibers starts a separate go-routine for each incoming Fiber. As soon as result is ready it is sent to output. At any point in time at most capacity fibers could be waited for.
func Last ¶ added in v0.3.0
Last keeps track of the current element of the stream and returns it when the stream completes.
func LazyFinishedStepResult ¶ added in v0.3.2
func LazyFinishedStepResult[A any]() io.IO[StepResult[A]]
LazyFinishedStepResult returns
func NewPool ¶ added in v0.1.4
NewPool creates an execution pool that will execute tasks concurrently. Simultaneously there could be as many as size executions.
func NewPoolFromExecutionContext ¶ added in v0.2.5
func NewPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]
NewPoolFromExecutionContext creates an execution pool that will execute tasks concurrently. After the execution context a buffer is created to allow as many as `capacity` parallel tasks to be executed. This pool won't change the order of elements. NB! As work starts in parallel, in case of failure some future elements could be evaluated even after the failed element. Hence we use GoResult to represent evaluation results.
func NewUnorderedPoolFromExecutionContext ¶ added in v0.3.4
func NewUnorderedPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]
NewUnorderedPoolFromExecutionContext creates an execution pool that will execute tasks concurrently. Each task's result will be passed to a channel as soon as it completes. Hence, the order of results will be different from the order of tasks.
func Partition ¶ added in v0.2.6
func Partition[A any, C any, D any](stm Stream[A], predicate func(A) bool, trueHandler Collector[A, C], falseHandler Collector[A, D], ) io.IO[fun.Pair[C, D]]
Partition divides the stream into two that are handled independently.
func PipeToPairOfChannels ¶ added in v0.1.3
PipeToPairOfChannels converts a streaming pipe to a pair of channels that could be used to interact with external systems.
func StreamFold ¶ added in v0.3.2
func StreamFold[A any, B any]( stm Stream[A], onFinish func() io.IO[B], onValue func(a A, tail Stream[A]) io.IO[B], onEmpty func(tail Stream[A]) io.IO[B], onError func(err error) io.IO[B], ) io.IO[B]
StreamFold performs arbitrary processing of a stream's single step result.
func TakeAndTail ¶ added in v0.3.5
TakeAndTail collects n leading elements of the stream and returns them along with the tail of the stream. If the stream is shorter, then only available elements are returned and an emtpy stream.
func ToBackPressureChannels ¶ added in v0.3.0
func ToBackPressureChannels[A any](stm Stream[A], channels ...BackpressureChannel[A]) io.IO[fun.Unit]
ToBackPressureChannels sends each element to all channels.
func ToChannel ¶ added in v0.0.10
ToChannel sends all stream elements to the given channel. When stream is completed, channel is closed. The IO blocks until the stream is exhausted. If the stream is failed, the channel is closed anyway. NB! The failure cannot be communicated via channel of type A. Hence, on the reading side there is no way to see whether it was a successful completion or a failed one.
func ToChannels ¶ added in v0.2.6
ToChannels sends each stream element to every given channel. Failure or completion of the stream leads to closure of all channels. TODO: Send failure to the channels.
Types ¶
type BackpressureChannel ¶ added in v0.3.0
type BackpressureChannel[A any] struct { // contains filtered or unexported fields }
BackpressureChannel has a control mechanism that allows consumer to influence the producer. There is a back pressure channel. Protocol:
sender | receiver --------------------+------------------------------------ | send "Ready to receive" to back channel read back | immediately start listening data. if ready | send data | read data | start processing | the result of processing (ready-to-receive/finished/error) loop | LOOP. | | on error after processing | send error to back | on processing complete | send finished to back when finishing: | send finish signal | on receiving finish signal, stop the loop.
and read back |
| when error: | send error | on receiving error, stop the loop.
and read back |
| if not ready, | don't send data | on back error - fail all on back finish - unsubscribe
func NewBackpressureChannel ¶ added in v0.3.0
func NewBackpressureChannel[A any]() BackpressureChannel[A]
func (BackpressureChannel[A]) Close ¶ added in v0.3.0
func (bc BackpressureChannel[A]) Close() (err error)
func (BackpressureChannel[A]) CloseReceiverNormally ¶ added in v0.3.0
func (bc BackpressureChannel[A]) CloseReceiverNormally()
func (BackpressureChannel[A]) CloseReceiverWithError ¶ added in v0.3.0
func (bc BackpressureChannel[A]) CloseReceiverWithError(err error)
func (BackpressureChannel[A]) HappyPathReceive ¶ added in v0.3.0
func (bc BackpressureChannel[A]) HappyPathReceive() Stream[A]
HappyPathReceive forms a stream of a happy path.
func (BackpressureChannel[A]) RequestOneItem ¶ added in v0.3.0
func (bc BackpressureChannel[A]) RequestOneItem() StreamEvent[A]
RequestOneItem - sends notification to backpressure channel and receives one item from data channel.
func (BackpressureChannel[A]) Send ¶ added in v0.3.0
func (bc BackpressureChannel[A]) Send(sea StreamEvent[A]) (isFinished bool, err error)
Send receives readiness signal from `back`. If ready, sends data to `data`.
func (BackpressureChannel[A]) SendError ¶ added in v0.3.0
func (bc BackpressureChannel[A]) SendError(err error) (bool, error)
func (BackpressureChannel[A]) SendValue ¶ added in v0.3.0
func (bc BackpressureChannel[A]) SendValue(a A) (bool, error)
type Pipe ¶ added in v0.0.2
Pipe is a conversion of one stream to another. Technically it's a function that takes one stream and returns another.
func ChannelBufferPipe ¶ added in v0.3.0
ChannelBufferPipe puts incoming values into a buffer of the given size and then reads from that same buffer. This buffer allows to decouple producer and consumer to some extent.
func ConcatPipes ¶ added in v0.2.9
ConcatPipes connects two pipes into one.
func FlatMapPipe ¶ added in v0.1.3
FlatMapPipe creates a pipe that flatmaps one stream through the provided function.
func MapPipe ¶ added in v0.1.3
MapPipe creates a pipe that maps one stream through the provided function.
func PairOfChannelsToPipe ¶ added in v0.1.0
PairOfChannelsToPipe - takes two channels that are being used to talk to some external process and convert them into a single pipe. It first starts a separate go routine that will continuously run the input stream and send all it's contents to the `input` channel. The current thread is left with reading from the output channel.
type Sink ¶ added in v0.0.2
Sink is a pipe that does not return meaningful values.
type StepResult ¶
type StepResult[A any] struct { Value A HasValue bool // models "Option[A]" Continuation Stream[A] IsFinished bool // true when stream has completed }
StepResult[A] represents the result of a single step in the step machine. It might be one of - empty, new value, or finished. The step result also returns the continuation of the stream.
func NewStepResult ¶ added in v0.0.2
func NewStepResult[A any](value A, continuation Stream[A]) StepResult[A]
NewStepResult constructs StepResult that has one value.
func NewStepResultEmpty ¶ added in v0.0.2
func NewStepResultEmpty[A any](continuation Stream[A]) StepResult[A]
NewStepResultEmpty constructs an empty StepResult.
func NewStepResultFinished ¶ added in v0.0.2
func NewStepResultFinished[A any]() StepResult[A]
NewStepResultFinished constructs a finished StepResult. The continuation will be empty as well.
type Stream ¶
type Stream[A any] io.IO[StepResult[A]]
Stream is modelled as a function that performs a single step in the state machine.
func AddSeparatorAfterEachElement ¶ added in v0.0.2
AddSeparatorAfterEachElement adds a separator after each stream element
func AndThenLazy ¶
AndThenLazy appends another stream. The other stream is constructed lazily.
func DropWhile ¶ added in v0.3.2
DropWhile removes the beginning of the stream so that the new stream starts with an element that falsifies the predicate.
func EmptyUnit ¶ added in v0.1.0
EmptyUnit returns an empty stream of units. It's more performant because the same instance is being used.
func Filter ¶ added in v0.0.2
Filter leaves in the stream only the elements that satisfy the given predicate.
func FilterNot ¶ added in v0.2.6
Filter leaves in the stream only the elements that do not satisfy the given predicate.
func FlatMap ¶
FlatMap constructs a new stream by concatenating all substreams, produced by f from elements of the original stream.
func Flatten ¶ added in v0.0.11
Flatten simplifies a stream of streams to just the stream of values by concatenating all inner streams.
func FoldToGoResult ¶ added in v0.2.7
FoldToGoResult converts a stream into a stream of go results. All go results will be non-error except probably the last one.
func FromChannel ¶ added in v0.0.10
FromChannel constructs a stream that reads from the given channel until the channel is open. When channel is closed, the stream is also closed.
func FromSideEffectfulFunction ¶ added in v0.0.3
FromSideEffectfulFunction constructs a stream from a Go-style function. It is expected that this function is not pure and can return different results.
func FromStepResult ¶ added in v0.0.10
func FromStepResult[A any](iosr io.IO[StepResult[A]]) Stream[A]
FromStepResult constructs a stream from an IO that returns StepResult.
func Generate ¶ added in v0.0.2
Generate constructs an infinite stream of values using the production function.
func GroupBy ¶ added in v0.2.3
GroupBy collects group by a user-provided key. Whenever a new key is encountered, the previous group is emitted. When the original stream finishes, the last group is emitted.
func GroupByEval ¶ added in v0.2.4
func GroupByEval[A any, K comparable](stm Stream[A], keyIO func(A) io.IO[K]) Stream[fun.Pair[K, []A]]
GroupByEval collects group by a user-provided key (which is evaluated as IO). Whenever a new key is encountered, the previous group is emitted. When the original stream finishes, the last group is emitted.
func Len ¶ added in v0.1.3
Len is a pipe that returns a stream of 1 element that is the count of elements of the original stream.
func SideEval ¶ added in v0.3.3
SideEval executes a computation for each element for it's side effect. Could be used for logging, for instance.
func StateFlatMap ¶
func StateFlatMap[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]]) Stream[B]
StateFlatMap maintains state along the way.
func StateFlatMapWithFinish ¶ added in v0.0.4
func StateFlatMapWithFinish[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]], onFinish func(s S) Stream[B]) Stream[B]
StateFlatMapWithFinish maintains state along the way. When the source stream finishes, it invokes onFinish with the last state.
func StateFlatMapWithFinishAndFailureHandling ¶ added in v0.3.1
func StateFlatMapWithFinishAndFailureHandling[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]], onFinish func(s S) Stream[B], onFailure func(s S, err error) Stream[B]) Stream[B]
StateFlatMapWithFinishAndFailureHandling maintains state along the way. When the source stream finishes, it invokes onFinish with the last state. If there is an error during stream evaluation, onFailure is invoked. NB! onFinish is not invoked in case of failure.
func Sum ¶ added in v0.1.3
Sum is a pipe that returns a stream of 1 element that is sum of all elements of the original stream.
func TakeWhile ¶ added in v0.3.2
TakeWhile returns the beginning of the stream such that all elements satisfy the predicate.
func Through ¶ added in v0.0.2
Through passes the stream data through the pipe. Technically it applies the pipe function to this stream.
func ThroughExecutionContext ¶ added in v0.2.5
func ThroughExecutionContext[A any](sa Stream[io.IO[A]], ec io.ExecutionContext, capacity int) Stream[A]
ThroughExecutionContext runs a stream of tasks through an ExecutionContext. NB! This operation recovers GoResults. This will lead to lost of good elements after one that failed. At most `capacity - 1` number of lost elements.
func ThroughExecutionContextUnordered ¶ added in v0.3.4
func ThroughExecutionContextUnordered[A any](sa Stream[io.IO[A]], ec io.ExecutionContext, capacity int) Stream[A]
ThroughExecutionContextUnordered runs a stream of tasks through an ExecutionContext. The order of results is not preserved! This operation recovers GoResults. This will lead to lost of good elements after one that failed. At most `capacity - 1` number of lost elements.
func ThroughPipeEval ¶ added in v0.2.7
ThroughPipeEval runs the given stream through pipe that is returned by the provided pipeIO.
func ToStreamEvent ¶ added in v0.3.0
func ToStreamEvent[A any](stm Stream[A]) Stream[StreamEvent[A]]
ToStreamEvent converts the given stream to a stream of StreamEvents. Each normal element will become a StreamEvent with data. On a failure or finish a single element is returned before the end of the stream.
func Unfold ¶ added in v0.0.2
Unfold constructs an infinite stream of values using the production function.
func UnfoldGoResult ¶ added in v0.2.7
func UnfoldGoResult[A any](stm Stream[io.GoResult[A]], onFailure func(err error) Stream[A]) Stream[A]
UnfoldGoResult converts a stream of GoResults back to normal stream. On the first encounter of Error, the stream fails. default value for `onFailure` - `Fail[A]`.
type StreamEvent ¶ added in v0.3.0
type StreamEvent[A any] struct { Error error IsFinished bool // true when stream has completed Value A }
Fields should be checked in order - If Error == nil, If !IsFinished, then Value
func NewStreamEvent ¶ added in v0.3.0
func NewStreamEvent[A any](value A) StreamEvent[A]
func NewStreamEventError ¶ added in v0.3.0
func NewStreamEventError[A any](err error) StreamEvent[A]
func NewStreamEventFinished ¶ added in v0.3.0
func NewStreamEventFinished[A any]() StreamEvent[A]