coordinator

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
// Retry and timeout settings for DDL and DML operations.
// NOTE(review): units are inferred from the name suffixes (Second, Millisecond),
// and "Internal" in the two retry names presumably means "Interval" — confirm.
// The names are exported, so they are left unchanged here.
const (
	// DDLRetryInternalSecond is 1 (presumably seconds between DDL retries — confirm).
	DDLRetryInternalSecond      = 1
	// DDLTimeOutSecond is 30 (presumably the overall DDL timeout in seconds — confirm).
	DDLTimeOutSecond            = 30
	// DMLRetryInternalMillisecond is 200 (presumably milliseconds between DML retries — confirm).
	DMLRetryInternalMillisecond = 200
	// DMLTimeOutSecond is 30 (presumably the overall DML timeout in seconds — confirm).
	DMLTimeOutSecond            = 30
)
View Source
const (
	// MaxConcurrencyInOnePt is a typed int constant with value 8.
	// NOTE(review): presumably an upper bound on concurrent work within a single
	// partition (pt) — confirm against its use sites, which are not shown here.
	MaxConcurrencyInOnePt int = 8
)
View Source
// MaxShardKey is an untyped constant equal to 64 * 1024 (65536).
// NOTE(review): presumably the maximum shard-key size in bytes — confirm.
const MaxShardKey = 64 * 1024
View Source
// NotInShardDuration is the sentinel value -1.
// NOTE(review): presumably signals that a timestamp falls outside every known
// shard duration — confirm against its use sites, which are not shown here.
const NotInShardDuration = -1

Variables

View Source
// ErrDatabaseNameRequired is returned when executing statements that require a
// database, when a database has not been provided. It is a sentinel value
// created with errors.New, so callers can match it with errors.Is.
var ErrDatabaseNameRequired = errors.New("database name required")

ErrDatabaseNameRequired is returned when executing statements that require a database, when a database has not been provided.

Functions

func BuildFieldCall added in v1.0.0

func BuildFieldCall(info *meta2.StreamInfo, srcSchema map[string]int32, destSchema map[string]int32) ([]*streamLib.FieldCall, error)

func CreateColumnStorePlan added in v1.1.0

func CreateColumnStorePlan(schema hybridqp.Catalog, eTraits []hybridqp.Trait, builder *executor.LogicalPlanBuilderImpl) (hybridqp.QueryNode, error)

func GetStreamCtx added in v1.0.0

func GetStreamCtx() *streamCtx

func GetTagLimit added in v1.1.0

func GetTagLimit() int

func IsKeepWritingErr added in v1.1.0

func IsKeepWritingErr(err error) bool

func IsRetriedError added in v1.0.0

func IsRetriedError(err error) (isSpecial bool)

func IsRetryErrorForPtView added in v1.0.0

func IsRetryErrorForPtView(err error) bool

IsRetryErrorForPtView returns true if dbpt is not on this node.

func MarshalWithMeasurements added in v1.1.0

func MarshalWithMeasurements(buf []byte, mst string, rec *record.Record) ([]byte, error)

func PutStreamCtx added in v1.0.0

func PutStreamCtx(s *streamCtx)

func SearchLowerBoundOfRec added in v1.1.0

func SearchLowerBoundOfRec(rec *record.Record, sg *meta2.ShardGroupInfo, start int) int

func SetTagLimit added in v1.1.0

func SetTagLimit(limit int)

func UnmarshalWithMeasurements added in v1.1.0

func UnmarshalWithMeasurements(buf []byte, rec *record.Record) (string, error)

Types

type AllWriter added in v1.1.0

// AllWriter is a writer built on BaseWriter. Together with its own
// Write(lineProtocol []byte) method, *AllWriter has the full method set of
// SubscriberWriter.
// NOTE(review): the name suggests each payload is sent to all clients (in
// contrast to RoundRobinWriter) — confirm against the Write implementation.
type AllWriter struct {
	// BaseWriter is embedded, so its exported methods (Clients, Name, Run,
	// Send, Start, Stop) are promoted onto AllWriter.
	BaseWriter
}

func (*AllWriter) Write added in v1.1.0

func (w *AllWriter) Write(lineProtocol []byte)

type BaseWriter added in v1.1.0

type BaseWriter struct {
	// contains filtered or unexported fields
}

func NewBaseWriter added in v1.1.0

func NewBaseWriter(db, rp, name string, clients []Client, logger *logger.Logger) BaseWriter

func (*BaseWriter) Clients added in v1.1.0

func (w *BaseWriter) Clients() []Client

func (*BaseWriter) Name added in v1.1.0

func (w *BaseWriter) Name() string

func (*BaseWriter) Run added in v1.1.0

func (w *BaseWriter) Run()

func (*BaseWriter) Send added in v1.1.0

func (w *BaseWriter) Send(wr *WriteRequest)

func (*BaseWriter) Start added in v1.1.0

func (w *BaseWriter) Start(concurrency, buffersize int)

func (*BaseWriter) Stop added in v1.1.0

func (w *BaseWriter) Stop()

type Client added in v1.1.0

// Client is a destination to which line-protocol payloads can be sent.
// *HTTPClient declares both methods with identical signatures, and
// NewBaseWriter accepts a []Client.
type Client interface {
	// Send delivers lineProtocol for the given database (db) and retention
	// policy (rp), returning an error on failure.
	Send(db, rp string, lineProtocol []byte) error
	// Destination returns a string identifying where this client sends data.
	// NOTE(review): exact format (e.g. URL) is not visible here — confirm.
	Destination() string
}

type ClusterShardMapper

// ClusterShardMapper implements a ShardMapper for remote shards.
// All of its fields are exported; NewClusterShardMapping takes a
// *ClusterShardMapper as its first argument.
type ClusterShardMapper struct {
	//Node   *meta.Node
	// Logger is the logger held by this mapper.
	Logger *logger.Logger
	// Remote execution timeout
	Timeout time.Duration
	// MetaClient is embedded, so its methods are promoted onto
	// ClusterShardMapper.
	meta.MetaClient
	// NetStore is the netstorage.Storage handle held by this mapper.
	NetStore netstorage.Storage
}

ClusterShardMapper implements a ShardMapper for remote shards.

func (*ClusterShardMapper) Close

func (csm *ClusterShardMapper) Close() error

func (*ClusterShardMapper) MapShards

type ClusterShardMapping

type ClusterShardMapping struct {
	ShardMapper *ClusterShardMapper
	NetStore    netstorage.Storage

	MetaClient meta.MetaClient

	// Remote execution timeout
	Timeout time.Duration

	ShardMap map[Source]map[uint32][]executor.ShardInfo // {source: {ptId: []ShardInfo}},

	// MinTime is the minimum time that this shard mapper will allow.
	// Any attempt to use a time before this one will automatically result in using
	// this time instead.
	MinTime time.Time

	// MaxTime is the maximum time that this shard mapper will allow.
	// Any attempt to use a time after this one will automatically result in using
	// this time instead.
	MaxTime time.Time

	Logger *logger.Logger
	// contains filtered or unexported fields
}

ClusterShardMapping maps data sources to a list of shard information.

func NewClusterShardMapping added in v1.1.0

func NewClusterShardMapping(csm *ClusterShardMapper, tmin, tmax time.Time) *ClusterShardMapping

func (*ClusterShardMapping) Close

func (csm *ClusterShardMapping) Close() error

Close clears out the list of mapped shards.

func (*ClusterShardMapping) CreateLogicalPlan

func (csm *ClusterShardMapping) CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema hybridqp.Catalog) (hybridqp.QueryNode, error)

func (*ClusterShardMapping) FieldDimensions

func (csm *ClusterShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, schema *influxql.Schema, err error)

func (*ClusterShardMapping) GetETraits added in v1.1.0

func (csm *ClusterShardMapping) GetETraits(ctx context.Context, sources influxql.Sources, schema hybridqp.Catalog) ([]hybridqp.Trait, error)

func (*ClusterShardMapping) GetSeriesKey added in v1.1.0

func (csm *ClusterShardMapping) GetSeriesKey() []byte

func (*ClusterShardMapping) GetShardAndSourcesMap added in v1.1.0

func (csm *ClusterShardMapping) GetShardAndSourcesMap(sources influxql.Sources) (map[uint64]map[uint32][]executor.ShardInfo, map[uint32]influxql.Sources, error)

func (*ClusterShardMapping) GetSources

func (csm *ClusterShardMapping) GetSources(sources influxql.Sources) influxql.Sources

GetSources may return multiple sources for a single input source, because a measurement regex can match more than one measurement.

func (*ClusterShardMapping) LogicalPlanCost

func (*ClusterShardMapping) MapType

func (*ClusterShardMapping) MapTypeBatch

func (csm *ClusterShardMapping) MapTypeBatch(m *influxql.Measurement, fields map[string]*influxql.FieldNameSpace, schema *influxql.Schema) error

func (*ClusterShardMapping) NodeNumbers

func (csm *ClusterShardMapping) NodeNumbers() int

func (*ClusterShardMapping) RemoteQueryETraitsAndSrc added in v1.1.0

func (csm *ClusterShardMapping) RemoteQueryETraitsAndSrc(ctx context.Context, opts *query.ProcessorOptions, schema hybridqp.Catalog,
	shardsMapByNode map[uint64]map[uint32][]executor.ShardInfo, sourcesMapByPtId map[uint32]influxql.Sources) ([]hybridqp.Trait, error)

type ComMetaClient added in v1.1.0

// ComMetaClient is a three-method meta-client interface: look up a
// measurement, create a measurement, and create a shard group. All three
// methods appear with identical signatures in PWMetaClient, so any
// PWMetaClient implementation also satisfies ComMetaClient.
type ComMetaClient interface {
	// Measurement returns the MeasurementInfo for mstName in the given
	// database and retention policy.
	Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
	// CreateMeasurement creates measurement mst with the supplied shard key,
	// index relation, engine type, column-store info, schema and options, and
	// returns the resulting MeasurementInfo.
	CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta2.ShardKeyInfo, indexR *influxql.IndexRelation, engineType config.EngineType,
		colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error)
	// CreateShardGroup creates a shard group for the given database/policy at
	// timestamp and returns its ShardGroupInfo.
	// NOTE(review): the meaning of version is not visible here — confirm.
	CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta2.ShardGroupInfo, error)
}

type HTTPClient added in v1.1.0

type HTTPClient struct {
	// contains filtered or unexported fields
}

func NewHTTPClient added in v1.1.0

func NewHTTPClient(url *url.URL, timeout time.Duration) *HTTPClient

func NewHTTPSClient added in v1.1.0

func NewHTTPSClient(url *url.URL, timeout time.Duration, skipVerify bool, certs string) (*HTTPClient, error)

func (*HTTPClient) Destination added in v1.1.0

func (c *HTTPClient) Destination() string

func (*HTTPClient) Send added in v1.1.0

func (c *HTTPClient) Send(db, rp string, lineProtocol []byte) error

type IMetaExecutor

// IMetaExecutor abstracts the executor passed (as the me argument) to
// NewShowTagKeysExecutor and NewShowTagValuesExecutor. *MetaExecutor declares
// all three methods with identical signatures.
type IMetaExecutor interface {
	// SetTimeOut sets the executor's timeout.
	SetTimeOut(timeout time.Duration)
	// EachDBNodes invokes fn with a node ID and a slice of partition IDs (pts)
	// for the given database.
	// NOTE(review): presumably called once per data node, stopping on the
	// first error — confirm against the implementation.
	EachDBNodes(database string, fn func(nodeID uint64, pts []uint32) error) error
	// Close releases the executor and reports any error.
	Close() error
}

type LocalStore added in v1.2.0

type LocalStore struct {
	// contains filtered or unexported fields
}

func NewLocalStore added in v1.2.0

func NewLocalStore(store Storage) *LocalStore

func (*LocalStore) WriteRows added in v1.2.0

func (s *LocalStore) WriteRows(ctx *netstorage.WriteContext, _ uint64, _ uint32, database, rp string, _ time.Duration) error

type MetaClient added in v1.1.0

// MetaClient is the meta-client interface accepted by NewSubscriberManager.
// It is declared in this package and is distinct from meta.MetaClient, which
// other types in this package use.
type MetaClient interface {
	// Databases returns database infos keyed by string.
	// NOTE(review): the key is presumably the database name — confirm.
	Databases() map[string]*meta.DatabaseInfo
	// Database returns the DatabaseInfo for the given string argument
	// (presumably the database name — confirm).
	Database(string) (*meta.DatabaseInfo, error)
	// GetMaxSubscriptionID returns the maximum subscription ID as a uint64.
	GetMaxSubscriptionID() uint64
	// WaitForDataChanged returns a channel of empty structs.
	// NOTE(review): presumably signalled or closed when meta data changes —
	// confirm against the implementation.
	WaitForDataChanged() chan struct{}
}

type MetaExecutor

type MetaExecutor struct {
	Logger     *logger.Logger
	MetaClient meta.MetaClient
	// contains filtered or unexported fields
}

MetaExecutor executes meta queries on all data nodes.

func NewMetaExecutor

func NewMetaExecutor() *MetaExecutor

NewMetaExecutor returns a new initialized *MetaExecutor.

func (*MetaExecutor) Close

func (m *MetaExecutor) Close() error

func (*MetaExecutor) EachDBNodes

func (m *MetaExecutor) EachDBNodes(database string, fn func(nodeID uint64, pts []uint32) error) error

func (*MetaExecutor) SetTimeOut

func (m *MetaExecutor) SetTimeOut(timeout time.Duration)

type PWMetaClient added in v1.0.0

// PWMetaClient is the meta-client interface used by the points-writing path:
// it is the type of the MetaClient field on both PointsWriter and Stream, and
// of the metaClient parameter of NewStream.
type PWMetaClient interface {
	// Database returns the DatabaseInfo for the named database.
	Database(name string) (di *meta2.DatabaseInfo, err error)
	// RetentionPolicy returns the RetentionPolicyInfo for database/policy.
	RetentionPolicy(database, policy string) (*meta2.RetentionPolicyInfo, error)
	// CreateShardGroup creates a shard group for database/policy at timestamp
	// and returns its ShardGroupInfo.
	CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta2.ShardGroupInfo, error)
	// DBPtView returns the DBPtInfos for the database.
	DBPtView(database string) (meta2.DBPtInfos, error)
	// Measurement returns the MeasurementInfo for mstName in the given
	// database and retention policy.
	Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
	// UpdateSchema submits the fields in fieldToCreate for measurement mst.
	UpdateSchema(database string, retentionPolicy string, mst string, fieldToCreate []*proto2.FieldSchema) error
	// CreateMeasurement creates measurement mst with the supplied shard key,
	// index relation, engine type, column-store info, schema and options.
	CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta2.ShardKeyInfo, indexR *influxql.IndexRelation, engineType config.EngineType,
		colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error)
	// GetAliveShards returns a slice of ints for the given shard group.
	// NOTE(review): presumably indexes of alive shards within sgi — confirm.
	GetAliveShards(database string, sgi *meta2.ShardGroupInfo) []int
	// GetStreamInfos returns stream infos keyed by string.
	GetStreamInfos() map[string]*meta2.StreamInfo
	// GetDstStreamInfos takes a pointer to a StreamInfo slice (dstSis) for
	// db/rp and returns a bool.
	// NOTE(review): presumably fills dstSis and reports whether any stream
	// exists — confirm against the implementation.
	GetDstStreamInfos(db, rp string, dstSis *[]*meta2.StreamInfo) bool
	// DBRepGroups returns the replica groups of the database.
	DBRepGroups(database string) []meta2.ReplicaGroup
	// GetReplicaN returns an int and an error for the database
	// (presumably the replica count — confirm).
	GetReplicaN(database string) (int, error)
}

type PointsWriter

type PointsWriter struct {
	MetaClient PWMetaClient

	TSDBStore TSDBStore
	// contains filtered or unexported fields
}

PointsWriter handles writes across multiple local and remote data nodes.

func NewPointsWriter

func NewPointsWriter(timeout time.Duration) *PointsWriter

NewPointsWriter returns a new instance of PointsWriter for a node.

func (*PointsWriter) ApplyTimeRangeLimit added in v1.0.0

func (w *PointsWriter) ApplyTimeRangeLimit(limit []toml.Duration)

func (*PointsWriter) Close added in v1.0.0

func (w *PointsWriter) Close()

func (*PointsWriter) MapRowToMeasurement added in v1.0.0

func (w *PointsWriter) MapRowToMeasurement(ctx *injestionCtx, id uint64, mst string, r *influx.Row) error

func (*PointsWriter) RetryWritePointRows added in v1.0.0

func (w *PointsWriter) RetryWritePointRows(database, retentionPolicy string, rows []influx.Row) error

RetryWritePointRows makes sure the SQL client has the latest metadata.

func (*PointsWriter) SetStore added in v1.2.0

func (w *PointsWriter) SetStore(store Storage)

type RWMetaClient added in v1.1.0

// RWMetaClient is the meta-client interface used by RecordWriter (it is the
// type of RecordWriter.MetaClient).
// NOTE(review): its first seven methods mirror PWMetaClient's but are spelled
// with the meta/proto package names rather than meta2/proto2 — likely the same
// packages under different import aliases; confirm before treating the two
// interfaces as interchangeable.
type RWMetaClient interface {
	// Database returns the DatabaseInfo for the named database.
	Database(name string) (di *meta.DatabaseInfo, err error)
	// RetentionPolicy returns the RetentionPolicyInfo for database/policy.
	RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
	// CreateShardGroup creates a shard group for database/policy at timestamp
	// and returns its ShardGroupInfo.
	CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta.ShardGroupInfo, error)
	// DBPtView returns the DBPtInfos for the database.
	DBPtView(database string) (meta.DBPtInfos, error)
	// Measurement returns the MeasurementInfo for mstName in the given
	// database and retention policy.
	Measurement(database string, rpName string, mstName string) (*meta.MeasurementInfo, error)
	// UpdateSchema submits the fields in fieldToCreate for measurement mst.
	UpdateSchema(database string, retentionPolicy string, mst string, fieldToCreate []*proto.FieldSchema) error
	// CreateMeasurement creates measurement mst with the supplied shard key,
	// index relation, engine type, column-store info, schema and options.
	CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta.ShardKeyInfo, indexR *influxql.IndexRelation, engineType config.EngineType,
		colStoreInfo *meta.ColStoreInfo, schemaInfo []*proto.FieldSchema, options *meta.Options) (*meta.MeasurementInfo, error)
	// GetShardInfoByTime returns the ShardInfo selected by time t, partition
	// index ptIdx, node ID and engine type within database/retentionPolicy.
	GetShardInfoByTime(database, retentionPolicy string, t time.Time, ptIdx int, nodeId uint64, engineType config.EngineType) (*meta.ShardInfo, error)
}

type RecMsg added in v1.1.0

// RecMsg is the message structure that carries one record together with the
// database, retention policy and measurement it belongs to.
type RecMsg struct {
	// Database is the target database name.
	Database        string
	// RetentionPolicy is the target retention policy name.
	RetentionPolicy string
	// Measurement is the target measurement name.
	Measurement     string
	// Rec is the record payload, held as an empty interface.
	// NOTE(review): the concrete type is not visible here; RecordWriter's
	// write methods accept *record.Record and array.Record, so it is
	// presumably one of those — confirm.
	Rec             interface{}
}

RecMsg is the data structure of a record message.

type RecordWriter added in v1.1.0

type RecordWriter struct {
	MetaClient RWMetaClient

	StorageEngine interface {
		WriteRec(db, rp, mst string, ptId uint32, shardID uint64, rec *record.Record, binaryRec []byte) error
	}
	// contains filtered or unexported fields
}

RecordWriter handles writes to the local data node.

func NewRecordWriter added in v1.1.0

func NewRecordWriter(timeout time.Duration, ptNum, recMsgChFactor int) *RecordWriter

func (*RecordWriter) Close added in v1.1.0

func (w *RecordWriter) Close() error

func (*RecordWriter) Open added in v1.1.0

func (w *RecordWriter) Open() error

func (*RecordWriter) RetryWriteLogRecord added in v1.2.0

func (w *RecordWriter) RetryWriteLogRecord(database, retentionPolicy, measurement string, rec *record.Record) error

func (*RecordWriter) RetryWriteRecord added in v1.1.0

func (w *RecordWriter) RetryWriteRecord(database, retentionPolicy, measurement string, rec array.Record) error

type RoundRobinWriter added in v1.1.0

type RoundRobinWriter struct {
	BaseWriter
	// contains filtered or unexported fields
}

func (*RoundRobinWriter) Write added in v1.1.0

func (w *RoundRobinWriter) Write(lineProtocol []byte)

type ShardRow added in v1.1.0

type ShardRow struct {
	// contains filtered or unexported fields
}

type ShardRows added in v1.1.0

// ShardRows is a slice of ShardRow. Its Len, Less and Swap methods give it
// the method set of sort.Interface.
// NOTE(review): the ordering used by Less is not visible here.
type ShardRows []ShardRow

func (ShardRows) Len added in v1.1.0

func (srs ShardRows) Len() int

func (ShardRows) Less added in v1.1.0

func (srs ShardRows) Less(i, j int) bool

func (ShardRows) Swap added in v1.1.0

func (srs ShardRows) Swap(i, j int)

type ShowTagKeysExecutor added in v1.2.0

type ShowTagKeysExecutor struct {
	// contains filtered or unexported fields
}

func NewShowTagKeysExecutor added in v1.2.0

func NewShowTagKeysExecutor(logger *logger.Logger, mc meta.MetaClient, me IMetaExecutor, store netstorage.Storage) *ShowTagKeysExecutor

func (*ShowTagKeysExecutor) Execute added in v1.2.0

type ShowTagValuesExecutor

type ShowTagValuesExecutor struct {
	// contains filtered or unexported fields
}

func NewShowTagValuesExecutor

func NewShowTagValuesExecutor(logger *logger.Logger, mc meta.MetaClient, me IMetaExecutor, store netstorage.Storage) *ShowTagValuesExecutor

func (*ShowTagValuesExecutor) Cardinality

func (e *ShowTagValuesExecutor) Cardinality(dimensions influxql.Dimensions)

func (*ShowTagValuesExecutor) Execute

type Source

// Source contains the database and retention policy source for data.
// Both fields are strings, so Source is comparable; it is used as the key
// type of ClusterShardMapping.ShardMap.
type Source struct {
	// Database is the database name.
	Database        string
	// RetentionPolicy is the retention policy name.
	RetentionPolicy string
}

Source contains the database and retention policy source for data.

type Storage added in v1.2.0

// Storage is the single-method row-writing interface accepted by NewLocalStore
// and PointsWriter.SetStore. It is declared in this package and is distinct
// from netstorage.Storage.
type Storage interface {
	// WriteRows writes rows for db/rp to the shard identified by ptId and
	// shardID.
	// NOTE(review): binaryRows is presumably a pre-serialized form of rows —
	// confirm against the implementation.
	WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte) error
}

type Stream added in v1.0.0

type Stream struct {
	TSDBStore TSDBStore

	MetaClient PWMetaClient
	// contains filtered or unexported fields
}

func NewStream added in v1.0.0

func NewStream(tsdbStore TSDBStore, metaClient PWMetaClient, logger *logger.Logger, timeout time.Duration) *Stream

func (*Stream) GenerateGroupKey added in v1.0.0

func (s *Stream) GenerateGroupKey(ctx *streamCtx, keys []string, value *influx.Row) string

type SubscriberManager added in v1.1.0

type SubscriberManager struct {
	Logger *logger.Logger
	// contains filtered or unexported fields
}

func NewSubscriberManager added in v1.1.0

func NewSubscriberManager(c config.Subscriber, m MetaClient, l *logger.Logger) *SubscriberManager

func (*SubscriberManager) InitWriters added in v1.1.0

func (s *SubscriberManager) InitWriters()

func (*SubscriberManager) NewSubscriberWriter added in v1.1.0

func (s *SubscriberManager) NewSubscriberWriter(db, rp, name, mode string, destinations []string) (SubscriberWriter, error)

func (*SubscriberManager) Send added in v1.1.0

func (s *SubscriberManager) Send(db, rp string, lineProtocol []byte)

func (*SubscriberManager) StopAllWriters added in v1.1.0

func (s *SubscriberManager) StopAllWriters()

func (*SubscriberManager) Update added in v1.1.0

func (s *SubscriberManager) Update()

func (*SubscriberManager) UpdateWriters added in v1.1.0

func (s *SubscriberManager) UpdateWriters()

func (*SubscriberManager) WalkDatabases added in v1.1.0

func (s *SubscriberManager) WalkDatabases(fn func(db *meta.DatabaseInfo))

type SubscriberWriter added in v1.1.0

// SubscriberWriter is the writer interface returned by
// SubscriberManager.NewSubscriberWriter. BaseWriter declares Name, Run, Start,
// Stop and Clients; AllWriter and RoundRobinWriter embed BaseWriter and each
// add Write, so both provide the full method set.
type SubscriberWriter interface {
	// Write accepts one line-protocol payload.
	Write(lineProtocol []byte)
	// Name returns the writer's name.
	Name() string
	// Run takes no arguments and returns nothing.
	// NOTE(review): presumably the worker loop that drains queued requests —
	// confirm against the implementation.
	Run()
	// Start begins operation with the given concurrency and buffer size.
	// NOTE(review): presumably launches `concurrency` Run workers over a
	// queue of size `buffersize` — confirm.
	Start(concurrency, buffersize int)
	// Stop shuts the writer down.
	Stop()
	// Clients returns the writer's clients.
	Clients() []Client
}

type TSDBStore added in v1.0.0

// TSDBStore is the row-writing backend used by PointsWriter and Stream (both
// have a TSDBStore field of this type). *LocalStore declares WriteRows with
// the same parameter and result types, naming nodeID, pt and timeout as blank
// (_) parameters.
type TSDBStore interface {
	// WriteRows writes the rows carried by ctx to partition pt on node nodeID
	// for database/rp, using the given timeout.
	WriteRows(ctx *netstorage.WriteContext, nodeID uint64, pt uint32, database, rp string, timeout time.Duration) error
}

type TagValuesSlice

// TagValuesSlice is a slice of netstorage.TableTagSets. Its Len, Less and Swap
// methods give it the method set of sort.Interface.
// NOTE(review): the ordering used by Less is not visible here.
type TagValuesSlice []netstorage.TableTagSets

func (TagValuesSlice) Len

func (a TagValuesSlice) Len() int

func (TagValuesSlice) Less

func (a TagValuesSlice) Less(i, j int) bool

func (TagValuesSlice) Swap

func (a TagValuesSlice) Swap(i, j int)

type WriteRequest added in v1.1.0

// WriteRequest is the unit of work passed to BaseWriter.Send: one
// line-protocol payload plus an integer client selector.
type WriteRequest struct {
	// Client is an int selecting the target client.
	// NOTE(review): presumably an index into the writer's Clients() slice —
	// confirm against BaseWriter's implementation.
	Client       int
	// LineProtocol is the raw line-protocol payload to send.
	LineProtocol []byte
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL