stream

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2022 License: Apache-2.0 Imports: 8 Imported by: 5

Documentation

Index

Constants

View Source
const (
	// Items are dropped if buffer is full
	BackpressureDrop = BackpressureStrategy("drop")

	// Observing blocks until there is room in the buffer
	BackpressureBlock = BackpressureStrategy("block")
)

Variables

View Source
var DefaultMulticastParams = MulticastParams{16, false}

Functions

func AlwaysRetry

func AlwaysRetry(err error) bool

AlwaysRetry always asks for a retry regardless of the error.

func Discard

func Discard[T any](ctx context.Context, src Observable[T]) error

Discard discards all items from 'src' and returns an error if any.

func First

func First[T any](ctx context.Context, src Observable[T]) (item T, err error)

First returns the first item from 'src' observable and then cancels the subscription.

func Last

func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)

Last returns the last item from 'src' observable.

func NewObservableValue

func NewObservableValue[T any](ctx context.Context, init T) (ObservableValue[T], Observable[T], error)

func SplitHead

func SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])

SplitHead splits the source 'src' into two: 'head' which receives the first item, and 'tail' that receives the rest. Errors from source are only handed to 'tail'.

func ToChannel

func ToChannel[T any](ctx context.Context, errs chan<- error, src Observable[T]) <-chan T

ToChannel converts an observable into an item of channels. Errors are delivered to the supplied error channel.

func ToChannels

func ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)

ToChannels converts an observable into an item channel and error channel. When the source closes both channels are closed and an error (which may be nil) is always sent to the error channel.

func ToSlice

func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)

ToSlice converts an Observable into a slice.

Types

type BackpressureStrategy

type BackpressureStrategy string

type FuncObservable

type FuncObservable[T any] func(context.Context, func(T) error) error

FuncObservable wraps a function that implements Observe. Convenience when declaring a struct to implement Observe() is overkill.

func (FuncObservable[T]) Observe

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error

type MulticastParams

type MulticastParams struct {
	// BufferSize is the number of items to buffer per observer before backpressure
	// towards the source.
	BufferSize int

	// EmitLatest if set will emit the latest seen item when neb observer
	// subscribes.
	EmitLatest bool
}

type Observable

type Observable[T any] interface {
	// Observe starts observing a stream of T's.
	// 'next' is called on each element sequentially. If it returns an error the stream closes
	// and this error is returned by Observe().
	// When 'ctx' is cancelled the stream closes and ctx.Err() is returned.
	//
	// Implementations of Observe() must maintain the following invariants:
	// - Observe blocks until the stream and any upstreams are closed.
	// - 'next' is called sequentially from the goroutine that called Observe() in
	//   order to maintain good stack traces and not require observer to be thread-safe.
	// - if 'next' returns an error it must not be called again and the same error
	//   must be returned by Observe().
	//
	// Handling of context cancellation is asynchronous and implementation may
	// choose to block on 'next' and only handle cancellation after 'next' returns.
	// If 'next' implements a long-running operation, then it is expected that the caller
	// will handle 'ctx' cancellation in the 'next' function itself.
	Observe(ctx context.Context, next func(T) error) error
}

func Buffer

func Buffer[T any](src Observable[T], bufSize int, strategy BackpressureStrategy) Observable[T]

Buffer buffers 'n' items with configurable backpressure strategy. Downstream errors are not propagated towards 'src'.

func CoalesceByKey

func CoalesceByKey[K comparable, V any](src Observable[V], toKey func(V) K, bufferSize int) Observable[V]

CoalesceByKey buffers updates from the input observable and keeps only the latest version of the value for the same key when the observer is slow in consuming the values.

func Concat

func Concat[T any](srcs ...Observable[T]) Observable[T]

Concat takes one or more observable of the same type and emits the items from each of them in order.

func Deferred

func Deferred[T any]() (src Observable[T], start func(Observable[T]))

Deferred creates an observable that allows subscribing, but waits for the real observable to be provided later.

func Delay

func Delay[T any](src Observable[T], duration time.Duration) Observable[T]

Delay shifts the items emitted from source by the given duration.

func Empty

func Empty[T any]() Observable[T]

Empty creates an empty observable that completes immediately.

func Error

func Error[T any](err error) Observable[T]

Error creates an observable that fails immediately with given error.

func Filter

func Filter[T any](src Observable[T], filter func(T) bool) Observable[T]

Filter keeps only the elements for which the filter function returns true.

func FlatMap

func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]

FlatMap applies a function that returns an observable of Bs to the source observable of As. The observable from the function is flattened (hence FlatMap).

func Flatten

func Flatten[T any](src Observable[[]T]) Observable[T]

Flatten takes an observable of slices of T and returns an observable of T.

func FromAnySlice

func FromAnySlice[T any](items []any) Observable[T]

FromAnySlice converts a slice of 'any' into an Observable of specified type.

func FromChannel

func FromChannel[T any](in <-chan T) Observable[T]

FromChannel creates an observable from a channel. The channel is consumed by the first observer.

func FromFunction

func FromFunction[T any](f func() T) Observable[T]

func FromSlice

func FromSlice[T any](items []T) Observable[T]

FromSlice converts a slice into an Observable.

func Interval

func Interval(interval time.Duration) Observable[int]

Interval emits an increasing counter value every 'interval' period.

func Just

func Just[T any](item T) Observable[T]

Just creates an observable with a single item.

func Map

func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]

Map applies a function onto an observable.

func Merge

func Merge[T any](srcs ...Observable[T]) Observable[T]

Merge multiple observables into one. Error from any one of the sources will cancel and complete the stream. Error from downstream is propagated to the upstream that emitted the item.

Beware: the observables are observed from goroutines spawned by Merge() and thus run concurrently, e.g. functions doFoo and doBar are called from different goroutines than Observe():

Merge(Map(foo, doFoo), Map(bar, doBar)).Observe(...)

func Multicast

func Multicast[T any](params MulticastParams, src Observable[T]) (mcast Observable[T], connect func(context.Context) error)

Multicast creates a publish-subscribe observable that "multicasts" items from the 'src' observable to subscribers.

Returns the wrapped observable and a function to connect observers to the source observable. Connect will block until source observable completes and returns the error if any from the source observable.

Observers can subscribe both before and after the source has been connected, but may miss events if subscribing after connect.

func OnNext

func OnNext[T any](src Observable[T], f func(T)) Observable[T]

OnNext calls the supplied function on each emitted item.

func ParallelMap

func ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]

ParallelMap maps a function in parallel to the source. The errors from downstream are propagated asynchronously towards the source.

func Range

func Range(from, to int) Observable[int]

Range creates an observable that emits integers in range from...to-1.

func Reduce

func Reduce[T, Result any](src Observable[T], init Result, reduce func(Result, T) Result) Observable[Result]

Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single result state produced by the last call to 'reduce'.

func Retry

func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]

Retry resubscribes to the observable if it completes with an error.

func RetryNext

func RetryNext[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]

RetryNext retries the call to 'next' if it returned an error.

func Scan

func Scan[In, Out any](src Observable[In], init Out, step func(Out, In) Out) Observable[Out]

Scan takes an initial state and a step function that is called on each element with the previous state and returns an observable of the states returned by the step function. E.g. Scan is like Reduce that emits the intermediate states.

func Skip

func Skip[T any](n int, src Observable[T]) Observable[T]

Skip skips the first 'n' items from the source.

func Stuck

func Stuck[T any]() Observable[T]

Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.

func Take

func Take[T any](n int, src Observable[T]) Observable[T]

Take takes 'n' items from the source 'src'. The context given to source observable is cancelled if it emits more than 'n' items. If all 'n' items were emitted this cancelled error is ignored.

func TakeWhile

func TakeWhile[T any](pred func(T) bool, src Observable[T]) Observable[T]

TakeWhile takes items from the source until 'pred' returns false after which the observable is completed.

func Throttle

func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]

Throttle limits the rate at which items are emitted.

func Zip2

func Zip2[V1, V2 any](src1 Observable[V1], src2 Observable[V2]) Observable[Tuple2[V1, V2]]

Zip2 takes two observables and merges them into an observable of pairs

type ObservableValue

type ObservableValue[T any] interface {
	// Get retrieves the latest value
	Get() T

	// Update updates the value with a function that modifies
	// it. Observers of the value are notified of the new value.
	// Panics if called after Close().
	Update(f func(*T))

	// Close the value. Any observers to this value are completed.
	Close()
}

type RetryFunc

type RetryFunc func(err error) bool

RetryFunc decides whether the processing should be retried for the given error

func BackoffRetry

func BackoffRetry(shouldRetry RetryFunc, minBackoff, maxBackoff time.Duration) RetryFunc

BackoffRetry retries with an exponential backoff.

func LimitRetries

func LimitRetries(shouldRetry RetryFunc, numRetries int) RetryFunc

LimitRetries limits the number of retries with the given retry method. e.g. LimitRetries(BackoffRetry(time.Millisecond, time.Second), 5)

type Tuple2

type Tuple2[V1, V2 any] struct {
	V1 V1
	V2 V2
}

type Tuple3

type Tuple3[V1, V2, V3 any] struct {
	V1 V1
	V2 V2
	V3 V3
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL