batch

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2020 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const REFRESH_DELAY = time.Second / 4

Variables

This section is empty.

Functions

func GetAllFields

func GetAllFields(records []*execution.Record) []string

Types

type BatchTablePrinter

type BatchTablePrinter struct {
	// contains filtered or unexported fields
}

func NewWholeTablePrinter

func NewWholeTablePrinter(stateStorage storage.Storage, recordsLister RecordsLister, tableFormatter TableFormatter) *BatchTablePrinter

func (*BatchTablePrinter) Run

func (printer *BatchTablePrinter) Run(ctx context.Context) error

type LiveTablePrinter

type LiveTablePrinter struct {
	// contains filtered or unexported fields
}

func NewLiveTablePrinter

func NewLiveTablePrinter(stateStorage storage.Storage, recordsLister RecordsLister, tableFormatter TableFormatter) *LiveTablePrinter

func (*LiveTablePrinter) Run

func (printer *LiveTablePrinter) Run(ctx context.Context) error

type RecordData

type RecordData struct {
	Ids                  []*execution.RecordID `protobuf:"bytes,1,rep,name=ids,proto3" json:"ids,omitempty"`
	IsUndo               bool                  `protobuf:"varint,2,opt,name=isUndo,proto3" json:"isUndo,omitempty"`
	Record               *execution.Record     `protobuf:"bytes,3,opt,name=record,proto3" json:"record,omitempty"`
	XXX_NoUnkeyedLiteral struct{}              `json:"-"`
	XXX_unrecognized     []byte                `json:"-"`
	XXX_sizecache        int32                 `json:"-"`
}

func (*RecordData) Descriptor

func (*RecordData) Descriptor() ([]byte, []int)

func (*RecordData) GetIds

func (m *RecordData) GetIds() []*execution.RecordID

func (*RecordData) GetIsUndo

func (m *RecordData) GetIsUndo() bool

func (*RecordData) GetRecord

func (m *RecordData) GetRecord() *execution.Record

func (*RecordData) ProtoMessage

func (*RecordData) ProtoMessage()

func (*RecordData) Reset

func (m *RecordData) Reset()

func (*RecordData) String

func (m *RecordData) String() string

func (*RecordData) XXX_DiscardUnknown

func (m *RecordData) XXX_DiscardUnknown()

func (*RecordData) XXX_Marshal

func (m *RecordData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*RecordData) XXX_Merge

func (m *RecordData) XXX_Merge(src proto.Message)

func (*RecordData) XXX_Size

func (m *RecordData) XXX_Size() int

func (*RecordData) XXX_Unmarshal

func (m *RecordData) XXX_Unmarshal(b []byte) error

type RecordsLister

type RecordsLister interface {
	ListRecords(ctx context.Context, tx storage.StateTransaction) ([]*execution.Record, error)
	GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
	GetEndOfStream(ctx context.Context, tx storage.StateTransaction) (bool, error)
	GetErrorMessage(ctx context.Context, tx storage.StateTransaction) (string, error)
}

type TableFormatter

type TableFormatter func(w io.Writer, records []*execution.Record, watermark time.Time, err error) error

type TableOutput

type TableOutput struct {
	StreamID            *execution.StreamID
	EventTimeField      octosql.VariableName
	OrderingExpressions []execution.Expression
	OrderingDirections  []execution.OrderDirection
	Limit               *int
	Offset              *int
}

func NewTableOutput

func NewTableOutput(
	streamID *execution.StreamID,
	eventTimeField octosql.VariableName,
	orderingExpressions []execution.Expression,
	orderingDirections []execution.OrderDirection,
	limit *int,
	offset *int,
) *TableOutput

func (*TableOutput) AddRecord

func (o *TableOutput) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *execution.Record) error

func (*TableOutput) Close

func (o *TableOutput) Close(ctx context.Context, storage storage.Storage) error

func (*TableOutput) GetEndOfStream

func (o *TableOutput) GetEndOfStream(ctx context.Context, tx storage.StateTransaction) (bool, error)

func (*TableOutput) GetErrorMessage

func (o *TableOutput) GetErrorMessage(ctx context.Context, tx storage.StateTransaction) (string, error)

func (*TableOutput) GetWatermark

func (o *TableOutput) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*TableOutput) ListRecords

func (o *TableOutput) ListRecords(ctx context.Context, tx storage.StateTransaction) ([]*execution.Record, error)

func (*TableOutput) MarkEndOfStream

func (o *TableOutput) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*TableOutput) MarkError

func (o *TableOutput) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*TableOutput) Next

func (*TableOutput) ReadyForMore

func (o *TableOutput) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*TableOutput) TriggerKeys

func (o *TableOutput) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*TableOutput) UpdateWatermark

func (o *TableOutput) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL