iop

package
v0.4.86
Published: Feb 3, 2024 License: GPL-3.0 Imports: 52 Imported by: 5

README

Input-Process-Output (iop)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false
	SampleSize             = 900
)
View Source
var Transforms = map[string]TransformFunc{}

Functions

func AutoDecompress added in v0.2.3

func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)

AutoDecompress auto-detects compression and decompresses the reader accordingly. Otherwise it returns the same reader
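
A minimal sketch of wrapping a possibly compressed file, assuming the github.com/flarco/dbio/iop import path referenced in the CastVal benchmark note below; the file name is hypothetical:

package main

import (
	"io"
	"os"

	"github.com/flarco/dbio/iop"
)

func main() {
	f, _ := os.Open("data.csv.gz") // hypothetical, possibly compressed file
	defer f.Close()

	// detects the compression and wraps the reader; returns the same reader if none is found
	r, err := iop.AutoDecompress(f)
	if err != nil {
		panic(err)
	}
	plain, _ := io.ReadAll(r)
	println(len(plain), "bytes after decompression")
}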

func CleanHeaderRow

func CleanHeaderRow(header []string) []string

CleanHeaderRow cleans the header row from incompatible characters

func CleanName added in v0.3.283

func CleanName(name string) (newName string)

func CompareColumns

func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)

CompareColumns compares two sets of columns to see if they are similar

func CreateDummyFields

func CreateDummyFields(numCols int) (cols []string)

CreateDummyFields creates dummy columns for csvs with no header row

func GetISO8601DateMap

func GetISO8601DateMap(t time.Time) map[string]interface{}

GetISO8601DateMap returns a map of date parts for string formatting

func IsDummy

func IsDummy(columns []Column) bool

IsDummy returns true if the columns are injected by CreateDummyFields

func MakeRowsChan

func MakeRowsChan() chan []any

MakeRowsChan returns a buffered channel with default size

func NewJSONStream added in v0.3.1

func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream

func ReplaceAccents added in v0.3.257

func ReplaceAccents(sp *StreamProcessor, val string) (string, error)

func Row

func Row(vals ...any) []any

Row builds a row from the given values

func ScanCarrRet

func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)

ScanCarrRet removes \r runes that are not immediately followed by \n

func Unzip

func Unzip(src string, dest string) ([]string, error)

Unzip decompresses a zip archive, extracting all files and folders within the zip file (src) to an output directory (dest).
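
A minimal sketch with hypothetical archive and destination paths:

package main

import "github.com/flarco/dbio/iop"

func main() {
	// hypothetical archive and output directory
	files, err := iop.Unzip("archive.zip", "/tmp/extracted")
	if err != nil {
		panic(err)
	}
	for _, path := range files {
		println(path) // path of each extracted file
	}
}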

Types

type Avro added in v0.4.34

type Avro struct {
	Path   string
	Reader *goavro.OCFReader
	Data   *Dataset
	// contains filtered or unexported fields
}

Avro is an Avro object

func NewAvroStream added in v0.4.34

func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error)

func (*Avro) Columns added in v0.4.34

func (a *Avro) Columns() Columns

type Batch added in v0.3.185

type Batch struct {
	Columns  Columns
	Rows     chan []any
	Previous *Batch
	Count    int64
	Limit    int64
	// contains filtered or unexported fields
}

func (*Batch) AddTransform added in v0.3.215

func (b *Batch) AddTransform(transf func(row []any) []any)

func (*Batch) Close added in v0.3.185

func (b *Batch) Close()

func (*Batch) ColumnsChanged added in v0.3.185

func (b *Batch) ColumnsChanged() bool

func (*Batch) Ds added in v0.3.215

func (b *Batch) Ds() *Datastream

func (*Batch) ID added in v0.3.185

func (b *Batch) ID() string

func (*Batch) IsFirst added in v0.3.185

func (b *Batch) IsFirst() bool

func (*Batch) Push added in v0.3.185

func (b *Batch) Push(row []any)

func (*Batch) Shape added in v0.3.185

func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)

type BatchReader added in v0.3.188

type BatchReader struct {
	Batch   *Batch
	Columns Columns
	Reader  io.Reader
	Counter int
}

type CSV

type CSV struct {
	Path            string
	NoHeader        bool
	Delimiter       rune
	FieldsPerRecord int
	Columns         []Column
	File            *os.File
	Data            Dataset
	Reader          io.Reader
	Config          map[string]string
	NoDebug         bool
	// contains filtered or unexported fields
}

CSV is a csv object

func (*CSV) InferSchema

func (c *CSV) InferSchema() error

InferSchema infers the column types from a sample of rows

func (*CSV) NewReader

func (c *CSV) NewReader() (*io.PipeReader, error)

NewReader creates a Reader

func (*CSV) Read

func (c *CSV) Read() (data Dataset, err error)

Read reads the CSV and returns a Dataset, with line 1 as the header

func (*CSV) ReadStream

func (c *CSV) ReadStream() (ds *Datastream, err error)

ReadStream returns the read CSV stream, with line 1 as the header

func (*CSV) Sample

func (c *CSV) Sample(n int) (Dataset, error)

Sample returns a sample of n rows

func (*CSV) SetFields

func (c *CSV) SetFields(fields []string)

SetFields sets the fields

func (*CSV) WriteStream

func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)

WriteStream to CSV file
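
A minimal sketch of reading one CSV file into a Datastream and writing it back out with a different delimiter; the file names are hypothetical:

package main

import "github.com/flarco/dbio/iop"

func main() {
	src := iop.CSV{Path: "input.csv"} // hypothetical source file
	ds, err := src.ReadStream()       // line 1 is used as the header
	if err != nil {
		panic(err)
	}

	dst := iop.CSV{Path: "output.csv", Delimiter: '|'} // hypothetical target
	cnt, err := dst.WriteStream(ds)                    // drains the datastream into the file
	if err != nil {
		panic(err)
	}
	println("rows written:", cnt)
}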

type Column

type Column struct {
	Position    int         `json:"position"`
	Name        string      `json:"name"`
	Type        ColumnType  `json:"type"`
	DbType      string      `json:"db_type,omitempty"`
	DbPrecision int         `json:"db_precision,omitempty"`
	DbScale     int         `json:"db_scale,omitempty"`
	Sourced     bool        `json:"-"` // whether it was sourced from a typed source
	Stats       ColumnStats `json:"stats,omitempty"`

	Table       string `json:"table,omitempty"`
	Schema      string `json:"schema,omitempty"`
	Database    string `json:"database,omitempty"`
	Description string `json:"description,omitempty"`

	Metadata map[string]string `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Column represents a schemata column

func InferFromStats

func InferFromStats(columns []Column, safe bool, noDebug bool) []Column

InferFromStats uses the stats to infer data types

func (*Column) GoType added in v0.3.299

func (col *Column) GoType() reflect.Type

func (*Column) IsBool

func (col *Column) IsBool() bool

IsBool returns whether the column is a boolean

func (*Column) IsDatetime

func (col *Column) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (*Column) IsDecimal

func (col *Column) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (*Column) IsInteger

func (col *Column) IsInteger() bool

IsInteger returns whether the column is an integer

func (*Column) IsNumber

func (col *Column) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (*Column) IsString

func (col *Column) IsString() bool

IsString returns whether the column is a string

func (*Column) IsUnique added in v0.3.66

func (col *Column) IsUnique() bool

func (*Column) Key added in v0.3.66

func (col *Column) Key() string

func (*Column) SetLengthPrecisionScale added in v0.4.82

func (col *Column) SetLengthPrecisionScale()

SetLengthPrecisionScale parses the length, precision and scale

func (*Column) SetMetadata added in v0.4.86

func (col *Column) SetMetadata(key string, value string)

type ColumnStats

type ColumnStats struct {
	MinLen    int    `json:"min_len,omitempty"`
	MaxLen    int    `json:"max_len,omitempty"`
	MaxDecLen int    `json:"max_dec_len,omitempty"`
	Min       int64  `json:"min"`
	Max       int64  `json:"max"`
	NullCnt   int64  `json:"null_cnt"`
	IntCnt    int64  `json:"int_cnt,omitempty"`
	DecCnt    int64  `json:"dec_cnt,omitempty"`
	BoolCnt   int64  `json:"bool_cnt,omitempty"`
	JsonCnt   int64  `json:"json_cnt,omitempty"`
	StringCnt int64  `json:"string_cnt,omitempty"`
	DateCnt   int64  `json:"date_cnt,omitempty"`
	TotalCnt  int64  `json:"total_cnt"`
	UniqCnt   int64  `json:"uniq_cnt"`
	Checksum  uint64 `json:"checksum"`
}

ColumnStats holds statistics for a column

func (*ColumnStats) DistinctPercent added in v0.3.66

func (cs *ColumnStats) DistinctPercent() float64

func (*ColumnStats) DuplicateCount added in v0.3.66

func (cs *ColumnStats) DuplicateCount() int64

func (*ColumnStats) DuplicatePercent added in v0.3.66

func (cs *ColumnStats) DuplicatePercent() float64

type ColumnType added in v0.3.66

type ColumnType string
const (
	BigIntType     ColumnType = "bigint"
	BinaryType     ColumnType = "binary"
	BoolType       ColumnType = "bool"
	DateType       ColumnType = "date"
	DatetimeType   ColumnType = "datetime"
	DecimalType    ColumnType = "decimal"
	IntegerType    ColumnType = "integer"
	JsonType       ColumnType = "json"
	SmallIntType   ColumnType = "smallint"
	StringType     ColumnType = "string"
	TextType       ColumnType = "text"
	TimestampType  ColumnType = "timestamp"
	TimestampzType ColumnType = "timestampz"
	FloatType      ColumnType = "float"
	TimeType       ColumnType = "time"
	TimezType      ColumnType = "timez"
)

func (ColumnType) IsBool added in v0.3.66

func (ct ColumnType) IsBool() bool

IsBool returns whether the column is a boolean

func (ColumnType) IsDatetime added in v0.3.66

func (ct ColumnType) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (ColumnType) IsDecimal added in v0.3.66

func (ct ColumnType) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (ColumnType) IsInteger added in v0.3.66

func (ct ColumnType) IsInteger() bool

IsInteger returns whether the column is an integer

func (ColumnType) IsJSON added in v0.3.130

func (ct ColumnType) IsJSON() bool

IsJSON returns whether the column is a json

func (ColumnType) IsNumber added in v0.3.66

func (ct ColumnType) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (ColumnType) IsString added in v0.3.66

func (ct ColumnType) IsString() bool

IsString returns whether the column is a string

func (ColumnType) IsValid added in v0.3.252

func (ct ColumnType) IsValid() bool

IsValid returns whether the column has a valid type

type Columns

type Columns []Column

Columns represent many columns

func NewColumns added in v0.3.168

func NewColumns(cols ...Column) Columns

NewColumns creates Columns from the given columns

func NewColumnsFromFields

func NewColumnsFromFields(fields ...string) (cols Columns)

NewColumnsFromFields creates Columns from fields
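
A minimal sketch of building Columns from field names and filling the resulting Dataset; the field names and row values are hypothetical:

package main

import "github.com/flarco/dbio/iop"

func main() {
	cols := iop.NewColumnsFromFields("id", "name", "created_at")

	data := cols.Dataset()                         // empty dataset with those columns
	data.Append(iop.Row(1, "alice", "2024-01-01")) // hypothetical row values
	data.Append(iop.Row(2, "bob", "2024-01-02"))
	data.InferColumnTypes()                        // infer types from the appended values

	println(len(data.Rows), "rows,", len(data.Columns), "columns")
}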

func (Columns) Add added in v0.3.185

func (cols Columns) Add(newCols Columns, overwrite bool) (col2 Columns, added Columns)

func (Columns) Clone added in v0.3.67

func (cols Columns) Clone() (newCols Columns)

Clone returns a copy of the columns

func (Columns) Coerce added in v0.4.21

func (cols Columns) Coerce(castCols Columns, hasHeader bool) (newCols Columns)

Coerce casts columns into specified types

func (Columns) Dataset added in v0.0.5

func (cols Columns) Dataset() Dataset

Dataset returns an empty inferred dataset

func (Columns) DbTypes added in v0.3.185

func (cols Columns) DbTypes(args ...bool) []string

DbTypes returns the column db types. args -> (lower bool, cleanUp bool)

func (Columns) FieldMap added in v0.0.5

func (cols Columns) FieldMap(toLower bool) map[string]int

FieldMap returns a map of field name to index. When `toLower` is true, field keys are lower-cased.

func (Columns) GetColumn

func (cols Columns) GetColumn(name string) Column

GetColumn returns the column matching the given name

func (Columns) GetKeys added in v0.4.86

func (cols Columns) GetKeys(keyType KeyType) Columns

GetKeys gets key columns

func (Columns) IsDifferent added in v0.3.185

func (cols Columns) IsDifferent(newCols Columns) bool

func (Columns) IsDummy

func (cols Columns) IsDummy() bool

IsDummy returns true if the columns are injected by CreateDummyFields

func (Columns) IsSimilarTo added in v0.3.202

func (cols Columns) IsSimilarTo(otherCols Columns) bool

IsSimilarTo returns true if the other set has the same number of columns and contains the same columns, possibly in a different order

func (Columns) Keys added in v0.3.244

func (cols Columns) Keys() []string

Keys returns the column keys

func (Columns) MakeRec added in v0.3.188

func (cols Columns) MakeRec(row []any) map[string]any

func (Columns) MakeShaper added in v0.3.202

func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)

func (Columns) Names added in v0.0.5

func (cols Columns) Names(args ...bool) []string

Names returns the column names. args -> (lower bool, cleanUp bool)

func (Columns) SetKeys added in v0.4.86

func (cols Columns) SetKeys(keyType KeyType, names ...string)

SetKeys sets key columns

func (Columns) Types added in v0.3.185

func (cols Columns) Types(args ...bool) []string

Types returns the column types. args -> (lower bool, cleanUp bool)

type Compressor

type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
	Suffix() string
}

Compressor implements different kinds of compression

func NewCompressor added in v0.2.3

func NewCompressor(cpType CompressorType) Compressor

type CompressorType

type CompressorType string

CompressorType is a string enum for the compressor type

const (
	// AutoCompressorType is for auto compression
	AutoCompressorType CompressorType = "AUTO"
	// NoneCompressorType is for no compression
	NoneCompressorType CompressorType = "NONE"
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = "ZIP"
	// GzipCompressorType is for Gzip compression
	GzipCompressorType CompressorType = "GZIP"
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType CompressorType = "SNAPPY"
	// ZStandardCompressorType is for ZStandard
	ZStandardCompressorType CompressorType = "ZSTD"
)

func CompressorTypePtr added in v0.3.56

func CompressorTypePtr(v CompressorType) *CompressorType

CompressorTypePtr returns a pointer to the CompressorType value passed in.
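
A minimal sketch of compressing a file through NewCompressor and the Compressor interface; the file name is hypothetical:

package main

import (
	"io"
	"os"

	"github.com/flarco/dbio/iop"
)

func main() {
	cp := iop.NewCompressor(iop.GzipCompressorType)

	in, _ := os.Open("payload.json") // hypothetical input file
	defer in.Close()

	out, _ := os.Create("payload.json" + cp.Suffix()) // Suffix adds the matching extension
	defer out.Close()

	// Compress wraps the reader; copying drives the compression
	if _, err := io.Copy(out, cp.Compress(in)); err != nil {
		panic(err)
	}
}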

type Dataflow

type Dataflow struct {
	Columns  Columns
	Buffer   [][]interface{}
	StreamCh chan *Datastream
	Streams  []*Datastream
	Context  *g.Context
	Limit    uint64
	InBytes  uint64
	OutBytes uint64

	Ready           bool
	Inferred        bool
	FsURL           string
	OnColumnChanged func(col Column) error
	OnColumnAdded   func(col Column) error

	StreamMap map[string]*Datastream

	SchemaVersion int // for column type version
	// contains filtered or unexported fields
}

Dataflow is a collection of concurrent Datastreams

func MakeDataFlow

func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)

MakeDataFlow creates a dataflow from datastreams

func NewDataflow

func NewDataflow(limit ...int) (df *Dataflow)

NewDataflow creates a new dataflow

func (*Dataflow) AddColumns added in v0.3.185

func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)

AddColumns adds the given columns to the dataflow

func (*Dataflow) AddInBytes added in v0.3.0

func (df *Dataflow) AddInBytes(bytes uint64)

AddInBytes adds ingress bytes

func (*Dataflow) AddOutBytes added in v0.3.0

func (df *Dataflow) AddOutBytes(bytes uint64)

AddOutBytes adds egress bytes

func (*Dataflow) Bytes

func (df *Dataflow) Bytes() (inBytes, outBytes uint64)

func (*Dataflow) ChangeColumn added in v0.3.185

func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool

ChangeColumn applies a column type change

func (*Dataflow) CleanUp added in v0.3.0

func (df *Dataflow) CleanUp()

CleanUp runs the deferred functions

func (*Dataflow) Close

func (df *Dataflow) Close()

Close closes the df

func (*Dataflow) CloseCurrentBatches added in v0.3.185

func (df *Dataflow) CloseCurrentBatches()

func (*Dataflow) Collect added in v0.3.104

func (df *Dataflow) Collect() (data Dataset, err error)

Collect reads from one or more streams and returns a dataset
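
A minimal sketch of wrapping a Datastream into a Dataflow and collecting it; the file name is hypothetical:

package main

import "github.com/flarco/dbio/iop"

func main() {
	ds, err := iop.ReadCsvStream("events.csv") // hypothetical CSV file
	if err != nil {
		panic(err)
	}

	df, err := iop.MakeDataFlow(ds) // wrap one or more datastreams into a dataflow
	if err != nil {
		panic(err)
	}

	data, err := df.Collect() // drain all streams into a single Dataset
	if err != nil {
		panic(err)
	}
	println(df.Count(), "rows collected,", len(data.Rows), "rows in the dataset")
}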

func (*Dataflow) Count

func (df *Dataflow) Count() (cnt uint64)

Count returns the aggregate count

func (*Dataflow) Defer

func (df *Dataflow) Defer(f func())

Defer runs a given function at the close of the Dataflow

func (*Dataflow) DsTotalBytes added in v0.3.0

func (df *Dataflow) DsTotalBytes() (bytes uint64)

func (*Dataflow) Err

func (df *Dataflow) Err() (err error)

Err returns the error, if any

func (*Dataflow) IsClosed

func (df *Dataflow) IsClosed() bool

IsClosed returns true if the dataflow is closed

func (*Dataflow) IsEmpty

func (df *Dataflow) IsEmpty() bool

IsEmpty returns true if the Rows channels of all datastreams are empty

func (*Dataflow) MakeStreamCh added in v0.3.136

func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)

MakeStreamCh determines whether to merge all the streams into one or keep them separate. If data per stream is small, it's best to merge. For example, BigQuery has limits on the number of operations that can be called within a time limit.

func (*Dataflow) Pause added in v0.3.111

func (df *Dataflow) Pause(exceptDs ...string) bool

Pause pauses all streams

func (*Dataflow) PushStreamChan added in v0.3.102

func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)

func (*Dataflow) SetColumns

func (df *Dataflow) SetColumns(columns []Column)

SetColumns sets the columns

func (*Dataflow) SetEmpty

func (df *Dataflow) SetEmpty()

SetEmpty sets all underlying datastreams empty

func (*Dataflow) SetReady added in v0.3.102

func (df *Dataflow) SetReady()

SetReady sets the df.ready

func (*Dataflow) Size

func (df *Dataflow) Size() int

Size is the number of streams

func (*Dataflow) SyncColumns added in v0.3.185

func (df *Dataflow) SyncColumns()

SyncColumns is a workaround to sync the ds.Columns to the df.Columns

func (*Dataflow) SyncStats

func (df *Dataflow) SyncStats()

SyncStats syncs aggregated stream processor stats to the df.Columns

func (*Dataflow) Unpause added in v0.3.111

func (df *Dataflow) Unpause(exceptDs ...string)

Unpause unpauses all streams

func (*Dataflow) WaitClosed added in v0.3.185

func (df *Dataflow) WaitClosed()

WaitClosed waits until the dataflow is closed; a hack to make sure all streams are pushed

func (*Dataflow) WaitReady

func (df *Dataflow) WaitReady() error

WaitReady waits until dataflow is ready

type Dataset

type Dataset struct {
	Result        *sqlx.Rows
	Columns       Columns
	Rows          [][]interface{}
	SQL           string
	Duration      float64
	Sp            *StreamProcessor
	Inferred      bool
	SafeInference bool
	NoDebug       bool
}

Dataset is a dataset returned from a query

func NewDataset

func NewDataset(columns Columns) (data Dataset)

NewDataset returns a new dataset

func NewDatasetFromMap

func NewDatasetFromMap(m map[string]interface{}) (data Dataset)

NewDatasetFromMap returns a new dataset

func ReadCsv

func ReadCsv(path string) (Dataset, error)

ReadCsv reads a CSV and returns a dataset
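
A minimal sketch of loading a CSV into a Dataset and inspecting it; the file and field names are hypothetical:

package main

import "github.com/flarco/dbio/iop"

func main() {
	data, err := iop.ReadCsv("users.csv") // hypothetical CSV file
	if err != nil {
		panic(err)
	}

	data.InferColumnTypes() // determine column types from the loaded values
	data.Print(10)          // pretty-print the first 10 rows

	for _, rec := range data.Records() {
		_ = rec["id"] // each record is a map keyed by field name; "id" is hypothetical
	}
}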

func (*Dataset) AddColumns added in v0.3.185

func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)

AddColumns adds the given columns to the dataset

func (*Dataset) Append

func (data *Dataset) Append(row []interface{})

Append appends a new row

func (*Dataset) ColValues

func (data *Dataset) ColValues(col int) []interface{}

ColValues returns the values of one column as an array

func (*Dataset) ColValuesStr

func (data *Dataset) ColValuesStr(col int) []string

ColValuesStr returns the values of one column as an array of strings

func (*Dataset) FirstRow

func (data *Dataset) FirstRow() []interface{}

FirstRow returns the first row

func (*Dataset) FirstVal

func (data *Dataset) FirstVal() interface{}

FirstVal returns the first value from the first row

func (*Dataset) GetFields

func (data *Dataset) GetFields(lower ...bool) []string

GetFields returns the fields of the Data

func (*Dataset) InferColumnTypes

func (data *Dataset) InferColumnTypes()

InferColumnTypes determines the columns types

func (*Dataset) Pick added in v0.3.205

func (data *Dataset) Pick(colNames ...string) (nData Dataset)

Pick returns a new dataset with specified columns

func (*Dataset) Print added in v0.3.170

func (data *Dataset) Print(limit int)

Print pretty-prints the data with a limit; 0 is unlimited

func (*Dataset) Records

func (data *Dataset) Records(lower ...bool) []map[string]interface{}

Records returns rows of maps

func (*Dataset) RecordsCasted added in v0.3.291

func (data *Dataset) RecordsCasted(lower ...bool) []map[string]interface{}

RecordsCasted returns rows of maps of casted values

func (*Dataset) RecordsString added in v0.3.291

func (data *Dataset) RecordsString(lower ...bool) []map[string]interface{}

RecordsString returns rows of maps of string values

func (*Dataset) SetFields

func (data *Dataset) SetFields(fields []string)

SetFields sets the fields/columns of the Dataset

func (*Dataset) Sort added in v0.3.168

func (data *Dataset) Sort(args ...any)

Sort sorts by columns. Example: `data.Sort(0, 2, 3, false)` sorts col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` sorts col0, col2 ascending.

func (*Dataset) Stream

func (data *Dataset) Stream() *Datastream

Stream returns a datastream of the dataset

func (*Dataset) ToJSONMap

func (data *Dataset) ToJSONMap() map[string]interface{}

ToJSONMap converts to a JSON object

func (*Dataset) WriteCsv

func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)

WriteCsv writes to a writer

type Datastream

type Datastream struct {
	Columns       Columns
	Buffer        [][]any
	BatchChan     chan *Batch
	Batches       []*Batch
	CurrentBatch  *Batch
	Count         uint64
	Context       *g.Context
	Ready         bool
	Bytes         uint64
	Sp            *StreamProcessor
	SafeInference bool
	NoDebug       bool
	Inferred      bool

	ID       string
	Metadata Metadata // map of column name to metadata type
	// contains filtered or unexported fields
}

Datastream is a stream of rows

func MergeDataflow

func MergeDataflow(df *Dataflow) (dsN *Datastream)

MergeDataflow merges the dataflow streams into one

func NewDatastream

func NewDatastream(columns Columns) (ds *Datastream)

NewDatastream returns a new datastream

func NewDatastreamContext

func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)

NewDatastreamContext returns a new datastream

func NewDatastreamIt

func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)

NewDatastreamIt returns a new datastream with the provided iterator function
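
A minimal sketch of generating a Datastream from a custom iterator func; each call sets it.Row and returns false when done. The column names and values are hypothetical:

package main

import (
	"context"

	"github.com/flarco/dbio/iop"
)

func main() {
	cols := iop.NewColumnsFromFields("n", "square")

	i := 0
	next := func(it *iop.Iterator) bool {
		if i >= 5 {
			return false // signal that the stream is done
		}
		it.Row = []any{i, i * i} // set the next row on the iterator
		i++
		return true
	}

	ds := iop.NewDatastreamIt(context.Background(), cols, next)
	if err := ds.Start(); err != nil { // Start cycles the iterator func until done
		panic(err)
	}

	data, err := ds.Collect(0) // 0 = no row limit
	if err != nil {
		panic(err)
	}
	println(len(data.Rows), "rows generated")
}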

func ReadCsvStream

func ReadCsvStream(path string) (ds *Datastream, err error)

ReadCsvStream reads a CSV and returns a datastream

func (*Datastream) AddBytes

func (ds *Datastream) AddBytes(b int64)

AddBytes adds bytes as processed

func (*Datastream) AddColumns added in v0.3.185

func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)

AddColumns adds the given columns to the datastream

func (*Datastream) CastRowToString added in v0.3.168

func (ds *Datastream) CastRowToString(row []any) []string

CastRowToString returns the row casted to strings

func (*Datastream) ChangeColumn added in v0.3.185

func (ds *Datastream) ChangeColumn(i int, newType ColumnType)

ChangeColumn applies a column type change

func (*Datastream) Chunk

func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)

Chunk splits the datastream into chunk datastreams (in sequence)

func (*Datastream) Close

func (ds *Datastream) Close()

Close closes the datastream

func (*Datastream) Collect

func (ds *Datastream) Collect(limit int) (Dataset, error)

Collect reads a stream and returns a dataset; a limit of 0 is unlimited

func (*Datastream) ConsumeAvroReader added in v0.4.34

func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)

ConsumeAvroReader uses the provided reader to stream rows

func (*Datastream) ConsumeAvroReaderSeeker added in v0.4.34

func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)

ConsumeAvroReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeCsvReader added in v0.1.0

func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)

ConsumeCsvReader uses the provided reader to stream rows
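
A minimal sketch of streaming CSV rows from an arbitrary io.Reader, assuming ConsumeCsvReader starts the stream so the rows can be ranged over; the URL is hypothetical:

package main

import (
	"net/http"

	"github.com/flarco/dbio/iop"
)

func main() {
	resp, err := http.Get("https://example.com/export.csv") // hypothetical CSV endpoint
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	ds := iop.NewDatastream(iop.Columns{}) // columns get set from the CSV header
	if err := ds.ConsumeCsvReader(resp.Body); err != nil {
		panic(err)
	}

	for row := range ds.Rows() { // each row arrives as a []any while the body is read
		_ = row
	}
}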

func (*Datastream) ConsumeJsonReader added in v0.1.0

func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)

ConsumeJsonReader uses the provided reader to stream JSON. This puts each JSON record into one string value so the payload can be processed downstream.

func (*Datastream) ConsumeParquetReader added in v0.3.220

func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)

ConsumeParquetReader uses the provided reader to stream rows

func (*Datastream) ConsumeParquetReaderSeeker added in v0.3.220

func (ds *Datastream) ConsumeParquetReaderSeeker(reader io.ReaderAt) (err error)

ConsumeParquetReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeSASReader added in v0.4.37

func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)

ConsumeSASReader uses the provided reader to stream rows

func (*Datastream) ConsumeSASReaderSeeker added in v0.4.37

func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)

ConsumeSASReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeXmlReader added in v0.3.1

func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)

ConsumeXmlReader uses the provided reader to stream XML. This puts each XML record into one string value so the payload can be processed downstream.

func (*Datastream) Defer

func (ds *Datastream) Defer(f func())

Defer runs a given function at the close of the Datastream

func (*Datastream) Df added in v0.3.111

func (ds *Datastream) Df() *Dataflow

func (*Datastream) Err

func (ds *Datastream) Err() (err error)

Err returns the error, if any

func (*Datastream) GetConfig added in v0.3.202

func (ds *Datastream) GetConfig() (configMap map[string]string)

GetConfig gets the config

func (*Datastream) GetFields

func (ds *Datastream) GetFields(args ...bool) []string

GetFields returns the fields of the Data

func (*Datastream) IsClosed

func (ds *Datastream) IsClosed() bool

IsClosed returns true if the ds is closed

func (*Datastream) LatestBatch added in v0.3.202

func (ds *Datastream) LatestBatch() *Batch

func (*Datastream) Map

func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)

Map applies the provided function to every row and returns the result

func (*Datastream) MapParallel

func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)

MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.

func (*Datastream) NewBatch added in v0.3.185

func (ds *Datastream) NewBatch(columns Columns) *Batch

NewBatch creates a new batch with fixed columns. It should be used each time a column type changes or columns are added.

func (*Datastream) NewCsvBufferReader

func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader

NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvBufferReaderChnl

func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)

NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read into memory, whereas NewCsvReaderChnl does not hold data in memory.

func (*Datastream) NewCsvBytesChnl

func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)

NewCsvBytesChnl returns a channel yielding chunks of CSV bytes

func (*Datastream) NewCsvReader

func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader

NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.
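
A minimal sketch of draining a Datastream back out as CSV through NewCsvReader; the file names are hypothetical:

package main

import (
	"io"
	"os"

	"github.com/flarco/dbio/iop"
)

func main() {
	ds, err := iop.ReadCsvStream("input.csv") // hypothetical source file
	if err != nil {
		panic(err)
	}

	out, _ := os.Create("copy.csv")
	defer out.Close()

	reader := ds.NewCsvReader(0, 0) // 0, 0 = no row limit and no byte limit
	if _, err := io.Copy(out, reader); err != nil {
		panic(err)
	}
}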

func (*Datastream) NewCsvReaderChnl

func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)

NewCsvReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.

func (*Datastream) NewIterator added in v0.3.185

func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator

func (*Datastream) NewJsonLinesReaderChnl added in v0.3.16

func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.

func (*Datastream) NewJsonReaderChnl added in v0.3.16

func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

func (*Datastream) NewParquetReaderChnl added in v0.3.220

func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)

NewParquetReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.

func (*Datastream) Pause added in v0.3.185

func (ds *Datastream) Pause()

func (*Datastream) Push

func (ds *Datastream) Push(row []any)

Push pushes a row into the datastream

func (*Datastream) Records

func (ds *Datastream) Records() <-chan map[string]any

Records returns rows of maps

func (*Datastream) Rows

func (ds *Datastream) Rows() chan []any

func (*Datastream) SetConfig

func (ds *Datastream) SetConfig(configMap map[string]string)

SetConfig sets the ds.config values

func (*Datastream) SetEmpty

func (ds *Datastream) SetEmpty()

SetEmpty sets the ds.Rows channel as empty

func (*Datastream) SetFields

func (ds *Datastream) SetFields(fields []string)

SetFields sets the fields/columns of the Datastream

func (*Datastream) SetMetadata added in v0.3.109

func (ds *Datastream) SetMetadata(jsonStr string)

func (*Datastream) SetReady added in v0.3.102

func (ds *Datastream) SetReady()

SetReady sets the ds.ready

func (*Datastream) Shape

func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)

Shape changes the column types as needed to match the provided columns. It casts the already wrongly casted rows, and does not recast the correctly casted rows.

func (*Datastream) Split added in v0.3.0

func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)

Split splits the datastream into parallel datastreams

func (*Datastream) Start

func (ds *Datastream) Start() (err error)

Start generates the stream. It should cycle the iterator func until done.

func (*Datastream) TryPause added in v0.3.185

func (ds *Datastream) TryPause() bool

func (*Datastream) Unpause added in v0.3.185

func (ds *Datastream) Unpause()

Unpause unpauses the stream

func (*Datastream) WaitClosed added in v0.4.23

func (ds *Datastream) WaitClosed()

WaitClosed waits until the datastream is closed; a hack to make sure all streams are pushed

func (*Datastream) WaitReady

func (ds *Datastream) WaitReady() error

WaitReady waits until datastream is ready

type GzipCompressor added in v0.2.3

type GzipCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*GzipCompressor) Compress added in v0.2.3

func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader

Compress uses gzip to compress

func (*GzipCompressor) Decompress added in v0.2.3

func (cp *GzipCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

Decompress uses gzip to decompress if the input is gzip. Otherwise it returns the same reader

func (*GzipCompressor) Suffix added in v0.2.3

func (cp *GzipCompressor) Suffix() string

type Iterator

type Iterator struct {
	Row       []any
	Reprocess chan []any
	IsCasted  bool
	Counter   uint64
	Context   *g.Context
	Closed    bool
	// contains filtered or unexported fields
}

Iterator is the row provider for a datastream

func (*Iterator) Ds added in v0.3.126

func (it *Iterator) Ds() *Datastream

type KeyType added in v0.4.86

type KeyType string
const (
	PrimaryKey   KeyType = "primary_key"
	AggregateKey KeyType = "aggregate_key"
	UpdateKey    KeyType = "update_key"
	SortKey      KeyType = "sort_key"
)

type KeyValue added in v0.3.109

type KeyValue struct {
	Key   string `json:"key"`
	Value any    `json:"value"`
}

type Metadata added in v0.3.109

type Metadata struct {
	StreamURL KeyValue `json:"stream_url"`
	LoadedAt  KeyValue `json:"loaded_at"`
}

func (*Metadata) AsMap added in v0.3.109

func (m *Metadata) AsMap() map[string]any

AsMap returns the metadata as a map

type NoneCompressor added in v0.2.3

type NoneCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*NoneCompressor) Compress added in v0.2.3

func (cp *NoneCompressor) Compress(reader io.Reader) io.Reader

func (*NoneCompressor) Decompress added in v0.2.3

func (cp *NoneCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

func (*NoneCompressor) Suffix added in v0.2.3

func (cp *NoneCompressor) Suffix() string

type Parquet

type Parquet struct {
	Path   string
	Reader *parquet.Reader
	Data   *Dataset
	// contains filtered or unexported fields
}

Parquet is a parquet object

func NewParquetStream added in v0.3.220

func NewParquetStream(reader io.ReaderAt, columns Columns) (p *Parquet, err error)

func (*Parquet) Columns

func (p *Parquet) Columns() Columns

type RecNode added in v0.4.57

type RecNode struct {
	// contains filtered or unexported fields
}

func NewRecNode added in v0.4.57

func NewRecNode(cols Columns) *RecNode

func (*RecNode) Compression added in v0.4.57

func (rn *RecNode) Compression() compress.Codec

func (*RecNode) Encoding added in v0.4.57

func (rn *RecNode) Encoding() encoding.Encoding

func (*RecNode) Fields added in v0.4.57

func (rn *RecNode) Fields() []parquet.Field

func (*RecNode) GoType added in v0.4.57

func (rn *RecNode) GoType() reflect.Type

func (*RecNode) ID added in v0.4.57

func (rn *RecNode) ID() int

func (*RecNode) Leaf added in v0.4.57

func (rn *RecNode) Leaf() bool

func (*RecNode) Optional added in v0.4.57

func (rn *RecNode) Optional() bool

func (*RecNode) Repeated added in v0.4.57

func (rn *RecNode) Repeated() bool

func (*RecNode) Required added in v0.4.57

func (rn *RecNode) Required() bool

func (*RecNode) String added in v0.4.57

func (rn *RecNode) String() string

func (*RecNode) Type added in v0.4.57

func (rn *RecNode) Type() parquet.Type

type Record added in v0.3.185

type Record struct {
	Columns *Columns
	Values  []any
}

type SAS added in v0.4.37

type SAS struct {
	Path   string
	Reader *datareader.SAS7BDAT
	Data   *Dataset
	// contains filtered or unexported fields
}

SAS is a sas7bdat object

func NewSASStream added in v0.4.37

func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)

func (*SAS) Columns added in v0.4.37

func (s *SAS) Columns() Columns

type SSHClient

type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Passphrase string
	Err        error
	// contains filtered or unexported fields
}

SSHClient is a client that connects to an SSH server, with the main goal of forwarding ports

func (*SSHClient) Close

func (s *SSHClient) Close()

Close stops the client connection

func (*SSHClient) Connect

func (s *SSHClient) Connect() (err error)

Connect connects to the server

func (*SSHClient) GetOutput

func (s *SSHClient) GetOutput() (stdout string, stderr string)

GetOutput returns the stdout & stderr outputs

func (*SSHClient) OpenPortForward

func (s *SSHClient) OpenPortForward() (localPort int, err error)

OpenPortForward forwards the port as specified

func (*SSHClient) RunAsProcess

func (s *SSHClient) RunAsProcess() (localPort int, err error)

RunAsProcess uses a separate process, which enables the use of public key auth. https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key

func (*SSHClient) SftpClient

func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)

SftpClient returns an SftpClient
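
A minimal sketch of opening a port forward through an SSH jump host; host names, ports and credentials are hypothetical:

package main

import (
	"os"

	"github.com/flarco/dbio/iop"
)

func main() {
	client := iop.SSHClient{
		Host:     "bastion.example.com", // hypothetical jump host
		Port:     22,
		User:     "deploy",
		Password: os.Getenv("SSH_PASSWORD"),
		TgtHost:  "db.internal", // remote host to reach through the tunnel
		TgtPort:  5432,
	}
	if err := client.Connect(); err != nil {
		panic(err)
	}
	defer client.Close()

	localPort, err := client.OpenPortForward() // forwards TgtHost:TgtPort locally
	if err != nil {
		panic(err)
	}
	println("tunnel available on 127.0.0.1:", localPort)
}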

type Shaper added in v0.3.202

type Shaper struct {
	Func       func([]any) []any
	SrcColumns Columns
	TgtColumns Columns
	ColMap     map[int]int
}

type SnappyCompressor added in v0.2.3

type SnappyCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*SnappyCompressor) Compress added in v0.2.3

func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader

Compress uses snappy to compress

func (*SnappyCompressor) Decompress added in v0.2.3

func (cp *SnappyCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses snappy to decompress if the input is snappy-compressed. Otherwise it returns the same reader

func (*SnappyCompressor) Suffix added in v0.2.3

func (cp *SnappyCompressor) Suffix() string

type StreamProcessor

type StreamProcessor struct {
	N uint64
	// contains filtered or unexported fields
}

StreamProcessor processes rows and values

func NewStreamProcessor

func NewStreamProcessor() *StreamProcessor

NewStreamProcessor returns a new StreamProcessor

func (*StreamProcessor) CastRow

func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}

CastRow casts each value of a row; slows down processing by about 40%?

func (*StreamProcessor) CastToString

func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string

CastToString casts a value to string, used for CSV writing. Slows processing down 5% with upstream CastRow, or 35% without upstream CastRow.

func (*StreamProcessor) CastToTime

func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)

CastToTime converts interface to time

func (*StreamProcessor) CastType

func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}

CastType casts the type of an interface. CastType is used to cast the interface placeholders.

func (*StreamProcessor) CastVal

func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}

CastVal casts values with stats collection, which degrades performance by ~10%. go test -benchmem -run='^$' github.com/flarco/dbio/iop -bench '^BenchmarkProcessVal'

func (*StreamProcessor) CastValWithoutStats

func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}

CastValWithoutStats casts the value without counting stats

func (*StreamProcessor) GetType

func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)

GetType returns the type of an interface

func (*StreamProcessor) ParseString

func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}

ParseString parses a string and returns an interface of the appropriate type. string: "varchar", integer: "integer", decimal: "decimal", date: "date", datetime: "timestamp", timestamp: "timestamp", text: "text"

func (*StreamProcessor) ParseTime

func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)

ParseTime parses a date string and returns time.Time

func (*StreamProcessor) ParseVal

func (sp *StreamProcessor) ParseVal(val interface{}) interface{}

ParseVal parses the value into its appropriate type
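
A minimal sketch of parsing and re-casting a single value with a StreamProcessor; the input value is hypothetical:

package main

import "github.com/flarco/dbio/iop"

func main() {
	sp := iop.NewStreamProcessor()

	v := sp.ParseVal("1234.50")   // parsed into its appropriate type (likely a decimal)
	t := sp.GetType(v)            // inferred ColumnType of the parsed value
	s := sp.CastToString(0, v, t) // back to a string, as used for CSV writing

	println(string(t), s)
}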

func (*StreamProcessor) ProcessRow

func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}

ProcessRow processes a row

func (*StreamProcessor) ProcessVal

func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}

ProcessVal processes a value

func (*StreamProcessor) SetConfig

func (sp *StreamProcessor) SetConfig(configMap map[string]string)

SetConfig sets the data.Sp.config values

type TransformFunc added in v0.3.272

type TransformFunc func(*StreamProcessor, string) (string, error)

type ZStandardCompressor added in v0.2.3

type ZStandardCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*ZStandardCompressor) Compress added in v0.2.3

func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader

Compress uses zstandard to compress

func (*ZStandardCompressor) Decompress added in v0.2.3

func (cp *ZStandardCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses zstandard to decompress if the input is zstandard-compressed. Otherwise it returns the same reader

func (*ZStandardCompressor) Suffix added in v0.2.3

func (cp *ZStandardCompressor) Suffix() string
