Documentation ¶
Overview ¶
Package giles implements an archiver that follows the sMAP protocol.
Part of the motivation for the creation of Giles was to emphasize the distinction between sMAP the software (originally written in Python) and sMAP the profile. The Giles archiver is an implementation of the latter, and is intended to be fully compatible with existing sMAP tools.
One of the "innovations" that Giles brings to the sMAP ecosystem is the notion that what is typically thought of as the sMAP "archiver" is really a collection of components: the message bus/frontend, the timeseries store, the metadata store, and the query language. All of these are closely linked, of course, but treating them as separate entities means that we can swap in different timeseries or metadata databases, or even different implementations of the query language (perhaps over Apache Spark/MLlib?).
These define aggregation functions to be used inside other operators.
Index ¶
- Constants
- Variables
- func PrintConfig(c *Config)
- func SQParse(SQlex SQLexer) int
- func SQStatname(s int) string
- func SQTokname(c int) string
- func SQlex1(lex SQLexer, lval *SQSymType) int
- type APIKeyManager
- type Archiver
- func (a *Archiver) AddData(readings map[string]*SmapMessage, apikey string) error
- func (a *Archiver) GetData(streamids []string, start, end uint64, query_uot, to_uot UnitOfTime) (interface{}, error)
- func (a *Archiver) GetTags(select_tags, where_tags bson.M) ([]interface{}, error)
- func (a *Archiver) GetUUIDs(where_tags bson.M) ([]string, error)
- func (a *Archiver) HandleMetadataSubscriber(s Subscriber, query, apikey string)
- func (a *Archiver) HandleQuery(querystring, apikey string) (interface{}, error)
- func (a *Archiver) HandleQuerySubscriber(s Subscriber, query, apikey string)
- func (a *Archiver) HandleSubscriber(s Subscriber, query, apikey string)
- func (a *Archiver) HandleSubscriber2(s Subscriber, query, apikey string)
- func (a *Archiver) HandleUUIDSubscriber(s Subscriber, uuids []string, apikey string)
- func (a *Archiver) NextData(streamids []string, start uint64, limit int32, query_uot, to_uot UnitOfTime) (interface{}, error)
- func (a *Archiver) PrevData(streamids []string, start uint64, limit int32, query_uot, to_uot UnitOfTime) (interface{}, error)
- func (a *Archiver) PrintStatus()
- func (a *Archiver) Query2(querystring string, apikey string, w io.Writer) error
- func (a *Archiver) SetTags(update_tags, where_tags map[string]interface{}, apikey string) (int, error)
- func (a *Archiver) StreamingQuery(querystring, apikey string, sendback Subscriber) error
- func (a *Archiver) TagsUUID(uuid string) (bson.M, error)
- type Cache
- type ChunkedStreamingDataNode
- type Config
- type ConnectionPool
- type CountNode
- type DataType
- type Dict
- type EchoNode
- type EdgeNode
- type Element
- type IncomingSmapMessage
- type List
- type MaxNode
- type MeanNode
- type MetadataStore
- type MinNode
- type MongoObjectStore
- func (ms *MongoObjectStore) AddObject(msg *SmapMessage) (bool, error)
- func (ms *MongoObjectStore) AddStore(store MetadataStore)
- func (ms *MongoObjectStore) GetObjects(uuid string, start uint64, end uint64, uot UnitOfTime) (SmapObjectResponse, error)
- func (ms *MongoObjectStore) NextObject(uuid string, time uint64, uot UnitOfTime) (SmapObjectResponse, error)
- func (ms *MongoObjectStore) PrevObject(uuid string, time uint64, uot UnitOfTime) (SmapObjectResponse, error)
- type MongoStore
- func (ms *MongoStore) ApiKeyExists(apikey string) (bool, error)
- func (ms *MongoStore) CanWrite(apikey, uuid string) (bool, error)
- func (ms *MongoStore) CheckKey(apikey string, messages map[string]*SmapMessage) (bool, error)
- func (ms *MongoStore) DeleteKeyByName(name, email string) (string, error)
- func (ms *MongoStore) DeleteKeyByValue(key string) (string, error)
- func (ms *MongoStore) EnforceKeys(enforce bool)
- func (ms *MongoStore) Find(findClause, selectClause bson.M) (interface{}, error)
- func (ms *MongoStore) FindDistinct(findClause bson.M, distinctKey string) (interface{}, error)
- func (ms *MongoStore) GetKey(name, email string) (string, error)
- func (ms *MongoStore) GetStreamId(uuid string) uint32
- func (ms *MongoStore) GetStreamType(uuid string) StreamType
- func (ms *MongoStore) GetTags(target bson.M, is_distinct bool, distinct_key string, where bson.M) ([]interface{}, error)
- func (ms *MongoStore) GetUUIDs(where bson.M) ([]string, error)
- func (ms *MongoStore) GetUnitOfTime(uuid string) UnitOfTime
- func (ms *MongoStore) ListKeys(email string) ([]map[string]interface{}, error)
- func (ms *MongoStore) NewKey(name, email string, public bool) (string, error)
- func (ms *MongoStore) Owner(key string) (map[string]interface{}, error)
- func (ms *MongoStore) RemoveDocs(apikey string, where bson.M) (bson.M, error)
- func (ms *MongoStore) RemoveTags(updates bson.M, apikey string, where bson.M) (bson.M, error)
- func (ms *MongoStore) SavePathMetadata(messages map[string]*SmapMessage) error
- func (ms *MongoStore) SaveTags(messages *map[string]*SmapMessage) error
- func (ms *MongoStore) SaveTimeseriesMetadata(messages map[string]*SmapMessage) error
- func (ms *MongoStore) StartUpdateCacheLoop()
- func (ms *MongoStore) UUIDTags(uuid string) (bson.M, error)
- func (ms *MongoStore) UpdateTags(updates bson.M, apikey string, where bson.M) (bson.M, error)
- type NetworkNode
- type Node
- func NewChunkedStreamingDataNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewCountNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewEchoNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewEdgeNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewMaxNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewMeanNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewMinNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewNetworkNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewNode(operation Operator, done <-chan struct{}) (n *Node)
- func NewNopNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewSelectDataNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewStreamingEchoNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewSubscribeDataNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewWhereNode(done <-chan struct{}, args ...interface{}) (n *Node)
- func NewWindowNode(done <-chan struct{}, args ...interface{}) (n *Node)
- type NodeConstructor
- type NopNode
- type ObjectStore
- type OpNode
- type OperationType
- type Operator
- type QuasarDB
- func (quasar *QuasarDB) Add(sb *StreamBuf) bool
- func (quasar *QuasarDB) AddStore(s MetadataStore)
- func (quasar *QuasarDB) GetConnection() (net.Conn, error)
- func (quasar *QuasarDB) GetData(uuids []string, start uint64, end uint64, uot UnitOfTime) ([]SmapNumbersResponse, error)
- func (quasar *QuasarDB) LiveConnections() int
- func (quasar *QuasarDB) Next(uuids []string, start uint64, limit int32, uot UnitOfTime) ([]SmapNumbersResponse, error)
- func (quasar *QuasarDB) Prev(uuids []string, start uint64, limit int32, uot UnitOfTime) ([]SmapNumbersResponse, error)
- type QuasarReading
- type Query
- type QueryChangeSet
- type QueryHash
- type QueryProcessor
- type Reading
- type RepublishClient
- type Republisher
- func (r *Republisher) ChangeSubscriptions(readings map[string]*SmapMessage) map[QueryHash]*QueryChangeSet
- func (r *Republisher) EvaluateQuery(qh QueryHash)
- func (r *Republisher) HandleMetadataSubscriber(s Subscriber, query, apikey string)
- func (r *Republisher) HandleQuery(querystring string) (*Query, error)
- func (r *Republisher) HandleQuery2(query string) (*Query, error)
- func (r *Republisher) HandleSubscriber(s Subscriber, query, apikey string, membership bool)
- func (r *Republisher) HandleSubscriber2(s Subscriber, query, apikey string, legacy bool)
- func (r *Republisher) HandleUUIDSubscriber(s Subscriber, uuids []string, apikey string)
- func (r *Republisher) MetadataChange(msg *SmapMessage)
- func (r *Republisher) MetadataChangeKeys(keys []string)
- func (r *Republisher) ReevaluateQuery(qh QueryHash, cs *QueryChangeSet) bool
- func (r *Republisher) Republish(msg *SmapMessage)
- func (r *Republisher) RepublishKeyChanges(keys []string) map[QueryHash]*QueryChangeSet
- func (r *Republisher) RepublishReadings(messages map[string]*SmapMessage)
- type SQLex
- type SQLexer
- type SQSymType
- type SSHConfigServer
- type SelectDataNode
- type SelectTagsNode
- type SmapItem
- type SmapMessage
- func (sm *SmapMessage) GetKey(key string) interface{}
- func (sm *SmapMessage) GetValuesFor(q *Query)
- func (sm *SmapMessage) HasKeysFrom(keys []string) bool
- func (sm *SmapMessage) HasMetadata() bool
- func (sm *SmapMessage) IsTimeseries() bool
- func (sm *SmapMessage) ToJson() []byte
- func (sm *SmapMessage) ToSmapReading() *SmapReading
- func (sm *SmapMessage) UnmarshalJSON(b []byte) (err error)
- type SmapNumberReading
- type SmapNumbersResponse
- type SmapObjectReading
- type SmapObjectResponse
- type SmapReading
- type StreamBuf
- type StreamLockMap
- type StreamMap
- type StreamType
- type StreamingEchoNode
- type StructureType
- type SubscribeDataNode
- type Subscriber
- type TSDB
- type TSDBConn
- type TieredSmapMessage
- type TransactionCoalescer
- type UUIDSTATE
- type UnitOfTime
- type WhereNode
- type WindowNode
Constants ¶
const (
    COALESCE_TIMEOUT = 1000  // milliseconds
    COALESCE_MAX     = 16384 // num readings
)
const (
    SELECT_TYPE queryType = iota
    DELETE_TYPE
    SET_TYPE
    DATA_TYPE
    APPLY_TYPE
)
const (
    IN_TYPE dataqueryType = iota
    BEFORE_TYPE
    AFTER_TYPE
)
const AFTER = 57354
const ALL = 57364
const AND = 57368
const APPLY = 57350
const AS = 57367
const BEFORE = 57353
const COMMA = 57363
const DATA = 57352
const DELETE = 57348
const DISTINCT = 57347
const EQ = 57361
const HAS = 57370
const IN = 57372
const LBRACK = 57376
const LEFTPIPE = 57365
const LIKE = 57366
const LIMIT = 57355
const LPAREN = 57374
const LVALUE = 57358
const NEQ = 57362
const NEWLINE = 57380
const NOT = 57371
const NOW = 57357
const NUMBER = 57378
const OPERATOR = 57360
const OR = 57369
const QSTRING = 57359
const RBRACK = 57377
const RPAREN = 57375
const SELECT = 57346
const SEMICOLON = 57379
const SET = 57349
const SQEofCode = 1
const SQErrCode = 2
const SQFlag = -1000
const SQLast = 159
const SQMaxDepth = 200
const SQNprod = 65
const SQPrivate = 57344
const STREAMLIMIT = 57356
const TIMEUNIT = 57381
const TO = 57373
const WHERE = 57351
Variables ¶
var NodeLookup map[OperationType]NodeConstructor
var OpLookup map[string]OperationType
var SQAct = []int{}/* 159 elements not displayed */
var SQChk = []int{}/* 136 elements not displayed */
var SQDebug = 0
var SQDef = []int{}/* 136 elements not displayed */
var SQExca = []int{
-1, 1,
1, -1,
-2, 0,
}
var SQPact = []int{}/* 136 elements not displayed */
var SQPgo = []int{
0, 15, 158, 11, 9, 4, 157, 85, 10, 71,
12, 156, 17, 16, 155, 3, 1, 0, 8, 2,
153,
}
var SQR1 = []int{
0, 20, 20, 20, 20, 20, 20, 20, 20, 7,
7, 9, 8, 8, 4, 4, 4, 4, 4, 4,
6, 6, 6, 6, 12, 12, 12, 12, 13, 13,
14, 14, 14, 14, 15, 15, 16, 16, 16, 16,
17, 17, 3, 2, 2, 2, 2, 2, 2, 2,
18, 19, 1, 1, 1, 1, 1, 10, 10, 11,
11, 5, 5, 5, 5,
}
var SQR2 = []int{
0, 4, 3, 4, 4, 3, 4, 3, 6, 1,
3, 3, 1, 3, 3, 3, 3, 5, 5, 5,
1, 1, 2, 1, 9, 7, 5, 5, 1, 2,
2, 1, 1, 1, 2, 3, 0, 2, 2, 4,
0, 2, 2, 3, 3, 3, 3, 2, 3, 4,
1, 1, 3, 3, 2, 3, 1, 1, 3, 3,
4, 3, 3, 5, 5,
}
var SQStatenames = []string{}
var SQStates []string
var SQTok1 = []int{
1,
}
var SQTok2 = []int{
2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
32, 33, 34, 35, 36, 37, 38, 39,
}
var SQTok3 = []int{
0,
}
var SQTokenNames []string
var SQToknames = []string{
"SELECT",
"DISTINCT",
"DELETE",
"SET",
"APPLY",
"WHERE",
"DATA",
"BEFORE",
"AFTER",
"LIMIT",
"STREAMLIMIT",
"NOW",
"LVALUE",
"QSTRING",
"OPERATOR",
"EQ",
"NEQ",
"COMMA",
"ALL",
"LEFTPIPE",
"LIKE",
"AS",
"AND",
"OR",
"HAS",
"NOT",
"IN",
"TO",
"LPAREN",
"RPAREN",
"LBRACK",
"RBRACK",
"NUMBER",
"SEMICOLON",
"NEWLINE",
"TIMEUNIT",
}
Functions ¶
func PrintConfig ¶
func PrintConfig(c *Config)
func SQStatname ¶
Types ¶
type APIKeyManager ¶
type APIKeyManager interface {
    // Returns true if the given api key exists
    ApiKeyExists(apikey string) (bool, error)
    // Creates a new key with the given name registered to the given email. The public argument
    // maps to the public attribute of the key. Returns the key
    NewKey(name, email string, public bool) (string, error)
    // Retrieves the key with the given name registered to the given email
    GetKey(name, email string) (string, error)
    // Lists all keys registered to the given email. Returns a list of k/v pairs for each
    // found key, giving us name, public, etc
    ListKeys(email string) ([]map[string]interface{}, error)
    // Deletes the key with the given name registered to the given email. Returns the key as well
    DeleteKeyByName(name, email string) (string, error)
    // Deletes the key with the given value
    DeleteKeyByValue(key string) (string, error)
    // Retrieves the owner information for the given key
    Owner(key string) (map[string]interface{}, error)
}
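As an illustration of the contract, here is a minimal in-memory sketch of three of these methods. The map-backed store, the key-generation scheme, and the error messages are all invented for illustration; the real manager in this package is MongoDB-backed (see MongoStore).

```go
package main

import "fmt"

// memKeyStore is a hypothetical in-memory implementation of a subset of
// APIKeyManager, for illustration only.
type memKeyStore struct {
	keys map[string]string // (name|email) -> key value
}

func newMemKeyStore() *memKeyStore {
	return &memKeyStore{keys: map[string]string{}}
}

func (m *memKeyStore) NewKey(name, email string, public bool) (string, error) {
	id := name + "|" + email
	key := fmt.Sprintf("key-%d", len(m.keys)) // invented key scheme
	m.keys[id] = key
	return key, nil
}

func (m *memKeyStore) GetKey(name, email string) (string, error) {
	key, ok := m.keys[name+"|"+email]
	if !ok {
		return "", fmt.Errorf("no key for %s/%s", name, email)
	}
	return key, nil
}

func (m *memKeyStore) DeleteKeyByName(name, email string) (string, error) {
	id := name + "|" + email
	key := m.keys[id]
	delete(m.keys, id)
	return key, nil
}

func main() {
	ks := newMemKeyStore()
	k, _ := ks.NewKey("dashboard", "alice@example.com", true)
	got, _ := ks.GetKey("dashboard", "alice@example.com")
	fmt.Println(k == got)
}
```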
type Archiver ¶
type Archiver struct {
// contains filtered or unexported fields
}
This is the central object for the archiver process and contains most of the requisite logic for the core features of the archiver. One of the focuses of Giles is to facilitate adapting the sMAP protocol to different interfaces; the handler packages (HTTP, WS, etc.) provide handler functions that in turn call the archiver's core functions. Most of these core functions accept and return convenient data formats (such as bson.M), so the handler functions only have to deal with translating data formats.
For now, because the metadata interface was designed with a MongoDB backend in mind, most of the in-transit data types for dealing with metadata use the MongoDB interface defined by http://godoc.org/gopkg.in/mgo.v2/bson and http://godoc.org/gopkg.in/mgo.v2. I suggest taking a quick look through their documentation and how they talk to Mongo to get a feel for what the incoming/outgoing data is going to look like.
func (*Archiver) AddData ¶
func (a *Archiver) AddData(readings map[string]*SmapMessage, apikey string) error
Takes a map of string/SmapMessage (path, sMAP JSON object) and commits them to the underlying databases. First, checks that write permission is granted with the accompanied apikey (generated with the gilescmd CLI tool), then saves the metadata, pushes the readings out to any concerned republish clients, and commits the reading to the timeseries database. Returns an error, which is nil if all went well
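The documented order of operations can be sketched with stand-in stages. checkKey, saveMetadata, republish, and commitReadings below are invented names tracing the described flow, not Giles' actual internals, and msg is a trimmed stand-in for SmapMessage.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a stand-in for SmapMessage, trimmed to what this sketch needs.
type msg struct {
	UUID string
}

var trace []string // records the order the stages ran in

func checkKey(apikey string, readings map[string]*msg) (bool, error) {
	trace = append(trace, "checkKey")
	return apikey != "", nil
}
func saveMetadata(readings map[string]*msg) { trace = append(trace, "saveMetadata") }
func republish(readings map[string]*msg)    { trace = append(trace, "republish") }
func commitReadings(readings map[string]*msg) error {
	trace = append(trace, "commit")
	return nil
}

// addData mirrors the documented order: permission check, metadata save,
// republish to subscribers, then commit to the timeseries store.
func addData(readings map[string]*msg, apikey string) error {
	ok, err := checkKey(apikey, readings)
	if err != nil {
		return err
	}
	if !ok {
		return errors.New("write permission denied")
	}
	saveMetadata(readings)
	republish(readings)
	return commitReadings(readings)
}

func main() {
	err := addData(map[string]*msg{"/sensor/temp": {UUID: "uuid-1"}}, "some-key")
	fmt.Println(err, trace)
}
```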
func (*Archiver) GetData ¶
func (a *Archiver) GetData(streamids []string, start, end uint64, query_uot, to_uot UnitOfTime) (interface{}, error)
For each of the streamids, fetches all data between start and end (where start < end). The units for start/end are given by query_uot. We give the units so that each time series database can convert the incoming timestamps to whatever it needs (most of these will query the metadata store for the unit of time for the data stream it is accessing)
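To make the unit handling concrete, here is a self-contained sketch of the kind of timestamp rebasing a backend might perform on query_uot before querying. The unitOfTime type and constants below are local stand-ins, not the package's actual UnitOfTime values.

```go
package main

import "fmt"

// unitOfTime is a local stand-in for the package's UnitOfTime type.
type unitOfTime uint

const (
	uotNS unitOfTime = iota // nanoseconds
	uotUS                   // microseconds
	uotMS                   // milliseconds
	uotS                    // seconds
)

// toNS maps each unit to its size in nanoseconds.
var toNS = map[unitOfTime]uint64{uotNS: 1, uotUS: 1e3, uotMS: 1e6, uotS: 1e9}

// convertTime rebases a timestamp from one unit of time to another by going
// through nanoseconds; note the integer truncation when coarsening.
func convertTime(ts uint64, from, to unitOfTime) uint64 {
	ns := ts * toNS[from]
	return ns / toNS[to]
}

func main() {
	fmt.Println(convertTime(1500, uotMS, uotS)) // 1500 ms coarsened to seconds
}
```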
func (*Archiver) GetTags ¶
For all streams that match the provided where clause in where_tags, returns the values of the requested tags. where_tags is a bson.M object that follows the same syntax as a MongoDB query. select_tags is a map[string]int corresponding to which tags we wish returned. A value of 1 means the tag will be returned (and ignores all other tags), and a value of 0 means the tag will NOT be returned (and all other tags will be).
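The 1/0 semantics mirror MongoDB projections. Below is a self-contained sketch of that behavior on a flat tag document; this is an illustration of the semantics, not Giles' actual code, which delegates the projection to MongoDB (and real tag documents are nested, not flat).

```go
package main

import "fmt"

// applyProjection mimics the documented select_tags semantics: if any tag is
// mapped to 1, only tags mapped to 1 are returned (inclusion mode); otherwise
// every tag except those mapped to 0 is returned (exclusion mode).
func applyProjection(doc map[string]interface{}, selectTags map[string]int) map[string]interface{} {
	inclusive := false
	for _, v := range selectTags {
		if v == 1 {
			inclusive = true
		}
	}
	out := map[string]interface{}{}
	if inclusive {
		// inclusion mode: only tags explicitly set to 1
		for tag, v := range selectTags {
			if v == 1 {
				if val, ok := doc[tag]; ok {
					out[tag] = val
				}
			}
		}
		return out
	}
	// exclusion mode: everything except tags explicitly set to 0
	for tag, val := range doc {
		if v, listed := selectTags[tag]; listed && v == 0 {
			continue
		}
		out[tag] = val
	}
	return out
}

func main() {
	doc := map[string]interface{}{"uuid": "u1", "Room": "101", "Site": "X"}
	fmt.Println(applyProjection(doc, map[string]int{"Room": 1}))
}
```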
func (*Archiver) GetUUIDs ¶
Returns a list of UUIDs for all streams that match the provided 'where' clause. where_tags is a bson.M object that follows the same syntax as a MongoDB query. This query is executed against the underlying metadata store. As we move into supporting multiple possible metadata storage solutions, this interface may change.
func (*Archiver) HandleMetadataSubscriber ¶
func (a *Archiver) HandleMetadataSubscriber(s Subscriber, query, apikey string)
func (*Archiver) HandleQuery ¶
Takes the body of the query and the apikey that accompanies the query. First parses the string query into an intermediate form (an abstract syntax tree, the AST type). Depending on the action, it checks whether the provided API key grants sufficient permission to return the results; if so, it returns those results as []byte (marshaled JSON). Most of this method is just switch statements over different components of the generated AST. Any actual computation is done as calls to the Archiver API, so if you want to use your own query language or handle queries in some external handler, you shouldn't need any of this method; just use the Archiver API.
func (*Archiver) HandleQuerySubscriber ¶
func (a *Archiver) HandleQuerySubscriber(s Subscriber, query, apikey string)
func (*Archiver) HandleSubscriber ¶
func (a *Archiver) HandleSubscriber(s Subscriber, query, apikey string)
For all streams that match the WHERE clause in the provided query string, will push all subsequent incoming information (data and tags) on those streams to the client associated with the provided http.ResponseWriter.
func (*Archiver) HandleSubscriber2 ¶
func (a *Archiver) HandleSubscriber2(s Subscriber, query, apikey string)
func (*Archiver) HandleUUIDSubscriber ¶
func (a *Archiver) HandleUUIDSubscriber(s Subscriber, uuids []string, apikey string)
func (*Archiver) NextData ¶
func (a *Archiver) NextData(streamids []string, start uint64, limit int32, query_uot, to_uot UnitOfTime) (interface{}, error)
For each of the streamids, fetches data after the start time. If limit is < 0, fetches all data. If limit >= 0, fetches only that number of points. See Archiver.GetData for explanation of query_uot
func (*Archiver) PrevData ¶
func (a *Archiver) PrevData(streamids []string, start uint64, limit int32, query_uot, to_uot UnitOfTime) (interface{}, error)
For each of the streamids, fetches data before the start time. If limit is < 0, fetches all data. If limit >= 0, fetches only that number of points. See Archiver.GetData for explanation of query_uot
func (*Archiver) PrintStatus ¶
func (a *Archiver) PrintStatus()
func (*Archiver) SetTags ¶
func (a *Archiver) SetTags(update_tags, where_tags map[string]interface{}, apikey string) (int, error)
For all streams that match the provided where clause in where_tags, sets the key-value pairs specified in update_tags.
func (*Archiver) StreamingQuery ¶
func (a *Archiver) StreamingQuery(querystring, apikey string, sendback Subscriber) error
type ChunkedStreamingDataNode ¶
type ChunkedStreamingDataNode struct {
// contains filtered or unexported fields
}
func (*ChunkedStreamingDataNode) Run ¶
func (csn *ChunkedStreamingDataNode) Run(input interface{}) (interface{}, error)
type Config ¶
type Config struct {
    Archiver struct {
        TSDB           *string
        Metadata       *string
        Objects        *string
        Keepalive      *int
        EnforceKeys    bool
        LogLevel       *string
        MaxConnections *int
    }
    ReadingDB struct {
        Port    *string
        Address *string
    }
    Quasar struct {
        Port    *string
        Address *string
    }
    Mongo struct {
        Port           *string
        Address        *string
        UpdateInterval *int
    }
    Venkman struct {
        Port    *string
        Address *string
    }
    HTTP struct {
        Enabled bool
        Port    *int
    }
    WebSockets struct {
        Enabled bool
        Port    *int
    }
    CapnProto struct {
        Enabled bool
        Port    *int
    }
    MsgPack struct {
        TcpEnabled bool
        TcpPort    *int
        UdpEnabled bool
        UdpPort    *int
    }
    SSH struct {
        Enabled            bool
        Port               *string
        PrivateKey         *string
        AuthorizedKeysFile *string
        User               *string
        Pass               *string
        PasswordEnabled    bool
        KeyAuthEnabled     bool
    }
    Profile struct {
        CpuProfile     *string
        MemProfile     *string
        BenchmarkTimer *int
        Enabled        bool
    }
}
func LoadConfig ¶
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
func NewConnectionPool ¶
func NewConnectionPool(newConn func() *TSDBConn, maxConnections int) *ConnectionPool
func (*ConnectionPool) Get ¶
func (pool *ConnectionPool) Get() *TSDBConn
func (*ConnectionPool) Put ¶
func (pool *ConnectionPool) Put(c *TSDBConn)
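A bounded pool with this Get/Put shape can be sketched on a buffered channel. This is an illustration, not the package's implementation; notably, this sketch drops connections rather than blocking callers when the pool is full, and conn stands in for TSDBConn.

```go
package main

import "fmt"

// conn is a stand-in for TSDBConn.
type conn struct{ id int }

// pool retains up to maxConnections idle connections; Get reuses an idle
// connection if one exists, else dials a new one via newConn.
type pool struct {
	idle    chan *conn
	newConn func() *conn
}

func newPool(newConn func() *conn, maxConnections int) *pool {
	return &pool{idle: make(chan *conn, maxConnections), newConn: newConn}
}

func (p *pool) Get() *conn {
	select {
	case c := <-p.idle:
		return c // reuse an idle connection
	default:
		return p.newConn() // pool empty: dial
	}
}

func (p *pool) Put(c *conn) {
	select {
	case p.idle <- c:
	default:
		// pool full; drop the connection instead of retaining it
	}
}

func main() {
	dials := 0
	p := newPool(func() *conn { dials++; return &conn{id: dials} }, 2)
	c := p.Get() // dials, since the pool starts empty
	p.Put(c)
	c2 := p.Get() // reuses the idle connection
	fmt.Println(c == c2, dials)
}
```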
type EdgeNode ¶
type EdgeNode struct {
// contains filtered or unexported fields
}
The Edge operator essentially takes the 1st order derivative of a stream
type IncomingSmapMessage ¶
type IncomingSmapMessage struct {
    // Readings for this message
    Readings [][]json.RawMessage
    // If this struct corresponds to a sMAP collection,
    // then Contents contains a list of paths contained within
    // this collection
    Contents []string `json:",omitempty"`
    // Map of the metadata
    Metadata bson.M `json:",omitempty"`
    // Map containing the actuator reference
    Actuator bson.M `json:",omitempty"`
    // Map of the properties
    Properties bson.M `json:",omitempty"`
    // Unique identifier for this stream. Should be empty for Collections
    UUID string `json:"uuid"`
    // Path of this stream (thus far)
    Path string
}
type MetadataStore ¶
type MetadataStore interface {
    // If called with true (the default), checks all API keys. For testing or
    // "sandbox" deployments, it can be helpful to call this with false, which will
    // allow ALL operations on ANY streams.
    EnforceKeys(enforce bool)
    // Returns true if the key @apikey is allowed to write to each of the
    // streams listed in @messages. Should check each SmapMessage.UUID value.
    CheckKey(apikey string, messages map[string]*SmapMessage) (bool, error)
    // Associates metadata k/v pairs with non-terminal (non-timeseries) Paths
    SavePathMetadata(messages map[string]*SmapMessage) error
    // Associates metadata k/v pairs with timeseries paths. Inherits
    // from PathMetadata before applying timeseries-specific tags
    SaveTimeseriesMetadata(messages map[string]*SmapMessage) error
    // Retrieves the tags indicated by @target for documents that match the
    // @where clause. If @is_distinct is true, then it will return a list of
    // distinct values for the tag @distinct_key
    GetTags(target bson.M, is_distinct bool, distinct_key string, where bson.M) ([]interface{}, error)
    // Normal metadata save method
    SaveTags(messages *map[string]*SmapMessage) error
    // For all documents that match the where clause @where, apply the updates
    // contained in @updates, provided that the key @apikey is valid for all of them
    UpdateTags(updates bson.M, apikey string, where bson.M) (bson.M, error)
    // Removes all documents that match the where clause @where, provided that the
    // key @apikey is valid for them
    RemoveDocs(apikey string, where bson.M) (bson.M, error)
    // Unapplies all tags in the list @target for all documents that match the
    // where clause @where, after checking the API key
    RemoveTags(target bson.M, apikey string, where bson.M) (bson.M, error)
    // Returns all metadata for a given UUID
    UUIDTags(uuid string) (bson.M, error)
    // Resolves a where clause to a slice of UUIDs
    GetUUIDs(where bson.M) ([]string, error)
    // Returns the unit of time for the stream identified by the given UUID
    GetUnitOfTime(uuid string) UnitOfTime
    // Returns the stream type for the stream identified by the given UUID
    GetStreamType(uuid string) StreamType
    // General purpose metadata Find
    Find(findClause, selectClause bson.M) (interface{}, error)
    // General purpose distinct-value Find
    FindDistinct(findClause bson.M, distinctKey string) (interface{}, error)
}
The metadata store should support the following operations
type MongoObjectStore ¶
type MongoObjectStore struct {
// contains filtered or unexported fields
}
The object store interface into Mongo uses a collection named 'objects'. Each document in this collection contains 3 keys:
uuid:      the stream identifier
object:    a binary blob (byte array) of data (MsgPack encoded)
timestamp: the timestamp associated with this record, IN NANOSECONDS
This is obviously a very primitive interface to the object store, and doesn't do nice things like transaction coalescence.
func NewMongoObjectStore ¶
func NewMongoObjectStore(address *net.TCPAddr) *MongoObjectStore
func (*MongoObjectStore) AddObject ¶
func (ms *MongoObjectStore) AddObject(msg *SmapMessage) (bool, error)
func (*MongoObjectStore) AddStore ¶
func (ms *MongoObjectStore) AddStore(store MetadataStore)
func (*MongoObjectStore) GetObjects ¶
func (ms *MongoObjectStore) GetObjects(uuid string, start uint64, end uint64, uot UnitOfTime) (SmapObjectResponse, error)
func (*MongoObjectStore) NextObject ¶
func (ms *MongoObjectStore) NextObject(uuid string, time uint64, uot UnitOfTime) (SmapObjectResponse, error)
func (*MongoObjectStore) PrevObject ¶
func (ms *MongoObjectStore) PrevObject(uuid string, time uint64, uot UnitOfTime) (SmapObjectResponse, error)
type MongoStore ¶
type MongoStore struct {
    // list of timeseries we have committed. Only send upserts to mongo
    // via SaveTags if a) we have metadata or b) we don't have the uuid
    // in this lookup
    UUIDS     map[string]struct{}
    CacheLock sync.RWMutex
    // contains filtered or unexported fields
}
TODO: copy the session for a transaction -- this is faster
func NewMongoStore ¶
func NewMongoStore(address *net.TCPAddr, interval int) *MongoStore
func (*MongoStore) ApiKeyExists ¶
func (ms *MongoStore) ApiKeyExists(apikey string) (bool, error)
func (*MongoStore) CheckKey ¶
func (ms *MongoStore) CheckKey(apikey string, messages map[string]*SmapMessage) (bool, error)
func (*MongoStore) DeleteKeyByName ¶
func (ms *MongoStore) DeleteKeyByName(name, email string) (string, error)
func (*MongoStore) DeleteKeyByValue ¶
func (ms *MongoStore) DeleteKeyByValue(key string) (string, error)
func (*MongoStore) EnforceKeys ¶
func (ms *MongoStore) EnforceKeys(enforce bool)
func (*MongoStore) Find ¶
func (ms *MongoStore) Find(findClause, selectClause bson.M) (interface{}, error)
func (*MongoStore) FindDistinct ¶
func (ms *MongoStore) FindDistinct(findClause bson.M, distinctKey string) (interface{}, error)
func (*MongoStore) GetStreamId ¶
func (ms *MongoStore) GetStreamId(uuid string) uint32
func (*MongoStore) GetStreamType ¶
func (ms *MongoStore) GetStreamType(uuid string) StreamType
func (*MongoStore) GetTags ¶
func (ms *MongoStore) GetTags(target bson.M, is_distinct bool, distinct_key string, where bson.M) ([]interface{}, error)
Retrieves the tags indicated by `target` for documents that match the `where` clause. If `is_distinct` is true, then it will return a list of distinct values for the tag `distinct_key`
func (*MongoStore) GetUUIDs ¶
func (ms *MongoStore) GetUUIDs(where bson.M) ([]string, error)
Resolve a query to a slice of UUIDs
func (*MongoStore) GetUnitOfTime ¶
func (ms *MongoStore) GetUnitOfTime(uuid string) UnitOfTime
Retrieves the unit of time for the stream identified by the given UUID. Returns one of ns, us, ms, s; defaults to ms.
func (*MongoStore) ListKeys ¶
func (ms *MongoStore) ListKeys(email string) ([]map[string]interface{}, error)
func (*MongoStore) NewKey ¶
func (ms *MongoStore) NewKey(name, email string, public bool) (string, error)
func (*MongoStore) RemoveDocs ¶
func (*MongoStore) RemoveTags ¶
func (*MongoStore) SavePathMetadata ¶
func (ms *MongoStore) SavePathMetadata(messages map[string]*SmapMessage) error
SavePathMetadata takes a map of paths to sMAP messages and saves them to the path metadata store. The Path keys can be terminal paths, e.g. the full path of a timeseries, which are identified by the linked sMAP message having a UUID (and optionally a Readings) field. Path keys can also be non-terminal paths, which do not have an associated UUID.
This method stores the metadata associated with paths, indexed by each path. The pathmetadata collection should be queried internally by SaveTimeseriesMetadata to build up the full document for each individual timeseries
func (*MongoStore) SaveTags ¶
func (ms *MongoStore) SaveTags(messages *map[string]*SmapMessage) error
The incoming messages will be in the form of {pathname: metadata/properties/etc}. Only the timeseries will have UUIDs attached. When we receive a message like this, we need to compress all of the prefix-path kv pairs into each of the timeseries, and then save those timeseries to the metadata collection
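The compression step can be sketched as a pure function over path prefixes. This is a simplification for illustration: real tags are nested bson.M documents attached to SmapMessage structs, not flat string maps.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixes returns "/", "/a", "/a/b", ... for a path like "/a/b/c".
func prefixes(path string) []string {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	out := []string{"/"}
	cur := ""
	for _, p := range parts {
		cur += "/" + p
		out = append(out, cur)
	}
	return out
}

// compressTags merges the k/v pairs attached to every prefix of a timeseries
// path into one document, with deeper (more specific) prefixes, and finally
// the timeseries' own tags, winning on conflicts.
func compressTags(pathTags map[string]map[string]string, tsPath string) map[string]string {
	merged := map[string]string{}
	for _, prefix := range prefixes(tsPath) {
		for k, v := range pathTags[prefix] {
			merged[k] = v // later (deeper) prefixes override earlier ones
		}
	}
	return merged
}

func main() {
	pathTags := map[string]map[string]string{
		"/":      {"Site": "X"},
		"/a":     {"Room": "101"},
		"/a/b/c": {"Room": "102", "UnitofMeasure": "F"},
	}
	fmt.Println(compressTags(pathTags, "/a/b/c"))
}
```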
func (*MongoStore) SaveTimeseriesMetadata ¶
func (ms *MongoStore) SaveTimeseriesMetadata(messages map[string]*SmapMessage) error
Here, we are handed a chunk of incoming sMAP messages, which can include both non-terminal and terminal (timeseries) paths. Timeseries (terminal) paths are identified by having a UUID key in their SmapMessage struct. For each of these, we decompose the full timeseries Path into its components -- e.g. /a/b/c -> /, /a, /a/b -- and inherit from the PathMetadata collection into the metadata for this source. Timeseries-specific metadata is then upserted into this document, and the result is saved in the Metadata collection. TODO: this is super slow: Find and Upsert take the most time. Avoid this method if we already have all the new metadata! Important to note: the sMAP 2.0 archiver does *not* do path inheritance of metadata within the archiver. Inheritance only happens within a source, when multiple messages are sent.
func (*MongoStore) StartUpdateCacheLoop ¶
func (ms *MongoStore) StartUpdateCacheLoop()
type NetworkNode ¶
type NetworkNode struct {
// contains filtered or unexported fields
}
func (*NetworkNode) Run ¶
func (nn *NetworkNode) Run(input interface{}) (interface{}, error)
type Node ¶
type Node struct {
    Id       string
    Tags     map[string]interface{}
    In       chan interface{}
    Done     <-chan struct{}
    Children map[string]*Node
    Op       Operator
}
func NewChunkedStreamingDataNode ¶
func NewChunkedStreamingDataNode(done <-chan struct{}, args ...interface{}) (n *Node)
arg0: archiver reference
arg3: query.y query struct
func NewCountNode ¶
func NewCountNode(done <-chan struct{}, args ...interface{}) (n *Node)
func NewEchoNode ¶
func NewEchoNode(done <-chan struct{}, args ...interface{}) (n *Node)
func NewEdgeNode ¶
func NewEdgeNode(done <-chan struct{}, args ...interface{}) (n *Node)
func NewMaxNode ¶
func NewMaxNode(done <-chan struct{}, args ...interface{}) (n *Node)
TODO: implement max over axis
func NewMeanNode ¶
func NewMeanNode(done <-chan struct{}, args ...interface{}) (n *Node)
func NewMinNode ¶
func NewMinNode(done <-chan struct{}, args ...interface{}) (n *Node)
TODO: implement min over axis
func NewNetworkNode ¶
func NewNetworkNode(done <-chan struct{}, args ...interface{}) (n *Node)
arg0: URI
func NewNopNode ¶
func NewNopNode(done <-chan struct{}, args ...interface{}) (n *Node)
func NewSelectDataNode ¶
func NewSelectDataNode(done <-chan struct{}, args ...interface{}) (n *Node)
arg0: archiver reference
arg1: query.y dataquery struct
func NewStreamingEchoNode ¶
func NewStreamingEchoNode(done <-chan struct{}, args ...interface{}) (n *Node)
arg0: send channel
func NewSubscribeDataNode ¶
func NewSubscribeDataNode(done <-chan struct{}, args ...interface{}) (n *Node)
arg0: archiver reference
arg1: querystring
arg2: apikey
arg3: query.y dataquery struct
func NewWhereNode ¶
func NewWhereNode(done <-chan struct{}, args ...interface{}) (n *Node)
The first argument is the set of k/v tags for this node; the remaining arguments are passed to the constructor:
arg0: BSON where clause, most likely from a parsed query
arg1: pointer to a metadata store
func NewWindowNode ¶
func NewWindowNode(done <-chan struct{}, args ...interface{}) (n *Node)
type NodeConstructor ¶
type NodeConstructor func(<-chan struct{}, ...interface{}) *Node
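A constructor registry with this shape (compare the NodeLookup and OpLookup variables above) can be sketched with trimmed stand-in types. opType, node, and buildNode below are illustrative, not the package's actual definitions.

```go
package main

import "fmt"

// opType stands in for OperationType.
type opType uint

const (
	opMin opType = iota
	opMax
)

// node stands in for Node; nodeConstructor mirrors NodeConstructor's shape.
type node struct{ name string }

type nodeConstructor func(done <-chan struct{}, args ...interface{}) *node

// opLookup maps operator names (as parsed from a query) to operation types;
// nodeLookup maps operation types to their constructors.
var opLookup = map[string]opType{"min": opMin, "max": opMax}

var nodeLookup = map[opType]nodeConstructor{
	opMin: func(done <-chan struct{}, args ...interface{}) *node { return &node{name: "min"} },
	opMax: func(done <-chan struct{}, args ...interface{}) *node { return &node{name: "max"} },
}

// buildNode resolves an operator name to its constructor and invokes it.
func buildNode(op string, done <-chan struct{}, args ...interface{}) (*node, error) {
	t, ok := opLookup[op]
	if !ok {
		return nil, fmt.Errorf("unknown operator %q", op)
	}
	return nodeLookup[t](done, args...), nil
}

func main() {
	done := make(chan struct{})
	n, err := buildNode("max", done)
	fmt.Println(n.name, err)
}
```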
type ObjectStore ¶
type ObjectStore interface {
    // Archives the given SmapMessage that contains non-numerical Readings
    AddObject(*SmapMessage) (bool, error)
    // Retrieves the blob closest before the reference time for the given UUID
    PrevObject(string, uint64, UnitOfTime) (SmapObjectResponse, error)
    // Retrieves the blob closest after the reference time for the given UUID
    NextObject(string, uint64, UnitOfTime) (SmapObjectResponse, error)
    // Retrieves all blobs between the start/end times for the given UUID
    GetObjects(string, uint64, uint64, UnitOfTime) (SmapObjectResponse, error)
    // Adds a pointer to a metadata store for streamid/uuid conversion and the like
    AddStore(MetadataStore)
}
The object store is a database for binary blobs rather than explicit timeseries data.
type OperationType ¶
type OperationType uint
const (
	WINDOW OperationType = iota
	MIN
	MAX
	MEAN
	COUNT
	EDGE
	NETWORK
)
type QuasarDB ¶
type QuasarDB struct {
// contains filtered or unexported fields
}
func (*QuasarDB) AddStore ¶
func (quasar *QuasarDB) AddStore(s MetadataStore)
func (*QuasarDB) GetData ¶
func (quasar *QuasarDB) GetData(uuids []string, start uint64, end uint64, uot UnitOfTime) ([]SmapNumbersResponse, error)
func (*QuasarDB) LiveConnections ¶
func (*QuasarDB) Next ¶
func (quasar *QuasarDB) Next(uuids []string, start uint64, limit int32, uot UnitOfTime) ([]SmapNumbersResponse, error)
func (*QuasarDB) Prev ¶
func (quasar *QuasarDB) Prev(uuids []string, start uint64, limit int32, uot UnitOfTime) ([]SmapNumbersResponse, error)
type QuasarReading ¶
type QuasarReading struct {
// contains filtered or unexported fields
}
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
Hashable version of a query (usable as the key type in a map).
func (Query) Match ¶
func (q Query) Match(cs *QueryChangeSet) *QueryChangeSet
func (Query) MatchSelectClause ¶
func (q Query) MatchSelectClause(msg *SmapMessage) (ret *SmapMessage)
type QueryChangeSet ¶
type QueryChangeSet struct {
	// new messages (streams) that match this query
	New map[string]*SmapMessage `json:",omitempty"`
	Del []string                `json:",omitempty"`
	// contains filtered or unexported fields
}
func NewQueryChangeSet ¶
func NewQueryChangeSet() *QueryChangeSet
func (*QueryChangeSet) AddNew ¶
func (cs *QueryChangeSet) AddNew(msg *SmapMessage)
func (*QueryChangeSet) Debug ¶
func (qs *QueryChangeSet) Debug()
func (*QueryChangeSet) DelStream ¶
func (cs *QueryChangeSet) DelStream(uuid string)
func (*QueryChangeSet) IsEmpty ¶
func (cs *QueryChangeSet) IsEmpty() bool
func (*QueryChangeSet) MarshalJSON ¶
func (cs *QueryChangeSet) MarshalJSON() ([]byte, error)
func (*QueryChangeSet) NewStream ¶
func (cs *QueryChangeSet) NewStream(uuid string, msg *SmapMessage)
type QueryProcessor ¶
type QueryProcessor struct {
// contains filtered or unexported fields
}
func NewQueryProcessor ¶
func NewQueryProcessor(a *Archiver) (qp *QueryProcessor)
func (*QueryProcessor) CheckOutToIn ¶
func (qp *QueryProcessor) CheckOutToIn(out, in *Node) bool
Checks that the output of node @out is compatible with the input of node @in. First checks that the structures match; if they do, then it checks the data types. If the data types do not match, it returns false. This does not actually resolve the types of the nodes, but rather checks that they are potentially compatible. The actual type resolution is performed when the nodes are evaluated.
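The two-stage check described above can be sketched as follows. The structure and data-type enums here are hypothetical stand-ins, not the actual Giles node types:

```go
package main

import "fmt"

// Hypothetical stand-ins for a node's output/input structure and data type.
type structure int
type datatype int

const (
	timeseriesStruct structure = iota
	scalarStruct
)

const (
	numericType datatype = iota
	objectType
	anyType // matches any data type; resolved when nodes are evaluated
)

type nodeIO struct {
	Struct structure
	Type   datatype
}

// checkOutToIn mirrors the described logic: structures must match first;
// only then are the data types compared. anyType is considered potentially
// compatible with everything -- actual resolution happens later.
func checkOutToIn(out, in nodeIO) bool {
	if out.Struct != in.Struct {
		return false
	}
	if out.Type == anyType || in.Type == anyType {
		return true
	}
	return out.Type == in.Type
}

func main() {
	out := nodeIO{timeseriesStruct, numericType}
	fmt.Println(checkOutToIn(out, nodeIO{timeseriesStruct, numericType})) // true
	fmt.Println(checkOutToIn(out, nodeIO{scalarStruct, numericType}))     // false
	fmt.Println(checkOutToIn(out, nodeIO{timeseriesStruct, anyType}))     // true
}
```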
func (*QueryProcessor) GetNodeFromOp ¶
func (qp *QueryProcessor) GetNodeFromOp(op *OpNode, query *query) *Node
func (*QueryProcessor) Parse ¶
func (qp *QueryProcessor) Parse(querystring string) *SQLex
type RepublishClient ¶
type RepublishClient struct {
// contains filtered or unexported fields
}
This is the type used within the Republisher to track the subscribers
type Republisher ¶
This is a more thought-out version of the republisher that was first included in Giles. The focus of this version of the republisher is SPEED: efficient discovery of who to deliver a new message to, and efficient reevaluation of queries in the face of new commands + data
func NewRepublisher ¶
func NewRepublisher(a *Archiver) (r *Republisher)
func (*Republisher) ChangeSubscriptions ¶
func (r *Republisher) ChangeSubscriptions(readings map[string]*SmapMessage) map[QueryHash]*QueryChangeSet
When we receive a new set of readings, we build a set (map[QueryHash]struct{}) of the queries that could potentially be affected by the incoming data. We then reevaluate each of these queries and keep track of which changed (a true return value from ReevaluateQuery). For each client of a changed query, we send the updates. We look up which clients to send to based on their where-clause (consulting republisher.keyconcern for each key mentioned in msg.{Metadata, Properties, Actuator}).
func (*Republisher) EvaluateQuery ¶
func (r *Republisher) EvaluateQuery(qh QueryHash)
Given a query hash, reevaluate the associated WHERE clause to get the new set of UUIDs that match the query. We now have a set of clients attached to a specific query and a set of clients associated with each of the UUIDs. Compare the previous list of UUIDs with the current list of UUIDs. For each UUID that is now in the set, add the list of concerned clients to the subscribers. For each UUID that is now NOT in the set, remove the list of concerned clients from that subscriber list
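The set comparison described above amounts to a symmetric difference between the old and new UUID sets. A sketch with a hypothetical helper, using plain string sets in place of the Republisher's internal state:

```go
package main

import (
	"fmt"
	"sort"
)

// diffUUIDs compares the previous and current UUID sets for a query and
// returns which UUIDs entered the set (concerned clients must be added to
// the subscriber lists) and which left it (clients must be removed).
func diffUUIDs(prev, curr map[string]struct{}) (added, removed []string) {
	for uuid := range curr {
		if _, ok := prev[uuid]; !ok {
			added = append(added, uuid)
		}
	}
	for uuid := range prev {
		if _, ok := curr[uuid]; !ok {
			removed = append(removed, uuid)
		}
	}
	// sort for deterministic output; map iteration order is random in Go
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	prev := map[string]struct{}{"a": {}, "b": {}}
	curr := map[string]struct{}{"b": {}, "c": {}}
	added, removed := diffUUIDs(prev, curr)
	fmt.Println(added, removed) // [c] [a]
}
```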
func (*Republisher) HandleMetadataSubscriber ¶
func (r *Republisher) HandleMetadataSubscriber(s Subscriber, query, apikey string)
A Metadata subscriber wants notifications on changes on the set of streams that match the where-clause provided by the `query` parameter
func (*Republisher) HandleQuery ¶
func (r *Republisher) HandleQuery(querystring string) (*Query, error)
Given a query string, tokenize and parse the query, also keeping track of what keys are mentioned in the query.
func (*Republisher) HandleQuery2 ¶
func (r *Republisher) HandleQuery2(query string) (*Query, error)
func (*Republisher) HandleSubscriber ¶
func (r *Republisher) HandleSubscriber(s Subscriber, query, apikey string, membership bool)
This is the Archiver API call. @s is a Subscriber, an interface that allows us to agnostically send "published" messages to the subscribed clients. Subscriber is wrapped in a RepublishClient internally so that the Republisher can keep track of necessary state. @query is the query string that describes what the client is subscribing to. This query should be a valid sMAP query
func (*Republisher) HandleSubscriber2 ¶
func (r *Republisher) HandleSubscriber2(s Subscriber, query, apikey string, legacy bool)
func (*Republisher) HandleUUIDSubscriber ¶
func (r *Republisher) HandleUUIDSubscriber(s Subscriber, uuids []string, apikey string)
A UUID subscriber is interested in all metadata associated with a given stream. We store a lookup from each uuid to a list of concerned clients. There is nothing to reevaluate here, so we can do normal lookups. TODO: to consider: update operations are expensive -- do we block other clients when updating subscriptions?
func (*Republisher) MetadataChange ¶
func (r *Republisher) MetadataChange(msg *SmapMessage)
We call MetadataChange with an incoming sMAP message that includes changes to the metadata of a stream that could affect republish subscriptions. TODO: store up queries and call r.EvaluateQuery once for each at the end. With the current scheme, queries can be re-evaluated twice.
func (*Republisher) MetadataChangeKeys ¶
func (r *Republisher) MetadataChangeKeys(keys []string)
Same as MetadataChange, but operates on a known list of keys rather than a sMAP message. TODO: store up queries and call r.EvaluateQuery once for each at the end. With the current scheme, queries can be re-evaluated twice.
func (*Republisher) ReevaluateQuery ¶
func (r *Republisher) ReevaluateQuery(qh QueryHash, cs *QueryChangeSet) bool
Reevaluate the query corresponding to the given QueryHash. Returns true if the results of the query changed (streams added or removed).
func (*Republisher) Republish ¶
func (r *Republisher) Republish(msg *SmapMessage)
Publish @msg to all clients subscribing to @msg.UUID
func (*Republisher) RepublishKeyChanges ¶
func (r *Republisher) RepublishKeyChanges(keys []string) map[QueryHash]*QueryChangeSet
When a metadata change comes in from somewhere other than a smap message, we calculate the subscription changes and notify subscribers
func (*Republisher) RepublishReadings ¶
func (r *Republisher) RepublishReadings(messages map[string]*SmapMessage)
We receive a new message from a client and want to send it out to the subscribers. A subscriber is interested in one of three things: `*` (all metadata), data before now (the most recent data point), or a list of metadata tags.
type SSHConfigServer ¶
type SSHConfigServer struct {
// contains filtered or unexported fields
}
The SSHConfigServer offers a command-line-based alternative to the PowerDB2 administration interface. If you are using the giles.go program as an interface to the Archiver API (as it is by default), then this is automatically configured from the following section of the giles.cfg file:
[SSH]
Port=2222
PrivateKey=./id_rsa
AuthorizedKeysFile=/home/gabe/.ssh/authorized_keys
User=admin
Pass=supersecurepassword
PasswordEnabled=false
KeyAuthEnabled=true
Currently, the shell supports the following commands, which map to direct calls to the Metadata store in metadata.go. This makes it simple to not only extend the range of commands supported, but also introduce additional interfaces (e.g. a command-line utility).
Right now, the administration is focused around creating/deleting/viewing api keys, which are necessary to publish data to the sMAP archiver.
[[General]]
quit -- exits the session
help -- prints this help
[[Key Management]]
newkey <name> <email> <public?> -- creates a new API key and prints it
getkey <name> <email> -- retrieve the API key for the given name and email
listkeys <email> -- list all API keys and names for the given email
delkey <name> <email> -- deletes the key associated with the given name and email
delkey <key> -- deletes the given key
owner <key> -- retrieves owner (name, email) for given key
func NewSSHConfigServer ¶
func NewSSHConfigServer(manager APIKeyManager, port, privatekey, authorizedKeysFile, confuser, confpass string, passenabled, keyenabled bool) *SSHConfigServer
func (*SSHConfigServer) Listen ¶
func (scs *SSHConfigServer) Listen()
type SelectDataNode ¶
type SelectDataNode struct {
// contains filtered or unexported fields
}
* Select Data Node *
func (*SelectDataNode) Run ¶
func (sn *SelectDataNode) Run(input interface{}) (interface{}, error)
type SmapMessage ¶
type SmapMessage struct {
	// Readings for this message
	Readings []Reading
	// If this struct corresponds to a sMAP collection,
	// then Contents contains a list of paths contained within
	// this collection
	Contents []string `json:",omitempty"`
	// Map of the metadata
	Metadata bson.M `json:",omitempty"`
	// Map containing the actuator reference
	Actuator bson.M `json:",omitempty"`
	// Map of the properties
	Properties bson.M `json:",omitempty"`
	// Unique identifier for this stream. Should be empty for Collections
	UUID string `json:"uuid"`
	// Path of this stream (thus far)
	Path string `json:", omitempty"`
}
This is the general-purpose struct for all INCOMING sMAP messages. This struct is designed to match the format of sMAP JSON, as that is the primary data format.
func (*SmapMessage) GetKey ¶
func (sm *SmapMessage) GetKey(key string) interface{}
Given key names like uuid, Metadata.Key, Properties.Key, etc., fetches the corresponding value from the message.
func (*SmapMessage) GetValuesFor ¶
func (sm *SmapMessage) GetValuesFor(q *Query)
func (*SmapMessage) HasKeysFrom ¶
func (sm *SmapMessage) HasKeysFrom(keys []string) bool
Returns true if the current message contains keys mentioned in the provided list
func (*SmapMessage) HasMetadata ¶
func (sm *SmapMessage) HasMetadata() bool
func (*SmapMessage) IsTimeseries ¶
func (sm *SmapMessage) IsTimeseries() bool
func (*SmapMessage) ToJson ¶
func (sm *SmapMessage) ToJson() []byte
Convenience method to turn a sMAP message into marshaled JSON
func (*SmapMessage) ToSmapReading ¶
func (sm *SmapMessage) ToSmapReading() *SmapReading
func (*SmapMessage) UnmarshalJSON ¶
func (sm *SmapMessage) UnmarshalJSON(b []byte) (err error)
type SmapNumberReading ¶
type SmapNumberReading struct {
	// uint64 timestamp
	Time uint64
	// value associated with this timestamp
	Value float64
}
func (*SmapNumberReading) GetTime ¶
func (s *SmapNumberReading) GetTime() uint64
func (*SmapNumberReading) GetValue ¶
func (s *SmapNumberReading) GetValue() interface{}
func (*SmapNumberReading) IsObject ¶
func (s *SmapNumberReading) IsObject() bool
func (*SmapNumberReading) MarshalJSON ¶
func (s *SmapNumberReading) MarshalJSON() ([]byte, error)
type SmapNumbersResponse ¶
type SmapNumbersResponse struct {
	Readings []*SmapNumberReading
	UUID     string `json:"uuid"`
}
type SmapObjectReading ¶
type SmapObjectReading struct {
	// uint64 timestamp
	Time uint64
	// value associated with this timestamp
	Value interface{}
}
func (*SmapObjectReading) GetTime ¶
func (s *SmapObjectReading) GetTime() uint64
func (*SmapObjectReading) GetValue ¶
func (s *SmapObjectReading) GetValue() interface{}
func (*SmapObjectReading) IsObject ¶
func (s *SmapObjectReading) IsObject() bool
func (*SmapObjectReading) MarshalJSON ¶
func (s *SmapObjectReading) MarshalJSON() ([]byte, error)
type SmapObjectResponse ¶
type SmapObjectResponse struct {
	Readings []*SmapObjectReading
	UUID     string `json:"uuid"`
}
type SmapReading ¶
type SmapReading struct {
	// Readings will be interpreted as a list of [uint64, float64] = [time, value]
	// OR as a list of [uint64, []byte] = [time, value]
	Readings [][]interface{}
	// Unique identifier for this stream
	UUID string `json:"uuid"`
}
Struct representing data readings to and from sMAP
type StreamBuf ¶
func NewStreamBuf ¶
func NewStreamBuf(uuid string, uot UnitOfTime, txc *TransactionCoalescer) *StreamBuf
type StreamLockMap ¶
type StreamingEchoNode ¶
type StreamingEchoNode struct {
// contains filtered or unexported fields
}
* Streaming Echo Node *
func (*StreamingEchoNode) Run ¶
func (sen *StreamingEchoNode) Run(input interface{}) (interface{}, error)
type SubscribeDataNode ¶
type SubscribeDataNode struct {
// contains filtered or unexported fields
}
func (*SubscribeDataNode) GetNotify ¶
func (sn *SubscribeDataNode) GetNotify() <-chan bool
func (*SubscribeDataNode) Run ¶
func (sn *SubscribeDataNode) Run(input interface{}) (interface{}, error)
func (*SubscribeDataNode) Send ¶
func (sn *SubscribeDataNode) Send(msg interface{})
* implement the Subscriber interface for SubscribeDataNode *
func (*SubscribeDataNode) SendError ¶
func (sn *SubscribeDataNode) SendError(err error)
type Subscriber ¶
type Subscriber interface {
	// Called by the Republisher when there is a new message to send to the
	// client. Send should transform the message to the appropriate format
	// before forwarding to the actual client.
	Send(interface{})
	// Called by Republisher when there is an error with the subscription
	SendError(error)
	// GetNotify is called by the Republisher to get a pointer to a "notify"
	// channel. When the client is closed and no longer wants to subscribe,
	// a value should be sent on the returned channel to signal to the
	// Republisher to unsubscribe the client. The client can of course
	// disconnect on its own without notifying the Republisher, but this
	// means we cannot protect against memory leaks resulting from
	// infinitely adding new clients
	GetNotify() <-chan bool
}
Subscriber is an interface that should be implemented by each protocol adapter that wants to support sMAP republish pub-sub.
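A minimal, channel-backed implementation of this interface might look like the sketch below. This is an illustration only, not one of Giles' actual protocol adapters; the interface is reproduced locally so the sketch is self-contained:

```go
package main

import "fmt"

// Subscriber, reproduced from the Giles API.
type Subscriber interface {
	Send(interface{})
	SendError(error)
	GetNotify() <-chan bool
}

// chanSubscriber forwards republished messages onto a buffered channel and
// signals on the notify channel when the client goes away.
type chanSubscriber struct {
	out    chan interface{}
	notify chan bool
}

func newChanSubscriber() *chanSubscriber {
	return &chanSubscriber{
		out:    make(chan interface{}, 16),
		notify: make(chan bool, 1),
	}
}

// Send delivers a republished message to the client's outgoing channel.
func (c *chanSubscriber) Send(msg interface{}) { c.out <- msg }

// SendError delivers a subscription error the same way.
func (c *chanSubscriber) SendError(err error) { c.out <- err }

// GetNotify exposes the channel the Republisher watches for disconnects.
func (c *chanSubscriber) GetNotify() <-chan bool { return c.notify }

// Close tells the Republisher to unsubscribe this client.
func (c *chanSubscriber) Close() { c.notify <- true }

func main() {
	var s Subscriber = newChanSubscriber()
	s.Send("republished message")
	fmt.Println(<-s.(*chanSubscriber).out) // republished message
}
```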
type TSDB ¶
type TSDB interface {
	// add the following SmapReading to the timeseries database
	Add(*StreamBuf) bool
	// uuids, reference time, limit, unit of time
	// retrieve data before reference time
	Prev([]string, uint64, int32, UnitOfTime) ([]SmapNumbersResponse, error)
	// retrieve data after reference time
	Next([]string, uint64, int32, UnitOfTime) ([]SmapNumbersResponse, error)
	// uuids, start time, end time, unit of time
	GetData([]string, uint64, uint64, UnitOfTime) ([]SmapNumbersResponse, error)
	// get a new connection to the timeseries database
	GetConnection() (net.Conn, error)
	// return the number of live connections
	LiveConnections() int
	// Adds a pointer to metadata store for streamid/uuid conversion and the like
	AddStore(MetadataStore)
}
TSDB (or TimeSeries DataBase) is the subset of functionality Giles expects from a (timestamp, value)-oriented database. The relevant read/write types are SmapReading and can be found in json.go and readingdb.go respectively (although their locations are likely to change). The UnitOfTime parameters indicate how to interpret the timestamps that are given as parameters.
type TieredSmapMessage ¶
type TieredSmapMessage map[string]*SmapMessage
func (*TieredSmapMessage) CollapseToTimeseries ¶
func (tsm *TieredSmapMessage) CollapseToTimeseries()
This performs the metadata inheritance for the paths and messages inside this collection of SmapMessages. Inheritance starts from the root path "/" and progresses towards the leaves. First, get a list of all of the potential timeseries (any path that contains a UUID). Then, for each of the prefixes of that timeseries' path (util.getPrefixes), grab the paths from the TieredSmapMessage that match the prefixes. Sort these in "decreasing" order and apply them to the metadata. Finally, delete all non-timeseries paths.
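The inheritance pass can be sketched for a single path as follows. The getPrefixes helper here mirrors what util.getPrefixes is described as doing, and plain map[string]interface{} values stand in for bson.M; both are assumptions for illustration:

```go
package main

import (
	"fmt"
	"strings"
)

// getPrefixes returns the ancestor paths of p from the root down,
// e.g. "/a/b/c" -> ["/", "/a", "/a/b"].
func getPrefixes(p string) []string {
	prefixes := []string{"/"}
	parts := strings.Split(strings.Trim(p, "/"), "/")
	for i := 1; i < len(parts); i++ {
		prefixes = append(prefixes, "/"+strings.Join(parts[:i], "/"))
	}
	return prefixes
}

// inheritMetadata merges metadata from the root toward the leaf, so deeper
// paths override what they inherit from their ancestors.
func inheritMetadata(messages map[string]map[string]interface{}, path string) map[string]interface{} {
	merged := make(map[string]interface{})
	for _, prefix := range append(getPrefixes(path), path) {
		for k, v := range messages[prefix] {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	messages := map[string]map[string]interface{}{
		"/":        {"Location/Building": "Soda Hall"},
		"/hvac":    {"System": "HVAC"},
		"/hvac/t1": {"UnitofMeasure": "C"},
	}
	// the leaf timeseries ends up with metadata from all its ancestors
	fmt.Println(inheritMetadata(messages, "/hvac/t1"))
}
```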
func (*TieredSmapMessage) ToBson ¶
func (tsm *TieredSmapMessage) ToBson() []bson.M
type TransactionCoalescer ¶
func NewTransactionCoalescer ¶
func NewTransactionCoalescer(tsdb *TSDB, store *MetadataStore) *TransactionCoalescer
func (*TransactionCoalescer) AddSmapMessage ¶
func (txc *TransactionCoalescer) AddSmapMessage(sm *SmapMessage)
Called to add an incoming SmapMessage to the underlying timeseries database. A SmapMessage contains an array of Readings and the UUID for the stream the readings belong to. The Readings must be added to a StreamBuffer for coalescing. This StreamBuffer is either a) pre-existing and still open, b) pre-existing and committing or c) not existing. In the
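The three-way decision among those cases can be sketched as follows. The types here are simplified stand-ins (plain float64 readings, no commit machinery), not the actual Giles StreamBuf:

```go
package main

import "fmt"

// streamBuf is a simplified stand-in for Giles' StreamBuf: a per-UUID buffer
// of readings that is eventually committed to the timeseries database.
type streamBuf struct {
	uuid       string
	readings   []float64
	committing bool
}

// coalescer routes incoming readings to an open buffer, creating a fresh one
// when no buffer exists or when the existing buffer is already committing.
type coalescer struct {
	buffers map[string]*streamBuf
}

func (c *coalescer) add(uuid string, values ...float64) *streamBuf {
	sb, ok := c.buffers[uuid]
	// cases (b) and (c): buffer committing, or no buffer -- open a new one
	if !ok || sb.committing {
		sb = &streamBuf{uuid: uuid}
		c.buffers[uuid] = sb
	}
	// case (a): buffer exists and is still open -- append to it
	sb.readings = append(sb.readings, values...)
	return sb
}

func main() {
	c := &coalescer{buffers: make(map[string]*streamBuf)}
	first := c.add("uuid-1", 1.0, 2.0) // case (c): no buffer yet
	c.add("uuid-1", 3.0)               // case (a): appended to the open buffer
	first.committing = true
	second := c.add("uuid-1", 4.0) // case (b): old buffer is committing
	fmt.Println(len(first.readings), len(second.readings)) // 3 1
}
```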
func (*TransactionCoalescer) Commit ¶
func (txc *TransactionCoalescer) Commit(sb *StreamBuf)
type UnitOfTime ¶
type UnitOfTime uint
unit of time indicators
const (
	// nanoseconds 1000000000
	UOT_NS UnitOfTime = 1
	// microseconds 1000000
	UOT_US UnitOfTime = 2
	// milliseconds 1000
	UOT_MS UnitOfTime = 3
	// seconds 1
	UOT_S UnitOfTime = 4
)
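Converting timestamps between these units is a matter of scaling by powers of 1000. A sketch of such a conversion helper (convertTime and unitsPerSecond are hypothetical names; Giles' own conversion code is not shown in this listing):

```go
package main

import "fmt"

type UnitOfTime uint

const (
	UOT_NS UnitOfTime = 1 // nanoseconds
	UOT_US UnitOfTime = 2 // microseconds
	UOT_MS UnitOfTime = 3 // milliseconds
	UOT_S  UnitOfTime = 4 // seconds
)

// unitsPerSecond maps each UnitOfTime to how many of that unit make a second.
var unitsPerSecond = map[UnitOfTime]uint64{
	UOT_NS: 1000000000,
	UOT_US: 1000000,
	UOT_MS: 1000,
	UOT_S:  1,
}

// convertTime rescales a timestamp from one unit to another. Converting to a
// coarser unit truncates (integer division); note that multiplying first can
// overflow uint64 for very large nanosecond timestamps.
func convertTime(ts uint64, from, to UnitOfTime) uint64 {
	if from == to {
		return ts
	}
	return ts * unitsPerSecond[to] / unitsPerSecond[from]
}

func main() {
	fmt.Println(convertTime(1, UOT_S, UOT_MS))           // 1000
	fmt.Println(convertTime(1500, UOT_MS, UOT_S))        // 1
	fmt.Println(convertTime(1234567890, UOT_NS, UOT_US)) // 1234567
}
```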
type WhereNode ¶
type WhereNode struct {
// contains filtered or unexported fields
}
* Where Node * A WhereNode takes a where clause in its constructor.
type WindowNode ¶
type WindowNode struct {
// contains filtered or unexported fields
}
func (*WindowNode) Run ¶
func (wn *WindowNode) Run(input interface{}) (interface{}, error)
TODO: do we assume that data is sorted?