stream

package
v0.0.0-...-39b0932 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 16, 2023 License: Apache-2.0 Imports: 7 Imported by: 3

Documentation

Overview

Package stream implements lazy functional streams using Go channels.

Streams consist of a single generator, zero or more operators, and a single terminator. Every stream shares a stop channel, which the owner may use to cleanly shut down the entire stream.

Streams exploit Go's lightweight channels and goroutines in order to implement concurrency without locking shared data structures.

The design is intentionally reminiscent of the Java 8 Streams API.
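The generator, operator, and terminator roles described above can be sketched with plain channels. This is a minimal, self-contained illustration and not the package's implementation: the local `stream` type and the `count`, `mapOp`, and `reduce` helpers are hypothetical stand-ins, and the sketch assumes that every stage holds the same stop channel.

```go
package main

import "fmt"

// stream is a simplified local stand-in for the package's Stream type.
// Every stage of one pipeline holds the same stop channel.
type stream struct {
	stop chan interface{} // closed by the owner to shut the pipeline down
	data chan interface{} // carries elements to the next stage
}

// count is a hypothetical generator that emits 0..n-1, or stops early
// when the shared stop channel is closed.
func count(n int) *stream {
	s := &stream{stop: make(chan interface{}), data: make(chan interface{})}
	go func() {
		defer close(s.data)
		for i := 0; i < n; i++ {
			select {
			case s.data <- i:
			case <-s.stop:
				return
			}
		}
	}()
	return s
}

// mapOp is a hypothetical operator: it applies f to every element and
// forwards the result, reusing the upstream stop channel.
func mapOp(in *stream, f func(interface{}) interface{}) *stream {
	out := &stream{stop: in.stop, data: make(chan interface{})}
	go func() {
		defer close(out.data)
		for v := range in.data {
			select {
			case out.data <- f(v):
			case <-in.stop:
				return
			}
		}
	}()
	return out
}

// reduce is a hypothetical terminator: it folds the stream into one
// value and delivers it on the returned channel once the input ends.
func reduce(in *stream, init interface{}, f func(acc, v interface{}) interface{}) chan interface{} {
	result := make(chan interface{}, 1)
	go func() {
		acc := init
		for v := range in.data {
			acc = f(acc, v)
		}
		result <- acc
	}()
	return result
}

func main() {
	squares := mapOp(count(5), func(v interface{}) interface{} { return v.(int) * v.(int) })
	sum := <-reduce(squares, 0, func(acc, v interface{}) interface{} { return acc.(int) + v.(int) })
	fmt.Println(sum) // 0+1+4+9+16 = 30
}
```

Because no stage shares mutable state, the only synchronization is the channel handoff between goroutines.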

Constants

This section is empty.

Variables

This section is empty.

Functions

func ForEach

func ForEach(in *Stream, f DoFunc) chan interface{}

ForEach adds a terminator onto the stream that consumes each element and runs the given function on it.

func NewJoiner

func NewJoiner() (*Stream, *Joiner)

NewJoiner creates a controllable joiner and returns it along with a Stream. Streams can be added to the Joiner with Add and later removed with Remove.

func Reduce

func Reduce(in *Stream, initVal interface{}, f ReduceFunc) chan interface{}

Reduce adds a terminator onto the stream that accumulates a value using the given function and then returns it once the input stream terminates.

func Split

func Split(in *Stream, f FilterFunc) (*Stream, *Stream)

Split splits the input stream into two output streams according to the given filter function.

func Tee

func Tee(in *Stream) (*Stream, *Stream)

Tee copies the input stream to a pair of output streams (like a T-shaped junction).
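The T-junction idea can be illustrated with bare channels. The sketch below is an assumption about how such a copy might work, not the package's code: `tee` here is a local, hypothetical function that operates on plain channels rather than on *Stream values, and it sends to the two outputs in sequence.

```go
package main

import "fmt"

// tee copies every value from in to both outputs and closes both when
// in is closed. Sends are unbuffered and sequential, so a slow reader
// on one output holds back the other.
func tee(in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
	a := make(chan interface{})
	b := make(chan interface{})
	go func() {
		defer close(a)
		defer close(b)
		for v := range in {
			a <- v
			b <- v
		}
	}()
	return a, b
}

func main() {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()

	a, b := tee(in)

	// Drain b in its own goroutine; both outputs must be read
	// concurrently or the copy loop stalls.
	sumB := make(chan int)
	go func() {
		total := 0
		for v := range b {
			total += v.(int)
		}
		sumB <- total
	}()

	sumA := 0
	for v := range a {
		sumA += v.(int)
	}
	fmt.Println(sumA, <-sumB) // 6 6
}
```

With this design both consumers must keep reading; placing a buffering stage in front of a slow consumer is one way to loosen that coupling.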

func Wait

func Wait(in *Stream) chan interface{}

Wait adds a terminator onto the stream that discards all elements and waits for the stream to terminate.

Types

type DoFunc

type DoFunc func(interface{})

type FilterFunc

type FilterFunc func(interface{}) bool

type Joiner

type Joiner struct {
	// contains filtered or unexported fields
}

func (*Joiner) Add

func (J *Joiner) Add(s *Stream) bool

func (*Joiner) Close

func (J *Joiner) Close()

func (*Joiner) Off

func (J *Joiner) Off()

func (*Joiner) On

func (J *Joiner) On()

func (*Joiner) Remove

func (J *Joiner) Remove(s *Stream) bool

type MapFunc

type MapFunc func(interface{}) interface{}

type ReduceFunc

type ReduceFunc func(interface{}, interface{}) interface{}

type Repeater

type Repeater struct {
	// contains filtered or unexported fields
}

func NewRepeater

func NewRepeater(in *Stream) *Repeater

NewRepeater creates a controllable repeater for the given stream. It acts as a Stream terminator for the parent stream, but allows new Streams to be dynamically created and removed from the parent stream. It consumes all events from the parent stream regardless of whether there are child streams or not. If this is not the desired behavior, attach a Valve between the parent and the repeater.

func (*Repeater) Close

func (r *Repeater) Close()

func (*Repeater) NewStream

func (r *Repeater) NewStream() *Stream

type Stream

type Stream struct {
	// Ctrl is the control channel to the stream. Consumers may close
	// it to shut down the stream.
	Ctrl chan<- interface{}

	// Data is the data channel of the stream. Consumers may receive
	// from it to consume stream elements.
	Data <-chan interface{}
}

Stream represents a stream consisting of a generator, zero or more operators, and zero or one terminators. Consumers receive stream elements from the Data channel and may terminate the stream by closing the Ctrl channel.
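The consumer side of this contract can be sketched as follows. The `Stream` type below is a local copy of the documented struct so that the example is self-contained; `naturals` and `firstN` are hypothetical helpers, and the generator's reaction to a closed Ctrl channel is an assumption about how a producer would honor the shutdown signal.

```go
package main

import "fmt"

// Stream is a local copy of the documented struct.
type Stream struct {
	Ctrl chan<- interface{}
	Data <-chan interface{}
}

// naturals is a hypothetical endless generator. It emits 0, 1, 2, ...
// until the consumer closes Ctrl, then closes Data and exits.
func naturals() *Stream {
	ctrl := make(chan interface{})
	data := make(chan interface{})
	go func() {
		defer close(data)
		for i := 0; ; i++ {
			select {
			case data <- i:
			case <-ctrl:
				return
			}
		}
	}()
	return &Stream{Ctrl: ctrl, Data: data}
}

// firstN receives n elements from Data and then shuts the stream down
// by closing Ctrl. Closing a send-only channel is legal in Go.
func firstN(s *Stream, n int) []int {
	if n <= 0 {
		close(s.Ctrl)
		return nil
	}
	got := make([]int, 0, n)
	for v := range s.Data {
		got = append(got, v.(int))
		if len(got) == n {
			close(s.Ctrl)
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN(naturals(), 3)) // [0 1 2]
}
```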

func Buffer

func Buffer(in *Stream, size int) *Stream

Buffer stores up to the given number of elements from the input Stream before blocking.

func Chargen

func Chargen() *Stream

Chargen creates a generator that produces a stream of single-character strings from a pattern reminiscent of RFC 864 chargen TCP/UDP services.

func Copy

func Copy(in *Stream, n int) []*Stream

Copy duplicates elements from the input stream into the given number of output streams.

func Do

func Do(in *Stream, f DoFunc) *Stream

Do adds an operator in the stream that calls the given function for every element.

func Filter

func Filter(in *Stream, f FilterFunc) *Stream

func Iota

func Iota(args ...uint64) *Stream

Iota creates a generator that produces the given count of int elements (or an infinite sequence if no count is given). Additional optional arguments specify the start and step values.

func Join

func Join(in ...*Stream) *Stream

Join combines multiple input Streams into a single output Stream. Closing the Join closes the input streams as well.

func Limit

func Limit(in *Stream, mod api.LimitModifier) *Stream

Limit limits the number of results returned.

func Map

func Map(in *Stream, f MapFunc) *Stream

Map adds an operator in the stream that calls the given function for every element and forwards along the returned value.

func Null

func Null() *Stream

Null creates a generator that doesn't create any output elements.

func OnOffValve

func OnOffValve(in *Stream) (*Stream, chan<- bool)

OnOffValve adds a simple on/off valve operator onto the stream. It defaults to off, which avoids a race in which the valve accepts input before the caller has had a chance to switch it off.

func Overflow

func Overflow(in *Stream) *Stream

Overflow drops elements instead of blocking on the output Stream.
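Dropping instead of blocking is commonly done in Go with a `select` that has a `default` case. The sketch below shows that technique on plain channels; `overflow` is a local, hypothetical function and may differ from the package's operator in details such as buffering.

```go
package main

import (
	"fmt"
	"time"
)

// overflow forwards elements from in to the returned channel. If the
// reader is not ready at the moment an element arrives, that element
// is dropped so that the producer is never held up.
func overflow(in <-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- v:
			default: // reader not ready: drop the element
			}
		}
	}()
	return out
}

func main() {
	const total = 1000
	in := make(chan interface{})
	go func() {
		defer close(in)
		for i := 0; i < total; i++ {
			in <- i
		}
	}()

	received := 0
	for range overflow(in) {
		received++
		time.Sleep(time.Microsecond) // simulate a slow consumer
	}
	// The exact count depends on scheduling, so none is shown here.
	fmt.Printf("received %d of %d elements\n", received, total)
}
```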

func Throttle

func Throttle(in *Stream, mod api.ThrottleModifier) *Stream

Throttle throttles the number of events emitted by the stream.

func Ticker

func Ticker(d time.Duration) *Stream

Ticker creates a generator that produces a time.Time element every 'tick' of the specified duration.

func (*Stream) Close

func (s *Stream) Close()

Close terminates the stream.

func (*Stream) Next

func (s *Stream) Next() (interface{}, bool)

Next receives the next element in the stream. It returns that element and a boolean indicating whether it was received successfully or whether the stream was closed.
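The element-plus-boolean result matches Go's comma-ok channel receive. The sketch below assumes Next behaves like a thin wrapper around such a receive; `next` and `collect` are hypothetical local helpers that take the data channel directly.

```go
package main

import "fmt"

// next is a hypothetical stand-in for Stream.Next: it returns the next
// element and true, or nil and false once the channel is closed and
// drained.
func next(data <-chan interface{}) (interface{}, bool) {
	v, ok := <-data
	return v, ok
}

// collect pulls elements with next until the stream reports closure.
func collect(data <-chan interface{}) []interface{} {
	var out []interface{}
	for {
		v, ok := next(data)
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	data := make(chan interface{}, 2)
	data <- "a"
	data <- "b"
	close(data)
	fmt.Println(collect(data)) // [a b]
}
```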
