archiver

package
v0.0.0-...-87d254f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2016 License: GPL-3.0 Imports: 31 Imported by: 6

Documentation

Overview

Package giles implements an archiver that follows the sMAP protocol

Overview

Part of the motivation for the creation of Giles was to emphasize the distinction between sMAP the software (originally written in Python) and sMAP the profile. The Giles archiver is an implementation of the latter, and is intended to be fully compatible with existing sMAP tools.

One of the "innovations" that Giles brings to the sMAP ecosystem is the notion that what is typically thought of as the sMAP "archiver" is really a collection of components: the message bus/frontend, the timeseries store, the metadata store, and the query language. All of these are closely linked, of course, but treating them as separate entities means that we can use different timeseries or metadata databases or even different implementations of the query language (perhaps over Apache Spark/Mlib?)

These functions define aggregation functions to be used inside other operators

Index

Constants

View Source
const (
	COALESCE_TIMEOUT = 1000  // milliseconds
	COALESCE_MAX     = 16384 // num readings
)
View Source
const (
	SELECT_TYPE queryType = iota
	DELETE_TYPE
	SET_TYPE
	DATA_TYPE
	APPLY_TYPE
)
View Source
const (
	IN_TYPE dataqueryType = iota
	BEFORE_TYPE
	AFTER_TYPE
)
View Source
const AFTER = 57354
View Source
const ALL = 57364
View Source
const AND = 57368
View Source
const APPLY = 57350
View Source
const AS = 57367
View Source
const BEFORE = 57353
View Source
const COMMA = 57363
View Source
const DATA = 57352
View Source
const DELETE = 57348
View Source
const DISTINCT = 57347
View Source
const EQ = 57361
View Source
const HAS = 57370
View Source
const IN = 57372
View Source
const LBRACK = 57376
View Source
const LEFTPIPE = 57365
View Source
const LIKE = 57366
View Source
const LIMIT = 57355
View Source
const LPAREN = 57374
View Source
const LVALUE = 57358
View Source
const NEQ = 57362
View Source
const NEWLINE = 57380
View Source
const NOT = 57371
View Source
const NOW = 57357
View Source
const NUMBER = 57378
View Source
const OPERATOR = 57360
View Source
const OR = 57369
View Source
const QSTRING = 57359
View Source
const RBRACK = 57377
View Source
const RPAREN = 57375
View Source
const SELECT = 57346
View Source
const SEMICOLON = 57379
View Source
const SET = 57349
View Source
const SQEofCode = 1
View Source
const SQErrCode = 2
View Source
const SQFlag = -1000
View Source
const SQLast = 159
View Source
const SQMaxDepth = 200
View Source
const SQNprod = 65
View Source
const SQPrivate = 57344
View Source
const STREAMLIMIT = 57356
View Source
const TIMEUNIT = 57381
View Source
const TO = 57373
View Source
const WHERE = 57351

Variables

View Source
var OpLookup map[string]OperationType
View Source
var SQAct = []int{}/* 159 elements not displayed */
View Source
var SQChk = []int{}/* 136 elements not displayed */
View Source
var SQDebug = 0
View Source
var SQDef = []int{}/* 136 elements not displayed */
View Source
var SQExca = []int{
	-1, 1,
	1, -1,
	-2, 0,
}
View Source
var SQPact = []int{}/* 136 elements not displayed */
View Source
var SQPgo = []int{

	0, 15, 158, 11, 9, 4, 157, 85, 10, 71,
	12, 156, 17, 16, 155, 3, 1, 0, 8, 2,
	153,
}
View Source
var SQR1 = []int{

	0, 20, 20, 20, 20, 20, 20, 20, 20, 7,
	7, 9, 8, 8, 4, 4, 4, 4, 4, 4,
	6, 6, 6, 6, 12, 12, 12, 12, 13, 13,
	14, 14, 14, 14, 15, 15, 16, 16, 16, 16,
	17, 17, 3, 2, 2, 2, 2, 2, 2, 2,
	18, 19, 1, 1, 1, 1, 1, 10, 10, 11,
	11, 5, 5, 5, 5,
}
View Source
var SQR2 = []int{

	0, 4, 3, 4, 4, 3, 4, 3, 6, 1,
	3, 3, 1, 3, 3, 3, 3, 5, 5, 5,
	1, 1, 2, 1, 9, 7, 5, 5, 1, 2,
	2, 1, 1, 1, 2, 3, 0, 2, 2, 4,
	0, 2, 2, 3, 3, 3, 3, 2, 3, 4,
	1, 1, 3, 3, 2, 3, 1, 1, 3, 3,
	4, 3, 3, 5, 5,
}
View Source
var SQStatenames = []string{}
View Source
var SQStates []string
View Source
var SQTok1 = []int{

	1,
}
View Source
var SQTok2 = []int{

	2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
	12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
	22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
	32, 33, 34, 35, 36, 37, 38, 39,
}
View Source
var SQTok3 = []int{
	0,
}
View Source
var SQTokenNames []string
View Source
var SQToknames = []string{
	"SELECT",
	"DISTINCT",
	"DELETE",
	"SET",
	"APPLY",
	"WHERE",
	"DATA",
	"BEFORE",
	"AFTER",
	"LIMIT",
	"STREAMLIMIT",
	"NOW",
	"LVALUE",
	"QSTRING",
	"OPERATOR",
	"EQ",
	"NEQ",
	"COMMA",
	"ALL",
	"LEFTPIPE",
	"LIKE",
	"AS",
	"AND",
	"OR",
	"HAS",
	"NOT",
	"IN",
	"TO",
	"LPAREN",
	"RPAREN",
	"LBRACK",
	"RBRACK",
	"NUMBER",
	"SEMICOLON",
	"NEWLINE",
	"TIMEUNIT",
}

Functions

func PrintConfig

func PrintConfig(c *Config)

func SQParse

func SQParse(SQlex SQLexer) int

func SQStatname

func SQStatname(s int) string

func SQTokname

func SQTokname(c int) string

func SQlex1

func SQlex1(lex SQLexer, lval *SQSymType) int

Types

type APIKeyManager

type APIKeyManager interface {

	// Returns True if the given api key exists
	ApiKeyExists(apikey string) (bool, error)

	// Creates a new key with the given name registered to the given email. The public argument
	// maps to the public attribute of the key. Returns the key
	NewKey(name, email string, public bool) (string, error)

	// Retrieves the key with the given name registered to the given email
	GetKey(name, email string) (string, error)

	// Lists all keys registered to the given email. Returns a list of k/v pairs for each
	// found key, giving us name, public, etc
	ListKeys(email string) ([]map[string]interface{}, error)

	// Deletes the key with the given name registered to the given email. Returns the key as well
	DeleteKeyByName(name, email string) (string, error)

	// Deletes the key with the given value.
	DeleteKeyByValue(key string) (string, error)

	// Retrieves the owner information for the given key
	Owner(key string) (map[string]interface{}, error)
}

type Archiver

type Archiver struct {
	// contains filtered or unexported fields
}

This is the central object for the archiver process and contains most of the requisite logic for the core features of the archiver. One of the focuses of Giles is to facilitate adapting the sMAP protocol to different interfaces; the handlers packages (HTTP, WS, etc) provide handler functions that in turn call the archiver's core functions. Most of these core functions use easily usable data formats (such as bson.M), so the handler functions just have to deal with translating data formats

For now, because the metadata interface was designed with a MongoDB backend in mind, most of the in-transit data types for dealing with metadata use the MongoDB interface defined by http://godoc.org/gopkg.in/mgo.v2/bson and http://godoc.org/gopkg.in/mgo.v2. I suggest taking a quick look though their documentation and how they talk to Mongo to get a feel for what the incoming/outgoing data is going to look like.

func NewArchiver

func NewArchiver(c *Config) (a *Archiver)

Creates a new Archiver instance:

func (*Archiver) AddData

func (a *Archiver) AddData(readings map[string]*SmapMessage, apikey string) error

Takes a map of string/SmapMessage (path, sMAP JSON object) and commits them to the underlying databases. First, checks that write permission is granted with the accompanied apikey (generated with the gilescmd CLI tool), then saves the metadata, pushes the readings out to any concerned republish clients, and commits the reading to the timeseries database. Returns an error, which is nil if all went well

func (*Archiver) GetData

func (a *Archiver) GetData(streamids []string, start, end uint64, query_uot, to_uot UnitOfTime) (interface{}, error)

For each of the streamids, fetches all data between start and end (where start < end). The units for start/end are given by query_uot. We give the units so that each time series database can convert the incoming timestamps to whatever it needs (most of these will query the metadata store for the unit of time for the data stream it is accessing)

func (*Archiver) GetTags

func (a *Archiver) GetTags(select_tags, where_tags bson.M) ([]interface{}, error)

For all streams that match the provided where clause in where_tags, returns the values of the requested tags. where_tags is a bson.M object that follows the same syntax as a MongoDB query. select_tags is a map[string]int corresponding to which tags we wish returned. A value of 1 means the tag will be returned (and ignores all other tags), and a value of 0 means the tag will NOT be returned (and all other tags will be).

func (*Archiver) GetUUIDs

func (a *Archiver) GetUUIDs(where_tags bson.M) ([]string, error)

Returns a list of UUIDs for all streams that match the provided 'where' clause. where_tags is a bson.M object that follows the same syntax as a MongoDB query. This query is executed against the underlying metadata store. As we move into supporting multiple possible metadata storage solutions, this interface may change.

func (*Archiver) HandleMetadataSubscriber

func (a *Archiver) HandleMetadataSubscriber(s Subscriber, query, apikey string)

func (*Archiver) HandleQuery

func (a *Archiver) HandleQuery(querystring, apikey string) (interface{}, error)

Takes the body of the query and the apikey that accompanies the query. First parses the string query into an intermediary form (the abstract syntax tree as the AST type). Depending on the action, it will check to see if the provided API key grants sufficient permission to return the results. If so, returns those results as []byte (marshaled JSON). Most of this method is just switch statements dependent on different components of the generated AST. Any actual computation is done as calls to the Archiver API, so if you want to use your own query language or handle queries in some external handler, then you shouldn't need to use any of this method; just use the Archiver API

func (*Archiver) HandleQuerySubscriber

func (a *Archiver) HandleQuerySubscriber(s Subscriber, query, apikey string)

func (*Archiver) HandleSubscriber

func (a *Archiver) HandleSubscriber(s Subscriber, query, apikey string)

For all streams that match the WHERE clause in the provided query string, will push all subsequent incoming information (data and tags) on those streams to the client associated with the provided http.ResponseWriter.

func (*Archiver) HandleSubscriber2

func (a *Archiver) HandleSubscriber2(s Subscriber, query, apikey string)

func (*Archiver) HandleUUIDSubscriber

func (a *Archiver) HandleUUIDSubscriber(s Subscriber, uuids []string, apikey string)

func (*Archiver) NextData

func (a *Archiver) NextData(streamids []string, start uint64, limit int32, query_uot, to_uot UnitOfTime) (interface{}, error)

For each of the streamids, fetches data after the start time. If limit is < 0, fetches all data. If limit >= 0, fetches only that number of points. See Archiver.GetData for explanation of query_uot

func (*Archiver) PrevData

func (a *Archiver) PrevData(streamids []string, start uint64, limit int32, query_uot, to_uot UnitOfTime) (interface{}, error)

For each of the streamids, fetches data before the start time. If limit is < 0, fetches all data. If limit >= 0, fetches only that number of points. See Archiver.GetData for explanation of query_uot

func (*Archiver) PrintStatus

func (a *Archiver) PrintStatus()

func (*Archiver) Query2

func (a *Archiver) Query2(querystring string, apikey string, w io.Writer) error

func (*Archiver) SetTags

func (a *Archiver) SetTags(update_tags, where_tags map[string]interface{}, apikey string) (int, error)

For all streams that match the provided where clause in where_tags, sets the key-value pairs specified in update_tags.

func (*Archiver) StreamingQuery

func (a *Archiver) StreamingQuery(querystring, apikey string, sendback Subscriber) error

func (*Archiver) TagsUUID

func (a *Archiver) TagsUUID(uuid string) (bson.M, error)

Returns all tags for the stream with the provided UUID

type Cache

type Cache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewCache

func NewCache(size uint32) *Cache

func (*Cache) Get

func (c *Cache) Get(key string) (interface{}, bool)

func (*Cache) Set

func (c *Cache) Set(key string, value interface{})

type ChunkedStreamingDataNode

type ChunkedStreamingDataNode struct {
	// contains filtered or unexported fields
}

func (*ChunkedStreamingDataNode) Run

func (csn *ChunkedStreamingDataNode) Run(input interface{}) (interface{}, error)

type Config

type Config struct {
	Archiver struct {
		TSDB           *string
		Metadata       *string
		Objects        *string
		Keepalive      *int
		EnforceKeys    bool
		LogLevel       *string
		MaxConnections *int
	}

	ReadingDB struct {
		Port    *string
		Address *string
	}

	Quasar struct {
		Port    *string
		Address *string
	}

	Mongo struct {
		Port           *string
		Address        *string
		UpdateInterval *int
	}

	Venkman struct {
		Port    *string
		Address *string
	}

	HTTP struct {
		Enabled bool
		Port    *int
	}
	WebSockets struct {
		Enabled bool
		Port    *int
	}
	CapnProto struct {
		Enabled bool
		Port    *int
	}
	MsgPack struct {
		TcpEnabled bool
		TcpPort    *int
		UdpEnabled bool
		UdpPort    *int
	}

	SSH struct {
		Enabled            bool
		Port               *string
		PrivateKey         *string
		AuthorizedKeysFile *string
		User               *string
		Pass               *string
		PasswordEnabled    bool
		KeyAuthEnabled     bool
	}

	Profile struct {
		CpuProfile     *string
		MemProfile     *string
		BenchmarkTimer *int
		Enabled        bool
	}
}

func LoadConfig

func LoadConfig(filename string) *Config

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

func NewConnectionPool

func NewConnectionPool(newConn func() *TSDBConn, maxConnections int) *ConnectionPool

func (*ConnectionPool) Get

func (pool *ConnectionPool) Get() *TSDBConn

func (*ConnectionPool) Put

func (pool *ConnectionPool) Put(c *TSDBConn)

type CountNode

type CountNode struct {
}

* Count Node *

func (*CountNode) Run

func (cn *CountNode) Run(input interface{}) (interface{}, error)

type DataType

type DataType uint
const (
	SCALAR DataType = 1 << iota
	OBJECT
)

type Dict

type Dict map[string]interface{}

type EchoNode

type EchoNode struct {
	// contains filtered or unexported fields
}

func (*EchoNode) Run

func (en *EchoNode) Run(input interface{}) (interface{}, error)

Takes the first argument and encodes it as msgpack

type EdgeNode

type EdgeNode struct {
	// contains filtered or unexported fields
}

The Edge operator essentially takes the 1st order derivative of a stream

func (*EdgeNode) Run

func (en *EdgeNode) Run(input interface{}) (interface{}, error)

arg0: list of SmapNumbersResponse to compute max of. Must be scalars

type Element

type Element struct {
	// contains filtered or unexported fields
}

type IncomingSmapMessage

type IncomingSmapMessage struct {
	// Readings for this message
	Readings [][]json.RawMessage
	// If this struct corresponds to a sMAP collection,
	// then Contents contains a list of paths contained within
	// this collection
	Contents []string `json:",omitempty"`
	// Map of the metadata
	Metadata bson.M `json:",omitempty"`
	// Map containing the actuator reference
	Actuator bson.M `json:",omitempty"`
	// Map of the properties
	Properties bson.M `json:",omitempty"`
	// Unique identifier for this stream. Should be empty for Collections
	UUID string `json:"uuid"`
	// Path of this stream (thus far)
	Path string
}

type List

type List []string

type MaxNode

type MaxNode struct {
	// contains filtered or unexported fields
}

func (*MaxNode) Run

func (msn *MaxNode) Run(input interface{}) (interface{}, error)

arg0: list of SmapNumbersResponse to compute MIN of. Must be scalars

type MeanNode

type MeanNode struct {
}

func (*MeanNode) Run

func (mn *MeanNode) Run(input interface{}) (interface{}, error)

type MetadataStore

type MetadataStore interface {
	// if called with True (this is default), checks all API keys. For testing or
	// "sandbox" deployments, it can be helpful to call this with False, which will
	// allow ALL operations on ANY streams.
	EnforceKeys(enforce bool)

	// Returns true if the key @apikey is allowed to write to each of the
	// streams listed in @messages. Should check each SmapMessage.UUID value.
	CheckKey(apikey string, messages map[string]*SmapMessage) (bool, error)

	// Associates metadata k/v pairs with non-terminal (non-timeseries) Paths
	SavePathMetadata(messages map[string]*SmapMessage) error

	// Associates metadata k/v pairs with timeserise paths. Inherits
	// from PathMetadata before applying timeseries-specific tags
	SaveTimeseriesMetadata(messages map[string]*SmapMessage) error

	// Retrieves the tags indicated by @target for documents that match the
	// @where clause. If @is_distinct is true, then it will return a list of
	// distinct values for the tag @distinct_key
	GetTags(target bson.M, is_distinct bool, distinct_key string, where bson.M) ([]interface{}, error)

	// Normal metadata save method
	SaveTags(messages *map[string]*SmapMessage) error

	// For all documents that match the where clause @where, apply the updates
	// contained in @updates, provided that the key @apikey is valid for all of
	// them
	UpdateTags(updates bson.M, apikey string, where bson.M) (bson.M, error)

	// Removes all documents that match the where clause @where, provided that the
	// key @apikey is valid for them
	RemoveDocs(apikey string, where bson.M) (bson.M, error)

	// Unapplies all tags in the list @target for all documents that match the
	// where clause @where, after checking the API key
	RemoveTags(target bson.M, apikey string, where bson.M) (bson.M, error)

	// Returns all metadata for a given UUID
	UUIDTags(uuid string) (bson.M, error)

	// Resolve a where clause to a slice of UUIDs
	GetUUIDs(where bson.M) ([]string, error)

	// Returns the unit of time for the stream identified by the given UUID.
	GetUnitOfTime(uuid string) UnitOfTime

	// Returns the stream type for the stream identified by the given UUID
	GetStreamType(uuid string) StreamType

	// General purpose metadata Find
	Find(findclause, selectClause bson.M) (interface{}, error)

	// General purpose metadata Find
	FindDistinct(findClause bson.M, distinctKey string) (interface{}, error)
}

The metadata store should support the following operations

type MinNode

type MinNode struct {
	// contains filtered or unexported fields
}

func (*MinNode) Run

func (msn *MinNode) Run(input interface{}) (interface{}, error)

arg0: list of SmapNumbersResponse to compute MIN of. Must be scalars

type MongoObjectStore

type MongoObjectStore struct {
	// contains filtered or unexported fields
}

The object store interface into Mongo uses a collection named 'objects'. Each document in this collection contains 3 keys:

uuid: the stream identifier
object: a binary blob (byte array) of data (MsgPack encoded)
timestamp: the timestamp associated with this record IN NANOSECONDS

This is obviously a very primitive interface to the object store, and doesn't do nice things like transaction coalescence.

func NewMongoObjectStore

func NewMongoObjectStore(address *net.TCPAddr) *MongoObjectStore

func (*MongoObjectStore) AddObject

func (ms *MongoObjectStore) AddObject(msg *SmapMessage) (bool, error)

func (*MongoObjectStore) AddStore

func (ms *MongoObjectStore) AddStore(store MetadataStore)

func (*MongoObjectStore) GetObjects

func (ms *MongoObjectStore) GetObjects(uuid string, start uint64, end uint64, uot UnitOfTime) (SmapObjectResponse, error)

func (*MongoObjectStore) NextObject

func (ms *MongoObjectStore) NextObject(uuid string, time uint64, uot UnitOfTime) (SmapObjectResponse, error)

func (*MongoObjectStore) PrevObject

func (ms *MongoObjectStore) PrevObject(uuid string, time uint64, uot UnitOfTime) (SmapObjectResponse, error)

type MongoStore

type MongoStore struct {

	// list of timeseries we have committed. Only send upserts to mongo
	// via SaveTags if a) we have metadata or b) we don't have the uuid
	// in this lookup
	UUIDS     map[string]struct{}
	CacheLock sync.RWMutex
	// contains filtered or unexported fields
}

TODO: copy the session for a transaction -- this is faster

func NewMongoStore

func NewMongoStore(address *net.TCPAddr, interval int) *MongoStore

func (*MongoStore) ApiKeyExists

func (ms *MongoStore) ApiKeyExists(apikey string) (bool, error)

func (*MongoStore) CanWrite

func (ms *MongoStore) CanWrite(apikey, uuid string) (bool, error)

func (*MongoStore) CheckKey

func (ms *MongoStore) CheckKey(apikey string, messages map[string]*SmapMessage) (bool, error)

func (*MongoStore) DeleteKeyByName

func (ms *MongoStore) DeleteKeyByName(name, email string) (string, error)

func (*MongoStore) DeleteKeyByValue

func (ms *MongoStore) DeleteKeyByValue(key string) (string, error)

func (*MongoStore) EnforceKeys

func (ms *MongoStore) EnforceKeys(enforce bool)

func (*MongoStore) Find

func (ms *MongoStore) Find(findClause, selectClause bson.M) (interface{}, error)

func (*MongoStore) FindDistinct

func (ms *MongoStore) FindDistinct(findClause bson.M, distinctKey string) (interface{}, error)

func (*MongoStore) GetKey

func (ms *MongoStore) GetKey(name, email string) (string, error)

func (*MongoStore) GetStreamId

func (ms *MongoStore) GetStreamId(uuid string) uint32

func (*MongoStore) GetStreamType

func (ms *MongoStore) GetStreamType(uuid string) StreamType

func (*MongoStore) GetTags

func (ms *MongoStore) GetTags(target bson.M, is_distinct bool, distinct_key string, where bson.M) ([]interface{}, error)

Retrieves the tags indicated by `target` for documents that match the `where` clause. If `is_distinct` is true, then it will return a list of distinct values for the tag `distinct_key`

func (*MongoStore) GetUUIDs

func (ms *MongoStore) GetUUIDs(where bson.M) ([]string, error)

Resolve a query to a slice of UUIDs

func (*MongoStore) GetUnitOfTime

func (ms *MongoStore) GetUnitOfTime(uuid string) UnitOfTime

retrieve the unit of time for the stream identified by the given UUID. Should return one of ns, us, ms, s; defaults to ms

func (*MongoStore) ListKeys

func (ms *MongoStore) ListKeys(email string) ([]map[string]interface{}, error)

func (*MongoStore) NewKey

func (ms *MongoStore) NewKey(name, email string, public bool) (string, error)

func (*MongoStore) Owner

func (ms *MongoStore) Owner(key string) (map[string]interface{}, error)

func (*MongoStore) RemoveDocs

func (ms *MongoStore) RemoveDocs(apikey string, where bson.M) (bson.M, error)

func (*MongoStore) RemoveTags

func (ms *MongoStore) RemoveTags(updates bson.M, apikey string, where bson.M) (bson.M, error)

func (*MongoStore) SavePathMetadata

func (ms *MongoStore) SavePathMetadata(messages map[string]*SmapMessage) error

SavePathMetadata takes a map of paths to sMAP messages and saves them to the path metadata store. The Path keys can be terminal paths, e.g. the full path of a timeseries, which are identified by the linked sMAP message having a UUID (and optionally a Readings) field. Path keys can also be non-terminal paths, which do not have an associated UUID.

This method stores the metadata associated with paths, indexed by each path. The pathmetadata collection should be queried internally by SaveTimeseriesMetadata to build up the full document for each individual timeseries

func (*MongoStore) SaveTags

func (ms *MongoStore) SaveTags(messages *map[string]*SmapMessage) error

The incoming messages will be in the form of {pathname: metadata/properties/etc}. Only the timeseries will have UUIDs attached. When we receive a message like this, we need to compress all of the prefix-path kv pairs into each of the timeseries, and then save those timeseries to the metadata collection

func (*MongoStore) SaveTimeseriesMetadata

func (ms *MongoStore) SaveTimeseriesMetadata(messages map[string]*SmapMessage) error

Here, we are handed a chunk of incoming sMAP messages, which can include both non-terminal and terminal (timeseries) paths. Timeseries (terminal) paths are identified by having a UUID key in their SmapMessage struct. For each of these, we decompose the full timeseries Path into its components -- e.g. /a/b/c -> /, /a, /a/b -- and inherit from the PathMetadata collection into the metadata for this source. Timeseries-specific metadata is then upserted into this document, and the result is saved in the Metadata collection TODO: this is super slow: Find and Upsert take the most time. Avoid this method if we already have all the new metadata! Important to note: sMAP 2.0 archiver does *not* do path inheritance of metadata within the archiver. Inheritance only happens w/n a source, when multiple messages are sent

func (*MongoStore) StartUpdateCacheLoop

func (ms *MongoStore) StartUpdateCacheLoop()

func (*MongoStore) UUIDTags

func (ms *MongoStore) UUIDTags(uuid string) (bson.M, error)

Return all metadata for a certain UUID

func (*MongoStore) UpdateTags

func (ms *MongoStore) UpdateTags(updates bson.M, apikey string, where bson.M) (bson.M, error)

type NetworkNode

type NetworkNode struct {
	// contains filtered or unexported fields
}

func (*NetworkNode) Run

func (nn *NetworkNode) Run(input interface{}) (interface{}, error)

type Node

type Node struct {
	Id       string
	Tags     map[string]interface{}
	In       chan interface{}
	Done     <-chan struct{}
	Children map[string]*Node
	Op       Operator
}

func NewChunkedStreamingDataNode

func NewChunkedStreamingDataNode(done <-chan struct{}, args ...interface{}) (n *Node)

arg0: archiver reference arg3: query.y query struct

func NewCountNode

func NewCountNode(done <-chan struct{}, args ...interface{}) (n *Node)

func NewEchoNode

func NewEchoNode(done <-chan struct{}, args ...interface{}) (n *Node)

func NewEdgeNode

func NewEdgeNode(done <-chan struct{}, args ...interface{}) (n *Node)

func NewMaxNode

func NewMaxNode(done <-chan struct{}, args ...interface{}) (n *Node)

** Max Node **/ TODO: implement max over axis

func NewMeanNode

func NewMeanNode(done <-chan struct{}, args ...interface{}) (n *Node)

func NewMinNode

func NewMinNode(done <-chan struct{}, args ...interface{}) (n *Node)

TODO: implement min over axis

func NewNetworkNode

func NewNetworkNode(done <-chan struct{}, args ...interface{}) (n *Node)

arg0: URI

func NewNode

func NewNode(operation Operator, done <-chan struct{}) (n *Node)

func NewNopNode

func NewNopNode(done <-chan struct{}, args ...interface{}) (n *Node)

func NewSelectDataNode

func NewSelectDataNode(done <-chan struct{}, args ...interface{}) (n *Node)

arg0: archiver reference arg1: query.y dataquery struct

func NewStreamingEchoNode

func NewStreamingEchoNode(done <-chan struct{}, args ...interface{}) (n *Node)

arg0: send channel

func NewSubscribeDataNode

func NewSubscribeDataNode(done <-chan struct{}, args ...interface{}) (n *Node)

arg0: archiver reference arg1: querystring arg2: apikey arg3: query.y dataquery struct

func NewWhereNode

func NewWhereNode(done <-chan struct{}, args ...interface{}) (n *Node)

First argument are the k/v tags for this node, second are the arguments to the constructor arg0: BSON where clause, most likely from a parsed query arg1: pointer to a metadata store

func NewWindowNode

func NewWindowNode(done <-chan struct{}, args ...interface{}) (n *Node)

func (*Node) AddChild

func (n *Node) AddChild(child *Node) bool

func (*Node) HasInput

func (n *Node) HasInput(structure, datatype uint) (res bool)

func (*Node) HasOutput

func (n *Node) HasOutput(structure, datatype uint) (res bool)

type NodeConstructor

type NodeConstructor func(<-chan struct{}, ...interface{}) *Node

type NopNode

type NopNode struct {
	Wait chan struct{}
}

Node to pause a pipeline

func (*NopNode) Run

func (nop *NopNode) Run(input interface{}) (interface{}, error)

type ObjectStore

type ObjectStore interface {
	// archive the given SmapMessage that contains non-numerical Readings
	AddObject(*SmapMessage) (bool, error)
	// retrieve blob closest before the reference time for the given UUID
	PrevObject(string, uint64, UnitOfTime) (SmapObjectResponse, error)
	// retrieve blob closest after the reference time for the given UUIDs
	NextObject(string, uint64, UnitOfTime) (SmapObjectResponse, error)
	// retrieves all blobs between the start/end times for the given UUIDs
	GetObjects(string, uint64, uint64, UnitOfTime) (SmapObjectResponse, error)
	// Adds a pointer to metadata store for streamid/uuid conversion and the like
	AddStore(MetadataStore)
}

The object store is a database for binary blobs rather than explicit timeseries data.

type OpNode

type OpNode struct {
	Operator  string
	Arguments Dict
}

type OperationType

type OperationType uint
const (
	WINDOW OperationType = iota
	MIN
	MAX
	MEAN
	COUNT
	EDGE
	NETWORK
)

type Operator

type Operator interface {
	Run(input interface{}) (interface{}, error)
}

type QuasarDB

type QuasarDB struct {
	// contains filtered or unexported fields
}

func NewQuasarDB

func NewQuasarDB(address *net.TCPAddr, maxConnections int) *QuasarDB

func (*QuasarDB) Add

func (quasar *QuasarDB) Add(sb *StreamBuf) bool

func (*QuasarDB) AddStore

func (quasar *QuasarDB) AddStore(s MetadataStore)

func (*QuasarDB) GetConnection

func (quasar *QuasarDB) GetConnection() (net.Conn, error)

func (*QuasarDB) GetData

func (quasar *QuasarDB) GetData(uuids []string, start uint64, end uint64, uot UnitOfTime) ([]SmapNumbersResponse, error)

func (*QuasarDB) LiveConnections

func (quasar *QuasarDB) LiveConnections() int

func (*QuasarDB) Next

func (quasar *QuasarDB) Next(uuids []string, start uint64, limit int32, uot UnitOfTime) ([]SmapNumbersResponse, error)

func (*QuasarDB) Prev

func (quasar *QuasarDB) Prev(uuids []string, start uint64, limit int32, uot UnitOfTime) ([]SmapNumbersResponse, error)

type QuasarReading

type QuasarReading struct {
	// contains filtered or unexported fields
}

type Query

type Query struct {
	// contains filtered or unexported fields
}

hashable (Type A in a map) version of a query

func (Query) Match

func (q Query) Match(cs *QueryChangeSet) *QueryChangeSet

func (Query) MatchSelectClause

func (q Query) MatchSelectClause(msg *SmapMessage) (ret *SmapMessage)

type QueryChangeSet

type QueryChangeSet struct {
	// new messages (streams) that match this query
	New map[string]*SmapMessage `json:",omitempty"`

	Del []string `json:",omitempty"`
	// contains filtered or unexported fields
}

func NewQueryChangeSet

func NewQueryChangeSet() *QueryChangeSet

func (*QueryChangeSet) AddNew

func (cs *QueryChangeSet) AddNew(msg *SmapMessage)

func (*QueryChangeSet) Debug

func (qs *QueryChangeSet) Debug()

func (*QueryChangeSet) DelStream

func (cs *QueryChangeSet) DelStream(uuid string)

func (*QueryChangeSet) IsEmpty

func (cs *QueryChangeSet) IsEmpty() bool

func (*QueryChangeSet) MarshalJSON

func (cs *QueryChangeSet) MarshalJSON() ([]byte, error)

func (*QueryChangeSet) NewStream

func (cs *QueryChangeSet) NewStream(uuid string, msg *SmapMessage)

type QueryHash

type QueryHash string

type QueryProcessor

type QueryProcessor struct {
	// contains filtered or unexported fields
}

func NewQueryProcessor

func NewQueryProcessor(a *Archiver) (qp *QueryProcessor)

func (*QueryProcessor) CheckOutToIn

func (qp *QueryProcessor) CheckOutToIn(out, in *Node) bool

Checks that the ouput of node @out is compatible with the input of node @in. First checks that the structures match. If structures match, then it checks the data type. If the datatypes do not match, then we return false This does not actually resolve the types of the nodes, but rather just checks that they are potentially compatible. The actual type resolution is performed when the nodes are evaluated

func (*QueryProcessor) GetNodeFromOp

func (qp *QueryProcessor) GetNodeFromOp(op *OpNode, query *query) *Node

func (*QueryProcessor) Parse

func (qp *QueryProcessor) Parse(querystring string) *SQLex

type Reading

type Reading interface {
	GetTime() uint64
	GetValue() interface{}
	IsObject() bool
}

type RepublishClient

type RepublishClient struct {
	// contains filtered or unexported fields
}

This is the type used within the Republisher to track the subscribers

type Republisher

type Republisher struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

This is a more thought-out version of the republisher that was first included in Giles. The focus of this version of the republisher is SPEED: efficient discovery of who to deliver a new message to, and efficient reevaluation of queries in the face of new commands + data

func NewRepublisher

func NewRepublisher(a *Archiver) (r *Republisher)

func (*Republisher) ChangeSubscriptions

func (r *Republisher) ChangeSubscriptions(readings map[string]*SmapMessage) map[QueryHash]*QueryChangeSet

When we receive a new set of readings, we make a set (map[Queryhash]struct{}) of which queries could potentially be affected by the incoming data. We then reevaluate each of these queries, and keep track of which changed (true return value on ReevaluateQuery). For each of the clients for the changed queries, we send the updates. We look up which clients to send to based on what their where-clause is (looking at republisher.keyconcern for each key mentioned in msg.{Metadata, Properties, Actuator}

func (*Republisher) EvaluateQuery

func (r *Republisher) EvaluateQuery(qh QueryHash)

Given a query hash, reevaluate the associated WHERE clause to get the new set of UUIDs that match the query. We now have a set of clients attached to a specific query and a set of clients associated with each of the UUIDs. Compare the previous list of UUIDs with the current list of UUIDs. For each UUID that is now in the set, add the list of concerned clients to the subscribers. For each UUID that is now NOT in the set, remove the list of concerned clients from that subscriber list

func (*Republisher) HandleMetadataSubscriber

func (r *Republisher) HandleMetadataSubscriber(s Subscriber, query, apikey string)

A Metadata subscriber wants notifications on changes on the set of streams that match the where-clause provided by the `query` parameter

func (*Republisher) HandleQuery

func (r *Republisher) HandleQuery(querystring string) (*Query, error)

Given a query string, tokenize and parse the query, also keeping track of what keys are mentioned in the query.

func (*Republisher) HandleQuery2

func (r *Republisher) HandleQuery2(query string) (*Query, error)

func (*Republisher) HandleSubscriber

func (r *Republisher) HandleSubscriber(s Subscriber, query, apikey string, membership bool)

This is the Archiver API call. @s is a Subscriber, an interface that allows us to agnostically send "published" messages to the subscribed clients. Subscriber is wrapped in a RepublishClient internally so that the Republisher can keep track of necessary state. @query is the query string that describes what the client is subscribing to. This query should be a valid sMAP query

func (*Republisher) HandleSubscriber2

func (r *Republisher) HandleSubscriber2(s Subscriber, query, apikey string, legacy bool)

func (*Republisher) HandleUUIDSubscriber

func (r *Republisher) HandleUUIDSubscriber(s Subscriber, uuids []string, apikey string)

A UUID subscriber is interested in all metadata associated with a given stream. We store a lookup from each uuid to a list of concerned clients. There is nothing to reevaluate here, so we can do normal lookups. TODO: to consider: update operations are expensive -- do we block other clients when

updating subscriptions?

func (*Republisher) MetadataChange

func (r *Republisher) MetadataChange(msg *SmapMessage)

We call MetadataChange with an incoming sMAP message that includes changes to the metadata of a stream that could affect republish subscriptions TODO: store up queries and do r.EvaluateQuery once each at end. With this current scheme,

we can have  multiple queries be re-evaluated twice

func (*Republisher) MetadataChangeKeys

func (r *Republisher) MetadataChangeKeys(keys []string)

Same as MetadataChange, but operates on a known list of keys rather than a sMAP message TODO: store up queries and do r.EvaluateQuery once each at end. With this current scheme,

we can have  multiple queries be re-evaluated twice

func (*Republisher) ReevaluateQuery

func (r *Republisher) ReevaluateQuery(qh QueryHash, cs *QueryChangeSet) bool

reevaluate the query corresponding to the given QueryHash. Return true if the results of the query changed (streams add or remove)

func (*Republisher) Republish

func (r *Republisher) Republish(msg *SmapMessage)

Publish @msg to all clients subscribing to @msg.UUID

func (*Republisher) RepublishKeyChanges

func (r *Republisher) RepublishKeyChanges(keys []string) map[QueryHash]*QueryChangeSet

When a metadata change comes in from somewhere other than a smap message, we calculate the subscription changes and notify subscribers

func (*Republisher) RepublishReadings

func (r *Republisher) RepublishReadings(messages map[string]*SmapMessage)

We receive a new message from a client, and want to send it out to the subscribers. A subscriber is interested in 1 of 3 things: * (all metadata), data before now (most recent data point) or a list of metadata tags.

type SQLex

type SQLex struct {
	// contains filtered or unexported fields
}

func NewSQLex

func NewSQLex(s string) *SQLex

func (*SQLex) Error

func (sq *SQLex) Error(s string)

func (*SQLex) Lex

func (sq *SQLex) Lex(lval *SQSymType) int

type SQLexer

type SQLexer interface {
	Lex(lval *SQSymType) int
	Error(s string)
}

type SQSymType

type SQSymType struct {
	// contains filtered or unexported fields
}

type SSHConfigServer

type SSHConfigServer struct {
	// contains filtered or unexported fields
}

The SSHConfigServer offers a command-line based alternative to the PowerDB2 administration interface. If you are using the giles.go program as an interfaced to the Archiver API (this is by default), then this is automatically configured from the following section of the giles.cfg file:

[SSH]
Port=2222
PrivateKey=./id_rsa
AuthorizedKeysFile=/home/gabe/.ssh/authorized_keys
User=admin
Pass=supersecurepassword
PasswordEnabled=false
KeyAuthEnabled=true

Currently, the shell supports the following commands, which map to direct calls to the Metadata store in metadata.go. This makes it simple to not only extend the range of commands supported, but also introduce additional interfaces (e.g. a command-line utility).

Right now, the administration is focused around creating/deleting/viewing api keys, which are necessary to publish data to the sMAP archiver.

[[General]]
quit -- exits the session
help -- prints this help

[[Key Management]]
newkey <name> <email> <public?> -- creates a new API key and prints it
getkey <name> <email> -- retrieve the API key for the given name and email
listkeys <email> -- list all API keys and names for the given email
delkey <name> <email> -- deletes the key associated with the given name and email
delkey <key> -- deletes the given key
owner <key> -- retrieves owner (name, email) for given key

func NewSSHConfigServer

func NewSSHConfigServer(manager APIKeyManager, port, privatekey, authorizedKeysFile, confuser, confpass string, passenabled, keyenabled bool) *SSHConfigServer

func (*SSHConfigServer) Listen

func (scs *SSHConfigServer) Listen()

type SelectDataNode

type SelectDataNode struct {
	// contains filtered or unexported fields
}

* Select Data Node *

func (*SelectDataNode) Run

func (sn *SelectDataNode) Run(input interface{}) (interface{}, error)

type SelectTagsNode

type SelectTagsNode struct {
}

* Select Tags Node *

type SmapItem

type SmapItem struct {
	Data interface{}
	UUID string `json:"uuid"`
}

type SmapMessage

type SmapMessage struct {
	// Readings for this message
	Readings []Reading
	// If this struct corresponds to a sMAP collection,
	// then Contents contains a list of paths contained within
	// this collection
	Contents []string `json:",omitempty"`
	// Map of the metadata
	Metadata bson.M `json:",omitempty"`
	// Map containing the actuator reference
	Actuator bson.M `json:",omitempty"`
	// Map of the properties
	Properties bson.M `json:",omitempty"`
	// Unique identifier for this stream. Should be empty for Collections
	UUID string `json:"uuid"`
	// Path of this stream (thus far)
	Path string `json:", omitempty"`
}

This is the general-purpose struct for all INCOMING sMAP messages. This struct is designed to match the format of sMAP JSON, as that is the primary data format.

func (*SmapMessage) GetKey

func (sm *SmapMessage) GetKey(key string) interface{}

Key names like uuid, Metadata.Key, Properties.Key, etc. Fetches the corresponding value from the

func (*SmapMessage) GetValuesFor

func (sm *SmapMessage) GetValuesFor(q *Query)

func (*SmapMessage) HasKeysFrom

func (sm *SmapMessage) HasKeysFrom(keys []string) bool

Returns true if the current message contains keys mentioned in the provided list

func (*SmapMessage) HasMetadata

func (sm *SmapMessage) HasMetadata() bool

func (*SmapMessage) IsTimeseries

func (sm *SmapMessage) IsTimeseries() bool

func (*SmapMessage) ToJson

func (sm *SmapMessage) ToJson() []byte

Convenience method to turn a sMAP message into marshaled JSON

func (*SmapMessage) ToSmapReading

func (sm *SmapMessage) ToSmapReading() *SmapReading

func (*SmapMessage) UnmarshalJSON

func (sm *SmapMessage) UnmarshalJSON(b []byte) (err error)

type SmapNumberReading

type SmapNumberReading struct {
	// uint64 timestamp
	Time uint64
	// value associated with this timestamp
	Value float64
}

func (*SmapNumberReading) GetTime

func (s *SmapNumberReading) GetTime() uint64

func (*SmapNumberReading) GetValue

func (s *SmapNumberReading) GetValue() interface{}

func (*SmapNumberReading) IsObject

func (s *SmapNumberReading) IsObject() bool

func (*SmapNumberReading) MarshalJSON

func (s *SmapNumberReading) MarshalJSON() ([]byte, error)

type SmapNumbersResponse

type SmapNumbersResponse struct {
	Readings []*SmapNumberReading
	UUID     string `json:"uuid"`
}

type SmapObjectReading

type SmapObjectReading struct {
	// uint64 timestamp
	Time uint64
	// value associated with this timestamp
	Value interface{}
}

func (*SmapObjectReading) GetTime

func (s *SmapObjectReading) GetTime() uint64

func (*SmapObjectReading) GetValue

func (s *SmapObjectReading) GetValue() interface{}

func (*SmapObjectReading) IsObject

func (s *SmapObjectReading) IsObject() bool

func (*SmapObjectReading) MarshalJSON

func (s *SmapObjectReading) MarshalJSON() ([]byte, error)

type SmapObjectResponse

type SmapObjectResponse struct {
	Readings []*SmapObjectReading
	UUID     string `json:"uuid"`
}

type SmapReading

type SmapReading struct {
	// Readings will be interpreted as a list of [uint64, float64] = [time, value]
	// OR as a lsit of [uint64, []byte] = [time, value]
	Readings [][]interface{}
	// Unique identifier for this stream
	UUID string `json:"uuid"`
}

Struct representing data readings to and from sMAP

type StreamBuf

type StreamBuf struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewStreamBuf

func NewStreamBuf(uuid string, uot UnitOfTime, txc *TransactionCoalescer) *StreamBuf

type StreamLockMap

type StreamLockMap map[string](*sync.Mutex)

type StreamMap

type StreamMap map[string](*StreamBuf)

type StreamType

type StreamType uint
const (
	OBJECT_STREAM StreamType = iota
	NUMERIC_STREAM
)

type StreamingEchoNode

type StreamingEchoNode struct {
	// contains filtered or unexported fields
}

* Streaming Echo Node *

func (*StreamingEchoNode) Run

func (sen *StreamingEchoNode) Run(input interface{}) (interface{}, error)

type StructureType

type StructureType uint
const (
	LIST StructureType = 1 << iota
	TIMESERIES
)

type SubscribeDataNode

type SubscribeDataNode struct {
	// contains filtered or unexported fields
}

func (*SubscribeDataNode) GetNotify

func (sn *SubscribeDataNode) GetNotify() <-chan bool

func (*SubscribeDataNode) Run

func (sn *SubscribeDataNode) Run(input interface{}) (interface{}, error)

func (*SubscribeDataNode) Send

func (sn *SubscribeDataNode) Send(msg interface{})

* implement the Subscriber interface for SubscribeDataNode *

func (*SubscribeDataNode) SendError

func (sn *SubscribeDataNode) SendError(err error)

type Subscriber

type Subscriber interface {
	// Called by the Republisher when there is a new message to send to the
	// client. Send should transform the message to the appropriate format
	// before forwarding to the actual client.
	Send(interface{})

	// Called by Republisher when there is an error with the subscription
	SendError(error)

	// GetNotify is called by the Republisher to get a pointer to a "notify"
	// channel. When the client is closed and no longer wants to subscribe,
	// a value should be sent on the returned channel to signal to the Republisher
	// to unsubscribe the client. The client can of course disconnect on its own
	// without notifying the Republisher, but this means we cannot protect against
	// memory leaks resulting from infinitely adding new clients
	GetNotify() <-chan bool
}

Subscriber is an interface that should be implemented by each protocol adapter that wants to support sMAP republish pub-sub.

type TSDB

type TSDB interface {
	// add the following SmapReading to the timeseries database
	Add(*StreamBuf) bool
	// uuids, reference time, limit, unit of time
	// retrieve data before reference time
	Prev([]string, uint64, int32, UnitOfTime) ([]SmapNumbersResponse, error)
	// retrieve data after reference time
	Next([]string, uint64, int32, UnitOfTime) ([]SmapNumbersResponse, error)
	// uuids, start time, end time, unit of time
	GetData([]string, uint64, uint64, UnitOfTime) ([]SmapNumbersResponse, error)
	// get a new connection to the timeseries database
	GetConnection() (net.Conn, error)
	// return the number of live connections
	LiveConnections() int
	// Adds a pointer to metadata store for streamid/uuid conversion and the like
	AddStore(MetadataStore)
}

TSDB (or TimeSeries DataBase) is a subset of functionality expected by Giles for (timestamp, value) oriented database. The relevant read/write types are SmapReading and can be found in json.go and readingdb.go respectively (although their locations are likely to change). The UnitOfTime parameters indicate how to interpret the timesteps that are given as parameters

type TSDBConn

type TSDBConn struct {
	// contains filtered or unexported fields
}

func (*TSDBConn) Close

func (c *TSDBConn) Close() error

func (*TSDBConn) IsClosed

func (c *TSDBConn) IsClosed() bool

func (*TSDBConn) Read

func (c *TSDBConn) Read(b []byte) (int, error)

func (*TSDBConn) Write

func (c *TSDBConn) Write(b []byte) (int, error)

type TieredSmapMessage

type TieredSmapMessage map[string]*SmapMessage

func (*TieredSmapMessage) CollapseToTimeseries

func (tsm *TieredSmapMessage) CollapseToTimeseries()

This performs the metadata inheritance for the paths and messages inside this collection of SmapMessages. Inheritance starts from the root path "/" can progresses towards the leaves. First, get a list of all of the potential timeseries (any path that contains a UUID) Then, for each of the prefixes for the path of that timeserie (util.getPrefixes), grab the paths from the TieredSmapMessage that match the prefixes. Sort these in "decreasing" order and apply to the metadata. Finally, delete all non-timeseries paths

func (*TieredSmapMessage) ToBson

func (tsm *TieredSmapMessage) ToBson() []bson.M

type TransactionCoalescer

type TransactionCoalescer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTransactionCoalescer

func NewTransactionCoalescer(tsdb *TSDB, store *MetadataStore) *TransactionCoalescer

func (*TransactionCoalescer) AddSmapMessage

func (txc *TransactionCoalescer) AddSmapMessage(sm *SmapMessage)

Called to add an incoming SmapMessage to the underlying timeseries database. A SmapMessage contains an array of Readings and the UUID for the stream the readings belong to. The Readings must be added to a StreamBuffer for coalescing. This StreamBuffer is either a) pre-existing and still open, b) pre-existing and committing or c) not existing. In the

func (*TransactionCoalescer) Commit

func (txc *TransactionCoalescer) Commit(sb *StreamBuf)

type UUIDSTATE

type UUIDSTATE uint
const (
	OLD UUIDSTATE = iota
	NEW
	SAME
	DEL
)

type UnitOfTime

type UnitOfTime uint

unit of time indicators

const (
	// nanoseconds 1000000000
	UOT_NS UnitOfTime = 1
	// microseconds 1000000
	UOT_US UnitOfTime = 2
	// milliseconds 1000
	UOT_MS UnitOfTime = 3
	// seconds 1
	UOT_S UnitOfTime = 4
)

type WhereNode

type WhereNode struct {
	// contains filtered or unexported fields
}

* Where Node * A WhereNode takes a where clause in its constructor.

func (*WhereNode) Run

func (wn *WhereNode) Run(input interface{}) (interface{}, error)

Evaluates the where clause into a set of uuids

type WindowNode

type WindowNode struct {
	// contains filtered or unexported fields
}

func (*WindowNode) Run

func (wn *WindowNode) Run(input interface{}) (interface{}, error)

TODO: do we assume that data is sorted?

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL