stream

package
v0.0.0-...-3aec24a
Warning

This package is not in the latest version of its module.
Published: Dec 13, 2022 License: Apache-2.0 Imports: 1 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](stream Stream[T]) ([]T, error)

Collect aggregates a stream into a slice. If an error is hit, the items observed thus far are still returned, but they may not represent the complete set.

func CollectPages

func CollectPages[T any](stream Stream[[]T]) ([]T, error)

CollectPages aggregates a paginated stream into a slice. If an error is hit, the pages observed thus far are still returned, but they may not represent the complete set.
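A minimal sketch of page flattening, assuming each stream item is one page. `collectPages` and `sliceStream` are hypothetical stand-ins defined here so the example is self-contained; they are not the package's implementation.

```go
package main

import "fmt"

// Stream mirrors the interface documented on this page.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream is a hypothetical slice-backed stream used as test input.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}
func (s *sliceStream[T]) Item() T     { return s.items[s.idx-1] }
func (s *sliceStream[T]) Done() error { return nil }

// collectPages appends the contents of every page to a single slice.
func collectPages[T any](stream Stream[[]T]) ([]T, error) {
	var out []T
	for stream.Next() {
		out = append(out, stream.Item()...)
	}
	return out, stream.Done()
}

func main() {
	pages := &sliceStream[[]string]{items: [][]string{{"a", "b"}, {"c"}}}
	all, err := collectPages[string](pages)
	fmt.Println(all, err) // [a b c] <nil>
}
```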

func Drain

func Drain[T any](stream Stream[T]) error

Drain consumes a stream to completion.

Types

type Stream

type Stream[T any] interface {
	// Next attempts to advance the stream to the next item. If false is returned,
	// then no more items are available. Next() and Item() must not be called after the
	// first time Next() returns false.
	Next() bool
	// Item gets the current item. Invoking Item() is only safe if Next was previously
	// invoked *and* returned true. Invoking Item() before invoking Next(), or after Next()
	// returned false may cause panics or other unpredictable behavior. Whether or not the
	// item returned is safe for access after the stream is advanced again is dependent
	// on the implementation and should be documented (e.g. an I/O based stream might
	// re-use an underlying buffer).
	Item() T
	// Done checks for any errors that occurred during streaming and informs the stream
	// that we've finished consuming items from it. Invoking Next() or Item() after Done()
	// has been called is not permitted. Done may trigger cleanup operations, but unlike Close()
	// the error reported is specifically related to failures that occurred *during* streaming,
	// meaning that if Done() returns an error, there is a high likelihood that the complete
	// set of values was not observed. For this reason, Done() should always be checked explicitly
	// rather than deferred as Close() might be.
	Done() error
}

Stream is a generic interface for streaming APIs. This package was built with the intention of making it easier to write streaming resource getters, and may not be suitable for applications outside of that specific use case. Streams may panic if misused. See the Collect function for an example of the correct consumption pattern.

NOTE: streams almost always perform worse than slices in Go. Unless you're dealing with a resource that scales linearly with cluster size, you are probably better off just working with slices.

func Empty

func Empty[T any]() Stream[T]

Empty creates an empty stream (equivalent to Fail(nil)).

func Fail

func Fail[T any](err error) Stream[T]

Fail creates an empty stream that fails immediately with the supplied error.

func FilterMap

func FilterMap[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B]

FilterMap maps a stream of type A into a stream of type B, filtering out items when fn returns false.
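A minimal sketch of the filter-and-map behavior, assuming the wrapper skips rejected items inside Next and forwards Done to the inner stream. `filterMapStream`, `filterMap`, `sliceStream`, and `parseInt` are hypothetical names used only for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// Stream mirrors the interface documented on this page.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream is a hypothetical slice-backed input stream.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}
func (s *sliceStream[T]) Item() T     { return s.items[s.idx-1] }
func (s *sliceStream[T]) Done() error { return nil }

// filterMapStream wraps an inner stream, keeping only items fn accepts.
type filterMapStream[A, B any] struct {
	inner Stream[A]
	fn    func(A) (B, bool)
	item  B
}

func (s *filterMapStream[A, B]) Next() bool {
	// Advance the inner stream until fn accepts an item or it is exhausted.
	for s.inner.Next() {
		if item, ok := s.fn(s.inner.Item()); ok {
			s.item = item
			return true
		}
	}
	return false
}
func (s *filterMapStream[A, B]) Item() B     { return s.item }
func (s *filterMapStream[A, B]) Done() error { return s.inner.Done() }

func filterMap[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B] {
	return &filterMapStream[A, B]{inner: stream, fn: fn}
}

// parseInt maps a string to an int and rejects non-numeric input.
func parseInt(s string) (int, bool) {
	n, err := strconv.Atoi(s)
	return n, err == nil
}

func main() {
	src := &sliceStream[string]{items: []string{"1", "x", "3"}}
	nums := filterMap[string, int](src, parseInt)
	var out []int
	for nums.Next() {
		out = append(out, nums.Item())
	}
	fmt.Println(out, nums.Done()) // [1 3] <nil>
}
```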

func Func

func Func[T any](fn func() (T, error), doneFuncs ...func()) Stream[T]

Func builds a stream from a closure. The supplied closure *must* return io.EOF when no more items are available. Failure to return io.EOF (or some other error) may cause infinite loops. Cleanup functions may optionally be provided; they are run when the stream is closed. If wrapping a paginated API, consider using PageFunc instead.

func MapWhile

func MapWhile[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B]

MapWhile maps a stream of type A into a stream of type B, halting early if fn returns false.
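The difference from FilterMap is that a rejected item ends the stream instead of being skipped. A minimal sketch under that reading, with `mapWhileStream`, `mapWhile`, `sliceStream`, and `doubleWhileSmall` as hypothetical names:

```go
package main

import "fmt"

// Stream mirrors the interface documented on this page.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream is a hypothetical slice-backed input stream.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}
func (s *sliceStream[T]) Item() T     { return s.items[s.idx-1] }
func (s *sliceStream[T]) Done() error { return nil }

// mapWhileStream maps items until fn rejects one, then stops.
type mapWhileStream[A, B any] struct {
	inner Stream[A]
	fn    func(A) (B, bool)
	item  B
}

func (s *mapWhileStream[A, B]) Next() bool {
	if !s.inner.Next() {
		return false
	}
	item, ok := s.fn(s.inner.Item())
	if !ok {
		// Halt early. Per the Stream contract, the consumer must not
		// call Next again after it has returned false.
		return false
	}
	s.item = item
	return true
}
func (s *mapWhileStream[A, B]) Item() B     { return s.item }
func (s *mapWhileStream[A, B]) Done() error { return s.inner.Done() }

func mapWhile[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B] {
	return &mapWhileStream[A, B]{inner: stream, fn: fn}
}

// doubleWhileSmall doubles n and accepts it only while n is below 3.
func doubleWhileSmall(n int) (int, bool) { return n * 2, n < 3 }

func main() {
	src := &sliceStream[int]{items: []int{1, 2, 5, 1}}
	s := mapWhile[int, int](src, doubleWhileSmall)
	var out []int
	for s.Next() {
		out = append(out, s.Item())
	}
	// The trailing 1 is never observed because 5 halted the stream.
	fmt.Println(out, s.Done()) // [2 4] <nil>
}
```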

func Once

func Once[T any](item T) Stream[T]

Once creates a stream that yields a single item.

func PageFunc

func PageFunc[T any](fn func() ([]T, error), doneFuncs ...func()) Stream[T]

PageFunc is equivalent to Func except that it performs internal depagination. As with Func, the supplied closure *must* return io.EOF if no more items are available. Failure to return io.EOF (or some other error) may result in infinite loops.

func Slice

func Slice[T any](items []T) Stream[T]

Slice constructs a stream from a slice.