Documentation ¶
Index ¶
- Constants
- Variables
- func ErrConversationNotFound(name string) error
- func ErrDatabaseNotFound(name string) error
- func IsRetryable(err error) bool
- func NewSnapshotWriter(meta []byte, store *Store) (*snapshot.Writer, error)
- func NopWriteToCloser(w io.WriterTo) interface{ ... }
- func WALPartition(key []byte) uint8
- type Config
- type Conversation
- type Conversations
- type DatabaseIndex
- type ErrAuthorize
- type Executor
- type FieldCodec
- func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
- func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
- func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
- func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
- type LocalMapper
- type Mapper
- type MapperOutput
- type Message
- type QueryExecutor
- type Shard
- func (s *Shard) Close() error
- func (s *Shard) ConversationsCount() (n int, err error)
- func (s *Shard) DB() *bolt.DB
- func (s *Shard) Flush(partitionFlushDelay time.Duration) error
- func (s *Shard) FlushPartition(partitionID uint8) error
- func (s *Shard) Open() error
- func (s *Shard) Path() string
- func (s *Shard) WriteMessages(messages []Message) error
- type StatefulMapper
- type Store
- func (s *Store) Close() error
- func (s *Store) Conversation(database, name string) *Conversation
- func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error)
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error
- func (s *Store) DatabaseIndex(name string) *DatabaseIndex
- func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) Flush() error
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) Shard(shardID uint64) *Shard
- func (s *Store) ShardIDs() []uint64
- func (s *Store) WriteToShard(shardID uint64, messages []Message) error
Constants ¶
const ( // DefaultMaxWALSize is the default size of the WAL before it is flushed. DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB // DefaultWALFlushInterval is the frequency the WAL will get flushed if // it doesn't reach its size threshold. DefaultWALFlushInterval = 10 * time.Minute // DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes. DefaultWALPartitionFlushDelay = 2 * time.Second )
const ( // Return an error if the user is trying to select more than this number of points in a group by statement. // Most likely they specified a group by interval without time boundaries. MaxGroupByPoints = 100000 // Since time is always selected, the column count when selecting only a single other value will be 2 SelectColumnCountWithOneValue = 2 // IgnoredChunkSize is what gets passed into Mapper.Begin for aggregate queries as they don't chunk points out IgnoredChunkSize = 0 )
const WALPartitionN = 8
WALPartitionN is the number of partitions in the write ahead log.
Variables ¶
var ( // ErrInvalidQuery is returned when executing an unknown query type. ErrInvalidQuery = errors.New("invalid query") // ErrNotExecuted is returned when a statement is not executed in a query. // This can occur when a previous statement in the same query has errored. ErrNotExecuted = errors.New("not executed") )
var ( // ErrFieldOverflow is returned when too many fields are created on a measurement. ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID // there is no mapping for. ErrFieldUnmappedID = errors.New("field ID not mapped") // ErrWALPartitionNotFound is returned when flushing a WAL partition that // does not exist. ErrWALPartitionNotFound = errors.New("wal partition not found") )
var (
ErrShardNotFound = fmt.Errorf("shard not found")
)
Functions ¶
func ErrConversationNotFound ¶
func ErrDatabaseNotFound ¶
func IsRetryable ¶
IsRetryable returns true if this error is temporary and could be retried
func NewSnapshotWriter ¶
NewSnapshotWriter returns a new snapshot.Writer that will write metadata and the store's shards to an archive.
func NopWriteToCloser ¶
NopWriteToCloser returns an io.WriterTo that implements io.Closer.
func WALPartition ¶
WALPartition returns the partition number that key belongs to.
Types ¶
type Config ¶
type Conversation ¶
type Conversation struct { Name string `json:"name,omitempty"` Key string Tags map[string]string // contains filtered or unexported fields }
Conversation represents a unique series of messages in a database.
func (*Conversation) HasField ¶
func (c *Conversation) HasField(name string) bool
HasField returns true if the conversation has a field by the given name.
func (*Conversation) HasTagKey ¶
func (c *Conversation) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (*Conversation) MarshalBinary ¶
func (c *Conversation) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*Conversation) UnmarshalBinary ¶
func (c *Conversation) UnmarshalBinary(buf []byte) error
UnmarshalBinary decodes the object from a binary format.
type Conversations ¶
type Conversations []*Conversation
Conversations represents a list of *Conversation.
func (Conversations) Len ¶
func (a Conversations) Len() int
func (Conversations) Less ¶
func (a Conversations) Less(i, j int) bool
func (Conversations) Swap ¶
func (a Conversations) Swap(i, j int)
type DatabaseIndex ¶
type DatabaseIndex struct {
// contains filtered or unexported fields
}
DatabaseIndex is the in memory index of a collection of conversations, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewDatabaseIndex ¶
func NewDatabaseIndex() *DatabaseIndex
NewDatabaseIndex creates the in memory index
func (*DatabaseIndex) Conversation ¶
func (db *DatabaseIndex) Conversation(name string) *Conversation
Conversation returns the conversation object from the index by name.
func (*DatabaseIndex) Conversations ¶
func (db *DatabaseIndex) Conversations() Conversations
Conversations returns a list of all conversations.
func (*DatabaseIndex) ConversationsCount ¶
func (db *DatabaseIndex) ConversationsCount() (nConversations int)
ConversationsCount returns the number of conversations currently indexed by the database. Useful for reporting and monitoring.
func (*DatabaseIndex) DropConversation ¶
func (db *DatabaseIndex) DropConversation(name string)
type ErrAuthorize ¶
type ErrAuthorize struct {
// contains filtered or unexported fields
}
ErrAuthorize represents an authorization error.
func NewErrAuthorize ¶
func NewErrAuthorize(qe *QueryExecutor, q *sql.Query, u, db, m string) *ErrAuthorize
NewErrAuthorize returns a new instance of ErrAuthorize.
func (ErrAuthorize) Error ¶
func (e ErrAuthorize) Error() string
Error returns the text of the error.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(stmt *sql.SelectStatement, mappers []Mapper, chunkSize int) *Executor
NewExecutor returns a new Executor.
type FieldCodec ¶
type FieldCodec struct {
// contains filtered or unexported fields
}
FieldCodec provides encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.
It is not affected by changes to the Measurement object after codec creation. TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.
However, this is here until tx.go and the engine get refactored into tsdb.
func (*FieldCodec) DecodeByID ¶
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and returns that value. TODO: shouldn't be exported. refactor engine
func (*FieldCodec) DecodeByName ¶
func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)
DecodeByName scans a byte slice for a field with the given name, converts it to its expected type, and returns that value.
func (*FieldCodec) DecodeFields ¶
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
DecodeFields decodes a byte slice into a set of field ids and values.
func (*FieldCodec) DecodeFieldsWithNames ¶
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
DecodeFieldsWithNames decodes a byte slice into a set of field names and values TODO: shouldn't be exported. refactor engine
func (*FieldCodec) EncodeFields ¶
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.
If a field exists in the codec, but its type is different, an error is returned. If a field is not present in the codec, the system panics.
func (*FieldCodec) FieldIDByName ¶
func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
type LocalMapper ¶
type LocalMapper struct {
// contains filtered or unexported fields
}
LocalMapper is for retrieving data for a query, from a given shard.
func NewLocalMapper ¶
func NewLocalMapper(shard *Shard, stmt sql.Statement, chunkSize int) *LocalMapper
NewLocalMapper returns a mapper for the given shard, which will return data for the SELECT statement.
func (*LocalMapper) NextChunk ¶
func (lm *LocalMapper) NextChunk() (interface{}, error)
NextChunk returns the next chunk of data
type MapperOutput ¶
type MapperOutput struct { Name string `json:"name,omitempty"` Values []*mapperValue `json:"values,omitempty"` // For aggregates contains a single value at [0] }
type Message ¶
type Message interface { Time() time.Time SetTime(t time.Time) UnixNano() int64 HashID() uint64 Key() []byte Data() []byte SetData(buf []byte) String() string }
func NewMessage ¶
func ParseMessages ¶
func ParseMessagesString ¶
type QueryExecutor ¶
type QueryExecutor struct { // The meta store for accessing and updating cluster and schema data. MetaStore interface { Database(name string) (*meta.DatabaseInfo, error) Databases() ([]meta.DatabaseInfo, error) User(name string) (*meta.UserInfo, error) AdminUserExists() (bool, error) Authenticate(username, password string) (*meta.UserInfo, error) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) UserCount() (int, error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) NodeID() uint64 } // Executes statements relating to meta data. MetaStatementExecutor interface { ExecuteStatement(stmt sql.Statement) *sql.Result } // Maps shards for queries. ShardMapper interface { CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error) } Logger *log.Logger // contains filtered or unexported fields }
QueryExecutor executes every statement in a MessageDB Query. It is responsible for coordinating between the local tsdb.Store, the meta.Store, and the other nodes in the cluster to run the query against their local tsdb.Stores. There should be one executor in a running process.
func NewQueryExecutor ¶
func NewQueryExecutor(store *Store) *QueryExecutor
NewQueryExecutor returns an initialized QueryExecutor
func (*QueryExecutor) Authorize ¶
Authorize user u to execute query q on database. database can be "" for queries that do not require a database. If no user is provided it will return an error unless the query's first statement is to create a root user.
func (*QueryExecutor) ExecuteQuery ¶
func (q *QueryExecutor) ExecuteQuery(query *sql.Query, database string, chunkSize int) (<-chan *sql.Result, error)
ExecuteQuery executes an sql query against the server. It sends results down the returned chan and closes it when done. It will close the chan on the first statement that throws an error.
type Shard ¶
type Shard struct { // The maximum size and time thresholds for flushing the WAL. MaxWALSize int WALFlushInterval time.Duration WALPartitionFlushDelay time.Duration // The writer used by the logger. LogOutput io.Writer // contains filtered or unexported fields }
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(index *DatabaseIndex, path string) *Shard
NewShard returns a new initialized Shard
func (*Shard) ConversationsCount ¶
ConversationsCount returns the number of conversation buckets on the shard. This does not include a count from the WAL.
func (*Shard) DB ¶
TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying store.
func (*Shard) FlushPartition ¶
FlushPartition flushes a single WAL partition.
func (*Shard) WriteMessages ¶
WriteMessages will write the raw data messages and any new metadata to the index in the shard
type StatefulMapper ¶
type StatefulMapper struct { Mapper // contains filtered or unexported fields }
StatefulMapper encapsulates a Mapper and some state that the executor needs to track for that mapper.
func (*StatefulMapper) NextChunk ¶
func (sm *StatefulMapper) NextChunk() (*MapperOutput, error)
NextChunk wraps a RawMapper and some state.
type Store ¶
type Store struct { MaxWALSize int WALFlushInterval time.Duration WALPartitionFlushDelay time.Duration Logger *log.Logger // contains filtered or unexported fields }
func (*Store) Conversation ¶
func (s *Store) Conversation(database, name string) *Conversation
func (*Store) CreateMapper ¶
func (*Store) CreateShard ¶
func (*Store) DatabaseIndex ¶
func (s *Store) DatabaseIndex(name string) *DatabaseIndex
func (*Store) DeleteDatabase ¶
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteShard ¶
DeleteShard removes a shard from disk.