Documentation ¶
Index ¶
- func Discard[T any](ctx context.Context, src Observable[T]) error
- func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])
- func ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)
- func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
- type FuncObservable
- type Observable
- func Broadcast[T any](ctx context.Context, bufSize int, src Observable[T]) Observable[T]
- func CoalesceByKey[K comparable, V any](src Observable[V], toKey func(V) K, bufferSize int) Observable[V]
- func Concat[T any](srcs ...Observable[T]) Observable[T]
- func Delay[T any](src Observable[T], interval time.Duration) Observable[T]
- func Empty[T any]() Observable[T]
- func Error[T any](err error) Observable[T]
- func Filter[T any](src Observable[T], filter func(T) bool) Observable[T]
- func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
- func Flatten[T any](src Observable[[]T]) Observable[T]
- func FromChannel[T any](in <-chan T) Observable[T]
- func FromSlice[T any](items []T) Observable[T]
- func Interval(interval time.Duration) Observable[int]
- func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
- func Merge[T any](srcs ...Observable[T]) Observable[T]
- func OnNext[T any](src Observable[T], f func(T)) Observable[T]
- func ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]
- func Range(from, to int) Observable[int]
- func Reduce[T, Result any](src Observable[T], init Result, reduce func(T, Result) Result) Observable[Result]
- func Retry[T any](src Observable[T], shouldRetry func(err error) bool) Observable[T]
- func Single[T any](item T) Observable[T]
- func Stuck[T any]() Observable[T]
- func Take[T any](n int, src Observable[T]) Observable[T]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Discard ¶
func Discard[T any](ctx context.Context, src Observable[T]) error
Discard discards all items from 'src' and returns an error if any.
func First ¶
func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
First returns the first item from 'src' observable and then closes it.
func SplitHead ¶
func SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])
SplitHead splits the source 'src' into two: 'head' which receives the first item, and 'tail' that receives the rest. Errors from source are only handed to 'tail'.
func ToChannels ¶
func ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)
ToChannels converts an observable into an item channel and error channel. When the source closes both channels are closed and an error (which may be nil) is always sent to the error channel.
Types ¶
type FuncObservable ¶
FuncObservable wraps a function that implements Observe. Convenience when declaring a struct to implement Observe() is overkill.
type Observable ¶
type Observable[T any] interface { // Observe starts observing a stream of T's. // 'next' is called on each element sequentially. If it returns an error the stream closes // and this error is returned by Observe(). // When 'ctx' is cancelled the stream closes and ctx.Err() is returned. // // Implementations of Observe() must maintain the following invariants: // - Observe blocks until the stream and any upstreams are closed. // - 'next' is called sequentially from the goroutine that called Observe() in // order to maintain good stack traces and not require observer to be thread-safe. // - if 'next' returns an error it must not be called again and the same error // must be returned by Observe(). // // Handling of context cancellation is asynchronous and implementation may // choose to block on 'next' and only handle cancellation after 'next' returns. // If 'next' implements a long-running operation, then it is expected that the caller // will handle 'ctx' cancellation in the 'next' function itself. Observe(ctx context.Context, next func(T) error) error }
func Broadcast ¶
func Broadcast[T any](ctx context.Context, bufSize int, src Observable[T]) Observable[T]
Broadcast creates a publish-subscribe observable that broadcasts items from the 'src' observable to subscribers.
It immediately and only once observes the input and broadcasts the items to downstream observers. If 'ctx' is cancelled all current observers are completed, the input observable is cancelled.
'bufSize' is the number of items to buffer per observer before backpressure towards the source.
func CoalesceByKey ¶
func CoalesceByKey[K comparable, V any](src Observable[V], toKey func(V) K, bufferSize int) Observable[V]
CoalesceByKey buffers updates from the input observable and keeps only the latest version of the value for the same key when the observer is slow in consuming the values.
func Concat ¶
func Concat[T any](srcs ...Observable[T]) Observable[T]
Concat takes one or more observable of the same type and emits the items from each of them in order.
func Delay ¶
func Delay[T any](src Observable[T], interval time.Duration) Observable[T]
Delay emits item from input at most once per given time interval.
func Empty ¶
func Empty[T any]() Observable[T]
Empty creates an empty observable that completes immediately.
func Error ¶
func Error[T any](err error) Observable[T]
Error creates an observable that fails immediately with given error.
func Filter ¶
func Filter[T any](src Observable[T], filter func(T) bool) Observable[T]
Filter keeps only the elements for which the filter function returns true.
func FlatMap ¶
func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
FlatMap applies a function that returns an observable of Bs to the source observable of As. The observable from the function is flattened (hence FlatMap).
func Flatten ¶
func Flatten[T any](src Observable[[]T]) Observable[T]
Flatten takes an observable of slices of T and returns an observable of T.
func FromChannel ¶
func FromChannel[T any](in <-chan T) Observable[T]
FromChannel creates an observable from a channel. The channel is consumed by the first observer.
func FromSlice ¶
func FromSlice[T any](items []T) Observable[T]
FromSlice converts a slice into an Observable.
func Interval ¶
func Interval(interval time.Duration) Observable[int]
Interval emits an increasing counter value every 'interval' period.
func Map ¶
func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
Map applies a function onto an observable.
func Merge ¶
func Merge[T any](srcs ...Observable[T]) Observable[T]
Merge multiple observables into one. Error from one of the sources cancels context and completes the stream.
func OnNext ¶
func OnNext[T any](src Observable[T], f func(T)) Observable[T]
OnNext calls the supplied function on each emitted item.
func ParallelMap ¶
func ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]
ParallelMap maps a function in parallel to the source. The errors from downstream are propagated asynchronously towards the source.
func Range ¶
func Range(from, to int) Observable[int]
Range creates an observable that emits integers in range from...to-1.
func Reduce ¶
func Reduce[T, Result any](src Observable[T], init Result, reduce func(T, Result) Result) Observable[Result]
Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single result state.
func Retry ¶
func Retry[T any](src Observable[T], shouldRetry func(err error) bool) Observable[T]
Retry resubscribes to the observable if it completes with an error.
func Single ¶
func Single[T any](item T) Observable[T]
Single creates an observable with a single item.
func Stuck ¶
func Stuck[T any]() Observable[T]
Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.
func Take ¶
func Take[T any](n int, src Observable[T]) Observable[T]
Take takes 'n' items from the source 'src'. The context given to source observable is cancelled if it emits more than 'n' items. If all 'n' items were emitted this cancelled error is ignored.