pipescript

Version: v0.0.0-...-5e0ed37
Published: Jun 21, 2021 | License: Apache-2.0 | Imports: 14 | Imported by: 1

README

PipeScript

PipeScript can be used as a standalone executable (pipes) for quick data filtering and analysis. It can also be embedded in databases as a query and filtering language, as is done in heedy's timeseries plugin.

The Issue

SQL is the standard query language for databases. Unfortunately, SQL was not designed for large time series, where the dataset might be enormous and several disparate series need to be combined into datasets for use in ML applications.

First off, when operating on time series, we can make many assumptions, such as timestamps being ordered. This allows us to implement many operations on data as streaming transformations, rather than large aggregations. PipeScript can perform analysis on enormous files without much memory use, as all calculations are done (almost) locally.
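
To make the streaming idea concrete, here is a minimal Go sketch of a constant-memory aggregation. It is not PipeScript's implementation; the names `source`, `fromSlice`, and `runningMean` are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// source is a hypothetical pull-style stream: each call yields the next value,
// and ok is false once the stream is exhausted.
type source func() (value float64, ok bool)

// fromSlice adapts a slice to a source so the sketch can run without files.
func fromSlice(vals []float64) source {
	i := 0
	return func() (float64, bool) {
		if i >= len(vals) {
			return 0, false
		}
		v := vals[i]
		i++
		return v, true
	}
}

// runningMean consumes the stream one value at a time. Only a counter and a
// sum are kept, so memory use does not grow with the length of the stream.
func runningMean(next source) (count int, mean float64) {
	var sum float64
	for {
		v, ok := next()
		if !ok {
			break
		}
		count++
		sum += v
	}
	if count == 0 {
		return 0, 0
	}
	return count, sum / float64(count)
}

func main() {
	n, m := runningMean(fromSlice([]float64{1, 2, 3, 4}))
	fmt.Println(n, m)
}
```

The same loop would work unchanged if the source read lines from a very large file, since nothing but the counter and the sum is retained between datapoints.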

Second, in their natural form, time series from separate sensors or data streams each carry their own timestamps: your phone reports usage data independently of your laptop. Nevertheless, the real power of these datasets comes from combining many different streams, each with different timestamps, into a coherent (and tabular) whole. This requires more manipulation of the data than a simple JOIN, and can be done very efficiently on streaming, ordered time series.
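
As a rough illustration of combining two ordered streams into rows, the Go sketch below attaches to each reading of one stream the most recent reading of another. The "latest previous value" rule is an assumption chosen for simplicity, not a statement of how PipeScript interpolates, and the types `Point`, `Row`, and the function `alignPrevious` are hypothetical.

```go
package main

import "fmt"

// Point is a hypothetical stand-in for a timestamped reading.
type Point struct {
	T float64 // timestamp in seconds
	V float64 // value
}

// Row is one line of the resulting table.
type Row struct {
	T    float64
	A    float64
	B    float64
	HasB bool // false if stream b had no reading at or before T
}

// alignPrevious walks both timestamp-ordered streams exactly once. For every
// point in a it attaches the latest point of b whose timestamp is <= T.
func alignPrevious(a, b []Point) []Row {
	rows := make([]Row, 0, len(a))
	j := 0 // index of the next unread point in b
	for _, pa := range a {
		for j < len(b) && b[j].T <= pa.T {
			j++
		}
		row := Row{T: pa.T, A: pa.V}
		if j > 0 {
			row.B = b[j-1].V
			row.HasB = true
		}
		rows = append(rows, row)
	}
	return rows
}

func main() {
	phone := []Point{{1, 10}, {4, 11}, {9, 12}}
	laptop := []Point{{2, 0.5}, {3, 0.7}, {8, 0.9}}
	for _, r := range alignPrevious(phone, laptop) {
		fmt.Printf("%+v\n", r)
	}
}
```

Because both inputs are ordered by timestamp, the index into the second stream only ever moves forward, which is what makes a single pass sufficient.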

PipeScript is a very basic transform language, together with interpolation and dataset-generation machinery, that works entirely on streaming (arbitrarily sized) data. It is the main query engine used for heedy timeseries.

Tutorial

A tutorial will be available with heedy documentation once it is released.

Standalone

Compiling
go get github.com/heedy/pipescript
go test github.com/heedy/pipescript/...
go install github.com/heedy/pipescript/pipes
Usage

The standalone can be used to run PipeScript queries on datapoint, json, and csv formatted data. The following will count the number of datapoints in a csv file:

pipes run -i myfile.csv -ifmt csv --notimestamp "count"
SQL

If you want to perform SQL queries on files, please look at q. Unfortunately, advanced SQL operations require keeping the entire dataset in memory - something PipeScript tries to avoid.

Documentation

Constants

This section is empty.

Variables

var (
	// TransformRegistry is the map of all the transforms that are currently registered.
	// Do not manually add/remove elements from this map.
	// Use Transform.Register to insert new transforms.
	TransformRegistry = map[string]*Transform{}

	// RegistryLock enables adding/deleting transforms during runtime. It is exported, since some
	// applications (heedy) might want to print out the registry
	RegistryLock = &sync.RWMutex{}
)
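
The comments above ask callers not to touch the map directly and note that the lock is exported so applications can read the registry. The sketch below shows that general pattern with stand-in types; `Transform`, `registry`, `registryLock`, `register`, and `names` are hypothetical local names, and refusing duplicate names is an assumption rather than documented behavior of `Transform.Register`.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Transform is a trimmed-down, hypothetical stand-in for pipescript.Transform.
type Transform struct {
	Name        string
	Description string
}

var (
	registry     = map[string]*Transform{}
	registryLock = &sync.RWMutex{}
)

// register adds t under the write lock. Rejecting duplicates is an assumption
// made for this sketch.
func register(t *Transform) error {
	registryLock.Lock()
	defer registryLock.Unlock()
	if _, exists := registry[t.Name]; exists {
		return fmt.Errorf("transform %q is already registered", t.Name)
	}
	registry[t.Name] = t
	return nil
}

// names returns a sorted snapshot of the registered names. It only takes the
// read lock, so several readers can list the registry at the same time.
func names() []string {
	registryLock.RLock()
	defer registryLock.RUnlock()
	out := make([]string, 0, len(registry))
	for name := range registry {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	_ = register(&Transform{Name: "add", Description: "add"})
	_ = register(&Transform{Name: "and", Description: "logical and"})
	fmt.Println(names())
}
```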
var AddTransform = &Transform{
	Name:        "add",
	Description: "add",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to add to the datapoint",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to add to the datapoint",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 + f2
		}
		return out, err
	}),
}
var AndTransform = &Transform{
	Name:        "and",
	Description: "logical and",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to and against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to and against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Bool()
		if err == nil {
			var f2 bool
			f2, err = args[1].Bool()

			out.Data = f1 && f2
		}
		return out, err
	}),
}
var DivTransform = &Transform{
	Name:        "div",
	Description: "divides the first arg by the second",

	Args: []TransformArg{
		TransformArg{
			Description: "Numerator",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Denominator",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 / f2
		}
		return out, err
	}),
}
var Dt = &Transform{
	Name:          "dt",
	Description:   "Gives access to the datapoint's duration",
	Documentation: string(resources.MustAsset("docs/transforms/dt.md")),
	Constructor: func(transform *Transform, consts []interface{}, pipes []*Pipe) (TransformIterator, error) {
		return dtIter{}, nil
	},
}
var EqTransform = &Transform{
	Name:        "eq",
	Description: "returns true if the data of the incoming stream is equal to the value of the first arg",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		out.Data = Equal(args[0].Data, args[1].Data)
		return out, nil
	}),
}
var ErrBeforeStart = errors.New("There is no value for this query yet")
var GtTransform = &Transform{
	Name:        "gt",
	Description: "returns true if the data of the incoming stream is greater than the value of the first arg",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 > f2
		}
		return out, err
	}),
}
var GteTransform = &Transform{
	Name:        "gte",
	Description: "returns true if the data of the incoming stream is greater than or equal to the value of the first arg",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 >= f2
		}
		return out, err
	}),
}
var Identity = &Transform{
	Name:          "d",
	Description:   `Represents the data portion of the current datapoint. It is the identity transform.`,
	Documentation: string(resources.MustAsset("docs/transforms/d.md")),
	Args: []TransformArg{
		TransformArg{
			Description: "If it is given an integer, peeks to the relative index in array. If string, tries to get the object key",
			Type:        ConstArgType,
			Optional:    true,
			Default:     MustPipe(NewConstTransform(0), nil),
			Schema: map[string]interface{}{
				"oneOf": []interface{}{
					map[string]interface{}{
						"type": "string",
					},
					map[string]interface{}{
						"type": "integer",
					},
				},
			},
		},
	},
	Constructor: func(transform *Transform, consts []interface{}, pipes []*Pipe) (TransformIterator, error) {
		idx, ok := Int(consts[0])
		if ok {
			return peekIterator{int(idx)}, nil
		}

		_, ok = consts[0].(string)
		if !ok {
			return nil, errors.New("Argument of invalid type")
		}
		return &Basic{
			ConstArgs: consts,
			PipeArgs:  pipes,
			Args:      nil,
			f:         basicGetElement,
		}, nil
	},
}
var LtTransform = &Transform{
	Name:        "lt",
	Description: "returns true if the data of the incoming stream is less than the value of the first arg",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 < f2
		}
		return out, err
	}),
}
var LteTransform = &Transform{
	Name:        "lte",
	Description: "returns true if the data of the incoming stream is less than or equal to the value of the first arg",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 <= f2
		}
		return out, err
	}),
}
var ModTransform = &Transform{
	Name:        "mod",
	Description: "modulo",

	Args: []TransformArg{
		TransformArg{
			Description: "Dividend",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Divisor (mod by this)",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Int()
		if err == nil {
			var f2 int64
			f2, err = args[1].Int()

			out.Data = f1 % f2
		}
		return out, err
	}),
}
var MulTransform = &Transform{
	Name:        "mul",
	Description: "multiplies the two args",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to multiply",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to multiply by",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 * f2
		}
		return out, err
	}),
}
var NeTransform = &Transform{
	Name:        "ne",
	Description: "returns true if the data of the incoming stream is not equal to the value of the first arg",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to check against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		out.Data = !Equal(args[0].Data, args[1].Data)
		return out, nil
	}),
}
var NegTransform = &Transform{
	Name:        "neg",
	Description: "Negation of numbers",

	Constructor: NewBasic(nil, func(dp *Datapoint, args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		b, ok := dp.Data.(bool)
		if ok {
			out.Data = !b
			return out, nil
		}
		f, err := dp.Float()
		out.Data = -f
		return out, err
	}),
}
var NotTransform = &Transform{
	Name:        "not",
	Description: "Boolean not",

	Constructor: NewBasic(nil, func(dp *Datapoint, args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		b, err := dp.Bool()
		out.Data = !b
		return out, err
	}),
}
var OrTransform = &Transform{
	Name:        "or",
	Description: "logical or",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to or against data",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to or against data",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Bool()
		if err == nil {
			var f2 bool
			f2, err = args[1].Bool()

			out.Data = f1 || f2
		}
		return out, err
	}),
}
var PowTransform = &Transform{
	Name:        "pow",
	Description: "sets datapoint to arg's power",

	Args: []TransformArg{
		TransformArg{
			Description: "Base",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Exponent",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = math.Pow(f1, f2)
		}
		return out, err
	}),
}
var SubTransform = &Transform{
	Name:        "sub",
	Description: "subtracts the second arg from the first",

	Args: []TransformArg{
		TransformArg{
			Description: "Value to subtract from",
			Type:        TransformArgType,
		},
		TransformArg{
			Description: "Value to subtract",
			Type:        TransformArgType,
		},
	},
	Constructor: NewArgBasic(func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error) {
		f1, err := args[0].Float()
		if err == nil {
			var f2 float64
			f2, err = args[1].Float()

			out.Data = f1 - f2
		}
		return out, err
	}),
}
var T = &Transform{
	Name:          "t",
	Description:   "Gives access to the datapoint's timestamp",
	Documentation: string(resources.MustAsset("docs/transforms/t.md")),
	Constructor: func(transform *Transform, consts []interface{}, pipes []*Pipe) (TransformIterator, error) {
		return tIter{}, nil
	},
}

Functions

func Bool

func Bool(v interface{}) (bool, bool)

func Equal

func Equal(o1, o2 interface{}) bool

func Float

func Float(v interface{}) (float64, bool)

Float takes a value that was unmarshalled with the json package and returns it as a float64, or returns false if it cannot be converted

func FloatNoBool

func FloatNoBool(v interface{}) (float64, bool)

func Int

func Int(v interface{}) (int64, bool)

Int takes an interface that was unmarshalled with the json package, and returns it as an int, or returns false

func IntNoBool

func IntNoBool(v interface{}) (int64, bool)
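
The conversion helpers above share the `(value, ok)` shape. The sketch below shows how such a helper could look for values produced by `encoding/json`; `toFloat` is a hypothetical function, and its handling of booleans and numeric strings is an assumption (suggested only by the existence of the `NoBool` variants), not the package's confirmed behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// toFloat is a hypothetical helper in the spirit of Float: it reports
// ok=false instead of returning an error when the value is not numeric.
func toFloat(v interface{}) (float64, bool) {
	switch x := v.(type) {
	case float64: // encoding/json decodes JSON numbers into float64
		return x, true
	case int:
		return float64(x), true
	case int64:
		return float64(x), true
	case bool: // assumption: true maps to 1 and false to 0
		if x {
			return 1, true
		}
		return 0, true
	case string: // assumption: numeric strings are parsed
		f, err := strconv.ParseFloat(x, 64)
		return f, err == nil
	}
	return 0, false
}

func main() {
	var vals []interface{}
	if err := json.Unmarshal([]byte(`[3, "4.5", true, null, {"a": 1}]`), &vals); err != nil {
		panic(err)
	}
	for _, v := range vals {
		f, ok := toFloat(v)
		fmt.Println(f, ok)
	}
}
```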

func String

func String(v interface{}) (string, bool)

String extracts the data as a string, or returns false

func ToString

func ToString(v interface{}) string

func Unregister

func Unregister(name string)

Unregister removes the given named transform from the registry

Types

type Aggregator

type Aggregator struct {
	ConstArgs []interface{}
	PipeArgs  []*Pipe
	// contains filtered or unexported fields
}

func (*Aggregator) Next

func (a *Aggregator) Next(e *TransformEnv, out *Datapoint) (*Datapoint, error)

func (*Aggregator) OneToOne

func (a *Aggregator) OneToOne() bool

type AggregatorFunc

type AggregatorFunc func(e *TransformEnv, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error)

type ArgBasic

type ArgBasic struct {
	ConstArgs []interface{}
	PipeArgs  []*Pipe
	Args      []*Datapoint
	// contains filtered or unexported fields
}

func (*ArgBasic) GetConst

func (s *ArgBasic) GetConst(args []interface{}) (interface{}, error)

func (*ArgBasic) Next

func (s *ArgBasic) Next(e *TransformEnv, out *Datapoint) (*Datapoint, error)

func (*ArgBasic) OneToOne

func (s *ArgBasic) OneToOne() bool

type ArgBasicFunc

type ArgBasicFunc func(args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error)

type ArgType

type ArgType int

ArgType gives the types of args that a transform can accept

const (
	ConstArgType ArgType = iota
	TransformArgType
	OneToOnePipeArgType
	PipeArgType
)

func (ArgType) MarshalJSON

func (a ArgType) MarshalJSON() ([]byte, error)

func (ArgType) String

func (a ArgType) String() string

func (*ArgType) UnmarshalJSON

func (a *ArgType) UnmarshalJSON(b []byte) error

type Basic

type Basic struct {
	ConstArgs []interface{}
	PipeArgs  []*Pipe
	Args      []*Datapoint
	// contains filtered or unexported fields
}

func (*Basic) GetConst

func (s *Basic) GetConst(dpi interface{}, args []interface{}) (interface{}, error)

func (*Basic) Next

func (s *Basic) Next(e *TransformEnv, out *Datapoint) (*Datapoint, error)

func (*Basic) OneToOne

func (s *Basic) OneToOne() bool

type BasicFunc

type BasicFunc func(dp *Datapoint, args []*Datapoint, consts []interface{}, pipes []*Pipe, out *Datapoint) (*Datapoint, error)

type BasicInit

type BasicInit func(consts []interface{}, pipes []*Pipe) ([]interface{}, []*Pipe, error)

type Buffer

type Buffer struct {
	Input Iterator

	Pages *list.List
	// The last page, after which there are no more datapoints
	// Initially set to max int64
	EndPage uint64
	// An error that was encountered
	Error error

	Iterators []*BufferIterator
}

func NewBuffer

func NewBuffer(n Iterator) *Buffer

func (*Buffer) Iterator

func (b *Buffer) Iterator() *BufferIterator

type BufferIterator

type BufferIterator struct {
	Buf  *Buffer
	Elem *list.Element
	J    int
}

func (*BufferIterator) Close

func (i *BufferIterator) Close()

func (*BufferIterator) Next

func (i *BufferIterator) Next() (*Datapoint, error)

func (*BufferIterator) Peek

func (i *BufferIterator) Peek(idx int) (*Datapoint, error)

type ChanResult

type ChanResult struct {
	DP  *Datapoint
	Err error
}

type ChannelPipe

type ChannelPipe struct {
	Sender   chan *Datapoint
	Receiver chan ChanResult
}

func NewChannelPipe

func NewChannelPipe(p *Pipe) *ChannelPipe

func (*ChannelPipe) Close

func (cp *ChannelPipe) Close()

type ConstIterator

type ConstIterator struct {
	Value interface{}
}

func (*ConstIterator) Next

func (c *ConstIterator) Next(e *TransformEnv, out *Datapoint) (*Datapoint, error)

func (*ConstIterator) OneToOne

func (c *ConstIterator) OneToOne() bool

type Datapoint

type Datapoint struct {
	Timestamp float64     `json:"t"`
	Duration  float64     `json:"dt"`
	Data      interface{} `json:"d"`
}
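
The JSON tags on the struct mean a datapoint is serialized with the keys `t`, `dt`, and `d`. The following sketch mirrors the struct locally so it compiles on its own and decodes a small array. The helpers `decode` and `toTime` are hypothetical, and treating the timestamp as Unix seconds is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"time"
)

// Datapoint mirrors the documented struct so the sketch is self-contained.
type Datapoint struct {
	Timestamp float64     `json:"t"`
	Duration  float64     `json:"dt"`
	Data      interface{} `json:"d"`
}

// decode parses a JSON array of datapoints.
func decode(raw string) ([]Datapoint, error) {
	var dps []Datapoint
	err := json.Unmarshal([]byte(raw), &dps)
	return dps, err
}

// toTime converts a floating-point timestamp to a time.Time.
// Assumption: the timestamp counts seconds since the Unix epoch.
func toTime(ts float64) time.Time {
	sec, frac := math.Modf(ts)
	return time.Unix(int64(sec), int64(frac*1e9)).UTC()
}

func main() {
	dps, err := decode(`[{"t": 1624233600, "d": 21.5}, {"t": 1624233660.5, "dt": 30, "d": {"steps": 12}}]`)
	if err != nil {
		panic(err)
	}
	for _, dp := range dps {
		fmt.Println(toTime(dp.Timestamp).Format(time.RFC3339Nano), dp.Duration, dp.Data)
	}
}
```

A datapoint without a `dt` key simply decodes with a zero Duration, and `d` can hold any JSON value.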

func (*Datapoint) Bool

func (dp *Datapoint) Bool() (bool, error)

func (*Datapoint) Float

func (dp *Datapoint) Float() (float64, error)

func (*Datapoint) Int

func (dp *Datapoint) Int() (int64, error)

func (*Datapoint) MapElement

func (dp *Datapoint) MapElement(k string) (interface{}, error)

func (*Datapoint) String

func (dp *Datapoint) String() (string, error)

func (*Datapoint) Time

func (dp *Datapoint) Time() time.Time

Time returns the datapoint's Unix timestamp as a time.Time

func (*Datapoint) ToString

func (dp *Datapoint) ToString() string

type DatapointArrayIterator

type DatapointArrayIterator struct {
	Datapoints []Datapoint
	// contains filtered or unexported fields
}

func NewDatapointArrayIterator

func NewDatapointArrayIterator(dpa []Datapoint) *DatapointArrayIterator

func (*DatapointArrayIterator) Next

func (d *DatapointArrayIterator) Next(out *Datapoint) (*Datapoint, error)

type EmptyIterator

type EmptyIterator struct{}

func (EmptyIterator) Next

func (e EmptyIterator) Next(dp *Datapoint) (*Datapoint, error)

type Iterator

type Iterator interface {
	Next(*Datapoint) (*Datapoint, error)
}
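
A minimal sketch of implementing and draining this interface follows. The types are redeclared locally so the example stands alone; `sliceIterator` and `countAll` are hypothetical, and signalling the end of the stream with a nil datapoint and a nil error is an assumption about the convention, not something this page states.

```go
package main

import "fmt"

// Datapoint and Iterator mirror the documented declarations.
type Datapoint struct {
	Timestamp float64
	Duration  float64
	Data      interface{}
}

type Iterator interface {
	Next(*Datapoint) (*Datapoint, error)
}

// sliceIterator is a hypothetical Iterator backed by a slice.
type sliceIterator struct {
	dps []Datapoint
	i   int
}

// Next copies the next datapoint into out and returns it.
// Assumption: (nil, nil) marks the end of the stream.
func (s *sliceIterator) Next(out *Datapoint) (*Datapoint, error) {
	if s.i >= len(s.dps) {
		return nil, nil
	}
	*out = s.dps[s.i]
	s.i++
	return out, nil
}

// countAll drains the iterator, reusing one Datapoint as scratch space so no
// allocation is needed per element.
func countAll(it Iterator) (int, error) {
	var scratch Datapoint
	n := 0
	for {
		dp, err := it.Next(&scratch)
		if err != nil {
			return n, err
		}
		if dp == nil {
			return n, nil
		}
		n++
	}
}

func main() {
	it := &sliceIterator{dps: []Datapoint{{Timestamp: 1, Data: 5}, {Timestamp: 2, Data: 7}}}
	n, err := countAll(it)
	fmt.Println(n, err)
}
```

Passing the output datapoint in as an argument lets the caller decide where results are stored, which is one plausible reason for the `Next(*Datapoint)` shape.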

type IteratorFromBI

type IteratorFromBI struct {
	BI *BufferIterator
}

func (IteratorFromBI) Next

func (it IteratorFromBI) Next(out *Datapoint) (*Datapoint, error)

type Pipe

type Pipe struct {
	Arr []*PipeElement
}

func MustParse

func MustParse(script string) *Pipe

func MustPipe

func MustPipe(t *Transform, args []*Pipe) *Pipe

func NewElementPipe

func NewElementPipe(t *Transform, args []*Pipe) (*Pipe, error)

func NewPipe

func NewPipe() *Pipe

func NewTransformPipe

func NewTransformPipe(tname string, args []*Pipe) (*Pipe, error)

func Parse

func Parse(script string) (*Pipe, error)

Parse parses the given transform, and returns the corresponding pipe

func (*Pipe) Append

func (p *Pipe) Append(e *PipeElement)

Append adds the given PipeElement to the pipe

func (*Pipe) Copy

func (p *Pipe) Copy() *Pipe

func (*Pipe) GetConst

func (p *Pipe) GetConst() (interface{}, error)

func (Pipe) Input

func (p Pipe) Input(b *Buffer)

Input sets the pipe's input to the given buffer

func (*Pipe) InputIterator

func (p *Pipe) InputIterator(n Iterator)

InputIterator uses an iterator interface as input

func (*Pipe) IsBasic

func (p *Pipe) IsBasic() bool

func (*Pipe) Join

func (p *Pipe) Join(p2 *Pipe)

func (*Pipe) Last

func (p *Pipe) Last(dp *Datapoint) (*Datapoint, error)

func (*Pipe) MarshalJSON

func (p *Pipe) MarshalJSON() ([]byte, error)

func (*Pipe) Next

func (p *Pipe) Next(dp *Datapoint) (*Datapoint, error)

func (*Pipe) OneToOne

func (p *Pipe) OneToOne() bool

func (*Pipe) Simplify

func (p *Pipe) Simplify() *Pipe

func (*Pipe) String

func (p *Pipe) String() string

func (*Pipe) UnmarshalJSON

func (p *Pipe) UnmarshalJSON(b []byte) error

type PipeElement

type PipeElement struct {
	Env  *TransformEnv
	Args []*Pipe

	Iter TransformIterator

	// These three are here to allow copying a pipe by re-generating it from scratch
	ConstArgs []interface{}
	PipeArgs  []*Pipe
	Transform *Transform
}

func NewPipeElement

func NewPipeElement(t *Transform, args []*Pipe) (*PipeElement, error)

func (*PipeElement) Copy

func (pe *PipeElement) Copy() *PipeElement

func (*PipeElement) GetConst

func (pe *PipeElement) GetConst(in interface{}) (interface{}, error)

func (*PipeElement) Input

func (pe *PipeElement) Input(b *Buffer)

func (*PipeElement) IsBasic

func (pe *PipeElement) IsBasic() bool

func (*PipeElement) Next

func (pe *PipeElement) Next(out *Datapoint) (*Datapoint, error)

func (*PipeElement) String

func (pe *PipeElement) String() string

type TestCase

type TestCase struct {
	Pipescript  string      // The script to run
	Input       []Datapoint // The input datapoints
	Parsed      string      // The output of the parsed pipe.String(). "" if there is to be an error
	OutputError bool        // Whether there is to be an error during output
	Output      []Datapoint // The output Datapoints
}

func (TestCase) Run

func (tc TestCase) Run(t *testing.T)

type Transform

type Transform struct {
	Name          string                 `json:"name"`          // The name of the transform
	Description   string                 `json:"description"`   // A single line description of the transform
	Documentation string                 `json:"documentation"` // Full markdown documentation of the transform
	InputSchema   map[string]interface{} `json:"ischema"`       // The schema of the input datapoint that the given transform expects (optional)
	OutputSchema  map[string]interface{} `json:"oschema"`       // The schema of the output data that this transform gives (optional).
	Args          []TransformArg         `json:"args"`          // The arguments that the transform accepts

	Constructor TransformConstructor `json:"-"` // The function that constructs a transform
}

func NewConstTransform

func NewConstTransform(v interface{}) *Transform

func NewObjectTransform

func NewObjectTransform(obj map[string]*Pipe) *Transform

func (*Transform) Register

func (t *Transform) Register() error

Register registers the transform with the system.

type TransformArg

type TransformArg struct {
	Description string                 `json:"description"`       // A description of what the arg represents
	Schema      map[string]interface{} `json:"schema"`            // The schema that the arg conforms to
	Optional    bool                   `json:"optional"`          // Whether the arg is optional
	Default     *Pipe                  `json:"default,omitempty"` // If the arg is optional, what is its default value
	Type        ArgType                `json:"arg_type"`          // The type expected of the arg
}

type TransformConstructor

type TransformConstructor func(transform *Transform, consts []interface{}, pipes []*Pipe) (TransformIterator, error)

func NewArgBasic

func NewArgBasic(f ArgBasicFunc) TransformConstructor
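
The built-in binary transforms listed under Variables all pass a function of the `ArgBasicFunc` shape to NewArgBasic. The sketch below imitates that shape with local stand-ins so it can run without the package: `binaryFunc` is a cut-down, hypothetical version of `ArgBasicFunc` (the `consts` and `pipes` parameters are omitted), `Float` is a simplified stand-in for the documented method, and `maxOf` is an invented example function. Unlike the listings above, it returns early on either conversion error.

```go
package main

import (
	"errors"
	"fmt"
)

// Datapoint is a local stand-in for the documented struct.
type Datapoint struct {
	Timestamp float64
	Duration  float64
	Data      interface{}
}

// Float is a simplified stand-in for the documented (*Datapoint).Float.
func (dp *Datapoint) Float() (float64, error) {
	switch v := dp.Data.(type) {
	case float64:
		return v, nil
	case int:
		return float64(v), nil
	}
	return 0, errors.New("data is not a number")
}

// binaryFunc is a hypothetical, reduced form of ArgBasicFunc.
type binaryFunc func(args []*Datapoint, out *Datapoint) (*Datapoint, error)

// maxOf writes the larger of its two args into out. Both conversions are
// checked before out is modified.
func maxOf(args []*Datapoint, out *Datapoint) (*Datapoint, error) {
	a, err := args[0].Float()
	if err != nil {
		return nil, err
	}
	b, err := args[1].Float()
	if err != nil {
		return nil, err
	}
	if b > a {
		a = b
	}
	out.Data = a
	return out, nil
}

// Compile-time check that maxOf has the expected shape.
var _ binaryFunc = maxOf

func main() {
	out := &Datapoint{}
	res, err := maxOf([]*Datapoint{{Data: 2.0}, {Data: 5}}, out)
	if err != nil {
		panic(err)
	}
	fmt.Println(res.Data)
}
```

With the real package, a function with the full `ArgBasicFunc` signature would be wrapped by NewArgBasic and assigned to a Transform's Constructor field, as the listings under Variables do.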

type TransformEnv

type TransformEnv struct {
	Iter     *BufferIterator
	ArgIters []*BufferIterator
}

func (*TransformEnv) Next

func (te *TransformEnv) Next(args []*Datapoint) (*Datapoint, []*Datapoint, error)

func (*TransformEnv) Peek

func (te *TransformEnv) Peek(idx int, args []*Datapoint) (*Datapoint, []*Datapoint, error)

type TransformIterator

type TransformIterator interface {
	Next(*TransformEnv, *Datapoint) (*Datapoint, error)
	OneToOne() bool
}

Directories

Path: Synopsis
* Copyright (c) 2015 The heedy Contributors (see AUTHORS) Licensed under the MIT license.
Package transforms imports all of the transforms that are available with PipeScript.
core: Package core contains the core basic transforms which are available in PipeScript.
datetime: Package datetime offers transforms useful for processing timestamps
numeric: Package numeric contains basic statistical transforms (such as mean)
strings: Package strings implements basic text processing transforms
