projection

package
v1.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoPublisher       = errors.New("no appendLog")
	ErrFinished          = errors.New("appendLog is finished")
	ErrContextDone       = errors.New("context is done")
	ErrHandlerReturnErr  = errors.New("handler returned error")
	ErrPublishWithOffset = errors.New("cannot publish message with offset")
)
View Source
var (
	ErrInterpreterNotInNewState = fmt.Errorf("interpreter is not in new state")
)
View Source
var ErrNotFound = errors.New("node not found")
View Source
var NotFound = errors.New("not found")

Functions

func AccumulateDiscardRetractHandlerShape

func AccumulateDiscardRetractHandlerShape() shape.Shape

func AccumulateShape

func AccumulateShape() shape.Shape

func AccumulateToJSON

func AccumulateToJSON(x *Accumulate) ([]byte, error)

func AccumulatingAndRetractingShape

func AccumulatingAndRetractingShape() shape.Shape

func AccumulatingAndRetractingToJSON

func AccumulatingAndRetractingToJSON(x *AccumulatingAndRetracting) ([]byte, error)

func AllOfShape

func AllOfShape() shape.Shape

func AllOfToJSON

func AllOfToJSON(x *AllOf) ([]byte, error)

func AnyOfShape

func AnyOfShape() shape.Shape

func AnyOfToJSON

func AnyOfToJSON(x *AnyOf) ([]byte, error)

func AtPeriod1Shape

func AtPeriod1Shape() shape.Shape

func AtPeriod1ToJSON

func AtPeriod1ToJSON(x *AtPeriod1) ([]byte, error)

func AtPeriodShape

func AtPeriodShape() shape.Shape

func AtPeriodToJSON

func AtPeriodToJSON(x *AtPeriod) ([]byte, error)

func AtWatermark1Shape

func AtWatermark1Shape() shape.Shape

func AtWatermark1ToJSON

func AtWatermark1ToJSON(x *AtWatermark1) ([]byte, error)

func AtWatermarkShape

func AtWatermarkShape() shape.Shape

func AtWatermarkToJSON

func AtWatermarkToJSON(x *AtWatermark) ([]byte, error)

func AtWindowItemSize1Shape

func AtWindowItemSize1Shape() shape.Shape

func AtWindowItemSize1ToJSON

func AtWindowItemSize1ToJSON(x *AtWindowItemSize1) ([]byte, error)

func AtWindowItemSizeShape

func AtWindowItemSizeShape() shape.Shape

func AtWindowItemSizeToJSON

func AtWindowItemSizeToJSON(x *AtWindowItemSize) ([]byte, error)

func Bool

func Bool(b bool) *bool

func DefaultContextShape

func DefaultContextShape() shape.Shape

func DiscardShape

func DiscardShape() shape.Shape

func DiscardToJSON

func DiscardToJSON(x *Discard) ([]byte, error)

func DoJoinShape

func DoJoinShape() shape.Shape

func DoJoinToJSON

func DoJoinToJSON(x *DoJoin) ([]byte, error)

func DoLoadShape

func DoLoadShape() shape.Shape

func DoLoadToJSON

func DoLoadToJSON(x *DoLoad) ([]byte, error)

func DoMapShape

func DoMapShape() shape.Shape

func DoMapToJSON

func DoMapToJSON(x *DoMap) ([]byte, error)

func DoWindowShape

func DoWindowShape() shape.Shape

func DoWindowToJSON

func DoWindowToJSON(x *DoWindow) ([]byte, error)

func Each

func Each(x schema.Schema, f func(value schema.Schema))

func EvaluateTriggerR0

func EvaluateTriggerR0[T0 TriggerDescription, T1 TriggerType](
	t0 T0,
	t1 T1,
	f0 func(x0 *AtPeriod, x1 *AtPeriod),
	f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize),
	f2 func(x0 *AtWatermark, x1 *AtWatermark),
	f3 func(x0 *AnyOf, x1 TriggerType),
	f4 func(x0 *AllOf, x1 TriggerType),
	f5 func(x0 T0, x1 T1),
)

func EvaluateTriggerR1

func EvaluateTriggerR1[T0 TriggerDescription, T1 TriggerType, TOut1 any](
	t0 T0,
	t1 T1,
	f0 func(x0 *AtPeriod, x1 *AtPeriod) TOut1,
	f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize) TOut1,
	f2 func(x0 *AtWatermark, x1 *AtWatermark) TOut1,
	f3 func(x0 *AnyOf, x1 TriggerType) TOut1,
	f4 func(x0 *AllOf, x1 TriggerType) TOut1,
	f5 func(x0 T0, x1 T1) TOut1,
) TOut1

func EvaluateTriggerR2

func EvaluateTriggerR2[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any](
	t0 T0,
	t1 T1,
	f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2),
	f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize) (TOut1, TOut2),
	f2 func(x0 *AtWatermark, x1 *AtWatermark) (TOut1, TOut2),
	f3 func(x0 *AnyOf, x1 TriggerType) (TOut1, TOut2),
	f4 func(x0 *AllOf, x1 TriggerType) (TOut1, TOut2),
	f5 func(x0 T0, x1 T1) (TOut1, TOut2),
) (TOut1, TOut2)

func EvaluateTriggerR3

func EvaluateTriggerR3[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any, TOut3 any](
	t0 T0,
	t1 T1,
	f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2, TOut3),
	f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize) (TOut1, TOut2, TOut3),
	f2 func(x0 *AtWatermark, x1 *AtWatermark) (TOut1, TOut2, TOut3),
	f3 func(x0 *AnyOf, x1 TriggerType) (TOut1, TOut2, TOut3),
	f4 func(x0 *AllOf, x1 TriggerType) (TOut1, TOut2, TOut3),
	f5 func(x0 T0, x1 T1) (TOut1, TOut2, TOut3),
) (TOut1, TOut2, TOut3)

func EventTimeShape

func EventTimeShape() shape.Shape

func FixedWindowShape

func FixedWindowShape() shape.Shape

func FixedWindowToJSON

func FixedWindowToJSON(x *FixedWindow) ([]byte, error)

func GameShape

func GameShape() shape.Shape

func GenerateItemsEvery

func GenerateItemsEvery(start int64, size int, every time.Duration) chan Item

func HashNode

func HashNode(n Node) string

func InMemoryBagOfShape

func InMemoryBagOfShape() shape.Shape

func ItemGroupedByKeyShape

func ItemGroupedByKeyShape() shape.Shape

func ItemGroupedByWindowShape

func ItemGroupedByWindowShape() shape.Shape

func ItemShape

func ItemShape() shape.Shape

func ItemTypeShape

func ItemTypeShape() shape.Shape

func KeyWithNamespace

func KeyWithNamespace(key string, namespace string) string

func KeyedWindowKey

func KeyedWindowKey(x *KeyedWindow) string

func KeyedWindowShape

func KeyedWindowShape() shape.Shape

func MatchNodeR0

func MatchNodeR0(
	x Node,
	f1 func(x *DoWindow),
	f2 func(x *DoMap),
	f3 func(x *DoLoad),
	f4 func(x *DoJoin),
)

func MatchNodeR1

func MatchNodeR1[T0 any](
	x Node,
	f1 func(x *DoWindow) T0,
	f2 func(x *DoMap) T0,
	f3 func(x *DoLoad) T0,
	f4 func(x *DoJoin) T0,
) T0

func MatchNodeR2

func MatchNodeR2[T0, T1 any](
	x Node,
	f1 func(x *DoWindow) (T0, T1),
	f2 func(x *DoMap) (T0, T1),
	f3 func(x *DoLoad) (T0, T1),
	f4 func(x *DoJoin) (T0, T1),
) (T0, T1)

func MatchNodeR3

func MatchNodeR3[T0, T1, T2 any](
	x Node,
	f1 func(x *DoWindow) (T0, T1, T2),
	f2 func(x *DoMap) (T0, T1, T2),
	f3 func(x *DoLoad) (T0, T1, T2),
	f4 func(x *DoJoin) (T0, T1, T2),
) (T0, T1, T2)

func MatchTriggerDescriptionR0

func MatchTriggerDescriptionR0(
	x TriggerDescription,
	f1 func(x *AtPeriod),
	f2 func(x *AtWindowItemSize),
	f3 func(x *AtWatermark),
	f4 func(x *AnyOf),
	f5 func(x *AllOf),
)

func MatchTriggerDescriptionR1

func MatchTriggerDescriptionR1[T0 any](
	x TriggerDescription,
	f1 func(x *AtPeriod) T0,
	f2 func(x *AtWindowItemSize) T0,
	f3 func(x *AtWatermark) T0,
	f4 func(x *AnyOf) T0,
	f5 func(x *AllOf) T0,
) T0

func MatchTriggerDescriptionR2

func MatchTriggerDescriptionR2[T0, T1 any](
	x TriggerDescription,
	f1 func(x *AtPeriod) (T0, T1),
	f2 func(x *AtWindowItemSize) (T0, T1),
	f3 func(x *AtWatermark) (T0, T1),
	f4 func(x *AnyOf) (T0, T1),
	f5 func(x *AllOf) (T0, T1),
) (T0, T1)

func MatchTriggerDescriptionR3

func MatchTriggerDescriptionR3[T0, T1, T2 any](
	x TriggerDescription,
	f1 func(x *AtPeriod) (T0, T1, T2),
	f2 func(x *AtWindowItemSize) (T0, T1, T2),
	f3 func(x *AtWatermark) (T0, T1, T2),
	f4 func(x *AnyOf) (T0, T1, T2),
	f5 func(x *AllOf) (T0, T1, T2),
) (T0, T1, T2)

func MatchTriggerTypeR0

func MatchTriggerTypeR0(
	x TriggerType,
	f1 func(x *AtPeriod1),
	f2 func(x *AtWindowItemSize1),
	f3 func(x *AtWatermark1),
)

func MatchTriggerTypeR1

func MatchTriggerTypeR1[T0 any](
	x TriggerType,
	f1 func(x *AtPeriod1) T0,
	f2 func(x *AtWindowItemSize1) T0,
	f3 func(x *AtWatermark1) T0,
) T0

func MatchTriggerTypeR2

func MatchTriggerTypeR2[T0, T1 any](
	x TriggerType,
	f1 func(x *AtPeriod1) (T0, T1),
	f2 func(x *AtWindowItemSize1) (T0, T1),
	f3 func(x *AtWatermark1) (T0, T1),
) (T0, T1)

func MatchTriggerTypeR3

func MatchTriggerTypeR3[T0, T1, T2 any](
	x TriggerType,
	f1 func(x *AtPeriod1) (T0, T1, T2),
	f2 func(x *AtWindowItemSize1) (T0, T1, T2),
	f3 func(x *AtWatermark1) (T0, T1, T2),
) (T0, T1, T2)

func MatchWindowDescriptionR0

func MatchWindowDescriptionR0(
	x WindowDescription,
	f1 func(x *SessionWindow),
	f2 func(x *SlidingWindow),
	f3 func(x *FixedWindow),
)

func MatchWindowDescriptionR1

func MatchWindowDescriptionR1[T0 any](
	x WindowDescription,
	f1 func(x *SessionWindow) T0,
	f2 func(x *SlidingWindow) T0,
	f3 func(x *FixedWindow) T0,
) T0

func MatchWindowDescriptionR2

func MatchWindowDescriptionR2[T0, T1 any](
	x WindowDescription,
	f1 func(x *SessionWindow) (T0, T1),
	f2 func(x *SlidingWindow) (T0, T1),
	f3 func(x *FixedWindow) (T0, T1),
) (T0, T1)

func MatchWindowDescriptionR3

func MatchWindowDescriptionR3[T0, T1, T2 any](
	x WindowDescription,
	f1 func(x *SessionWindow) (T0, T1, T2),
	f2 func(x *SlidingWindow) (T0, T1, T2),
	f3 func(x *FixedWindow) (T0, T1, T2),
) (T0, T1, T2)

func MatchWindowFlushModeR0

func MatchWindowFlushModeR0(
	x WindowFlushMode,
	f1 func(x *Accumulate),
	f2 func(x *Discard),
	f3 func(x *AccumulatingAndRetracting),
)

func MatchWindowFlushModeR1

func MatchWindowFlushModeR1[T0 any](
	x WindowFlushMode,
	f1 func(x *Accumulate) T0,
	f2 func(x *Discard) T0,
	f3 func(x *AccumulatingAndRetracting) T0,
) T0

func MatchWindowFlushModeR2

func MatchWindowFlushModeR2[T0, T1 any](
	x WindowFlushMode,
	f1 func(x *Accumulate) (T0, T1),
	f2 func(x *Discard) (T0, T1),
	f3 func(x *AccumulatingAndRetracting) (T0, T1),
) (T0, T1)

func MatchWindowFlushModeR3

func MatchWindowFlushModeR3[T0, T1, T2 any](
	x WindowFlushMode,
	f1 func(x *Accumulate) (T0, T1, T2),
	f2 func(x *Discard) (T0, T1, T2),
	f3 func(x *AccumulatingAndRetracting) (T0, T1, T2),
) (T0, T1, T2)

func MessageShape

func MessageShape() shape.Shape

func NewStatsCollector

func NewStatsCollector() *statsCollector

func NodeShape

func NodeShape() shape.Shape

func NodeToJSON

func NodeToJSON(x Node) ([]byte, error)

func NodeToString

func NodeToString(node Node) string

func PackRetractAndAggregate

func PackRetractAndAggregate(x, y schema.Schema) *schema.Map

func SessionWindowShape

func SessionWindowShape() shape.Shape

func SessionWindowToJSON

func SessionWindowToJSON(x *SessionWindow) ([]byte, error)

func SessionsStatsShape

func SessionsStatsShape() shape.Shape

func SlidingWindowShape

func SlidingWindowShape() shape.Shape

func SlidingWindowToJSON

func SlidingWindowToJSON(x *SlidingWindow) ([]byte, error)

func StatsShape

func StatsShape() shape.Shape

func TickersShape

func TickersShape() shape.Shape

func ToMermaidGraph

func ToMermaidGraph(dag *DAGBuilder) string

func ToMermaidGraphWithOrder

func ToMermaidGraphWithOrder(dag *DAGBuilder, order []Node) string

func ToStr

func ToStr(x Node) string

func ToStrItem

func ToStrItem(item *Item) string

func TriggerDescriptionShape

func TriggerDescriptionShape() shape.Shape

func TriggerDescriptionToJSON

func TriggerDescriptionToJSON(x TriggerDescription) ([]byte, error)

func TriggerHandlerShape

func TriggerHandlerShape() shape.Shape

func TriggerManagerShape

func TriggerManagerShape() shape.Shape

func TriggerStateShape

func TriggerStateShape() shape.Shape

func TriggerTypeShape

func TriggerTypeShape() shape.Shape

func TriggerTypeToJSON

func TriggerTypeToJSON(x TriggerType) ([]byte, error)

func WindowBufferShape

func WindowBufferShape() shape.Shape

func WindowDescriptionShape

func WindowDescriptionShape() shape.Shape

func WindowDescriptionToJSON

func WindowDescriptionToJSON(x WindowDescription) ([]byte, error)

func WindowFlushModeShape

func WindowFlushModeShape() shape.Shape

func WindowFlushModeToJSON

func WindowFlushModeToJSON(x WindowFlushMode) ([]byte, error)

func WindowKey

func WindowKey(window *Window) string

func WindowShape

func WindowShape() shape.Shape

func WindowTriggerShape

func WindowTriggerShape() shape.Shape

Types

type Accumulate

type Accumulate struct {
	AllowLateArrival time.Duration
}

func AccumulateFromJSON

func AccumulateFromJSON(x []byte) (*Accumulate, error)

func (*Accumulate) AcceptWindowFlushMode

func (r *Accumulate) AcceptWindowFlushMode(v WindowFlushModeVisitor) any

func (*Accumulate) MarshalJSON

func (r *Accumulate) MarshalJSON() ([]byte, error)

func (*Accumulate) UnmarshalJSON

func (r *Accumulate) UnmarshalJSON(data []byte) error

type AccumulateDiscardRetractHandler

type AccumulateDiscardRetractHandler struct {
	// contains filtered or unexported fields
}

func (*AccumulateDiscardRetractHandler) Process

func (a *AccumulateDiscardRetractHandler) Process(x Item, returning func(Item)) error

func (*AccumulateDiscardRetractHandler) Retract

func (a *AccumulateDiscardRetractHandler) Retract(x Item, returning func(Item)) error

type AccumulatingAndRetracting

type AccumulatingAndRetracting struct {
	AllowLateArrival time.Duration
}

func AccumulatingAndRetractingFromJSON

func AccumulatingAndRetractingFromJSON(x []byte) (*AccumulatingAndRetracting, error)

func (*AccumulatingAndRetracting) AcceptWindowFlushMode

func (r *AccumulatingAndRetracting) AcceptWindowFlushMode(v WindowFlushModeVisitor) any

func (*AccumulatingAndRetracting) MarshalJSON

func (r *AccumulatingAndRetracting) MarshalJSON() ([]byte, error)

func (*AccumulatingAndRetracting) UnmarshalJSON

func (r *AccumulatingAndRetracting) UnmarshalJSON(data []byte) error

type AllOf

type AllOf struct {
	Triggers []TriggerDescription
}

func AllOfFromJSON

func AllOfFromJSON(x []byte) (*AllOf, error)

func (*AllOf) AcceptTriggerDescription

func (r *AllOf) AcceptTriggerDescription(v TriggerDescriptionVisitor) any

func (*AllOf) MarshalJSON

func (r *AllOf) MarshalJSON() ([]byte, error)

func (*AllOf) UnmarshalJSON

func (r *AllOf) UnmarshalJSON(data []byte) error

type AnyOf

type AnyOf struct {
	Triggers []TriggerDescription
}

func AnyOfFromJSON

func AnyOfFromJSON(x []byte) (*AnyOf, error)

func (*AnyOf) AcceptTriggerDescription

func (r *AnyOf) AcceptTriggerDescription(v TriggerDescriptionVisitor) any

func (*AnyOf) MarshalJSON

func (r *AnyOf) MarshalJSON() ([]byte, error)

func (*AnyOf) UnmarshalJSON

func (r *AnyOf) UnmarshalJSON(data []byte) error

type AtPeriod

type AtPeriod struct {
	Duration time.Duration
}

func AtPeriodFromJSON

func AtPeriodFromJSON(x []byte) (*AtPeriod, error)

func (*AtPeriod) AcceptTriggerDescription

func (r *AtPeriod) AcceptTriggerDescription(v TriggerDescriptionVisitor) any

func (*AtPeriod) MarshalJSON

func (r *AtPeriod) MarshalJSON() ([]byte, error)

func (*AtPeriod) UnmarshalJSON

func (r *AtPeriod) UnmarshalJSON(data []byte) error

type AtPeriod1

type AtPeriod1 = AtPeriod

go:generate mkunion -name=TriggerType -variants=AtPeriod,AtWindowItemSize,AtWatermark

func AtPeriod1FromJSON

func AtPeriod1FromJSON(x []byte) (*AtPeriod1, error)

func (*AtPeriod1) AcceptTriggerType

func (r *AtPeriod1) AcceptTriggerType(v TriggerTypeVisitor) any

type AtWatermark

type AtWatermark struct {
	Timestamp int64
}

func AtWatermarkFromJSON

func AtWatermarkFromJSON(x []byte) (*AtWatermark, error)

func (*AtWatermark) AcceptTriggerDescription

func (r *AtWatermark) AcceptTriggerDescription(v TriggerDescriptionVisitor) any

func (*AtWatermark) MarshalJSON

func (r *AtWatermark) MarshalJSON() ([]byte, error)

func (*AtWatermark) UnmarshalJSON

func (r *AtWatermark) UnmarshalJSON(data []byte) error

type AtWatermark1

type AtWatermark1 = AtWatermark

go:generate mkunion -name=TriggerType -variants=AtPeriod,AtWindowItemSize,AtWatermark

func AtWatermark1FromJSON

func AtWatermark1FromJSON(x []byte) (*AtWatermark1, error)

func (*AtWatermark1) AcceptTriggerType

func (r *AtWatermark1) AcceptTriggerType(v TriggerTypeVisitor) any

type AtWindowItemSize

type AtWindowItemSize struct {
	Number int
}

func AtWindowItemSizeFromJSON

func AtWindowItemSizeFromJSON(x []byte) (*AtWindowItemSize, error)

func (*AtWindowItemSize) AcceptTriggerDescription

func (r *AtWindowItemSize) AcceptTriggerDescription(v TriggerDescriptionVisitor) any

func (*AtWindowItemSize) MarshalJSON

func (r *AtWindowItemSize) MarshalJSON() ([]byte, error)

func (*AtWindowItemSize) UnmarshalJSON

func (r *AtWindowItemSize) UnmarshalJSON(data []byte) error

type AtWindowItemSize1

type AtWindowItemSize1 = AtWindowItemSize

go:generate mkunion -name=TriggerType -variants=AtPeriod,AtWindowItemSize,AtWatermark

func AtWindowItemSize1FromJSON

func AtWindowItemSize1FromJSON(x []byte) (*AtWindowItemSize1, error)

func (*AtWindowItemSize1) AcceptTriggerType

func (r *AtWindowItemSize1) AcceptTriggerType(v TriggerTypeVisitor) any

type AvgHandler

type AvgHandler struct {
	// contains filtered or unexported fields
}

func (*AvgHandler) Process

func (h *AvgHandler) Process(msg Item, returning func(Item)) error

type BagOf

type BagOf[A any] interface {
	Set(key string, value A) error
	Get(key string) (A, error)
	Del(key string) error
	Range(f func(key string, item A))
}

type Builder

type Builder interface {
	Load(f Handler, opts ...ContextOptionFunc) Builder
	Window(opts ...ContextOptionFunc) Builder
	Map(f Handler, opts ...ContextOptionFunc) Builder
	Join(a, b Builder, opts ...ContextOptionFunc) Builder
	Build() []Node
}

type ContextOptionFunc

type ContextOptionFunc func(c *DefaultContext)

func WithAccumulate

func WithAccumulate() ContextOptionFunc

func WithAccumulatingAndRetracting

func WithAccumulatingAndRetracting() ContextOptionFunc

func WithDiscard

func WithDiscard() ContextOptionFunc

func WithFixedWindow

func WithFixedWindow(width time.Duration) ContextOptionFunc

func WithName

func WithName(name string) ContextOptionFunc

func WithSessionWindow

func WithSessionWindow(gap time.Duration) ContextOptionFunc

func WithSlidingWindow

func WithSlidingWindow(width time.Duration, period time.Duration) ContextOptionFunc

func WithTriggers

func WithTriggers(and ...TriggerDescription) ContextOptionFunc

func WithWindowDescription

func WithWindowDescription(wd WindowDescription) ContextOptionFunc

func WithWindowFlushMode

func WithWindowFlushMode(fm WindowFlushMode) ContextOptionFunc

type CountHandler

type CountHandler struct {
	// contains filtered or unexported fields
}

func (*CountHandler) Process

func (h *CountHandler) Process(msg Item, returning func(Item)) error

type DAGBuilder

type DAGBuilder struct {
	// contains filtered or unexported fields
}

func NewDAGBuilder

func NewDAGBuilder() *DAGBuilder

func (*DAGBuilder) Build

func (d *DAGBuilder) Build() []Node

func (*DAGBuilder) GetByName

func (d *DAGBuilder) GetByName(name string) (*DAGBuilder, error)

func (*DAGBuilder) Join

func (d *DAGBuilder) Join(a, b Builder, opts ...ContextOptionFunc) Builder

func (*DAGBuilder) Load

func (d *DAGBuilder) Load(f Handler, opts ...ContextOptionFunc) Builder

DoLoad loads data from a source. This node is a root of the DAG. DAG can have many DoLoad nodesFromTo.

func (*DAGBuilder) Map

func (d *DAGBuilder) Map(f Handler, opts ...ContextOptionFunc) Builder

func (*DAGBuilder) Window

func (d *DAGBuilder) Window(opts ...ContextOptionFunc) Builder

type DebounceHandler

type DebounceHandler struct {
	MaxSize int
	MaxTime time.Duration
	// contains filtered or unexported fields
}

func (*DebounceHandler) Process

func (t *DebounceHandler) Process(x Item, returning func(Item)) error

func (*DebounceHandler) Retract

func (t *DebounceHandler) Retract(x Item, returning func(Item)) error

type DefaultContext

type DefaultContext struct {
	// contains filtered or unexported fields
}

func GetCtx

func GetCtx(node Node) *DefaultContext

func NewContextBuilder

func NewContextBuilder(builders ...func(config *DefaultContext)) *DefaultContext

func (*DefaultContext) Name

func (c *DefaultContext) Name() string

func (*DefaultContext) Scope

func (c *DefaultContext) Scope(name string) *DefaultContext

type Discard

type Discard struct{}

func DiscardFromJSON

func DiscardFromJSON(x []byte) (*Discard, error)

func (*Discard) AcceptWindowFlushMode

func (r *Discard) AcceptWindowFlushMode(v WindowFlushModeVisitor) any

func (*Discard) MarshalJSON

func (r *Discard) MarshalJSON() ([]byte, error)

func (*Discard) UnmarshalJSON

func (r *Discard) UnmarshalJSON(data []byte) error

type DoJoin

type DoJoin struct {
	Ctx   *DefaultContext
	Input []Node
}

func DoJoinFromJSON

func DoJoinFromJSON(x []byte) (*DoJoin, error)

func (*DoJoin) AcceptNode

func (r *DoJoin) AcceptNode(v NodeVisitor) any

func (*DoJoin) MarshalJSON

func (r *DoJoin) MarshalJSON() ([]byte, error)

func (*DoJoin) UnmarshalJSON

func (r *DoJoin) UnmarshalJSON(data []byte) error

type DoLoad

type DoLoad struct {
	Ctx    *DefaultContext
	OnLoad Handler
}

func DoLoadFromJSON

func DoLoadFromJSON(x []byte) (*DoLoad, error)

func (*DoLoad) AcceptNode

func (r *DoLoad) AcceptNode(v NodeVisitor) any

func (*DoLoad) MarshalJSON

func (r *DoLoad) MarshalJSON() ([]byte, error)

func (*DoLoad) UnmarshalJSON

func (r *DoLoad) UnmarshalJSON(data []byte) error

type DoMap

type DoMap struct {
	Ctx   *DefaultContext
	OnMap Handler
	Input Node
}

DoMap implicitly means, merge by key

func DoMapFromJSON

func DoMapFromJSON(x []byte) (*DoMap, error)

func (*DoMap) AcceptNode

func (r *DoMap) AcceptNode(v NodeVisitor) any

func (*DoMap) MarshalJSON

func (r *DoMap) MarshalJSON() ([]byte, error)

func (*DoMap) UnmarshalJSON

func (r *DoMap) UnmarshalJSON(data []byte) error

type DoWindow

type DoWindow struct {
	Ctx   *DefaultContext
	Input Node
}

func DoWindowFromJSON

func DoWindowFromJSON(x []byte) (*DoWindow, error)

func (*DoWindow) AcceptNode

func (r *DoWindow) AcceptNode(v NodeVisitor) any

func (*DoWindow) MarshalJSON

func (r *DoWindow) MarshalJSON() ([]byte, error)

func (*DoWindow) UnmarshalJSON

func (r *DoWindow) UnmarshalJSON(data []byte) error

type Dual

type Dual struct {
	// contains filtered or unexported fields
}

func NewDual

func NewDual() *Dual

func (*Dual) IsValid

func (d *Dual) IsValid() bool

func (*Dual) List

func (d *Dual) List() []*Message

func (*Dual) ReturningAggregate

func (d *Dual) ReturningAggregate(msg Item)

func (*Dual) ReturningRetract

func (d *Dual) ReturningRetract(msg Item)

type EvaluateTrigger

type EvaluateTrigger[T0 TriggerDescription, T1 TriggerType] interface {
	MatchPeriod(*AtPeriod, *AtPeriod)
	MatchCount(*AtWindowItemSize, *AtWindowItemSize)
	MatchWatermark(*AtWatermark, *AtWatermark)
	MatchAnyOfAny(*AnyOf, TriggerType)
	MatchAllOfAny(*AllOf, TriggerType)
	MatchDefault(T0, T1)
}

type EventTime

type EventTime = int64

type ExecutionGroup

type ExecutionGroup struct {
	// contains filtered or unexported fields
}

func (*ExecutionGroup) Go

func (g *ExecutionGroup) Go(f func() error)

func (*ExecutionGroup) Wait

func (g *ExecutionGroup) Wait() error

type ExecutionStatus

type ExecutionStatus int

import (

"context"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/widmogrod/mkunion/x/schema"
"sync"

)

func DefaultInMemoryInterpreter() *InMemoryInterpreter {
	return &InMemoryInterpreter{
		pubsub: NewPubSubMultiChan[Node](),
		//pubsub:  NewPubSub[Node](),
		byKeys:  make(map[Node]map[string]Item),
		running: make(map[Node]struct{}),
		stats:   NewStatsCollector(),
	}
}
const (
	ExecutionStatusNew ExecutionStatus = iota
	ExecutionStatusRunning
	ExecutionStatusError
	ExecutionStatusFinished
)

type FilterHandler

type FilterHandler struct {
	Where *predicate.WherePredicates
}

func (*FilterHandler) Process

func (f *FilterHandler) Process(x Item, returning func(Item)) error

func (*FilterHandler) Retract

func (f *FilterHandler) Retract(x Item, returning func(Item)) error

type FixedWindow

type FixedWindow struct {
	Width time.Duration
}

func FixedWindowFromJSON

func FixedWindowFromJSON(x []byte) (*FixedWindow, error)

func (*FixedWindow) AcceptWindowDescription

func (r *FixedWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any

func (*FixedWindow) MarshalJSON

func (r *FixedWindow) MarshalJSON() ([]byte, error)

func (*FixedWindow) UnmarshalJSON

func (r *FixedWindow) UnmarshalJSON(data []byte) error

type Game

type Game struct {
	SessionID string
	Players   []string
	Winner    string
	IsDraw    bool
}

func (*Game) MarshalJSON

func (r *Game) MarshalJSON() ([]byte, error)

func (*Game) UnmarshalJSON

func (r *Game) UnmarshalJSON(data []byte) error

type GenerateHandler

type GenerateHandler struct {
	Load func(push func(message Item)) error
}

func (*GenerateHandler) Process

func (h *GenerateHandler) Process(x Item, returning func(Item)) error

func (*GenerateHandler) Retract

func (h *GenerateHandler) Retract(x Item, returning func(Item)) error

type Handler

type Handler interface {
	Process(x Item, returning func(Item)) error
	Retract(x Item, returning func(Item)) error
}

func Log

func Log(prefix string) Handler

type InMemoryBagOf

type InMemoryBagOf[A any] struct {
	// contains filtered or unexported fields
}

func NewInMemoryBagOf

func NewInMemoryBagOf[A any]() *InMemoryBagOf[A]

func (*InMemoryBagOf[A]) Del

func (b *InMemoryBagOf[A]) Del(key string) error

func (*InMemoryBagOf[A]) Get

func (b *InMemoryBagOf[A]) Get(key string) (A, error)

func (*InMemoryBagOf[A]) Range

func (b *InMemoryBagOf[A]) Range(f func(key string, item A))

func (*InMemoryBagOf[A]) Set

func (b *InMemoryBagOf[A]) Set(key string, value A) error

type InMemoryTwoInterpreter

type InMemoryTwoInterpreter struct {
	// contains filtered or unexported fields
}

func NewInMemoryTwoInterpreter

func NewInMemoryTwoInterpreter() *InMemoryTwoInterpreter

func (*InMemoryTwoInterpreter) Run

func (i *InMemoryTwoInterpreter) Run(ctx context.Context, nodes []Node) error

func (*InMemoryTwoInterpreter) StatsSnapshotAndReset

func (i *InMemoryTwoInterpreter) StatsSnapshotAndReset() Stats

type Item

type Item struct {
	Key       string
	Data      schema.Schema
	EventTime EventTime
	Window    *Window
	Type      ItemType
}

func AssignWindows

func AssignWindows(x []Item, wd WindowDescription) []Item

func DropTimestamps

func DropTimestamps(x []Item) []Item

func ExpandToElements

func ExpandToElements(x []ItemGroupedByWindow) []Item

func ToElement

func ToElement(group *ItemGroupedByWindow) Item

func (*Item) MarshalJSON

func (r *Item) MarshalJSON() ([]byte, error)

func (*Item) UnmarshalJSON

func (r *Item) UnmarshalJSON(data []byte) error

type ItemGroupedByKey

type ItemGroupedByKey struct {
	Key  string
	Data []Item
}

func GroupByKey

func GroupByKey(x []Item) []ItemGroupedByKey

func (*ItemGroupedByKey) MarshalJSON

func (r *ItemGroupedByKey) MarshalJSON() ([]byte, error)

func (*ItemGroupedByKey) UnmarshalJSON

func (r *ItemGroupedByKey) UnmarshalJSON(data []byte) error

type ItemGroupedByWindow

type ItemGroupedByWindow struct {
	Key    string
	Data   *schema.List
	Window *Window
}

func GroupAlsoByWindow

func GroupAlsoByWindow(x []ItemGroupedByKey) []ItemGroupedByWindow

func (*ItemGroupedByWindow) MarshalJSON

func (r *ItemGroupedByWindow) MarshalJSON() ([]byte, error)

func (*ItemGroupedByWindow) UnmarshalJSON

func (r *ItemGroupedByWindow) UnmarshalJSON(data []byte) error

type ItemType

type ItemType uint8
const (
	ItemAggregation ItemType = iota
	ItemRetractAndAggregate
)

type JoinHandler

type JoinHandler[T any] struct {
	F func(a, b T, returning func(T)) error
}

func (*JoinHandler[T]) Process

func (j *JoinHandler[T]) Process(x Item, returning func(Item)) error

func (*JoinHandler[T]) Retract

func (j *JoinHandler[T]) Retract(x Item, returning func(Item)) error

type KeyedWindow

type KeyedWindow struct {
	Key    string
	Window *Window
}

func ToKeyedWindowFromGrouped

func ToKeyedWindowFromGrouped(x *ItemGroupedByWindow) *KeyedWindow

func ToKeyedWindowFromItem

func ToKeyedWindowFromItem(x *Item) *KeyedWindow

type ListAssert

type ListAssert struct {
	Items []Item
	Err   error
	// contains filtered or unexported fields
}

func (*ListAssert) AssertAt

func (l *ListAssert) AssertAt(index int, expected Item) bool

func (*ListAssert) AssertLen

func (l *ListAssert) AssertLen(expected int) bool

func (*ListAssert) Contains

func (l *ListAssert) Contains(expected Item) bool

func (*ListAssert) Returning

func (l *ListAssert) Returning(msg Item)

type LogHandler

type LogHandler struct {
	// contains filtered or unexported fields
}

func (*LogHandler) Process

func (l *LogHandler) Process(x Item, returning func(Item)) error

func (*LogHandler) Retract

func (l *LogHandler) Retract(x Item, returning func(Item)) error

type MapHandler

type MapHandler[A any, B any] struct {
	F func(x A, returning func(key string, value B)) error
}

func (*MapHandler[A, B]) Process

func (h *MapHandler[A, B]) Process(x Item, returning func(Item)) error

func (*MapHandler[A, B]) Retract

func (h *MapHandler[A, B]) Retract(x Item, returning func(Item)) error

type MergeHandler

type MergeHandler[A any] struct {
	Combine   func(a, b A) (A, error)
	DoRetract func(a, b A) (A, error)
}

func (*MergeHandler[A]) Process

func (h *MergeHandler[A]) Process(x Item, returning func(Item)) error

func (*MergeHandler[A]) Retract

func (h *MergeHandler[A]) Retract(x Item, returning func(Item)) error

type Message

type Message struct {
	Offset int
	// at some point of time i may need to pass type reference
	Key       string
	Item      *Item
	Watermark *int64
	// contains filtered or unexported fields
}

type Node

type Node interface {
	AcceptNode(g NodeVisitor) any
}

func NodeFromJSON

func NodeFromJSON(x []byte) (Node, error)

func ReverseSort

func ReverseSort(nodes []Node) []Node

func Sort

func Sort(dag *DAGBuilder) []Node

Sort sorts nodesFromTo in topological order https://en.wikipedia.org/wiki/Topological_sorting using Kahn's algorithm

type NodeUnionJSON

type NodeUnionJSON struct {
	Type     string          `json:"$type,omitempty"`
	DoWindow json.RawMessage `json:"projection.DoWindow,omitempty"`
	DoMap    json.RawMessage `json:"projection.DoMap,omitempty"`
	DoLoad   json.RawMessage `json:"projection.DoLoad,omitempty"`
	DoJoin   json.RawMessage `json:"projection.DoJoin,omitempty"`
}

type NodeVisitor

type NodeVisitor interface {
	VisitDoWindow(v *DoWindow) any
	VisitDoMap(v *DoMap) any
	VisitDoLoad(v *DoLoad) any
	VisitDoJoin(v *DoJoin) any
}

type PubSub

type PubSub[T comparable] struct {
	// contains filtered or unexported fields
}

func NewPubSub

func NewPubSub[T comparable]() *PubSub[T]

func (*PubSub[T]) Finish

func (p *PubSub[T]) Finish(ctx context.Context, key T)

Finish is called when a node won't publish any more messages

func (*PubSub[T]) Publish

func (p *PubSub[T]) Publish(ctx context.Context, key T, msg Message) error

Publish should return error, and not throw panic this is a temporary solution, for prototyping

func (*PubSub[T]) Register

func (p *PubSub[T]) Register(key T) error

func (*PubSub[T]) Subscribe

func (p *PubSub[T]) Subscribe(ctx context.Context, node T, fromOffset int, f func(Message) error) error

type PubSubChan

type PubSubChan[T any] struct {
	// contains filtered or unexported fields
}

func NewPubSubChan

func NewPubSubChan[T any]() *PubSubChan[T]

func (*PubSubChan[T]) Close

func (s *PubSubChan[T]) Close()

func (*PubSubChan[T]) Process

func (s *PubSubChan[T]) Process()

func (*PubSubChan[T]) Publish

func (s *PubSubChan[T]) Publish(msg T) error

func (*PubSubChan[T]) Subscribe

func (s *PubSubChan[T]) Subscribe(f func(T) error) error

type PubSubForInterpreter

type PubSubForInterpreter[T comparable] interface {
	Register(key T) error
	Publish(ctx context.Context, key T, msg Message) error
	Finish(ctx context.Context, key T)
	Subscribe(ctx context.Context, node T, fromOffset int, f func(Message) error) error
}

type PubSubMulti

type PubSubMulti[T comparable] struct {
	// contains filtered or unexported fields
}

func NewPubSubMultiChan

func NewPubSubMultiChan[T comparable]() *PubSubMulti[T]

func (*PubSubMulti[T]) Finish

func (p *PubSubMulti[T]) Finish(ctx context.Context, key T)

func (*PubSubMulti[T]) Publish

func (p *PubSubMulti[T]) Publish(ctx context.Context, key T, msg Message) error

func (*PubSubMulti[T]) Register

func (p *PubSubMulti[T]) Register(key T) error

func (*PubSubMulti[T]) Subscribe

func (p *PubSubMulti[T]) Subscribe(ctx context.Context, key T, fromOffset int, f func(Message) error) error

type PubSubSingle

type PubSubSingle struct {
	// contains filtered or unexported fields
}

func NewPubSubSingle

func NewPubSubSingle() *PubSubSingle

func (*PubSubSingle) Finish

func (p *PubSubSingle) Finish()

Finish is called when a node won't publish any more messages

func (*PubSubSingle) Publish

func (p *PubSubSingle) Publish(ctx context.Context, msg Message) error

func (*PubSubSingle) Subscribe

func (p *PubSubSingle) Subscribe(ctx context.Context, fromOffset int, f func(Message) error) error

type PubSubSingler

type PubSubSingler[T comparable] interface {
	Publish(msg T) error
	Process()
	Subscribe(f func(T) error) error
	Close()
}

type RepositorySink

type RepositorySink struct {
	// contains filtered or unexported fields
}

func NewRepositorySink

func NewRepositorySink(recordType string, store schemaless.Repository[schema.Schema]) *RepositorySink

func (*RepositorySink) FlushOnTime

func (s *RepositorySink) FlushOnTime()

func (*RepositorySink) Process

func (s *RepositorySink) Process(x Item, returning func(Item)) error

func (*RepositorySink) Retract

func (s *RepositorySink) Retract(x Item, returning func(Item)) error

type SessionWindow

type SessionWindow struct {
	GapDuration time.Duration
}

func SessionWindowFromJSON

func SessionWindowFromJSON(x []byte) (*SessionWindow, error)

func (*SessionWindow) AcceptWindowDescription

func (r *SessionWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any

func (*SessionWindow) MarshalJSON

func (r *SessionWindow) MarshalJSON() ([]byte, error)

func (*SessionWindow) UnmarshalJSON

func (r *SessionWindow) UnmarshalJSON(data []byte) error

type SessionsStats

type SessionsStats struct {
	Wins  int
	Draws int
	Loose int
}

func (*SessionsStats) MarshalJSON

func (r *SessionsStats) MarshalJSON() ([]byte, error)

func (*SessionsStats) UnmarshalJSON

func (r *SessionsStats) UnmarshalJSON(data []byte) error

type SimpleProcessHandler

type SimpleProcessHandler struct {
	P func(x Item, returning func(Item)) error
	R func(x Item, returning func(Item)) error
}

func (*SimpleProcessHandler) Process

func (s *SimpleProcessHandler) Process(x Item, returning func(Item)) error

func (*SimpleProcessHandler) Retract

func (s *SimpleProcessHandler) Retract(x Item, returning func(Item)) error

type SlidingWindow

type SlidingWindow struct {
	Width  time.Duration
	Period time.Duration
}

func SlidingWindowFromJSON

func SlidingWindowFromJSON(x []byte) (*SlidingWindow, error)

func (*SlidingWindow) AcceptWindowDescription

func (r *SlidingWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any

func (*SlidingWindow) MarshalJSON

func (r *SlidingWindow) MarshalJSON() ([]byte, error)

func (*SlidingWindow) UnmarshalJSON

func (r *SlidingWindow) UnmarshalJSON(data []byte) error

type Stats

type Stats = map[string]int

type StatsCollector

type StatsCollector interface {
	Snapshot() Stats
	Incr(key string, increment int)
}

type Tickers

type Tickers struct {
	// contains filtered or unexported fields
}

func NewTimeTicker

func NewTimeTicker() *Tickers

func (*Tickers) Register

func (t *Tickers) Register(td TriggerDescription, ts TimeSignaler)

func (*Tickers) Unregister

func (t *Tickers) Unregister(td TriggerDescription)

type TimeSignaler

type TimeSignaler interface {
	SignalDuration(duration time.Duration)
}

type TriggerDescription

type TriggerDescription interface {
	AcceptTriggerDescription(g TriggerDescriptionVisitor) any
}

func TriggerDescriptionFromJSON

func TriggerDescriptionFromJSON(x []byte) (TriggerDescription, error)

type TriggerDescriptionUnionJSON

type TriggerDescriptionUnionJSON struct {
	Type             string          `json:"$type,omitempty"`
	AtPeriod         json.RawMessage `json:"projection.AtPeriod,omitempty"`
	AtWindowItemSize json.RawMessage `json:"projection.AtWindowItemSize,omitempty"`
	AtWatermark      json.RawMessage `json:"projection.AtWatermark,omitempty"`
	AnyOf            json.RawMessage `json:"projection.AnyOf,omitempty"`
	AllOf            json.RawMessage `json:"projection.AllOf,omitempty"`
}

type TriggerDescriptionVisitor

type TriggerDescriptionVisitor interface {
	VisitAtPeriod(v *AtPeriod) any
	VisitAtWindowItemSize(v *AtWindowItemSize) any
	VisitAtWatermark(v *AtWatermark) any
	VisitAnyOf(v *AnyOf) any
	VisitAllOf(v *AllOf) any
}

type TriggerHandler

type TriggerHandler struct {
	// contains filtered or unexported fields
}

func (*TriggerHandler) Process

func (tm *TriggerHandler) Process(x Item, returning func(Item)) error

func (*TriggerHandler) Retract

func (tm *TriggerHandler) Retract(x Item, returning func(Item)) error

func (*TriggerHandler) Triggered

func (tm *TriggerHandler) Triggered(trigger TriggerType, returning func(Item)) error

type TriggerManager

type TriggerManager struct {
	// contains filtered or unexported fields
}

func NewTriggerManager

func NewTriggerManager(td TriggerDescription) *TriggerManager

func (*TriggerManager) SignalDuration

func (tm *TriggerManager) SignalDuration(duration time.Duration)

func (*TriggerManager) SignalWatermark

func (tm *TriggerManager) SignalWatermark(timestamp int64)

func (*TriggerManager) SignalWindowCreated

func (tm *TriggerManager) SignalWindowCreated(kw *KeyedWindow)

func (*TriggerManager) SignalWindowDeleted

func (tm *TriggerManager) SignalWindowDeleted(kw *KeyedWindow)

func (*TriggerManager) SignalWindowSizeReached

func (tm *TriggerManager) SignalWindowSizeReached(kw *KeyedWindow, size int)

func (*TriggerManager) WhenTrigger

func (tm *TriggerManager) WhenTrigger(f func(w *KeyedWindow))

type TriggerState

type TriggerState struct {
	// contains filtered or unexported fields
}

type TriggerType

type TriggerType interface {
	AcceptTriggerType(g TriggerTypeVisitor) any
}

func TriggerTypeFromJSON

func TriggerTypeFromJSON(x []byte) (TriggerType, error)

type TriggerTypeUnionJSON

type TriggerTypeUnionJSON struct {
	Type              string          `json:"$type,omitempty"`
	AtPeriod1         json.RawMessage `json:"projection.AtPeriod1,omitempty"`
	AtWindowItemSize1 json.RawMessage `json:"projection.AtWindowItemSize1,omitempty"`
	AtWatermark1      json.RawMessage `json:"projection.AtWatermark1,omitempty"`
}

type TriggerTypeVisitor

type TriggerTypeVisitor interface {
	VisitAtPeriod1(v *AtPeriod1) any
	VisitAtWindowItemSize1(v *AtWindowItemSize1) any
	VisitAtWatermark1(v *AtWatermark1) any
}

type WatermarkSignaler

type WatermarkSignaler interface {
	SignalWatermark(timestamp int64)
}

type Window

type Window struct {
	Start int64
	End   int64
}

type WindowBuffer

type WindowBuffer struct {
	// contains filtered or unexported fields
}

func (*WindowBuffer) Append

func (wb *WindowBuffer) Append(x Item)

func (*WindowBuffer) EachItemGroupedByWindow

func (wb *WindowBuffer) EachItemGroupedByWindow(f func(group *ItemGroupedByWindow))

func (*WindowBuffer) EachKeyedWindow

func (wb *WindowBuffer) EachKeyedWindow(kw *KeyedWindow, f func(group *ItemGroupedByWindow))

func (*WindowBuffer) GroupAlsoByWindow

func (wb *WindowBuffer) GroupAlsoByWindow(x []ItemGroupedByKey)

func (*WindowBuffer) RemoveItemGropedByWindow

func (wb *WindowBuffer) RemoveItemGropedByWindow(item *ItemGroupedByWindow)

type WindowBufferSignaler

type WindowBufferSignaler interface {
	SignalWindowCreated(kw *KeyedWindow)
	SignalWindowDeleted(kw *KeyedWindow)
	SignalWindowSizeReached(kw *KeyedWindow, size int)
}

type WindowDescription

type WindowDescription interface {
	AcceptWindowDescription(g WindowDescriptionVisitor) any
}

func WindowDescriptionFromJSON

func WindowDescriptionFromJSON(x []byte) (WindowDescription, error)

type WindowDescriptionUnionJSON

type WindowDescriptionUnionJSON struct {
	Type          string          `json:"$type,omitempty"`
	SessionWindow json.RawMessage `json:"projection.SessionWindow,omitempty"`
	SlidingWindow json.RawMessage `json:"projection.SlidingWindow,omitempty"`
	FixedWindow   json.RawMessage `json:"projection.FixedWindow,omitempty"`
}

type WindowDescriptionVisitor

type WindowDescriptionVisitor interface {
	VisitSessionWindow(v *SessionWindow) any
	VisitSlidingWindow(v *SlidingWindow) any
	VisitFixedWindow(v *FixedWindow) any
}

type WindowFlushMode

type WindowFlushMode interface {
	AcceptWindowFlushMode(g WindowFlushModeVisitor) any
}

func WindowFlushModeFromJSON

func WindowFlushModeFromJSON(x []byte) (WindowFlushMode, error)

type WindowFlushModeUnionJSON

type WindowFlushModeUnionJSON struct {
	Type                      string          `json:"$type,omitempty"`
	Accumulate                json.RawMessage `json:"projection.Accumulate,omitempty"`
	Discard                   json.RawMessage `json:"projection.Discard,omitempty"`
	AccumulatingAndRetracting json.RawMessage `json:"projection.AccumulatingAndRetracting,omitempty"`
}

type WindowFlushModeVisitor

type WindowFlushModeVisitor interface {
	VisitAccumulate(v *Accumulate) any
	VisitDiscard(v *Discard) any
	VisitAccumulatingAndRetracting(v *AccumulatingAndRetracting) any
}

type WindowTrigger

type WindowTrigger struct {
	// contains filtered or unexported fields
}

func NewWindowTrigger

func NewWindowTrigger(w *Window, td TriggerDescription) *WindowTrigger

func (*WindowTrigger) ReceiveEvent

func (wt *WindowTrigger) ReceiveEvent(triggerType TriggerType)

func (*WindowTrigger) Reset

func (wt *WindowTrigger) Reset()

func (*WindowTrigger) ShouldTrigger

func (wt *WindowTrigger) ShouldTrigger() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL