Documentation ¶
Index ¶
- func ExpectWatermarkValue(t *testing.T, ctx context.Context, ws execution.WatermarkSource, ...)
- func NextRecord(t *testing.T, ctx context.Context, rs execution.RecordStream)
- type MaximumDifferenceWatermarkGenerator
- type PercentileWatermarkGenerator
- type PercentileWatermarkGeneratorStream
- func (s *PercentileWatermarkGeneratorStream) Close(ctx context.Context, storage storage.Storage) error
- func (s *PercentileWatermarkGeneratorStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
- func (s *PercentileWatermarkGeneratorStream) Next(ctx context.Context) (*execution.Record, error)
- type Range
- type RangeStream
- type Tumble
- type TumbleStream
- type WatermarkGeneratorStream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExpectWatermarkValue ¶ added in v0.3.0
func ExpectWatermarkValue(t *testing.T, ctx context.Context, ws execution.WatermarkSource, tx storage.StateTransaction, expected time.Time)
func NextRecord ¶ added in v0.3.0
Types ¶
type MaximumDifferenceWatermarkGenerator ¶ added in v0.3.0
type MaximumDifferenceWatermarkGenerator struct {
// contains filtered or unexported fields
}
func NewMaximumDifferenceWatermarkGenerator ¶ added in v0.3.0
func NewMaximumDifferenceWatermarkGenerator(source execution.Node, timeField octosql.VariableName, offset execution.Expression) *MaximumDifferenceWatermarkGenerator
func (*MaximumDifferenceWatermarkGenerator) Document ¶ added in v0.3.0
func (r *MaximumDifferenceWatermarkGenerator) Document() docs.Documentation
func (*MaximumDifferenceWatermarkGenerator) Get ¶ added in v0.3.0
func (w *MaximumDifferenceWatermarkGenerator) Get(ctx context.Context, variables octosql.Variables, streamID *execution.StreamID) (execution.RecordStream, *execution.ExecutionOutput, error)
type PercentileWatermarkGenerator ¶ added in v0.3.0
type PercentileWatermarkGenerator struct {
// contains filtered or unexported fields
}
func NewPercentileWatermarkGenerator ¶ added in v0.3.0
func NewPercentileWatermarkGenerator(source execution.Node, timeField octosql.VariableName, events, percentile, frequency execution.Expression) *PercentileWatermarkGenerator
func (*PercentileWatermarkGenerator) Document ¶ added in v0.3.0
func (r *PercentileWatermarkGenerator) Document() docs.Documentation
func (*PercentileWatermarkGenerator) Get ¶ added in v0.3.0
func (w *PercentileWatermarkGenerator) Get(ctx context.Context, variables octosql.Variables, streamID *execution.StreamID) (execution.RecordStream, *execution.ExecutionOutput, error)
type PercentileWatermarkGeneratorStream ¶ added in v0.3.0
type PercentileWatermarkGeneratorStream struct {
// contains filtered or unexported fields
}
The way this watermark generator is working is following:
- events (must be positive) - represents amount of (most recent) events stored
- percentile (must be positive and less than 100.0) - represents percentile of recently stored events that are BIGGER than watermark value ex. if percentile = 35 then 35% of events stored must be bigger than watermark value, so watermark position is at 65th percentile of events stored (in sorted way)
- frequency (must be positive) - represents amount of events to be seen before initiating next watermark update
Generator stores recent events in deque, their counts in map and number of events seen before watermark update in value state After <frequency> number of events it updates watermark
func (*PercentileWatermarkGeneratorStream) GetWatermark ¶ added in v0.3.0
func (s *PercentileWatermarkGeneratorStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
type Range ¶
type Range struct {
// contains filtered or unexported fields
}
func NewRange ¶
func NewRange(start, end execution.Expression) *Range
func (*Range) Document ¶
func (r *Range) Document() docs.Documentation
type RangeStream ¶
type RangeStream struct {
// contains filtered or unexported fields
}
type Tumble ¶
type Tumble struct {
// contains filtered or unexported fields
}
func NewTumble ¶
func NewTumble(source execution.Node, timeField octosql.VariableName, windowLength, offset execution.Expression) *Tumble
func (*Tumble) Document ¶
func (r *Tumble) Document() docs.Documentation
type TumbleStream ¶
type TumbleStream struct {
// contains filtered or unexported fields
}
type WatermarkGeneratorStream ¶ added in v0.3.0
type WatermarkGeneratorStream struct {
// contains filtered or unexported fields
}
func (*WatermarkGeneratorStream) GetWatermark ¶ added in v0.3.0
func (s *WatermarkGeneratorStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.