tsdb

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2023 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Overview

Package tsdb implements a time-series-based storage engine. It provides:

  • Partition data based on a time axis.
  • Sharding data based on a series id which represents a unique entity of stream/measure
  • Retrieving data based on index.Filter.
  • Cleaning expired data, or the data retention.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrUnknownShard indicates that the shard is not found.
	ErrUnknownShard = errors.New("unknown shard")

	// OptionsKey is the key of options in context.
	OptionsKey = contextOptionsKey{}
)
View Source
var AnyEntry = Entry(nil)

AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity.

View Source
var (
	// ErrEmptySeriesSpan hints there is no any data blocks based on the input time range.
	ErrEmptySeriesSpan = errors.New("there is no data in such time range")
)

Functions

func GlobalSeriesID

func GlobalSeriesID(scope Entry) common.SeriesID

GlobalSeriesID encodes Entry to common.SeriesID.

func Hash

func Hash(entry []byte) []byte

Hash encode Entry to 8 bytes.

func HashEntity

func HashEntity(entity Entity) []byte

HashEntity runs hash function (e.g. with xxhash algorithm) on each segment of the Entity, and concatenates all uint64 in byte array. So the return length of the byte array will be 8 (every uint64 has 8 bytes) * length of the input.

func MarshalEntityValues added in v0.3.0

func MarshalEntityValues(evs EntityValues) ([]byte, error)

MarshalEntityValues encodes EntityValues to bytes.

func SeriesID

func SeriesID(entity Entity) common.SeriesID

SeriesID transforms Entity to common.SeriesID.

Types

type BlockID

type BlockID struct {
	SegID   SectionID
	BlockID SectionID
}

BlockID is the identity of a block in a shard.

func (BlockID) String added in v0.2.0

func (b BlockID) String() string

type BlockState

type BlockState struct {
	TimeRange timestamp.TimeRange
	ID        BlockID
	Closed    bool
}

BlockState is a sample of a block's runtime state.

type Buffer added in v0.4.0

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is an exported struct that represents a buffer composed of multiple shard buckets.

func NewBuffer added in v0.4.0

func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int) (*Buffer, error)

NewBuffer creates a new Buffer instance with the given parameters.

func NewBufferWithWal added in v0.5.0

func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, enableWal bool, walPath string,
) (*Buffer, error)

NewBufferWithWal creates a new Buffer instance with the given parameters.

func (*Buffer) Close added in v0.4.0

func (b *Buffer) Close() error

Close gracefully closes the Buffer and ensures that all pending operations are completed.

func (*Buffer) Read added in v0.4.0

func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool)

Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.

func (*Buffer) Register added in v0.5.0

func (b *Buffer) Register(id string, onFlushFn onFlush)

Register registers a callback function that will be called when a shard bucket is flushed.

func (*Buffer) Unregister added in v0.5.0

func (b *Buffer) Unregister(id string)

Unregister unregisters a callback function that will be called when a shard bucket is flushed.

func (*Buffer) Write added in v0.4.0

func (b *Buffer) Write(key, value []byte, timestamp time.Time) error

Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.

type BufferSupplier added in v0.5.0

type BufferSupplier struct {
	// contains filtered or unexported fields
}

BufferSupplier lends a Buffer to a caller and returns it when the caller is done with it.

func NewBufferSupplier added in v0.5.0

func NewBufferSupplier(l *logger.Logger, p common.Position, writeConcurrency, numShards int, enableWAL bool, path string) *BufferSupplier

NewBufferSupplier creates a new BufferSupplier instance with the given parameters.

func (*BufferSupplier) Borrow added in v0.5.0

func (b *BufferSupplier) Borrow(bufferName, name string, bufferSize int, onFlushFn onFlush) (buffer *Buffer, err error)

Borrow borrows a Buffer from the BufferSupplier.

func (*BufferSupplier) Close added in v0.5.0

func (b *BufferSupplier) Close() error

Close closes all Buffers in the BufferSupplier.

func (*BufferSupplier) Return added in v0.5.0

func (b *BufferSupplier) Return(bufferName, name string)

Return returns a Buffer to the BufferSupplier.

func (*BufferSupplier) Volume added in v0.5.0

func (b *BufferSupplier) Volume() int

Volume returns the number of Buffers in the BufferSupplier.

type CompressionMethod added in v0.3.0

type CompressionMethod struct {
	Type             CompressionType
	ChunkSizeInBytes int
}

CompressionMethod denotes how to compress a single chunk.

type CompressionType added in v0.3.0

type CompressionType int

CompressionType specifies how a chunk should be compressed.

const (
	// CompressionTypeNone mode indicates that a chunk is not compressed.
	CompressionTypeNone CompressionType = iota
	// CompressionTypeZSTD mode indicates that a chunk is compressed using CompressionTypeZSTD algorithm.
	CompressionTypeZSTD
)

type Database

type Database interface {
	io.Closer
	CreateShardsAndGetByID(id common.ShardID) (Shard, error)
	Shards() []Shard
	Shard(id common.ShardID) (Shard, error)
}

Database allows listing and getting shard details.

func OpenDatabase

func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error)

OpenDatabase returns a new tsdb runtime. This constructor will create a new database if it's absent, or load an existing one.

type DatabaseOpts

type DatabaseOpts struct {
	TSTableFactory     TSTableFactory
	Location           string
	SegmentInterval    IntervalRule
	BlockInterval      IntervalRule
	TTL                IntervalRule
	BlockInvertedIndex InvertedIndexOpts
	SeriesMemSize      run.Bytes
	GlobalIndexMemSize run.Bytes
	IndexGranularity   IndexGranularity
	ShardNum           uint32
	EnableGlobalIndex  bool
	EnableWAL          bool
}

DatabaseOpts wraps options to create a tsdb.

type EncodingMethod

type EncodingMethod struct {
	EncoderPool      encoding.SeriesEncoderPool
	DecoderPool      encoding.SeriesDecoderPool
	ChunkSizeInBytes int
}

EncodingMethod wraps encoder/decoder pools to flush/compact data on disk.

type Entity

type Entity []Entry

Entity denotes an identity of a Series. It defined by Stream or Measure schema.

func NewEntity added in v0.2.0

func NewEntity(length int) Entity

NewEntity return an Entity with an fixed length.

func (Entity) Copy added in v0.2.0

func (e Entity) Copy() Entity

Copy an Entity deeply.

func (Entity) Marshal

func (e Entity) Marshal() []byte

Marshal encodes an Entity to bytes.

func (Entity) Prepend

func (e Entity) Prepend(entry Entry) Entity

Prepend inserts an Entry before the first Entry as the prefix.

type EntityValue added in v0.3.0

type EntityValue = *modelv1.TagValue

EntityValue represents the value of a tag which is a part of an entity.

func Int64Value added in v0.3.0

func Int64Value(v int64) EntityValue

Int64Value returns an EntityValue which wraps a int64 value.

func StrValue added in v0.3.0

func StrValue(v string) EntityValue

StrValue returns an EntityValue which wraps a string value.

type EntityValues added in v0.3.0

type EntityValues []EntityValue

EntityValues is the encoded Entity.

func DecodeEntityValues added in v0.3.0

func DecodeEntityValues(tvv []*modelv1.TagValue) (result EntityValues)

DecodeEntityValues decodes tag values to EntityValues.

func UnmarshalEntityValues added in v0.3.0

func UnmarshalEntityValues(evs []byte) (result EntityValues, err error)

UnmarshalEntityValues decodes EntityValues from bytes.

func (EntityValues) Encode added in v0.3.0

func (evs EntityValues) Encode() (result []*modelv1.TagValue)

Encode EntityValues to tag values.

func (EntityValues) Prepend added in v0.3.0

func (evs EntityValues) Prepend(scope EntityValue) EntityValues

Prepend inserts an EntityValue before the first EntityValue as the prefix.

func (EntityValues) String added in v0.3.0

func (evs EntityValues) String() string

String outputs the string represent of an EntityValue.

func (EntityValues) ToEntity added in v0.3.0

func (evs EntityValues) ToEntity() (result Entity, err error)

ToEntity transforms EntityValues to Entity.

type Entry

type Entry []byte

Entry is an element in an Entity.

func EntityValueToEntry added in v0.3.0

func EntityValueToEntry(ev EntityValue) (Entry, error)

EntityValueToEntry transforms EntityValue to Entry.

type GlobalItemID

type GlobalItemID struct {
	ShardID common.ShardID

	SeriesID common.SeriesID
	ID       common.ItemID
	// contains filtered or unexported fields
}

GlobalItemID is the top level identity of an item. The item could be retrieved by a GlobalItemID in a tsdb.

type IndexDatabase

type IndexDatabase interface {
	WriterBuilder() IndexWriterBuilder
	Seek(field index.Field) ([]GlobalItemID, error)
}

IndexDatabase allows stocking index data.

type IndexGranularity added in v0.4.0

type IndexGranularity int

IndexGranularity denotes the granularity of the local index.

const (
	IndexGranularityBlock IndexGranularity = iota
	IndexGranularitySeries
)

The options of the local index granularity.

type IndexWriter

type IndexWriter interface {
	WriteLSMIndex(field []index.Field) error
	WriteInvertedIndex(field []index.Field) error
}

IndexWriter allows ingesting index data.

func NewSeriesIndexWriter added in v0.4.0

func NewSeriesIndexWriter(seriesID common.SeriesID, seriesDB SeriesDatabase) IndexWriter

NewSeriesIndexWriter returns a new series index writer.

type IndexWriterBuilder

type IndexWriterBuilder interface {
	Scope(scope Entry) IndexWriterBuilder
	Time(ts time.Time) IndexWriterBuilder
	GlobalItemID(itemID GlobalItemID) IndexWriterBuilder
	Build() (IndexWriter, error)
}

IndexWriterBuilder is a helper to build IndexWriter.

type IntervalRule

type IntervalRule struct {
	Unit IntervalUnit
	Num  int
}

IntervalRule defines a length of two points in time.

type IntervalUnit

type IntervalUnit int

IntervalUnit denotes the unit of a time point.

const (
	HOUR IntervalUnit = iota
	DAY
)

Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.

func (IntervalUnit) String

func (iu IntervalUnit) String() string

type InvertedIndexOpts added in v0.3.0

type InvertedIndexOpts struct {
	BatchWaitSec int64
}

InvertedIndexOpts wraps options to create the block inverted index.

type Item

type Item interface {
	Family(family []byte) ([]byte, error)
	Val() ([]byte, error)
	ID() common.ItemID
	SortedField() []byte
	Time() uint64
}

Item allows retrieving raw data from an item.

type Iterator

type Iterator interface {
	Next() bool
	Val() Item
	Close() error
}

Iterator allows iterating a series in a time span.

type OrderBy added in v0.4.0

type OrderBy struct {
	Index *databasev1.IndexRule
	Sort  modelv1.Sort
}

OrderBy specifies the order of the result.

type Path

type Path struct {
	// contains filtered or unexported fields
}

Path denotes a expression to match a Series. It supports the fuzzy matching more than EQ by setting an entry to AnyEntry.

func NewPath

func NewPath(matchingExpression []Entry) Path

NewPath return a Path with a matching expression.

type SectionID added in v0.3.0

type SectionID uint32

SectionID is the kind of a block/segment.

func GenerateInternalID

func GenerateInternalID(unit IntervalUnit, suffix int) SectionID

GenerateInternalID returns a identity of a section(segment or block) based on IntervalRule.

type Seeker

type Seeker interface {
	Seek() ([]Iterator, error)
}

Seeker allows searching data in a Database.

type SeekerBuilder

type SeekerBuilder interface {
	Filter(predicator index.Filter) SeekerBuilder
	OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder
	OrderByTime(order modelv1.Sort) SeekerBuilder
	Build() (Seeker, error)
}

SeekerBuilder a helper to build a Seeker.

type Series

type Series interface {
	ID() common.SeriesID
	Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error)
	Create(ctx context.Context, t time.Time) (SeriesSpan, error)
	Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error)
	String() string
}

Series denotes a series of data points group by a common.SeriesID common.SeriesID is encoded by a entity defined by Stream or Measure.

type SeriesDatabase

type SeriesDatabase interface {
	io.Closer
	GetByID(id common.SeriesID) (Series, error)
	Get(key []byte, entityValues EntityValues) (Series, error)
	List(ctx context.Context, path Path) (SeriesList, error)
	Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error)
	SizeOnDisk() int64
	// contains filtered or unexported methods
}

SeriesDatabase allows retrieving series.

type SeriesList

type SeriesList []Series

SeriesList is a collection of Series.

func (SeriesList) Len

func (a SeriesList) Len() int

func (SeriesList) Less

func (a SeriesList) Less(i, j int) bool

func (SeriesList) Merge added in v0.2.0

func (a SeriesList) Merge(other SeriesList) SeriesList

Merge other SeriesList with this one to create a new SeriesList.

func (SeriesList) Swap

func (a SeriesList) Swap(i, j int)

type SeriesSpan

type SeriesSpan interface {
	io.Closer
	WriterBuilder() WriterBuilder
	SeekerBuilder() SeekerBuilder
}

SeriesSpan is a span in a time series. It contains data blocks in such time range.

type Shard

type Shard interface {
	io.Closer
	ID() common.ShardID
	Series() SeriesDatabase
	Index() IndexDatabase
	State() ShardState
	// Only works with MockClock
	TriggerSchedule(task string) bool
}

Shard allows accessing data of tsdb.

func NewScopedShard

func NewScopedShard(scope Entry, delegated Shard) Shard

NewScopedShard returns a shard in a scope.

func OpenShard

func OpenShard(ctx context.Context, id common.ShardID,
	root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int, enableWAL bool,
) (Shard, error)

OpenShard returns an existed Shard or create a new one if not existed.

type ShardState

type ShardState struct {
	Blocks           []BlockState
	OpenBlocks       []BlockID
	StrategyManagers []string
}

ShardState is a sample of a shard's runtime state.

type Supplier

type Supplier interface {
	SupplyTSDB() Database
}

Supplier allows getting a tsdb's runtime.

type TSTable added in v0.4.0

type TSTable interface {
	// Put a value with a timestamp/version
	Put(key, val []byte, ts time.Time) error
	// Get a value by its key and timestamp/version
	Get(key []byte, ts time.Time) ([]byte, error)
	// CollectStats collects statistics of the underlying storage.
	CollectStats() *badger.Statistics
	// SizeOnDisk returns the size of the underlying storage.
	SizeOnDisk() int64
	io.Closer
}

TSTable is time series table.

type TSTableFactory added in v0.4.0

type TSTableFactory interface {
	// NewTSTable creates a new TSTable.
	NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position,
		l *logger.Logger, timeRange timestamp.TimeRange) (TSTable, error)
}

TSTableFactory is the factory of TSTable.

type Writer

type Writer interface {
	IndexWriter
	Write() (GlobalItemID, error)
	ItemID() GlobalItemID
	String() string
}

Writer allow ingesting data into a tsdb.

type WriterBuilder

type WriterBuilder interface {
	Family(name []byte, val []byte) WriterBuilder
	Time(ts time.Time) WriterBuilder
	Val(val []byte) WriterBuilder
	Build() (Writer, error)
}

WriterBuilder is a helper to build a Writer.

Directories

Path Synopsis
Package bucket implements a rolling bucket system.
Package bucket implements a rolling bucket system.
Package index implements transferring data to indices.
Package index implements transferring data to indices.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL