flow

package
v0.5.0
Warning

This package is not in the latest version of its module.

Published: Oct 17, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package flow implements a streaming calculation framework.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Transmit

func Transmit(state *ComponentState, downstream Inlet, current Outlet)

Transmit is a helper that connects the current component to the downstream component. It should be run in a separate goroutine.
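The forwarding loop below is a minimal sketch of what Transmit is expected to do. It assumes that Transmit drains the current Outlet into the downstream Inlet, closes the downstream input once the upstream channel is closed, and calls Done on the WaitGroup embedded in ComponentState. The lowercase `transmit`, the `pipe` type, and the simplified `StreamRecord` are hypothetical stand-ins, not the package's own code.

```go
package main

import (
	"fmt"
	"sync"
)

// StreamRecord is a simplified, hypothetical stand-in for flow.StreamRecord.
type StreamRecord struct {
	data interface{}
	ts   int64
}

// Inlet and Outlet mirror the flow package interfaces.
type Inlet interface{ In() chan<- StreamRecord }
type Outlet interface{ Out() <-chan StreamRecord }

// ComponentState mirrors flow.ComponentState, which embeds sync.WaitGroup.
type ComponentState struct{ sync.WaitGroup }

// pipe is a hypothetical component that exposes one channel as both
// its input and its output.
type pipe struct{ ch chan StreamRecord }

func (p *pipe) In() chan<- StreamRecord  { return p.ch }
func (p *pipe) Out() <-chan StreamRecord { return p.ch }

// transmit is an assumed behavior of Transmit: forward every record from
// current to downstream, close downstream once current is drained, and
// mark the work as done on state. The real implementation may differ.
func transmit(state *ComponentState, downstream Inlet, current Outlet) {
	defer state.Done()
	for rec := range current.Out() {
		downstream.In() <- rec
	}
	close(downstream.In())
}

func main() {
	src := &pipe{ch: make(chan StreamRecord, 3)}
	dst := &pipe{ch: make(chan StreamRecord, 3)}
	var state ComponentState

	state.Add(1)
	go transmit(&state, dst, src) // runs in its own goroutine, as the doc advises

	for i := 1; i <= 3; i++ {
		src.In() <- StreamRecord{data: i, ts: int64(i)}
	}
	close(src.ch) // upstream is finished; transmit will close dst
	state.Wait()

	for rec := range dst.Out() {
		fmt.Println(rec.data)
	}
}
```

Closing the downstream input when the upstream is drained is what lets a chain of stages shut down in order; `state.Wait()` then returns once forwarding has finished.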

Types

type AggregationOp

type AggregationOp interface {
	// Add puts a slice of elements as the input
	Add([]StreamRecord)
	// Snapshot takes a snapshot of the current state of the AggregationOp.
	// Taking a snapshot resets the dirty flag.
	Snapshot() interface{}
	// Dirty reports whether any new item has been added since the last snapshot.
	Dirty() bool
}

AggregationOp defines the stateful operation for aggregation.

type AggregationOpFactory

type AggregationOpFactory func() AggregationOp

AggregationOpFactory is a factory to create AggregationOp.
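A minimal sketch of an AggregationOp, assuming integer payloads: the hypothetical `sumOp` keeps a running total, marks itself dirty on Add, and clears the flag in Snapshot. `sumOp`, `newSumOp`, and the simplified `StreamRecord` are illustrative names that only mirror the declared method set.

```go
package main

import "fmt"

// StreamRecord is a simplified, hypothetical stand-in for flow.StreamRecord.
type StreamRecord struct {
	data interface{}
	ts   int64
}

// AggregationOp and AggregationOpFactory mirror the flow package declarations.
type AggregationOp interface {
	Add([]StreamRecord)
	Snapshot() interface{}
	Dirty() bool
}

type AggregationOpFactory func() AggregationOp

// sumOp is a hypothetical AggregationOp that sums int payloads.
type sumOp struct {
	sum   int
	dirty bool
}

// Add accumulates int payloads and ignores any other payload type.
func (s *sumOp) Add(records []StreamRecord) {
	for _, r := range records {
		if v, ok := r.data.(int); ok {
			s.sum += v
			s.dirty = true
		}
	}
}

// Snapshot returns the running total and resets the dirty flag.
func (s *sumOp) Snapshot() interface{} {
	s.dirty = false
	return s.sum
}

// Dirty reports whether Add changed the state since the last Snapshot.
func (s *sumOp) Dirty() bool { return s.dirty }

// newSumOp satisfies AggregationOpFactory; each call yields fresh state.
var newSumOp AggregationOpFactory = func() AggregationOp { return &sumOp{} }

func main() {
	op := newSumOp()
	op.Add([]StreamRecord{{data: 2, ts: 1}, {data: 3, ts: 2}})
	fmt.Println("dirty before snapshot:", op.Dirty())
	fmt.Println("snapshot:", op.Snapshot())
	fmt.Println("dirty after snapshot:", op.Dirty())
}
```

A factory lets each window or group start from independent state, which is one plausible reason the API takes an AggregationOpFactory rather than a single shared instance.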

type Component

type Component interface {
	// Setup is the lifecycle hook for resource preparation, e.g. starting a background job that listens on the input channel.
	// It must be called before the flow starts to process elements.
	Setup(context.Context) error
	// Teardown is the lifecycle hook for shutting down the Component
	// Implementations should ENSURE that all resources have been correctly released before this method returns.
	Teardown(context.Context) error
}

Component is a lifecycle controller.

type ComponentState

type ComponentState struct {
	sync.WaitGroup
}

ComponentState tracks whether the component has shut down.

type Data

type Data []any

Data represents aggregated data.

type DedupPriorityQueue

type DedupPriorityQueue struct {
	Items []Element
	// contains filtered or unexported fields
}

DedupPriorityQueue implements heap.Interface. DedupPriorityQueue is not thread-safe.

func NewPriorityQueue

func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupPriorityQueue

NewPriorityQueue returns a new DedupPriorityQueue.

func (*DedupPriorityQueue) Len

func (pq *DedupPriorityQueue) Len() int

Len returns the DedupPriorityQueue length.

func (*DedupPriorityQueue) Less

func (pq *DedupPriorityQueue) Less(i, j int) bool

Less reports whether the item at index i should sort before the item at index j, according to the queue's comparator.

func (*DedupPriorityQueue) Peek

func (pq *DedupPriorityQueue) Peek() Element

Peek returns the first item of the DedupPriorityQueue without removing it.

func (*DedupPriorityQueue) Pop

func (pq *DedupPriorityQueue) Pop() interface{}

Pop implements heap.Interface.Pop. It removes and returns the element at index Len() - 1.

func (*DedupPriorityQueue) Push

func (pq *DedupPriorityQueue) Push(x interface{})

Push implements heap.Interface.Push. It appends an item to the DedupPriorityQueue.

func (*DedupPriorityQueue) ReplaceLowest

func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element)

ReplaceLowest replaces the lowest item with the newLowest.

func (*DedupPriorityQueue) Swap

func (pq *DedupPriorityQueue) Swap(i, j int)

Swap exchanges the positions of the items at indexes i and j.

func (*DedupPriorityQueue) Values

func (pq *DedupPriorityQueue) Values() []Element

Values returns all items.
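A common use of a min-heap with index tracking is keeping the N largest items: Peek exposes the current lowest, and ReplaceLowest evicts it in O(log n) via `heap.Fix`. The sketch below assumes `utils.Comparator` behaves like a three-way compare returning a negative, zero, or positive int; that `Comparator` signature, the `scored` element, and `minQueue` are hypothetical, and deduplication is omitted.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Element mirrors flow.Element: each item records its own heap index.
type Element interface {
	GetIndex() int
	SetIndex(int)
}

// Comparator is a hypothetical stand-in for utils.Comparator:
// negative if a < b, zero if equal, positive if a > b.
type Comparator func(a, b interface{}) int

// scored is a hypothetical Element carrying an int score.
type scored struct {
	score int
	index int
}

func (s *scored) GetIndex() int  { return s.index }
func (s *scored) SetIndex(i int) { s.index = i }

// minQueue is a simplified sketch of DedupPriorityQueue without deduplication.
type minQueue struct {
	Items []Element
	cmp   Comparator
}

var _ heap.Interface = (*minQueue)(nil)

func (pq *minQueue) Len() int           { return len(pq.Items) }
func (pq *minQueue) Less(i, j int) bool { return pq.cmp(pq.Items[i], pq.Items[j]) < 0 }

// Swap exchanges two items and keeps their recorded indexes in sync.
func (pq *minQueue) Swap(i, j int) {
	pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i]
	pq.Items[i].SetIndex(i)
	pq.Items[j].SetIndex(j)
}

func (pq *minQueue) Push(x interface{}) {
	e := x.(Element)
	e.SetIndex(len(pq.Items))
	pq.Items = append(pq.Items, e)
}

// Pop removes the last item; heap.Pop has already moved the root there.
func (pq *minQueue) Pop() interface{} {
	n := len(pq.Items)
	e := pq.Items[n-1]
	pq.Items = pq.Items[:n-1]
	e.SetIndex(-1)
	return e
}

// Peek returns the lowest element (the heap root) without removing it.
func (pq *minQueue) Peek() Element { return pq.Items[0] }

// ReplaceLowest overwrites the root and restores heap order.
func (pq *minQueue) ReplaceLowest(e Element) {
	e.SetIndex(0)
	pq.Items[0] = e
	heap.Fix(pq, 0)
}

// topN keeps the n largest scores by evicting the current lowest.
func topN(scores []int, n int) []int {
	pq := &minQueue{cmp: func(a, b interface{}) int {
		return a.(*scored).score - b.(*scored).score
	}}
	for _, s := range scores {
		e := &scored{score: s}
		if pq.Len() < n {
			heap.Push(pq, e)
		} else if s > pq.Peek().(*scored).score {
			pq.ReplaceLowest(e)
		}
	}
	out := []int{}
	for pq.Len() > 0 {
		out = append(out, heap.Pop(pq).(*scored).score)
	}
	return out // ascending order
}

func main() {
	fmt.Println(topN([]int{5, 1, 9, 3, 7, 2}, 3))
}
```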

type Element

type Element interface {
	GetIndex() int
	SetIndex(int)
}

Element represents an item in the DedupPriorityQueue.

type Flow

type Flow interface {
	io.Closer
	// Filter is used to filter data.
	// The argument can be either a predicate function for streaming
	// or conditions for a batch query.
	Filter(UnaryOperation[bool]) Flow
	// Map is used to transform data
	Map(UnaryOperation[any]) Flow
	// Window is used to split infinite data into "buckets" of finite size.
	// Currently, it is only applicable to streaming context.
	Window(WindowAssigner) WindowedFlow
	// To pipes data to the given sink
	To(sink Sink) Flow
	// Open opens the flow in async mode for the streaming scenario.
	// It returns a channel for receiving async errors.
	Open() <-chan error
}

Flow is an abstraction of data flow for both Streaming and Batch.

type Inlet

type Inlet interface {
	In() chan<- StreamRecord
}

Inlet represents a type that exposes one open input.

type Operator

type Operator interface {
	Inlet
	Outlet
	Component
	Exec(downstream Inlet)
}

Operator represents a set of stream processing steps that has one open input and one open output.

type Outlet

type Outlet interface {
	Out() <-chan StreamRecord
}

Outlet represents a type that exposes one open output.

type Sink

type Sink interface {
	Inlet
	Component
}

Sink represents a set of stream processing steps that has one open input.

type Source

type Source interface {
	Outlet
	Component
	Exec(downstream Inlet)
}

Source represents a set of stream processing steps that has one open output.
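To show how the Source, Operator, and Sink roles compose, here is a minimal channel-based chain. It assumes each Exec call starts forwarding from the component's own output to the downstream input (as Transmit does) and omits the Component lifecycle methods for brevity; `sliceSource`, `doubler`, `collector`, `forward`, and `run` are hypothetical.

```go
package main

import "fmt"

// StreamRecord is a simplified, hypothetical stand-in for flow.StreamRecord.
type StreamRecord struct {
	data interface{}
	ts   int64
}

// Inlet and Outlet mirror the flow package interfaces.
type Inlet interface{ In() chan<- StreamRecord }
type Outlet interface{ Out() <-chan StreamRecord }

// forward copies records from an Outlet to an Inlet, then closes the Inlet.
func forward(from Outlet, to Inlet) {
	for r := range from.Out() {
		to.In() <- r
	}
	close(to.In())
}

// sliceSource is a hypothetical Source that emits a fixed list of ints.
type sliceSource struct{ out chan StreamRecord }

func newSliceSource(values ...int) *sliceSource {
	s := &sliceSource{out: make(chan StreamRecord)}
	go func() {
		for i, v := range values {
			s.out <- StreamRecord{data: v, ts: int64(i)}
		}
		close(s.out)
	}()
	return s
}

func (s *sliceSource) Out() <-chan StreamRecord { return s.out }
func (s *sliceSource) Exec(downstream Inlet)    { go forward(s, downstream) }

// doubler is a hypothetical Operator that multiplies each int payload by two.
type doubler struct{ in, out chan StreamRecord }

func newDoubler() *doubler {
	d := &doubler{in: make(chan StreamRecord), out: make(chan StreamRecord)}
	go func() {
		for r := range d.in {
			d.out <- StreamRecord{data: r.data.(int) * 2, ts: r.ts}
		}
		close(d.out)
	}()
	return d
}

func (d *doubler) In() chan<- StreamRecord  { return d.in }
func (d *doubler) Out() <-chan StreamRecord { return d.out }
func (d *doubler) Exec(downstream Inlet)    { go forward(d, downstream) }

// collector is a hypothetical Sink whose input is drained by the caller.
type collector struct{ in chan StreamRecord }

func (c *collector) In() chan<- StreamRecord { return c.in }

// run wires Source -> Operator -> Sink and returns the payloads that arrive.
func run(values ...int) []int {
	src, op := newSliceSource(values...), newDoubler()
	sink := &collector{in: make(chan StreamRecord)}
	src.Exec(op)
	op.Exec(sink)
	var got []int
	for r := range sink.in {
		got = append(got, r.data.(int))
	}
	return got
}

func main() {
	fmt.Println(run(1, 2, 3))
}
```

Each stage closes its downstream input once its own input is exhausted, so finishing the source propagates shutdown through the whole chain.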

type StreamRecord

type StreamRecord struct {
	// contains filtered or unexported fields
}

StreamRecord is a container that wraps user data and a timestamp. It is the underlying transmission medium for stream processing.

func NewStreamRecord

func NewStreamRecord(data interface{}, ts int64) StreamRecord

NewStreamRecord returns a StreamRecord with data and timestamp.

func NewStreamRecordWithTimestampPb

func NewStreamRecordWithTimestampPb(data interface{}, timestamp *timestamppb.Timestamp) StreamRecord

NewStreamRecordWithTimestampPb returns a StreamRecord whose timestamp is parsed from protobuf's timestamp.

func NewStreamRecordWithoutTS

func NewStreamRecordWithoutTS(data interface{}) StreamRecord

NewStreamRecordWithoutTS returns a StreamRecord with data only.

func TryExactTimestamp

func TryExactTimestamp(item any) StreamRecord

TryExactTimestamp tries to extract a timestamp from the item and returns a StreamRecord wrapping the item.

func (StreamRecord) Data

func (sr StreamRecord) Data() interface{}

Data returns the embedded data.

func (StreamRecord) Equal added in v0.4.0

func (sr StreamRecord) Equal(other StreamRecord) bool

Equal reports whether two StreamRecords are the same.

func (StreamRecord) TimestampMillis

func (sr StreamRecord) TimestampMillis() int64

TimestampMillis returns the timestamp in millisecond.

func (StreamRecord) WithNewData

func (sr StreamRecord) WithNewData(data interface{}) StreamRecord

WithNewData returns a copy of the StreamRecord whose data is replaced with the given data.

type UnaryFunc

type UnaryFunc[R any] func(context.Context, interface{}) R

UnaryFunc adapts a function of type func(context.Context, interface{}) R to the UnaryOperation interface.

func (UnaryFunc[R]) Apply

func (f UnaryFunc[R]) Apply(ctx context.Context, data interface{}) R

Apply implements the UnaryOperation.Apply method.

type UnaryOperation

type UnaryOperation[R any] interface {
	Apply(ctx context.Context, data interface{}) R
}

UnaryOperation represents a user-defined unary function (e.g. for Map or Filter).

func FilterFunc

func FilterFunc(filter UnaryOperation[bool]) (UnaryOperation[any], error)

FilterFunc transforms a filter function into a UnaryOperation.

type Window

type Window interface {
	// MaxTimestamp returns the upper bound of the Window.
	// Unit: Millisecond
	MaxTimestamp() int64
}

Window is a bucket of elements with a finite size. timedWindow is the only implementation now.

type WindowAssigner

type WindowAssigner interface {
	// AssignWindows assigns a slice of Window according to the given timestamp, e.g. eventTime.
	// The unit of the timestamp here is millisecond.
	AssignWindows(timestamp int64) ([]Window, error)
}

WindowAssigner is used to assign Window(s) for a given timestamp, and thus it can create a WindowedFlow.
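As an illustration of the WindowAssigner contract, the following sketch implements a sliding-window assigner: windows of length `size` start every `slide` milliseconds, and a timestamp belongs to every window whose half-open range [start, start+size) contains it. The `timeWindow` and `slidingAssigner` types are hypothetical; the package's own timedWindow may compute its bounds differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Window and WindowAssigner mirror the flow package interfaces.
type Window interface{ MaxTimestamp() int64 }

type WindowAssigner interface {
	AssignWindows(timestamp int64) ([]Window, error)
}

// timeWindow is a hypothetical half-open window [start, end) in milliseconds.
type timeWindow struct{ start, end int64 }

// MaxTimestamp returns the last millisecond that still belongs to the window.
func (w timeWindow) MaxTimestamp() int64 { return w.end - 1 }

// slidingAssigner is a hypothetical assigner: windows of length size start
// every slide milliseconds, so when size is a multiple of slide each
// timestamp falls into exactly size/slide windows.
type slidingAssigner struct{ size, slide int64 }

func (a slidingAssigner) AssignWindows(ts int64) ([]Window, error) {
	if a.size <= 0 || a.slide <= 0 {
		return nil, errors.New("size and slide must be positive")
	}
	if ts < 0 {
		return nil, errors.New("timestamp must be non-negative")
	}
	var windows []Window
	// Start from the latest window beginning at or before ts and walk back
	// one slide at a time while the window still covers ts.
	for start := ts - ts%a.slide; start > ts-a.size; start -= a.slide {
		windows = append(windows, timeWindow{start: start, end: start + a.size})
	}
	return windows, nil
}

func main() {
	var assigner WindowAssigner = slidingAssigner{size: 10_000, slide: 5_000}
	ws, err := assigner.AssignWindows(12_345)
	if err != nil {
		panic(err)
	}
	for _, w := range ws {
		fmt.Println(w.MaxTimestamp())
	}
}
```

With a 10 000 ms window and a 5 000 ms slide, timestamp 12 345 falls into the windows starting at 10 000 and 5 000.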

type WindowedFlow

type WindowedFlow interface {
	AllowedMaxWindows(windowCnt int) WindowedFlow
	// TopN applies a TopNAggregation to each Window.
	TopN(topNum int, opts ...any) Flow
}

WindowedFlow is a flow which processes incoming elements based on window. The WindowedFlow can be created with a WindowAssigner.

Directories

Path Synopsis
streaming
Package streaming implements the flow framework to provide the sliding window, top-n aggregation, etc.
sources
Package sources implements data sources that feed data into the flow framework.
