query

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0, MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	QueryDurationKey qCtxKey = iota

	QueryIDKey

	IndexScanDagStartTimeKey
)
View Source
const (
	// DefaultQueryTimeout is the default timeout for executing a query.
	// A value of zero will have no query timeout.
	DefaultQueryTimeout = time.Duration(0)
)
View Source
const (
	// PanicCrashEnv is the environment variable that, when set, will prevent
	// the handler from recovering any panics.
	PanicCrashEnv = "INFLUXDB_PANIC_CRASH"
)

Statistics for the Executor

Variables

View Source
var (
	// ErrInvalidQuery is returned when executing an unknown query type.
	ErrInvalidQuery = errors.New("invalid query")

	// ErrNotExecuted is returned when a statement is not executed in a query.
	// This can occur when a previous statement in the same query has errored.
	ErrNotExecuted = errors.New("not executed")

	// ErrQueryAborted is an error returned when the query is aborted.
	ErrQueryAborted = errors.New("query aborted")

	// ErrQueryEngineShutdown is an error sent when the query cannot be
	// created because the query engine was shutdown.
	ErrQueryEngineShutdown = errors.New("query engine shutdown")

	// ErrQueryTimeoutLimitExceeded is an error when a query hits the max time allowed to run.
	ErrQueryTimeoutLimitExceeded = errors.New("query-timeout limit exceeded")

	// ErrAlreadyKilled is returned when attempting to kill a query that has already been killed.
	ErrAlreadyKilled = errors.New("already killed")
)
View Source
var OpenAuthorizer = openAuthorizer{}

OpenAuthorizer can be shared by all goroutines.

View Source
var OpenCoarseAuthorizer openCoarseAuthorizer

OpenCoarseAuthorizer is a fully permissive implementation of CoarseAuthorizer.

View Source
var TimeFilterProtection bool

Functions

func ContainDim

func ContainDim(des []string, src string) bool

func DecodeQuerySchema

func DecodeQuerySchema(pb *internal.QuerySchema, opt hybridqp.Options) (hybridqp.Catalog, error)

func EncodeQuerySchema

func EncodeQuerySchema(schema hybridqp.Catalog) *internal.QuerySchema

func ErrDatabaseNotFound

func ErrDatabaseNotFound(name string) error

ErrDatabaseNotFound returns a database not found error for the given database name.

func ErrMaxConcurrentQueriesLimitExceeded

func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error

ErrMaxConcurrentQueriesLimitExceeded is an error when a query cannot be run because the maximum number of queries has been reached.

func ErrMaxSelectPointsLimitExceeded

func ErrMaxSelectPointsLimitExceeded(n, limit int) error

ErrMaxSelectPointsLimitExceeded is an error when a query hits the maximum number of points.

func RegistryAggregateFunction

func RegistryAggregateFunction(name string, function AggregateFunc) bool

func RegistryMaterializeFunction

func RegistryMaterializeFunction(name string, function MaterializeFunc) bool

func RegistryStmtBuilderCreator

func RegistryStmtBuilderCreator(creator StmtBuilderCreator) bool

func RewriteStatement

func RewriteStatement(stmt influxql.Statement) (influxql.Statement, error)

RewriteStatement rewrites stmt into a new statement, if applicable.

func SetBatchqueryLimit

func SetBatchqueryLimit(concurrence int)

func SubStrThreeParaFunc

func SubStrThreeParaFunc(srcStr string, start, subStrLen int64) string

func SubStrTwoParaFunc

func SubStrTwoParaFunc(srcStr string, start int64) string

Types

type AbsentFunc

type AbsentFunc struct {
	BaseInfo
	BaseAgg
}

func (*AbsentFunc) CallTypeFunc

func (f *AbsentFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*AbsentFunc) CompileFunc

func (f *AbsentFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type AggregateFunc

type AggregateFunc interface {
	BaseFunc
	CompileFunc(expr *influxql.Call, c *compiledField) error
	CanPushDown() bool
	SortedMergeCall() bool
	MergeCall() bool
	CanPushDownSeries() bool
	OptimizeAgg() bool
}

func GetAggregateOperator

func GetAggregateOperator(name string) AggregateFunc

type BaseAgg

type BaseAgg struct {
	// contains filtered or unexported fields
}

func (*BaseAgg) CanPushDown

func (b *BaseAgg) CanPushDown() bool

func (*BaseAgg) CanPushDownSeries

func (b *BaseAgg) CanPushDownSeries() bool

func (*BaseAgg) MergeCall

func (b *BaseAgg) MergeCall() bool

func (*BaseAgg) OptimizeAgg

func (b *BaseAgg) OptimizeAgg() bool

func (*BaseAgg) SortedMergeCall

func (b *BaseAgg) SortedMergeCall() bool

type BaseFunc

type BaseFunc interface {
	CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)
	GetFuncType() FuncType
}

type BaseInfo

type BaseInfo struct {
	FuncType FuncType
}

func (*BaseInfo) GetFuncType

func (b *BaseInfo) GetFuncType() FuncType

type BottomFunc

type BottomFunc struct {
	BaseInfo
	BaseAgg
}

func (*BottomFunc) CallTypeFunc

func (f *BottomFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*BottomFunc) CompileFunc

func (f *BottomFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type CoarseAuthorizer

type CoarseAuthorizer interface {
	// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
	AuthorizeDatabase(p originql.Privilege, name string) bool
}

CoarseAuthorizer determines if certain operations are authorized at the database level.

It is supported both in OSS and Enterprise.

type CompileOptions

type CompileOptions struct {
	Now time.Time
}

CompileOptions are the customization options for the compiler.

type CountFunc

type CountFunc struct {
	BaseInfo
	BaseAgg
}

func (*CountFunc) CallTypeFunc

func (f *CountFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*CountFunc) CompileFunc

func (f *CountFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type CumulativeSumFunc

type CumulativeSumFunc struct {
	BaseInfo
	BaseAgg
}

func (*CumulativeSumFunc) CallTypeFunc

func (f *CumulativeSumFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*CumulativeSumFunc) CompileFunc

func (f *CumulativeSumFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type DerivativeFunc

type DerivativeFunc struct {
	BaseInfo
	BaseAgg
}

func (*DerivativeFunc) CallTypeFunc

func (f *DerivativeFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*DerivativeFunc) CompileFunc

func (f *DerivativeFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type DifferenceFunc

type DifferenceFunc struct {
	BaseInfo
	BaseAgg
}

func (*DifferenceFunc) CallTypeFunc

func (f *DifferenceFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*DifferenceFunc) CompileFunc

func (f *DifferenceFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type DistinctFunc

type DistinctFunc struct {
	BaseInfo
	BaseAgg
}

func (*DistinctFunc) CallTypeFunc

func (f *DistinctFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*DistinctFunc) CompileFunc

func (f *DistinctFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type ElapsedFunc

type ElapsedFunc struct {
	BaseInfo
	BaseAgg
}

func (*ElapsedFunc) CallTypeFunc

func (f *ElapsedFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*ElapsedFunc) CompileFunc

func (f *ElapsedFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type ExecutionContext

type ExecutionContext struct {
	context.Context

	// The query ID of the executing query.
	QueryID uint64

	// Output channel where results and errors should be sent.
	Results chan *query.Result

	// Options used to start this query.
	ExecutionOptions

	// point writer which belong to the query, it is used for INTO statement
	PointsWriter interface {
		RetryWritePointRows(database, retentionPolicy string, points []influx.Row) error
	}
	// contains filtered or unexported fields
}

ExecutionContext contains state that the query is currently executing with.

func (*ExecutionContext) Done

func (ctx *ExecutionContext) Done() <-chan struct{}

func (*ExecutionContext) Err

func (ctx *ExecutionContext) Err() error

func (*ExecutionContext) Send

func (ctx *ExecutionContext) Send(result *query.Result, seq int) error

Send sends a Result to the Results channel and will exit if the query has been interrupted or aborted.

func (*ExecutionContext) Value

func (ctx *ExecutionContext) Value(key interface{}) interface{}

type ExecutionOptions

type ExecutionOptions struct {
	// The database the query is running against.
	Database string

	// The retention policy the query is running against.
	RetentionPolicy string

	// Authorizer handles series-level authorization
	Authorizer FineAuthorizer

	// CoarseAuthorizer handles database-level authorization
	CoarseAuthorizer CoarseAuthorizer

	// Node to execute on.
	NodeID uint64

	// The requested maximum number of points to return in each result.
	ChunkSize int

	// If this query return chunk once by once
	Chunked bool

	// If this query is being executed in a read-only context.
	ReadOnly bool

	QueryLimitEn bool

	// Quiet suppresses non-essential output from the query executor.
	Quiet bool

	// AbortCh is a channel that signals when results are no longer desired by the caller.
	AbortCh <-chan struct{}

	// The ChunkImpl maximum number of points to contain. Developers are advised to change only.
	InnerChunkSize int

	// The results of the query executor
	RowsChan chan RowsChan

	ParallelQuery bool

	// IncQuery indicates whether the query is a incremental query.
	IncQuery bool

	// QueryID indicates the representation of the query.
	QueryID string

	// IterID indicates the number of iteration in incremental query, starting from 0.
	IterID int32
}

ExecutionOptions contains the options for executing a query.

func NewExecutionOptions

func NewExecutionOptions(db, rp string, nodeID uint64, chunkSize, innerChunkSize int, chunked, readOnly, quiet, parallelQuery bool) *ExecutionOptions

type Executor

type Executor struct {
	// Used for executing a statement in the query.
	StatementExecutor StatementExecutor

	// Used for tracking running queries.
	TaskManager *TaskManager

	// writer is used for INTO statement
	PointsWriter interface {
		RetryWritePointRows(database, retentionPolicy string, points []influx.Row) error
	}

	// Logger to use for all logging.
	// Defaults to discarding all log output.
	Logger *logger.Logger
}

Executor executes every statement in an Query.

func NewExecutor

func NewExecutor(concurrence int) *Executor

NewExecutor returns a new instance of Executor.

func (*Executor) Close

func (e *Executor) Close() error

Close kills all running queries and prevents new queries from being attached.

func (*Executor) ExecuteQuery

func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}, qDuration *statistics.SQLSlowQueryStatistics) <-chan *query2.Result

ExecuteQuery executes each statement within a query.

func (*Executor) WithLogger

func (e *Executor) WithLogger(log *logger.Logger)

SetLogOutput sets the writer to which all logs are written. It must not be called after Open is called.

type FieldMapper

type FieldMapper struct {
	influxql.FieldMapper
}

FieldMapper is a FieldMapper that wraps another FieldMapper and exposes the functions implemented by the query engine.

func (FieldMapper) CallType

func (m FieldMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error)

type FineAuthorizer

type FineAuthorizer interface {
	// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
	AuthorizeDatabase(p originql.Privilege, name string) bool

	// AuthorizeSeriesRead determines if a series is authorized for reading
	AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool

	// AuthorizeSeriesWrite determines if a series is authorized for writing
	AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool
}

FineAuthorizer determines if certain operations are authorized at the series level.

It is only supported in InfluxDB Enterprise. In OSS it always returns true.

type FirstFunc

type FirstFunc struct {
	BaseInfo
	BaseAgg
}

func (*FirstFunc) CallTypeFunc

func (f *FirstFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*FirstFunc) CompileFunc

func (f *FirstFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type FuncType

type FuncType int
const (
	STRING FuncType = iota
	MATH
	AGG_NORMAL  // Traverse the data in the current time window to complete the calculation.The traversal process does not depend on other data in the time window.eg: max,min
	AGG_SLICE   // All data in the time window needs to be cached during calculation. eg: median,percentile
	AGG_HEAP    // Same as AGG_SLICE but requires heap sorting. eg: top,bottom
	AGG_TRANS   // Calculation depends on part data in the time window. eg: derivative
	AGG_SPECIAL // Special categories,Need to implement custom iterator.
)

functions/operator type The agg operator is classified based on the implementation process.

type FunctionFactory

type FunctionFactory struct {
	// contains filtered or unexported fields
}

func GetFunctionFactoryInstance

func GetFunctionFactoryInstance() *FunctionFactory

func NewFunctionFactory

func NewFunctionFactory() *FunctionFactory

func (*FunctionFactory) AddAggFunc

func (r *FunctionFactory) AddAggFunc(name string, function AggregateFunc)

func (*FunctionFactory) AddMaterFunc

func (r *FunctionFactory) AddMaterFunc(name string, function MaterializeFunc)

func (*FunctionFactory) FindAggFunc

func (r *FunctionFactory) FindAggFunc(name string) (AggregateFunc, bool)

func (*FunctionFactory) FindMaterFunc

func (r *FunctionFactory) FindMaterFunc(name string) (MaterializeFunc, bool)

func (*FunctionFactory) GetAggregateOp

func (r *FunctionFactory) GetAggregateOp() map[string]AggregateFunc

type FunctionTypeMapper

type FunctionTypeMapper struct {
}

FunctionTypeMapper handles the type mapping for all functions implemented by the query engine.

func (FunctionTypeMapper) CallType

func (m FunctionTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error)

func (FunctionTypeMapper) MapType

func (FunctionTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType

func (FunctionTypeMapper) MapTypeBatch

func (FunctionTypeMapper) MapTypeBatch(measurement *influxql.Measurement, field map[string]*influxql.FieldNameSpace, schema *influxql.Schema) error

type HistogramFunc

type HistogramFunc struct {
	BaseInfo
	BaseAgg
}

func (*HistogramFunc) CallTypeFunc

func (f *HistogramFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*HistogramFunc) CompileFunc

func (f *HistogramFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type IRateFunc

type IRateFunc struct {
	BaseInfo
	BaseAgg
}

func (*IRateFunc) CallTypeFunc

func (f *IRateFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*IRateFunc) CompileFunc

func (f *IRateFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type IntegralFunc

type IntegralFunc struct {
	BaseInfo
	BaseAgg
}

func (*IntegralFunc) CallTypeFunc

func (f *IntegralFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*IntegralFunc) CompileFunc

func (f *IntegralFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type LastFunc

type LastFunc struct {
	BaseInfo
	BaseAgg
}

func (*LastFunc) CallTypeFunc

func (f *LastFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*LastFunc) CompileFunc

func (f *LastFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type LogicalPlanCreator

type LogicalPlanCreator interface {
	// Creates a simple iterator for use in an InfluxQL Logical.
	CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema hybridqp.Catalog) (hybridqp.QueryNode, error)

	// Determines the potential cost for creating an iterator.
	LogicalPlanCost(source *influxql.Measurement, opt ProcessorOptions) (hybridqp.LogicalPlanCost, error)

	GetSources(sources influxql.Sources) influxql.Sources
	GetETraits(ctx context.Context, sources influxql.Sources, schema hybridqp.Catalog) ([]hybridqp.Trait, error)
	GetSeriesKey() []byte
}

type MaterializeFunc

type MaterializeFunc interface {
	BaseFunc
	CompileFunc(expr *influxql.Call, c *compiledField) error
	CallFunc(name string, args []interface{}) (interface{}, bool)
}

func GetMathFunction

func GetMathFunction(name string) MaterializeFunc

func GetStringFunction

func GetStringFunction(name string) MaterializeFunc

type MathTypeMapper

type MathTypeMapper struct{}

func (MathTypeMapper) CallType

func (MathTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error)

func (MathTypeMapper) MapType

func (MathTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType

func (MathTypeMapper) MapTypeBatch

func (MathTypeMapper) MapTypeBatch(measurement *influxql.Measurement, field map[string]*influxql.FieldNameSpace, schema *influxql.Schema) error

type MathValuer

type MathValuer struct{}

func (MathValuer) Call

func (v MathValuer) Call(name string, args []interface{}) (interface{}, bool)

func (MathValuer) SetValuer

func (MathValuer) SetValuer(v influxql.Valuer, index int)

func (MathValuer) Value

func (MathValuer) Value(key string) (interface{}, bool)

type MaxFunc

type MaxFunc struct {
	BaseInfo
	BaseAgg
}

func (*MaxFunc) CallTypeFunc

func (f *MaxFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*MaxFunc) CompileFunc

func (f *MaxFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type MeanFunc

type MeanFunc struct {
	BaseInfo
	BaseAgg
}

func (*MeanFunc) CallTypeFunc

func (f *MeanFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*MeanFunc) CompileFunc

func (f *MeanFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type MedianFunc

type MedianFunc struct {
	BaseInfo
	BaseAgg
}

func (*MedianFunc) CallTypeFunc

func (f *MedianFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*MedianFunc) CompileFunc

func (f *MedianFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type MinFunc

type MinFunc struct {
	BaseInfo
	BaseAgg
}

func (*MinFunc) CallTypeFunc

func (f *MinFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*MinFunc) CompileFunc

func (f *MinFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type ModeFunc

type ModeFunc struct {
	BaseInfo
	BaseAgg
}

func (*ModeFunc) CallTypeFunc

func (f *ModeFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*ModeFunc) CompileFunc

func (f *ModeFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type Monitor

type Monitor interface {
	// Monitor starts a new goroutine that will monitor a query. The function
	// will be passed in a channel to signal when the query has been finished
	// normally. If the function returns with an error and the query is still
	// running, the query will be terminated.
	Monitor(fn MonitorFunc)
}

Monitor monitors the status of a query and returns whether the query should be aborted with an error.

func MonitorFromContext

func MonitorFromContext(ctx context.Context) Monitor

MonitorFromContext returns a Monitor embedded within the Context if one exists.

type MonitorFunc

type MonitorFunc func(<-chan struct{}) error

MonitorFunc is a function that will be called to check if a query is currently healthy. If the query needs to be interrupted for some reason, the error should be returned by this function.

type MovingAverageFunc

type MovingAverageFunc struct {
	BaseInfo
	BaseAgg
}

func (*MovingAverageFunc) CallTypeFunc

func (f *MovingAverageFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*MovingAverageFunc) CompileFunc

func (f *MovingAverageFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type NonNegativeDerivativeFunc

type NonNegativeDerivativeFunc struct {
	BaseInfo
	BaseAgg
}

func (*NonNegativeDerivativeFunc) CallTypeFunc

func (f *NonNegativeDerivativeFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*NonNegativeDerivativeFunc) CompileFunc

func (f *NonNegativeDerivativeFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type NonNegativeDifferenceFunc

type NonNegativeDifferenceFunc struct {
	BaseInfo
	BaseAgg
}

func (*NonNegativeDifferenceFunc) CallTypeFunc

func (f *NonNegativeDifferenceFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*NonNegativeDifferenceFunc) CompileFunc

func (f *NonNegativeDifferenceFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type PercentileApproxFunc

type PercentileApproxFunc struct {
	BaseInfo
	BaseAgg
}

func (*PercentileApproxFunc) CallTypeFunc

func (f *PercentileApproxFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*PercentileApproxFunc) CompileFunc

func (f *PercentileApproxFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type PercentileFunc

type PercentileFunc struct {
	BaseInfo
	BaseAgg
}

func (*PercentileFunc) CallTypeFunc

func (f *PercentileFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*PercentileFunc) CompileFunc

func (f *PercentileFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type PercentileOGSketchFunc

type PercentileOGSketchFunc struct {
	BaseInfo
	BaseAgg
}

func (*PercentileOGSketchFunc) CallTypeFunc

func (f *PercentileOGSketchFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*PercentileOGSketchFunc) CompileFunc

func (f *PercentileOGSketchFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type PreparedStatement

type PreparedStatement interface {
	BuildLogicalPlan(ctx context.Context) (hybridqp.QueryNode, hybridqp.Trait, error)
	Select(ctx context.Context) (hybridqp.Executor, error)

	ChangeCreator(hybridqp.ExecutorBuilderCreator)
	ChangeOptimizer(hybridqp.ExecutorBuilderOptimizer)
	Statement() *influxql.SelectStatement

	// Explain outputs the explain plan for this statement.
	Explain() (string, error)

	// Close closes the resources associated with this prepared statement.
	// This must be called as the mapped shards may hold open resources such
	// as network connections.
	Close() error
}

PreparedStatement is a prepared statement that is ready to be executed.

func NewPreparedStatement

func NewPreparedStatement(stmt *influxql.SelectStatement, opt hybridqp.Options,
	shards interface {
		LogicalPlanCreator
		io.Closer
	}, columns []string, MaxPointN int, now time.Time) PreparedStatement

func Prepare

func Prepare(stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectOptions) (PreparedStatement, error)

Prepare will compile the statement with the default compile options and then prepare the query.

type ProcessorOptions

type ProcessorOptions struct {
	Name string
	Expr influxql.Expr
	// Expression to iterate for.
	// This can be VarRef or a Call.
	Exprs []influxql.Expr

	// Auxiliary tags or values to also retrieve for the point.
	Aux []influxql.VarRef

	FieldAux []influxql.VarRef
	TagAux   []influxql.VarRef

	// Data sources from which to receive data. This is only used for encoding
	// measurements over RPC and is no longer used in the open source version.
	Sources []influxql.Source

	// Group by interval and tags.
	Interval   hybridqp.Interval
	Dimensions []string            // The final dimensions of the query (stays the same even in subqueries).
	GroupBy    map[string]struct{} // Dimensions to group points by in intermediate iterators.
	Location   *time.Location

	// Fill options.
	Fill      influxql.FillOption
	FillValue interface{}

	// Condition to filter by.
	Condition influxql.Expr

	// Time range for the iterator.
	StartTime int64
	EndTime   int64

	// Limits the number of points per series.
	Limit, Offset int

	// Limits the number of series.
	SLimit, SOffset int

	// Sorted in time ascending order if true.
	Ascending bool

	// Removes the measurement name. Useful for meta queries.
	StripName bool

	// Removes duplicate rows from raw queries.
	Dedupe bool

	// Determines if this is a query for raw data or an aggregate/selector.
	Ordered bool

	Parallel bool

	// Limits on the creation of iterators.
	MaxSeriesN int

	// If this channel is set and is closed, the iterator should try to exit
	// and close as soon as possible.
	InterruptCh <-chan struct{}

	// Authorizer can limit access to data
	Authorizer FineAuthorizer

	// The requested maximum number of points to return in each result.
	ChunkedSize int

	// If this query return chunk once by once
	Chunked bool

	ChunkSize int

	MaxParallel int
	AbortChan   <-chan struct{}
	RowsChan    chan RowsChan
	Query       string

	EnableBinaryTreeMerge int64

	QueryId uint64

	// hint supported (need to marshal)
	HintType hybridqp.HintType

	// SeriesKey is assigned only the query is single time series, and it's used in the index.
	SeriesKey []byte

	GroupByAllDims bool

	SortFields influxql.SortFields

	HasFieldWildcard bool
	// useful for topQuery/subQuery, modify by out-query opt.stmtId in newSubOpt
	StmtId int

	LogQueryCurrId string

	// IncQuery indicates whether the query is a incremental query.
	IncQuery bool

	// IterID indicates the number of iteration in incremental query, starting from 0.
	IterID int32
	// contains filtered or unexported fields
}

ProcessorOptions is an object passed to CreateIterator to specify creation options.

func NewProcessorOptionsStmt

func NewProcessorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions) (opt ProcessorOptions, err error)

NewProcessorOptionsStmt creates the iterator options from stmt.

func (*ProcessorOptions) ChunkSizeNum

func (opt *ProcessorOptions) ChunkSizeNum() int

func (*ProcessorOptions) Clone

func (opt *ProcessorOptions) Clone() *ProcessorOptions

func (*ProcessorOptions) DerivativeInterval

func (opt *ProcessorOptions) DerivativeInterval() hybridqp.Interval

DerivativeInterval returns the time interval for the derivative function.

func (*ProcessorOptions) ElapsedInterval

func (opt *ProcessorOptions) ElapsedInterval() hybridqp.Interval

ElapsedInterval returns the time interval for the elapsed function.

func (*ProcessorOptions) FieldWildcard

func (opt *ProcessorOptions) FieldWildcard() bool

func (*ProcessorOptions) GetCondition

func (opt *ProcessorOptions) GetCondition() influxql.Expr

func (*ProcessorOptions) GetDimensions

func (opt *ProcessorOptions) GetDimensions() []string

GetDimensions retrieves the dimensions for this query.

func (*ProcessorOptions) GetEndTime

func (opt *ProcessorOptions) GetEndTime() int64

func (*ProcessorOptions) GetGroupBy

func (opt *ProcessorOptions) GetGroupBy() map[string]struct{}

func (*ProcessorOptions) GetHintType

func (opt *ProcessorOptions) GetHintType() hybridqp.HintType

func (*ProcessorOptions) GetInterval

func (opt *ProcessorOptions) GetInterval() time.Duration

func (*ProcessorOptions) GetIterId

func (opt *ProcessorOptions) GetIterId() int32

func (*ProcessorOptions) GetLimit

func (opt *ProcessorOptions) GetLimit() int

func (*ProcessorOptions) GetLocation

func (opt *ProcessorOptions) GetLocation() *time.Location

func (*ProcessorOptions) GetLogQueryCurrId

func (opt *ProcessorOptions) GetLogQueryCurrId() string

func (*ProcessorOptions) GetMaxParallel

func (opt *ProcessorOptions) GetMaxParallel() int

func (*ProcessorOptions) GetMeasurements

func (opt *ProcessorOptions) GetMeasurements() []*influxql.Measurement

func (*ProcessorOptions) GetOffset

func (opt *ProcessorOptions) GetOffset() int

func (*ProcessorOptions) GetOptDimension

func (opt *ProcessorOptions) GetOptDimension() []string

func (*ProcessorOptions) GetSortFields

func (opt *ProcessorOptions) GetSortFields() influxql.SortFields

func (*ProcessorOptions) GetSourcesNames

func (opt *ProcessorOptions) GetSourcesNames() []string

func (*ProcessorOptions) GetStartTime

func (opt *ProcessorOptions) GetStartTime() int64

func (*ProcessorOptions) GetStmtId

func (opt *ProcessorOptions) GetStmtId() int

func (*ProcessorOptions) HasInterval

func (opt *ProcessorOptions) HasInterval() bool

func (*ProcessorOptions) HaveLocalMst

func (opt *ProcessorOptions) HaveLocalMst() bool

func (*ProcessorOptions) HaveOnlyCSStore

func (opt *ProcessorOptions) HaveOnlyCSStore() bool

func (*ProcessorOptions) ISChunked

func (opt *ProcessorOptions) ISChunked() bool

func (*ProcessorOptions) IntegralInterval

func (opt *ProcessorOptions) IntegralInterval() hybridqp.Interval

IntegralInterval returns the time interval for the integral function.

func (*ProcessorOptions) IsAscending

func (opt *ProcessorOptions) IsAscending() bool

func (*ProcessorOptions) IsGroupByAllDims

func (opt *ProcessorOptions) IsGroupByAllDims() bool

func (*ProcessorOptions) IsIncQuery

func (opt *ProcessorOptions) IsIncQuery() bool

func (*ProcessorOptions) IsTimeSorted

func (opt *ProcessorOptions) IsTimeSorted() bool

func (*ProcessorOptions) IsUnifyPlan

func (opt *ProcessorOptions) IsUnifyPlan() bool

func (*ProcessorOptions) MarshalBinary

func (opt *ProcessorOptions) MarshalBinary() ([]byte, error)

MarshalBinary encodes opt into a binary format.

func (*ProcessorOptions) MergeSorted

func (opt *ProcessorOptions) MergeSorted() bool

MergeSorted returns true if the options require a sorted merge.

func (*ProcessorOptions) OptionsName

func (opt *ProcessorOptions) OptionsName() string

func (*ProcessorOptions) SeekTime

func (opt *ProcessorOptions) SeekTime() int64

SeekTime returns the time the iterator should start from. For ascending iterators this is the start time, for descending iterators it's the end time.

func (*ProcessorOptions) SetAscending

func (opt *ProcessorOptions) SetAscending(a bool)

func (*ProcessorOptions) SetFill

func (opt *ProcessorOptions) SetFill(fill influxql.FillOption)

func (*ProcessorOptions) SetHintType

func (opt *ProcessorOptions) SetHintType(h hybridqp.HintType)

func (*ProcessorOptions) SetSortFields

func (opt *ProcessorOptions) SetSortFields(sortFields influxql.SortFields)

func (*ProcessorOptions) StopTime

func (opt *ProcessorOptions) StopTime() int64

StopTime returns the time the iterator should end at. For ascending iterators this is the end time, for descending iterators it's the start time.

func (*ProcessorOptions) UnmarshalBinary

func (opt *ProcessorOptions) UnmarshalBinary(buf []byte) error

UnmarshalBinary decodes from a binary format in to opt.

func (*ProcessorOptions) UpdateSources

func (opt *ProcessorOptions) UpdateSources(sources influxql.Sources)

func (*ProcessorOptions) Window

func (opt *ProcessorOptions) Window(t int64) (start, end int64)

Window returns the time window [start,end) that t falls within.

func (*ProcessorOptions) Zone

func (opt *ProcessorOptions) Zone(ns int64) (string, int64)

Zone returns the zone information for the given time. The offset is in nanoseconds.

type QueryIDRegister

type QueryIDRegister interface {
	RetryRegisterQueryIDOffset(host string) (uint64, error)
}

type QueryInfo

type QueryInfo struct {
	ID       uint64        `json:"id"`
	Query    string        `json:"query"`
	Database string        `json:"database"`
	Duration time.Duration `json:"duration"`
	Status   TaskStatus    `json:"status"`
}

QueryInfo represents the information for a query.

type RateFunc

type RateFunc struct {
	BaseInfo
	BaseAgg
}

func (*RateFunc) CallTypeFunc

func (f *RateFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*RateFunc) CompileFunc

func (f *RateFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type RowsChan

type RowsChan struct {
	Rows    models.Rows // models.Rows of data
	Partial bool        // is partial of rows
}

type SampleFunc

type SampleFunc struct {
	BaseInfo
	BaseAgg
}

func (*SampleFunc) CallTypeFunc

func (f *SampleFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*SampleFunc) CompileFunc

func (f *SampleFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type SelectOptions

type SelectOptions struct {
	// Authorizer is used to limit access to data
	Authorizer FineAuthorizer

	// Node to exclusively read from.
	// If zero, all nodes are used.
	NodeID uint64

	// Maximum number of concurrent series.
	MaxSeriesN int

	// Maximum number of concurrent fileds.
	MaxFieldsN int

	// Maximum number of points to read from the query.
	// This requires the passed in context to have a Monitor that is
	// created using WithMonitor.
	MaxPointN int

	// Maximum number of buckets for a statement.
	MaxBucketsN int

	// Maximum number of memory a query can use
	MaxQueryMem int64

	// Maximum parallelism a query can use
	MaxQueryParallel int

	// The number of point for chunk
	ChunkSize int

	// The requested maximum number of points to return in each result.
	ChunkedSize int

	Chunked bool

	QueryLimitEn bool

	QueryTimeCompareEnabled bool

	AbortChan <-chan struct{}
	RowsChan  chan RowsChan

	HintType hybridqp.HintType

	IncQuery bool
	QueryID  string
	IterID   int32
}

SelectOptions are options that customize the select call.

type ShardGroup

type ShardGroup interface {
	LogicalPlanCreator
	influxql.FieldMapper
	io.Closer
}

ShardGroup represents a shard or a collection of shards that can be accessed for creating iterators. When creating iterators, the resource used for reading the iterators should be separate from the resource used to map the shards. When the ShardGroup is closed, it should not close any resources associated with the created Iterator. Those resources belong to the Iterator and will be closed when the Iterator itself is closed. The query engine operates under this assumption and will close the shard group after creating the iterators, but before the iterators are actually read.

type ShardMapper

type ShardMapper interface {
	MapShards(sources influxql.Sources, t influxql.TimeRange, opt SelectOptions, condition influxql.Expr) (ShardGroup, error)
	Close() error
}

ShardMapper retrieves and maps shards into an IteratorCreator that can later be used for executing queries.

type SlidingWindowFunc

type SlidingWindowFunc struct {
	BaseInfo
	BaseAgg
}

func (*SlidingWindowFunc) CallTypeFunc

func (f *SlidingWindowFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*SlidingWindowFunc) CompileFunc

func (f *SlidingWindowFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type SpreadFunc

type SpreadFunc struct {
	BaseInfo
	BaseAgg
}

func (*SpreadFunc) CallTypeFunc

func (f *SpreadFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*SpreadFunc) CompileFunc

func (f *SpreadFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type Statement

type Statement interface {
	// Prepare prepares the statement by mapping shards and finishing the creation
	// of the query plan.
	Prepare(shardMapper ShardMapper, opt SelectOptions) (PreparedStatement, error)
}

Statement is a compiled query statement.

func Compile

func Compile(stmt *influxql.SelectStatement, opt CompileOptions) (Statement, error)

type StatementExecutor

type StatementExecutor interface {
	// ExecuteStatement executes a statement. Results should be sent to the
	// results channel in the ExecutionContext.
	ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext, seq int) error
	Statistics(buffer []byte) ([]byte, error)
}

StatementExecutor executes a statement within the Executor.

type StatementNormalizer

type StatementNormalizer interface {
	// NormalizeStatement adds a default database and policy to the
	// measurements in the statement.
	NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error
}

StatementNormalizer normalizes a statement before it is executed.

type StddevFunc

type StddevFunc struct {
	BaseInfo
	BaseAgg
}

func (*StddevFunc) CallTypeFunc

func (f *StddevFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*StddevFunc) CompileFunc

func (f *StddevFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type StmtBuilder

type StmtBuilder interface {
}

type StmtBuilderCreator

type StmtBuilderCreator interface {
	Create(stmt *influxql.SelectStatement, opt hybridqp.Options,
		shards interface {
			LogicalPlanCreator
			io.Closer
		}, columns []string, MaxPointN int, now time.Time) PreparedStatement
}

type StmtBuilderCreatorFactory

type StmtBuilderCreatorFactory struct {
	// contains filtered or unexported fields
}

func GetStmtBuilderFactoryInstance

func GetStmtBuilderFactoryInstance() *StmtBuilderCreatorFactory

func NewStmtBuilderCreatorFactory

func NewStmtBuilderCreatorFactory() *StmtBuilderCreatorFactory

func (*StmtBuilderCreatorFactory) Attach

func (r *StmtBuilderCreatorFactory) Attach(creator StmtBuilderCreator)

func (*StmtBuilderCreatorFactory) Create

func (r *StmtBuilderCreatorFactory) Create(stmt *influxql.SelectStatement, opt hybridqp.Options,
	shards interface {
		LogicalPlanCreator
		io.Closer
	}, columns []string, MaxPointN int, now time.Time) StmtBuilder

func (*StmtBuilderCreatorFactory) Get

type StringFunctionTypeMapper

type StringFunctionTypeMapper struct{}

type mapper

func (StringFunctionTypeMapper) CallType

func (StringFunctionTypeMapper) MapType

func (StringFunctionTypeMapper) MapTypeBatch

type StringValuer

type StringValuer struct{}

valuer

func (StringValuer) Call

func (v StringValuer) Call(name string, args []interface{}) (interface{}, bool)

func (StringValuer) SetValuer

func (StringValuer) SetValuer(_ influxql.Valuer, _ int)

func (StringValuer) Value

func (StringValuer) Value(_ string) (interface{}, bool)

type SumFunc

type SumFunc struct {
	BaseInfo
	BaseAgg
}

func (*SumFunc) CallTypeFunc

func (f *SumFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*SumFunc) CompileFunc

func (f *SumFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is the internal data structure for managing queries. For the public use data structure that gets returned, see Task.

func (*Task) Error

func (q *Task) Error() error

Error returns any asynchronous error that may have occurred while executing the query.

func (*Task) Monitor

func (q *Task) Monitor(fn MonitorFunc)

Monitor starts a new goroutine that will monitor a query. The function will be passed in a channel to signal when the query has been finished normally. If the function returns with an error and the query is still running, the query will be terminated.

type TaskManager

type TaskManager struct {
	// Query execution timeout.
	QueryTimeout time.Duration

	// Log queries if they are slower than this time.
	// If zero, slow queries will never be logged.
	LogQueriesAfter time.Duration

	// Maximum number of concurrent queries.
	MaxConcurrentQueries int

	// Logger to use for all logging.
	// Defaults to discarding all log output.
	Logger *logger.Logger

	Register QueryIDRegister
	Host     string
	// contains filtered or unexported fields
}

TaskManager takes care of all aspects related to managing running queries.

func NewTaskManager

func NewTaskManager() *TaskManager

NewTaskManager creates a new TaskManager.

func (*TaskManager) AssignQueryID

func (t *TaskManager) AssignQueryID() uint64

AssignQueryID assign a query id for a sql

func (*TaskManager) AttachQuery

func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}, qStat *statistics.SQLSlowQueryStatistics) (*ExecutionContext, func(), error)

AttachQuery attaches a running query to be managed by the TaskManager. Returns the query id of the newly attached query or an error if it was unable to assign a query id or attach the query to the TaskManager. This function also returns a channel that will be closed when this query finishes running.

After a query finishes running, the system is free to reuse a query id.

func (*TaskManager) Close

func (t *TaskManager) Close() error

Close kills all running queries and prevents new queries from being attached.

func (*TaskManager) DetachQuery

func (t *TaskManager) DetachQuery(qid uint64) error

DetachQuery removes a query from the query table. If the query is not in the killed state, this will also close the related channel.

func (*TaskManager) ExecuteStatement

func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext, seq int) error

ExecuteStatement executes a statement containing one of the task management queries.

func (*TaskManager) InitQueryIDByOffset

func (t *TaskManager) InitQueryIDByOffset(offset uint64)

func (*TaskManager) KillQuery

func (t *TaskManager) KillQuery(qid uint64) error

KillQuery enters a query into the killed state and closes the channel from the TaskManager. This method can be used to forcefully terminate a running query.

func (*TaskManager) Queries

func (t *TaskManager) Queries() []QueryInfo

Queries returns a list of all running queries with information about them.

func (*TaskManager) Statistics

func (t *TaskManager) Statistics(buffer []byte) ([]byte, error)

type TaskStatus

type TaskStatus int
const (
	// RunningTask is set when the task is running.
	RunningTask TaskStatus = iota + 1

	// KilledTask is set when the task is killed, but resources are still
	// being used.
	KilledTask
)

func (TaskStatus) MarshalJSON

func (t TaskStatus) MarshalJSON() ([]byte, error)

func (TaskStatus) String

func (t TaskStatus) String() string

func (*TaskStatus) UnmarshalJSON

func (t *TaskStatus) UnmarshalJSON(data []byte) error

type TopFunc

type TopFunc struct {
	BaseInfo
	BaseAgg
}

func (*TopFunc) CallTypeFunc

func (f *TopFunc) CallTypeFunc(name string, args []influxql.DataType) (influxql.DataType, error)

func (*TopFunc) CompileFunc

func (f *TopFunc) CompileFunc(expr *influxql.Call, c *compiledField) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL