Documentation ¶
Overview ¶
Package stream provides the ability to safely read asynchronous, dynamic application state from Gio layout code.
It does so by providing several constructs:
- Controllers, which connect the lifecycle of asynchronously generated data streams to the frame lifecycle of a Gio application window. Controllers handle invalidating the window when new data is available on active streams and also shut down streams when they are no longer in use.
- Streams, which are bound to a particular controller and provide asynchronous state updates while they are in use by visible widgets. When not in use, their state updates shut down to conserve resources.
- Transformations, which are functions that operate on stream channels to easily create data streams from disparate sources of asynchronous information.
- Inputs, which provide data from the application GUI that can be safely read within a stream's asynchronous processing.
Additionally, stream provides constructs for supervising persistent or stateful asynchronous operations from your UI:
- Mutators, which maintain a list of running stateful operations and ensure that they all shut down as cleanly as possible before your application exits.
- MutationPools, which allow you to manage groups of related stateful operations and prevent duplicate operations from being created at the same time.
- Mutations, which are a (streamable) handle onto a running stateful operation.
Controllers ¶
Each window using streams needs its own Controller, which can be constructed with NewController. The controller is used to construct streams bound to that window, and has a method (Controller.Sweep) that must be invoked every frame in order to ensure that streams which are not in use go inert.
Typical use looks like this:
    func loop(w *app.Window) error {
        // Make a context that lives as long as the window.
        windowCtx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Make a controller for this window.
        controller := stream.NewController(windowCtx, w.Invalidate)

        var ops op.Ops
        for event := range w.Events() {
            switch event := event.(type) {
            case system.DestroyEvent:
                return event.Err
            case system.FrameEvent:
                gtx := layout.NewContext(&ops, event)

                // Your layout here, passing the controller so that code can instantiate streams with it.
                layoutUI(gtx, controller)

                event.Frame(gtx.Ops)

                // Transition any active unread streams to be inactive.
                controller.Sweep()
            }
        }
        return nil
    }
Providing the window's Invalidate function to NewController ensures that the controller can trigger a window invalidation when new data arrives on a stream visible within the window, ensuring that the UI is redrawn automatically.
Note that Controller.Sweep is being invoked every frame.
Streams ¶
A Stream is a restartable asynchronous computation which is automatically started and stopped based on whether it is actively used by the GUI. You can read from a stream to receive the most recent value created by the stream (if any is available yet).
Only the most recent value sent over a stream is important. Application programs should assume that any given element sent may be overridden by values sent after it. As such, you should not write streams to trickle updates over individual elements in a collection, but should instead send entire updated collections over the stream.
You can construct a Stream with New:
    // Make a stream that will emit increasing integers every second.
    myStream := stream.New(controller, func(ctx context.Context) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()
            ticks := 0
            for {
                select {
                case <-ticker.C:
                    ticks++
                    out <- ticks
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    })
The controller passed to New is responsible for starting and stopping the stream based on whether it is in use. The Provider function must return a receive-only channel of values that closes when the provided context is cancelled.
In most real applications, the Provider function passed to New will perform I/O or interface with another goroutine in order to provide correct updates over the stream. See the section on Transformations for more about how to construct real-world provider functions.
Reading a stream is accomplished through one of the Read* methods. Of these, *Stream.Read is the most fundamental, and will be discussed first:
    ticks, status := myStream.Read(gtx)
    if status == stream.Waiting {
        // We haven't received a value over the stream yet.
    } else {
        // We have a value, so do something with ticks.
    }
Reading from a stream never blocks, so it's quite common to read a stream before any value is available yet. In that case, the stream's returned status will be Waiting, and the returned value should be ignored.
If the status is Emitting, a new value (or error) is coming out of the stream this frame.
If the status is Cached, the returned value is not new, but has been emitted before.
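To make the three statuses concrete, here is a minimal, self-contained sketch of a non-blocking "latest value" cell. It is not the package's implementation: the names `cell`, `push`, and `read`, and the local `status` constants, are hypothetical, and the sketch tracks freshness per read rather than per frame.

```go
package main

import "sync"

// status mirrors the three states described above. These constants are local
// to this sketch and are not the package's Status type.
type status int

const (
	waiting  status = iota // no value has arrived yet
	emitting               // the value is being returned for the first time
	cached                 // the value was already returned by an earlier read
)

// cell holds the most recent value pushed by a producer goroutine.
type cell[T any] struct {
	mu    sync.Mutex
	value T
	has   bool // a value has been pushed at least once
	fresh bool // the current value has not been read yet
}

// push replaces the stored value and marks it as unread.
func (c *cell[T]) push(v T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value, c.has, c.fresh = v, true, true
}

// read never blocks. It reports waiting until the first push, emitting on the
// first read after a push, and cached on every read after that.
func (c *cell[T]) read() (T, status) {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch {
	case !c.has:
		var zero T
		return zero, waiting
	case c.fresh:
		c.fresh = false
		return c.value, emitting
	default:
		return c.value, cached
	}
}
```

A reader that only cares about having some value can treat emitting and cached identically, which is the situation the default-value helpers address.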
Often your UI will not care about the status at all, but would rather simply work with a default value until the first data is available on the stream. To facilitate this, we have *Stream.ReadDefault:
ticks := myStream.ReadDefault(gtx, 0)
You can rely upon the returned value to either be your supplied default value or the latest value from the stream.
Finally, sometimes you want to synchronize an existing variable or struct field with the latest data from a stream. *Stream.ReadInto accomplishes this:
    var ticks int // Assume we declared this elsewhere, perhaps as a field.
    myStream.ReadInto(gtx, &ticks, 0)
As you can see, reading from a stream does not require a great deal of code unless your use-case demands special consideration of the status of the stream.
Results ¶
The Result type provides an easy way to send both a value and error across a channel packed into a single type. It isn't meant to be a general-purpose type, but rather to make it easier to propagate errors from the providers of your streams to your UI when you need to. We provide a number of helper transformations and stream types to make working with streams of results easier.
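The idea of packing a value and an error into one channel element can be sketched with the standard library alone. The `result` struct, `errOdd`, and `halve` below are hypothetical names for illustration and are not the package's Result type or helpers.

```go
package main

import "errors"

// result is a hypothetical stand-in for a value/error pair sent over a channel.
type result[T any] struct {
	value T
	err   error
}

// errOdd is an illustrative error used only by this sketch.
var errOdd = errors.New("odd number")

// halve returns a channel that emits one result per input (half of each even
// number, or errOdd for each odd number) and then closes.
func halve(inputs ...int) <-chan result[int] {
	out := make(chan result[int])
	go func() {
		defer close(out)
		for _, n := range inputs {
			if n%2 != 0 {
				out <- result[int]{err: errOdd}
				continue
			}
			out <- result[int]{value: n / 2}
		}
	}()
	return out
}
```

Because the error travels with the value, a consumer needs only one channel and can decide per element whether to show data or an error state.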
ResultStream accepts a ProviderR instead of a Provider and allows you to read the error from a stream value ergonomically. We can rewrite the above tick stream example to use Result like so:
    // Make a stream that will emit increasing integers (or an error) every second.
    myResultStream := stream.NewR(controller, func(ctx context.Context) <-chan stream.Result[int] {
        out := make(chan stream.Result[int])
        go func() {
            defer close(out)
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()
            ticks := 0
            for {
                select {
                case <-ticker.C:
                    ticks++
                    if ticks%2 == 0 {
                        out <- stream.ResultFrom(ticks, nil)
                    } else {
                        out <- stream.ResultFrom(0, fmt.Errorf("odd number"))
                    }
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    })
We can then read from it with:
    ticks, status, err := myResultStream.Read(gtx)
    if status == stream.Waiting {
        // We haven't received a value over the stream yet.
    } else if err != nil {
        // We have an error.
    } else {
        // We have a value, so do something with ticks.
    }
We also provide *ResultStream.ReadDefault and *ResultStream.ReadInto for use-cases where the status is less important.
Transformations ¶
Most of the top-level functions in this package are transformations. They make it easier to construct channels of T or Result[T] values, combine those channels, and change their element type. They are intended to be used within the provider functions passed to New and NewR. Consider the following:
    // Simple function to emit an increasing integer each time the provided duration elapses.
    func tickerProvider(ctx context.Context, dur time.Duration) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            ticker := time.NewTicker(dur)
            defer ticker.Stop()
            ticks := 0
            for {
                select {
                case <-ticker.C:
                    ticks++
                    out <- ticks
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }

    // Make a stream that will emit the product of two streams of integers as a floating-point
    // value.
    myStream := stream.New(controller, func(ctx context.Context) <-chan float64 {
        everySecond := tickerProvider(ctx, time.Second)
        everyFewSeconds := tickerProvider(ctx, time.Second*3)
        products := stream.Zip(everySecond, everyFewSeconds, func(a, b int) int {
            return a * b
        })
        asFloats := stream.Transform(products, func(a int) float64 {
            return float64(a)
        })
        return asFloats
    })
As you can see, these functions make it relatively easy to leverage multiple Providers and ProviderRs and combine their results. The above could have converted the output type to be a float64 within the Zip call, but uses the long way to demonstrate how to use Transform. Note that each transformation employed in constructing a stream will add some latency before the UI receives new values, so try not to let your transformation pipeline get too deep.
Transformations with an R suffix work with streams of Result elements and provide extra behaviors to make error handling easier.
Inputs ¶
If your user interface needs to perform a complex filter operation (too expensive to do directly within the UI), you can use an Input to push the filter criteria into your provider function. Construct an Input with its initial value using NewInput.
You can then reference that input within a Provider function for a stream:
    // Make a stream that will emit the product of the counter input and a value that increments
    // each second.
    count := stream.NewInput(5)
    myStream := stream.New(controller, func(ctx context.Context) <-chan int {
        everySecond := tickerProvider(ctx, time.Second)
        return stream.Zip(everySecond, count.Stream(ctx), func(a, b int) int {
            return a * b
        })
    })
As the user interacts with your UI, you can update the value of your input with Input.Send:
count.Send(10)
This method will not block unless you are performing concurrent sends, and it makes the sent value available to your provider function. Much like streams, only the most recently sent value is guaranteed to be consumed by the stream.
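The "only the most recent value is guaranteed to be consumed" behavior can be modeled with a single-slot mailbox. This is a sketch under assumptions, not the package's Input type: `mailbox`, `newMailbox`, `send`, and `values` are hypothetical names.

```go
package main

import "sync"

// mailbox is a hypothetical single-slot container: a send replaces any value
// that has not been consumed yet.
type mailbox[T any] struct {
	mu   sync.Mutex // serializes senders; concurrent sends wait on each other
	slot chan T     // capacity 1, holds at most the latest unconsumed value
}

// newMailbox creates a mailbox that already holds initial.
func newMailbox[T any](initial T) *mailbox[T] {
	m := &mailbox[T]{slot: make(chan T, 1)}
	m.slot <- initial
	return m
}

// send stores v, discarding any older value that is still waiting.
func (m *mailbox[T]) send(v T) {
	m.mu.Lock()
	defer m.mu.Unlock()
	select {
	case <-m.slot: // drop the stale value
	default: // the slot was already empty
	}
	m.slot <- v // cannot block: the slot is empty and senders are serialized
}

// values exposes the receive side for a consumer goroutine.
func (m *mailbox[T]) values() <-chan T { return m.slot }
```

In this model a UI goroutine can call send on every frame without waiting for the consumer, and a slow consumer simply skips intermediate values.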
Mutators ¶
A Mutator is created at the start of an application to supervise stateful operations. Creating one looks like this:
    // Make a context that will last the lifetime of your application.
    appCtx, cancel := context.WithCancel(context.Background())
    defer cancel()
    mutator := stream.NewMutator(appCtx, time.Second*5)

    // Run your application here, blocking until you want to end the app.

    // Block until running mutations end.
    mutator.Shutdown()
The mutator will wait up to the duration provided in its constructor for all running mutations to end. After that it will cancel any mutation contexts that are not already cancelled and wait up to the provided duration again. Then it will unblock and allow the application process to exit.
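The two-phase wait described above (grace period, cancel, second grace period) can be sketched with a WaitGroup and a cancellable context. This is an illustrative model only: `supervisor`, `run`, `shutdown`, `untilCancelled`, and `quick` are hypothetical names, and the returned strings exist just to make the outcome observable.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// supervisor is a hypothetical, simplified model of the shutdown sequence
// described above. It is not the package's Mutator.
type supervisor struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	grace  time.Duration
}

func newSupervisor(grace time.Duration) *supervisor {
	ctx, cancel := context.WithCancel(context.Background())
	return &supervisor{ctx: ctx, cancel: cancel, grace: grace}
}

// run starts work in a goroutine that the supervisor will wait for.
func (s *supervisor) run(work func(ctx context.Context)) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		work(s.ctx)
	}()
}

// shutdown waits up to grace for work to finish on its own, then cancels the
// shared context and waits up to grace again. It reports how the wait ended.
func (s *supervisor) shutdown() string {
	done := make(chan struct{})
	go func() {
		s.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return "finished"
	case <-time.After(s.grace):
	}
	s.cancel()
	select {
	case <-done:
		return "cancelled"
	case <-time.After(s.grace):
		return "abandoned"
	}
}

// untilCancelled is example work that only stops when its context is cancelled.
func untilCancelled(ctx context.Context) { <-ctx.Done() }

// quick is example work that returns immediately.
func quick(_ context.Context) {}
```

Work that honors cancellation promptly ends in the "cancelled" branch, which is why operations are expected to stop quickly once their context is done.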
MutationPools ¶
A MutationPool wraps a Mutator and provides the ability to launch stateful operations associated with a unique "key". This key mechanism can be used to ensure that two logically-identical mutations are not created at the same time for applications that need that.
A mutation pool is created like so:
    // Assuming you already have a mutator.
    var mutator *stream.Mutator

    // Create a pool using ints as the type of the unique key and time.Time values as the value
    // type emitted by the supervised mutations.
    tickerPool := stream.NewMutationPool[int, time.Time](mutator)
You can use any comparable type as a key, and any type as the mutation value type. Usually applications will want to encode identifiers for what a mutation is acting upon and how into the key. The value type should be whatever the result of the mutation is.
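The key-based deduplication can be pictured as a mutex-guarded map from key to running operation. The following is a sketch under assumptions rather than the package's MutationPool: `pool`, `handle`, and `start` are hypothetical names.

```go
package main

import "sync"

// handle is a hypothetical stand-in for a running operation.
type handle struct {
	done chan struct{} // closed when the operation has ended
}

// pool deduplicates running operations by key.
type pool[K comparable] struct {
	mu      sync.Mutex
	running map[K]*handle
}

func newPool[K comparable]() *pool[K] {
	return &pool[K]{running: make(map[K]*handle)}
}

// start launches work for key unless an operation with that key is already
// running, in which case the existing handle is returned and isNew is false.
func (p *pool[K]) start(key K, work func()) (h *handle, isNew bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if existing, ok := p.running[key]; ok {
		return existing, false
	}
	h = &handle{done: make(chan struct{})}
	p.running[key] = h
	go func() {
		work()
		// Forget the key once the work ends so it can be started again.
		p.mu.Lock()
		delete(p.running, key)
		p.mu.Unlock()
		close(h.done)
	}()
	return h, true
}
```

Encoding both the target and the action in the key (for example "save-doc-1") means a second click on the same button returns the running operation instead of starting a duplicate.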
You can stream running mutations from the UI with:
    // Assuming the MutationPool above and a stream controller.
    runningTickersStream := stream.New(controller, tickerPool.Stream)
    running, status := runningTickersStream.Read(gtx)
Mutations ¶
A Mutation is a stateful process running asynchronously. You can use them for almost anything, from modifying a database to tracking the progress of a complex user flow through your application. The important difference between a mutation and a normal stream is that a mutation is *not* cancelled when the UI element that launched it stops being drawn. Mutations persist until they complete or the application shuts down (at which point the supervising Mutator will try to ensure that mutations have a chance to complete before killing them).
Mutations can take several forms:
- Processes that make a change to application state (like a database query or API request) and then exit.
- Processes that manage a multi-step sequence of operations (like guiding a user through a tour of an application across many different pages) and then exit when done.
- Processes that run providing useful services for the lifetime of your application (a mutation can maintain a frequently-needed, frequently-updated value for easy access by your UI).
The only requirement for a mutation (encapsulated by the MutationProvider type) is that it eventually closes its output channel (signalling that it is complete). Mutations must terminate themselves as quickly as possible (closing their output channel) when their context is cancelled.
To launch a mutation, you need a MutationPool to host it, and the mutation must result in a sequence of values matching the MutationPool's value type. To reuse the pool from the previous example blocks, we need a mutation that produces a sequence of time.Time values.
    key := 0
    mutation, isNew := stream.Mutate(tickerPool, key, func(ctx context.Context) <-chan time.Time {
        out := make(chan time.Time)
        go func() {
            defer close(out)
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()
            for {
                select {
                case <-ctx.Done():
                    return
                case t := <-ticker.C:
                    out <- t
                }
            }
        }()
        return out
    })
The above creates a mutation that emits times from a persistent ticker. That ticker will only stop when the mutation's context is cancelled. The returned mutation value is a handle on this running mutation. It can be used to stream values from the output channel of a mutation. The isNew return value tells the caller whether the mutation pool has returned an already-running mutation for the same key, or has created a new mutation for that key.
You can read the results of a mutation from the UI by creating a stream:
    tickStream := stream.New(controller, mutation.Stream)
    tick, status := tickStream.Read(gtx)
Like any other stream, you can apply transformation functions within a MutationProvider to modify the output of the mutation for *all* consumers or you can apply transformation functions in a Provider to modify the output for just the current stream.
Index ¶
- Variables
- func Debounce[T any](input <-chan T, interval time.Duration) <-chan T
- func DebounceUntilStable[T any](input <-chan T, interval time.Duration) <-chan T
- func Distinct[T comparable](input <-chan T) <-chan T
- func DistinctFunc[T any](input <-chan T, same func(a, b T) bool) <-chan T
- func DistinctFuncR[T any](input <-chan Result[T], same func(a, b T, aErr, bErr error) bool) <-chan Result[T]
- func DistinctR[T comparable](input <-chan Result[T]) <-chan Result[T]
- func Filter[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U
- func FilterR[T, U any](in <-chan Result[T], keep func(T, error) (U, error, bool)) <-chan Result[U]
- func Latest[T any](in <-chan T) <-chan T
- func Multiplex[In, Out, State any](input <-chan In, ...) <-chan Out (deprecated)
- func MultiplexR[In, Out, State any](input <-chan Result[In], ...) <-chan Result[Out] (deprecated)
- func PostProcess[T any](input <-chan T, f func(t T) T) <-chan T
- func PostProcessR[T any](input <-chan Result[T], f func(t T, err error) (T, error)) <-chan Result[T]
- func ProviderRebuilder[In, Out any](input <-chan In, deepCopy func(In) In, equal func(a, b In) bool, ...) <-chan Out
- func Transform[T, U any](input <-chan T, transformer func(T) U) <-chan U
- func TransformR[T, U any](input <-chan Result[T], transformer func(T, error) (U, error)) <-chan Result[U]
- func Zip[T, U, Out any](a <-chan T, b <-chan U, zipFunc func(a T, b U) Out) <-chan Out
- func ZipR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], ...) <-chan Result[Out]
- func ZipWrapR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]
- type ARCPool
- type ComparableKeyer
- type Controller
- type Input
- type Mutation
- func Mutate[K comparable, T any](pool *MutationPool[K, T], key K, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
- func MutateKeyed[V PoolKeyer[K], K comparable, T any](pool *MutationPool[K, T], v V, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
- func MutateTimeout[K comparable, T any](pool *MutationPool[K, T], key K, timeout time.Duration, ...) (mut *Mutation[T], isNew bool)
- func PersistentStream[K comparable, T any](pool *MutationPool[K, T], key K, prov Provider[T]) (mut *Mutation[T], isNew bool)
- type MutationPool
- type MutationProvider
- type Mutator
- type PoolKeyer
- type Provider
- type ProviderR
- type Result
- type ResultStream
- func (s *ResultStream[T]) Read(gtx layout.Context) (value T, status Status, err error)
- func (s *ResultStream[T]) ReadDefault(gtx layout.Context, t T) (T, error)
- func (s *ResultStream[T]) ReadInto(gtx layout.Context, t *T, def T) error
- func (s *ResultStream[T]) ReadNew(gtx layout.Context) (value T, isNew bool, err error)
- type Source
- type Status
- type Stream
Constants ¶
This section is empty.
Variables ¶
var ErrNilController = fmt.Errorf("stream has nil controller")
ErrNilController indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.
var ErrNilProvider = fmt.Errorf("stream has nil provider function")
ErrNilProvider indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.
Functions ¶
func Debounce ¶
func Debounce[T any](input <-chan T, interval time.Duration) <-chan T
Debounce coalesces values emitted on the input channel so that only one value is emitted each interval. If it is not already waiting after a recent emission, it emits a new value immediately. Values that arrive within the following interval are collected, and only the last of them is emitted.
func DebounceUntilStable ¶
func DebounceUntilStable[T any](input <-chan T, interval time.Duration) <-chan T
DebounceUntilStable emits a value one `interval` after the last received input. If the inputs are rapid, the timer will reset on each input until the inputs settle enough for interval to pass between them.
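The timer-reset behavior can be sketched as follows. This is a hypothetical model named `settle`, not the package's implementation. Two choices here are assumptions of the sketch: a value still pending when the input closes is dropped, and new inputs wait while a settled value is being delivered.

```go
package main

import "time"

// settle is a hypothetical sketch of trailing-edge debouncing: it emits the
// most recent input only after interval has passed with no newer input.
func settle[T any](input <-chan T, interval time.Duration) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var (
			pending T
			timer   <-chan time.Time // nil while nothing is pending
		)
		for {
			select {
			case v, ok := <-input:
				if !ok {
					return // assumption: a pending value is dropped on close
				}
				pending = v
				timer = time.After(interval) // restart the quiet period
			case <-timer:
				timer = nil
				out <- pending
			}
		}
	}()
	return out
}
```

Receiving from the nil `timer` channel never succeeds, so the second select case is effectively disabled until an input arrives. A search box that feeds keystrokes through such a stage would issue one query after the user pauses typing.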
func Distinct ¶
func Distinct[T comparable](input <-chan T) <-chan T
Distinct is DistinctFunc using "a == b" as the same func.
func DistinctFunc ¶
func DistinctFunc[T any](input <-chan T, same func(a, b T) bool) <-chan T
DistinctFunc returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.
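A minimal sketch of this "suppress repeats of the last emitted element" rule, using only the standard library. The names `distinctFunc` and `emit` are local to the example and do not come from the package.

```go
package main

// distinctFunc is a hypothetical sketch: it forwards a value only when same
// reports that it differs from the last value that was forwarded.
func distinctFunc[T any](input <-chan T, same func(a, b T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var last T
		first := true
		for v := range input {
			if !first && same(last, v) {
				continue // identical to the previous output, so drop it
			}
			last, first = v, false
			out <- v
		}
	}()
	return out
}

// emit is a small helper for the sketch that sends vals and then closes.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}
```

Note that the comparison is against the last emitted element only, so a value can reappear later in the output once a different value has been emitted in between.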
func DistinctFuncR ¶
func DistinctFuncR[T any](input <-chan Result[T], same func(a, b T, aErr, bErr error) bool) <-chan Result[T]
DistinctFuncR returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.
func DistinctR ¶
func DistinctR[T comparable](input <-chan Result[T]) <-chan Result[T]
DistinctR is DistinctFuncR using "a == b && errors.Is(aErr, bErr)" as the same func.
func Filter ¶
func Filter[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U
Filter selectively drops elements from the input channel, opting to send nothing on the output. If the keep function returns true, the returned U will be emitted. Otherwise it will be dropped.
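Because keep returns both a converted value and a boolean, a stage of this shape can filter and change the element type in one step. The sketch below illustrates that with hypothetical names (`filterMap`, `parseInt`) and is not the package's implementation.

```go
package main

import "strconv"

// filterMap is a hypothetical sketch: it emits the U returned by keep when
// keep reports true, and emits nothing for that input otherwise.
func filterMap[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			if u, ok := keep(v); ok {
				out <- u
			}
		}
	}()
	return out
}

// parseInt is an example keep function: it converts numeric strings to ints
// and drops everything else.
func parseInt(s string) (int, bool) {
	n, err := strconv.Atoi(s)
	return n, err == nil
}
```

Feeding the strings "1", "x", and "3" through `filterMap` with `parseInt` yields the integers 1 and 3.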
func FilterR ¶
func FilterR[T, U any](in <-chan Result[T], keep func(T, error) (U, error, bool)) <-chan Result[U]
FilterR implements Filter for Result channels, automatically unpacking the value/error of the input and repacking the U and error returned by keep into a Result.
func Latest ¶
func Latest[T any](in <-chan T) <-chan T
Latest creates a buffered channel that will exhibit no backpressure. While waiting to send its next output element, it will always be ready to receive a new input, and it will replace its next outbound element with any new element. This is useful when you want to skip doing the work for intermediate values when multiple values are flowing through a stream pipeline.
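The "always ready to receive, replace the outbound element" behavior can be modeled with a select loop and a single pending slot. This is a sketch under assumptions, named `latest` for illustration. It uses a goroutine rather than a buffered channel, and it flushes the final pending value when the input closes, which is a choice of the sketch.

```go
package main

// latest is a hypothetical sketch of a stage without backpressure: it always
// accepts new input and keeps only the newest value for its consumer.
func latest[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var (
			pending T
			send    chan<- T // nil while there is nothing to send
		)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					if send != nil {
						out <- pending // flush the final value before closing
					}
					return
				}
				pending, send = v, out // overwrite any unsent value
			case send <- pending:
				send = nil // delivered; wait for the next input
			}
		}
	}()
	return out
}
```

A producer that sends one hundred values before the consumer reads anything is never blocked by the consumer, and the consumer then sees only the hundredth value.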
func Multiplex (deprecated)
func Multiplex[In, Out, State any](
	input <-chan In,
	choose func(ctx context.Context, state State, val In) (<-chan Out, State),
) <-chan Out
Multiplex allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest value from the input channel and is responsible for choosing whether or not to change the output stream. If the choose function returns a non-nil channel, the Multiplex output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil channel at least once in order for any output values to ever be emitted.
The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously-created channel will have its context cancelled, ensuring that no goroutines leak.
The State type allows the choose function to implement stateful operations, like ensuring that it isn't about to return a new copy of the same stream channel it returned on its previous invocation. The choose function accepts and returns a variable of type State, and the returned state will always be passed to the next invocation of choose, even if choose returned a nil channel.
Deprecated: Multiplex is very useful, but requires the caller to do too many subtle things right within the choose func. Use ProviderRebuilder instead to achieve the same thing.
func MultiplexR (deprecated)
func MultiplexR[In, Out, State any](
	input <-chan Result[In],
	choose func(ctx context.Context, state State, val In, err error) (<-chan Result[Out], State),
) <-chan Result[Out]
MultiplexR allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest result (value and error) from the input channel and is responsible for choosing whether or not to change the output stream. If the choose function returns a non-nil channel, the MultiplexR output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil channel at least once in order for any output values to ever be emitted.
The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously-created channel will have its context cancelled, ensuring that no goroutines leak.
The State type allows the choose function to implement stateful operations, like ensuring that it isn't about to return a new copy of the same stream channel it returned on its previous invocation. The choose function accepts and returns a variable of type State, and the returned state will always be passed to the next invocation of choose, even if choose returned a nil channel.
Deprecated: Multiplex is very useful, but requires the caller to do too many subtle things right within the choose func. Use ProviderRebuilder instead to achieve the same thing.
func PostProcess ¶
func PostProcess[T any](input <-chan T, f func(t T) T) <-chan T
PostProcess immediately emits any value received from input, but asynchronously invokes f on each received value and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as little latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.
func PostProcessR ¶
func PostProcessR[T any](input <-chan Result[T], f func(t T, err error) (T, error)) <-chan Result[T]
PostProcessR immediately emits any value received from input, but asynchronously invokes f on each received value and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as little latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.
func ProviderRebuilder ¶
func ProviderRebuilder[In, Out any](
	input <-chan In,
	deepCopy func(In) In,
	equal func(a, b In) bool,
	provider func(context.Context, In) <-chan Out,
) <-chan Out
ProviderRebuilder uses an input stream to configure an output stream. The first time a value is received on input, the provider function will be invoked on that input to produce an output stream.
Each subsequent value received on input will be compared with the previous value for equality, using the provided equal function, and the provider function will be re-run when the new input value is not considered equal. The previous invocation of the provider will have its context cancelled and its output channel drained.
The provider function may return nil to signal that the previous provider's output (if any) should still be used.
The provided deepCopy function must make a copy of In that is equal to In and shares no memory with it.
func Transform ¶
func Transform[T, U any](input <-chan T, transformer func(T) U) <-chan U
Transform returns a new channel which will emit every value sent over input transformed by the transformer function.
func TransformR ¶
func TransformR[T, U any](input <-chan Result[T], transformer func(T, error) (U, error)) <-chan Result[U]
TransformR returns a new channel which will emit every value sent over input transformed by the transformer function.
func Zip ¶
func Zip[T, U, Out any](a <-chan T, b <-chan U, zipFunc func(a T, b U) Out) <-chan Out
Zip combines two streams of types T and U into a stream of type Out. Zip works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when either input channel closes.
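The combining rule (latest value from each side, no output until both sides have produced something, close when either input closes) can be sketched as below. `zipLatest` is a hypothetical name. Unlike a production version, this sketch blocks while delivering an output and does not watch a context.

```go
package main

// zipLatest is a hypothetical sketch of the combining rule described above.
func zipLatest[T, U, Out any](a <-chan T, b <-chan U, f func(T, U) Out) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		var (
			lastA        T
			lastB        U
			haveA, haveB bool
		)
		for {
			select {
			case v, ok := <-a:
				if !ok {
					return // either input closing ends the output
				}
				lastA, haveA = v, true
			case v, ok := <-b:
				if !ok {
					return
				}
				lastB, haveB = v, true
			}
			// Stay silent until both inputs have produced a value.
			if haveA && haveB {
				out <- f(lastA, lastB)
			}
		}
	}()
	return out
}
```

With a multiplying function, sending 2 on a and 3 on b produces 6. A later 4 on a produces 12, because the most recent value from b is reused.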
func ZipR ¶
func ZipR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U, aErr, bErr error) (Out, error)) <-chan Result[Out]
ZipR combines two streams of types T and U into a stream of type Out. ZipR works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when either input channel closes.
func ZipWrapR ¶
func ZipWrapR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]
ZipWrapR behaves as ZipR except that it handles combining errors from channels by wrapping them together and will not invoke the zipFunc if either channel errored (instead emitting the error result).
- If a errored and b did not, set a's error as the output error
- If b errored and a did not, set b's error as the output error
- If both a and b errored, set an error wrapping both as the output error
- If there is an output error, return the zero value of Out and that error.
- Otherwise, run zipFunc and return its results.
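The rules above can be written as one pure function over a pair of inputs. This sketch uses a hypothetical `result` struct and `combine` function, and it assumes `errors.Join` (Go 1.20 or later) as the way to wrap both errors. How the package itself wraps them is not stated here.

```go
package main

import "errors"

// result is a hypothetical value/error pair used only by this sketch.
type result[T any] struct {
	value T
	err   error
}

// combine applies the rules listed above to one pair of inputs.
func combine[T, U, Out any](a result[T], b result[U], f func(T, U) (Out, error)) result[Out] {
	var outErr error
	switch {
	case a.err != nil && b.err != nil:
		outErr = errors.Join(a.err, b.err) // an error wrapping both
	case a.err != nil:
		outErr = a.err
	case b.err != nil:
		outErr = b.err
	}
	if outErr != nil {
		var zero Out
		return result[Out]{value: zero, err: outErr} // f is not invoked
	}
	v, err := f(a.value, b.value)
	return result[Out]{value: v, err: err}
}

// Example errors and helpers for exercising the sketch.
var (
	errA = errors.New("a failed")
	errB = errors.New("b failed")
)

// sum is an example zip function that never fails.
func sum(x, y int) (int, error) { return x + y, nil }

// wrapsBoth reports whether err wraps both example errors.
func wrapsBoth(err error) bool {
	return errors.Is(err, errA) && errors.Is(err, errB)
}
```

The benefit of this arrangement is that the zip function only ever sees valid inputs, so it needs no error-handling branches of its own.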
Types ¶
type ARCPool ¶
type ARCPool[V PoolKeyer[K], K comparable, T any] struct {
	// contains filtered or unexported fields
}
ARCPool manages a pool of PersistentStreams that are created and destroyed automatically by refcounting the number of consumers of each stream.
func NewARCPool ¶
func NewARCPool[V PoolKeyer[K], K comparable, T any](mutator *Mutator, provider func(ctx context.Context, v V) <-chan T) *ARCPool[V, K, T]
NewARCPool defines a new ARCPool with a provider function that is responsible for actually doing the work of the PersistentStream used for each key. The provider func should be safe to cancel at any time, so it should ideally have no side effects on other application state.
type ComparableKeyer ¶
type ComparableKeyer[K comparable] struct {
	// contains filtered or unexported fields
}
ComparableKeyer wraps simple comparable types so that they implement PoolKeyer.
func Comparable ¶
func Comparable[K comparable](k K) ComparableKeyer[K]
Comparable wraps the given k as a ComparableKeyer.
func (ComparableKeyer[K]) MutationPoolKey ¶
func (c ComparableKeyer[K]) MutationPoolKey() K
MutationPoolKey returns the comparable key for this ComparableKeyer.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller manages the lifecycle of asynchronous streams of data, connecting them to the frame event loop of a given application window.
func NewController ¶
func NewController(ctx context.Context, invalidator func()) *Controller
NewController constructs a controller bound to the window lifecycle of a single application window. The provided invalidator func must trigger an invalidation of that window, and the Sweep() method must be invoked during the processing of each frame for that window. A controller can be shared among all async loading for a single window.
func (*Controller) Done ¶
func (s *Controller) Done()
Done cleans up all streams. It should be invoked when an application window is closed in order to ensure that all associated processing shuts down with the window.
func (*Controller) Sweep ¶
func (s *Controller) Sweep() (active, swept int)
Sweep cleans up inactive streams. It must be invoked once per frame by the event loop for the window that the stream is bound to. It returns the number of active streams after the sweep, as well as the number of streams that were swept away by the call (making them inert). Note that this is not the same as the number of inert streams.
type Input ¶
type Input[T any] struct {
	// contains filtered or unexported fields
}
Input provides data to a stream in a threadsafe way and without blocking (concurrent sends, however, may block). It can be used to feed data into a stream data transformation pipeline from the UI event loop.
func NewDistinctInput ¶
func NewDistinctInput[T comparable](initialValue T) Input[T]
NewDistinctInput constructs an input like NewInput, but the input will not emit values that are the same as the most recently emitted value. In this case, sameness is defined as having equal values (==).
func NewInput ¶
NewInput constructs an input, setting it to emit initialValue the first time it is read.
func NewInputEmpty ¶
NewInputEmpty creates a new input with no initial value. Applications may supply a filter function that will drop sending elements if it returns true.
func (*Input[T]) Send ¶
func (s *Input[T]) Send(t T)
Send emits t on the input, replacing any previously-emitted value that has not already been consumed.
type Mutation ¶
type Mutation[T any] struct {
	// contains filtered or unexported fields
}
Mutation is a handle on the results of an asynchronous application state change.
func Mutate ¶
func Mutate[K comparable, T any](pool *MutationPool[K, T], key K, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
Mutate attempts to start a mutation using the mutationProv and bound to the unique key. If key is already registered to a running mutation, that mutation instance will be returned instead.
If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.
func MutateKeyed ¶
func MutateKeyed[V PoolKeyer[K], K comparable, T any](pool *MutationPool[K, T], v V, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
MutateKeyed is identical to Mutate except that it automatically derives the mutation key from a provided PoolKeyer instead of requiring the key to be passed explicitly. This can sometimes be more ergonomic.
func MutateTimeout ¶
func MutateTimeout[K comparable, T any](pool *MutationPool[K, T], key K, timeout time.Duration, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)
MutateTimeout does the same thing as Mutate, but sets a timeout on the mutation's context.
func PersistentStream ¶
func PersistentStream[K comparable, T any](pool *MutationPool[K, T], key K, prov Provider[T]) (mut *Mutation[T], isNew bool)
PersistentStream attempts to start a stream with its lifecycle bound to a mutation pool instead of the UI. This is useful when many UI streams want to consume the same data, as they can all consume the output of a single persistent stream (via the returned mutation's Stream method) instead of each independently querying or generating the source data of interest. If key is already registered to a running mutation, that mutation instance will be returned instead.
NOTE(whereswaldon): the existence of this method points to mutations being misnamed. They are really a kind of stream that has a different lifecycle and are allowed to have side effects within the application. This method provides a way to access the different lifecycle while committing to not have side effects. It's unclear what a better name would be though.
If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.
type MutationPool ¶
type MutationPool[K comparable, T any] struct {
	// contains filtered or unexported fields
}
MutationPool supervises the lifecycle of a group of related mutations. Specifically, it prevents two mutations with the same key from coexisting, and it offers a method to stream all running mutations.
func NewMutationPool ¶
func NewMutationPool[K comparable, T any](mutator *Mutator) *MutationPool[K, T]
NewMutationPool creates a MutationPool powered by the provided mutator.
type MutationProvider ¶
MutationProvider starts an asynchronous application state change and returns a channel of results from that process. The provided context is not bound to the lifecycle of a specific stream, but to the mutation itself. If the context is cancelled, the mutation provider should stop the mutation and emit values on its output channel indicating where it stopped. When the mutation is complete (whether a result of running to completion or cancellation), the returned channel must be closed.
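The contract above (report where the work stopped on cancellation, and always close the channel) might be satisfied as in the following sketch. The type's declaration is not shown in this section, so the shape func(context.Context) <-chan T is an assumption, and progress and applySteps are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// progress reports how many of total steps have been applied.
type progress struct {
	done, total int
	cancelled   bool
}

// applySteps returns a function with the assumed provider shape. It applies
// total steps, emitting progress after each one.
func applySteps(total int) func(ctx context.Context) <-chan progress {
	return func(ctx context.Context) <-chan progress {
		// Buffered so the worker never blocks on a slow reader.
		out := make(chan progress, total+1)
		go func() {
			defer close(out) // closed on completion and on cancellation
			for i := 0; i < total; i++ {
				if ctx.Err() != nil {
					// Cancelled: report where the work stopped.
					out <- progress{done: i, total: total, cancelled: true}
					return
				}
				// ... perform step i here ...
				out <- progress{done: i + 1, total: total}
			}
		}()
		return out
	}
}

// last drains ch and returns the final value received.
func last(ch <-chan progress) progress {
	var p progress
	for v := range ch {
		p = v
	}
	return p
}

// runDemo runs the work once to completion and once with a context that is
// already cancelled.
func runDemo() (finished, stopped progress) {
	finished = last(applySteps(3)(context.Background()))
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	stopped = last(applySteps(3)(ctx))
	return finished, stopped
}

func main() {
	finished, stopped := runDemo()
	fmt.Printf("%+v\n", finished) // {done:3 total:3 cancelled:false}
	fmt.Printf("%+v\n", stopped)  // {done:0 total:3 cancelled:true}
}
```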
type Mutator ¶
type Mutator struct {
// contains filtered or unexported fields
}
Mutator manages the lifecycle of asynchronous application state mutations.
func NewMutator ¶
NewMutator creates a new mutator which will spawn all mutations using the provided context. The provided timeout configures how long Mutator.Shutdown will wait before giving up on a clean shutdown, see the docs on that method for details.
func (*Mutator) Shutdown ¶
Shutdown stops the creation of new mutations (causing [Mutator.Exec] to return false) and blocks until all existing mutations complete. If the timeout provided to NewMutator elapses without all mutations shutting down, all mutation contexts will be cancelled. Then Shutdown will again wait up to the configured timeout for all mutations to end cleanly. If some mutations have still not shut down after the second timeout, Shutdown will return an error. Otherwise (in the case of clean shutdown) it will return nil.
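The two-phase sequence (wait, cancel, wait again) can be sketched with the standard library alone. This is a model of the described behaviour, not the package's code: waitTimeout and shutdown are hypothetical helpers, and the step that refuses new mutations is not modelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitTimeout reports whether wg finished within d.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// shutdown waits up to timeout for the work to finish on its own, then
// cancels it and waits up to timeout once more.
func shutdown(wg *sync.WaitGroup, cancel context.CancelFunc, timeout time.Duration) error {
	if waitTimeout(wg, timeout) {
		return nil // everything finished without being cancelled
	}
	cancel() // phase two: ask the remaining work to stop
	if waitTimeout(wg, timeout) {
		return nil
	}
	return errors.New("shutdown: work still running after cancellation")
}

// demo starts one worker that exits only when its context is cancelled, so
// the first wait times out and the second succeeds.
func demo() error {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
	}()
	return shutdown(&wg, cancel, 50*time.Millisecond)
}

func main() {
	fmt.Println(demo()) // <nil>
}
```

In the worst case this blocks for twice the configured timeout, which is worth keeping in mind when choosing the value passed to NewMutator.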
type PoolKeyer ¶
type PoolKeyer[K comparable] interface {
	// MutationPoolKey returns a key that identifies the work performed by a mutation for deduplication
	// purposes. If two mutations have the same key, [MutationPool]s will deduplicate them so that only
	// one can execute at a time.
	MutationPoolKey() K
}
PoolKeyer describes a type that can generate a unique mutation pool key for itself.
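A type satisfying this interface might look like the sketch below. The interface is copied from the declaration above so that the example compiles on its own; renameFile and keyOf are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// PoolKeyer is reproduced from the package documentation so this sketch is
// self-contained.
type PoolKeyer[K comparable] interface {
	MutationPoolKey() K
}

// renameFile is a hypothetical request describing one stateful operation.
type renameFile struct {
	path, newName string
}

// MutationPoolKey keys the operation by path only, so two renames of the
// same file share a key regardless of the requested new name.
func (r renameFile) MutationPoolKey() string {
	return "rename:" + r.path
}

// keyOf derives the key from any PoolKeyer, using the same type parameter
// layout as MutateKeyed.
func keyOf[V PoolKeyer[K], K comparable](v V) K {
	return v.MutationPoolKey()
}

func main() {
	a := renameFile{path: "/tmp/a.txt", newName: "b.txt"}
	b := renameFile{path: "/tmp/a.txt", newName: "c.txt"}
	fmt.Println(keyOf[renameFile, string](a) == keyOf[renameFile, string](b)) // true
}
```

Choosing which fields contribute to the key decides what counts as a duplicate; here a second rename of the same file would be treated as the same work.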
type Provider ¶
Provider is a function that returns a channel of values which will be closed when the provided context is cancelled. Provider functions are usually used to provide data as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.
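A function honouring this contract could be written as follows. The declaration of Provider is not shown in this section, so the shape func(context.Context) <-chan T is an assumption; countTo, collect, and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// countTo returns a function with the assumed provider shape. It emits
// 1..n and then closes its channel early, marking n as the final value.
func countTo(n int) func(ctx context.Context) <-chan int {
	return func(ctx context.Context) <-chan int {
		out := make(chan int)
		go func() {
			defer close(out) // closed on completion and on cancellation
			for i := 1; i <= n; i++ {
				select {
				case out <- i:
				case <-ctx.Done():
					return // the consumer went away: stop producing
				}
			}
		}()
		return out
	}
}

// collect drains ch into a slice.
func collect(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

// demo runs the provider to completion under a live context.
func demo() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return collect(countTo(3)(ctx))
}

func main() {
	fmt.Println(demo()) // [1 2 3]
}
```

Selecting on ctx.Done() for every send keeps the goroutine from leaking when nobody is reading the channel any more.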
type ProviderR ¶
ProviderR is a function that returns a channel of [Result]s which will be closed when the provided context is cancelled. ProviderR functions are usually used to provide data and errors as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.
type Result ¶
Result is a convenience type for bundling data and an error together so they can be sent over a channel.
func ResultFrom ¶
ResultFrom constructs a Result by bundling the given t and e together.
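The value of bundling data with an error is easiest to see on a channel, which carries only one element type. The sketch below uses a local stand-in: result, resultFrom, parseAll, and tally are hypothetical, and the field names of the real Result type are not shown in this section.

```go
package main

import (
	"fmt"
	"strconv"
)

// result is a hypothetical stand-in for Result[T].
type result[T any] struct {
	value T
	err   error
}

// resultFrom mirrors the documented behaviour of ResultFrom: it bundles t
// and e together.
func resultFrom[T any](t T, e error) result[T] {
	return result[T]{value: t, err: e}
}

// parseAll sends one result per input, so failures travel on the same
// channel as successes.
func parseAll(inputs []string) <-chan result[int] {
	out := make(chan result[int], len(inputs))
	for _, s := range inputs {
		n, err := strconv.Atoi(s)
		out <- resultFrom(n, err)
	}
	close(out)
	return out
}

// tally counts successful and failed results.
func tally(ch <-chan result[int]) (ok, failed int) {
	for r := range ch {
		if r.err != nil {
			failed++
			continue
		}
		ok++
	}
	return ok, failed
}

func main() {
	fmt.Println(tally(parseAll([]string{"1", "x", "3"}))) // 2 1
}
```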
type ResultStream ¶
ResultStream provides Result[T]s from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically.
func NewR ¶
func NewR[T any](controller *Controller, provider ProviderR[T]) *ResultStream[T]
NewR creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.
func (*ResultStream[T]) Read ¶
func (s *ResultStream[T]) Read(gtx layout.Context) (value T, status Status, err error)
Read a value from the stream, if any. The returned status indicates whether the returned value and err have a meaningful value.
func (*ResultStream[T]) ReadDefault ¶
func (s *ResultStream[T]) ReadDefault(gtx layout.Context, t T) (T, error)
ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.
func (*ResultStream[T]) ReadInto ¶
func (s *ResultStream[T]) ReadInto(gtx layout.Context, t *T, def T) error
ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t. The returned error is the error result of the latest stream value or nil.
func (*ResultStream[T]) ReadNew ¶
func (s *ResultStream[T]) ReadNew(gtx layout.Context) (value T, isNew bool, err error)
ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:
if value, status, err := s.Read(gtx); status == stream.Emitting {
	// Do logic that should only occur when a value is emitted from the stream.
}
It can be written as:
if value, ok, err := s.ReadNew(gtx); ok {
	// Do logic that should only occur when a value is emitted from the stream.
}
type Source ¶
type Source[S, T any] struct {
	// contains filtered or unexported fields
}
Source helps construct [Provider]s that emit a single, shared value. It may not be suitable for all applications. Streams created using a Source will receive the latest available value, but may not receive every value emitted by the valuer if new values arrive quickly. S is an internal state type, protected by the source's lock, that is used to track the current state of the source. T is the result type emitted on streams from this source. If constructed with NewSourceCtx, a Source can be "closed" much like a channel by cancelling its input context.
func NewSource ¶
NewSource constructs a source using the provided valuer function to transform its current state (S) into a T. valuer must be idempotent. The boolean return value from valuer indicates whether the T should be emitted over the stream or discarded. The valuer should deep copy all data it uses in T to ensure that other invocations of valuer do not reference the same memory.
func NewSourceCtx ¶
NewSourceCtx constructs a new source that will close all output streams when the provided context is cancelled. Calls to *Source.Update after the provided context is cancelled will have no effect.
func (*Source[S, T]) Stream ¶
Stream is a Provider function. The returned channel will close when the provided context is cancelled or (if the Source was constructed with NewSourceCtx) when the source's context is cancelled, and will emit any values set by Update() for which the valuer function provided at construction returns true. If Update() is invoked quickly, only the final value is guaranteed to be emitted on the channel returned by Stream.
func (*Source[S, T]) Update ¶
func (s *Source[S, T]) Update(fn func(oldState S) S)
Update runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to Update will have no effect.
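The interaction between the locked state, the valuer, and the "latest value only" guarantee can be modelled in a few lines. This is a simplified, single-reader sketch and not the package's implementation; source, newSource, and update are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// source is a hypothetical model: state guarded by a lock, a valuer that
// turns state into an output, and a one-element slot for the newest output.
type source[S, T any] struct {
	mu     sync.Mutex
	state  S
	valuer func(S) (T, bool)
	latest chan T
}

func newSource[S, T any](valuer func(S) (T, bool)) *source[S, T] {
	return &source[S, T]{valuer: valuer, latest: make(chan T, 1)}
}

// update replaces the state with fn(state) while holding the lock, then
// publishes the valuer's output unless the valuer asks to discard it.
func (s *source[S, T]) update(fn func(old S) S) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = fn(s.state)
	v, ok := s.valuer(s.state)
	if !ok {
		return // the valuer chose not to emit for this state
	}
	select {
	case <-s.latest: // overwrite an output nobody has read yet
	default:
	}
	s.latest <- v
}

// demo appends 1, 2, 3 to the state in quick succession and then reads
// once; only the output for the final state is available.
func demo() int {
	src := newSource(func(xs []int) (int, bool) {
		sum := 0
		for _, x := range xs {
			sum += x
		}
		return sum, len(xs) > 0
	})
	for i := 1; i <= 3; i++ {
		src.update(func(old []int) []int { return append(old, i) })
	}
	return <-src.latest
}

func main() {
	fmt.Println(demo()) // 6
}
```

The valuer here returns a plain int, so no copying is needed; a valuer that returns slices or maps would need to copy them, as the NewSource documentation requires.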
func (*Source[S, T]) UpdateIf ¶
UpdateIf runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn iff fn returns true. If fn returns false, the returned value is discarded and no value will be sent on the stream's output. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to UpdateIf will have no effect.
type Status ¶
type Status uint8
Status describes the state of the data read from the stream.
const (
	// Waiting indicates that the stream has never received a value.
	Waiting Status = iota
	// Emitting indicates that the stream is emitting a new value.
	Emitting
	// Cached indicates that the stream is emitting a cached copy of the most
	// recently received value.
	Cached
	// Complete indicates that the stream provider closed its output channel after
	// emitting at least one value and without being cancelled. This indicates that
	// the work for this stream is complete and the stream need not be restarted.
	// The last value received over the channel will always be returned with this
	// status.
	Complete
	// Incomplete indicates that the stream provider closed its output channel without
	// ever emitting a value. This is usually a bug in the stream provider. Any stream
	// value received with this status should be ignored.
	Incomplete
	// Uninitialized means that the stream has never been constructed. This status is
	// only returned if a nil stream is read.
	Uninitialized
)
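Layout code usually only needs to know whether a status carries a usable value. The sketch below restates the constants locally so that it compiles alone; hasValue is a hypothetical helper derived from the comments on each constant, not a function of the package.

```go
package main

import "fmt"

// Status and its constants are reproduced from the declaration above.
type Status uint8

const (
	Waiting Status = iota
	Emitting
	Cached
	Complete
	Incomplete
	Uninitialized
)

// hasValue reports whether a value read alongside s is meaningful.
// Emitting, Cached, and Complete all accompany a received value; Waiting,
// Incomplete, and Uninitialized do not.
func hasValue(s Status) bool {
	switch s {
	case Emitting, Cached, Complete:
		return true
	default:
		return false
	}
}

func main() {
	for s := Waiting; s <= Uninitialized; s++ {
		fmt.Println(s, hasValue(s))
	}
}
```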
type Stream ¶
type Stream[T any] struct {
	// contains filtered or unexported fields
}
Stream provides Ts from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically. A nil stream can be read from safely, but it is invalid to construct a stream literal without invoking a constructor function. Such streams will panic when used.
func New ¶
func New[T any](controller *Controller, provider Provider[T]) *Stream[T]
New creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.
func Once ¶
func Once[T any](controller *Controller, do func(ctx context.Context) T) *Stream[T]
Once creates a stream that will only emit a single value, and will not restart itself if it completes successfully. This is useful for one-shot async computations.
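One way to picture a one-shot stream is as a provider that emits a single value and then closes its channel early. The sketch below models that idea with the standard library only; once and demo are hypothetical, and whether the package builds Once this way is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// once adapts a single computation into a function that returns a channel
// carrying exactly one value before closing.
func once[T any](do func(ctx context.Context) T) func(ctx context.Context) <-chan T {
	return func(ctx context.Context) <-chan T {
		out := make(chan T, 1) // buffered: the send never blocks
		go func() {
			defer close(out)
			out <- do(ctx)
		}()
		return out
	}
}

// demo runs a one-shot computation and collects everything it emits.
func demo() []int {
	provider := once(func(ctx context.Context) int { return 6 * 7 })
	var got []int
	for v := range provider(context.Background()) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demo()) // [42]
}
```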
func (*Stream[T]) Read ¶
Read a value from the stream, if any. The returned status indicates whether the returned value is meaningful.
func (*Stream[T]) ReadDefault ¶
ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.
func (*Stream[T]) ReadInto ¶
ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t.
func (*Stream[T]) ReadNew ¶
ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:
if value, status := s.Read(gtx); status == stream.Emitting {
	// Do logic that should only occur when a value is emitted from the stream.
}
It can be written as:
if value, ok := s.ReadNew(gtx); ok {
	// Do logic that should only occur when a value is emitted from the stream.
}
Many streams do not need special handling for when events are emitted, and should not use this method.