rxgo

Version: v3.0.0
Published: Apr 16, 2023 License: MIT Imports: 14 Imported by: 0

README

RxGo

Reactive Extensions for the Go Language

ReactiveX

ReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.

ReactiveX is an alternative to callbacks, promises, and deferreds for asynchronous programming. It is about processing streams of events or items, where an event is any occurrence or change within the system. A stream of events is called an Observable.

An operator is a function that defines an Observable and determines how and when it emits data. The operators covered are listed under the OperatorFunc type in the documentation below.

RxGo

The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

Let's see a concrete example in which each step is an operator:

  • We create a static Observable based on a fixed list of items using the Just operator.
  • We define a transformation function (convert a circle into a square) using the Map operator.
  • We filter each yellow square using the Filter operator.

In this example, the final items are sent in a channel, available to a consumer. There are many ways to consume or to produce data using RxGo. Publishing the results in a channel is only one of them.
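The three steps above can be sketched with plain channels and goroutines. This is only an illustration of the pipeline idea using the standard library, not RxGo's implementation: the shape type and the just, mapStage, and filterStage helpers are hypothetical names, and we assume the filter keeps only the yellow squares.

```go
package main

import "fmt"

// shape is a hypothetical item type standing in for the circles and
// squares of the example; it is not part of RxGo.
type shape struct {
	kind  string
	color string
}

// just emits a fixed list of items on a channel and then closes it.
func just(items ...shape) <-chan shape {
	out := make(chan shape)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// mapStage applies fn to every item it receives.
func mapStage(in <-chan shape, fn func(shape) shape) <-chan shape {
	out := make(chan shape)
	go func() {
		defer close(out)
		for it := range in {
			out <- fn(it)
		}
	}()
	return out
}

// filterStage forwards only the items for which keep returns true.
func filterStage(in <-chan shape, keep func(shape) bool) <-chan shape {
	out := make(chan shape)
	go func() {
		defer close(out)
		for it := range in {
			if keep(it) {
				out <- it
			}
		}
	}()
	return out
}

// run wires the three stages together and collects the final items
// from the last channel, as a consumer would.
func run() []shape {
	src := just(
		shape{kind: "circle", color: "red"},
		shape{kind: "circle", color: "yellow"},
		shape{kind: "circle", color: "blue"},
	)
	squares := mapStage(src, func(s shape) shape {
		s.kind = "square"
		return s
	})
	yellow := filterStage(squares, func(s shape) bool {
		return s.color == "yellow"
	})
	var got []shape
	for s := range yellow {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(run()) // [{square yellow}]
}
```

Each stage owns its output channel and closes it when its input is exhausted, which is what lets the consumer's range loop terminate.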

Each operator is a transformation stage. By default, everything is sequential. However, we can take advantage of multi-core CPUs by defining multiple instances of the same operator, with each instance being a goroutine connected to a common channel.
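Running several instances of one stage against a common channel can be sketched as follows. This is a standard-library illustration of the idea, not RxGo code; parallelMap and squareAll are hypothetical names. Note that with more than one worker the output order is no longer guaranteed, so the sketch sorts the results before returning them.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// parallelMap starts `workers` goroutines that all read from the same
// input channel and write to the same output channel.
func parallelMap(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	// Close the output once every worker has drained the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squareAll feeds 1..n through parallelMap and returns the sorted results.
func squareAll(n, workers int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	square := func(v int) int { return v * v }
	var got []int
	for v := range parallelMap(in, workers, square) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(squareAll(5, 3)) // [1 4 9 16 25]
}
```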

The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible.

Installation of RxGo

go get -u github.com/davidlondono/rxgo/v3

Getting Started

Hello World

Let's create our first Observable and consume its items:

observable := rxgo.Interval(time.Second)
observable.SubscribeSync(func(v uint) {
	log.Println("Value ->", v)
}, func(err error) {
	log.Println("Error ->", err)
}, func() {
	log.Println("Complete!")
})

Documentation

Package documentation: https://pkg.go.dev/github.com/davidlondono/rxgo/v3

Contributing

All contributions are very welcome! Be sure you check out the contributing guidelines first. Newcomers can take a look at ongoing issues and check for the help needed label.

Also, if you publish a post about RxGo, please let us know. We would be glad to include it in the External Resources section.

Thanks to all the people who already contributed to RxGo!

External Resources

Special Thanks

A big thanks to JetBrains for supporting the project.

Documentation

Overview

// Package rxgo is the main RxGo package.

Constants

const Infinite int64 = -1

Infinite represents an infinite wait time

Variables

var (
	// An error thrown when an Observable or a sequence was queried but has no elements.
	ErrEmpty = errors.New("rxgo: empty value")
	// An error thrown when a value or values are missing from an observable sequence.
	ErrNotFound           = errors.New("rxgo: no values match")
	ErrSequence           = errors.New("rxgo: too many values match")
	ErrArgumentOutOfRange = errors.New("rxgo: argument out of range")
	// An error thrown by the timeout operator.
	ErrTimeout = errors.New("rxgo: timeout")
)

Functions

func Assert

func Assert(ctx context.Context, t *testing.T, iterable Iterable, assertions ...RxAssert)

Assert asserts the result of an iterable against a list of assertions.

func NewSafeSubscriber

func NewSafeSubscriber[T any](onNext OnNextFunc[T], onError OnErrorFunc, onComplete OnCompleteFunc) *safeSubscriber[T]

func NewSubscriber

func NewSubscriber[T any](bufferCount ...uint) *subscriber[T]

func Partition

func Partition[T any](source Observable[T], predicate PredicateFunc[T])

Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate. FIXME: redesign the API

func SendItems

func SendItems(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{})

SendItems is a utility function that sends a list of interface{} values and takes a strategy indicating whether to close the channel once the function completes.

func SwitchScan

func SwitchScan()

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, emitting values only from the most recently returned Observable.

Types

type AccumulatorFunc

type AccumulatorFunc[A any, V any] func(acc A, value V, index uint) (A, error)

type AssertPredicate

type AssertPredicate func(items []interface{}) error

AssertPredicate is a custom predicate based on the items.

type BackpressureStrategy

type BackpressureStrategy uint32

BackpressureStrategy is the backpressure strategy type.

const (
	// Block blocks until the channel is available.
	Block BackpressureStrategy = iota
	// Drop drops the message.
	Drop
)
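The difference between the two strategies maps naturally onto Go's select statement. The following is a minimal sketch under that assumption, not the package's implementation; the strategy type, the send function, and the fill helper are hypothetical names local to the example.

```go
package main

import "fmt"

// strategy mirrors the two documented options; it is a local stand-in
// type, not rxgo.BackpressureStrategy.
type strategy int

const (
	block strategy = iota
	drop
)

// send delivers v according to the strategy and reports whether it was sent.
func send(ch chan<- int, v int, s strategy) bool {
	if s == block {
		ch <- v // waits until the channel can accept the value
		return true
	}
	select {
	case ch <- v:
		return true
	default:
		return false // channel not ready: the value is dropped
	}
}

// fill sends 1..n into a channel of the given capacity that nobody reads
// and returns how many values were accepted under the drop strategy.
func fill(n, capacity int) int {
	ch := make(chan int, capacity)
	sent := 0
	for i := 1; i <= n; i++ {
		if send(ch, i, drop) {
			sent++
		}
	}
	return sent
}

func main() {
	fmt.Println(fill(5, 2)) // 2
}
```

With a buffer of two and no reader, only the first two values fit; the remaining three are discarded rather than stalling the producer.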

type CloseChannelStrategy

type CloseChannelStrategy uint32

CloseChannelStrategy indicates a strategy on whether to close a channel.

const (
	// LeaveChannelOpen indicates to leave the channel open after completion.
	LeaveChannelOpen CloseChannelStrategy = iota
	// CloseChannel indicates to close the channel after completion.
	CloseChannel
)
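A reason to leave a channel open is that another producer may still need it. The sketch below illustrates this with a boolean flag in place of the strategy type; sendAll and collect are hypothetical helpers and do not reproduce SendItems.

```go
package main

import "fmt"

// sendAll pushes items into ch and closes it afterwards only if
// closeWhenDone is set.
func sendAll(ch chan<- int, closeWhenDone bool, items ...int) {
	if closeWhenDone {
		defer close(ch)
	}
	for _, it := range items {
		ch <- it
	}
}

// collect lets two producers share one channel; only the last one closes it.
func collect() []int {
	ch := make(chan int, 4)  // buffered so the sends do not need a reader yet
	sendAll(ch, false, 1, 2) // leave open: more items will follow
	sendAll(ch, true, 3, 4)  // close: no further items
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect()) // [1 2 3 4]
}
```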

type Comparator

type Comparator func(interface{}, interface{}) int

Comparator defines a func that returns an int:

  • 0 if the two elements are equal
  • A negative value if the first argument is less than the second
  • A positive value if the first argument is greater than the second
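As an illustration of this contract, here is a three-way comparator over int values together with a sort that uses it. The byInt and sortWith names are hypothetical, and the example assumes every element holds an int.

```go
package main

import (
	"fmt"
	"sort"
)

// byInt is a three-way comparator over interface{} values holding ints.
// It panics if either argument is not an int.
func byInt(a, b interface{}) int {
	x, y := a.(int), b.(int)
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	default:
		return 0
	}
}

// sortWith orders items in place using a comparator with the
// negative / zero / positive contract.
func sortWith(items []interface{}, cmp func(interface{}, interface{}) int) {
	sort.Slice(items, func(i, j int) bool {
		return cmp(items[i], items[j]) < 0
	})
}

func main() {
	items := []interface{}{3, 1, 2}
	sortWith(items, byInt)
	fmt.Println(items) // [1 2 3]
}
```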

type ComparatorFunc

type ComparatorFunc[A any, B any] func(prev A, curr B) bool

type ComparerFunc

type ComparerFunc[A any, B any] func(prev A, curr B) int8

type CompletedFunc

type CompletedFunc func()

CompletedFunc handles the end of a stream.

type Disposable

type Disposable context.CancelFunc

Disposable is a function to be called in order to dispose a subscription.

type Disposed

type Disposed <-chan struct{}

Disposed is a notification channel indicating when an Observable is closed.

type Duration

type Duration interface {
	// contains filtered or unexported methods
}

Duration represents a duration

func WithDuration

func WithDuration(d time.Duration) Duration

WithDuration is a duration option

type DurationFunc

type DurationFunc[T any, R any] func(value T) Observable[R]

type Either

type Either[L, R any] interface {
	IsLeft() bool
	IsRight() bool
	Left() (L, bool)
	Right() (R, bool)
}

func Left

func Left[L, R any](value L) Either[L, R]

type ErrFunc

type ErrFunc func(error)

ErrFunc handles an error in a stream.

type ErrorFunc

type ErrorFunc func() error

type FinalizerFunc

type FinalizerFunc func()

type Func

type Func func(context.Context, interface{}) (interface{}, error)

Func defines a function that computes a value from an input value.

type Func2

type Func2 func(context.Context, interface{}, interface{}) (interface{}, error)

Func2 defines a function that computes a value from two input values.

type FuncN

type FuncN func(...interface{}) interface{}

FuncN defines a function that computes a value from N input values.

type GroupedObservable

type GroupedObservable[K comparable, R any] interface {
	Observable[R] // Inherit from observable
	Key() K
}

func NewGroupedObservable

func NewGroupedObservable[K comparable, T any](key K, connector func() Subject[T]) GroupedObservable[K, T]

type IllegalInputError

type IllegalInputError struct {
	// contains filtered or unexported fields
}

IllegalInputError is triggered when the observable receives an illegal input.

func (IllegalInputError) Error

func (e IllegalInputError) Error() string

type IndexOutOfBoundError

type IndexOutOfBoundError struct {
	// contains filtered or unexported fields
}

IndexOutOfBoundError is triggered when the observable cannot access the specified index.

func (IndexOutOfBoundError) Error

func (e IndexOutOfBoundError) Error() string

type Item

type Item struct {
	V interface{}
	E error
}

Item is a wrapper having either a value or an error.

func Errors

func Errors(err error) Item

Errors creates an item from an error.

func Of

func Of(i interface{}) Item

Of creates an item from a value.

func (Item) Error

func (i Item) Error() bool

Error checks if an item is an error.

func (Item) SendBlocking

func (i Item) SendBlocking(ch chan<- Item)

SendBlocking sends an item and blocks until it is sent.

func (Item) SendContext

func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool

SendContext sends an item and blocks until it is sent or the context is canceled. It returns a boolean to indicate whether the item was sent.

func (Item) SendNonBlocking

func (i Item) SendNonBlocking(ch chan<- Item) bool

SendNonBlocking sends an item without blocking. It returns a boolean to indicate whether the item was sent.

type Iterable

type Iterable interface {
	Observe(opts ...Option) <-chan Item
}

Iterable is the basic type that can be observed.

type Marshaller

type Marshaller func(interface{}) ([]byte, error)

Marshaller defines a marshaller type (interface{} to []byte).

type NextFunc

type NextFunc func(interface{})

NextFunc handles a next item in a stream.

type Notification

type Notification[T any] interface {
	ObservableNotification[T]
	Send(Subscriber[T]) bool
	Done() bool
}

func Complete

func Complete[T any]() Notification[T]

func Error

func Error[T any](err error) Notification[T]

func Next

func Next[T any](v T) Notification[T]

type NotificationKind

type NotificationKind int

NotificationKind identifies the kind of a notification: next, error, or complete.

const (
	NextKind NotificationKind = iota
	ErrorKind
	CompleteKind
)

type Observable

type Observable[T any] interface {
	SubscribeWith(subscriber Subscriber[T])
	SubscribeOn(finalizer ...func()) Subscriber[T]
	SubscribeSync(onNext func(v T), onError func(err error), onComplete func())
}

func Defer

func Defer[T any](factory func() Observable[T]) Observable[T]

Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.

func Empty

func Empty[T any]() Observable[T]

A simple Observable that emits no items to the Observer and immediately emits a complete notification.

func ForkJoin

func ForkJoin[T any](sources ...Observable[T]) Observable[[]T]

Accepts any number of source Observables and returns an Observable that emits a slice of values in the same order as the sources were passed.

func Iif

func Iif[T any](condition func() bool, trueObservable Observable[T], falseObservable Observable[T]) Observable[T]

Checks a boolean at subscription time, and chooses between one of two observable sources

func Interval

func Interval(duration time.Duration) Observable[uint]

Interval creates an Observable that emits incrementing integers indefinitely, one per given time interval.

func Never

func Never[T any]() Observable[T]

An Observable that emits no items to the Observer and never completes.

func Of2

func Of2[T any](item T, items ...T) Observable[T]

FIXME: rename me to `Of`

func Pipe

func Pipe[S any, O1 any](
	stream Observable[S],
	f1 OperatorFunc[S, any],
	f ...OperatorFunc[any, any],
) Observable[any]

If there is a commonly used sequence of operators in your code, use the `Pipe` function to extract the sequence into a new operator. Even if a sequence is not that common, breaking it out into a single operator can improve readability.

func Pipe1

func Pipe1[S any, O1 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
) Observable[O1]

func Pipe10

func Pipe10[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any, O9 any, O10 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
	f5 OperatorFunc[O4, O5],
	f6 OperatorFunc[O5, O6],
	f7 OperatorFunc[O6, O7],
	f8 OperatorFunc[O7, O8],
	f9 OperatorFunc[O8, O9],
	f10 OperatorFunc[O9, O10],
) Observable[O10]

func Pipe2

func Pipe2[S any, O1 any, O2 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
) Observable[O2]

func Pipe3

func Pipe3[S any, O1 any, O2 any, O3 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
) Observable[O3]

func Pipe4

func Pipe4[S any, O1 any, O2 any, O3 any, O4 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
) Observable[O4]

func Pipe5

func Pipe5[S any, O1 any, O2 any, O3 any, O4 any, O5 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
	f5 OperatorFunc[O4, O5],
) Observable[O5]

func Pipe6

func Pipe6[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
	f5 OperatorFunc[O4, O5],
	f6 OperatorFunc[O5, O6],
) Observable[O6]

func Pipe7

func Pipe7[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
	f5 OperatorFunc[O4, O5],
	f6 OperatorFunc[O5, O6],
	f7 OperatorFunc[O6, O7],
) Observable[O7]

func Pipe8

func Pipe8[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
	f5 OperatorFunc[O4, O5],
	f6 OperatorFunc[O5, O6],
	f7 OperatorFunc[O6, O7],
	f8 OperatorFunc[O7, O8],
) Observable[O8]

func Pipe9

func Pipe9[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any, O9 any](
	stream Observable[S],
	f1 OperatorFunc[S, O1],
	f2 OperatorFunc[O1, O2],
	f3 OperatorFunc[O2, O3],
	f4 OperatorFunc[O3, O4],
	f5 OperatorFunc[O4, O5],
	f6 OperatorFunc[O5, O6],
	f7 OperatorFunc[O6, O7],
	f8 OperatorFunc[O7, O8],
	f9 OperatorFunc[O8, O9],
) Observable[O9]

func Range

func Range[T constraints.Unsigned](start, count T) Observable[T]

Creates an Observable that emits a sequence of numbers within a specified range.

func Scheduled

func Scheduled[T any](item T, items ...T) Observable[T]

func Throw

func Throw[T any](factory ErrorFunc) Observable[T]

Creates an observable that will create an error instance and push it to the consumer as an error immediately upon subscription. This creation function is useful for creating an observable that will create an error and error every time it is subscribed to. Generally, inside of most operators when you might want to return an errored observable, this is unnecessary. In most cases, such as in the inner return of `ConcatMap`, `MergeMap`, `Defer`, and many others, you can simply throw the error, and RxGo will pick that up and notify the consumer of the error.

func Timer

func Timer[N constraints.Unsigned](startDue time.Duration, intervalDuration ...time.Duration) Observable[N]

Creates an observable that will wait for a specified time period before emitting the number 0.

type ObservableFunc

type ObservableFunc[T any] func(subscriber Subscriber[T])

type ObservableNotification

type ObservableNotification[T any] interface {
	Kind() NotificationKind
	Value() T // returns the underlying value if it's a "Next" notification
	Err() error
	IsEnd() bool
}

type ObservationStrategy

type ObservationStrategy uint32

ObservationStrategy defines the strategy to consume from an Observable.

const (
	// Lazy is the default observation strategy, when an Observer subscribes.
	Lazy ObservationStrategy = iota
	// Eager means consuming as soon as the Observable is created.
	Eager
)

type Observer

type Observer[T any] interface {
	Next(T)
	Error(error)
	Complete()
}

func NewObserver

func NewObserver[T any](onNext OnNextFunc[T], onError OnErrorFunc, onComplete OnCompleteFunc) Observer[T]

type OnCompleteFunc

type OnCompleteFunc func()

type OnErrorFunc

type OnErrorFunc func(error)

OnErrorFunc defines a function that handles an error.

type OnErrorStrategy

type OnErrorStrategy uint32

OnErrorStrategy is the Observable error strategy.

const (
	// StopOnError is the default error strategy.
	// An operator will stop processing items on error.
	StopOnError OnErrorStrategy = iota
	// ContinueOnError means an operator will continue processing items after an error.
	ContinueOnError
)
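The two strategies can be pictured with a plain loop that either returns at the first failure or records it and carries on. This is a sketch over a slice of strings, with a boolean flag in place of OnErrorStrategy; process is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
)

// process parses each input as an integer. With continueOnError unset it
// stops at the first failure; otherwise it records the error and moves on.
func process(inputs []string, continueOnError bool) (values []int, errs []error) {
	for _, s := range inputs {
		n, err := strconv.Atoi(s)
		if err != nil {
			errs = append(errs, err)
			if !continueOnError {
				return values, errs
			}
			continue
		}
		values = append(values, n)
	}
	return values, errs
}

func main() {
	inputs := []string{"1", "x", "3"}

	stopped, _ := process(inputs, false)
	fmt.Println(stopped) // [1]

	continued, _ := process(inputs, true)
	fmt.Println(continued) // [1 3]
}
```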

type OnNextFunc

type OnNextFunc[T any] func(T)

type OperatorFunc

type OperatorFunc[I any, O any] func(source Observable[I]) Observable[O]

func Audit

func Audit[T any, R any](durationSelector DurationFunc[T, R]) OperatorFunc[T, T]

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

func AuditTime

func AuditTime[T any, R any](duration time.Duration) OperatorFunc[T, T]

Ignores source values for the given duration, then emits the most recent value from the source Observable, then repeats this process.

func Buffer

func Buffer[T any, R any](closingNotifier Observable[R]) OperatorFunc[T, []T]

Buffers the source Observable values until closingNotifier emits.

func BufferCount

func BufferCount[T any](bufferSize uint, startBufferEvery ...uint) OperatorFunc[T, []T]

Buffers the source Observable values until the size hits the maximum bufferSize given.
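The grouping behaviour can be illustrated on a slice. This sketch ignores the optional startBufferEvery argument and assumes, as RxJS does, that a trailing partial buffer is still emitted when the input ends; bufferCount here is a hypothetical slice helper, not the operator.

```go
package main

import "fmt"

// bufferCount splits items into consecutive chunks of at most size elements.
func bufferCount[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	fmt.Println(bufferCount([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```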

func BufferTime

func BufferTime[T any](bufferTimeSpan time.Duration) OperatorFunc[T, []T]

Buffers the source Observable values for a specific time period.

func BufferToggle

func BufferToggle[T any, O any](openings Observable[O], closingSelector func(value O) Observable[O]) OperatorFunc[T, []T]

Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.

func BufferWhen

func BufferWhen[T any, R any](closingSelector func() Observable[R]) OperatorFunc[T, []T]

Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.

func Catch

func Catch[T any](catch func(err error, caught Observable[T]) Observable[T]) OperatorFunc[T, T]

Catches errors on the observable to be handled by returning a new observable or throwing an error.

func CombineLatestAll

func CombineLatestAll[T any, R any](project func(values []T) R) OperatorFunc[Observable[T], R]

Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.

func CombineLatestWith

func CombineLatestWith[T any](sources ...Observable[T]) OperatorFunc[T, []T]

Creates an Observable that combines the latest values from all passed Observables and the source into slices and emits them.

func ConcatAll

func ConcatAll[T any]() OperatorFunc[Observable[T], T]

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

func ConcatMap

func ConcatMap[T any, R any](project ProjectionFunc[T, R]) OperatorFunc[T, R]

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

func ConcatWith

func ConcatWith[T any](sources ...Observable[T]) OperatorFunc[T, T]

Emits all of the values from the source observable, then, once it completes, subscribes to each observable source provided, one at a time, emitting all of their values, and not subscribing to the next one until it completes.

func Count

func Count[T any](predicate ...PredicateFunc[T]) OperatorFunc[T, uint]

Counts the number of emissions on the source and emits that number when the source completes.

func Debounce

func Debounce[T any, R any](durationSelector DurationFunc[T, R]) OperatorFunc[T, T]

Emits a notification from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

func DebounceTime

func DebounceTime[T any](duration time.Duration) OperatorFunc[T, T]

Emits a notification from the source Observable only after a particular time span has passed without another source emission.

func DefaultIfEmpty

func DefaultIfEmpty[T any](defaultValue T) OperatorFunc[T, T]

Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

func Delay

func Delay[T any](duration time.Duration) OperatorFunc[T, T]

Delays the emission of items from the source Observable by a given timeout.

func DelayWhen

func DelayWhen[T any, R any](delayDurationSelector ProjectionFunc[T, R]) OperatorFunc[T, T]

Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.

func Dematerialize

func Dematerialize[T any]() OperatorFunc[ObservableNotification[T], T]

Converts an Observable of ObservableNotification objects into the emissions that they represent.

func Distinct

func Distinct[T any, K comparable](keySelector func(value T) K) OperatorFunc[T, T]

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

func DistinctUntilChanged

func DistinctUntilChanged[T any](comparator ...ComparatorFunc[T, T]) OperatorFunc[T, T]

Returns a result Observable that emits all values pushed by the source observable if they are distinct in comparison to the last value the result observable emitted.
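The contrast between Distinct and DistinctUntilChanged is easiest to see side by side. The sketch below applies both ideas to a slice, using == in place of a custom comparator; distinct and distinctUntilChanged are hypothetical helpers, not the operators themselves.

```go
package main

import "fmt"

// distinct keeps the first item seen for every key.
func distinct[T any, K comparable](items []T, keyOf func(T) K) []T {
	seen := make(map[K]struct{})
	var out []T
	for _, it := range items {
		k := keyOf(it)
		if _, dup := seen[k]; dup {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, it)
	}
	return out
}

// distinctUntilChanged drops an item only when it equals the last item kept.
func distinctUntilChanged[T comparable](items []T) []T {
	var out []T
	for _, it := range items {
		if len(out) > 0 && out[len(out)-1] == it {
			continue
		}
		out = append(out, it)
	}
	return out
}

func main() {
	in := []int{1, 1, 2, 2, 1, 3, 3}
	identity := func(v int) int { return v }
	fmt.Println(distinct(in, identity))   // [1 2 3]
	fmt.Println(distinctUntilChanged(in)) // [1 2 1 3]
}
```

The second 1 survives DistinctUntilChanged because it differs from the value emitted just before it, while Distinct remembers every key it has already seen.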

func Do

func Do[T any](cb Observer[T]) OperatorFunc[T, T]

Used to perform side-effects for notifications from the source observable

func ElementAt

func ElementAt[T any](pos uint, defaultValue ...T) OperatorFunc[T, T]

Emits the single value at the specified index in a sequence of emissions from the source Observable.

func Every

func Every[T any](predicate PredicateFunc[T]) OperatorFunc[T, bool]

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

func ExhaustAll

func ExhaustAll[T any]() OperatorFunc[Observable[T], T]

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

func ExhaustMap

func ExhaustMap[T any, R any](project ProjectionFunc[T, R]) OperatorFunc[T, R]

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

func Expand

func Expand[T any, R any](project ProjectionFunc[T, R]) OperatorFunc[T, Either[T, R]]

Recursively projects each source value to an Observable which is merged in the output Observable.

func Filter

func Filter[T any](predicate PredicateFunc[T]) OperatorFunc[T, T]

Filter emits only those items from an Observable that pass a predicate test.

func Find

func Find[T any](predicate PredicateFunc[T]) OperatorFunc[T, Optional[T]]

Emits only the first value emitted by the source Observable that meets some condition.

func FindIndex

func FindIndex[T any](predicate PredicateFunc[T]) OperatorFunc[T, int]

Emits only the index of the first value emitted by the source Observable that meets some condition.

func First

func First[T any](predicate PredicateFunc[T], defaultValue ...T) OperatorFunc[T, T]

Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

func GroupBy

func GroupBy[T any, K comparable](keySelector func(value T) K) OperatorFunc[T, GroupedObservable[K, T]]

Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group. FIXME: maybe we should have a buffer channel
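To illustrate the grouping criterion, here is a slice-based sketch that buckets items by key and keeps groups in the order their keys first appear. The group type and groupBy helper are hypothetical; the real operator emits GroupedObservable values rather than slices.

```go
package main

import "fmt"

// group holds the items that share one key.
type group[K comparable, T any] struct {
	key   K
	items []T
}

// groupBy buckets items by keyOf, preserving first-seen key order.
func groupBy[T any, K comparable](items []T, keyOf func(T) K) []group[K, T] {
	index := make(map[K]int) // key -> position in out
	var out []group[K, T]
	for _, it := range items {
		k := keyOf(it)
		i, ok := index[k]
		if !ok {
			i = len(out)
			index[k] = i
			out = append(out, group[K, T]{key: k})
		}
		out[i].items = append(out[i].items, it)
	}
	return out
}

func main() {
	parity := func(v int) string {
		if v%2 == 0 {
			return "even"
		}
		return "odd"
	}
	fmt.Println(groupBy([]int{1, 2, 3, 4, 5, 6}, parity))
	// [{odd [1 3 5]} {even [2 4 6]}]
}
```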

func IgnoreElements

func IgnoreElements[T any]() OperatorFunc[T, T]

Ignores all items emitted by the source Observable and only passes calls of complete or error.

func IsEmpty

func IsEmpty[T any]() OperatorFunc[T, bool]

Emits false if the input Observable emits any values, or emits true if the input Observable completes without emitting any values.

func Last

func Last[T any](predicate PredicateFunc[T], defaultValue ...T) OperatorFunc[T, T]

Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.

func Map

func Map[T any, R any](mapper func(T, uint) (R, error)) OperatorFunc[T, R]

Map transforms the items emitted by an Observable by applying a function to each item.

func Materialize

func Materialize[T any]() OperatorFunc[T, ObservableNotification[T]]

Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

func Max

func Max[T any](comparer ...ComparerFunc[T, T]) OperatorFunc[T, T]

The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.

func MergeMap

func MergeMap[T any, R any](project ProjectionFunc[T, R], concurrent ...uint) OperatorFunc[T, R]

Projects each source value to an Observable which is merged in the output Observable.

func MergeScan

func MergeScan[V any, A any](accumulator func(acc A, value V, index uint) Observable[A], seed A, concurrent ...uint) OperatorFunc[V, A]

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.

func MergeWith

func MergeWith[T any](input Observable[T], inputs ...Observable[T]) OperatorFunc[T, T]

FIXME: Merge the values from all observables to a single observable result.

func Min

func Min[T any](comparer ...ComparerFunc[T, T]) OperatorFunc[T, T]

The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.

func PairWise

func PairWise[T any]() OperatorFunc[T, Tuple[T, T]]

Groups pairs of consecutive emissions together and emits them as a Tuple of two values.
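On a slice, pairing consecutive values looks like the sketch below. The pair type is a hypothetical stand-in for the package's Tuple, and pairWise is a local helper; note that n inputs yield n-1 pairs, so a single value produces nothing.

```go
package main

import "fmt"

// pair is a local stand-in for a two-element tuple.
type pair[T any] struct {
	first, second T
}

// pairWise returns every pair of adjacent items.
func pairWise[T any](items []T) []pair[T] {
	var out []pair[T]
	for i := 1; i < len(items); i++ {
		out = append(out, pair[T]{first: items[i-1], second: items[i]})
	}
	return out
}

func main() {
	fmt.Println(pairWise([]int{1, 2, 3, 4})) // [{1 2} {2 3} {3 4}]
}
```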

func RaceWith

func RaceWith[T any](sources ...Observable[T]) OperatorFunc[T, T]

Creates an Observable that mirrors the first source Observable to emit a next, error or complete notification from the combination of the Observable to which the operator is applied and supplied Observables.

func Reduce

func Reduce[V any, A any](accumulator AccumulatorFunc[A, V], seed A) OperatorFunc[V, A]

Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, starting from the given seed value.

func Repeat

func Repeat[T any, C repeatConfig](config ...C) OperatorFunc[T, T]

Returns an Observable that will resubscribe to the source stream when the source stream completes.

func Retry

func Retry[T any, C retryConfig](config ...C) OperatorFunc[T, T]

Returns an Observable that mirrors the source Observable with the exception of an error.

func Sample

func Sample[T any, R any](notifier Observable[R]) OperatorFunc[T, T]

Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

func SampleTime

func SampleTime[T any](duration time.Duration) OperatorFunc[T, T]

Emits the most recently emitted value from the source Observable within periodic time intervals.

func Scan

func Scan[V any, A any](accumulator AccumulatorFunc[A, V], seed A) OperatorFunc[V, A]

Useful for encapsulating and managing state. Applies an accumulator (or "reducer function") to each value from the source, starting from the initial state given by the seed value (second argument).
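Scan and Reduce share an accumulator shape but differ in what they emit. The slice-based sketch below assumes the usual ReactiveX semantics, where Scan emits every intermediate state and Reduce emits only the final one; scan and reduce are hypothetical helpers whose accumulator omits the error return of AccumulatorFunc for brevity.

```go
package main

import "fmt"

// scan returns every intermediate accumulator state.
func scan[V any, A any](items []V, acc func(A, V, uint) A, seed A) []A {
	out := make([]A, 0, len(items))
	state := seed
	for i, v := range items {
		state = acc(state, v, uint(i))
		out = append(out, state)
	}
	return out
}

// reduce returns only the final accumulator state.
func reduce[V any, A any](items []V, acc func(A, V, uint) A, seed A) A {
	state := seed
	for i, v := range items {
		state = acc(state, v, uint(i))
	}
	return state
}

func main() {
	sum := func(acc, v int, _ uint) int { return acc + v }
	in := []int{1, 2, 3, 4}
	fmt.Println(scan(in, sum, 0))   // [1 3 6 10]
	fmt.Println(reduce(in, sum, 0)) // 10
}
```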

func SequenceEqual

func SequenceEqual[T any](compareTo Observable[T], comparator ...ComparatorFunc[T, T]) OperatorFunc[T, bool]

Compares all values of two observables in sequence using an optional comparator function and returns an observable of a single boolean value representing whether or not the two sequences are equal.

func Single

func Single[T any](predicate ...func(value T, index uint, source Observable[T]) bool) OperatorFunc[T, T]

Returns an observable that asserts that only one value is emitted from the observable that matches the predicate. If no predicate is provided, then it will assert that the observable only emits one value.

func Skip

func Skip[T any](count uint) OperatorFunc[T, T]

Returns an Observable that skips the first count items emitted by the source Observable.

func SkipLast

func SkipLast[T any](skipCount uint) OperatorFunc[T, T]

Skips a specified number of values before the completion of an observable.

func SkipUntil

func SkipUntil[T any, R any](notifier Observable[R]) OperatorFunc[T, T]

Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

func SkipWhile

func SkipWhile[T any](predicate func(v T, index uint) bool) OperatorFunc[T, T]

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

func SwitchAll

func SwitchAll[T any]() OperatorFunc[Observable[T], T]

Converts a higher-order Observable into a first-order Observable producing values only from the most recent observable sequence

func SwitchMap

func SwitchMap[T any, R any](project func(value T, index uint) Observable[R]) OperatorFunc[T, R]

Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.

func Take

func Take[T any](count uint) OperatorFunc[T, T]

Emits only the first count values emitted by the source Observable.

func TakeLast

func TakeLast[T any](count uint) OperatorFunc[T, T]

Waits for the source to complete, then emits the last N values from the source, as specified by the count argument.

func TakeUntil

func TakeUntil[T any, R any](notifier Observable[R]) OperatorFunc[T, T]

Emits the values emitted by the source Observable until a notifier Observable emits a value.

func TakeWhile

func TakeWhile[T any](predicate func(value T, index uint) bool) OperatorFunc[T, T]

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

func Throttle

func Throttle[T any, R any](durationSelector func(value T) Observable[R]) OperatorFunc[T, T]

Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

func ThrottleTime

func ThrottleTime[T any](duration time.Duration) OperatorFunc[T, T]

Emits a value from the source Observable, then ignores subsequent source values for the given duration, then repeats this process.

func ThrowIfEmpty

func ThrowIfEmpty[T any](errorFactory ...ErrorFunc) OperatorFunc[T, T]

If the source observable completes without emitting a value, it will emit an error. The error will be created at that time by the optional errorFactory argument, otherwise, the error will be `ErrEmpty`.

func Timeout

func Timeout[T any, C timeoutConfig[T]](config C) OperatorFunc[T, T]

Errors if the Observable does not emit a value within the given time span. Known issue (FIXME in the source): data race and send on closed channel.

func ToSlice

func ToSlice[T any]() OperatorFunc[T, []T]

Collects all source emissions and emits them as a slice when the source completes.

func WithLatestFrom

func WithLatestFrom[A any, B any](input Observable[B]) OperatorFunc[A, Tuple[A, B]]

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

func WithTimeInterval

func WithTimeInterval[T any]() OperatorFunc[T, TimeInterval[T]]

Emits an object containing the current value, and the time that has passed between emitting the current value and the previous value, which is calculated by using the provided scheduler's now() method to retrieve the current time at each emission, then calculating the difference.

func WithTimestamp

func WithTimestamp[T any]() OperatorFunc[T, Timestamp[T]]

Attaches a UTC timestamp to each item emitted by an Observable, indicating when it was emitted.

func ZipAll

func ZipAll[T any]() OperatorFunc[Observable[T], []T]

Collects all inner Observables from the source. Once the source completes, it subscribes to all inner sources, combining their values by index and emitting them.

func ZipWith

func ZipWith[T any](input Observable[T], inputs ...Observable[T]) OperatorFunc[T, []T]

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.
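
Combining "in order" means the n-th output is built from the n-th value of every input. The slice-based `zip` helper below is a hypothetical sketch of that pairing. It assumes, as in other Rx implementations, that output stops once the shortest input is exhausted, which this page does not state.

```go
package main

import "fmt"

// zip is a hypothetical, slice-based stand-in for ZipWith. Row i of the
// result holds element i of every input, in argument order.
func zip[T any](inputs ...[]T) [][]T {
	if len(inputs) == 0 {
		return nil
	}
	// Assumption: stop at the shortest input.
	n := len(inputs[0])
	for _, in := range inputs[1:] {
		if len(in) < n {
			n = len(in)
		}
	}
	out := make([][]T, 0, n)
	for i := 0; i < n; i++ {
		row := make([]T, len(inputs))
		for j, in := range inputs {
			row[j] = in[i]
		}
		out = append(out, row)
	}
	return out
}

func main() {
	fmt.Println(zip([]int{1, 2, 3}, []int{10, 20}, []int{100, 200, 300}))
	// prints [[1 10 100] [2 20 200]]
}
```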

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option handles configurable options.

func Serialize

func Serialize(identifier func(interface{}) int) Option

Serialize forces an Observable to make serialized calls and to be well-behaved.

func WithBufferedChannel

func WithBufferedChannel(capacity int) Option

WithBufferedChannel configures the capacity of a buffered channel.

func WithCPUPool

func WithCPUPool() Option

WithCPUPool specifies an execution pool sized to the number of logical CPUs.

func WithContext

func WithContext(ctx context.Context) Option

WithContext passes a context to the Observable.

func WithPool

func WithPool(pool int) Option

WithPool specifies an execution pool of the given size.

func WithPublishStrategy

func WithPublishStrategy() Option

WithPublishStrategy converts an ordinary Observable into a connectable Observable.

type Optional

type Optional[T any] interface {
	MustGet() T
	OrElse(fallback T) T
	IsNone() bool
	Get() (T, bool)
}

func None

func None[T any]() Optional[T]

func Some

func Some[T any](v T) Optional[T]
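
The four methods of Optional suggest the usual "value or nothing" pattern. The sketch below redeclares the interface locally and backs it with a hypothetical `option` struct. The lowercase `some` and `none` constructors are stand-ins, and the panic in `MustGet` on an empty value is an assumption, not documented behaviour.

```go
package main

import "fmt"

// Optional mirrors the interface listed above so the sketch is self-contained.
type Optional[T any] interface {
	MustGet() T
	OrElse(fallback T) T
	IsNone() bool
	Get() (T, bool)
}

// option is a hypothetical concrete type, not the library's own.
type option[T any] struct {
	value T
	ok    bool
}

// MustGet returns the value and panics when none is present (assumed).
func (o option[T]) MustGet() T {
	if !o.ok {
		panic("option: no value present")
	}
	return o.value
}

// OrElse returns the value, or fallback when none is present.
func (o option[T]) OrElse(fallback T) T {
	if o.ok {
		return o.value
	}
	return fallback
}

func (o option[T]) IsNone() bool   { return !o.ok }
func (o option[T]) Get() (T, bool) { return o.value, o.ok }

func some[T any](v T) Optional[T] { return option[T]{value: v, ok: true} }
func none[T any]() Optional[T]    { return option[T]{} }

func main() {
	port := none[int]()
	fmt.Println(port.IsNone(), port.OrElse(8080)) // prints "true 8080"
	if v, ok := some("hi").Get(); ok {
		fmt.Println(v) // prints "hi"
	}
}
```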

type Predicate

type Predicate func(interface{}) bool

Predicate defines a function that returns a bool from an input value.

type PredicateFunc

type PredicateFunc[T any] func(value T, index uint) bool

type Producer

type Producer func(ctx context.Context, next chan<- Item)

Producer defines a producer implementation.

type ProjectionFunc

type ProjectionFunc[T any, R any] func(value T, index uint) Observable[R]

type RepeatConfig

type RepeatConfig struct {
	Count uint
	Delay time.Duration
}

type RetryConfig

type RetryConfig struct {
	Count          uint
	Delay          time.Duration
	ResetOnSuccess bool
}
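
One plausible reading of the Count and Delay fields is "retry up to Count times, waiting Delay between attempts". The loop below sketches that reading with hypothetical names (`retryConfig`, `retry`, `errTransient`). It is not the library's retry logic, and ResetOnSuccess is deliberately left out because its semantics are not described on this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error used only by this sketch.
var errTransient = errors.New("transient failure")

// retryConfig mirrors the Count and Delay fields of RetryConfig.
type retryConfig struct {
	Count uint
	Delay time.Duration
}

// retry runs op once, then retries it up to cfg.Count more times, sleeping
// cfg.Delay before each retry. It returns nil on the first success, or the
// last error once all retries are used.
func retry(cfg retryConfig, op func(attempt uint) error) error {
	var err error
	for attempt := uint(0); attempt <= cfg.Count; attempt++ {
		if attempt > 0 {
			time.Sleep(cfg.Delay)
		}
		if err = op(attempt); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	cfg := retryConfig{Count: 3, Delay: time.Millisecond}
	err := retry(cfg, func(attempt uint) error {
		if attempt < 2 {
			return errTransient
		}
		return nil
	})
	fmt.Println(err) // prints "<nil>": the third attempt succeeded
}
```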

type RxAssert

type RxAssert interface {
	// contains filtered or unexported methods
}

RxAssert lists the Observable assertions.

func CustomPredicate

func CustomPredicate(predicate AssertPredicate) RxAssert

CustomPredicate checks a custom predicate.

func HasAnError

func HasAnError() RxAssert

HasAnError checks that the observable has produced an error.

func HasError

func HasError(err error) RxAssert

HasError checks that the observable has produced a specific error.

func HasErrors

func HasErrors(errs ...error) RxAssert

HasErrors checks that the observable has produced a set of errors.

func HasItem

func HasItem(i interface{}) RxAssert

HasItem checks if a single or optional single has a specific item.

func HasItems

func HasItems(items ...interface{}) RxAssert

HasItems checks that the observable produces the corresponding items.

func HasItemsNoOrder

func HasItemsNoOrder(items ...interface{}) RxAssert

HasItemsNoOrder checks that an observable produces the corresponding items regardless of the order.

func HasNoError

func HasNoError() RxAssert

HasNoError checks that the observable has not raised any error.

func IsNotEmpty

func IsNotEmpty() RxAssert

IsNotEmpty checks that the observable produces some items.

type Subject

type Subject[T any] interface {
	Subscriber[T]
	Subscription
}

type Subscriber

type Subscriber[T any] interface {
	Stop()
	Send() chan<- Notification[T]
	ForEach() <-chan Notification[T]
	Closed() <-chan struct{}
}

type Subscription

type Subscription interface {
	// allow user to unsubscribe the stream manually
	Unsubscribe()
}

type Supplier

type Supplier func(ctx context.Context) Item

Supplier defines a function that supplies a result from nothing.

type TimeInterval

type TimeInterval[T any] interface {
	Value() T
	Elapsed() time.Duration
}

func NewTimeInterval

func NewTimeInterval[T any](value T, elasped time.Duration) TimeInterval[T]

type TimeoutConfig

type TimeoutConfig[T any] struct {
	With func() Observable[T]
	Each time.Duration
}

type Timestamp

type Timestamp[T any] interface {
	Value() T
	Time() time.Time
}

func NewTimestamp

func NewTimestamp[T any](value T) Timestamp[T]

type TimestampItem

type TimestampItem struct {
	Timestamp time.Time
	V         interface{}
}

TimestampItem attaches a timestamp to an item.

type Tuple

type Tuple[A any, B any] interface {
	First() A
	Second() B
}

func NewTuple

func NewTuple[A any, B any](a A, b B) Tuple[A, B]

NewTuple creates a tuple from a first and a second value.

type Unmarshaller

type Unmarshaller func([]byte, interface{}) error

Unmarshaller defines an unmarshaller type ([]byte to interface).
