csv2lp

package
v2.0.7
Published: Jun 4, 2021 License: MIT Imports: 14 Imported by: 0

README

CSV to Line Protocol

The csv2lp library converts CSV (comma-separated values) to InfluxDB Line Protocol.

  1. it can process the CSV result of a (simple) Flux query that exports data from a bucket
  2. it allows the processing of existing CSV files

Usage

The entry point is the CsvToLineProtocol function, which accepts a reader with UTF-8 encoded CSV data and returns a reader with line protocol data.

Examples

Example 1 - Flux Query Result

csv:

#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:17:57Z,0,time_steal,cpu,cpu1,rsavage.prod
,,0,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:07Z,0,time_steal,cpu,cpu1,rsavage.prod

#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:01Z,2.7263631815907954,usage_user,cpu,cpu-total,tahoecity.prod
,,1,2020-02-25T22:17:54.068926364Z,2020-02-25T22:22:54.068926364Z,2020-02-25T22:18:11Z,2.247752247752248,usage_user,cpu,cpu-total,tahoecity.prod

line protocol data:

cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0 1582669087000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.7263631815907954 1582669081000000000
cpu,cpu=cpu-total,host=tahoecity.prod usage_user=2.247752247752248 1582669091000000000

Example 2 - Simple CSV file

csv:

#datatype measurement,tag,tag,double,double,ignored,dateTime:number
m,cpu,host,time_steal,usage_user,nothing,time
cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000

line protocol data:

cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.7 1482669077000000000
cpu,cpu=cpu1,host=rsavage.prod time_steal=0,usage_user=2.2 1482669087000000000

The data type can also be supplied in the column name, so the CSV can be shortened to:

m|measurement,cpu|tag,host|tag,time_steal|double,usage_user|double,nothing|ignored,time|dateTime:number
cpu,cpu1,rsavage.prod,0,2.7,a,1482669077000000000
cpu,cpu1,rsavage.prod,0,2.2,b,1482669087000000000

Example 3 - Data Types with default values

csv:

#datatype measurement,tag,string,double,boolean,long,unsignedLong,duration,dateTime
#default test,annotatedDatatypes,,,,,,
m,name,s,d,b,l,ul,dur,time
,,str1,1.0,true,1,1,1ms,1
,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z

line protocol data:

test,name=annotatedDatatypes s="str1",d=1,b=true,l=1i,ul=1u,dur=1000000i 1
test,name=annotatedDatatypes s="str2",d=2,b=false,l=2i,ul=2u,dur=2000i 1578737410000000000

A default value can be supplied in the column label after the data type, so the CSV could also be:

m|measurement|test,name|tag|annotatedDatatypes,s|string,d|double,b|boolean,l|long,ul|unsignedLong,dur|duration,time|dateTime
,,str1,1.0,true,1,1,1ms,1
,,str2,2.0,false,2,2,2us,2020-01-11T10:10:10Z
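The duration and dateTime conversions visible in Example 3 can be sketched with the standard library alone. This is a minimal sketch, not the library's implementation: the helper names durationField and timestamp are hypothetical, and it assumes that duration values follow Go's time.ParseDuration syntax and that a dateTime column without a format accepts either a nanosecond number or an RFC3339 value, as the two data rows suggest.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// durationField renders a duration string such as "1ms" as a line protocol
// integer field holding nanoseconds, for example "1000000i".
// Hypothetical helper, not part of csv2lp.
func durationField(value string) (string, error) {
	d, err := time.ParseDuration(value)
	if err != nil {
		return "", err
	}
	return strconv.FormatInt(d.Nanoseconds(), 10) + "i", nil
}

// timestamp converts a dateTime value that has no explicit format: a plain
// integer is taken as nanoseconds since epoch, anything else as RFC3339.
// Hypothetical helper, not part of csv2lp.
func timestamp(value string) (int64, error) {
	if n, err := strconv.ParseInt(value, 10, 64); err == nil {
		return n, nil
	}
	t, err := time.Parse(time.RFC3339Nano, value)
	if err != nil {
		return 0, err
	}
	return t.UnixNano(), nil
}

func main() {
	for _, v := range []string{"1ms", "2us"} {
		f, err := durationField(v)
		fmt.Println(v, "->", f, err)
	}
	for _, v := range []string{"1", "2020-01-11T10:10:10Z"} {
		ts, err := timestamp(v)
		fmt.Println(v, "->", ts, err)
	}
}
```

The values produced for the sample inputs correspond to the dur fields and timestamps of the line protocol data shown in this example.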

Example 4 - Advanced usage

csv:

#constant measurement,test
#constant tag,name,datetypeFormats
#timezone -0500
t|dateTime:2006-01-02|1970-01-02,"d|double:,. ","b|boolean:y,Y:n,N|y"
1970-01-01,"123.456,78",
,"123 456,78",Y

  • the measurement and an extra tag are defined using the #constant annotation
  • the time zone for dateTime values is set to -0500 (EST)
  • the t column is of the dateTime data type with the format 2006-01-02; its default value is January 2nd 1970
  • the d column is of the double data type with , as the fraction delimiter; . and space are ignored separators that are used to visually separate large numbers into groups
  • the b column is of the boolean data type that considers y or Y truthy and n or N falsy; empty column values take the default value y and are therefore truthy

line protocol data:

test,name=datetypeFormats d=123456.78,b=true 18000000000000
test,name=datetypeFormats d=123456.78,b=true 104400000000000
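The effect of the #timezone annotation on the timestamps above can be reproduced with Go's time package. The sketch below is illustrative only and makes no claim about how csv2lp does it: parseOffset and toNanos are hypothetical names, only +hhmm and -hhmm offsets are handled (not Local), and the dateTime layout is assumed to carry no zone of its own.

```go
package main

import (
	"fmt"
	"time"
)

// parseOffset turns a "+hhmm" or "-hhmm" offset into a fixed time zone.
// Hypothetical helper, not part of csv2lp.
func parseOffset(offset string) (*time.Location, error) {
	// The "-0700" layout makes time.Parse read a numeric zone offset.
	t, err := time.Parse("-0700", offset)
	if err != nil {
		return nil, err
	}
	_, seconds := t.Zone()
	return time.FixedZone(offset, seconds), nil
}

// toNanos parses value with a zone-less layout, interprets it in the zone
// given by offset, and returns nanoseconds since epoch.
func toNanos(layout, value, offset string) (int64, error) {
	loc, err := parseOffset(offset)
	if err != nil {
		return 0, err
	}
	t, err := time.ParseInLocation(layout, value, loc)
	if err != nil {
		return 0, err
	}
	return t.UnixNano(), nil
}

func main() {
	// The first value comes from the data row, the second is the column default.
	for _, day := range []string{"1970-01-01", "1970-01-02"} {
		ns, err := toNanos("2006-01-02", day, "-0500")
		fmt.Println(day, "->", ns, err)
	}
}
```

Midnight at offset -0500 is 05:00 UTC, which is why the first timestamp is 5 hours (18000 seconds) after the epoch rather than 0.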

Example 5 - Custom column separator

csv:

sep=;
m|measurement;available|boolean:y,Y:|n;dt|dateTime:number
test;nil;1
test;N;2
test;";";3
test;;4
test;Y;5

  • the first line can define the column separator character for the following lines, here: ;
  • the other lines use this separator, so available|boolean:y,Y does not need to be wrapped in double quotes

line protocol data:

test available=false 1
test available=false 2
test available=false 3
test available=false 4
test available=true 5
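The boolean:truthy:falsy matching used in this example can be sketched as follows. This is an assumption-laden illustration, not the library's code: boolFormat, newBoolFormat and parseBool are hypothetical names, an empty value is simply replaced by the default, and the case where a column ends up excluded from the protocol line is not modelled.

```go
package main

import (
	"fmt"
	"strings"
)

// boolFormat holds the two value lists of a boolean:truthy:falsy format.
type boolFormat struct {
	truthy, falsy []string
}

// splitList splits a comma-separated list; an empty string yields an empty list.
func splitList(s string) []string {
	if s == "" {
		return nil
	}
	return strings.Split(s, ",")
}

// newBoolFormat parses the part after "boolean:", for example "y,Y:n,N" or "y,Y:".
func newBoolFormat(format string) boolFormat {
	parts := strings.SplitN(format, ":", 2)
	f := boolFormat{truthy: splitList(parts[0])}
	if len(parts) == 2 {
		f.falsy = splitList(parts[1])
	}
	return f
}

func contains(list []string, v string) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// parseBool converts a column value; defaultValue replaces an empty value.
func parseBool(format, value, defaultValue string) (bool, error) {
	f := newBoolFormat(format)
	if value == "" {
		value = defaultValue
	}
	switch {
	case contains(f.truthy, value):
		return true, nil
	case contains(f.falsy, value):
		return false, nil
	case len(f.falsy) == 0: // empty falsy list: every other value is false
		return false, nil
	case len(f.truthy) == 0: // empty truthy list: every other value is true
		return true, nil
	}
	return false, fmt.Errorf("unsupported boolean value %q", value)
}

func main() {
	// Values of the "available" column above, format "y,Y:" with default "n".
	for _, v := range []string{"nil", "N", ";", "", "Y"} {
		b, err := parseBool("y,Y:", v, "n")
		fmt.Printf("%q -> %v %v\n", v, b, err)
	}
}
```

Because the falsy list is empty in y,Y:, every value other than y or Y is treated as false, which is consistent with the five protocol lines above.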

CSV Data On Input

This library supports all the concepts of Flux result annotated CSV and provides a few extensions that allow processing of existing/custom CSV files. The conversion to line protocol is driven by the contents of annotation rows and the layout of the header row.

New data types

Existing data types are supported. The CSV input can also contain the following data types, which associate a column value with a part of a protocol line:

  • measurement data type identifies a column that carries the measurement name
  • tag data type identifies a column with a tag value, column label (from the header row) is the tag name
  • time is an alias for the existing dateTime type; there is at most one such column in a CSV row
  • ignore and ignored data types are used to identify columns that are ignored when creating a protocol line
  • field data type is used to copy the column data to a protocol line as-is

New CSV annotations

  • #constant annotation adds a constant column to the data, so you can set measurement, time, field or tag of every row you import
    • the format of a constant annotation row is #constant,datatype,name,value; it contains a supported datatype, a column name, and a constant value
    • the column name can be omitted for dateTime or measurement columns, so the annotation can be simply #constant,measurement,cpu
  • #concat annotation adds a new column that is concatenated from existing columns according to a template
    • the format of a concat annotation row is #concat,datatype,name,template; it contains a supported datatype, a column name, and a template value
    • the template is a string with ${columnName} placeholders, in which the placeholders are replaced by values of existing columns
      • for example: #concat,string,fullName,${firstName} ${lastName}
    • the column name can be omitted for dateTime or measurement columns
  • #timezone annotation specifies the time zone of the data using an offset, which is either +hhmm or -hhmm, or Local to use the local/computer time zone. Examples: #timezone,+0100; #timezone -0500; #timezone Local
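The ${columnName} substitution described for #concat can be illustrated with a small stand-alone function. It is a sketch under assumptions, not the library's implementation: expand is a hypothetical name, the sample names are made up, and leaving unknown placeholders untouched is a choice made here for simplicity.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches ${columnName} and captures the column name.
var placeholder = regexp.MustCompile(`\$\{([^}]+)\}`)

// expand replaces every ${columnName} in template with the value of that
// column in row; header supplies the column names in row order.
// Hypothetical helper, not part of csv2lp.
func expand(template string, header, row []string) string {
	index := make(map[string]int, len(header))
	for i, name := range header {
		index[name] = i
	}
	return placeholder.ReplaceAllStringFunc(template, func(match string) string {
		name := match[2 : len(match)-1] // strip "${" and "}"
		if i, ok := index[name]; ok && i < len(row) {
			return row[i]
		}
		return match // assumption: unknown placeholders are left untouched
	})
}

func main() {
	header := []string{"firstName", "lastName"}
	row := []string{"Ada", "Lovelace"} // sample data
	fmt.Println(expand("${firstName} ${lastName}", header, row))
}
```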

Data type with data format

A data type can include a format that is used to parse column data. It is specified as datatype:format. The following data types support a format:

  • dateTime:format
    • the following formats are predefined:
      • dateTime:RFC3339 format is 2006-01-02T15:04:05Z07:00
      • dateTime:RFC3339Nano format is 2006-01-02T15:04:05.999999999Z07:00
      • dateTime:number represents UTC time since epoch in nanoseconds
    • a custom layout as described in Go's time package, for example dateTime:2006-01-02 parses a 4-digit year, '-', a 2-digit month, '-', and a 2-digit day of the month
    • if the time format includes a time zone, the parsed date time respects that time zone; otherwise the time zone depends on the presence of the new #timezone annotation; if there is no #timezone annotation, UTC is used
  • double:format
    • the format's first character separates the integer and fractional parts (usually . or ,); the second and following characters of the format (such as , or _) are removed from the column value; these removed characters are typically used to visually separate large numbers into groups
    • for example:
      • a Spanish locale value 3.494.826.157,123 is of double:,. type; the same double value is 3494826157.123
      • 1_000_000 parsed as double:._ is the double value one million
    • note that you have to quote column delimiters whenever they appear in a CSV column value, for example:
      • #constant,"double:,.",myColumn,"1.234,011"
  • long:format and unsignedLong:format support the same format as double, but everything after and including a fraction character is ignored
    • the format can be prefixed with strict to fail when a fraction digit is present, for example:
      • 1000.000 is 1000 when parsed as long, but fails when parsed as long:strict
      • 1_000,000 is 1000 when parsed as long:,_, but fails when parsed as long:strict,_
  • boolean:truthy:falsy
    • truthy and falsy are comma-separated lists of values, they can be empty to assume all values as truthy/falsy; for example boolean:sí,yes,ja,oui,ano,да:no,nein,non,ne,нет
    • a boolean data type (without the format) parses column values that start with any of tTyY1 as true values, fFnN0 as false values and fails on other values
    • a column with an empty value is excluded in the protocol line unless a default value is supplied either using #default annotation or in a header line (see below)
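The double and long formats above boil down to a character-level rewrite followed by ordinary number parsing. The following is a minimal sketch of that idea, not the library's implementation: normalizeNumber, parseDouble and parseLong are hypothetical names, and the strict keyword is assumed to lead the format, as in the long:strict,_ example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// normalizeNumber rewrites value into Go's number syntax: the first rune of
// format is the fraction separator, all following runes are dropped.
func normalizeNumber(value, format string) string {
	if format == "" {
		return value
	}
	runes := []rune(format)
	fraction, ignored := runes[0], string(runes[1:])
	var b strings.Builder
	for _, r := range value {
		switch {
		case r == fraction:
			b.WriteRune('.')
		case strings.ContainsRune(ignored, r):
			// grouping character: skipped
		default:
			b.WriteRune(r)
		}
	}
	return b.String()
}

// parseDouble parses value according to a double format such as ",." or "._".
func parseDouble(value, format string) (float64, error) {
	return strconv.ParseFloat(normalizeNumber(value, format), 64)
}

// parseLong parses value according to a long format; the fraction part is
// dropped unless the format starts with "strict", in which case it is an error.
func parseLong(value, format string) (int64, error) {
	strict := strings.HasPrefix(format, "strict")
	s := normalizeNumber(value, strings.TrimPrefix(format, "strict"))
	if i := strings.IndexByte(s, '.'); i >= 0 {
		if strict {
			return 0, fmt.Errorf("fraction not allowed in %q", value)
		}
		s = s[:i]
	}
	return strconv.ParseInt(s, 10, 64)
}

func main() {
	d, err := parseDouble("3.494.826.157,123", ",.")
	fmt.Println(d, err)
	l, err := parseLong("1_000,000", ",_")
	fmt.Println(l, err)
	_, err = parseLong("1_000,000", "strict,_")
	fmt.Println(err)
}
```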

Header row with data types and default values

The header row (i.e. the row that defines column names) can also define column data types when supplied as name|datatype; for example, cpu|tag defines a tag column named cpu. Moreover, it can also specify a default value when supplied as name|datatype|default; for example, count|long|0 defines a field column named count of long data type that does not skip the field when a column value is empty, but uses '0' as the column value.

  • this approach helps to easily specify column names, types and defaults in a single row
  • this is an alternative to using three lines: a #datatype annotation, a #default annotation, and a simple header row
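Splitting a header label of the form name|datatype|default can be sketched as below. The column struct and parseLabel function are hypothetical and only mirror the idea; field names loosely follow the exported fields of CsvTableColumn, and the real parsing rules of the library may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// column is a simplified, hypothetical stand-in for column metadata.
type column struct {
	Label        string
	DataType     string
	DataFormat   string
	DefaultValue string
}

// parseLabel splits "name|datatype:format|default"; every part after the
// name is optional.
func parseLabel(label string) column {
	parts := strings.SplitN(label, "|", 3)
	c := column{Label: parts[0]}
	if len(parts) > 1 {
		// The format, if any, follows the first colon of the data type.
		typeAndFormat := strings.SplitN(parts[1], ":", 2)
		c.DataType = typeAndFormat[0]
		if len(typeAndFormat) > 1 {
			c.DataFormat = typeAndFormat[1]
		}
	}
	if len(parts) > 2 {
		c.DefaultValue = parts[2]
	}
	return c
}

func main() {
	for _, label := range []string{"cpu|tag", "count|long|0", "b|boolean:y,Y:n,N|y"} {
		fmt.Printf("%+v\n", parseLabel(label))
	}
}
```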

Custom CSV column separator

A CSV file can start with a sep=; line that declares the character used to separate columns; by default, the comma (,) is used as the column separator. This convention is widely used, for example by Excel.
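With only the standard library, honouring a leading sep= line could look like the sketch below. It is not taken from csv2lp: newCSVReader and readAll are hypothetical names, and the approach (read the first line, then either consume it or put it back) is just one possible design.

```go
package main

import (
	"bufio"
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// newCSVReader returns a csv.Reader configured from an optional leading
// "sep=<char>" line; without such a line the default comma is kept.
func newCSVReader(r io.Reader) (*csv.Reader, error) {
	br := bufio.NewReader(r)
	first, err := br.ReadString('\n')
	if err != nil && err != io.EOF {
		return nil, err
	}
	comma := ','
	rest := io.Reader(br)
	sep := []rune(strings.TrimRight(strings.TrimPrefix(first, "sep="), "\r\n"))
	if strings.HasPrefix(first, "sep=") && len(sep) == 1 {
		comma = sep[0] // the separator line itself is consumed
	} else {
		// Not a separator line: put it back in front of the remaining input.
		rest = io.MultiReader(strings.NewReader(first), br)
	}
	cr := csv.NewReader(rest)
	cr.Comma = comma
	return cr, nil
}

// readAll parses the whole input string with newCSVReader.
func readAll(input string) ([][]string, error) {
	cr, err := newCSVReader(strings.NewReader(input))
	if err != nil {
		return nil, err
	}
	return cr.ReadAll()
}

func main() {
	records, err := readAll("sep=;\nm|measurement;available|boolean:y,Y:|n\ntest;\";\"\n")
	fmt.Printf("%q %v\n", records, err)
}
```

A quoted cell such as ";" still yields the separator character as data, which is why the third data row of Example 5 carries the value ; in its available column.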

Error handling

By default, the CSV conversion stops on the first error; the line and column are reported together with the error. The CsvToLineReader's SkipRowOnError function can change this behavior to skip the rows that fail and log the errors instead.

Support Existing CSV files

The majority of existing CSV files can be imported by skipping the first X lines of existing data (so that a custom header line can then be provided) and prepending extra annotation/header lines that tell this library how to convert the CSV to line protocol. The SkipHeaderLinesReader function skips leading lines of the input, and the standard io.MultiReader can prepend the extra lines.

Documentation

Overview

Package csv2lp transforms CSV data to InfluxDB line protocol

Constants

const (
	RFC3339     = "RFC3339"
	RFC3339Nano = "RFC3339Nano"
)

predefined dateTime formats

Functions

func CreateDecoder

func CreateDecoder(encoding string) (func(io.Reader) io.Reader, error)

CreateDecoder creates a decoding reader from the supplied encoding to UTF-8, or returns an error

func IsTypeSupported

func IsTypeSupported(dataType string) bool

IsTypeSupported returns true if the data type is supported

func MultiCloser

func MultiCloser(closers ...io.Closer) io.Closer

MultiCloser creates an io.Closer that silently closes the supplied io.Closer instances

func SkipHeaderLinesReader

func SkipHeaderLinesReader(skipLines int, reader io.Reader) io.Reader

SkipHeaderLinesReader wraps a reader to skip the first skipLines lines in CSV data input
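The idea of dropping an existing header and supplying a new one can be shown with the standard library only. In the sketch below, skipLines is a hypothetical stand-in that mimics the purpose of SkipHeaderLinesReader (it is not its implementation and, unlike a lazy wrapper, it consumes the skipped lines immediately), and replaceHeader is a hypothetical helper that prepends a custom header with io.MultiReader.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// skipLines returns a reader positioned after the first n lines of r.
// Hypothetical stand-in for the purpose of SkipHeaderLinesReader.
func skipLines(n int, r io.Reader) io.Reader {
	br := bufio.NewReader(r)
	for i := 0; i < n; i++ {
		if _, err := br.ReadString('\n'); err != nil {
			break // input ended or failed; a later Read surfaces the condition
		}
	}
	return br
}

// replaceHeader drops the first skip lines of data and prepends header.
func replaceHeader(header, data string, skip int) (string, error) {
	r := io.MultiReader(
		strings.NewReader(header),
		skipLines(skip, strings.NewReader(data)),
	)
	out, err := io.ReadAll(r)
	return string(out), err
}

func main() {
	// Sample data: an existing file whose header row is not usable as-is.
	data := "Name,Host,Usage\ncpu,h1,2.7\n"
	header := "m|measurement,host|tag,usage|double\n"
	out, err := replaceHeader(header, data, 1)
	fmt.Print(out)
	fmt.Println(err)
}
```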

Types

type CsvColumnError

type CsvColumnError struct {
	Column string
	Err    error
}

CsvColumnError indicates conversion error in a specific column

func (CsvColumnError) Error

func (e CsvColumnError) Error() string

Error interface implementation

type CsvLineError

type CsvLineError struct {
	// 1 is the first line
	Line int
	Err  error
}

CsvLineError is returned for csv conversion errors

func CreateRowColumnError

func CreateRowColumnError(line int, columnLabel string, err error) CsvLineError

CreateRowColumnError wraps an existing error to add line and column coordinates

func (CsvLineError) Error

func (e CsvLineError) Error() string

type CsvTable

type CsvTable struct {
	// contains filtered or unexported fields
}

CsvTable contains metadata about columns and a state of the CSV processing

func (*CsvTable) AddRow

func (t *CsvTable) AddRow(row []string) bool

AddRow updates the state of the CSV table with a new header, annotation or data row. Returns true if the row is a data row.

func (*CsvTable) AppendLine

func (t *CsvTable) AppendLine(buffer []byte, row []string, lineNumber int) ([]byte, error)

AppendLine appends a protocol line to the supplied buffer using a CSV row and returns appended buffer or an error if any

func (*CsvTable) Column

func (t *CsvTable) Column(label string) *CsvTableColumn

Column returns the first column of the supplied label or nil

func (*CsvTable) ColumnLabels

func (t *CsvTable) ColumnLabels() []string

ColumnLabels returns available column labels

func (*CsvTable) Columns

func (t *CsvTable) Columns() []*CsvTableColumn

Columns returns available columns

func (*CsvTable) CreateLine

func (t *CsvTable) CreateLine(row []string) (line string, err error)

CreateLine produces a protocol line out of the supplied row or returns error

func (*CsvTable) DataColumnsInfo

func (t *CsvTable) DataColumnsInfo() string

DataColumnsInfo returns a string representation of columns that are used to process CSV data

func (*CsvTable) FieldName

func (t *CsvTable) FieldName() *CsvTableColumn

FieldName returns field name column or nil

func (*CsvTable) FieldValue

func (t *CsvTable) FieldValue() *CsvTableColumn

FieldValue returns field value column or nil

func (*CsvTable) Fields

func (t *CsvTable) Fields() []*CsvTableColumn

Fields returns fields

func (*CsvTable) IgnoreDataTypeInColumnName

func (t *CsvTable) IgnoreDataTypeInColumnName(val bool)

IgnoreDataTypeInColumnName sets a flag that can ignore dataType parsing in column names. When true, column names can then contain '|'. By default, column name can also contain datatype and a default value when named `name|datatype` or `name|datatype|default`, for example `ready|boolean|true`

func (*CsvTable) Measurement

func (t *CsvTable) Measurement() *CsvTableColumn

Measurement returns measurement column or nil

func (*CsvTable) NextTable

func (t *CsvTable) NextTable()

NextTable resets the table to a state in which it expects annotations and header rows

func (*CsvTable) Tags

func (t *CsvTable) Tags() []*CsvTableColumn

Tags returns tags

func (*CsvTable) Time

func (t *CsvTable) Time() *CsvTableColumn

Time returns time column or nil

type CsvTableColumn

type CsvTableColumn struct {
	// Label is a column label from the header row, such as "_start", "_stop", "_time"
	Label string
	// DataType such as "string", "long", "dateTime" ...
	DataType string
	// DataFormat is a format of DataType, such as "RFC3339", "2006-01-02"
	DataFormat string
	// LinePart is a line part of the column (0 means not determined yet), see linePart constants
	LinePart int
	// DefaultValue is used when column's value is an empty string.
	DefaultValue string
	// Index of this column when reading rows, -1 indicates a virtual column with DefaultValue data
	Index int
	// TimeZone of dateTime column, applied when parsing dateTime DataType
	TimeZone *time.Location
	// ParseF is an optional function used to convert column's string value to interface{}
	ParseF func(value string) (interface{}, error)
	// ComputeValue is an optional function used to compute column value out of row data
	ComputeValue func(row []string) string
	// contains filtered or unexported fields
}

CsvTableColumn represents processing metadata about a csv column

func (*CsvTableColumn) LineLabel

func (c *CsvTableColumn) LineLabel() string

LineLabel returns escaped name of the column so it can be then used as a tag name or field name in line protocol

func (*CsvTableColumn) Value

func (c *CsvTableColumn) Value(row []string) string

Value returns the value of the column for the supplied row

type CsvToLineReader

type CsvToLineReader struct {

	// Table collects information about used columns
	Table CsvTable
	// LineNumber represents line number of csv.Reader, 1 is the first
	LineNumber int

	// RowSkipped is called when a row is skipped because of data parsing error
	RowSkipped func(source *CsvToLineReader, lineError error, row []string)
	// contains filtered or unexported fields
}

CsvToLineReader represents the state of the transformation from CSV data to a line protocol reader

func CsvToLineProtocol

func CsvToLineProtocol(reader io.Reader) *CsvToLineReader

CsvToLineProtocol transforms csv data into line protocol data

func (*CsvToLineReader) Comma

func (state *CsvToLineReader) Comma() rune

Comma returns a field delimiter used in an input CSV file

func (*CsvToLineReader) LogTableColumns

func (state *CsvToLineReader) LogTableColumns(val bool) *CsvToLineReader

LogTableColumns turns on/off logging of table data columns before reading data rows

func (*CsvToLineReader) Read

func (state *CsvToLineReader) Read(p []byte) (n int, err error)

Read implements io.Reader that returns protocol lines

func (*CsvToLineReader) SkipRowOnError

func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader

SkipRowOnError controls whether to fail on every CSV conversion error (false) or to log the error and continue (true)

type LineReader

type LineReader struct {
	// LineNumber of the next read operation, 0 is the first line by default.
	// It can be set to 1 to start counting from 1.
	LineNumber int
	// LastLineNumber is the number of the last read row.
	LastLineNumber int
	// contains filtered or unexported fields
}

LineReader wraps an io.Reader to count lines that go through the Read function and returns at most one line during every invocation of Read. It provides a workaround for golang's CSV reader, which does not expose the current line number at all (see https://github.com/golang/go/issues/26679).

At most one line is returned by every read in order to ensure that golang's CSV reader buffers at most one single line into its nested bufio.Reader.
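The one-line-per-Read behavior described above can be illustrated with a small wrapper around bufio.Reader. This is a simplified sketch, not the LineReader implementation: oneLineReader and chunks are hypothetical names, bytes are copied one at a time for clarity rather than speed, and only complete lines are counted.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// oneLineReader hands out at most one line per Read call and counts lines.
type oneLineReader struct {
	br    *bufio.Reader
	Lines int // number of complete lines returned so far
}

// Read fills p with data that either contains no '\n' or ends with '\n'.
func (r *oneLineReader) Read(p []byte) (int, error) {
	n := 0
	for n < len(p) {
		b, err := r.br.ReadByte()
		if err != nil {
			if n > 0 {
				return n, nil // hand out the partial line first
			}
			return 0, err
		}
		p[n] = b
		n++
		if b == '\n' {
			r.Lines++
			break
		}
	}
	return n, nil
}

// chunks reads input through a oneLineReader with a buffer of the given
// size (which must be positive) and returns what each Read call delivered
// together with the number of counted lines.
func chunks(input string, size int) ([]string, int) {
	r := &oneLineReader{br: bufio.NewReader(strings.NewReader(input))}
	var out []string
	p := make([]byte, size)
	for {
		n, err := r.Read(p)
		if n > 0 {
			out = append(out, string(p[:n]))
		}
		if err != nil { // io.EOF or a read failure ends the loop
			return out, r.Lines
		}
	}
}

func main() {
	parts, lines := chunks("a,b\n1,2\nend", 64)
	fmt.Printf("%q %d\n", parts, lines)
}
```

Because a consumer never receives more than one line per call, the line counter of the wrapper stays in step with the data the consumer has actually seen.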

func NewLineReader

func NewLineReader(rd io.Reader) *LineReader

NewLineReader returns a new LineReader.

func NewLineReaderSize

func NewLineReaderSize(rd io.Reader, size int) *LineReader

NewLineReaderSize returns a new Reader whose buffer has at least the specified size.

func (*LineReader) Read

func (lr *LineReader) Read(p []byte) (int, error)

Read reads data into p. It fills in data that either does not contain \n or ends with \n. It returns the number of bytes read into p.
