protocol

v0.0.0-...-b1ad95c Latest
Warning

This package is not in the latest version of its module.

Published: Sep 22, 2021 | License: MIT | Imports: 14 | Imported by: 42

README

line-protocol

This is an encoder for the InfluxDB line protocol.

It has an interface similar to the standard library's json.Encoder.

Some caveats:
  • It is not concurrency-safe. If you want to make multiple calls to Encoder.Encode concurrently, you have to manage the concurrency yourself.
  • It can only encode values that are uint64, int64, int, float32, float64, string, or bool.
  • Ints are converted to int64, and float32s to float64.
  • If UintSupport is not set, uint64s are converted to int64s. Values larger than the maximum int64 are clamped to the maximum int64 instead of overflowing.
Example:
buf := &bytes.Buffer{}
serializer := protocol.NewEncoder(buf)
serializer.SetMaxLineBytes(1024)
serializer.SetFieldTypeSupport(protocol.UintSupport)
// e is any value that implements the protocol.Metric interface.
if _, err := serializer.Encode(e); err != nil {
	log.Fatal(err)
}

Documentation

Constants

const LineProtocol_en_align int = 86
const LineProtocol_en_discard_line int = 35
const LineProtocol_en_main int = 47
const LineProtocol_en_series int = 38
const LineProtocol_error int = 0
const LineProtocol_first_final int = 47
const LineProtocol_start int = 47

Variables

var (
	ErrNameParse      = errors.New("expected measurement name")
	ErrFieldParse     = errors.New("expected field")
	ErrTagParse       = errors.New("expected tag")
	ErrTimestampParse = errors.New("expected timestamp")
	ErrParse          = errors.New("parse error")
	EOF               = errors.New("EOF")
)
var (
	// ErrNeedMoreSpace tells us that the Decoder's io.Reader is full.
	ErrNeedMoreSpace = &MetricError{"need more space"}

	// ErrInvalidName tells us that the chosen name is invalid.
	ErrInvalidName = &MetricError{"invalid name"}

	// ErrNoFields tells us that there were no serializable fields in the line/metric.
	ErrNoFields = &MetricError{"no serializable fields"}
)
var ErrIsInf = &FieldError{"is Inf"}

ErrIsInf is a field error for when a float field is Inf.

var ErrIsNaN = &FieldError{"is NaN"}

ErrIsNaN is a field error for when a float field is NaN.

Functions

func NewMachine

func NewMachine(handler Handler) *machine

func NewSeriesMachine

func NewSeriesMachine(handler Handler) *machine

func NewStreamMachine

func NewStreamMachine(r io.Reader, handler Handler) *streamMachine

Types

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder marshals Metrics into InfluxDB line protocol. It is not safe for concurrent use; create a separate Encoder for each goroutine instead of sharing one. By default, a field error causes the Encoder to skip that field and move on. To return an error on field errors instead, call Encoder.FailOnFieldErr(true).

func NewEncoder

func NewEncoder(w io.Writer) *Encoder

NewEncoder returns an Encoder that marshals to a writer in InfluxDB line protocol, as defined by: https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/

func (*Encoder) Encode

func (e *Encoder) Encode(m Metric) (int, error)

Encode marshals a Metric to the Encoder's io.Writer.

func (*Encoder) FailOnFieldErr

func (e *Encoder) FailOnFieldErr(s bool)

FailOnFieldErr sets whether to fail on a field error or to skip the field and move on. The default behavior is to move on.

func (*Encoder) SetFieldSortOrder

func (e *Encoder) SetFieldSortOrder(s FieldSortOrder)

SetFieldSortOrder sets a sort order for the fields. The options are NoSortFields (does not sort the fields) and SortFields (sorts the field keys in alphabetical order).
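
As an illustration of what sorting fields by key means, here is a small standard-library sketch. The Field type is redeclared locally to mirror the documented struct, sortFields is a hypothetical helper, and the byte-wise string comparison is an assumption about what "alphabetical" means here.

```go
package main

import (
	"fmt"
	"sort"
)

// Field mirrors the documented struct; it is redeclared here so the
// example compiles on its own.
type Field struct {
	Key   string
	Value interface{}
}

// sortFields is a hypothetical helper that orders fields by key using
// Go's byte-wise string comparison.
func sortFields(fields []*Field) {
	sort.Slice(fields, func(i, j int) bool {
		return fields[i].Key < fields[j].Key
	})
}

func main() {
	fields := []*Field{
		{Key: "user", Value: 1.5},
		{Key: "idle", Value: 97.0},
		{Key: "system", Value: 1.5},
	}
	sortFields(fields)
	for _, f := range fields {
		fmt.Println(f.Key, f.Value)
	}
}
```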

func (*Encoder) SetFieldTypeSupport

func (e *Encoder) SetFieldTypeSupport(s FieldTypeSupport)

SetFieldTypeSupport sets flags that indicate whether the encoder supports certain optional field types, such as uint64.

func (*Encoder) SetMaxLineBytes

func (e *Encoder) SetMaxLineBytes(i int)

SetMaxLineBytes sets a maximum length for a line. Encode returns an error if the generated line is longer.

func (*Encoder) SetPrecision

func (e *Encoder) SetPrecision(p time.Duration)

SetPrecision sets the time precision for writes. The default is nanosecond precision.

func (*Encoder) Write

func (e *Encoder) Write(name []byte, ts time.Time, tagKeys, tagVals, fieldKeys [][]byte, fieldVals []interface{}) (int, error)

Write writes data directly to the line protocol encoder. Note that it does no sorting; it assumes you have already sorted the tags (tagKeys and tagVals) yourself.

type Field

type Field struct {
	Key   string
	Value interface{}
}

Field holds the key and value of a single Metric field key/value pair, where Value can be a uint64, int64, int, float32, float64, string, or bool.

type FieldError

type FieldError struct {
	// contains filtered or unexported fields
}

FieldError is an error causing a field to be unserializable.

func (FieldError) Error

func (e FieldError) Error() string

type FieldSortOrder

type FieldSortOrder int

FieldSortOrder is a type for controlling whether Fields are sorted.

const (
	// NoSortFields tells the Encoder to not sort the fields.
	NoSortFields FieldSortOrder = iota

	// SortFields tells the Encoder to sort the fields.
	SortFields
)

type FieldTypeSupport

type FieldTypeSupport int

FieldTypeSupport is a type for the parser to understand its type support.

const (
	// UintSupport means the parser understands uint64s and can store them without having to convert to int64.
	UintSupport FieldTypeSupport = 1 << iota
)

type Handler

type Handler interface {
	SetMeasurement(name []byte) error
	AddTag(key []byte, value []byte) error
	AddInt(key []byte, value []byte) error
	AddUint(key []byte, value []byte) error
	AddFloat(key []byte, value []byte) error
	AddString(key []byte, value []byte) error
	AddBool(key []byte, value []byte) error
	SetTimestamp(tm []byte) error
}

type Metric

type Metric interface {
	Time() time.Time
	Name() string
	TagList() []*Tag
	FieldList() []*Field
}

Metric is the interface for marshaling. Any type that implements this interface can be marshaled into the line protocol.

func FromMetric

func FromMetric(other Metric) Metric

FromMetric returns a deep copy of the metric with any tracking information removed.

type MetricError

type MetricError struct {
	// contains filtered or unexported fields
}

MetricError is an error causing a metric to be unserializable.

func (MetricError) Error

func (e MetricError) Error() string

type MetricHandler

type MetricHandler struct {
	// contains filtered or unexported fields
}

MetricHandler implements the Handler interface and produces Metric.

func NewMetricHandler

func NewMetricHandler() *MetricHandler

func (*MetricHandler) AddBool

func (h *MetricHandler) AddBool(key []byte, value []byte) error

func (*MetricHandler) AddFloat

func (h *MetricHandler) AddFloat(key []byte, value []byte) error

func (*MetricHandler) AddInt

func (h *MetricHandler) AddInt(key []byte, value []byte) error

func (*MetricHandler) AddString

func (h *MetricHandler) AddString(key []byte, value []byte) error

func (*MetricHandler) AddTag

func (h *MetricHandler) AddTag(key []byte, value []byte) error

func (*MetricHandler) AddUint

func (h *MetricHandler) AddUint(key []byte, value []byte) error

func (*MetricHandler) Metric

func (h *MetricHandler) Metric() (Metric, error)

func (*MetricHandler) SetMeasurement

func (h *MetricHandler) SetMeasurement(name []byte) error

func (*MetricHandler) SetTimeFunc

func (h *MetricHandler) SetTimeFunc(f TimeFunc)

func (*MetricHandler) SetTimePrecision

func (h *MetricHandler) SetTimePrecision(p time.Duration)

func (*MetricHandler) SetTimestamp

func (h *MetricHandler) SetTimestamp(tm []byte) error

type MutableMetric

type MutableMetric interface {
	Metric
	SetTime(time.Time)
	AddTag(key, value string)
	AddField(key string, value interface{})
}

MutableMetric represents a metric that can be modified.

func New

func New(
	name string,
	tags map[string]string,
	fields map[string]interface{},
	tm time.Time,
) (MutableMetric, error)

New creates a new metric via maps.

type ParseError

type ParseError struct {
	Offset     int
	LineOffset int
	LineNumber int
	Column     int
	// contains filtered or unexported fields
}

ParseError indicates an error in the parsing of the text.

func (*ParseError) Error

func (e *ParseError) Error() string

type Parser

type Parser struct {
	DefaultTags map[string]string

	sync.Mutex
	// contains filtered or unexported fields
}

Parser is an InfluxDB Line Protocol parser that implements the parsers.Parser interface.

func NewParser

func NewParser(handler *MetricHandler) *Parser

NewParser returns a Parser that accepts line protocol.

func NewSeriesParser

func NewSeriesParser(handler *MetricHandler) *Parser

NewSeriesParser returns a Parser that accepts a measurement and tagset.

func (Parser) Column

func (m Parser) Column() int

Column returns the current column.

func (Parser) LineNumber

func (m Parser) LineNumber() int

LineNumber returns the current line number. Lines are counted based on the regular expression `\r?\n`.

func (Parser) LineOffset

func (m Parser) LineOffset() int

LineOffset returns the byte offset of the current line.

func (Parser) Next

func (m Parser) Next() error

Next parses the next metric line and returns nil if it was successfully processed. If the line contains a syntax error, an error is returned. If the end of file is reached before a metric line is found, EOF is returned.

func (*Parser) Parse

func (p *Parser) Parse(input []byte) ([]Metric, error)

Parse parses line-protocol bytes into a slice of metrics.

func (Parser) Position

func (m Parser) Position() int

Position returns the current byte offset into the data.

func (Parser) SetData

func (m Parser) SetData(data []byte)

func (*Parser) SetTimeFunc

func (p *Parser) SetTimeFunc(f TimeFunc)

SetTimeFunc allows default times to be set when no time is specified for a metric in line-protocol.

type StreamParser

type StreamParser struct {
	// contains filtered or unexported fields
}

StreamParser is an InfluxDB Line Protocol parser. It is not safe for concurrent use in multiple goroutines.

func NewStreamParser

func NewStreamParser(r io.Reader) *StreamParser

NewStreamParser parses from a reader and iterates the machine metric by metric. Not safe for concurrent use in multiple goroutines.

func (*StreamParser) Column

func (p *StreamParser) Column() int

Column returns the current column.

func (*StreamParser) LineNumber

func (p *StreamParser) LineNumber() int

LineNumber returns the current line number. Lines are counted based on the regular expression `\r?\n`.

func (*StreamParser) LineOffset

func (p *StreamParser) LineOffset() int

LineOffset returns the byte offset of the current line.

func (*StreamParser) LineText

func (p *StreamParser) LineText() string

LineText returns the text of the current line that has been parsed so far.

func (*StreamParser) Next

func (p *StreamParser) Next() (Metric, error)

Next parses the next item from the stream. You can repeat calls to this function until it returns EOF.
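
The call-until-EOF pattern can be sketched as follows. To stay runnable with the standard library alone, the example uses a hypothetical lineStream type and a local EOF sentinel in place of StreamParser and the package's EOF variable; it assumes, as the Variables section suggests, that the sentinel is a package-level error value rather than io.EOF.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// EOF is a local sentinel standing in for the package-level EOF variable.
var EOF = errors.New("EOF")

// lineStream is a hypothetical stand-in for a streaming parser: each call
// to Next yields one line, then EOF once the input is exhausted.
type lineStream struct{ sc *bufio.Scanner }

func newLineStream(s string) *lineStream {
	return &lineStream{sc: bufio.NewScanner(strings.NewReader(s))}
}

func (ls *lineStream) Next() (string, error) {
	if ls.sc.Scan() {
		return ls.sc.Text(), nil
	}
	if err := ls.sc.Err(); err != nil {
		return "", err
	}
	return "", EOF
}

// drain repeats Next until EOF, treating EOF as a normal end of input and
// returning any other error to the caller.
func drain(ls *lineStream) ([]string, error) {
	var out []string
	for {
		line, err := ls.Next()
		if errors.Is(err, EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, line)
	}
}

func main() {
	lines, err := drain(newLineStream("cpu idle=1\nmem used=2\n"))
	fmt.Println(len(lines), err)
}
```

The key design point is that EOF is checked before the generic error branch, so the end of input is not reported as a failure.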

func (*StreamParser) Position

func (p *StreamParser) Position() int

Position returns the current byte offset into the data.

func (*StreamParser) SetTimeFunc

func (p *StreamParser) SetTimeFunc(f TimeFunc)

SetTimeFunc changes the function used to determine the time of metrics without a timestamp. The default TimeFunc is time.Now. Useful mostly for testing, or perhaps if you want all metrics to have the same timestamp.

func (*StreamParser) SetTimePrecision

func (p *StreamParser) SetTimePrecision(u time.Duration)

SetTimePrecision specifies the units for the timestamp.

type Tag

type Tag struct {
	Key   string
	Value string
}

Tag holds the key and value of a single tag key/value pair.

type TimeFunc

type TimeFunc func() time.Time

TimeFunc is used to override the default time for a metric with no specified timestamp.
