gstream

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2023 License: MIT Imports: 2 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewState

func NewState(ctx context.Context) *state

Types

type CursorNext

type CursorNext[T any] func(ctx context.Context) (items []T, hasNext bool, err error)

type Filter

type Filter[T any] struct {
	FlowState
	// contains filtered or unexported fields
}

Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FilterPredicate -------- ]

out -- 1 -- 2 ------------------ 5 --

func NewFilter

func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism ...uint) *Filter[T]

NewFilter returns a new Filter instance.

filterPredicate is the boolean-valued filter function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.

func (*Filter[T]) In

func (f *Filter[T]) In() chan<- interface{}

In returns an input channel for receiving data

func (*Filter[T]) Out

func (f *Filter[T]) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Filter[T]) To

func (f *Filter[T]) To(sink Sink)

To streams data to the given sink

func (*Filter[T]) Via

func (f *Filter[T]) Via(flow Transfer) Transfer

Via streams data through the given flow

type FilterPredicate

type FilterPredicate[T any] func(context.Context, T) (bool, error)

FilterPredicate represents a filter predicate (boolean-valued function).

type FlatMap

type FlatMap[T, R any] struct {
	FlowState
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FlatMapFunction -------- ]

out -- 1' - 2' -------- 4'- 4" - 5' -

func NewFlatMap

func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism ...uint) *FlatMap[T, R]

NewFlatMap returns a new FlatMap instance.

flatMapFunction is the FlatMap transformation function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.

func (*FlatMap[T, R]) In

func (fm *FlatMap[T, R]) In() chan<- interface{}

In returns an input channel for receiving data

func (*FlatMap[T, R]) Out

func (fm *FlatMap[T, R]) Out() <-chan interface{}

Out returns an output channel for sending data

func (*FlatMap[T, R]) To

func (fm *FlatMap[T, R]) To(sink Sink)

To streams data to the given sink

func (*FlatMap[T, R]) Via

func (fm *FlatMap[T, R]) Via(flow Transfer) Transfer

Via streams data through the given flow

type FlatMapFunction

type FlatMapFunction[T, R any] func(context.Context, T) ([]R, error)

FlatMapFunction represents a FlatMap transformation function.

type FlowState

type FlowState struct {
	// contains filtered or unexported fields
}

func FlowStateWithContext

func FlowStateWithContext(ctx context.Context) FlowState

func (*FlowState) Context

func (bs *FlowState) Context() context.Context

func (*FlowState) Done

func (bs *FlowState) Done()

func (*FlowState) HasStateErr

func (bs *FlowState) HasStateErr() bool

func (*FlowState) SetStateErr

func (bs *FlowState) SetStateErr(err error)

func (*FlowState) State

func (bs *FlowState) State() *state

type Inlet

type Inlet interface {
	Stater
	In() chan<- interface{}
}

Inlet represents a type that exposes one open input.

type Map

type Map[T, R any] struct {
	FlowState
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ ---------- MapFunction ---------- ]

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism ...uint) *Map[T, R]

NewMap returns a new Map instance.

mapFunction is the Map transformation function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.

func (*Map[T, R]) In

func (m *Map[T, R]) In() chan<- interface{}

In returns an input channel for receiving data

func (*Map[T, R]) Out

func (m *Map[T, R]) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Map[T, R]) To

func (m *Map[T, R]) To(sink Sink)

To streams data to the given sink

func (*Map[T, R]) Via

func (m *Map[T, R]) Via(flow Transfer) Transfer

Via streams data through the given flow

type MapFunction

type MapFunction[T, R any] func(context.Context, T) (R, error)

MapFunction represents a Map transformation function.

type MemorySink

type MemorySink[T any] interface {
	Sink
	Result() []T
}

func NewMemorySink

func NewMemorySink[T any]() MemorySink[T]

type Optional added in v1.0.1

type Optional[T any] interface {
	Get() T
	IsPresent() bool
}

func NewEmptyOptional added in v1.0.1

func NewEmptyOptional[T any]() Optional[T]

func NewOptional added in v1.0.1

func NewOptional[T any](t T) Optional[T]

type Outlet

type Outlet interface {
	Stater
	Out() <-chan interface{}
}

Outlet represents a type that exposes one open output.

type Reduce

type Reduce[T any] struct {
	FlowState
	// contains filtered or unexported fields
}

Reduce represents a “rolling” reduce on a data stream. Combines the current element with the last reduced value and emits the new value.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ --------- ReduceFunction --------- ]

out -- 1 -- 2' --- 3' - 4' ----- 5' -

func NewReduce

func NewReduce[T any](reduceFunction ReduceFunction[T]) *Reduce[T]

NewReduce returns a new Reduce instance.

reduceFunction combines the current element with the last reduced value.

func (*Reduce[T]) In

func (r *Reduce[T]) In() chan<- interface{}

In returns an input channel for receiving data

func (*Reduce[T]) Out

func (r *Reduce[T]) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Reduce[T]) To

func (r *Reduce[T]) To(sink Sink)

To streams data to the given sink

func (*Reduce[T]) Via

func (r *Reduce[T]) Via(flow Transfer) Transfer

Via streams data through the given flow

type ReduceFunction

type ReduceFunction[T any] func(context.Context, T, T) (T, error)

ReduceFunction combines the current element with the last reduced value.

type Sink

type Sink interface {
	Inlet
	// contains filtered or unexported methods
}

Sink represents a set of stream processing steps that has one open input.

type Source

type Source interface {
	Outlet
	Via(Transfer) Transfer
}

Source represents a set of stream processing steps that has one open output.

func NewDataStream

func NewDataStream(out Outlet) Source

func NewDataStreamOfCursor

func NewDataStreamOfCursor[T any](ctx context.Context, cursor func(ctx context.Context) CursorNext[T]) Source

func NewDataStreamOfSlice added in v1.0.2

func NewDataStreamOfSlice[T any](ctx context.Context, items []T) Source

type Stater

type Stater interface {
	State() *state
	Context() context.Context

	SetStateErr(err error)
	// contains filtered or unexported methods
}

type StoreSink added in v1.0.1

type StoreSink[T any] interface {
	Sink
}

func NewStoreSink added in v1.0.1

func NewStoreSink[T any](batch int, store func(ctx context.Context, ts []T) error) StoreSink[T]

type Transfer

type Transfer interface {
	Inlet
	Outlet
	Via(Transfer) Transfer
	To(Sink)
}

Transfer represents a set of stream processing steps that has one open input and one open output.

func FanOut

func FanOut(outlet Outlet, magnitude int) []Transfer

FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.

func Merge

func Merge(outlets ...Transfer) Transfer

Merge merges multiple flows into a single flow.

func Split

func Split[T any](outlet Outlet, predicate func(T) bool) [2]Transfer

Split splits the stream into two flows according to the given boolean predicate.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL