Documentation ¶
Index ¶
- Constants
- Variables
- func AlwaysRetry(err error) bool
- func Discard[T any](ctx context.Context, src Observable[T]) error
- func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func NewObservableValue[T any](ctx context.Context, init T) (ObservableValue[T], Observable[T], error)
- func SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])
- func ToChannel[T any](ctx context.Context, errs chan<- error, src Observable[T]) <-chan T
- func ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)
- func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
- type BackpressureStrategy
- type FuncObservable
- type MulticastParams
- type Observable
- func Buffer[T any](src Observable[T], bufSize int, strategy BackpressureStrategy) Observable[T]
- func CoalesceByKey[K comparable, V any](src Observable[V], toKey func(V) K, bufferSize int) Observable[V]
- func Concat[T any](srcs ...Observable[T]) Observable[T]
- func Deferred[T any]() (src Observable[T], start func(Observable[T]))
- func Delay[T any](src Observable[T], duration time.Duration) Observable[T]
- func Empty[T any]() Observable[T]
- func Error[T any](err error) Observable[T]
- func Filter[T any](src Observable[T], filter func(T) bool) Observable[T]
- func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
- func Flatten[T any](src Observable[[]T]) Observable[T]
- func FromAnySlice[T any](items []any) Observable[T]
- func FromChannel[T any](in <-chan T) Observable[T]
- func FromFunction[T any](f func() T) Observable[T]
- func FromSlice[T any](items []T) Observable[T]
- func Interval(interval time.Duration) Observable[int]
- func Just[T any](item T) Observable[T]
- func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
- func Merge[T any](srcs ...Observable[T]) Observable[T]
- func Multicast[T any](params MulticastParams, src Observable[T]) (mcast Observable[T], connect func(context.Context) error)
- func OnNext[T any](src Observable[T], f func(T)) Observable[T]
- func ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]
- func Range(from, to int) Observable[int]
- func Reduce[T, Result any](src Observable[T], init Result, reduce func(Result, T) Result) Observable[Result]
- func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
- func RetryNext[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
- func Scan[In, Out any](src Observable[In], init Out, step func(Out, In) Out) Observable[Out]
- func Skip[T any](n int, src Observable[T]) Observable[T]
- func Stuck[T any]() Observable[T]
- func Take[T any](n int, src Observable[T]) Observable[T]
- func TakeWhile[T any](pred func(T) bool, src Observable[T]) Observable[T]
- func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
- func Zip2[V1, V2 any](src1 Observable[V1], src2 Observable[V2]) Observable[Tuple2[V1, V2]]
- type ObservableValue
- type RetryFunc
- type Tuple2
- type Tuple3
Constants ¶
const ( // Items are dropped if buffer is full BackpressureDrop = BackpressureStrategy("drop") // Observing blocks until there is room in the buffer BackpressureBlock = BackpressureStrategy("block") )
Variables ¶
var DefaultMulticastParams = MulticastParams{16, false}
Functions ¶
func AlwaysRetry ¶
AlwaysRetry always asks for a retry regardless of the error.
func Discard ¶
func Discard[T any](ctx context.Context, src Observable[T]) error
Discard discards all items from 'src' and returns an error if any.
func First ¶
func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
First returns the first item from 'src' observable and then cancels the subscription.
func Last ¶
func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
Last returns the last item from 'src' observable.
func NewObservableValue ¶
func NewObservableValue[T any](ctx context.Context, init T) (ObservableValue[T], Observable[T], error)
func SplitHead ¶
func SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])
SplitHead splits the source 'src' into two: 'head' which receives the first item, and 'tail' that receives the rest. Errors from source are only handed to 'tail'.
func ToChannel ¶
func ToChannel[T any](ctx context.Context, errs chan<- error, src Observable[T]) <-chan T
ToChannel converts an observable into a channel of items. Errors are delivered to the supplied error channel.
func ToChannels ¶
func ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)
ToChannels converts an observable into an item channel and an error channel. When the source closes, both channels are closed and an error (which may be nil) is always sent to the error channel.
Types ¶
type BackpressureStrategy ¶
type BackpressureStrategy string
type FuncObservable ¶
FuncObservable wraps a function that implements Observe. It is a convenience for cases where declaring a struct to implement Observe() is overkill.
type MulticastParams ¶
type Observable ¶
type Observable[T any] interface { // Observe starts observing a stream of T's. // 'next' is called on each element sequentially. If it returns an error the stream closes // and this error is returned by Observe(). // When 'ctx' is cancelled the stream closes and ctx.Err() is returned. // // Implementations of Observe() must maintain the following invariants: // - Observe blocks until the stream and any upstreams are closed. // - 'next' is called sequentially from the goroutine that called Observe() in // order to maintain good stack traces and not require observer to be thread-safe. // - if 'next' returns an error it must not be called again and the same error // must be returned by Observe(). // // Handling of context cancellation is asynchronous and implementation may // choose to block on 'next' and only handle cancellation after 'next' returns. // If 'next' implements a long-running operation, then it is expected that the caller // will handle 'ctx' cancellation in the 'next' function itself. Observe(ctx context.Context, next func(T) error) error }
func Buffer ¶
func Buffer[T any](src Observable[T], bufSize int, strategy BackpressureStrategy) Observable[T]
Buffer buffers up to 'bufSize' items with a configurable backpressure strategy. Downstream errors are not propagated towards 'src'.
func CoalesceByKey ¶
func CoalesceByKey[K comparable, V any](src Observable[V], toKey func(V) K, bufferSize int) Observable[V]
CoalesceByKey buffers updates from the input observable and keeps only the latest version of the value for the same key when the observer is slow in consuming the values.
func Concat ¶
func Concat[T any](srcs ...Observable[T]) Observable[T]
Concat takes one or more observables of the same type and emits the items from each of them in order.
func Deferred ¶
func Deferred[T any]() (src Observable[T], start func(Observable[T]))
Deferred creates an observable that allows subscribing, but waits for the real observable to be provided later.
func Delay ¶
func Delay[T any](src Observable[T], duration time.Duration) Observable[T]
Delay shifts the items emitted from source by the given duration.
func Empty ¶
func Empty[T any]() Observable[T]
Empty creates an empty observable that completes immediately.
func Error ¶
func Error[T any](err error) Observable[T]
Error creates an observable that fails immediately with given error.
func Filter ¶
func Filter[T any](src Observable[T], filter func(T) bool) Observable[T]
Filter keeps only the elements for which the filter function returns true.
func FlatMap ¶
func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
FlatMap applies a function that returns an observable of Bs to the source observable of As. The observable from the function is flattened (hence FlatMap).
func Flatten ¶
func Flatten[T any](src Observable[[]T]) Observable[T]
Flatten takes an observable of slices of T and returns an observable of T.
func FromAnySlice ¶
func FromAnySlice[T any](items []any) Observable[T]
FromAnySlice converts a slice of 'any' into an Observable of specified type.
func FromChannel ¶
func FromChannel[T any](in <-chan T) Observable[T]
FromChannel creates an observable from a channel. The channel is consumed by the first observer.
func FromFunction ¶
func FromFunction[T any](f func() T) Observable[T]
func FromSlice ¶
func FromSlice[T any](items []T) Observable[T]
FromSlice converts a slice into an Observable.
func Interval ¶
func Interval(interval time.Duration) Observable[int]
Interval emits an increasing counter value every 'interval' period.
func Map ¶
func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
Map applies a function onto an observable.
func Merge ¶
func Merge[T any](srcs ...Observable[T]) Observable[T]
Merge multiple observables into one. Error from any one of the sources will cancel and complete the stream. Error from downstream is propagated to the upstream that emitted the item.
Beware: the observables are observed from goroutines spawned by Merge() and thus run concurrently, e.g. functions doFoo and doBar are called from different goroutines than Observe():
Merge(Map(foo, doFoo), Map(bar, doBar)).Observe(...)
func Multicast ¶
func Multicast[T any](params MulticastParams, src Observable[T]) (mcast Observable[T], connect func(context.Context) error)
Multicast creates a publish-subscribe observable that "multicasts" items from the 'src' observable to subscribers.
Returns the wrapped observable and a function to connect observers to the source observable. Connect blocks until the source observable completes and returns the error, if any, from the source observable.
Observers can subscribe both before and after the source has been connected, but may miss events if subscribing after connect.
func OnNext ¶
func OnNext[T any](src Observable[T], f func(T)) Observable[T]
OnNext calls the supplied function on each emitted item.
func ParallelMap ¶
func ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]
ParallelMap maps a function in parallel to the source. The errors from downstream are propagated asynchronously towards the source.
func Range ¶
func Range(from, to int) Observable[int]
Range creates an observable that emits integers in range from...to-1.
func Reduce ¶
func Reduce[T, Result any](src Observable[T], init Result, reduce func(Result, T) Result) Observable[Result]
Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single result state produced by the last call to 'reduce'.
func Retry ¶
func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
Retry resubscribes to the observable if it completes with an error.
func RetryNext ¶
func RetryNext[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
RetryNext retries the call to 'next' if it returned an error.
func Scan ¶
func Scan[In, Out any](src Observable[In], init Out, step func(Out, In) Out) Observable[Out]
Scan takes an initial state and a step function that is called on each element with the previous state and returns an observable of the states returned by the step function. In other words, Scan is like Reduce, except that it emits the intermediate states.
func Skip ¶
func Skip[T any](n int, src Observable[T]) Observable[T]
Skip skips the first 'n' items from the source.
func Stuck ¶
func Stuck[T any]() Observable[T]
Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.
func Take ¶
func Take[T any](n int, src Observable[T]) Observable[T]
Take takes 'n' items from the source 'src'. The context given to the source observable is cancelled if it emits more than 'n' items. If all 'n' items were emitted, this cancellation error is ignored.
func TakeWhile ¶
func TakeWhile[T any](pred func(T) bool, src Observable[T]) Observable[T]
TakeWhile takes items from the source until 'pred' returns false, after which the observable is completed.
func Throttle ¶
func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
Throttle limits the rate at which items are emitted.
func Zip2 ¶
func Zip2[V1, V2 any](src1 Observable[V1], src2 Observable[V2]) Observable[Tuple2[V1, V2]]
Zip2 takes two observables and merges them into an observable of pairs.
type ObservableValue ¶
type ObservableValue[T any] interface { // Get retrieves the latest value Get() T // Update updates the value with a function that modifies // it. Observers of the value are notified of the new value. // Panics if called after Close(). Update(f func(*T)) // Close the value. Any observers to this value are completed. Close() }
type RetryFunc ¶
RetryFunc decides whether the processing should be retried for the given error
func BackoffRetry ¶
BackoffRetry retries with an exponential backoff.
func LimitRetries ¶
LimitRetries limits the number of retries with the given retry method. For example: LimitRetries(BackoffRetry(time.Millisecond, time.Second), 5)