reducer

package
v0.6.0
Warning

This package is not in the latest version of its module.

Published: Dec 15, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)
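As a quick check of what the format call above produces, the following sketch rebuilds the string with the same `fmt.Sprintf` expression. The helper name `dropTag` is made up for illustration and is not part of the package.

```go
package main

import "fmt"

// dropTag is a hypothetical helper that repeats the expression used for DROP.
// %U formats the rune '\\' (code point 0x5C) in Unicode notation.
func dropTag() string {
	return fmt.Sprintf("%U__DROP__", '\\')
}

func main() {
	fmt.Println(dropTag()) // U+005C__DROP__
}
```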

Functions

func DefaultOptions

func DefaultOptions() *options

func NewServer

func NewServer(r ReducerCreator, inputOptions ...Option) numaflow.Server

NewServer creates a new reduce server.

Types

type Datum

type Datum interface {
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
}

Datum contains methods to get the payload information.

func NewHandlerDatum

func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time) Datum

type IntervalWindow

type IntervalWindow interface {
	StartTime() time.Time
	EndTime() time.Time
}

IntervalWindow contains methods to get the information for a given interval window.

func NewIntervalWindow

func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message wraps the data returned by reduce functions.

func MessageToDrop

func MessageToDrop() Message

MessageToDrop creates a Message to be dropped

func NewMessage

func NewMessage(value []byte) Message

NewMessage creates a Message with value

func (Message) Keys

func (m Message) Keys() []string

Keys returns message keys

func (Message) Tags

func (m Message) Tags() []string

Tags returns message tags

func (Message) Value

func (m Message) Value() []byte

Value returns message value

func (Message) WithKeys

func (m Message) WithKeys(keys []string) Message

WithKeys is used to assign the keys to the message

func (Message) WithTags

func (m Message) WithTags(tags []string) Message

WithTags assigns the tags to the message. Tags are used for conditional forwarding.

type Messages

type Messages []Message

func MessagesBuilder

func MessagesBuilder() Messages

MessagesBuilder returns an empty instance of Messages

func (Messages) Append

func (m Messages) Append(msg Message) Messages

Append appends a Message

func (Messages) Items

func (m Messages) Items() []Message

Items returns the message list
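The builder methods on Message and Messages all use value receivers and return the updated value, so the result has to be kept. The sketch below illustrates this with local stand-in types: the field names and method bodies are assumptions made so the snippet compiles on its own, and only the signatures are taken from the documentation above.

```go
package main

import "fmt"

// Message is a stand-in; the real type has unexported fields not shown in the docs.
type Message struct {
	value []byte
	keys  []string
	tags  []string
}

func NewMessage(value []byte) Message { return Message{value: value} }

// WithKeys returns a modified copy; the receiver itself is left unchanged.
func (m Message) WithKeys(keys []string) Message {
	m.keys = keys
	return m
}

// WithTags returns a modified copy, like WithKeys.
func (m Message) WithTags(tags []string) Message {
	m.tags = tags
	return m
}

func (m Message) Keys() []string { return m.keys }
func (m Message) Tags() []string { return m.tags }

type Messages []Message

func MessagesBuilder() Messages { return Messages{} }

// Append returns the grown slice; the caller's copy does not change.
func (m Messages) Append(msg Message) Messages { return append(m, msg) }

func (m Messages) Items() []Message { return m }

// build assembles one keyed, tagged message into a Messages value.
func build() Messages {
	base := NewMessage([]byte("42"))
	keyed := base.WithKeys([]string{"user-1"}).WithTags([]string{"even"})

	msgs := MessagesBuilder()
	msgs.Append(base)         // result discarded: msgs is still empty
	msgs = msgs.Append(keyed) // result kept: msgs now holds one message
	return msgs
}

func main() {
	items := build().Items()
	fmt.Println(len(items), items[0].Keys(), items[0].Tags())
}
```

Because Messages is a slice type with a value receiver, an Append call whose result is discarded cannot grow the caller's slice, whatever the method body looks like.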

type Metadata

type Metadata interface {
	IntervalWindow() IntervalWindow
}

Metadata contains methods to get the metadata for the reduce operation.

func NewMetadata

func NewMetadata(window IntervalWindow) Metadata

type Option

type Option func(*options)

Option is a function that applies a setting to the server options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(f string) Option

WithServerInfoFilePath sets the server info file path to the given path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr starts the server with the given socket address. This is mainly used for testing purposes.
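Option follows the functional options pattern: each With... function returns a closure that mutates the options struct, and the server applies them in order over its defaults. The sketch below shows the mechanics with a local stand-in. The field names, the default values, and the `apply` helper are assumptions for illustration, since the package's `options` struct is unexported.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the package's unexported struct.
type options struct {
	sockAddr       string
	maxMessageSize int
}

// Option mirrors the documented type.
type Option func(*options)

// WithSockAddr returns an Option that overrides the socket address.
func WithSockAddr(addr string) Option {
	return func(o *options) { o.sockAddr = addr }
}

// WithMaxMessageSize returns an Option that overrides the message size limit.
func WithMaxMessageSize(size int) Option {
	return func(o *options) { o.maxMessageSize = size }
}

// apply starts from placeholder defaults and runs each option in order,
// which is roughly what a constructor taking ...Option would do.
func apply(inputOptions ...Option) *options {
	o := &options{sockAddr: "/tmp/example.sock", maxMessageSize: 1 << 20}
	for _, opt := range inputOptions {
		opt(o)
	}
	return o
}

func main() {
	o := apply(WithSockAddr("/tmp/test.sock"))
	fmt.Println(o.sockAddr, o.maxMessageSize)
}
```

Options that are not passed keep their defaults, so a test can override only the socket address and leave everything else alone.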

type Reducer

type Reducer interface {
	Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
}

Reducer is the interface of reduce function implementation.
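A minimal sketch of a Reducer that counts the data in one keyed window. It assumes the channel is closed once all data for the window has been delivered, which the documentation above does not state explicitly. The interfaces are local copies of the documented ones; `Message` is simplified, and `Counter`, `payload`, and `runCounter` are hypothetical names used only here.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

// Local copies of the documented interfaces so the snippet compiles alone.
type Datum interface {
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
}

type IntervalWindow interface {
	StartTime() time.Time
	EndTime() time.Time
}

type Metadata interface {
	IntervalWindow() IntervalWindow
}

// Message is simplified; the real type has unexported fields.
type Message struct {
	value []byte
	keys  []string
}

type Messages []Message

// Counter is a hypothetical Reducer that counts the data it receives.
type Counter struct{}

// Reduce drains reduceCh until it is closed, then emits the count as text.
func (Counter) Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages {
	count := 0
	for range reduceCh {
		count++
	}
	return Messages{{value: []byte(strconv.Itoa(count)), keys: keys}}
}

// payload is a hypothetical Datum used only to feed the example.
type payload struct{ v []byte }

func (p payload) Value() []byte        { return p.v }
func (p payload) EventTime() time.Time { return time.Time{} }
func (p payload) Watermark() time.Time { return time.Time{} }

// runCounter sends n data through a Counter and returns the emitted value.
func runCounter(n int) string {
	ch := make(chan Datum, n)
	for i := 0; i < n; i++ {
		ch <- payload{v: []byte("x")}
	}
	close(ch)
	out := Counter{}.Reduce(context.Background(), []string{"k"}, ch, nil)
	return string(out[0].value)
}

func main() {
	fmt.Println(runCounter(3)) // 3
}
```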

type ReducerCreator added in v0.6.0

type ReducerCreator interface {
	// Create creates a Reducer. It is invoked once for every keyed window.
	Create() Reducer
}

ReducerCreator is the interface which is used to create a Reducer.
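Since Create is invoked once per keyed window, per-window state can live in the Reducer instance itself rather than in shared variables. The sketch below illustrates that with heavily trimmed stand-ins: `Datum`, `Metadata`, and `Messages` are reduced to what the example needs and do not match the real types, and `sumBytes`, `sumCreator`, `raw`, and `reduceWindow` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed stand-ins; the package's real types carry more than this.
type Datum interface{ Value() []byte }
type Metadata interface{}
type Messages [][]byte

type Reducer interface {
	Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
}

type ReducerCreator interface {
	Create() Reducer
}

// sumBytes is a hypothetical stateful Reducer that totals payload sizes.
type sumBytes struct{ total int }

func (s *sumBytes) Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages {
	for d := range reduceCh {
		s.total += len(d.Value())
	}
	return Messages{[]byte(fmt.Sprint(s.total))}
}

// sumCreator returns a fresh *sumBytes on every Create call,
// so no total carries over from one window to the next.
type sumCreator struct{}

func (sumCreator) Create() Reducer { return &sumBytes{} }

// raw is a hypothetical Datum backed by a byte slice.
type raw []byte

func (r raw) Value() []byte { return r }

// reduceWindow simulates one keyed window: create, feed, collect.
func reduceWindow(c ReducerCreator, values ...string) string {
	ch := make(chan Datum, len(values))
	for _, v := range values {
		ch <- raw(v)
	}
	close(ch)
	out := c.Create().Reduce(context.Background(), nil, ch, nil)
	return string(out[0])
}

func main() {
	c := sumCreator{}
	fmt.Println(reduceWindow(c, "ab", "cde")) // 5
	fmt.Println(reduceWindow(c, "x"))         // 1
}
```

The second call prints 1 rather than 6 because each simulated window gets its own Reducer from Create.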

func SimpleCreatorWithReduceFn added in v0.6.0

func SimpleCreatorWithReduceFn(f func(context.Context, []string, <-chan Datum, Metadata) Messages) ReducerCreator

SimpleCreatorWithReduceFn creates a simple ReducerCreator for the given reduce function.

type Service

type Service struct {
	reducepb.UnimplementedReduceServer
	CreateReduceHandler ReducerCreator
}

Service implements the proto gen server interface and contains the reduce operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) ReduceFn

func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error

ReduceFn applies a reduce function to a request stream and returns a list of results.
