engine

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 80 Imported by: 0

Documentation

Overview

Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	HybridStoreReaderChunkNum = 7
	SegmentBatchCount         = 128
)
View Source
const (
	SidSequenceReaderRecordNum = 6
	SequenceAggRecordNum       = 3
)
View Source
const (
	MaxRetryUpdateOnShardNum = 4

	CRCLen     = 4
	BufferSize = 1024 * 1024

	// OBSFileExtension is the extension used for OBS files.
	OBSFileExtension = ".init"
)
View Source
const (
	Failpoint = "failpoint"

	BackgroundReadLimiter = "backgroundReadLimiter"
)
View Source
const (
	TierLeveMem           = 1 // in memory
	TierLeveLocalDisk     = 2
	TierLeveObjectStorage = 3
)
View Source
const (
	DefaultFileSize   = 10 * 1024 * 1024
	WALFileSuffixes   = "wal"
	WalRecordHeadSize = 1 + 4
	WalCompBufSize    = 256 * 1024
	WalCompMaxBufSize = 2 * 1024 * 1024
)
View Source
const (
	WriteWalUnKnownType = iota
	WriteWalLineProtocol
	WriteWalArrowFlight
	WriteWalEnd
)
View Source
const ColumnStoreReaderChunkNum = 7
View Source
const ColumnStoreReaderRecordNum = 7
View Source
const (
	ContentFilterDurationSpan = "content_filter_duration"
)
View Source
const (
	IncDataSegmentNum = 16
)

Variables

View Source
var AppendManyNils map[int]func(colVal *record.ColVal, count int)
View Source
var (
	DownSampleWriteDrop = true
)
View Source
var (
	IntervalRecordPool = record.NewRecordPool(record.IntervalRecordPool)
)
View Source
var (
	RecordIteratorPool = &sync.Pool{}
)

Functions

func AddLocationsWithFirstTime

func AddLocationsWithFirstTime(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AddLocationsWithInit

func AddLocationsWithInit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) error

func AddLocationsWithLimit

func AddLocationsWithLimit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AppendColumnTimes

func AppendColumnTimes(bitmap []bool, column executor.Column, columnTimes []int64, recCol *record.ColVal)

func AppendNilRowWithTime added in v1.0.0

func AppendNilRowWithTime(rec *record.Record, t int64)

func AppendRecWithNilRows added in v1.0.0

func AppendRecWithNilRows(rec, re *record.Record, opt hybridqp.Options, seriesStart, seriesEnd, shardStart int64, last bool)

func CanNotAggOnSeriesFunc

func CanNotAggOnSeriesFunc(m map[string]*influxql.Call) bool

func GetMaxTime

func GetMaxTime(maxTime int64, rec *record.Record, isAscending bool) int64

func GetMemUsageLimit added in v1.1.0

func GetMemUsageLimit() int32

func GetMinTime

func GetMinTime(minTime int64, rec *record.Record, isAscending bool) int64

func IsMemUsageExceeded added in v1.1.0

func IsMemUsageExceeded() bool

func NewAggregateCursor

func NewAggregateCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, hasAuxTags bool) *aggregateCursor

func NewAttachedIndexReader added in v1.2.0

func NewAttachedIndexReader(ctx *indexContext, info *executor.AttachedIndexInfo) *attachedIndexReader

func NewChunkReader

func NewChunkReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	schema *executor.QuerySchema, cursors []interface{}, state bool) executor.Processor

func NewDetachedIndexReader added in v1.2.0

func NewDetachedIndexReader(ctx *indexContext, obsOption *obs.ObsOptions) *detachedIndexReader

func NewEngine

func NewEngine(dataPath, walPath string, options netstorage.EngineOptions, ctx *meta.LoadCtx) (netstorage.Engine, error)

func NewFencer added in v1.0.0

func NewFencer(dataPath, walPath, db string, pt uint32) fencer

func NewFileLoopCursor

func NewFileLoopCursor(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema,
	tagSet *tsi.TagSetInfo, start, step int, s *shard) *fileLoopCursor

func NewFileSequenceAggregator added in v1.0.0

func NewFileSequenceAggregator(schema hybridqp.Catalog, addPrefix bool, shardStartTime, shardEndTime int64) executor.Processor

func NewFloatColFloatHeapReducer

func NewFloatColFloatHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, floatHeapItem *FloatHeapItem) *floatColFloatHeapReducer

func NewIndexContext added in v1.2.0

func NewIndexContext(readBatch bool, batchCount int, schema hybridqp.Catalog, shardPath string) *indexContext

func NewIntegerColIntegerHeapReducer

func NewIntegerColIntegerHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, integerHeapItem *IntegerHeapItem) *integerColIntegerHeapReducer

func NewLimitCursor

func NewLimitCursor(schema *executor.QuerySchema, helper func(start, end int, src, des *record.Record)) *limitCursor

func NewRecordSchema

func NewRecordSchema(querySchema *executor.QuerySchema, auxTags []string, schema record.Schemas, filterConditions []*influxql.VarRef, engineType config.EngineType) ([]string, record.Schemas)

func NewSeriesInfoPool

func NewSeriesInfoPool(num int64) *filesInfoPool

func NewShard

func NewShard(dataPath, walPath string, lockPath *string, ident *meta.ShardIdentifier, durationInfo *meta.DurationDescriptor, tr *meta.TimeRangeInfo,
	options netstorage.EngineOptions, engineType config.EngineType) *shard

func NewTagSetCursorForTest

func NewTagSetCursorForTest(schema *executor.QuerySchema, seriesN int) *tagSetCursor

NewTagSetCursorForTest for ut test, will remove later

func NewTopNLinkedList

func NewTopNLinkedList(n int, ascending bool) *topNLinkedList

func NewTsspSequenceReader added in v1.0.0

func NewTsspSequenceReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, files *immutable.TSSPFiles, newSeqs []uint64, stop chan struct{}) executor.Processor

func NewWriteIntoStorageTransform added in v1.0.0

func NewWriteIntoStorageTransform(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, conf *immutable.Config, m *immutable.MmsTables, addPrefix bool) executor.Processor

func RecordCutNormal

func RecordCutNormal(start, end int, src, dst *record.Record)

func SetFullCompColdDuration

func SetFullCompColdDuration(d time.Duration)

func SetNextMethod added in v1.0.0

func SetNextMethod(cursor comm.KeyCursor)

SetNextMethod for test

Types

type AggTagSetCursor

type AggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewAggTagSetCursor

func NewAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor, singleSeries bool) *AggTagSetCursor

func (*AggTagSetCursor) Close

func (s *AggTagSetCursor) Close() error

func (*AggTagSetCursor) EndSpan

func (s *AggTagSetCursor) EndSpan()

func (*AggTagSetCursor) GetIndex added in v1.0.0

func (s *AggTagSetCursor) GetIndex(t int64) int64

func (*AggTagSetCursor) GetSchema

func (s *AggTagSetCursor) GetSchema() record.Schemas

func (*AggTagSetCursor) Init

func (s *AggTagSetCursor) Init()

func (*AggTagSetCursor) Name

func (s *AggTagSetCursor) Name() string

func (*AggTagSetCursor) Next

func (*AggTagSetCursor) NextAggData

func (s *AggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*AggTagSetCursor) NextWithMultipleSeries added in v1.0.0

func (s *AggTagSetCursor) NextWithMultipleSeries() (*record.Record, comm.SeriesInfoIntf, error)

func (*AggTagSetCursor) NextWithSingleSeries added in v1.0.0

func (s *AggTagSetCursor) NextWithSingleSeries() (*record.Record, comm.SeriesInfoIntf, error)

func (*AggTagSetCursor) RecordInit

func (s *AggTagSetCursor) RecordInit() error

func (*AggTagSetCursor) SetOps

func (s *AggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*AggTagSetCursor) SetParaForTest added in v1.0.0

func (s *AggTagSetCursor) SetParaForTest(schema record.Schemas)

func (*AggTagSetCursor) SetSchema

func (s *AggTagSetCursor) SetSchema(schema record.Schemas)

func (*AggTagSetCursor) SinkPlan

func (s *AggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*AggTagSetCursor) StartSpan

func (s *AggTagSetCursor) StartSpan(span *tracing.Span)

func (*AggTagSetCursor) TagAuxHandler

func (s *AggTagSetCursor) TagAuxHandler(re *record.Record, start, end int)

func (*AggTagSetCursor) TimeWindowsInit

func (s *AggTagSetCursor) TimeWindowsInit()

func (*AggTagSetCursor) UpdateRec added in v1.0.0

func (s *AggTagSetCursor) UpdateRec(recRow, chunkRow int)

type ChunkMetaByField added in v1.0.0

type ChunkMetaByField struct {
	// contains filtered or unexported fields
}

ChunkMetaByField build from each chunkmeta. ChunkMetaByField obtain data by sid column by column

func NewChunkMetaByField added in v1.0.0

func NewChunkMetaByField(file immutable.TSSPFile, fieldIter *FieldIter, chunkMeta immutable.ChunkMeta, recordPool *record.CircularRecordPool) *ChunkMetaByField

type ChunkMetaByFieldIters added in v1.0.0

type ChunkMetaByFieldIters struct {
	// contains filtered or unexported fields
}

ChunkMetaByFieldIters is the iterator of ChunkMetaByField.

func NewChunkMetaByFieldIters added in v1.0.0

func NewChunkMetaByFieldIters(chunkMetas []immutable.ChunkMeta, file immutable.TSSPFile, fieldIter *FieldIter, recordPool *record.CircularRecordPool) *ChunkMetaByFieldIters

type ChunkReader

type ChunkReader struct {
	executor.BaseProcessor

	Output          *executor.ChunkPort
	ResultChunkPool *executor.CircularChunkPool
	// contains filtered or unexported fields
}

func (*ChunkReader) Abort added in v1.0.1

func (r *ChunkReader) Abort()

func (*ChunkReader) Close

func (r *ChunkReader) Close()

func (*ChunkReader) Create

func (*ChunkReader) Explain

func (r *ChunkReader) Explain() []executor.ValuePair

func (*ChunkReader) GetInputNumber

func (r *ChunkReader) GetInputNumber(executor.Port) int

func (*ChunkReader) GetInputs

func (r *ChunkReader) GetInputs() executor.Ports

func (*ChunkReader) GetOutputNumber

func (r *ChunkReader) GetOutputNumber(executor.Port) int

func (*ChunkReader) GetOutputs

func (r *ChunkReader) GetOutputs() executor.Ports

func (*ChunkReader) IsSink

func (r *ChunkReader) IsSink() bool

func (*ChunkReader) Name

func (r *ChunkReader) Name() string

func (*ChunkReader) Release

func (r *ChunkReader) Release() error

func (*ChunkReader) Work

func (r *ChunkReader) Work(ctx context.Context) error

type CoProcessor

type CoProcessor interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type CoProcessorImpl

type CoProcessorImpl struct {
	Routines []Routine
}

func NewCoProcessorImpl

func NewCoProcessorImpl(routines ...Routine) *CoProcessorImpl

func (*CoProcessorImpl) AppendRoutine

func (p *CoProcessorImpl) AppendRoutine(routines ...Routine)

func (*CoProcessorImpl) WorkOnRecord

func (p *CoProcessorImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type ColumnStoreReader added in v1.1.0

type ColumnStoreReader struct {
	executor.BaseProcessor
	// contains filtered or unexported fields
}

func NewColumnStoreReader added in v1.1.0

func NewColumnStoreReader(plan hybridqp.QueryNode, frags executor.ShardsFragments) *ColumnStoreReader

func (*ColumnStoreReader) Abort added in v1.1.0

func (r *ColumnStoreReader) Abort()

func (*ColumnStoreReader) Close added in v1.1.0

func (r *ColumnStoreReader) Close()

func (*ColumnStoreReader) Explain added in v1.1.0

func (r *ColumnStoreReader) Explain() []executor.ValuePair

func (*ColumnStoreReader) FragmentCount added in v1.1.0

func (r *ColumnStoreReader) FragmentCount() int

func (*ColumnStoreReader) GetInputNumber added in v1.1.0

func (r *ColumnStoreReader) GetInputNumber(_ executor.Port) int

func (*ColumnStoreReader) GetInputs added in v1.1.0

func (r *ColumnStoreReader) GetInputs() executor.Ports

func (*ColumnStoreReader) GetOutputNumber added in v1.1.0

func (r *ColumnStoreReader) GetOutputNumber(_ executor.Port) int

func (*ColumnStoreReader) GetOutputs added in v1.1.0

func (r *ColumnStoreReader) GetOutputs() executor.Ports

func (*ColumnStoreReader) IsSink added in v1.1.0

func (r *ColumnStoreReader) IsSink() bool

func (*ColumnStoreReader) Name added in v1.1.0

func (r *ColumnStoreReader) Name() string

func (*ColumnStoreReader) Release added in v1.1.0

func (r *ColumnStoreReader) Release() error

func (*ColumnStoreReader) Run added in v1.1.0

func (r *ColumnStoreReader) Run(ctx context.Context) (iterCount, rowCountAfterFilter int, err error)

func (*ColumnStoreReader) Work added in v1.1.0

func (r *ColumnStoreReader) Work(ctx context.Context) error

type ColumnStoreReaderCreator added in v1.1.0

type ColumnStoreReaderCreator struct {
}

func (*ColumnStoreReaderCreator) Create added in v1.1.0

func (*ColumnStoreReaderCreator) CreateReader added in v1.1.0

func (c *ColumnStoreReaderCreator) CreateReader(plan hybridqp.QueryNode, indexInfo interface{}) (executor.Processor, error)

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor() *Compactor

func (*Compactor) RegisterShard

func (c *Compactor) RegisterShard(sh *shard)

func (*Compactor) SetAllOutOfOrderMergeSwitch

func (c *Compactor) SetAllOutOfOrderMergeSwitch(en bool)

func (*Compactor) SetAllShardsCompactionSwitch

func (c *Compactor) SetAllShardsCompactionSwitch(en bool)

func (*Compactor) SetSnapshotColdDuration

func (c *Compactor) SetSnapshotColdDuration(d time.Duration)

func (*Compactor) ShardCompactionSwitch

func (c *Compactor) ShardCompactionSwitch(shid uint64, en bool)

func (*Compactor) ShardOutOfOrderMergeSwitch

func (c *Compactor) ShardOutOfOrderMergeSwitch(shid uint64, en bool)

func (*Compactor) UnregisterShard

func (c *Compactor) UnregisterShard(shardId uint64)

type DBPTInfo

type DBPTInfo struct {
	// contains filtered or unexported fields
}

func NewDBPTInfo

func NewDBPTInfo(db string, id uint32, dataPath, walPath string, ctx *metaclient.LoadCtx) *DBPTInfo

func (*DBPTInfo) NewShard

func (dbPT *DBPTInfo) NewShard(rp string, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo, client metaclient.MetaClient, engineType config.EngineType) (Shard, error)

func (*DBPTInfo) OpenIndexes

func (dbPT *DBPTInfo) OpenIndexes(opId uint64, rp string, engineType config.EngineType) error

func (*DBPTInfo) OpenShards

func (dbPT *DBPTInfo) OpenShards(opId uint64, rp string, durationInfos map[uint64]*meta.ShardDurationInfo, loadStat int, client metaclient.MetaClient) error

func (*DBPTInfo) SetOption

func (dbPT *DBPTInfo) SetOption(opt netstorage.EngineOptions)

func (*DBPTInfo) SetParams added in v1.1.0

func (dbPT *DBPTInfo) SetParams(preload bool, lockPath *string, enableTagArray bool)

func (*DBPTInfo) Shard

func (dbPT *DBPTInfo) Shard(id uint64) Shard

func (*DBPTInfo) ShardIds added in v1.2.0

func (dbPT *DBPTInfo) ShardIds() []uint64

type DataBlockInfo

type DataBlockInfo struct {
	// contains filtered or unexported fields
}

type DetachedMetaInfo added in v1.2.0

type DetachedMetaInfo struct {
	// contains filtered or unexported fields
}

func NewDetachedMetaInfo added in v1.2.0

func NewDetachedMetaInfo() *DetachedMetaInfo

type DownSampleFilesInfo added in v1.0.0

type DownSampleFilesInfo struct {
	Names    []string
	OldFiles [][]string
	NewFiles [][]string
	// contains filtered or unexported fields
}

type EndPointPair

type EndPointPair struct {
	Record  *record.Record
	Ordinal int
}

type Engine

type Engine struct {
	DBPartitions map[string]map[uint32]*DBPTInfo

	DownSamplePolicies map[string]*meta2.StoreDownSamplePolicy
	// contains filtered or unexported fields
}

func (*Engine) Assign added in v1.0.0

func (e *Engine) Assign(opId uint64, db string, ptId uint32, ver uint64, durationInfos map[uint64]*meta2.ShardDurationInfo, dbBriefInfo *meta2.DatabaseBriefInfo, client metaclient.MetaClient) error

func (*Engine) ChangeShardTierToWarm

func (e *Engine) ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error

func (*Engine) CheckPtsRemovedDone added in v1.1.1

func (e *Engine) CheckPtsRemovedDone() bool

func (*Engine) ClearIndexCache added in v1.2.0

func (e *Engine) ClearIndexCache(db string, ptId uint32, indexID uint64) error

func (*Engine) Close

func (e *Engine) Close() error

func (*Engine) CreateDBPT

func (e *Engine) CreateDBPT(db string, pt uint32, enableTagArray bool)

func (*Engine) CreateLogicalPlan

func (e *Engine) CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID uint64,
	sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

func (*Engine) CreateShard

func (e *Engine) CreateShard(db, rp string, ptId uint32, shardID uint64, timeRangeInfo *meta2.ShardTimeRangeInfo, mstInfo *meta2.MeasurementInfo) error

func (*Engine) Databases added in v1.1.0

func (e *Engine) Databases() []string

func (*Engine) DbPTRef

func (e *Engine) DbPTRef(db string, ptId uint32) error

func (*Engine) DbPTUnref

func (e *Engine) DbPTUnref(db string, ptId uint32)

func (*Engine) DeleteDatabase

func (e *Engine) DeleteDatabase(db string, ptId uint32) (err error)

func (*Engine) DeleteIndex

func (e *Engine) DeleteIndex(db string, ptId uint32, indexID uint64) error

func (*Engine) DeleteShard

func (e *Engine) DeleteShard(db string, ptId uint32, shardID uint64) error

todo:need confirm

func (*Engine) DropMeasurement

func (e *Engine) DropMeasurement(db string, rp string, name string, shardIds []uint64) error

func (*Engine) DropRetentionPolicy

func (e *Engine) DropRetentionPolicy(db string, rp string, ptId uint32) error

func (*Engine) DropSeries

func (e *Engine) DropSeries(database string, sources []influxql.Source, ptId []uint32, condition influxql.Expr) (int, error)

func (*Engine) ExpiredCacheIndexes added in v1.2.0

func (e *Engine) ExpiredCacheIndexes() []*meta2.IndexIdentifier

func (*Engine) ExpiredIndexes

func (e *Engine) ExpiredIndexes() []*meta2.IndexIdentifier

func (*Engine) ExpiredShards

func (e *Engine) ExpiredShards() []*meta2.ShardIdentifier

func (*Engine) FetchShardsNeedChangeStore

func (e *Engine) FetchShardsNeedChangeStore() (shardsToWarm, shardsToCold []*meta2.ShardIdentifier)

func (*Engine) ForceFlush

func (e *Engine) ForceFlush()

func (*Engine) GetDownSamplePolicy added in v1.0.0

func (e *Engine) GetDownSamplePolicy(key string) *meta2.StoreDownSamplePolicy

func (*Engine) GetIndexInfo added in v1.2.0

func (e *Engine) GetIndexInfo(db string, ptId uint32, shardID uint64, schema *executor.QuerySchema) (*executor.AttachedIndexInfo, error)

func (*Engine) GetLockFile

func (e *Engine) GetLockFile() string

func (*Engine) GetShard added in v1.0.0

func (e *Engine) GetShard(db string, ptId uint32, shardID uint64) (Shard, error)

func (*Engine) GetShardDownSampleLevel added in v1.0.0

func (e *Engine) GetShardDownSampleLevel(db string, ptId uint32, shardID uint64) int

func (*Engine) GetShardDownSamplePolicyInfos added in v1.0.0

func (e *Engine) GetShardDownSamplePolicyInfos(meta interface {
	UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
}) ([]*meta2.ShardDownSamplePolicyInfo, error)

func (*Engine) GetShardSplitPoints

func (e *Engine) GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)

func (*Engine) HierarchicalStorage added in v1.2.0

func (e *Engine) HierarchicalStorage(db string, ptId uint32, shardID uint64) error

func (*Engine) InitLogStoreCtx added in v1.2.0

func (s *Engine) InitLogStoreCtx(querySchema *executor.QuerySchema) (*idKeyCursorContext, error)

func (*Engine) LogicalPlanCost

func (e *Engine) LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

func (*Engine) Offload added in v1.0.0

func (e *Engine) Offload(opId uint64, db string, ptId uint32) error

func (*Engine) Open

func (e *Engine) Open(durationInfos map[uint64]*meta2.ShardDurationInfo, dbBriefInfos map[string]*meta2.DatabaseBriefInfo, m meta.MetaClient) error

func (*Engine) PreAssign added in v1.0.0

func (e *Engine) PreAssign(opId uint64, db string, ptId uint32, durationInfos map[uint64]*meta2.ShardDurationInfo, dbBriefInfo *meta2.DatabaseBriefInfo, client metaclient.MetaClient) error

func (*Engine) PreOffload added in v1.0.0

func (e *Engine) PreOffload(opId uint64, db string, ptId uint32) error

func (*Engine) RollbackPreOffload added in v1.0.0

func (e *Engine) RollbackPreOffload(opId uint64, db string, ptId uint32) error

func (*Engine) RowCount added in v1.1.0

func (e *Engine) RowCount(db string, ptId uint32, shardIDs []uint64, schema *executor.QuerySchema) (int64, error)

func (*Engine) ScanWithSparseIndex added in v1.1.0

func (e *Engine) ScanWithSparseIndex(ctx context.Context, db string, ptId uint32, shardIDs []uint64, schema *executor.QuerySchema) (executor.ShardsFragments, error)

func (*Engine) SeriesCardinality

func (e *Engine) SeriesCardinality(db string, ptIDs []uint32, namesWithVer [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]meta2.MeasurementCardinalityInfo, error)

func (*Engine) SeriesExactCardinality

func (e *Engine) SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) (map[string]uint64, error)

func (*Engine) SeriesKeys

func (e *Engine) SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]string, error)

func (*Engine) StartDownSampleTask added in v1.0.0

func (e *Engine) StartDownSampleTask(sdsp *meta2.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger,
	meta interface {
		UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
	}) error

func (*Engine) Statistics

func (e *Engine) Statistics(buffer []byte) ([]byte, error)

func (*Engine) StatisticsOps added in v1.1.0

func (e *Engine) StatisticsOps() []opsStat.OpsStatistic

func (*Engine) SysCtrl

func (e *Engine) SysCtrl(req *netstorage.SysCtrlRequest) (map[string]string, error)

func (*Engine) TagKeys added in v1.2.0

func (e *Engine) TagKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]string, error)

func (*Engine) TagValues

func (e *Engine) TagValues(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr, tr influxql.TimeRange) (netstorage.TablesTagSets, error)

func (*Engine) TagValuesCardinality

func (e *Engine) TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr, tr influxql.TimeRange) (map[string]uint64, error)

func (*Engine) UpdateDownSampleInfo added in v1.0.0

func (e *Engine) UpdateDownSampleInfo(policies *meta2.DownSamplePoliciesInfoWithDbRp)

func (*Engine) UpdateShardDownSampleInfo added in v1.0.0

func (e *Engine) UpdateShardDownSampleInfo(infos *meta2.ShardDownSampleUpdateInfos)

func (*Engine) UpdateShardDurationInfo

func (e *Engine) UpdateShardDurationInfo(info *meta2.ShardDurationInfo) error

func (*Engine) UpdateStoreDownSamplePolicies added in v1.0.0

func (e *Engine) UpdateStoreDownSamplePolicies(info *meta2.DownSamplePolicyInfo, ident string)

func (*Engine) WriteRec added in v1.1.0

func (e *Engine) WriteRec(db, mst string, ptId uint32, shardID uint64, rec *record.Record, binaryRec []byte) error

func (*Engine) WriteRows

func (e *Engine) WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte) error

type FieldByte added in v1.2.0

type FieldByte struct {
	// contains filtered or unexported fields
}

type FieldByteTopKHeap added in v1.2.0

type FieldByteTopKHeap struct {
	// contains filtered or unexported fields
}

func (FieldByteTopKHeap) Len added in v1.2.0

func (h FieldByteTopKHeap) Len() int

func (FieldByteTopKHeap) Less added in v1.2.0

func (h FieldByteTopKHeap) Less(i, j int) bool

func (*FieldByteTopKHeap) Pop added in v1.2.0

func (h *FieldByteTopKHeap) Pop() interface{}

func (*FieldByteTopKHeap) Push added in v1.2.0

func (h *FieldByteTopKHeap) Push(x interface{})

func (FieldByteTopKHeap) Swap added in v1.2.0

func (h FieldByteTopKHeap) Swap(i, j int)

type FieldIter added in v1.0.0

type FieldIter struct {
	// contains filtered or unexported fields
}

func NewFieldIter added in v1.0.0

func NewFieldIter(querySchema *executor.QuerySchema) *FieldIter

func (*FieldIter) GetRecordSchemas added in v1.0.0

func (r *FieldIter) GetRecordSchemas() record.Schemas

func (*FieldIter) ResetPos added in v1.0.0

func (r *FieldIter) ResetPos()

type FileSequenceAggregator added in v1.0.0

type FileSequenceAggregator struct {
	executor.BaseProcessor

	Input  *executor.SeriesRecordPort
	Output *executor.SeriesRecordPort
	// contains filtered or unexported fields
}

func (*FileSequenceAggregator) Aggregate added in v1.0.0

func (r *FileSequenceAggregator) Aggregate()

func (*FileSequenceAggregator) AggregateSameSchema added in v1.0.0

func (r *FileSequenceAggregator) AggregateSameSchema() error

func (*FileSequenceAggregator) Close added in v1.0.0

func (r *FileSequenceAggregator) Close()

func (*FileSequenceAggregator) Create added in v1.0.0

func (*FileSequenceAggregator) Explain added in v1.0.0

func (r *FileSequenceAggregator) Explain() []executor.ValuePair

func (*FileSequenceAggregator) GetInputNumber added in v1.0.0

func (r *FileSequenceAggregator) GetInputNumber(executor.Port) int

func (*FileSequenceAggregator) GetInputs added in v1.0.0

func (r *FileSequenceAggregator) GetInputs() executor.Ports

func (*FileSequenceAggregator) GetOutputNumber added in v1.0.0

func (r *FileSequenceAggregator) GetOutputNumber(executor.Port) int

func (*FileSequenceAggregator) GetOutputs added in v1.0.0

func (r *FileSequenceAggregator) GetOutputs() executor.Ports

func (*FileSequenceAggregator) GetProcessors added in v1.0.0

func (r *FileSequenceAggregator) GetProcessors()

func (*FileSequenceAggregator) IsSink added in v1.0.0

func (r *FileSequenceAggregator) IsSink() bool

func (*FileSequenceAggregator) Name added in v1.0.0

func (r *FileSequenceAggregator) Name() string

func (*FileSequenceAggregator) Release added in v1.0.0

func (r *FileSequenceAggregator) Release() error

func (*FileSequenceAggregator) SendRecord added in v1.0.0

func (r *FileSequenceAggregator) SendRecord(re *record.Record)

func (*FileSequenceAggregator) Work added in v1.0.0

type FloatHeapItem

type FloatHeapItem struct {
	// contains filtered or unexported fields
}

func NewFloatHeapItem

func NewFloatHeapItem(n int, cmpByValue, cmpByTime func(a, b *FloatPointItem) bool) *FloatHeapItem

func (*FloatHeapItem) Len

func (f *FloatHeapItem) Len() int

func (*FloatHeapItem) Less

func (f *FloatHeapItem) Less(i, j int) bool

func (*FloatHeapItem) Pop

func (f *FloatHeapItem) Pop() interface{}

func (*FloatHeapItem) Push

func (f *FloatHeapItem) Push(x interface{})

func (*FloatHeapItem) Reset

func (f *FloatHeapItem) Reset()

func (*FloatHeapItem) Swap

func (f *FloatHeapItem) Swap(i, j int)

type FloatPointItem

type FloatPointItem struct {
	// contains filtered or unexported fields
}

func NewFloatPointItem

func NewFloatPointItem(time int64, value float64) *FloatPointItem

type HybridStoreReader added in v1.2.0

type HybridStoreReader struct {
	executor.BaseProcessor
	// contains filtered or unexported fields
}

func NewHybridStoreReader added in v1.2.0

func NewHybridStoreReader(plan hybridqp.QueryNode, indexInfo *executor.CSIndexInfo) *HybridStoreReader

func (*HybridStoreReader) Abort added in v1.2.0

func (r *HybridStoreReader) Abort()

func (*HybridStoreReader) Close added in v1.2.0

func (r *HybridStoreReader) Close()

func (*HybridStoreReader) CreateCursors added in v1.2.0

func (r *HybridStoreReader) CreateCursors() ([]comm.KeyCursor, error)

func (*HybridStoreReader) CreateLogStoreCursor added in v1.2.0

func (r *HybridStoreReader) CreateLogStoreCursor() (comm.KeyCursor, error)

func (*HybridStoreReader) Explain added in v1.2.0

func (r *HybridStoreReader) Explain() []executor.ValuePair

func (*HybridStoreReader) GetInputNumber added in v1.2.0

func (r *HybridStoreReader) GetInputNumber(_ executor.Port) int

func (*HybridStoreReader) GetInputs added in v1.2.0

func (r *HybridStoreReader) GetInputs() executor.Ports

func (*HybridStoreReader) GetOutputNumber added in v1.2.0

func (r *HybridStoreReader) GetOutputNumber(_ executor.Port) int

func (*HybridStoreReader) GetOutputs added in v1.2.0

func (r *HybridStoreReader) GetOutputs() executor.Ports

func (*HybridStoreReader) IsSink added in v1.2.0

func (r *HybridStoreReader) IsSink() bool

func (*HybridStoreReader) Name added in v1.2.0

func (r *HybridStoreReader) Name() string

func (*HybridStoreReader) Release added in v1.2.0

func (r *HybridStoreReader) Release() error

func (*HybridStoreReader) Work added in v1.2.0

func (r *HybridStoreReader) Work(ctx context.Context) error

type IndexReader added in v1.2.0

type IndexReader interface {
	Next() (executor.IndexFrags, error)
}

type IntegerHeapItem

type IntegerHeapItem struct {
	// contains filtered or unexported fields
}

func NewIntegerHeapItem

func NewIntegerHeapItem(n int, cmpByValue, cmpByTime func(a, b *IntegerPointItem) bool) *IntegerHeapItem

func (*IntegerHeapItem) Len

func (f *IntegerHeapItem) Len() int

func (*IntegerHeapItem) Less

func (f *IntegerHeapItem) Less(i, j int) bool

func (*IntegerHeapItem) Pop

func (f *IntegerHeapItem) Pop() interface{}

func (*IntegerHeapItem) Push

func (f *IntegerHeapItem) Push(x interface{})

func (*IntegerHeapItem) Reset

func (f *IntegerHeapItem) Reset()

func (*IntegerHeapItem) Swap

func (f *IntegerHeapItem) Swap(i, j int)

type IntegerPointItem

type IntegerPointItem struct {
	// contains filtered or unexported fields
}

func NewIntegerPointItem

func NewIntegerPointItem(time int64, value int64) *IntegerPointItem

type LogReplay added in v1.0.1

type LogReplay struct {
	// contains filtered or unexported fields
}

type LogReplays added in v1.0.1

type LogReplays []LogReplay

type LogStoreAggCursor added in v1.2.0

type LogStoreAggCursor struct {
	// contains filtered or unexported fields
}

func NewLogStoreAggCursor added in v1.2.0

func NewLogStoreAggCursor(option *obs.ObsOptions, path string, version uint32, ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema) (*LogStoreAggCursor, error)

func (*LogStoreAggCursor) Close added in v1.2.0

func (s *LogStoreAggCursor) Close() error

func (*LogStoreAggCursor) EndSpan added in v1.2.0

func (s *LogStoreAggCursor) EndSpan()

func (*LogStoreAggCursor) GetSchema added in v1.2.0

func (s *LogStoreAggCursor) GetSchema() record.Schemas

func (*LogStoreAggCursor) Name added in v1.2.0

func (s *LogStoreAggCursor) Name() string

func (*LogStoreAggCursor) Next added in v1.2.0

func (*LogStoreAggCursor) NextAggData added in v1.2.0

func (s *LogStoreAggCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*LogStoreAggCursor) SetOps added in v1.2.0

func (s *LogStoreAggCursor) SetOps(ops []*comm.CallOption)

func (*LogStoreAggCursor) SetSchema added in v1.2.0

func (s *LogStoreAggCursor) SetSchema(schema record.Schemas)

func (*LogStoreAggCursor) SinkPlan added in v1.2.0

func (s *LogStoreAggCursor) SinkPlan(plan hybridqp.QueryNode)

func (*LogStoreAggCursor) StartSpan added in v1.2.0

func (s *LogStoreAggCursor) StartSpan(span *tracing.Span)

type LogStoreLimitCursor added in v1.2.0

type LogStoreLimitCursor struct {
	// contains filtered or unexported fields
}

func NewLogStoreLimitCursor added in v1.2.0

func NewLogStoreLimitCursor(option *obs.ObsOptions, path string, version uint32, ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema) (*LogStoreLimitCursor, error)

func (*LogStoreLimitCursor) Close added in v1.2.0

func (s *LogStoreLimitCursor) Close() error

func (*LogStoreLimitCursor) EndSpan added in v1.2.0

func (s *LogStoreLimitCursor) EndSpan()

func (*LogStoreLimitCursor) GetSchema added in v1.2.0

func (s *LogStoreLimitCursor) GetSchema() record.Schemas

func (*LogStoreLimitCursor) Name added in v1.2.0

func (s *LogStoreLimitCursor) Name() string

func (*LogStoreLimitCursor) Next added in v1.2.0

func (*LogStoreLimitCursor) NextAggData added in v1.2.0

func (s *LogStoreLimitCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*LogStoreLimitCursor) SetOps added in v1.2.0

func (s *LogStoreLimitCursor) SetOps(ops []*comm.CallOption)

func (*LogStoreLimitCursor) SetSchema added in v1.2.0

func (s *LogStoreLimitCursor) SetSchema(schema record.Schemas)

func (*LogStoreLimitCursor) SinkPlan added in v1.2.0

func (s *LogStoreLimitCursor) SinkPlan(plan hybridqp.QueryNode)

func (*LogStoreLimitCursor) StartSpan added in v1.2.0

func (s *LogStoreLimitCursor) StartSpan(span *tracing.Span)

type LogWriter

type LogWriter struct {
	SyncInterval time.Duration
	// contains filtered or unexported fields
}

func (*LogWriter) Switch

func (w *LogWriter) Switch() ([]string, error)

func (*LogWriter) Write

func (w *LogWriter) Write(compBuf []byte) error

type LogWriters added in v1.0.1

type LogWriters []LogWriter

type MetaIndexIterator added in v1.0.0

type MetaIndexIterator struct {
	// contains filtered or unexported fields
}

func NewMetaIndexIterators added in v1.0.0

func NewMetaIndexIterators(file immutable.TSSPFile, querySchema *executor.QuerySchema) (*MetaIndexIterator, error)

type PreAggTagSetCursor

type PreAggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewPreAggTagSetCursor

func NewPreAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor) *PreAggTagSetCursor

func (*PreAggTagSetCursor) Close

func (s *PreAggTagSetCursor) Close() error

func (*PreAggTagSetCursor) EndSpan

func (s *PreAggTagSetCursor) EndSpan()

func (*PreAggTagSetCursor) GetSchema

func (s *PreAggTagSetCursor) GetSchema() record.Schemas

func (*PreAggTagSetCursor) Name

func (s *PreAggTagSetCursor) Name() string

func (*PreAggTagSetCursor) Next

func (*PreAggTagSetCursor) NextAggData

func (s *PreAggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*PreAggTagSetCursor) RecordInitPreAgg

func (s *PreAggTagSetCursor) RecordInitPreAgg() error

func (*PreAggTagSetCursor) SetOps

func (s *PreAggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*PreAggTagSetCursor) SetSchema

func (s *PreAggTagSetCursor) SetSchema(schema record.Schemas)

func (*PreAggTagSetCursor) SinkPlan

func (s *PreAggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*PreAggTagSetCursor) StartSpan

func (s *PreAggTagSetCursor) StartSpan(span *tracing.Span)

type PtNNLock

type PtNNLock struct {
}

type Reducer

type Reducer interface {
	Aggregate(*ReducerEndpoint, *ReducerParams)
}

type ReducerEndpoint

type ReducerEndpoint struct {
	InputPoint  EndPointPair
	OutputPoint EndPointPair
}

type ReducerParams

type ReducerParams struct {
	// contains filtered or unexported fields
}

type Routine

type Routine interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type RoutineImpl

type RoutineImpl struct {
	// contains filtered or unexported fields
}

func NewRoutineImpl

func NewRoutineImpl(reducer Reducer, inOrdinal int, outOrdinal int) *RoutineImpl

func (*RoutineImpl) WorkOnRecord

func (r *RoutineImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type SeriesIter

type SeriesIter struct {
	// contains filtered or unexported fields
}

type Shard

type Shard interface {
	// IO interface
	WriteRows(rows []influx.Row, binaryRows []byte) error               // line protocol
	WriteCols(mst string, cols *record.Record, binaryCols []byte) error // native protocol
	ForceFlush()
	WaitWriteFinish()
	CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)
	CreateCursor(ctx context.Context, schema *executor.QuerySchema) ([]comm.KeyCursor, error)
	Scan(span *tracing.Span, schema *executor.QuerySchema, callBack func(num int64) error) (tsi.GroupSeries, int64, error)
	ScanWithSparseIndex(ctx context.Context, schema *executor.QuerySchema, callBack func(num int64) error) (*executor.FileFragments, error)
	GetIndexInfo(schema *executor.QuerySchema) (*executor.AttachedIndexInfo, error)
	RowCount(schema *executor.QuerySchema) (int64, error)
	NewShardKeyIdx(shardType, dataPath string, lockPath *string) error

	// admin
	OpenAndEnable(client metaclient.MetaClient) error
	IsOpened() bool
	Close() error
	ChangeShardTierToWarm()
	DropMeasurement(ctx context.Context, name string) error
	GetSplitPoints(idxes []int64) ([]string, error) // only work for tsstore (depends on sid)

	// get private member
	GetDataPath() string
	GetWalPath() string
	GetDuration() *meta.DurationDescriptor
	GetEngineType() config.EngineType
	GetIdent() *meta.ShardIdentifier
	GetID() uint64
	GetRowCount() uint64
	GetRPName() string
	GetStatistics(buffer []byte) ([]byte, error)
	GetMaxTime() int64
	GetIndexBuilder() *tsi.IndexBuilder                                // only work for tsstore(tsi)
	GetSeriesCount() int                                               // only work for tsstore
	GetTableStore() immutable.TablesStore                              // used by downsample and test
	GetTSSPFiles(mm string, isOrder bool) (*immutable.TSSPFiles, bool) // used by downsample and test
	GetTier() uint64
	IsExpired() bool
	IsTierExpired() bool

	// downsample, only work for tsstore
	CanDoDownSample() bool
	DisableDownSample()
	EnableDownSample()
	GetShardDownSamplePolicy(policy *meta.DownSamplePolicyInfo) *meta.ShardDownSamplePolicyInfo
	IsOutOfOrderFilesExist() bool
	NewDownSampleTask(sdsp *meta.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger)
	SetShardDownSampleLevel(i int)
	SetMstInfo(mstsInfo *meta.MeasurementInfo)
	StartDownSample(taskID uint64, level int, sdsp *meta.ShardDownSamplePolicyInfo, meta interface {
		UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
	}) error
	UpdateDownSampleOnShard(id uint64, level int)
	UpdateShardReadOnly(meta interface {
		UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
	}) error

	// compaction && merge, only work for tsstore
	Compact() error
	DisableCompAndMerge()
	EnableCompAndMerge()
	SetLockPath(lock *string)
	IsColdShard() bool
	CanDoShardMove() bool
	ExecShardMove() error
	DisableHierarchicalStorage()
	SetEnableHierarchicalStorage()
}

type ShardStatus added in v1.1.0

type ShardStatus struct {
	ShardId  uint64
	Opened   bool
	ReadOnly bool
}

func (ShardStatus) MarshalText added in v1.1.0

func (s ShardStatus) MarshalText() (data []byte, err error)

MarshalText keeps marshaled dict items order

type Storage added in v1.1.0

type Storage interface {
	WriteRowsToTable(s *shard, rows influx.Rows, mw *mstWriteCtx, binaryRows []byte) error
	WriteRows(s *shard, mw *mstWriteCtx) error                                    // line protocol
	WriteCols(s *shard, cols *record.Record, mst string, binaryCols []byte) error // native protocol
	WriteIndex(s *shard, rows *influx.Rows, mw *mstWriteCtx) error

	SetClient(client metaclient.MetaClient)
	SetMstInfo(s *shard, name string, mstInfo *meta.MeasurementInfo)
	SetAccumulateMetaIndex(name string, detachedMetaInfo *immutable.AccumulateMetaIndex)
	ForceFlush(s *shard)
	// contains filtered or unexported methods
}

type TagSetCursorItem

type TagSetCursorItem struct {
	// contains filtered or unexported fields
}

func (TagSetCursorItem) GetNewRecord

func (c TagSetCursorItem) GetNewRecord() (*record.Record, error)

type TierInfo

type TierInfo struct {
}

type TsspSequenceReader added in v1.0.0

type TsspSequenceReader struct {
	executor.BaseProcessor

	Output *executor.SeriesRecordPort
	// contains filtered or unexported fields
}

func (*TsspSequenceReader) Close added in v1.0.0

func (r *TsspSequenceReader) Close()

func (*TsspSequenceReader) Create added in v1.0.0

func (*TsspSequenceReader) Explain added in v1.0.0

func (r *TsspSequenceReader) Explain() []executor.ValuePair

func (*TsspSequenceReader) GetInputNumber added in v1.0.0

func (r *TsspSequenceReader) GetInputNumber(executor.Port) int

func (*TsspSequenceReader) GetInputs added in v1.0.0

func (r *TsspSequenceReader) GetInputs() executor.Ports

func (*TsspSequenceReader) GetOutputNumber added in v1.0.0

func (r *TsspSequenceReader) GetOutputNumber(executor.Port) int

func (*TsspSequenceReader) GetOutputs added in v1.0.0

func (r *TsspSequenceReader) GetOutputs() executor.Ports

func (*TsspSequenceReader) IsSink added in v1.0.0

func (r *TsspSequenceReader) IsSink() bool

func (*TsspSequenceReader) Name added in v1.0.0

func (r *TsspSequenceReader) Name() string

func (*TsspSequenceReader) Release added in v1.0.0

func (r *TsspSequenceReader) Release() error

func (*TsspSequenceReader) Work added in v1.0.0

func (r *TsspSequenceReader) Work(ctx context.Context) error

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func NewWAL

func NewWAL(path string, lockPath *string, shardID uint64, walSyncInterval time.Duration, walEnabled, replayParallel bool, partitionNum int, walReplayBatchSize int) *WAL

func (*WAL) Close

func (l *WAL) Close() error

func (*WAL) Remove

func (l *WAL) Remove(files []string) error

func (*WAL) Replay

func (l *WAL) Replay(ctx context.Context, callBack func(binary []byte, rowsCtx *walRowsObjects, writeWalType WalRecordType) error) ([]string, error)

func (*WAL) Switch

func (l *WAL) Switch() ([]string, error)

func (*WAL) Write

func (l *WAL) Write(walRecord *walRecord) error

type WalRecordType

type WalRecordType byte

type WriteIntoStorageTransform added in v1.0.0

type WriteIntoStorageTransform struct {
	executor.BaseProcessor

	Input  *executor.SeriesRecordPort
	Output *executor.DownSampleStatePort
	// contains filtered or unexported fields
}

func (*WriteIntoStorageTransform) Close added in v1.0.0

func (r *WriteIntoStorageTransform) Close()

func (*WriteIntoStorageTransform) Create added in v1.0.0

func (*WriteIntoStorageTransform) EndFile added in v1.0.0

func (r *WriteIntoStorageTransform) EndFile() error

func (*WriteIntoStorageTransform) Explain added in v1.0.0

func (*WriteIntoStorageTransform) GetClosed added in v1.0.0

func (r *WriteIntoStorageTransform) GetClosed() chan struct{}

func (*WriteIntoStorageTransform) GetInputNumber added in v1.0.0

func (r *WriteIntoStorageTransform) GetInputNumber(executor.Port) int

func (*WriteIntoStorageTransform) GetInputs added in v1.0.0

func (r *WriteIntoStorageTransform) GetInputs() executor.Ports

func (*WriteIntoStorageTransform) GetOutputNumber added in v1.0.0

func (r *WriteIntoStorageTransform) GetOutputNumber(executor.Port) int

func (*WriteIntoStorageTransform) GetOutputs added in v1.0.0

func (r *WriteIntoStorageTransform) GetOutputs() executor.Ports

func (*WriteIntoStorageTransform) GetRowCount added in v1.0.0

func (r *WriteIntoStorageTransform) GetRowCount() int

func (*WriteIntoStorageTransform) InitFile added in v1.1.0

func (r *WriteIntoStorageTransform) InitFile(sRecord *executor.SeriesRecord) error

func (*WriteIntoStorageTransform) Name added in v1.0.0

func (*WriteIntoStorageTransform) Release added in v1.0.0

func (r *WriteIntoStorageTransform) Release() error

func (*WriteIntoStorageTransform) SetTaskId added in v1.0.0

func (r *WriteIntoStorageTransform) SetTaskId(taskID int)

func (*WriteIntoStorageTransform) Work added in v1.0.0

Directories

Path Synopsis
index
clv
ski
tsi
nolint
nolint

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL