distil

package module
v4.15.4+incompatible Latest
Warning

This package is not in the latest version of its module.

Published: Dec 1, 2018 License: GPL-3.0 Imports: 10 Imported by: 0

README

distil

An implementation of DISTIL in Go.

Documentation

Constants

const MaxVersionSet = 100

MaxVersionSet is the maximum number of versions that will be processed at once.
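As a rough illustration of what a per-pass version cap can mean, the sketch below clamps the next target version so that no more than MaxVersionSet versions are covered in one step. The helper nextTarget is hypothetical and is not part of this package; how the engine actually applies the constant is not shown on this page.

```go
package main

// MaxVersionSet mirrors the documented constant.
const MaxVersionSet = 100

// nextTarget is a hypothetical helper: given the last version already
// processed and the stream's current version, it returns the version to
// process up to, covering at most MaxVersionSet versions per call.
func nextTarget(lastProcessed, current uint64) uint64 {
	if current <= lastProcessed {
		return lastProcessed // nothing new to process
	}
	if current-lastProcessed > MaxVersionSet {
		return lastProcessed + MaxVersionSet
	}
	return current
}
```

Calling nextTarget repeatedly, feeding each result back in as lastProcessed, walks a large backlog in steps of at most 100 versions until it reaches the current version.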

Variables

This section is empty.

Functions

This section is empty.

Types

type DISTIL

type DISTIL struct {
	// contains filtered or unexported fields
}

DISTIL is a handle to the distil engine, including its connections to BTrDB.

func NewDISTIL

func NewDISTIL() *DISTIL

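NewDISTIL creates a DISTIL handle by connecting to BTrDB and MongoDB. The function takes no arguments, so the addresses are not passed in directly; this page does not document where they are read from.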

func (*DISTIL) BTrDBConn

func (d *DISTIL) BTrDBConn() *btrdb.BTrDB

func (*DISTIL) MakeOrGetByPath

func (ds *DISTIL) MakeOrGetByPath(path string, unit string) *Stream

MakeOrGetByPath is the same as StreamFromPath, but creates the stream if the path does not exist. NOTE: This function should NOT be called concurrently with the same path.

func (*DISTIL) MakeOrGetByPaths

func (ds *DISTIL) MakeOrGetByPaths(paths []string, units []string) []*Stream

MakeOrGetByPaths is the same as MakeOrGetByPath, but handles multiple paths in one call.

func (*DISTIL) RegisterDistillate

func (ds *DISTIL) RegisterDistillate(r *Registration)

RegisterDistillate needs to be called once per instance of a distillate, with a populated Registration struct. Do not reuse the Registration struct; it is owned by the engine after this call.

func (*DISTIL) StartEngine

func (ds *DISTIL) StartEngine()

StartEngine begins processing distillates. It does not return.

func (*DISTIL) StreamFromPath

func (ds *DISTIL) StreamFromPath(path string) *Stream

Obtain a stream based on a path

func (*DISTIL) StreamFromUUID

func (ds *DISTIL) StreamFromUUID(id uuid.UUID) *Stream

Obtain a Stream given a UUID

func (*DISTIL) StreamsFromPaths

func (ds *DISTIL) StreamsFromPaths(paths []string) ([]*Stream, error)

Obtain multiple streams based on paths

func (*DISTIL) StreamsFromUUIDs

func (ds *DISTIL) StreamsFromUUIDs(ids []uuid.UUID) []*Stream

Obtain a slice of streams corresponding to the given UUIDs

type Distillate

type Distillate interface {
	// Get the version of the algorithm implemented by this distillate.
	// Future revisions of the DISTIL engine will allow for recalculation
	// of the entire stream when the Version() changes.
	Version() int
	// Get the number of nanoseconds that should be loaded ahead of the
	// time range that has changed, available as negative sample indices
	// in Process()
	LeadNanos() int64
	// Get the Rebase stage for this distillate
	Rebase() Rebaser
	// This is used by the Engine to configure a handle for your
	// distillate to access DISTIL. The handle can be used any time
	// after registration with the DS field:
	//
	//    var handle *distil.DISTIL = myDistillate.DS
	//
	SetEngine(ds *DISTIL)
	// This is called once per changed range to compute the output data
	// corresponding to the changes in the input data
	Process(*InputSet, *OutputSet)
}

Distillate is the interface that every algorithm must implement in order to be registered with the DISTIL engine. DistillateTools can be embedded to obtain default implementations of most of its methods.

type DistillateTools

type DistillateTools struct {
	DS *DISTIL
}

DistillateTools is intended to provide default implementations of the Distillate interface, and is included in a Distillate like so:

type MyDistillate struct {
	distil.DistillateTools
	// your additional fields
}
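The embedding pattern above relies on Go method promotion, which the following self-contained sketch tries to make concrete. DISTIL and DistillateTools here are local stand-ins that mirror the documented signatures so the example compiles on its own. MovingAverage and its WindowNanos field are hypothetical, and the SetEngine body is an assumption based on the note that the handle is available through the DS field.

```go
package main

// DISTIL is a local stand-in for distil.DISTIL.
type DISTIL struct{}

// DistillateTools mirrors the documented struct and its default methods.
type DistillateTools struct {
	DS *DISTIL
}

// Version returns the documented default of 1.
func (dd *DistillateTools) Version() int { return 1 }

// LeadNanos returns the documented default of 0.
func (dd *DistillateTools) LeadNanos() int64 { return 0 }

// SetEngine stores the handle in DS (assumed behavior).
func (dd *DistillateTools) SetEngine(ds *DISTIL) { dd.DS = ds }

// MovingAverage is a hypothetical distillate that embeds the defaults
// and overrides only LeadNanos.
type MovingAverage struct {
	DistillateTools
	WindowNanos int64
}

// LeadNanos shadows the promoted default, asking for one window of
// context ahead of each changed range.
func (m *MovingAverage) LeadNanos() int64 { return m.WindowNanos }
```

Version and SetEngine are promoted from the embedded struct, so a type like this would only need to add Process (and Rebase, omitted here) to satisfy the Distillate interface.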

func (*DistillateTools) LeadNanos

func (dd *DistillateTools) LeadNanos() int64

Default LeadNanos() that returns 0

func (*DistillateTools) Rebase

func (dd *DistillateTools) Rebase() Rebaser

Default Rebase() which does not modify the data

func (*DistillateTools) SetEngine

func (dd *DistillateTools) SetEngine(ds *DISTIL)

func (*DistillateTools) Version

func (dd *DistillateTools) Version() int

Default Version() that returns 1

type InputSet

type InputSet struct {
	// contains filtered or unexported fields
}

An InputSet is passed to the Process method of a distillate. It contains preloaded data for the changed time range (plus any lead time).

func (*InputSet) Get

func (is *InputSet) Get(stream int, sample int) Point

Get a data point from the InputSet. stream is an index into the InputPaths declared in the registration, and sample is the index of the point you wish to get. Sample 0 is the first sample in the changed range. Negative indices are lead samples (see LeadNanos) that can be used for context.
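To illustrate the sample indexing, here is a minimal sketch using a hypothetical in-memory stand-in for InputSet (the real type has unexported fields). It assumes that sample -1 is the lead sample immediately before sample 0, and computes a first difference so that the first sample of the changed range can use lead data as context.

```go
package main

// Point mirrors the documented distil.Point.
type Point struct {
	T int64   // nanoseconds since the Unix epoch
	V float64 // value
}

// inputSet is a hypothetical stand-in for distil.InputSet.
type inputSet struct {
	lead    [][]Point // per stream: samples before the changed range, oldest first
	samples [][]Point // per stream: samples inside the changed range
}

// Get returns a sample; negative indices count back into the lead samples.
func (is *inputSet) Get(stream int, sample int) Point {
	if sample < 0 {
		l := is.lead[stream]
		return l[len(l)+sample]
	}
	return is.samples[stream][sample]
}

func (is *inputSet) NumSamples(stream int) int     { return len(is.samples[stream]) }
func (is *inputSet) NumLeadSamples(stream int) int { return len(is.lead[stream]) }

// firstDifference returns V[i]-V[i-1] for each sample in the changed
// range, skipping sample 0 when no lead sample is available.
func firstDifference(is *inputSet, stream int) []Point {
	var out []Point
	for i := 0; i < is.NumSamples(stream); i++ {
		if i == 0 && is.NumLeadSamples(stream) == 0 {
			continue // no predecessor to subtract
		}
		cur, prev := is.Get(stream, i), is.Get(stream, i-1)
		out = append(out, Point{T: cur.T, V: cur.V - prev.V})
	}
	return out
}
```

In a real distillate the same loop would live inside Process and write its results through the OutputSet rather than returning a slice.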

func (*InputSet) GetRange

func (is *InputSet) GetRange() TimeRange

Get the changed time range that is being processed. This does not include lead time.

func (*InputSet) NumLeadSamples

func (is *InputSet) NumLeadSamples(stream int) int

Get the number of negative samples (lead samples) in the given stream

func (*InputSet) NumSamples

func (is *InputSet) NumSamples(stream int) int

Get the number of samples in the changed range (indices 0 and up) for the given stream

type IterationResult

type IterationResult int
const Abort IterationResult = 3
const NoData IterationResult = 2
const ProcessedData IterationResult = 1

type OutputSet

type OutputSet struct {
	// contains filtered or unexported fields
}

OutputSet is a handle onto the output streams and is used for writing back the data produced by processing.

func (*OutputSet) Add

func (oss *OutputSet) Add(stream int, time int64, val float64)

Add is a utility function that constructs a Point from time and val and calls AddPoint.

func (*OutputSet) AddPoint

func (oss *OutputSet) AddPoint(stream int, p Point)

Add a point to the given stream

func (*OutputSet) SetRange

func (oss *OutputSet) SetRange(r TimeRange)

Set the time range that this OutputSet is responsible for. This must be done before any points are added; any points outside this range will be discarded. Any points that existed in the stream before the current Process call and that lie within this range will be deleted and replaced by the data in the current output set.
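The replace-within-range behavior described above can be sketched with a small in-memory model. The outputSet type and the commit function are hypothetical stand-ins covering a single stream (the real Add also takes a stream index), and the range is treated as half-open, matching TimeRange.

```go
package main

import "sort"

// Point and TimeRange mirror the documented types.
type Point struct {
	T int64
	V float64
}

type TimeRange struct {
	Start int64 // inclusive
	End   int64 // exclusive
}

// outputSet is a hypothetical single-stream stand-in for distil.OutputSet.
type outputSet struct {
	r   TimeRange
	pts []Point
}

// SetRange must be called before any points are added.
func (o *outputSet) SetRange(r TimeRange) { o.r = r }

// Add keeps a point only if it falls inside the declared range.
func (o *outputSet) Add(t int64, v float64) {
	if t < o.r.Start || t >= o.r.End {
		return // outside the range: discarded
	}
	o.pts = append(o.pts, Point{T: t, V: v})
}

// commit models the write-back: existing points inside the range are
// dropped and replaced by the output set's points.
func commit(existing []Point, o *outputSet) []Point {
	merged := make([]Point, 0, len(existing)+len(o.pts))
	for _, p := range existing {
		if p.T < o.r.Start || p.T >= o.r.End {
			merged = append(merged, p)
		}
	}
	merged = append(merged, o.pts...)
	sort.Slice(merged, func(i, j int) bool { return merged[i].T < merged[j].T })
	return merged
}
```

One consequence worth noting: if Process sets a range but adds no points, the model above leaves that range empty in the output stream, which is consistent with the description of old points being deleted.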

type Point

type Point struct {
	// Time since the Unix epoch in nanoseconds
	T int64
	// Value
	V float64
}

A Point is the primitive telemetry data type

type Rebaser

type Rebaser interface {
	Process(start, end int64, input chan btrdb.RawPoint) chan btrdb.RawPoint
}

Rebaser specifies an input data preprocessor. It may do anything, but is typically used for rebasing input streams (removing duplicates and padding missing values).
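A channel-to-channel preprocessor of this shape might look like the sketch below. RawPoint is a local stand-in for btrdb.RawPoint, with Time and Value fields assumed, and dropDuplicates is a hypothetical Rebaser, not one provided by this package. The sketch assumes that the input arrives in time order, that the sender closes the input channel, and that the rebaser is responsible for closing its output. It does not use the start and end arguments.

```go
package main

// RawPoint is a local stand-in for btrdb.RawPoint (fields assumed).
type RawPoint struct {
	Time  int64
	Value float64
}

// Rebaser mirrors the documented interface, using the stand-in type.
type Rebaser interface {
	Process(start, end int64, input chan RawPoint) chan RawPoint
}

// dropDuplicates is a hypothetical Rebaser that forwards points and
// skips any point whose timestamp equals the previously forwarded one.
type dropDuplicates struct{}

func (dropDuplicates) Process(start, end int64, input chan RawPoint) chan RawPoint {
	out := make(chan RawPoint)
	go func() {
		defer close(out) // signal the consumer when input is exhausted
		havePrev := false
		var prev int64
		for p := range input {
			if havePrev && p.Time == prev {
				continue // duplicate timestamp: keep the first point only
			}
			havePrev, prev = true, p.Time
			out <- p
		}
	}()
	return out
}

// collect feeds pts through r and gathers the output into a slice.
func collect(r Rebaser, start, end int64, pts []RawPoint) []RawPoint {
	in := make(chan RawPoint)
	go func() {
		defer close(in)
		for _, p := range pts {
			in <- p
		}
	}()
	var got []RawPoint
	for p := range r.Process(start, end, in) {
		got = append(got, p)
	}
	return got
}
```

Because Process returns immediately and does its work in a goroutine, stages like this can be chained without buffering a whole time range in memory.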

func RebasePadSnap

func RebasePadSnap(freq int64) Rebaser

func RebasePassthrough

func RebasePassthrough() Rebaser

Return a rebaser that does not modify input data

type Registration

type Registration struct {
	Instance    Distillate
	UniqueName  string
	InputPaths  []string
	OutputPaths []string
	OutputUnits []string
}

Registration is a handle to a specific instance of a distillate, along with the information required to prepare it.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a handle on a specific stream and can be used for querying it and inserting data into it. You should not need to use this directly.

func (*Stream) ChangesBetween

func (s *Stream) ChangesBetween(oldversion uint64, newversion uint64) []TimeRange

Obtain the changed ranges between the two versions

func (*Stream) CurrentVersion

func (s *Stream) CurrentVersion() uint64

Get the current version of the stream

func (*Stream) EraseRange

func (s *Stream) EraseRange(r TimeRange)

Erase everything in the stream that falls inside the given time range

func (*Stream) Exists

func (s *Stream) Exists() bool

func (*Stream) GetPoints

func (s *Stream) GetPoints(r TimeRange, rebase Rebaser, version uint64) []Point

Get points from the stream, applying the given rebase

func (*Stream) SetTagVersion

func (s *Stream) SetTagVersion(uniqueName string, version uint64)

Set the last version of the stream that uniqueName processed

func (*Stream) TagVersion

func (s *Stream) TagVersion(uniqueName string) uint64

Get the last version of the stream that uniqueName processed

func (*Stream) WritePoints

func (s *Stream) WritePoints(pts []Point)

Write the given points to the stream

type TimeRange

type TimeRange struct {
	Start int64
	End   int64
}

This represents a range of time from Start (inclusive) to End (exclusive)
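The half-open convention matters most at the boundaries, so a short sketch may help. TimeRange is redeclared locally to keep the example self-contained; contains and overlaps are hypothetical helpers, not functions provided by this package.

```go
package main

// TimeRange mirrors the documented struct.
type TimeRange struct {
	Start int64 // inclusive
	End   int64 // exclusive
}

// contains reports whether t lies in [r.Start, r.End).
func contains(r TimeRange, t int64) bool {
	return t >= r.Start && t < r.End
}

// overlaps reports whether two half-open ranges share any instant.
// Ranges that merely touch (a.End == b.Start) do not overlap.
func overlaps(a, b TimeRange) bool {
	return a.Start < b.End && b.Start < a.End
}
```

With this convention, adjacent ranges such as [10, 20) and [20, 30) partition time without double-counting the sample at 20.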
