datastore

package
v0.4.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2014 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Size constants and request-log / database layout settings for the datastore.
//
// ONE_MEGABYTE is 1024*1024, ONE_GIGABYTE is 1024 times ONE_MEGABYTE, and
// TWO_FIFTY_SIX_KILOBYTES is 256*1024. MAX_SERIES_SIZE is equal to
// ONE_MEGABYTE. REQUEST_LOG_ROTATION_PERIOD is 24 hours.
//
// NOTE(review): HOUR_TO_ROTATE_REQUEST_LOG = 0 and
// MINUTE_TO_ROTATE_REQUEST_LOG = 1 presumably mean the request log is rotated
// at 00:01 each day — confirm against the rotation code, which is not shown here.
// NOTE(review): the units of the size constants (bytes) and of
// BLOOM_FILTER_BITS_PER_KEY (bits) are inferred from the names only — confirm.
// NOTE(review): REQUEST_LOG_BASE_DIR and DATABASE_DIR look like directory
// names; what they are relative to is not visible here — confirm.
const (
	ONE_MEGABYTE                 = 1024 * 1024
	ONE_GIGABYTE                 = ONE_MEGABYTE * 1024
	TWO_FIFTY_SIX_KILOBYTES      = 256 * 1024
	BLOOM_FILTER_BITS_PER_KEY    = 64
	MAX_SERIES_SIZE              = ONE_MEGABYTE
	REQUEST_SEQUENCE_NUMBER_KEY  = "r"
	REQUEST_LOG_BASE_DIR         = "request_logs"
	DATABASE_DIR                 = "db"
	REQUEST_LOG_ROTATION_PERIOD  = 24 * time.Hour
	HOUR_TO_ROTATE_REQUEST_LOG   = 0
	MINUTE_TO_ROTATE_REQUEST_LOG = 1
)

Variables

View Source
var (

	// ATOMIC_INCREMENT_PREFIX is the key prefix for persistent integers.
	// This datastore implements the PersistentAtomicInteger interface. All of the persistent
	// integers start with this prefix, followed by their name.
	ATOMIC_INCREMENT_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD}
	// NEXT_ID_KEY holds the next id. Ids are used to "intern" timeseries and column names.
	NEXT_ID_KEY = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
	// SERIES_COLUMN_INDEX_PREFIX is the prefix of the series to column names index.
	SERIES_COLUMN_INDEX_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE}
	// DATABASE_SERIES_INDEX_PREFIX is the prefix of the database to series names index.
	// MAX_SEQUENCE holds the same eight 0xFF bytes.
	// NOTE(review): MAX_SEQUENCE is presumably the largest possible 8-byte
	// sequence value, used as an upper bound — confirm against its users.
	DATABASE_SERIES_INDEX_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
	MAX_SEQUENCE                 = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}

	// TRUE is a package-level variable holding the value true.
	// NOTE(review): presumably exists so code can take its address (&TRUE)
	// where a *bool is required — confirm.
	TRUE = true
)

Functions

func DivideOperator added in v0.4.0

func DivideOperator(elems []*parser.Value, fields []string, point *protocol.Point) (*protocol.FieldValue, error)

func EqualityOperator

func EqualityOperator(leftValue, rightValue *protocol.FieldValue) (bool, error)

func Filter

func Filter(query *parser.SelectQuery, series *protocol.Series) (*protocol.Series, error)

func GetValue added in v0.4.0

func GetValue(value *parser.Value, fields []string, point *protocol.Point) (*protocol.FieldValue, error)

func GreaterThanOperator

func GreaterThanOperator(leftValue, rightValue *protocol.FieldValue) (bool, error)

func GreaterThanOrEqualOperator

func GreaterThanOrEqualOperator(leftValue, rightValue *protocol.FieldValue) (bool, error)

func InOperator added in v0.3.1

func InOperator(leftValue *protocol.FieldValue, rightValue []*protocol.FieldValue) (bool, error)

func MinusOperator added in v0.4.0

func MinusOperator(elems []*parser.Value, fields []string, point *protocol.Point) (*protocol.FieldValue, error)

func MultiplyOperator added in v0.4.0

func MultiplyOperator(elems []*parser.Value, fields []string, point *protocol.Point) (*protocol.FieldValue, error)

func NewRequestLogDb added in v0.4.0

func NewRequestLogDb(dir string) (*requestLogDb, error)

func PlusOperator added in v0.4.0

func PlusOperator(elems []*parser.Value, fields []string, point *protocol.Point) (*protocol.FieldValue, error)

func RegexMatcherOperator

func RegexMatcherOperator(leftValue, rightValue *protocol.FieldValue) (bool, error)

Types

type ArithmeticOperator added in v0.4.0

// ArithmeticOperator is the function type shared by DivideOperator,
// MinusOperator, MultiplyOperator and PlusOperator: it takes parsed value
// elements, a list of field names and a point, and returns a field value or
// an error.
// NOTE(review): presumably fields names the columns of point, in order — confirm.
type ArithmeticOperator func(elems []*parser.Value, fields []string, point *protocol.Point) (*protocol.FieldValue, error)

type BooleanOperation

// BooleanOperation is a predicate over one left-hand field value and a slice
// of right-hand field values. InOperator has exactly this signature; the
// other comparison operators in this package take a single right-hand value.
type BooleanOperation func(leftValue *protocol.FieldValue, rightValues []*protocol.FieldValue) (bool, error)

type Datastore

// Datastore is the storage interface of this package. It covers query
// execution, request logging and replay by sequence number, named atomic
// counters, and writing, deleting and dropping series and databases.
// NewLevelDbDatastore returns a value of this interface type.
type Datastore interface {
	// ExecuteQuery runs a select query against database on behalf of user.
	// NOTE(review): presumably each resulting series is handed to yield and an
	// error from yield aborts the query — confirm.
	// NOTE(review): ringFilter receives pointers to a database name, a series
	// name and a time and returns a bool; whether true means "keep" or "skip"
	// is not visible here — confirm.
	ExecuteQuery(user common.User, database string,
		query *parser.SelectQuery, yield func(*protocol.Series) error,
		ringFilter func(database, series *string, time *int64) bool) error
	// Logs the request to a local store and assigns a sequence number that is unique per server id per day
	LogRequestAndAssignSequenceNumber(request *protocol.Request, replicationFactor *uint8, ownerServerId *uint32) error
	// CurrentSequenceNumber returns a sequence number for the given cluster
	// version, replication factor, owner server and originating server.
	// NOTE(review): presumably the most recently assigned one — confirm.
	CurrentSequenceNumber(clusterVersion *uint32, replicationFactor *uint8, ownerServerId, originatingServerId *uint32) (uint64, error)
	// will replay all requests from a given number. If the number hasn't occurred yet today, it replays from yesterday.
	// So this log replay is only meant to work for outages that last less than maybe 12 hours.
	// The LevelDbDatastore implementation names the parameters, in order:
	// clusterVersion, originatingServerId, ownerServerId, replicationFactor,
	// lastKnownSequence, yield.
	ReplayRequestsFromSequenceNumber(*uint32, *uint32, *uint32, *uint8, *uint64, func(*[]byte) error) error
	// Increment the named integer by the given amount and return the new value
	AtomicIncrement(name string, val int) (uint64, error)
	// WriteSeriesData writes the given series into database.
	WriteSeriesData(database string, series *protocol.Series) error
	// DeleteSeriesData deletes data from database as described by a delete query.
	DeleteSeriesData(database string, query *parser.DeleteQuery) error
	// GetSeriesForDatabase calls yield with strings for the given database.
	// NOTE(review): presumably one call per series name — confirm.
	GetSeriesForDatabase(database string, yield func(string) error) error
	// DropDatabase drops the named database.
	DropDatabase(database string) error
	// DropSeries drops the named series from database.
	DropSeries(database, series string) error
	// Close takes no arguments and returns nothing.
	// NOTE(review): presumably releases the underlying store — confirm.
	Close()
}

func NewLevelDbDatastore

func NewLevelDbDatastore(dbDir string) (Datastore, error)

type Field

// Field pairs a field name with an id held as raw bytes.
// NOTE(review): Id is presumably the interned id assigned to the column
// name — confirm against the code that builds Field values.
type Field struct {
	Id   []byte
	Name string
}

type FieldLookupError added in v0.4.0

// FieldLookupError is an error type: it has an Error() string method with a
// value receiver.
// NOTE(review): presumably returned when a field referenced by a query cannot
// be resolved — confirm.
type FieldLookupError struct {
	// contains filtered or unexported fields
}

func (FieldLookupError) Error added in v0.4.0

func (self FieldLookupError) Error() string

type LevelDbDatastore

// LevelDbDatastore is the concrete datastore type of this package. Its
// pointer-receiver methods cover every method of the Datastore interface, and
// it additionally exposes DeleteRangeOfRegex.
// NOTE(review): presumably backed by LevelDB, per the name — the storage
// engine is not visible here.
type LevelDbDatastore struct {
	// contains filtered or unexported fields
}

func (*LevelDbDatastore) AtomicIncrement added in v0.4.0

func (self *LevelDbDatastore) AtomicIncrement(name string, val int) (uint64, error)

func (*LevelDbDatastore) Close

func (self *LevelDbDatastore) Close()

func (*LevelDbDatastore) CurrentSequenceNumber added in v0.4.0

func (self *LevelDbDatastore) CurrentSequenceNumber(clusterVersion *uint32, replicationFactor *uint8, ownerServerId, originatingServerId *uint32) (uint64, error)

func (*LevelDbDatastore) DeleteRangeOfRegex

func (self *LevelDbDatastore) DeleteRangeOfRegex(database string, regex *regexp.Regexp, startTime, endTime time.Time) error

func (*LevelDbDatastore) DeleteSeriesData added in v0.4.0

func (self *LevelDbDatastore) DeleteSeriesData(database string, query *parser.DeleteQuery) error

func (*LevelDbDatastore) DropDatabase added in v0.2.0

func (self *LevelDbDatastore) DropDatabase(database string) error

func (*LevelDbDatastore) DropSeries added in v0.4.0

func (self *LevelDbDatastore) DropSeries(database, series string) error

func (*LevelDbDatastore) ExecuteQuery

func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string,
	query *parser.SelectQuery, yield func(*protocol.Series) error,
	ringFilter func(database, series *string, time *int64) bool) error

func (*LevelDbDatastore) GetSeriesForDatabase added in v0.4.0

func (self *LevelDbDatastore) GetSeriesForDatabase(database string, yield func(string) error) error

func (*LevelDbDatastore) LogRequestAndAssignSequenceNumber added in v0.4.0

func (self *LevelDbDatastore) LogRequestAndAssignSequenceNumber(request *protocol.Request, replicationFactor *uint8, ownerServerId *uint32) error

func (*LevelDbDatastore) ReplayRequestsFromSequenceNumber added in v0.4.0

func (self *LevelDbDatastore) ReplayRequestsFromSequenceNumber(clusterVersion, originatingServerId, ownerServerId *uint32, replicationFactor *uint8, lastKnownSequence *uint64, yield func(*[]byte) error) error

func (*LevelDbDatastore) WriteSeriesData

func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol.Series) error

type SequenceMissingRequestsError added in v0.4.0

// SequenceMissingRequestsError is an error type (it has an Error() string
// method) carrying two request sequence numbers.
// NOTE(review): presumably reported when a received sequence number does not
// directly follow the last known one, i.e. requests were missed — confirm.
type SequenceMissingRequestsError struct {
	// LastKnownRequestSequence is the last request sequence number known locally (per the field name).
	LastKnownRequestSequence uint64
	// ReceivedSequence is the sequence number that was received (per the field name).
	ReceivedSequence         uint64
	// contains filtered or unexported fields
}

func (SequenceMissingRequestsError) Error added in v0.4.0

func (self SequenceMissingRequestsError) Error() string

type WALKey added in v0.4.0

// WALKey is a key represented as a byte slice. NewWALKey builds one from a
// cluster version, originating server id, owner server id, sequence number
// and replication factor; NewWALKeyFromBytes builds one from raw bytes.
// NOTE(review): "WAL" presumably stands for write-ahead log, and the byte
// layout of the key is not visible here — confirm.
type WALKey []byte

func NewWALKey added in v0.4.0

func NewWALKey(clusterVersion, originatingServerId, ownerServerId *uint32, sequenceNumber *uint64, replicationFactor *uint8) WALKey

func NewWALKeyFromBytes added in v0.4.0

func NewWALKeyFromBytes(bytes []byte) WALKey

func (WALKey) EqualsIgnoreSequenceNumber added in v0.4.0

func (self WALKey) EqualsIgnoreSequenceNumber(other WALKey) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL