goreactive

package module
v0.0.0-...-5eaf8cc Latest
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2022 | License: Apache-2.0 | Imports: 5 | Imported by: 2

README

Go Reactive

A reactive streams library for Go, in the spirit of Reactive Extensions, implemented with generic functions.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Discard

func Discard[T any](ctx context.Context, src Observable[T]) error

Discard consumes and drops all items from 'src' and returns the error, if any, with which the stream closed.

func First

func First[T any](ctx context.Context, src Observable[T]) (item T, err error)

First returns the first item from 'src' observable and then closes it.

func SplitHead

func SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])

SplitHead splits the source 'src' into two: 'head' which receives the first item, and 'tail' that receives the rest. Errors from source are only handed to 'tail'.

func ToChannels

func ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)

ToChannels converts an observable into an item channel and error channel. When the source closes both channels are closed and an error (which may be nil) is always sent to the error channel.
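The channel behaviour described above can be illustrated with a minimal sketch. This is not the library's implementation: `toChannels`, `fromSlice`, and `drain` are hypothetical local stand-ins, the `Observable` and `FuncObservable` declarations are copied from the Types section, and the body of `Observe` is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Declarations copied from the Types section; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// fromSlice is a hypothetical stand-in for a slice-backed source.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		for _, item := range items {
			if err := next(item); err != nil {
				return err
			}
		}
		return nil
	})
}

// toChannels sketches the documented contract: one error value (possibly nil)
// is always sent, and both channels are closed when the source closes.
func toChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error) {
	items := make(chan T)
	errs := make(chan error, 1) // buffered so the final send never blocks
	go func() {
		defer close(items)
		defer close(errs)
		errs <- src.Observe(ctx, func(item T) error {
			select {
			case items <- item:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}()
	return items, errs
}

// drain is a hypothetical helper that reads both channels to completion.
func drain(src Observable[int]) ([]int, error) {
	items, errs := toChannels(context.Background(), src)
	var got []int
	for item := range items {
		got = append(got, item)
	}
	return got, <-errs
}

func main() {
	got, err := drain(fromSlice([]int{1, 2, 3}))
	fmt.Println(got, err) // prints: [1 2 3] <nil>
}
```

Reading the item channel until it is closed and then receiving once from the error channel is one way to consume the pair without leaking the producing goroutine.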

func ToSlice

func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)

ToSlice converts an Observable into a slice.

Types

type FuncObservable

type FuncObservable[T any] func(context.Context, func(T) error) error

FuncObservable wraps a function that implements Observe. It is a convenience for cases where declaring a struct to implement Observe() is overkill.

func (FuncObservable[T]) Observe

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error

type Observable

type Observable[T any] interface {
	// Observe starts observing a stream of T's.
	// 'next' is called on each element sequentially. If it returns an error the stream closes
	// and this error is returned by Observe().
	// When 'ctx' is cancelled the stream closes and ctx.Err() is returned.
	//
	// Implementations of Observe() must maintain the following invariants:
	// - Observe blocks until the stream and any upstreams are closed.
	// - 'next' is called sequentially from the goroutine that called Observe() in
	//   order to maintain good stack traces and not require observer to be thread-safe.
	// - if 'next' returns an error it must not be called again and the same error
	//   must be returned by Observe().
	//
	// Handling of context cancellation is asynchronous and implementation may
	// choose to block on 'next' and only handle cancellation after 'next' returns.
	// If 'next' implements a long-running operation, then it is expected that the caller
	// will handle 'ctx' cancellation in the 'next' function itself.
	Observe(ctx context.Context, next func(T) error) error
}
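
As an illustration of the invariants listed above, here is a minimal sketch of a custom source built with FuncObservable. The type declarations are copied from this page so the example compiles on its own; the body of `Observe` is an assumption, and `countdown` and `collect` are hypothetical names that are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// countdown is a hypothetical source that emits n, n-1, ..., 1.
func countdown(n int) Observable[int] {
	return FuncObservable[int](func(ctx context.Context, next func(int) error) error {
		for i := n; i > 0; i-- {
			// Close the stream with ctx.Err() once the context is cancelled.
			if err := ctx.Err(); err != nil {
				return err
			}
			// next runs sequentially on the goroutine that called Observe.
			// If it fails, it is not called again and its error is returned.
			if err := next(i); err != nil {
				return err
			}
		}
		return nil
	})
}

// collect is a hypothetical helper that observes src until it closes.
func collect(src Observable[int]) ([]int, error) {
	var items []int
	err := src.Observe(context.Background(), func(item int) error {
		items = append(items, item)
		return nil
	})
	return items, err
}

func main() {
	items, err := collect(countdown(3))
	fmt.Println(items, err) // prints: [3 2 1] <nil>
}
```

Because Observe blocks until the stream closes, `collect` can append to a plain slice without any locking.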

func Broadcast

func Broadcast[T any](ctx context.Context, bufSize int, src Observable[T]) Observable[T]

Broadcast creates a publish-subscribe observable that broadcasts items from the 'src' observable to subscribers.

It observes the input immediately, and only once, and broadcasts the items to downstream observers. If 'ctx' is cancelled, all current observers are completed and the input observable is cancelled.

'bufSize' is the number of items to buffer per observer before backpressure is applied towards the source.

func CoalesceByKey

func CoalesceByKey[K comparable, V any](src Observable[V], toKey func(V) K, bufferSize int) Observable[V]

CoalesceByKey buffers updates from the input observable and keeps only the latest version of the value for the same key when the observer is slow in consuming the values.

func Concat

func Concat[T any](srcs ...Observable[T]) Observable[T]

Concat takes one or more observables of the same type and emits the items from each of them in order.
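
One way such sequential concatenation could work is sketched below. This is an assumption about the approach, not the package's code: `concat`, `fromSlice`, and `collect` are hypothetical local stand-ins, and the type declarations are copied from this page with an assumed `Observe` body.

```go
package main

import (
	"context"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// fromSlice is a hypothetical stand-in for a slice-backed source.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		for _, item := range items {
			if err := next(item); err != nil {
				return err
			}
		}
		return nil
	})
}

// concat observes each source to completion before moving on to the next,
// and stops at the first error.
func concat[T any](srcs ...Observable[T]) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		for _, src := range srcs {
			if err := src.Observe(ctx, next); err != nil {
				return err
			}
		}
		return nil
	})
}

// collect is a hypothetical helper that observes src until it closes.
func collect(src Observable[int]) ([]int, error) {
	var items []int
	err := src.Observe(context.Background(), func(item int) error {
		items = append(items, item)
		return nil
	})
	return items, err
}

func main() {
	items, err := collect(concat(fromSlice([]int{1, 2}), fromSlice([]int{3})))
	fmt.Println(items, err) // prints: [1 2 3] <nil>
}
```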

func Delay

func Delay[T any](src Observable[T], interval time.Duration) Observable[T]

Delay emits items from the input at most once per given time interval.

func Empty

func Empty[T any]() Observable[T]

Empty creates an empty observable that completes immediately.

func Error

func Error[T any](err error) Observable[T]

Error creates an observable that fails immediately with the given error.

func Filter

func Filter[T any](src Observable[T], filter func(T) bool) Observable[T]

Filter keeps only the elements for which the filter function returns true.

func FlatMap

func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]

FlatMap applies a function that returns an observable of Bs to the source observable of As. The observable from the function is flattened (hence FlatMap).
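
The flattening can be sketched as follows, assuming each inner observable is observed to completion before the next source item is taken. Whether the package flattens sequentially in this way is not stated on this page, so treat it as an assumption. `flatMap`, `fromSlice`, and `collect` are hypothetical local stand-ins, and the type declarations are copied from this page with an assumed `Observe` body.

```go
package main

import (
	"context"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// fromSlice is a hypothetical stand-in for a slice-backed source.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		for _, item := range items {
			if err := next(item); err != nil {
				return err
			}
		}
		return nil
	})
}

// flatMap observes the inner observable produced for each A and forwards
// its Bs downstream before the next A is processed (sequential flattening).
func flatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B] {
	return FuncObservable[B](func(ctx context.Context, next func(B) error) error {
		return src.Observe(ctx, func(a A) error {
			return apply(a).Observe(ctx, next)
		})
	})
}

// collect is a hypothetical helper that observes src until it closes.
func collect(src Observable[int]) ([]int, error) {
	var items []int
	err := src.Observe(context.Background(), func(item int) error {
		items = append(items, item)
		return nil
	})
	return items, err
}

// expand is a hypothetical pipeline: each n becomes the pair n, n*10.
func expand(items []int) ([]int, error) {
	return collect(flatMap(fromSlice(items), func(n int) Observable[int] {
		return fromSlice([]int{n, n * 10})
	}))
}

func main() {
	items, err := expand([]int{1, 2, 3})
	fmt.Println(items, err) // prints: [1 10 2 20 3 30] <nil>
}
```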

func Flatten

func Flatten[T any](src Observable[[]T]) Observable[T]

Flatten takes an observable of slices of T and returns an observable of T.

func FromChannel

func FromChannel[T any](in <-chan T) Observable[T]

FromChannel creates an observable from a channel. The channel is consumed by the first observer.

func FromSlice

func FromSlice[T any](items []T) Observable[T]

FromSlice converts a slice into an Observable.

func Interval

func Interval(interval time.Duration) Observable[int]

Interval emits an increasing counter value every 'interval' period.

func Map

func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]

Map applies a function onto an observable.
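
Operators such as Map and Filter can be expressed as thin wrappers around the source's Observe call. The sketch below shows one possible shape under that assumption; `mapItems`, `filter`, `fromSlice`, `collect`, and `evenSquares` are hypothetical local names, not the package's functions, and the type declarations are copied from this page with an assumed `Observe` body.

```go
package main

import (
	"context"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// fromSlice is a hypothetical stand-in for a slice-backed source.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		for _, item := range items {
			if err := next(item); err != nil {
				return err
			}
		}
		return nil
	})
}

// mapItems transforms every item with apply before passing it downstream.
func mapItems[A, B any](src Observable[A], apply func(A) B) Observable[B] {
	return FuncObservable[B](func(ctx context.Context, next func(B) error) error {
		return src.Observe(ctx, func(a A) error { return next(apply(a)) })
	})
}

// filter forwards only the items for which keep returns true.
func filter[T any](src Observable[T], keep func(T) bool) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		return src.Observe(ctx, func(item T) error {
			if keep(item) {
				return next(item)
			}
			return nil
		})
	})
}

// collect is a hypothetical helper that observes src until it closes.
func collect(src Observable[int]) ([]int, error) {
	var items []int
	err := src.Observe(context.Background(), func(item int) error {
		items = append(items, item)
		return nil
	})
	return items, err
}

// evenSquares keeps the even numbers and squares them.
func evenSquares(items []int) ([]int, error) {
	evens := filter(fromSlice(items), func(n int) bool { return n%2 == 0 })
	return collect(mapItems(evens, func(n int) int { return n * n }))
}

func main() {
	items, err := evenSquares([]int{1, 2, 3, 4, 5, 6})
	fmt.Println(items, err) // prints: [4 16 36] <nil>
}
```

Nothing runs until the outermost observable is observed, so a pipeline built this way can be observed more than once as long as its source allows it.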

func Merge

func Merge[T any](srcs ...Observable[T]) Observable[T]

Merge merges multiple observables into one. An error from any of the sources cancels the context and completes the stream.

func OnNext

func OnNext[T any](src Observable[T], f func(T)) Observable[T]

OnNext calls the supplied function on each emitted item.

func ParallelMap

func ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]

ParallelMap applies a function to the items of the source in parallel. Errors from downstream are propagated asynchronously towards the source.

func Range

func Range(from, to int) Observable[int]

Range creates an observable that emits the integers from 'from' up to and including 'to'-1.

func Reduce

func Reduce[T, Result any](src Observable[T], init Result, reduce func(T, Result) Result) Observable[Result]

Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single result state.
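
Note the argument order of 'reduce': the item comes first and the state second. A minimal sketch of the idea follows. It assumes the state is emitted only when the source completes without an error, which this page does not confirm. `reduce` (the lower-case local function), `fromSlice`, and `sum` are hypothetical stand-ins, and the type declarations are copied from this page with an assumed `Observe` body.

```go
package main

import (
	"context"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// fromSlice is a hypothetical stand-in for a slice-backed source.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		for _, item := range items {
			if err := next(item); err != nil {
				return err
			}
		}
		return nil
	})
}

// reduce folds every item into the state and emits the final state once.
// The state starts from init on every Observe call.
func reduce[T, Result any](src Observable[T], init Result, f func(T, Result) Result) Observable[Result] {
	return FuncObservable[Result](func(ctx context.Context, next func(Result) error) error {
		state := init
		err := src.Observe(ctx, func(item T) error {
			state = f(item, state)
			return nil
		})
		if err != nil {
			return err // assumption: no result is emitted on failure
		}
		return next(state)
	})
}

// sum is a hypothetical helper that reduces a slice to its total.
func sum(items []int) (int, error) {
	total := 0
	src := reduce(fromSlice(items), 0, func(item, acc int) int { return item + acc })
	err := src.Observe(context.Background(), func(result int) error {
		total = result
		return nil
	})
	return total, err
}

func main() {
	total, err := sum([]int{1, 2, 3, 4})
	fmt.Println(total, err) // prints: 10 <nil>
}
```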

func Retry

func Retry[T any](src Observable[T], shouldRetry func(err error) bool) Observable[T]

Retry resubscribes to the observable if it completes with an error.
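
A resubscribing loop of this kind might look like the sketch below. It is an illustration under stated assumptions, not the package's implementation: it stops retrying when the error came from the downstream 'next' (so 'next' is never called again after failing) or when the context is done. `retry`, `flaky`, `errFlaky`, and `runRetry` are hypothetical names, and the type declarations are copied from this page with an assumed `Observe` body.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

var errFlaky = errors.New("flaky source failed")

// flaky is a hypothetical source: each Observe call emits its attempt
// number, and the first 'failures' attempts then fail with errFlaky.
func flaky(failures int) Observable[int] {
	attempts := 0
	return FuncObservable[int](func(ctx context.Context, next func(int) error) error {
		attempts++
		if err := next(attempts); err != nil {
			return err
		}
		if attempts <= failures {
			return errFlaky
		}
		return nil
	})
}

// retry observes src again for as long as shouldRetry accepts its error.
func retry[T any](src Observable[T], shouldRetry func(err error) bool) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		var downstream error
		wrapped := func(item T) error {
			downstream = next(item)
			return downstream
		}
		for {
			err := src.Observe(ctx, wrapped)
			if err == nil || downstream != nil || ctx.Err() != nil || !shouldRetry(err) {
				return err
			}
		}
	})
}

// runRetry observes a flaky source with retries on errFlaky only.
func runRetry(failures int) ([]int, error) {
	var items []int
	src := retry(flaky(failures), func(err error) bool { return errors.Is(err, errFlaky) })
	err := src.Observe(context.Background(), func(item int) error {
		items = append(items, item)
		return nil
	})
	return items, err
}

func main() {
	items, err := runRetry(2)
	fmt.Println(items, err) // prints: [1 2 3] <nil>
}
```

Items emitted before a failed attempt are still delivered downstream, so an observer of a retried stream may see repeated or partial sequences.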

func Single

func Single[T any](item T) Observable[T]

Single creates an observable with a single item.

func Stuck

func Stuck[T any]() Observable[T]

Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.

func Take

func Take[T any](n int, src Observable[T]) Observable[T]

Take takes 'n' items from the source 'src'. The context given to the source observable is cancelled if it emits more than 'n' items. If all 'n' items were emitted, the resulting cancellation error is ignored.
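
The cancellation behaviour described above can be sketched as follows. This is a hypothetical re-implementation for illustration only: `take`, `naturals`, and `firstN` are assumed names, and the type declarations are copied from this page with an assumed `Observe` body.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Declarations copied from this page; the Observe body is assumed.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T) error) error
}

type FuncObservable[T any] func(context.Context, func(T) error) error

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T) error) error {
	return f(ctx, next)
}

// naturals is a hypothetical endless source emitting 0, 1, 2, ...
// It stops when the context is cancelled or next returns an error.
func naturals() Observable[int] {
	return FuncObservable[int](func(ctx context.Context, next func(int) error) error {
		for i := 0; ; i++ {
			if err := ctx.Err(); err != nil {
				return err
			}
			if err := next(i); err != nil {
				return err
			}
		}
	})
}

// take forwards at most n items. When the source emits one more, the
// source's context is cancelled and the resulting error is swallowed.
func take[T any](n int, src Observable[T]) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T) error) error {
		srcCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		emitted := 0
		err := src.Observe(srcCtx, func(item T) error {
			if emitted >= n {
				cancel()
				return srcCtx.Err() // context.Canceled after cancel()
			}
			emitted++
			return next(item)
		})
		// Ignore the cancellation we caused ourselves, but not one
		// coming from the caller's context.
		if emitted >= n && errors.Is(err, context.Canceled) && ctx.Err() == nil {
			return nil
		}
		return err
	})
}

// firstN is a hypothetical helper collecting the first n naturals.
func firstN(n int) ([]int, error) {
	var items []int
	err := take(n, naturals()).Observe(context.Background(), func(item int) error {
		items = append(items, item)
		return nil
	})
	return items, err
}

func main() {
	items, err := firstN(3)
	fmt.Println(items, err) // prints: [0 1 2] <nil>
}
```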

Directories

Path Synopsis
doc
talks/example Module
sources
http Module
k8s Module
stream Module