db

package
v0.0.0-...-4471eec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2015 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxWALSize is the default size of the WAL before it is flushed.
	DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB

	// DefaultWALFlushInterval is the frequency the WAL will get flushed if
	// it doesn't reach its size threshold.
	DefaultWALFlushInterval = 10 * time.Minute

	// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
	DefaultWALPartitionFlushDelay = 2 * time.Second
)
View Source
const (
	// MaxGroupByPoints is the maximum number of points a group by statement may select; exceeding it returns an error.
	// Hitting the limit most likely means the user specified a group by interval without time boundaries.
	MaxGroupByPoints = 100000

	// SelectColumnCountWithOneValue is the column count when selecting only a single other value; since time is always selected, it is 2.
	SelectColumnCountWithOneValue = 2

	// IgnoredChunkSize is what gets passed into Mapper.Begin for aggregate queries as they don't chunk points out
	IgnoredChunkSize = 0
)
View Source
const WALPartitionN = 8

WALPartitionN is the number of partitions in the write ahead log.

Variables

View Source
var (
	// ErrInvalidQuery is returned when executing an unknown query type.
	ErrInvalidQuery = errors.New("invalid query")

	// ErrNotExecuted is returned when a statement is not executed in a query.
	// This can occur when a previous statement in the same query has errored.
	ErrNotExecuted = errors.New("not executed")
)
View Source
var (
	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")

	// ErrWALPartitionNotFound is returned when flushing a WAL partition that
	// does not exist.
	ErrWALPartitionNotFound = errors.New("wal partition not found")
)
View Source
var (
	ErrShardNotFound = fmt.Errorf("shard not found")
)

Functions

func ErrConversationNotFound

func ErrConversationNotFound(name string) error

func ErrDatabaseNotFound

func ErrDatabaseNotFound(name string) error

func IsRetryable

func IsRetryable(err error) bool

IsRetryable returns true if this error is temporary and could be retried

func NewSnapshotWriter

func NewSnapshotWriter(meta []byte, store *Store) (*snapshot.Writer, error)

NewSnapshotWriter returns a new snapshot.Writer that will write metadata and the store's shards to an archive.

func NopWriteToCloser

func NopWriteToCloser(w io.WriterTo) interface {
	io.WriterTo
	io.Closer
}

NopWriteToCloser returns an io.WriterTo that implements io.Closer.

func WALPartition

func WALPartition(key []byte) uint8

WALPartition returns the partition number that key belongs to.

Types

type Config

type Config struct {
	Dir                    string        `toml:"dir"`
	MaxWALSize             int           `toml:"max-wal-size"`
	WALFlushInterval       toml.Duration `toml:"wal-flush-interval"`
	WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`
}

func NewConfig

func NewConfig() Config

type Conversation

type Conversation struct {
	Name string `json:"name,omitempty"`
	Key  string

	Tags map[string]string
	// contains filtered or unexported fields
}

Conversation represents a unique series of messages in a database.

func (*Conversation) HasField

func (c *Conversation) HasField(name string) bool

HasField returns true if the conversation has a field with the given name.

func (*Conversation) HasTagKey

func (c *Conversation) HasTagKey(k string) bool

HasTagKey returns true if at least one series in this conversation has written a value for the passed-in tag key.

func (*Conversation) MarshalBinary

func (c *Conversation) MarshalBinary() ([]byte, error)

MarshalBinary encodes the object to a binary format.

func (*Conversation) UnmarshalBinary

func (c *Conversation) UnmarshalBinary(buf []byte) error

UnmarshalBinary decodes the object from a binary format.

type Conversations

type Conversations []*Conversation

Conversations represents a list of *Conversation.

func (Conversations) Len

func (a Conversations) Len() int

func (Conversations) Less

func (a Conversations) Less(i, j int) bool

func (Conversations) Swap

func (a Conversations) Swap(i, j int)

type DatabaseIndex

type DatabaseIndex struct {
	// contains filtered or unexported fields
}

DatabaseIndex is the in memory index of a collection of conversations, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks

func NewDatabaseIndex

func NewDatabaseIndex() *DatabaseIndex

NewDatabaseIndex creates the in memory index

func (*DatabaseIndex) Conversation

func (db *DatabaseIndex) Conversation(name string) *Conversation

Conversation returns the conversation object from the index by name.

func (*DatabaseIndex) Conversations

func (db *DatabaseIndex) Conversations() Conversations

Conversations returns a list of all conversations.

func (*DatabaseIndex) ConversationsCount

func (db *DatabaseIndex) ConversationsCount() (nConversations int)

ConversationsCount returns the number of conversations currently indexed by the database. Useful for reporting and monitoring.

func (*DatabaseIndex) DropConversation

func (db *DatabaseIndex) DropConversation(name string)

type ErrAuthorize

type ErrAuthorize struct {
	// contains filtered or unexported fields
}

ErrAuthorize represents an authorization error.

func NewErrAuthorize

func NewErrAuthorize(qe *QueryExecutor, q *sql.Query, u, db, m string) *ErrAuthorize

NewErrAuthorize returns a new instance of ErrAuthorize.

func (ErrAuthorize) Error

func (e ErrAuthorize) Error() string

Error returns the text of the error.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(stmt *sql.SelectStatement, mappers []Mapper, chunkSize int) *Executor

NewExecutor returns a new Executor.

func (*Executor) Execute

func (e *Executor) Execute() <-chan *sql.Row

Execute begins execution of the query and returns a channel to receive rows.

type FieldCodec

type FieldCodec struct {
	// contains filtered or unexported fields
}

FieldCodec provides encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.

It is not affected by changes to the Measurement object after codec creation. TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.

However, this is here until tx.go and the engine get refactored into tsdb.

func (*FieldCodec) DecodeByID

func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)

DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and returns that value. TODO: shouldn't be exported. refactor engine

func (*FieldCodec) DecodeByName

func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)

DecodeByName scans a byte slice for a field with the given name, converts it to its expected type, and returns that value.

func (*FieldCodec) DecodeFields

func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)

DecodeFields decodes a byte slice into a set of field ids and values.

func (*FieldCodec) DecodeFieldsWithNames

func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)

DecodeFieldsWithNames decodes a byte slice into a set of field names and values. TODO: shouldn't be exported. refactor engine

func (*FieldCodec) EncodeFields

func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)

EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.

If a field exists in the codec, but its type is different, an error is returned. If a field is not present in the codec, the system panics.

func (*FieldCodec) FieldIDByName

func (f *FieldCodec) FieldIDByName(s string) (uint8, error)

TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb

type LocalMapper

type LocalMapper struct {
	// contains filtered or unexported fields
}

LocalMapper is for retrieving data for a query, from a given shard.

func NewLocalMapper

func NewLocalMapper(shard *Shard, stmt sql.Statement, chunkSize int) *LocalMapper

NewLocalMapper returns a mapper for the given shard, which will return data for the SELECT statement.

func (*LocalMapper) Close

func (lm *LocalMapper) Close()

Close closes the mapper.

func (*LocalMapper) NextChunk

func (lm *LocalMapper) NextChunk() (interface{}, error)

NextChunk returns the next chunk of data

func (*LocalMapper) Open

func (lm *LocalMapper) Open() error

Open opens the local mapper.

type Mapper

type Mapper interface {
	Open() error
	NextChunk() (interface{}, error)
	Close()
}

Mapper is the interface all Mapper types must implement.

type MapperOutput

type MapperOutput struct {
	Name   string         `json:"name,omitempty"`
	Values []*mapperValue `json:"values,omitempty"` // For aggregates contains a single value at [0]
}

type Message

type Message interface {
	Time() time.Time
	SetTime(t time.Time)
	UnixNano() int64

	HashID() uint64
	Key() []byte

	Data() []byte
	SetData(buf []byte)

	String() string
}

func NewMessage

func NewMessage(time time.Time) Message

func ParseMessages

func ParseMessages(buf []byte) ([]Message, error)

func ParseMessagesString

func ParseMessagesString(buf string) ([]Message, error)

type QueryExecutor

type QueryExecutor struct {
	// The meta store for accessing and updating cluster and schema data.
	MetaStore interface {
		Database(name string) (*meta.DatabaseInfo, error)
		Databases() ([]meta.DatabaseInfo, error)
		User(name string) (*meta.UserInfo, error)
		AdminUserExists() (bool, error)
		Authenticate(username, password string) (*meta.UserInfo, error)
		RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
		UserCount() (int, error)
		ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
		NodeID() uint64
	}

	// Executes statements relating to meta data.
	MetaStatementExecutor interface {
		ExecuteStatement(stmt sql.Statement) *sql.Result
	}

	// Maps shards for queries.
	ShardMapper interface {
		CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error)
	}

	Logger *log.Logger
	// contains filtered or unexported fields
}

QueryExecutor executes every statement in a MessageDB Query. It is responsible for coordinating between the local tsdb.Store, the meta.Store, and the other nodes in the cluster to run the query against their local tsdb.Stores. There should be one executor in a running process.

func NewQueryExecutor

func NewQueryExecutor(store *Store) *QueryExecutor

NewQueryExecutor returns an initialized QueryExecutor

func (*QueryExecutor) Authorize

func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *sql.Query, database string) error

Authorize user u to execute query q on database. database can be "" for queries that do not require a database. If no user is provided it will return an error unless the query's first statement is to create a root user.

func (*QueryExecutor) ExecuteQuery

func (q *QueryExecutor) ExecuteQuery(query *sql.Query, database string, chunkSize int) (<-chan *sql.Result, error)

ExecuteQuery executes an sql query against the server. It sends results down the passed in chan and closes it when done. It will close the chan on the first statement that throws an error.

type Shard

type Shard struct {

	// The maximum size and time thresholds for flushing the WAL.
	MaxWALSize             int
	WALFlushInterval       time.Duration
	WALPartitionFlushDelay time.Duration

	// The writer used by the logger.
	LogOutput io.Writer
	// contains filtered or unexported fields
}

Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.

func NewShard

func NewShard(index *DatabaseIndex, path string) *Shard

NewShard returns a new initialized Shard

func (*Shard) Close

func (s *Shard) Close() error

Close shuts down the shard's store.

func (*Shard) ConversationsCount

func (s *Shard) ConversationsCount() (n int, err error)

ConversationsCount returns the number of conversation buckets on the shard. This does not include a count from the WAL.

func (*Shard) DB

func (s *Shard) DB() *bolt.DB

TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying store.

func (*Shard) Flush

func (s *Shard) Flush(partitionFlushDelay time.Duration) error

Flush writes all points from the write ahead log to the index.

func (*Shard) FlushPartition

func (s *Shard) FlushPartition(partitionID uint8) error

FlushPartition flushes a single WAL partition.

func (*Shard) Open

func (s *Shard) Open() error

Open initializes and opens the shard's store.

func (*Shard) Path

func (s *Shard) Path() string

Path returns the path set on the shard when it was created.

func (*Shard) WriteMessages

func (s *Shard) WriteMessages(messages []Message) error

WriteMessages will write the raw data messages and any new metadata to the index in the shard

type StatefulMapper

type StatefulMapper struct {
	Mapper
	// contains filtered or unexported fields
}

StatefulMapper encapsulates a Mapper and some state that the executor needs to track for that mapper.

func (*StatefulMapper) NextChunk

func (sm *StatefulMapper) NextChunk() (*MapperOutput, error)

NextChunk wraps a RawMapper and some state.

type Store

type Store struct {
	MaxWALSize             int
	WALFlushInterval       time.Duration
	WALPartitionFlushDelay time.Duration

	Logger *log.Logger
	// contains filtered or unexported fields
}

func NewStore

func NewStore(path string) *Store

func (*Store) Close

func (s *Store) Close() error

func (*Store) Conversation

func (s *Store) Conversation(database, name string) *Conversation

func (*Store) CreateMapper

func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error)

func (*Store) CreateShard

func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error

func (*Store) DatabaseIndex

func (s *Store) DatabaseIndex(name string) *DatabaseIndex

func (*Store) DeleteDatabase

func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error

DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.

func (*Store) DeleteShard

func (s *Store) DeleteShard(shardID uint64) error

DeleteShard removes a shard from disk.

func (*Store) Flush

func (s *Store) Flush() error

Flush forces all shards to write their WAL data to the index.

func (*Store) Open

func (s *Store) Open() error

func (*Store) Path

func (s *Store) Path() string

Path returns the store's root path.

func (*Store) Shard

func (s *Store) Shard(shardID uint64) *Shard

func (*Store) ShardIDs

func (s *Store) ShardIDs() []uint64

ShardIDs returns a slice of all ShardIDs under management.

func (*Store) WriteToShard

func (s *Store) WriteToShard(shardID uint64, messages []Message) error

Directories

Path Synopsis
Package internal is a generated protocol buffer package.
Package internal is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL