influxql

package
v2.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: MIT Imports: 29 Imported by: 0

README

InfluxQL Transpiler

The InfluxQL Transpiler exists to rewrite an InfluxQL query into its equivalent query in Flux. The transpiler works off of a few simple rules that match with the equivalent method of constructing queries in InfluxDB.

NOTE: The transpiler code is not finished and may not necessarily reflect what is in this document. When they conflict, this document is considered to be the correct way to do it. If you wish to change how the transpiler works, modify this file first.

  1. Select Statement
    1. Identify the cursors
    2. Identify the query type
    3. Group the cursors
    4. Create the cursors for each group
      1. Create cursor
      2. Filter by measurement and fields
      3. Generate the pivot table
      4. Evaluate the condition
      5. Perform the grouping
      6. Evaluate the function
      7. Normalize the time column
      8. Combine windows
    5. Join the groups
    6. Map and eval columns
  2. Show Databases
    1. Create cursor
    2. Rename and Keep the name databaseName column
  3. Show Retention Policies
    1. Create cursor
    2. Filter by the database name
    3. Rename Columns
    4. Set Static Columns
    5. Keep Specific Columns
  4. Show Tag Values
    1. Create cursor
    2. Filter by the measurement
    3. Evaluate the condition
    4. Retrieve the key values
    5. Find the distinct key values
  5. Encoding the results

Select Statement

Identify the cursors

The InfluxQL query engine works by filling in variables and evaluating the query for the values in each row. The first step of transforming a query is identifying the cursors so we can figure out how to fill them correctly. A cursor is any point in the query that has a variable or a function call. Math functions do not count as function calls and are handled in the eval phase.

For the following query, it is easy to identify the cursors:

SELECT max(usage_user), usage_system FROM telegraf..cpu

max(usage_user) and usage_system are the cursors that we need to fill in for each row. Cursors are global and are not per-field.

Identify the query type

There are four types of queries: meta, raw, aggregate, and selector. A meta query is one that retrieves descriptive information about a measurement or series, rather than about the data within the measurement or series. A raw query is one where all of the cursors reference a variable. An aggregate is one where all of the cursors reference a function call. A selector is one where there is exactly one function call that is a selector (such as max() or min()) and the remaining variables, if there are any, are variables. If there is only one function call with no variables and that function is a selector, then the function type is a selector.

Group the cursors

We group the cursors based on the query type. For raw queries and selectors, all of the cursors are put into the same group. \ For aggregates, each function call is put into a separate group so they can be joined at the end.

Create the cursors for each group

We create the cursors within each group. This process is repeated for every group.

Create cursor

The cursor is generated using the following template:

create_cursor = (db, rp="autogen", start, stop=now()) => from(bucket: db+"/"+rp)
    |> range(start: start, stop: stop)

This is called once per group.

Identify the variables

Each of the variables in the group are identified. This involves inspecting the condition to collect the common variables in the expression while also retrieving the variables for each expression within the group. For a function call, this retrieves the variable used as a function argument rather than the function itself.

If a wildcard is identified in the fields, then the field filter is cleared and only the measurement filter is used. If a regex wildcard is identified, it is added as one of the field filters.

Filter by measurement and fields

A filter expression is generated by using the measurement and the fields that were identified. It follows this template:

... |> filter(fn: (r) => r._measurement == <measurement> and <field_expr>)

The <measurement> is equal to the measurement name from the FROM clause. The <field_expr> section is generated differently depending on the fields that were found. If more than one field was selected, then each of the field filters is combined by using or and the expression itself is surrounded by parenthesis. For a non-wildcard field, the following expression is used:

r._field == <name>

For a regex wildcard, the following is used:

r._field =~ <regex>

If a star wildcard was used, the <field_expr> is omitted from the filter expression.

Generate the pivot table

If there was more than one field selected or if one of the fields was some form of wildcard, a pivot expression is generated.

... |> pivot(rowKey: ["_time"], colKey: ["_field"], valueCol: "_value")
Evaluate the condition

At this point, generate the filter call to evaluate the condition. If there is no condition outside of the time selector, then this step is skipped.

Perform the grouping

We group together the streams based on the GROUP BY clause. As an example:

> SELECT mean(usage_user) FROM telegraf..cpu WHERE time >= now() - 5m GROUP BY time(5m), host
... |> group(columns: ["_measurement", "_start", "host"]) |> window(every: 5m)

If the GROUP BY time(...) doesn't exist, window() is skipped. Grouping will have a default of [_measurement, _start], regardless of whether a GROUP BY clause is present. If there are keys in the group by clause, they are concatenated with the default list. If a wildcard is used for grouping, then this step is skipped.

Evaluate the function

If this group contains a function call, the function is evaluated at this stage and invoked on the specific column. As an example:

> SELECT max(usage_user), usage_system FROM telegraf..cpu
val1 = create_cursor(bucket: "telegraf/autogen", start: -5m, m: "cpu", f: "usage_user")
val1 = create_cursor(bucket: "telegraf/autogen", start: -5m, m: "cpu", f: "usage_system")
inner_join(tables: {val1: val1, val2: val2}, except: ["_field"], fn: (tables) => {val1: tables.val1, val2: tables.val2})
    |> max(column: "val1")

For an aggregate, the following is used instead:

> SELECT mean(usage_user) FROM telegraf..cpu
create_cursor(bucket: "telegraf/autogen", start: -5m, m: "cpu", f: "usage_user")
    |> group(columns: ["_field"], mode: "except")
    |> mean(timeSrc: "_start", columns: ["_value"])

If the aggregate is combined with conditions, the column name of _value is replaced with whatever the generated column name is.

Normalize the time column

If a function was evaluated and the query type is an aggregate type or if we are grouping by time, then all of the functions need to have their time normalized. If the function is an aggregate, the following is added:

... |> mean() |> duplicate(column: "_start", as: "_time")

If it is a selector, then we need to also drop the existing _time column with the following:

... |> max() |> drop(columns: ["_time"]) |> duplicate(column: "_start", as: "_time")

This step does not apply if there are no functions.

Combine windows

If there a window operation was added, we then combine each of the function results from the windows back into a single table.

... |> window(every: inf)

This step is skipped if there was no window function.

Join the groups

If there is only one group, this does not need to be done and can be skipped.

If there are multiple groups, as is the case when there are multiple function calls, then we perform an outer_join using the time and any remaining group keys.

Map and eval the columns

After joining the results if a join was required, then a map call is used to both evaluate the math functions and name the columns. The time is also passed through the map() function so it is available for the encoder.

result |> map(fn: (r) => {_time: r._time, max: r.val1, usage_system: r.val2})

This is the final result. It will also include any tags in the group key and the time will be located in the _time variable.

TODO(jsternberg): The _time variable is only needed for selectors and raw queries. We can actually drop this variable for aggregate queries and use the _start time from the group key. Consider whether or not we should do this and if it is worth it.

Show Databases

In 2.0, not all "buckets" will be conceptually equivalent to a 1.X database. If a bucket is intended to represent a collection of 1.X data, it will be specifically identified as such. flux provides a special function databases() that will retrieve information about all registered 1.X compatible buckets.

Create Cursor

The cursor is trivially implemented as a no-argument call to the databases function:

databases() 
Rename and Keep the databaseName Column

The result of databases() has several columns. In this application, we only need the databaseName but in 1.X output, the label is name:

databases() 
  |> rename(columns: {databaseName: "name"})
  |> keep(columns: ["name"])

Show Retention Policies

Similar to SHOW DATABASES, show retention policies also returns information only for 1.X compatible buckets. It uses different columns from the same databses() function.

Create cursor

The cursor is trivially implemented as a no-argument call to the databases function:

databases() 
Filter by the database name

The databases function will return rows of database/retention policy pairs for all databases. The result of SHOW RETENTION POLICIES is defined for a single database, so we filter:

databases() |> filter(fn: (r) => r.databaseName == <DBNAME>
Rename Columns

Several columns must be renamed to match the 1.X format:

... |> rename(columns: {retentionPolicy: "name", retentionPeriod: "duration"})
Set Static Columns

Two static columns are set. In 1.X the columns for shardGroupDuration and replicaN could vary depending on the database/retention policy definition. In 2.0, there is no shardGroups to configure, and the replication level is always 2.

... |> set(key: "shardGroupDuration", value: "0") |> set(key: "replicaN", value: "2")
Keep Specific Columns

Finally, we will identify the columns in the table that we wish to keep:

... |> keep(columns: ["name", "duration", "shardGroupDuration", "replicaN", "default"])

Show Tag Values

In flux, retrieving the tag values is different than influxql. In influxdb 1.x, tags were included in the index and restricting them by time did not exist or make any sense. In the 2.0 platform, tag keys and values are scoped by time and it is more expensive to retrieve all of the tag values for all time. For this reason, there are some small changes to how the command works and therefore how it is transpiled.

Create cursor

The first step is to construct the initial cursor. This is done similar to a select statement, but we do not filter on the fields.

from(bucket: "telegraf/autogen") |>
    |> range(start: -1h)

If no time specifier is specified, as would be expected by most transpiled queries, we default to the last hour. If a time range is present in the WHERE clause, that time is used instead.

Filter by the measurement

If a FROM <measurement> clause is present in the statement, then we filter by the measurement name.

... |> filter(fn: (r) => r._measurement == <measurement>)

This step may be skipped if the FROM clause is not present. In which case, it will return the tag values for every measurement.

Evaluate the condition

The condition within the WHERE clause is evaluated. It generates a filter in the same way that a [select statement)(#evaluate-condition) would, but with the added assumption that all of the values refer to tags. There is no attempt made at determining if a value is a field or tag.

Retrieve the key values

The key values are retrieved using the keyValues function. The SHOW TAG VALUES statement requires a tag key filter.

If a single value is specified with the = operator, then that value is used as the single argument to the function.

# SHOW TAG VALUES WITH KEY = "host"
... |> keyValues(keyCols: ["host"])

If the IN operator is used, then all of the values are used as a list argument to the keyValues().

# SHOW TAG VALUES WITH KEY IN ("host", "region")
... |> keyValues(keyCols: ["host", "region"])

If any other operation is used, such as != or a regex operator, then a schema function must be used like follows:

# SHOW TAG VALUES WITH KEY != "host"
... |> keyValues(fn: (schema) => schema.keys |> filter(fn: (col) => col.name != "host"))
# SHOW TAG VALUES WITH KEY =~ /host|region/
... |> keyValues(fn: (schema) => schema.keys |> filter(fn: (col) => col.name =~ /host|region/))
# SHOW TAG VALUES WITH KEY !~ /host|region/
... |> keyValues(fn: (schema) => schema.keys |> filter(fn: (col) => col.name !~ /host|region/))

TODO(jsternberg): The schema function has not been solidifed, but the basics are that we take the list of group keys and then run a filter using the condition.

At this point, we have a table with the partition key that is organized by the keys and values of the selected columns.

Find the distinct key values

We group by the measurement and the key and then use distinct on the values. After we find the distinct values, we group these values back by their measurements again so all of the tag values for a measurement are grouped together. We then rename the columns to the expected names.

... |> group(columns: ["_measurement", "_key"])
    |> distinct(column: "_value")
    |> group(columns: ["_measurement"])
    |> rename(columns: {_key: "key", _value: "value"})
Encoding the results

Each statement will be terminated by a yield() call. This call will embed the statement id as the result name. The result name is always of type string, but the transpiler will encode an integer in this field so it can be parsed by the encoder. For example:

result |> yield(name: "0")

The edge nodes from the query specification will be used to encode the results back to the user in the JSON format used in 1.x. The JSON format from 1.x is below:

{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "_measurement",
                    "tags": {
                        "key": "value"
                    },
                    "columns": [
                        "time",
                        "value"
                    ],
                    "values": [
                        [
                            "2015-01-29T21:55:43.702900257Z",
                            2
                        ]
                    ]
                }
            ]
        }
    ]
}

The measurement name is retrieved from the _measurement column in the results. For the tags, the values in the group key that are of type string are included with both the keys and the values mapped to each other. Any values in the group key that are not strings, like the start and stop times, are ignored and discarded. If the _field key is still present in the group key, it is also discarded. For all normal fields, they are included in the array of values for each row. The _time field will be renamed to time (or whatever the time alias is set to by the query).

The chunking options that existed in 1.x are not supported by the encoder and should not be used. To minimize the amount of breaking code, using a chunking option will be ignored and the encoder will operate as normal, but it will include a message in the result so that a user can be informed that an invalid query option was used. The 1.x format has a field for sending back informational messages in it already.

TODO(jsternberg): Find a way for a column to be both used as a tag and a field. This is not currently possible because the encoder can't tell the difference between the two.

Documentation

Overview

Package influxql implements the transpiler for executing influxql queries in the 2.0 query engine.

Index

Constants

View Source
const CompilerType = "influxql"
View Source
const DialectType = "influxql"

Variables

This section is empty.

Functions

func AddCompilerMappings

func AddCompilerMappings(mappings flux.CompilerMappings, dbrpMappingSvc platform.DBRPMappingService) error

AddCompilerMappings adds the influxql specific compiler mappings.

func AddDialectMappings

func AddDialectMappings(mappings flux.DialectMappings) error

AddDialectMappings adds the influxql specific dialect mappings.

func Join

func Join(t *transpilerState, cursors []cursor, on []string) cursor

func NewResponseIterator

func NewResponseIterator(r *Response) flux.ResultIterator

NewResponseIterator constructs a flux.ResultIterator from a Response.

Types

type Compiler

type Compiler struct {
	Cluster string     `json:"cluster,omitempty"`
	DB      string     `json:"db,omitempty"`
	RP      string     `json:"rp,omitempty"`
	Bucket  string     `json:"bucket,omitempty"`
	Query   string     `json:"query"`
	Now     *time.Time `json:"now,omitempty"`
	// contains filtered or unexported fields
}

Compiler is the transpiler to convert InfluxQL to a Flux specification.

func NewCompiler

func NewCompiler(dbrpMappingSvc platform.DBRPMappingService) *Compiler

func (*Compiler) Compile

func (c *Compiler) Compile(ctx context.Context, runtime flux.Runtime) (flux.Program, error)

Compile transpiles the query into a Program.

func (*Compiler) CompilerType

func (c *Compiler) CompilerType() flux.CompilerType

func (*Compiler) WithLogicalPlannerOptions

func (c *Compiler) WithLogicalPlannerOptions(opts ...plan.LogicalOption)

type CompressionFormat

type CompressionFormat int

CompressionFormat is the format to compress the query results.

const (
	// None does not compress the results and is the default.
	None CompressionFormat = iota
	// Gzip compresses the query results with gzip.
	Gzip
)

type Config

type Config struct {
	// Bucket is the name of a bucket to use instead of the db/rp from the query.
	// If bucket is empty then the dbrp mapping is used.
	Bucket                 string
	DefaultDatabase        string
	DefaultRetentionPolicy string
	Cluster                string
	Now                    time.Time
	// FallbackToDBRP if true will use the naming convention of `db/rp`
	// for a bucket name when an mapping is not found
	FallbackToDBRP bool
}

Config modifies the behavior of the Transpiler.

type Dialect

type Dialect struct {
	TimeFormat  TimeFormat        // TimeFormat is the format of the timestamp; defaults to RFC3339Nano.
	Encoding    EncodingFormat    // Encoding is the format of the results; defaults to JSON.
	ChunkSize   int               // Chunks is the number of points per chunk encoding batch; defaults to 0 or no chunking.
	Compression CompressionFormat // Compression is the compression of the result output; defaults to None.
}

Dialect describes the output format of InfluxQL queries.

func (*Dialect) DialectType

func (d *Dialect) DialectType() flux.DialectType

func (*Dialect) Encoder

func (d *Dialect) Encoder() flux.MultiResultEncoder

func (*Dialect) SetHeaders

func (d *Dialect) SetHeaders(w http.ResponseWriter)

type EncodingFormat

type EncodingFormat int

EncodingFormat is the output format for the query response content.

const (
	// JSON marshals the response to JSON octets.
	JSON EncodingFormat = iota
	// JSONPretty marshals the response to JSON octets with indents.
	JSONPretty
	// CSV marshals the response to CSV.
	CSV
	// Msgpack has a similar structure as the  JSON response. Used?
	Msgpack
)

type Endpoint

type Endpoint struct {
	URL      string `json:"url"`
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

Endpoint contains the necessary information to connect to a specific cluster.

type Message

type Message struct {
	Level string `json:"level"`
	Text  string `json:"text"`
}

Message represents a user-facing message to be included with the result.

type MultiResultEncoder

type MultiResultEncoder struct{}

MultiResultEncoder encodes results as InfluxQL JSON format.

func NewMultiResultEncoder

func NewMultiResultEncoder() *MultiResultEncoder

func (*MultiResultEncoder) Encode

func (e *MultiResultEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error)

Encode writes a collection of results to the influxdb 1.X http response format. Expectations/Assumptions:

  1. Each result will be published as a 'statement' in the top-level list of results. The result name will be interpreted as an integer and used as the statement id.
  2. If the _measurement name is present in the group key, it will be used as the result name instead of as a normal tag.
  3. All columns in the group key must be strings and they will be used as tags. There is no current way to have a tag and field be the same name in the results. TODO(jsternberg): For full compatibility, the above must be possible.
  4. All other columns are fields and will be output in the order they are found. TODO(jsternberg): This function currently requires the first column to be a time field, but this isn't a strict requirement and will be lifted when we begin to work on transpiling meta queries.

type Response

type Response struct {
	Results []Result `json:"results,omitempty"`
	Err     string   `json:"error,omitempty"`
}

type Result

type Result struct {
	// StatementID is just the statement's position in the query. It's used
	// to combine statement results if they're being buffered in memory.
	StatementID int        `json:"statement_id"`
	Series      []*Row     `json:"series,omitempty"`
	Messages    []*Message `json:"messages,omitempty"`
	Partial     bool       `json:"partial,omitempty"`
	Err         string     `json:"error,omitempty"`
}

Result represents a resultset returned from a single statement. Rows represents a list of rows that can be sorted consistently by name/tag.

type Row

type Row struct {
	Name    string            `json:"name,omitempty"`
	Tags    map[string]string `json:"tags,omitempty"`
	Columns []string          `json:"columns,omitempty"`
	Values  [][]interface{}   `json:"values,omitempty"`
	Partial bool              `json:"partial,omitempty"`
}

Row represents a single row returned from the execution of a statement.

type Service

type Service struct {
	// Endpoints maps a cluster name to the influxdb 1.x endpoint.
	Endpoints map[string]Endpoint
}

Service is a client for the influxdb 1.x endpoint that implements the QueryService for the influxql compiler type.

func (*Service) Query

func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error)

Query will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint, and return results using the default decoder.

func (*Service) QueryRawJSON

func (s *Service) QueryRawJSON(ctx context.Context, req *query.Request) ([]byte, error)

QueryRawJSON will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint, and return the body of the response as a byte array.

type TimeFormat

type TimeFormat int

TimeFormat specifies the format of the timestamp in the query results.

const (
	// RFC3339Nano is the default format for timestamps for InfluxQL.
	RFC3339Nano TimeFormat = iota
	// Hour formats time as the number of hours in the unix epoch.
	Hour
	// Minute formats time as the number of minutes in the unix epoch.
	Minute
	// Second formats time as the number of seconds in the unix epoch.
	Second
	// Millisecond formats time as the number of milliseconds in the unix epoch.
	Millisecond
	// Microsecond formats time as the number of microseconds in the unix epoch.
	Microsecond
	// Nanosecond formats time as the number of nanoseconds in the unix epoch.
	Nanosecond
)

type Transpiler

type Transpiler struct {
	Config *Config
	// contains filtered or unexported fields
}

Transpiler converts InfluxQL queries into a query spec.

func NewTranspiler

func NewTranspiler(dbrpMappingSvc influxdb.DBRPMappingService) *Transpiler

func NewTranspilerWithConfig

func NewTranspilerWithConfig(dbrpMappingSvc influxdb.DBRPMappingService, cfg Config) *Transpiler

func (*Transpiler) Transpile

func (t *Transpiler) Transpile(ctx context.Context, txt string) (*ast.Package, error)

Directories

Path Synopsis
Package spectests the influxql transpiler specification tests.
Package spectests the influxql transpiler specification tests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL