octosql: github.com/cube2222/octosql/execution Index | Files | Directories

package execution

import "github.com/cube2222/octosql/execution"

Index

Package Files

batch.go distinct.go engine.go execution.go execution.pb.go filter.go function.go group_by.go hashmap.go in_memory_stream.go logic.go lookup_join.go map.go order_by.go output_queue.go process.go process.pb.go record.go record.pb.go relation.go requalifier.go shuffle.go shuffle.pb.go shuffle_strategy.go sourcestorage.go stream_join.go test_utils.go trigger.go types.go

Variables

var ErrEndOfStream = errors.New("end of stream")
var ErrNewTransactionRequired = fmt.Errorf("new transaction required")
var MaxWatermark = time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)

Based on protocol buffer max timestamp value.

var SystemSource string = "sys"

func AreStreamsEqual Uses

func AreStreamsEqual(ctx context.Context, first, second RecordStream) error

func AreStreamsEqualNoOrdering Uses

func AreStreamsEqualNoOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, opts ...AreEqualOpt) error

func AreStreamsEqualNoOrderingWithCount Uses

func AreStreamsEqualNoOrderingWithCount(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, count int) error

func AreStreamsEqualNoOrderingWithIDCheck Uses

func AreStreamsEqualNoOrderingWithIDCheck(ctx context.Context, stateStorage storage.Storage, gotStream, wantStream RecordStream, opts ...AreEqualOpt) error

func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking Uses

func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking(ctx context.Context, stateStorage storage.Storage, got, want RecordStream, opts ...AreEqualOpt) error

func AreStreamsEqualWithOrdering Uses

func AreStreamsEqualWithOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream) error

func DefaultEquality Uses

func DefaultEquality(record1 *Record, record2 *Record) error

func EqualityOfAll Uses

func EqualityOfAll(fs ...RecordEqualityFunc) func(record1 *Record, record2 *Record) error

func EqualityOfEventTimeField Uses

func EqualityOfEventTimeField(record1 *Record, record2 *Record) error

func EqualityOfEverythingButIDs Uses

func EqualityOfEverythingButIDs(record1 *Record, record2 *Record) error

func EqualityOfFieldsAndValues Uses

func EqualityOfFieldsAndValues(record1 *Record, record2 *Record) error

func EqualityOfID Uses

func EqualityOfID(record1 *Record, record2 *Record) error

func EqualityOfUndo Uses

func EqualityOfUndo(record1 *Record, record2 *Record) error

func GetAndStartAllShuffles Uses

func GetAndStartAllShuffles(ctx context.Context, stateStorage storage.Storage, rootStreamID *StreamID, nodes []Node, variables octosql.Variables) ([]RecordStream, []*ExecutionOutput, error)

This is used to start the whole plan. It starts each phase (separated by shuffles) one by one and takes care to properly pass shuffle ID's to shuffle receivers and senders.

func GetRawStringID Uses

func GetRawStringID() string

func GetSourceStringID Uses

func GetSourceStringID(tx storage.StateTransaction, inputName octosql.Value) (string, error)

GetSourceStreamID loads the StreamID of the given input stream in case it exists (from a previous run maybe?) Otherwise it allocates a new StreamID and saves it.

func NewErrWaitForChanges Uses

func NewErrWaitForChanges(subscription *storage.Subscription) error

func ParseType Uses

func ParseType(str string) octosql.Value

ParseType tries to parse the given string into any type it succeeds to. Returns back the string on failure.

func SystemField Uses

func SystemField(field string) octosql.VariableName

type Aggregate Uses

type Aggregate interface {
    docs.Documented
    AddValue(ctx context.Context, tx storage.StateTransaction, value octosql.Value) error
    RetractValue(ctx context.Context, tx storage.StateTransaction, value octosql.Value) error
    GetValue(ctx context.Context, tx storage.StateTransaction) (octosql.Value, error)
    String() string
}

type AggregatePrototype Uses

type AggregatePrototype func() Aggregate

type AliasedExpression Uses

type AliasedExpression struct {
    // contains filtered or unexported fields
}

func NewAliasedExpression Uses

func NewAliasedExpression(name octosql.VariableName, expr Expression) *AliasedExpression

func (*AliasedExpression) ExpressionValue Uses

func (alExpr *AliasedExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

func (*AliasedExpression) Name Uses

func (alExpr *AliasedExpression) Name() octosql.VariableName

type And Uses

type And struct {
    Left, Right Formula
}

func NewAnd Uses

func NewAnd(left Formula, right Formula) *And

func (*And) Evaluate Uses

func (f *And) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type AreEqualConfig Uses

type AreEqualConfig struct {
    Equality RecordEqualityFunc
}

type AreEqualOpt Uses

type AreEqualOpt func(*AreEqualConfig)

func WithEqualityBasedOn Uses

func WithEqualityBasedOn(fs ...RecordEqualityFunc) AreEqualOpt

type BatchSizeManager Uses

type BatchSizeManager struct {
    // contains filtered or unexported fields
}

The batch size manager decides if a batch should take more records. It tries to satisfy the target latency and will try not to ever surpass it. It will also grow the batch size on successful commit by at least 1. In case the commit is too big to finalize, it will drastically reduce the batch size.

func NewBatchSizeManager Uses

func NewBatchSizeManager(latencyTarget time.Duration) *BatchSizeManager

func (*BatchSizeManager) CommitAborted Uses

func (bsm *BatchSizeManager) CommitAborted()

func (*BatchSizeManager) CommitSuccessful Uses

func (bsm *BatchSizeManager) CommitSuccessful()

func (*BatchSizeManager) CommitTooBig Uses

func (bsm *BatchSizeManager) CommitTooBig()

func (*BatchSizeManager) MarkRecordsProcessed Uses

func (bsm *BatchSizeManager) MarkRecordsProcessed(count int)

You can use this to process records in a way other than one by one.

func (*BatchSizeManager) RecordsLeftToTake Uses

func (bsm *BatchSizeManager) RecordsLeftToTake() int

You can use this to process records in a way other than one by one.

func (*BatchSizeManager) Reset Uses

func (bsm *BatchSizeManager) Reset()

func (*BatchSizeManager) ShouldTakeNextRecord Uses

func (bsm *BatchSizeManager) ShouldTakeNextRecord() bool

type Constant Uses

type Constant struct {
    Value bool
}

func NewConstant Uses

func NewConstant(value bool) *Constant

func (Constant) Evaluate Uses

func (f Constant) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type ConstantStrategy Uses

type ConstantStrategy struct {
    // contains filtered or unexported fields
}

func (*ConstantStrategy) CalculatePartition Uses

func (s *ConstantStrategy) CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)

type ConstantStrategyPrototype Uses

type ConstantStrategyPrototype struct {
    // contains filtered or unexported fields
}

func (*ConstantStrategyPrototype) Get Uses

func (s *ConstantStrategyPrototype) Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)

type ConstantValue Uses

type ConstantValue struct {
    // contains filtered or unexported fields
}

func NewConstantValue Uses

func NewConstantValue(value octosql.Value) *ConstantValue

func (*ConstantValue) ExpressionValue Uses

func (dv *ConstantValue) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type CountingTrigger Uses

type CountingTrigger struct {
    // contains filtered or unexported fields
}

func NewCountingTrigger Uses

func NewCountingTrigger(count Expression) *CountingTrigger

func (*CountingTrigger) Get Uses

func (c *CountingTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type Datatype Uses

type Datatype string
const (
    DatatypeBoolean Datatype = "boolean"
    DatatypeInt     Datatype = "int"
    DatatypeFloat64 Datatype = "float64"
    DatatypeString  Datatype = "string"
    DatatypeTuple   Datatype = "octosql.Value"
)

type DelayTrigger Uses

type DelayTrigger struct {
    // contains filtered or unexported fields
}

func NewDelayTrigger Uses

func NewDelayTrigger(delay Expression) *DelayTrigger

func (*DelayTrigger) Get Uses

func (c *DelayTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type Distinct Uses

type Distinct struct {
    // contains filtered or unexported fields
}

func NewDistinct Uses

func NewDistinct(storage storage.Storage, source Node, eventTimeField octosql.VariableName) *Distinct

func (*Distinct) Get Uses

func (node *Distinct) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type DistinctStream Uses

type DistinctStream struct {
    // contains filtered or unexported fields
}

func (*DistinctStream) AddRecord Uses

func (ds *DistinctStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*DistinctStream) Trigger Uses

func (ds *DistinctStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type DummyNode Uses

type DummyNode struct {
    // contains filtered or unexported fields
}

func NewDummyNode Uses

func NewDummyNode(data []*Record) *DummyNode

func (*DummyNode) Get Uses

func (dn *DummyNode) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Equal Uses

type Equal struct {
}

func (*Equal) Apply Uses

func (rel *Equal) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type ErrWaitForChanges Uses

type ErrWaitForChanges struct {
    *storage.Subscription
}

func GetErrWaitForChanges Uses

func GetErrWaitForChanges(err error) *ErrWaitForChanges

func (*ErrWaitForChanges) Error Uses

func (e *ErrWaitForChanges) Error() string

type ExecutionOutput Uses

type ExecutionOutput struct {
    // Watermark source is the highest (in the execution tree)
    // watermark source available, which the record consumer should consume.
    WatermarkSource WatermarkSource

    // Next shuffles contains information about the next shuffles down the execution plan
    // which need to be started.
    NextShuffles map[string]ShuffleData

    // Tasks to run are functions which need to be run asynchronously,
    // after the storage initialization has been committed (and will thus be available for reading).
    TasksToRun []Task
}

This struct represents additional metadata to be returned with Get() and used recursively (like WatermarkSource)

func NewExecutionOutput Uses

func NewExecutionOutput(ws WatermarkSource, nextShuffles map[string]ShuffleData, tasksToRun []Task) *ExecutionOutput

type Expression Uses

type Expression interface {
    ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)
}

func NewRecordExpression Uses

func NewRecordExpression() Expression

type Field Uses

type Field struct {
    Name octosql.VariableName
}

type Filter Uses

type Filter struct {
    // contains filtered or unexported fields
}

func NewFilter Uses

func NewFilter(formula Formula, child Node) *Filter

func (*Filter) Get Uses

func (node *Filter) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type FilteredStream Uses

type FilteredStream struct {
    // contains filtered or unexported fields
}

func (*FilteredStream) Close Uses

func (stream *FilteredStream) Close(ctx context.Context, storage storage.Storage) error

func (*FilteredStream) Next Uses

func (stream *FilteredStream) Next(ctx context.Context) (*Record, error)

type Formula Uses

type Formula interface {
    Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)
}

type Function Uses

type Function struct {
    Name          string
    ArgumentNames [][]string
    Description   docs.Documentation
    Validator     Validator
    Logic         func(...octosql.Value) (octosql.Value, error)
}

func (*Function) Document Uses

func (f *Function) Document() docs.Documentation

type FunctionExpression Uses

type FunctionExpression struct {
    // contains filtered or unexported fields
}

func NewFunctionExpression Uses

func NewFunctionExpression(fun *Function, args []Expression) *FunctionExpression

func (*FunctionExpression) ExpressionValue Uses

func (fe *FunctionExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type GetTestStreamOption Uses

type GetTestStreamOption func(*StreamID)

func GetTestStreamWithStreamID Uses

func GetTestStreamWithStreamID(id *StreamID) GetTestStreamOption

type GreaterEqual Uses

type GreaterEqual struct {
}

func (*GreaterEqual) Apply Uses

func (rel *GreaterEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type GroupBy Uses

type GroupBy struct {
    // contains filtered or unexported fields
}

func NewGroupBy Uses

func NewGroupBy(storage storage.Storage, source Node, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy

func (*GroupBy) Get Uses

func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type GroupByStream Uses

type GroupByStream struct {
    // contains filtered or unexported fields
}

func (*GroupByStream) AddRecord Uses

func (gb *GroupByStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*GroupByStream) Trigger Uses

func (gb *GroupByStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type HashMap Uses

type HashMap struct {
    // contains filtered or unexported fields
}

func NewHashMap Uses

func NewHashMap() *HashMap

func (*HashMap) Get Uses

func (hm *HashMap) Get(key octosql.Value) (interface{}, bool, error)

func (*HashMap) GetIterator Uses

func (hm *HashMap) GetIterator() *Iterator

func (*HashMap) Set Uses

func (hm *HashMap) Set(key octosql.Value, value interface{}) error

type In Uses

type In struct {
}

func (*In) Apply Uses

func (rel *In) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type InMemoryStream Uses

type InMemoryStream struct {
    // contains filtered or unexported fields
}

func NewInMemoryStream Uses

func NewInMemoryStream(ctx context.Context, data []*Record) *InMemoryStream

func (*InMemoryStream) Close Uses

func (ims *InMemoryStream) Close(ctx context.Context, storage storage.Storage) error

func (*InMemoryStream) Next Uses

func (ims *InMemoryStream) Next(ctx context.Context) (*Record, error)

type IntermediateRecordStore Uses

type IntermediateRecordStore interface {
    // ReadyForMore is used to check if the intermediate record store is able to consume more data.
    // This allows it to communicate back-pressure.
    ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
    AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error
    Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
    UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
    GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
    TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
    MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
    MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
    Close(ctx context.Context, storage storage.Storage) error
}

type Iterator Uses

type Iterator struct {
    // contains filtered or unexported fields
}

func (*Iterator) Next Uses

func (iter *Iterator) Next() (octosql.Value, interface{}, bool)

Next returns next key, value, exists

type JobOutputQueueIntermediateRecordStore Uses

type JobOutputQueueIntermediateRecordStore struct {
    // contains filtered or unexported fields
}

func (*JobOutputQueueIntermediateRecordStore) AddRecord Uses

func (j *JobOutputQueueIntermediateRecordStore) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*JobOutputQueueIntermediateRecordStore) Close Uses

func (j *JobOutputQueueIntermediateRecordStore) Close(ctx context.Context, storage storage.Storage) error

func (*JobOutputQueueIntermediateRecordStore) GetWatermark Uses

func (j *JobOutputQueueIntermediateRecordStore) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*JobOutputQueueIntermediateRecordStore) MarkEndOfStream Uses

func (j *JobOutputQueueIntermediateRecordStore) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*JobOutputQueueIntermediateRecordStore) MarkError Uses

func (j *JobOutputQueueIntermediateRecordStore) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*JobOutputQueueIntermediateRecordStore) Next Uses

func (j *JobOutputQueueIntermediateRecordStore) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)

func (*JobOutputQueueIntermediateRecordStore) ReadyForMore Uses

func (j *JobOutputQueueIntermediateRecordStore) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*JobOutputQueueIntermediateRecordStore) TriggerKeys Uses

func (j *JobOutputQueueIntermediateRecordStore) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*JobOutputQueueIntermediateRecordStore) UpdateWatermark Uses

func (j *JobOutputQueueIntermediateRecordStore) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type JoinType Uses

type JoinType int
const (
    INNER_JOIN JoinType = 0
    LEFT_JOIN  JoinType = 1
    OUTER_JOIN JoinType = 2
)

func (JoinType) String Uses

func (j JoinType) String() string

type JoinedStream Uses

type JoinedStream struct {
    // contains filtered or unexported fields
}

func (*JoinedStream) AddRecord Uses

func (js *JoinedStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*JoinedStream) Trigger Uses

func (js *JoinedStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type KeyHashingStrategy Uses

type KeyHashingStrategy struct {
    // contains filtered or unexported fields
}

func (*KeyHashingStrategy) CalculatePartition Uses

func (s *KeyHashingStrategy) CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)

TODO: The key should really be calculated by the preceding map. Like all group by values.

type KeyHashingStrategyPrototype Uses

type KeyHashingStrategyPrototype struct {
    // contains filtered or unexported fields
}

func (*KeyHashingStrategyPrototype) Get Uses

func (s *KeyHashingStrategyPrototype) Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)

type LessEqual Uses

type LessEqual struct {
}

func (*LessEqual) Apply Uses

func (rel *LessEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type LessThan Uses

type LessThan struct {
}

func (*LessThan) Apply Uses

func (rel *LessThan) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type Like Uses

type Like struct {
}

func (*Like) Apply Uses

func (rel *Like) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type LogicExpression Uses

type LogicExpression struct {
    // contains filtered or unexported fields
}

func NewLogicExpression Uses

func NewLogicExpression(formula Formula) *LogicExpression

func (*LogicExpression) ExpressionValue Uses

func (le *LogicExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type LookupJoin Uses

type LookupJoin struct {
    // contains filtered or unexported fields
}

func NewLookupJoin Uses

func NewLookupJoin(maxJobsCount int, stateStorage storage.Storage, source Node, joined Node, isLeftJoin bool) *LookupJoin

func (*LookupJoin) Get Uses

func (node *LookupJoin) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type LookupJoinStream Uses

type LookupJoinStream struct {
    // contains filtered or unexported fields
}

func (*LookupJoinStream) AddRecord Uses

func (rs *LookupJoinStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*LookupJoinStream) Close Uses

func (rs *LookupJoinStream) Close(ctx context.Context, storage storage.Storage) error

func (*LookupJoinStream) GetNextRecord Uses

func (rs *LookupJoinStream) GetNextRecord(ctx context.Context, tx storage.StateTransaction) (*Record, error)

func (*LookupJoinStream) GetWatermark Uses

func (rs *LookupJoinStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*LookupJoinStream) HandleControlMessages Uses

func (rs *LookupJoinStream) HandleControlMessages(ctx context.Context, tx storage.StateTransaction) error

func (*LookupJoinStream) MarkEndOfStream Uses

func (rs *LookupJoinStream) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*LookupJoinStream) MarkError Uses

func (rs *LookupJoinStream) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*LookupJoinStream) Next Uses

func (rs *LookupJoinStream) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)

func (*LookupJoinStream) ReadyForMore Uses

func (rs *LookupJoinStream) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*LookupJoinStream) RunScheduler Uses

func (rs *LookupJoinStream) RunScheduler(ctx context.Context)

The scheduler takes records from the toBeJoined queue, and starts jobs to do joins. Control messages (records too, to satisfy the initial ordering of messages) are put on a controlMessages queue, where they will be handled by the receiver.

func (*LookupJoinStream) RunWorker Uses

func (rs *LookupJoinStream) RunWorker(ctx context.Context, id *RecordID) error

The worker drives streams to completion, puts received records to output queues scoped by record id. In the end, it puts an EndOfStream message on the queue.

func (*LookupJoinStream) TriggerKeys Uses

func (j *LookupJoinStream) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*LookupJoinStream) UpdateWatermark Uses

func (rs *LookupJoinStream) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type Map Uses

type Map struct {
    // contains filtered or unexported fields
}

func NewMap Uses

func NewMap(expressions []NamedExpression, child Node, keep bool) *Map

func (*Map) Get Uses

func (node *Map) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type MappedStream Uses

type MappedStream struct {
    // contains filtered or unexported fields
}

func (*MappedStream) Close Uses

func (stream *MappedStream) Close(ctx context.Context, storage storage.Storage) error

func (*MappedStream) Next Uses

func (stream *MappedStream) Next(ctx context.Context) (*Record, error)

type Metadata Uses

type Metadata struct {
    Id                   *RecordID `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
    Undo                 bool      `protobuf:"varint,2,opt,name=undo,proto3" json:"undo,omitempty"`
    EventTimeField       string    `protobuf:"bytes,3,opt,name=eventTimeField,proto3" json:"eventTimeField,omitempty"`
    XXX_NoUnkeyedLiteral struct{}  `json:"-"`
    XXX_unrecognized     []byte    `json:"-"`
    XXX_sizecache        int32     `json:"-"`
}

func (*Metadata) Descriptor Uses

func (*Metadata) Descriptor() ([]byte, []int)

func (*Metadata) GetEventTimeField Uses

func (m *Metadata) GetEventTimeField() string

func (*Metadata) GetId Uses

func (m *Metadata) GetId() *RecordID

func (*Metadata) GetUndo Uses

func (m *Metadata) GetUndo() bool

func (*Metadata) ProtoMessage Uses

func (*Metadata) ProtoMessage()

func (*Metadata) Reset Uses

func (m *Metadata) Reset()

func (*Metadata) String Uses

func (m *Metadata) String() string

func (*Metadata) XXX_DiscardUnknown Uses

func (m *Metadata) XXX_DiscardUnknown()

func (*Metadata) XXX_Marshal Uses

func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Metadata) XXX_Merge Uses

func (m *Metadata) XXX_Merge(src proto.Message)

func (*Metadata) XXX_Size Uses

func (m *Metadata) XXX_Size() int

func (*Metadata) XXX_Unmarshal Uses

func (m *Metadata) XXX_Unmarshal(b []byte) error

type MoreThan Uses

type MoreThan struct {
}

func (*MoreThan) Apply Uses

func (rel *MoreThan) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type MultiTrigger Uses

type MultiTrigger struct {
    // contains filtered or unexported fields
}

func NewMultiTrigger Uses

func NewMultiTrigger(triggers ...TriggerPrototype) *MultiTrigger

func (*MultiTrigger) Get Uses

func (m *MultiTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type NamedExpression Uses

type NamedExpression interface {
    Expression
    Name() octosql.VariableName
}

type NextShuffleMetadataChange Uses

type NextShuffleMetadataChange struct {
    ShuffleIDAddSuffix string
    Partition          int
    Source             Node
}

func NewNextShuffleMetadataChange Uses

func NewNextShuffleMetadataChange(shuffleIDAddSuffix string, partition int, source Node) *NextShuffleMetadataChange

func (*NextShuffleMetadataChange) Get Uses

func (n *NextShuffleMetadataChange) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Node Uses

type Node interface {
    Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
}

type NodeExpression Uses

type NodeExpression struct {
    // contains filtered or unexported fields
}

func NewNodeExpression Uses

func NewNodeExpression(node Node, stateStorage storage.Storage) *NodeExpression

func (*NodeExpression) ExpressionValue Uses

func (ne *NodeExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type Not Uses

type Not struct {
    Child Formula
}

func NewNot Uses

func NewNot(child Formula) *Not

func (*Not) Evaluate Uses

func (f *Not) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type NotEqual Uses

type NotEqual struct {
}

func (*NotEqual) Apply Uses

func (rel *NotEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type NotIn Uses

type NotIn struct {
}

func (*NotIn) Apply Uses

func (rel *NotIn) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type Or Uses

type Or struct {
    Left, Right Formula
}

func NewOr Uses

func NewOr(left Formula, right Formula) *Or

func (*Or) Evaluate Uses

func (f *Or) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type OrderBy Uses

type OrderBy struct {
    // contains filtered or unexported fields
}

func NewOrderBy Uses

func NewOrderBy(storage storage.Storage, source Node, exprs []Expression, directions []OrderDirection, eventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *OrderBy

func (*OrderBy) Get Uses

func (node *OrderBy) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type OrderByKey Uses

type OrderByKey struct {
    // contains filtered or unexported fields
}

func NewOrderByKey Uses

func NewOrderByKey(key []byte) *OrderByKey

func (*OrderByKey) MonotonicMarshal Uses

func (k *OrderByKey) MonotonicMarshal() []byte

func (*OrderByKey) MonotonicUnmarshal Uses

func (k *OrderByKey) MonotonicUnmarshal(data []byte) error

type OrderByStream Uses

type OrderByStream struct {
    // contains filtered or unexported fields
}

func (*OrderByStream) AddRecord Uses

func (ob *OrderByStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*OrderByStream) Trigger Uses

func (ob *OrderByStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type OrderDirection Uses

type OrderDirection string
const (
    Ascending  OrderDirection = "asc"
    Descending OrderDirection = "desc"
)

type OutputOptions Uses

type OutputOptions struct {
    OrderByExpressions []Expression
    OrderByDirections  []OrderDirection
    Limit              Expression
    Offset             Expression
}

func NewOutputOptions Uses

func NewOutputOptions(
    orderByExpressions []Expression,
    orderByDirections []OrderDirection,
    limit Expression,
    offset Expression,
) *OutputOptions

type OutputQueue Uses

type OutputQueue struct {
    // contains filtered or unexported fields
}

func NewOutputQueue Uses

func NewOutputQueue(tx storage.StateTransaction) *OutputQueue

func (*OutputQueue) Peek Uses

func (q *OutputQueue) Peek(ctx context.Context, msg proto.Message) error

func (*OutputQueue) Pop Uses

func (q *OutputQueue) Pop(ctx context.Context, msg proto.Message) error

func (*OutputQueue) Push Uses

func (q *OutputQueue) Push(ctx context.Context, element proto.Message) error

type PipelineMetadata Uses

type PipelineMetadata struct {
    // The ID for the next shuffle.
    NextShuffleID *ShuffleID

    // The partition of the current stream.
    Partition int
}

type Predicate Uses

type Predicate struct {
    Left     Expression
    Relation Relation
    Right    Expression
}

func NewPredicate Uses

func NewPredicate(left Expression, relation Relation, right Expression) *Predicate

func (*Predicate) Evaluate Uses

func (f *Predicate) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type ProcessByKey Uses

type ProcessByKey struct {
    // contains filtered or unexported fields
}

func (*ProcessByKey) AddRecord Uses

func (p *ProcessByKey) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*ProcessByKey) Close Uses

func (p *ProcessByKey) Close(ctx context.Context, storage storage.Storage) error

func (*ProcessByKey) GetWatermark Uses

func (p *ProcessByKey) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*ProcessByKey) MarkEndOfStream Uses

func (p *ProcessByKey) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*ProcessByKey) MarkError Uses

func (p *ProcessByKey) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*ProcessByKey) Next Uses

func (p *ProcessByKey) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)

func (*ProcessByKey) ReadyForMore Uses

func (p *ProcessByKey) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*ProcessByKey) TriggerKeys Uses

func (p *ProcessByKey) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*ProcessByKey) UpdateWatermark Uses

func (p *ProcessByKey) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type ProcessFunction Uses

type ProcessFunction interface {
    AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error
    Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error) // New Records and Retractions
}

type PullEngine Uses

type PullEngine struct {
    // contains filtered or unexported fields
}

func NewPullEngine Uses

func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, sources []RecordStream, streamID *StreamID, watermarkSource WatermarkSource, shouldPrefixStreamID bool, ctx context.Context) *PullEngine

func (*PullEngine) Close Uses

func (engine *PullEngine) Close(ctx context.Context, storage storage.Storage) error

func (*PullEngine) GetWatermark Uses

func (engine *PullEngine) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*PullEngine) Next Uses

func (engine *PullEngine) Next(ctx context.Context) (*Record, error)

func (*PullEngine) Run Uses

func (engine *PullEngine) Run()

type QueueElement Uses

type QueueElement struct {
    // Types that are valid to be assigned to Type:
    //	*QueueElement_Record
    //	*QueueElement_Watermark
    //	*QueueElement_EndOfStream
    //	*QueueElement_Error
    Type                 isQueueElement_Type `protobuf_oneof:"type"`
    XXX_NoUnkeyedLiteral struct{}            `json:"-"`
    XXX_unrecognized     []byte              `json:"-"`
    XXX_sizecache        int32               `json:"-"`
}

func (*QueueElement) Descriptor Uses

func (*QueueElement) Descriptor() ([]byte, []int)

func (*QueueElement) GetEndOfStream Uses

func (m *QueueElement) GetEndOfStream() bool

func (*QueueElement) GetError Uses

func (m *QueueElement) GetError() string

func (*QueueElement) GetRecord Uses

func (m *QueueElement) GetRecord() *Record

func (*QueueElement) GetType Uses

func (m *QueueElement) GetType() isQueueElement_Type

func (*QueueElement) GetWatermark Uses

func (m *QueueElement) GetWatermark() *timestamp.Timestamp

func (*QueueElement) ProtoMessage Uses

func (*QueueElement) ProtoMessage()

func (*QueueElement) Reset Uses

func (m *QueueElement) Reset()

func (*QueueElement) String Uses

func (m *QueueElement) String() string

func (*QueueElement) XXX_DiscardUnknown Uses

func (m *QueueElement) XXX_DiscardUnknown()

func (*QueueElement) XXX_Marshal Uses

func (m *QueueElement) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QueueElement) XXX_Merge Uses

func (m *QueueElement) XXX_Merge(src proto.Message)

func (*QueueElement) XXX_OneofWrappers Uses

func (*QueueElement) XXX_OneofWrappers() []interface{}

XXX_OneofWrappers is for the internal use of the proto package.

func (*QueueElement) XXX_Size Uses

func (m *QueueElement) XXX_Size() int

func (*QueueElement) XXX_Unmarshal Uses

func (m *QueueElement) XXX_Unmarshal(b []byte) error

type QueueElement_EndOfStream Uses

type QueueElement_EndOfStream struct {
    EndOfStream bool `protobuf:"varint,3,opt,name=endOfStream,proto3,oneof"`
}

type QueueElement_Error Uses

type QueueElement_Error struct {
    Error string `protobuf:"bytes,4,opt,name=error,proto3,oneof"`
}

type QueueElement_Record Uses

type QueueElement_Record struct {
    Record *Record `protobuf:"bytes,1,opt,name=record,proto3,oneof"`
}

type QueueElement_Watermark Uses

type QueueElement_Watermark struct {
    Watermark *timestamp.Timestamp `protobuf:"bytes,2,opt,name=watermark,proto3,oneof"`
}

type Record Uses

type Record struct {
    Metadata             *Metadata        `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
    FieldNames           []string         `protobuf:"bytes,2,rep,name=fieldNames,proto3" json:"fieldNames,omitempty"`
    Data                 []*octosql.Value `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"`
    XXX_NoUnkeyedLiteral struct{}         `json:"-"`
    XXX_unrecognized     []byte           `json:"-"`
    XXX_sizecache        int32            `json:"-"`
}

func NewRecord Uses

func NewRecord(fields []octosql.VariableName, data map[octosql.VariableName]octosql.Value, opts ...RecordOption) *Record

func NewRecordFromRecord Uses

func NewRecordFromRecord(record *Record, opts ...RecordOption) *Record

func NewRecordFromSlice Uses

func NewRecordFromSlice(fields []octosql.VariableName, data []octosql.Value, opts ...RecordOption) *Record

func NewRecordFromSliceWithNormalize Uses

func NewRecordFromSliceWithNormalize(fields []octosql.VariableName, data []interface{}, opts ...RecordOption) *Record

func Normalize Uses

func Normalize(rec *Record) *Record

func ReadAll Uses

func ReadAll(ctx context.Context, stateStorage storage.Storage, stream RecordStream) ([]*Record, error)

func ReadAllWithCount Uses

func ReadAllWithCount(ctx context.Context, stateStorage storage.Storage, stream RecordStream, count int) ([]*Record, error)

func (*Record) AsTuple Uses

func (r *Record) AsTuple() octosql.Value

func (*Record) AsVariables Uses

func (r *Record) AsVariables() octosql.Variables

func (*Record) Descriptor Uses

func (*Record) Descriptor() ([]byte, []int)

func (*Record) Equal Uses

func (r *Record) Equal(other *Record) bool

func (*Record) EventTime Uses

func (r *Record) EventTime() octosql.Value

func (*Record) EventTimeField Uses

func (r *Record) EventTimeField() octosql.VariableName

func (*Record) Fields Uses

func (r *Record) Fields() []Field

func (*Record) GetData Uses

func (m *Record) GetData() []*octosql.Value

func (*Record) GetFieldNames Uses

func (m *Record) GetFieldNames() []string

func (*Record) GetMetadata Uses

func (m *Record) GetMetadata() *Metadata

func (*Record) GetVariableNames Uses

func (r *Record) GetVariableNames() []octosql.VariableName

func (*Record) Hash Uses

func (r *Record) Hash() (uint64, error)

func (*Record) ID Uses

func (r *Record) ID() *RecordID

func (*Record) IsUndo Uses

func (r *Record) IsUndo() bool

func (*Record) ProtoMessage Uses

func (*Record) ProtoMessage()

func (*Record) Reset Uses

func (m *Record) Reset()

func (*Record) Show Uses

func (r *Record) Show() string

func (*Record) ShowFields Uses

func (r *Record) ShowFields() []Field

func (*Record) String Uses

func (m *Record) String() string

func (*Record) Value Uses

func (r *Record) Value(field octosql.VariableName) octosql.Value

func (*Record) XXX_DiscardUnknown Uses

func (m *Record) XXX_DiscardUnknown()

func (*Record) XXX_Marshal Uses

func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Record) XXX_Merge Uses

func (m *Record) XXX_Merge(src proto.Message)

func (*Record) XXX_Size Uses

func (m *Record) XXX_Size() int

func (*Record) XXX_Unmarshal Uses

func (m *Record) XXX_Unmarshal(b []byte) error

type RecordEqualityFunc Uses

type RecordEqualityFunc func(record1 *Record, record2 *Record) error

type RecordExpression Uses

type RecordExpression struct{}

func (*RecordExpression) ExpressionValue Uses

func (re *RecordExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type RecordID Uses

type RecordID struct {
    ID                   string   `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

func NewRecordID Uses

func NewRecordID(id string) *RecordID

GetRandomRecordID can be used to get a new random RecordID.

func NewRecordIDFromStreamIDWithOffset Uses

func NewRecordIDFromStreamIDWithOffset(streamID *StreamID, offset int) *RecordID

NewRecordIDFromStreamIDWithOffset can be used to get a new RecordID deterministically based on the streamID and record offset.

func (*RecordID) AsPrefix Uses

func (id *RecordID) AsPrefix() []byte

This is a helper function to use a record ID as a storage prefix.

func (*RecordID) Descriptor Uses

func (*RecordID) Descriptor() ([]byte, []int)

func (*RecordID) GetID Uses

func (m *RecordID) GetID() string

func (*RecordID) MonotonicMarshal Uses

func (id *RecordID) MonotonicMarshal() []byte

func (*RecordID) MonotonicUnmarshal Uses

func (id *RecordID) MonotonicUnmarshal(data []byte) error

func (*RecordID) ProtoMessage Uses

func (*RecordID) ProtoMessage()

func (*RecordID) Reset Uses

func (m *RecordID) Reset()

func (RecordID) Show Uses

func (id RecordID) Show() string

func (*RecordID) String Uses

func (m *RecordID) String() string

func (*RecordID) XXX_DiscardUnknown Uses

func (m *RecordID) XXX_DiscardUnknown()

func (*RecordID) XXX_Marshal Uses

func (m *RecordID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*RecordID) XXX_Merge Uses

func (m *RecordID) XXX_Merge(src proto.Message)

func (*RecordID) XXX_Size Uses

func (m *RecordID) XXX_Size() int

func (*RecordID) XXX_Unmarshal Uses

func (m *RecordID) XXX_Unmarshal(b []byte) error

type RecordOption Uses

type RecordOption func(stream *Record)

func WithEventTimeField Uses

func WithEventTimeField(field octosql.VariableName) RecordOption

func WithID Uses

func WithID(id *RecordID) RecordOption

func WithMetadataFrom Uses

func WithMetadataFrom(base *Record) RecordOption

func WithNoUndo Uses

func WithNoUndo() RecordOption

func WithUndo Uses

func WithUndo() RecordOption

type RecordStream Uses

type RecordStream interface {
    Next(ctx context.Context) (*Record, error)
    Close(ctx context.Context, storage storage.Storage) error
}

func GetTestStream Uses

func GetTestStream(t *testing.T, stateStorage storage.Storage, variables octosql.Variables, node Node, opts ...GetTestStreamOption) RecordStream

type Regexp Uses

type Regexp struct {
}

func (*Regexp) Apply Uses

func (rel *Regexp) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type Relation Uses

type Relation interface {
    Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)
}

func NewEqual Uses

func NewEqual() Relation

func NewGreaterEqual Uses

func NewGreaterEqual() Relation

func NewIn Uses

func NewIn() Relation

func NewLessEqual Uses

func NewLessEqual() Relation

func NewLessThan Uses

func NewLessThan() Relation

func NewLike Uses

func NewLike() Relation

func NewMoreThan Uses

func NewMoreThan() Relation

func NewNotEqual Uses

func NewNotEqual() Relation

func NewNotIn Uses

func NewNotIn() Relation

func NewRegexp Uses

func NewRegexp() Relation

type RequalifiedStream Uses

type RequalifiedStream struct {
    // contains filtered or unexported fields
}

func (*RequalifiedStream) Close Uses

func (stream *RequalifiedStream) Close(ctx context.Context, storage storage.Storage) error

func (*RequalifiedStream) Next Uses

func (stream *RequalifiedStream) Next(ctx context.Context) (*Record, error)

type Requalifier Uses

type Requalifier struct {
    // contains filtered or unexported fields
}

func NewRequalifier Uses

func NewRequalifier(qualifier string, child Node) *Requalifier

func (*Requalifier) Get Uses

func (node *Requalifier) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Shuffle Uses

type Shuffle struct {
    // contains filtered or unexported fields
}

func NewShuffle Uses

func NewShuffle(outputPartitionCount int, strategyPrototype ShuffleStrategyPrototype, sources []Node) *Shuffle

func (*Shuffle) Get Uses

func (s *Shuffle) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

func (*Shuffle) StartSources Uses

func (s *Shuffle) StartSources(ctx context.Context, stateStorage storage.Storage, shuffleID *ShuffleID, variables octosql.Variables) (map[string]ShuffleData, error)

type ShuffleData Uses

type ShuffleData struct {
    ShuffleID *ShuffleID
    Shuffle   *Shuffle
    Variables octosql.Variables
}

type ShuffleID Uses

type ShuffleID struct {
    Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

func GetSourceShuffleID Uses

func GetSourceShuffleID(tx storage.StateTransaction, inputName octosql.Value) (*ShuffleID, error)

func NewShuffleID Uses

func NewShuffleID(id string) *ShuffleID

func (*ShuffleID) AsMapKey Uses

func (id *ShuffleID) AsMapKey() string

func (*ShuffleID) AsPrefix Uses

func (id *ShuffleID) AsPrefix() []byte

func (*ShuffleID) Descriptor Uses

func (*ShuffleID) Descriptor() ([]byte, []int)

func (*ShuffleID) GetId Uses

func (m *ShuffleID) GetId() string

func (*ShuffleID) ProtoMessage Uses

func (*ShuffleID) ProtoMessage()

func (*ShuffleID) Reset Uses

func (m *ShuffleID) Reset()

func (*ShuffleID) String Uses

func (m *ShuffleID) String() string

func (*ShuffleID) XXX_DiscardUnknown Uses

func (m *ShuffleID) XXX_DiscardUnknown()

func (*ShuffleID) XXX_Marshal Uses

func (m *ShuffleID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ShuffleID) XXX_Merge Uses

func (m *ShuffleID) XXX_Merge(src proto.Message)

func (*ShuffleID) XXX_Size Uses

func (m *ShuffleID) XXX_Size() int

func (*ShuffleID) XXX_Unmarshal Uses

func (m *ShuffleID) XXX_Unmarshal(b []byte) error

type ShuffleReceiver Uses

type ShuffleReceiver struct {
    // contains filtered or unexported fields
}

ShuffleReceiver is a RecordStream abstraction on a shuffle and receives records from it for a partition.

func NewShuffleReceiver Uses

func NewShuffleReceiver(streamID *StreamID, shuffleID *ShuffleID, sourcePartitionCount int, partition int) *ShuffleReceiver

func (*ShuffleReceiver) Close Uses

func (rs *ShuffleReceiver) Close(ctx context.Context, storage storage.Storage) error

func (*ShuffleReceiver) GetWatermark Uses

func (rs *ShuffleReceiver) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*ShuffleReceiver) Next Uses

func (rs *ShuffleReceiver) Next(ctx context.Context) (*Record, error)

type ShuffleSender Uses

type ShuffleSender struct {
    // contains filtered or unexported fields
}

ShuffleSender is used to send data to a shuffle from a given partition.

func NewShuffleSender Uses

func NewShuffleSender(streamID *StreamID, shuffleID *ShuffleID, shuffleStrategy ShuffleStrategy, outputPartitionCount int, partition int) *ShuffleSender

func (*ShuffleSender) AddRecord Uses

func (node *ShuffleSender) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*ShuffleSender) Close Uses

func (node *ShuffleSender) Close(ctx context.Context, storage storage.Storage) error

func (*ShuffleSender) GetWatermark Uses

func (node *ShuffleSender) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*ShuffleSender) MarkEndOfStream Uses

func (node *ShuffleSender) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*ShuffleSender) MarkError Uses

func (node *ShuffleSender) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*ShuffleSender) Next Uses

func (node *ShuffleSender) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)

func (*ShuffleSender) ReadyForMore Uses

func (node *ShuffleSender) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*ShuffleSender) TriggerKeys Uses

func (node *ShuffleSender) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*ShuffleSender) UpdateWatermark Uses

func (node *ShuffleSender) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type ShuffleStrategy Uses

type ShuffleStrategy interface {
    // Return output partition index based on the record and output partition count.
    CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)
}

func NewConstantStrategy Uses

func NewConstantStrategy(partition int) ShuffleStrategy

func NewKeyHashingStrategy Uses

func NewKeyHashingStrategy(variables octosql.Variables, key []Expression) ShuffleStrategy

type ShuffleStrategyPrototype Uses

type ShuffleStrategyPrototype interface {
    Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)
}

func NewConstantStrategyPrototype Uses

func NewConstantStrategyPrototype(partition int) ShuffleStrategyPrototype

func NewKeyHashingStrategyPrototype Uses

func NewKeyHashingStrategyPrototype(key []Expression) ShuffleStrategyPrototype

type StarExpression Uses

type StarExpression struct {
    // contains filtered or unexported fields
}

func NewStarExpression Uses

func NewStarExpression(qualifier string) *StarExpression

func (*StarExpression) ExpressionValue Uses

func (se *StarExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

func (*StarExpression) Fields Uses

func (se *StarExpression) Fields(variables octosql.Variables) []octosql.VariableName

func (*StarExpression) Name Uses

func (se *StarExpression) Name() octosql.VariableName

type StreamID Uses

type StreamID struct {
    Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

StreamID is a unique identifier for a RecordStream node. This StreamID should prefix all state storage keys this node uses.

func GetRawStreamID Uses

func GetRawStreamID() *StreamID

GetRawStreamID can be used to get a new random StreamID without saving it.

func GetSourceStreamID Uses

func GetSourceStreamID(tx storage.StateTransaction, inputName octosql.Value) (*StreamID, error)

func NewStreamID Uses

func NewStreamID(str string) *StreamID

NewStreamID can be used to create a StreamID without saving it.

func (*StreamID) AsPrefix Uses

func (id *StreamID) AsPrefix() []byte

A RecordStream node should use its StreamID as a prefix to all storage operations. This is a helper function to make that easier.

func (*StreamID) Descriptor Uses

func (*StreamID) Descriptor() ([]byte, []int)

func (*StreamID) GetId Uses

func (m *StreamID) GetId() string

func (*StreamID) ProtoMessage Uses

func (*StreamID) ProtoMessage()

func (*StreamID) Reset Uses

func (m *StreamID) Reset()

func (*StreamID) String Uses

func (m *StreamID) String() string

func (*StreamID) XXX_DiscardUnknown Uses

func (m *StreamID) XXX_DiscardUnknown()

func (*StreamID) XXX_Marshal Uses

func (m *StreamID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*StreamID) XXX_Merge Uses

func (m *StreamID) XXX_Merge(src proto.Message)

func (*StreamID) XXX_Size Uses

func (m *StreamID) XXX_Size() int

func (*StreamID) XXX_Unmarshal Uses

func (m *StreamID) XXX_Unmarshal(b []byte) error

type StreamJoin Uses

type StreamJoin struct {
    // contains filtered or unexported fields
}

func NewStreamJoin Uses

func NewStreamJoin(leftSource, rightSource Node, leftKey, rightKey []Expression, storage storage.Storage, eventTimeField octosql.VariableName, joinType JoinType, triggerPrototype TriggerPrototype) *StreamJoin

func (*StreamJoin) Get Uses

func (node *StreamJoin) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Task Uses

type Task func() error

type Trigger Uses

type Trigger interface {
    docs.Documented
    RecordReceived(ctx context.Context, tx storage.StateTransaction, key octosql.Value, eventTime time.Time) error
    UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
    PollKeysToFire(ctx context.Context, tx storage.StateTransaction, batchSize int) ([]octosql.Value, error)
    KeysFired(ctx context.Context, tx storage.StateTransaction, key []octosql.Value) error
}

type TriggerPrototype Uses

type TriggerPrototype interface {
    Get(ctx context.Context, variables octosql.Variables) (Trigger, error)
}

type TupleExpression Uses

type TupleExpression struct {
    // contains filtered or unexported fields
}

func NewTuple Uses

func NewTuple(expressions []Expression) *TupleExpression

func (*TupleExpression) ExpressionValue Uses

func (tup *TupleExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type UnionWatermarkGenerator Uses

type UnionWatermarkGenerator struct {
    // contains filtered or unexported fields
}

func NewUnionWatermarkGenerator Uses

func NewUnionWatermarkGenerator(sources []WatermarkSource) *UnionWatermarkGenerator

func (*UnionWatermarkGenerator) GetWatermark Uses

func (uwg *UnionWatermarkGenerator) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

type Validator Uses

type Validator interface {
    docs.Documented
    Validate(args ...octosql.Value) error
}

type Variable Uses

type Variable struct {
    // contains filtered or unexported fields
}

func NewVariable Uses

func NewVariable(name octosql.VariableName) *Variable

func (*Variable) ExpressionValue Uses

func (v *Variable) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

func (*Variable) Name Uses

func (v *Variable) Name() octosql.VariableName

type WatermarkSource Uses

type WatermarkSource interface {
    GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
}

type WatermarkTrigger Uses

type WatermarkTrigger struct {
}

func NewWatermarkTrigger Uses

func NewWatermarkTrigger() *WatermarkTrigger

func (*WatermarkTrigger) Get Uses

func (c *WatermarkTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type ZeroWatermarkGenerator Uses

type ZeroWatermarkGenerator struct {
}

func NewZeroWatermarkGenerator Uses

func NewZeroWatermarkGenerator() *ZeroWatermarkGenerator

func (*ZeroWatermarkGenerator) GetWatermark Uses

func (s *ZeroWatermarkGenerator) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

Directories

PathSynopsis
aggregates
functions
functions/docgen
trigger
tvf
tvf/docgen

Package execution imports 25 packages (graph) and is imported by 27 packages. Updated 2020-09-20. Refresh now. Tools for package owners.