stream package

Version: v0.3.0-alpha
Published: Dec 29, 2017 License: Apache-2.0 Imports: 7 Imported by: 0

Warning: This package is not in the latest version of its module.

Documentation

Overview

Package stream implements lazy functional streams using Go channels.

Streams consist of a single generator, zero or more operators, and a single terminator. Every stream shares a stop channel, which the owner may use to cleanly shut down the entire stream.

Streams exploit Go's lightweight channels and goroutines in order to implement concurrency without locking shared data structures.
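The generator, operator, and terminator roles described above can be sketched with plain channels. This is a minimal illustration, not the package's implementation: the `stream` type and the `count`, `mapOp`, and `sum` helpers are hypothetical names invented for this example, and only the idea of a stop channel shared by every stage comes from the overview.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for the package's Stream type:
// a data channel plus a stop channel shared by every stage.
type stream struct {
	stop chan struct{}
	data <-chan interface{}
}

// count is a hypothetical generator that emits 0..n-1.
func count(n int) *stream {
	stop := make(chan struct{})
	out := make(chan interface{})
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case out <- i:
			case <-stop: // the owner shut the stream down
				return
			}
		}
	}()
	return &stream{stop: stop, data: out}
}

// mapOp is a hypothetical operator. It reuses the upstream stop channel,
// so closing it once stops every stage.
func mapOp(in *stream, f func(interface{}) interface{}) *stream {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in.data {
			select {
			case out <- f(v):
			case <-in.stop:
				return
			}
		}
	}()
	return &stream{stop: in.stop, data: out}
}

// sum is a hypothetical terminator that drains the stream.
func sum(in *stream) int {
	total := 0
	for v := range in.data {
		total += v.(int)
	}
	return total
}

func main() {
	double := func(v interface{}) interface{} { return v.(int) * 2 }
	fmt.Println(sum(mapOp(count(5), double))) // prints 20

	// Closing the shared stop channel ends a long pipeline early.
	long := mapOp(count(1000000), double)
	fmt.Println(<-long.data) // prints 0
	close(long.stop)
}
```

No stage locks shared state: each goroutine owns its loop variables and communicates only through channels.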

This is intentionally reminiscent of the Java 8 Streams API.

Functions

func ForEach

func ForEach(in *Stream, f DoFunc) chan interface{}

ForEach adds a terminator onto the stream that consumes each element and runs the given function on it.

func NewJoiner

func NewJoiner() (*Stream, *Joiner)

NewJoiner creates a controllable Joiner and returns it together with its output stream. Input streams can be added to the Joiner and later removed.

func Reduce

func Reduce(in *Stream, initVal interface{}, f ReduceFunc) chan interface{}

Reduce adds a terminator onto the stream that accumulates a value using the given function and then returns it once the input stream terminates.
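As a rough sketch of the fold described above, the following works on a bare channel rather than on the package's Stream type. The `reduce` and `fromSlice` helpers are hypothetical, and the argument order of the reducer (accumulator first, element second) is an assumption; the documentation only gives the ReduceFunc signature.

```go
package main

import "fmt"

// reduceFunc mirrors the documented ReduceFunc signature.
// Assumption: the first argument is the accumulator, the second the element.
type reduceFunc func(interface{}, interface{}) interface{}

// reduce is a hypothetical stand-in for a reducing terminator. It delivers
// the accumulated value on the returned channel once the input is closed.
func reduce(in <-chan interface{}, initVal interface{}, f reduceFunc) chan interface{} {
	out := make(chan interface{}, 1)
	go func() {
		acc := initVal
		for v := range in {
			acc = f(acc, v)
		}
		out <- acc
		close(out)
	}()
	return out
}

// fromSlice is a hypothetical generator that emits the given values
// and then closes its channel.
func fromSlice(vals ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	add := func(acc, v interface{}) interface{} { return acc.(int) + v.(int) }
	fmt.Println(<-reduce(fromSlice(1, 2, 3, 4), 0, add)) // prints 10
}
```

Returning a channel instead of a value lets the caller keep working and collect the result only when the input stream has terminated.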

func Split

func Split(in *Stream, f FilterFunc) (*Stream, *Stream)

Split splits the input stream into two output streams according to the given filter function.
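A predicate-based split can be sketched as follows. The `split`, `fromSlice`, and `collect` helpers are hypothetical and operate on bare channels. Which output receives the matching elements is not stated in the documentation; this sketch assumes the first output gets elements for which the function returns true.

```go
package main

import "fmt"

// filterFunc mirrors the documented FilterFunc signature.
type filterFunc func(interface{}) bool

// split is a hypothetical stand-in for a splitting operator.
// Assumption: elements for which f returns true go to the first output.
func split(in <-chan interface{}, f filterFunc) (<-chan interface{}, <-chan interface{}) {
	pass := make(chan interface{})
	fail := make(chan interface{})
	go func() {
		defer close(pass)
		defer close(fail)
		for v := range in {
			if f(v) {
				pass <- v
			} else {
				fail <- v
			}
		}
	}()
	return pass, fail
}

// fromSlice is a hypothetical generator that emits the given values.
func fromSlice(vals ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// collect drains a channel into a slice.
func collect(ch <-chan interface{}) []interface{} {
	var got []interface{}
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	isEven := func(v interface{}) bool { return v.(int)%2 == 0 }
	evens, odds := split(fromSlice(1, 2, 3, 4, 5), isEven)

	// Both outputs are unbuffered, so they must be drained concurrently.
	done := make(chan []interface{})
	go func() { done <- collect(odds) }()
	fmt.Println(collect(evens), <-done) // prints [2 4] [1 3 5]
}
```

Because the outputs in this sketch are unbuffered, a consumer that reads only one side would stall the splitter as soon as an element is routed to the other side.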

func Tee

func Tee(in *Stream) (*Stream, *Stream)

Tee copies the input stream to a pair of output streams (like a T-shaped junction).

func Wait

func Wait(in *Stream) chan interface{}

Wait adds a terminator onto the stream that throws away all elements and just waits for stream termination.

Types

type DoFunc

type DoFunc func(interface{})

DoFunc is the signature of a function that is called by Do and ForEach.

type FilterFunc

type FilterFunc func(interface{}) bool

FilterFunc is the signature of a function that is called by Filter and Split.

type Joiner

type Joiner struct {
	// contains filtered or unexported fields
}

Joiner is a construct that combines multiple input streams into a single output stream.

func (*Joiner) Add

func (J *Joiner) Add(s *Stream) bool

Add adds a new input stream to an existing Joiner.

func (*Joiner) Close

func (J *Joiner) Close()

Close closes a Joiner.

func (*Joiner) Off

func (J *Joiner) Off()

Off disables the output from a Joiner.

func (*Joiner) On

func (J *Joiner) On()

On enables the output from a Joiner.

func (*Joiner) Remove

func (J *Joiner) Remove(s *Stream) bool

Remove removes an input stream from an existing Joiner.

type MapFunc

type MapFunc func(interface{}) interface{}

MapFunc is the signature of a function that is called by Map.

type ReduceFunc

type ReduceFunc func(interface{}, interface{}) interface{}

ReduceFunc is the signature of a function called by Reduce.

type Repeater

type Repeater struct {
	// contains filtered or unexported fields
}

Repeater is a stream that repeats elements from one input to multiple output streams.

func NewRepeater

func NewRepeater(in *Stream) *Repeater

NewRepeater creates a controllable repeater for the given stream. It acts as a Stream terminator for the parent stream, but allows new Streams to be dynamically created and removed from the parent stream. It consumes all events from the parent stream regardless of whether there are child streams or not. If this is not the desired behavior, attach a Valve between the parent and the repeater.

func (*Repeater) Close

func (r *Repeater) Close()

Close closes the input stream of a Repeater.

func (*Repeater) NewStream

func (r *Repeater) NewStream() *Stream

NewStream creates a new output stream from a Repeater stream.

type Stream

type Stream struct {
	// Ctrl is the control channel to the stream. Consumers may close
	// it to shut down the stream.
	Ctrl chan<- interface{}

	// Data is the data channel of the stream. Consumers may receive
	// from it to consume stream elements.
	Data <-chan interface{}
}

Stream represents a stream consisting of a generator, zero or more operators, and zero or one terminators. Consumers receive stream elements from the Data channel and may terminate the stream by closing the Ctrl channel.

func Buffer

func Buffer(in *Stream, size int) *Stream

Buffer stores up to the given number of elements from the input Stream before blocking.

func Chargen

func Chargen() *Stream

Chargen creates a generator that produces a stream of single-character strings from a pattern reminiscent of the RFC 864 chargen TCP/UDP services.

func Copy

func Copy(in *Stream, n int) []*Stream

Copy duplicates elements from the input stream into the given number of output streams.

func Do

func Do(in *Stream, f DoFunc) *Stream

Do adds an operator in the stream that calls the given function for every element.

func Filter

func Filter(in *Stream, f FilterFunc) *Stream

Filter adds a filter to a stream. For each element in the stream, a false return from the filter function causes the element to be discarded.

func Iota

func Iota(args ...uint64) *Stream

Iota creates a generator that produces the given count of int elements (or infinite, if not specified). Additional optional arguments specify the start and step value.
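The variadic argument handling can be sketched like this. The `iotaGen` helper is hypothetical. The sketch assumes the arguments arrive in the order count, start, step, that start defaults to 0 and step to 1, and that elements are emitted as uint64 values; none of these details are confirmed by the documentation beyond the signature and the description above.

```go
package main

import "fmt"

// iotaGen is a hypothetical counting generator.
// Assumptions: args are (count, start, step); start defaults to 0,
// step to 1; with no arguments the sequence never ends on its own.
func iotaGen(stop <-chan struct{}, args ...uint64) <-chan interface{} {
	var count, start uint64
	step := uint64(1)
	infinite := len(args) == 0
	if len(args) > 0 {
		count = args[0]
	}
	if len(args) > 1 {
		start = args[1]
	}
	if len(args) > 2 {
		step = args[2]
	}

	out := make(chan interface{})
	go func() {
		defer close(out)
		val := start
		for i := uint64(0); infinite || i < count; i++ {
			select {
			case out <- val:
			case <-stop: // needed so an infinite generator can be shut down
				return
			}
			val += step
		}
	}()
	return out
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	for v := range iotaGen(stop, 3, 10, 5) {
		fmt.Println(v) // prints 10, then 15, then 20
	}
}
```

The stop channel matters most in the infinite case, where the generator has no other way to end.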

func Join

func Join(in ...*Stream) *Stream

Join combines multiple input Streams into a single output Stream. Closing the Join closes the input streams as well.
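Combining several inputs into one output is a fan-in. The sketch below shows only the merging half, using hypothetical `join` and `fromSlice` helpers on bare channels; it does not model the documented behavior of closing the input streams when the joined stream is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// join is a hypothetical fan-in: it forwards every element from every
// input to one output and closes the output once all inputs are closed.
func join(ins ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan interface{}) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice is a hypothetical generator that emits the given values.
func fromSlice(vals ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	total := 0
	for v := range join(fromSlice(1, 2), fromSlice(3, 4)) {
		total += v.(int)
	}
	// Arrival order across inputs is not deterministic, but the sum is.
	fmt.Println(total) // prints 10
}
```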

func Limit

func Limit(in *Stream, mod api.LimitModifier) *Stream

Limit limits the number of results returned, as configured by the given api.LimitModifier.

func Map

func Map(in *Stream, f MapFunc) *Stream

Map adds an operator in the stream that calls the given function for every element and forwards along the returned value.

func Null

func Null() *Stream

Null creates a generator that doesn't create any output elements.

func OnOffValve

func OnOffValve(in *Stream) (*Stream, chan<- bool)

OnOffValve adds a simple on/off valve operator onto the stream. It defaults to off to prevent a race condition from accepting input before being able to be switched off.
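A valve that starts in the off position might look like the sketch below. The `valve` and `fromSlice` helpers are hypothetical. The sketch assumes that an off valve leaves its input unread (so upstream blocks) rather than reading and discarding elements; the documentation does not say which of the two the package does.

```go
package main

import "fmt"

// valve is a hypothetical on/off operator. It starts off, so no element
// is accepted before the caller has had a chance to control it.
// Assumption: while off, the input is left unread rather than discarded.
func valve(in <-chan interface{}) (<-chan interface{}, chan<- bool) {
	out := make(chan interface{})
	ctl := make(chan bool)
	go func() {
		defer close(out)
		on := false
		for {
			src := in
			if !on {
				src = nil // receiving from a nil channel never proceeds
			}
			select {
			case state, ok := <-ctl:
				if !ok {
					return // control channel closed: shut down
				}
				on = state
			case v, ok := <-src:
				if !ok {
					return // input exhausted
				}
				out <- v
			}
		}
	}()
	return out, ctl
}

// fromSlice is a hypothetical generator that emits the given values.
func fromSlice(vals ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	out, ctl := valve(fromSlice("a", "b", "c"))
	ctl <- true // nothing flows until the valve is switched on
	for v := range out {
		fmt.Println(v)
	}
}
```

Setting the source to nil inside the loop is a common Go idiom for temporarily disabling one case of a select.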

func Overflow

func Overflow(in *Stream) *Stream

Overflow drops elements instead of blocking on the output Stream.
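Dropping instead of blocking is usually expressed in Go with a select that has a default case. The sketch below uses hypothetical `overflow` and `fromSlice` helpers on bare channels. The `size` parameter is an addition for illustration only; the documented Overflow takes just the input stream.

```go
package main

import "fmt"

// overflow is a hypothetical operator that never blocks on its output:
// when the output cannot take an element immediately, the element is dropped.
// The size parameter is an assumption made for this example.
func overflow(in <-chan interface{}, size int) <-chan interface{} {
	out := make(chan interface{}, size)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- v:
			default: // output full or consumer not ready: drop the element
			}
		}
	}()
	return out
}

// fromSlice is a hypothetical generator that emits the given values.
func fromSlice(vals ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	received := 0
	for range overflow(fromSlice(1, 2, 3, 4, 5), 2) {
		received++
	}
	// How many elements survive depends on scheduling.
	fmt.Println("received", received, "of 5 elements")
}
```

The number of surviving elements depends on how quickly the consumer reads, so the program prints a count rather than a fixed result. Elements that do arrive keep their original order.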

func Throttle

func Throttle(in *Stream, mod api.ThrottleModifier) *Stream

Throttle limits the number of events emitted by the stream, as configured by the given api.ThrottleModifier.

func Ticker

func Ticker(d time.Duration) *Stream

Ticker creates a generator that produces a time.Time element every 'tick' of the specified duration.

func (*Stream) Close

func (s *Stream) Close()

Close terminates the stream.

func (*Stream) Next

func (s *Stream) Next() (interface{}, bool)

Next receives the next element in the stream. It returns that element and a boolean indicating whether it was received successfully or whether the stream was closed.
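The consumer side of a stream can be sketched with a local type that copies the documented Ctrl and Data fields. The method bodies and the `naturals` generator below are assumptions written for illustration; they are not taken from the package source.

```go
package main

import "fmt"

// Stream copies the documented field layout. The methods below are
// assumed implementations, not the package's own.
type Stream struct {
	Ctrl chan<- interface{}
	Data <-chan interface{}
}

// Next receives one element; the boolean is false once Data is closed.
func (s *Stream) Next() (interface{}, bool) {
	v, ok := <-s.Data
	return v, ok
}

// Close signals shutdown by closing the control channel.
// In this sketch, calling it twice would panic.
func (s *Stream) Close() {
	close(s.Ctrl)
}

// naturals is a hypothetical infinite generator: 0, 1, 2, ...
func naturals() *Stream {
	ctrl := make(chan interface{})
	data := make(chan interface{})
	go func() {
		defer close(data)
		for i := 0; ; i++ {
			select {
			case data <- i:
			case <-ctrl:
				return
			}
		}
	}()
	return &Stream{Ctrl: ctrl, Data: data}
}

func main() {
	s := naturals()
	for i := 0; i < 3; i++ {
		v, ok := s.Next()
		fmt.Println(v, ok) // prints "0 true", "1 true", "2 true"
	}
	s.Close()
	// An element may still be in flight after Close, so drain until
	// Next reports that the stream is closed.
	for {
		if _, ok := s.Next(); !ok {
			break
		}
	}
	fmt.Println("stream closed")
}
```

The drain loop reflects a property of select: when both the send and the stop case are ready, Go picks one at random, so a consumer should not assume the very next call to Next fails after Close.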
