mdata

package
v1.0.0
Warning

This package is not in the latest version of its module.
Published: Jul 30, 2020 License: AGPL-3.0 Imports: 29 Imported by: 0

Documentation

Overview

Package mdata stands for "managed data" or, if you will, "metrics data". It contains everything needed to keep metric data in memory, store it, and synchronize save states over the network.

Constants

const PersistMessageBatchV1 = 1

PersistMessage format version

Variables

var (

	// set either via ConfigProcess or from the unit tests. other code should not touch
	Aggregations conf.Aggregations
	Schemas      conf.Schemas

	PromDiscardedSamples = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "metrictank",
		Name:      "discarded_samples_total",
		Help:      "Total # of samples that were discarded",
	}, []string{"reason", "org"})
)
var ErrInvalidRange = errors.New("AggMetric: invalid range: from must be less than to")

Functions

func AggBoundary

func AggBoundary(ts uint32, span uint32) uint32

AggBoundary returns ts if it is a boundary, or the next boundary otherwise. See the description of Aggregator and the unit tests for more details.
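The rule described above can be sketched in a few lines. This is a minimal illustration, not the package's implementation: `aggBoundary` is a hypothetical name, and the sketch assumes that a "boundary" is any timestamp that is a multiple of span and that span is greater than 0.

```go
package main

import "fmt"

// aggBoundary is a hypothetical re-implementation for illustration only.
// It returns ts when ts is a multiple of span, otherwise the next multiple.
// span is assumed to be greater than 0.
func aggBoundary(ts, span uint32) uint32 {
	if rem := ts % span; rem != 0 {
		return ts - rem + span
	}
	return ts
}

func main() {
	for _, ts := range []uint32{119, 120, 121} {
		fmt.Println(ts, "->", aggBoundary(ts, 60))
	}
}
```

With a span of 60, timestamps 119 and 120 both map to 120, while 121 maps to 180.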

func ConfigProcess

func ConfigProcess()

func ConfigSetup

func ConfigSetup()

func InitPersistNotifier

func InitPersistNotifier(not ...Notifier)

func MatchAgg

func MatchAgg(key string) (uint16, conf.Aggregation)

MatchAgg returns the aggregation definition for the given metric key, along with its index (so it can be referenced efficiently). It always finds an aggregation definition because Aggregations has a catch-all default.

func MatchSchema

func MatchSchema(key string, interval int) (uint16, conf.Schema)

MatchSchema returns the schema for the given metric key, along with the index of the schema (so it can be referenced efficiently). It always finds a schema because Schemas has a catch-all default.
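The "first match wins, with a catch-all default" lookup that MatchAgg and MatchSchema describe can be sketched as follows. Everything here is an assumption made for illustration: the `rule` type, the `match` and `exampleRules` functions, and the use of regular expressions are hypothetical and do not reflect how conf.Schemas or conf.Aggregations are actually implemented.

```go
package main

import (
	"fmt"
	"regexp"
)

// rule is a hypothetical stand-in for a schema or aggregation definition.
type rule struct {
	name    string
	pattern *regexp.Regexp // nil means "match everything"
}

// exampleRules returns an ordered rule list whose last entry is a catch-all.
func exampleRules() []rule {
	return []rule{
		{name: "cpu", pattern: regexp.MustCompile(`^servers\..*\.cpu\.`)},
		{name: "default"}, // nil pattern: catch-all
	}
}

// match returns the index and the first rule whose pattern matches key.
// rules is assumed to be non-empty and to end with a catch-all,
// so a match is always found inside the loop.
func match(rules []rule, key string) (uint16, rule) {
	for i, r := range rules {
		if r.pattern == nil || r.pattern.MatchString(key) {
			return uint16(i), r
		}
	}
	// Only reached if the catch-all assumption does not hold.
	last := len(rules) - 1
	return uint16(last), rules[last]
}

func main() {
	rules := exampleRules()
	i, r := match(rules, "servers.web1.cpu.idle")
	fmt.Println(i, r.name)
	i, r = match(rules, "apps.checkout.latency")
	fmt.Println(i, r.name)
}
```

Returning the index alongside the definition lets a caller store a small integer per metric instead of a copy of the definition.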

func MaxChunkSpan

func MaxChunkSpan() uint32

func SendPersistMessage

func SendPersistMessage(key string, t0 uint32)

func SetSingleAgg

func SetSingleAgg(met ...conf.Method)

func SetSingleSchema

func SetSingleSchema(ret conf.Retentions)

func TS

func TS(ts interface{}) string

func TTLs

func TTLs() []uint32

TTLs returns the full set of unique TTLs (in seconds) used by the current schema config.

Types

type AggMetric

type AggMetric struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

AggMetric takes in new values, updates the in-memory data, and streams the points to aggregators. It uses a circular buffer of chunks, each of which starts at its respective t0. A t0 is a timestamp divisible by chunkSpan without a remainder (e.g. 2-hour boundaries). firstT0's data is held at index 0; indexes go up and wrap around from numChunks-1 to 0. In addition, keep in mind that the last chunk is always a work in progress and not usable for aggregation. AggMetric is concurrency-safe.
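The t0 alignment and the wrap-around indexing can be sketched as below. This is a simplified illustration under stated assumptions: `chunkT0`, `ring`, `newRing`, and `advance` are hypothetical names, the ring stores only each chunk's t0 rather than chunk data, and no locking is shown.

```go
package main

import "fmt"

// chunkT0 returns the start timestamp (t0) of the chunk that ts falls into.
// chunkSpan is assumed to be greater than 0.
func chunkT0(ts, chunkSpan uint32) uint32 {
	return ts - ts%chunkSpan
}

// ring is a hypothetical, much simplified stand-in for the chunk buffer.
type ring struct {
	t0s []uint32 // fixed-size storage, reused once full
	cur int      // index of the newest (in-progress) chunk, -1 if none
}

func newRing(numChunks int) *ring {
	return &ring{t0s: make([]uint32, numChunks), cur: -1}
}

// advance starts a new chunk at t0, wrapping from numChunks-1 back to 0
// and overwriting the oldest slot once the buffer is full.
func (r *ring) advance(t0 uint32) {
	r.cur = (r.cur + 1) % len(r.t0s)
	r.t0s[r.cur] = t0
}

func main() {
	const span = 7200 // 2 hours
	r := newRing(3)
	for _, ts := range []uint32{7300, 14500, 21700, 28900} {
		if t0 := chunkT0(ts, span); r.cur < 0 || r.t0s[r.cur] != t0 {
			r.advance(t0)
		}
	}
	fmt.Println(r.t0s, r.cur)
}
```

After the fourth chunk starts, the write index has wrapped back to slot 0 and the oldest chunk has been overwritten.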

func NewAggMetric

func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow, interval uint32, agg *conf.Aggregation, reorderAllowUpdate, dropFirstChunk bool, ingestFrom int64) *AggMetric

NewAggMetric creates a metric with the given key. It retains the given number of chunks, each chunkSpan seconds long, and optionally creates aggregations with the given settings. The 0th retention is the native archive of this metric; if there are several others, aggregators are created using agg. It is the caller's responsibility to make sure agg is not nil in that case. If reorderWindow is greater than 0, a reorder buffer is enabled; in that case, the behavior for data points with duplicate timestamps is defined by reorderAllowUpdate.

func (*AggMetric) Add

func (a *AggMetric) Add(ts uint32, val float64)

Add must never be called with a ts of 0, because 0 is used to mean "not initialized".

func (*AggMetric) GC

func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) (uint32, bool)

GC returns whether this AggMetric is stale and can be removed, and its point count if so. chunkMinTs is the minimum timestamp for a chunk: a chunk older than that is considered stale and is persisted to Cassandra. metricMinTs is the minimum timestamp for a metric: a metric older than that is considered stale and is purged from the tank.

func (*AggMetric) Get

func (a *AggMetric) Get(from, to uint32) (Result, error)

Get returns all data in the requested time range. from is inclusive, to is exclusive: from <= x < to. More data than what was requested may be included. Specifically, it returns:
* points from the ROB (if enabled)
* iters from matching chunks
* the oldest point we have, so that if the query needs data before it, the caller knows when to query the store
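The half-open range convention (from <= x < to) can be illustrated with a small filter. This sketch is not how Get works internally: `point`, `selectRange`, and `errInvalidRange` are hypothetical names, and unlike Get, which may return more data than requested, the sketch filters strictly.

```go
package main

import (
	"errors"
	"fmt"
)

// point is a hypothetical stand-in for a timestamped value.
type point struct {
	Ts  uint32
	Val float64
}

var errInvalidRange = errors.New("invalid range: from must be less than to")

// selectRange returns the points with from <= Ts < to.
func selectRange(points []point, from, to uint32) ([]point, error) {
	if from >= to {
		return nil, errInvalidRange
	}
	var out []point
	for _, p := range points {
		if p.Ts >= from && p.Ts < to {
			out = append(out, p)
		}
	}
	return out, nil
}

func main() {
	pts := []point{{Ts: 10, Val: 1}, {Ts: 20, Val: 2}, {Ts: 30, Val: 3}}
	got, err := selectRange(pts, 10, 30)
	fmt.Println(got, err)
	_, err = selectRange(pts, 30, 30)
	fmt.Println(err)
}
```

Because to is exclusive, consecutive queries such as [10, 30) and [30, 50) never return the same point twice.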

func (*AggMetric) GetAggregated

func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)

func (*AggMetric) SyncAggregatedChunkSaveState

func (a *AggMetric) SyncAggregatedChunkSaveState(ts uint32, consolidator consolidation.Consolidator, aggSpan uint32)

Sync the saved state of a chunk by its T0.

func (*AggMetric) SyncChunkSaveState

func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback

Sync the saved state of a chunk by its T0.

type AggMetrics

type AggMetrics struct {
	sync.RWMutex
	Metrics map[uint32]map[schema.Key]*AggMetric
	// contains filtered or unexported fields
}

AggMetrics is an in-memory store of AggMetric objects. Note: they are keyed by MKey here because each AggMetric manages access to, and references of, its rollup archives itself.

func NewAggMetrics

func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bool, ingestFrom map[uint32]int64, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics

func (*AggMetrics) GC

func (ms *AggMetrics) GC()

GC periodically scans chunks and closes any that have not received data in a while.

func (*AggMetrics) Get

func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool)

func (*AggMetrics) GetOrCreate

func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric

type Aggregation

type Aggregation struct {
	Min float64
	Max float64
	Sum float64
	Cnt float64
	Lst float64
}

Aggregation is a container for all summary statistics / aggregated data for 1 metric, in 1 time frame. If Cnt is 0, the numbers don't necessarily make sense.
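A container with these five fields could be maintained as sketched below. This is an assumption-laden illustration: `summary`, `newSummary`, `add`, and `reset` are hypothetical names, and initializing Min and Max to extreme values is one possible choice, not a statement about what NewAggregation or Reset actually do.

```go
package main

import (
	"fmt"
	"math"
)

// summary is a hypothetical stand-in mirroring the documented fields.
type summary struct {
	Min, Max, Sum, Cnt, Lst float64
}

// newSummary returns a summary in its reset state.
func newSummary() *summary {
	s := &summary{}
	s.reset()
	return s
}

// add folds one value into the running statistics.
func (s *summary) add(val float64) {
	s.Min = math.Min(s.Min, val)
	s.Max = math.Max(s.Max, val)
	s.Sum += val
	s.Cnt++
	s.Lst = val
}

// reset starts Min and Max at extreme values so the first add replaces them.
func (s *summary) reset() {
	*s = summary{Min: math.MaxFloat64, Max: -math.MaxFloat64}
}

func main() {
	s := newSummary()
	for _, v := range []float64{3, 1, 4} {
		s.add(v)
	}
	fmt.Printf("%+v\n", *s)
}
```

In this sketch a freshly reset summary has Cnt equal to 0 and placeholder values in Min and Max, which is one way the numbers can fail to make sense before any value has been added.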

func NewAggregation

func NewAggregation() *Aggregation

func (*Aggregation) Add

func (a *Aggregation) Add(val float64)

func (*Aggregation) Reset

func (a *Aggregation) Reset()

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator receives data and builds aggregations. Note: all points with timestamps t1, t2, t3, t4, [t5] get aggregated into a point with ts t5, where t5 % span = 0. In other words:
* an aggregation point reflects the data in the timeframe preceding it.
* the timestamps of the aggregated series are quantized to the given span, unlike the raw series, which may have an offset (be non-quantized).

func NewAggregator

func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey, retOrig string, ret conf.Retention, agg conf.Aggregation, dropFirstChunk bool, ingestFrom int64) *Aggregator

func (*Aggregator) Add

func (agg *Aggregator) Add(ts uint32, val float64)

Add adds the point to the in-progress aggregation, and flushes it if we reached the boundary. Points going back in time are accepted, unless they go into a previous bucket, in which case they are ignored.
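The bucketing behavior can be sketched as follows. This is a simplification under stated assumptions, not the package's code: `bucketAgg`, `flushed`, and `boundaryOf` are hypothetical names, only sum and count are tracked, and the sketch flushes a bucket when the first point of a later bucket arrives rather than at the moment the boundary timestamp itself is seen.

```go
package main

import "fmt"

// flushed is one aggregated output point, stamped with its bucket boundary.
type flushed struct {
	Ts  uint32
	Sum float64
	Cnt int
}

// bucketAgg is a hypothetical, simplified aggregator for one span.
type bucketAgg struct {
	span     uint32
	boundary uint32 // boundary of the in-progress bucket; 0 means none yet
	sum      float64
	cnt      int
	out      []flushed
}

// boundaryOf returns ts if it is a multiple of span, else the next multiple.
func boundaryOf(ts, span uint32) uint32 {
	if rem := ts % span; rem != 0 {
		return ts - rem + span
	}
	return ts
}

func (b *bucketAgg) add(ts uint32, val float64) {
	bound := boundaryOf(ts, b.span)
	switch {
	case bound < b.boundary:
		return // belongs to a previous bucket: ignored
	case bound > b.boundary && b.cnt > 0:
		b.flush() // a later bucket starts: emit the in-progress one
	}
	b.boundary = bound
	b.sum += val
	b.cnt++
}

func (b *bucketAgg) flush() {
	b.out = append(b.out, flushed{Ts: b.boundary, Sum: b.sum, Cnt: b.cnt})
	b.sum, b.cnt = 0, 0
}

func main() {
	b := &bucketAgg{span: 10}
	b.add(1, 1)
	b.add(5, 2)
	b.add(10, 3)  // still in the bucket ending at 10
	b.add(12, 4)  // starts the bucket ending at 20, flushes the first
	b.add(8, 100) // previous bucket: ignored
	b.add(11, 1)  // goes back in time but stays in the current bucket
	fmt.Println(b.out, b.sum, b.cnt)
}
```

The point at ts 8 is dropped because its bucket (ending at 10) was already flushed, while the point at ts 11 is accepted even though it arrives after ts 12.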

func (*Aggregator) GC

func (agg *Aggregator) GC(now, chunkMinTs, metricMinTs, lastWriteTime uint32) (uint32, bool)

GC returns whether all of the associated series are stale and can be removed, and their combined pointcount if so

type ChunkSaveCallback added in v0.13.0

type ChunkSaveCallback func()

type ChunkWriteRequest

type ChunkWriteRequest struct {
	ChunkWriteRequestPayload
	Callback ChunkSaveCallback
	Key      schema.AMKey
}

ChunkWriteRequest is a request to write a chunk into a store

func NewChunkWriteRequest

func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest

type ChunkWriteRequestPayload added in v0.13.0

type ChunkWriteRequestPayload struct {
	TTL       uint32
	T0        uint32
	Data      []byte
	Timestamp time.Time
}

func (*ChunkWriteRequestPayload) DecodeMsg added in v0.13.0

func (z *ChunkWriteRequestPayload) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*ChunkWriteRequestPayload) EncodeMsg added in v0.13.0

func (z *ChunkWriteRequestPayload) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*ChunkWriteRequestPayload) MarshalMsg added in v0.13.0

func (z *ChunkWriteRequestPayload) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*ChunkWriteRequestPayload) Msgsize added in v0.13.0

func (z *ChunkWriteRequestPayload) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*ChunkWriteRequestPayload) UnmarshalMsg added in v0.13.0

func (z *ChunkWriteRequestPayload) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type DefaultNotifierHandler

type DefaultNotifierHandler struct {
	// contains filtered or unexported fields
}

func NewDefaultNotifierHandler

func NewDefaultNotifierHandler(metrics Metrics, idx idx.MetricIndex) DefaultNotifierHandler

func (DefaultNotifierHandler) Handle

func (dn DefaultNotifierHandler) Handle(data []byte)

func (DefaultNotifierHandler) PartitionOf

func (dn DefaultNotifierHandler) PartitionOf(key schema.MKey) (int32, bool)

type Metric

type Metric interface {
	Add(ts uint32, val float64)
	Get(from, to uint32) (Result, error)
	GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
}

type Metrics

type Metrics interface {
	Get(key schema.MKey) (Metric, bool)
	GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric
}

type MockStore

type MockStore struct {

	// don't save any data.
	Drop bool
	// contains filtered or unexported fields
}

MockStore is an in-memory Store implementation for unit tests

func NewMockStore

func NewMockStore() *MockStore

func (*MockStore) Add

func (c *MockStore) Add(cwr *ChunkWriteRequest)

Add adds a chunk to the store

func (*MockStore) Items

func (c *MockStore) Items() int

func (*MockStore) Reset

func (c *MockStore) Reset()

func (*MockStore) Search

func (c *MockStore) Search(ctx context.Context, metric schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)

Search searches through the mock results and returns the ones that match the given start / end range.

func (*MockStore) SetTracer

func (c *MockStore) SetTracer(t opentracing.Tracer)

func (*MockStore) Stop

func (c *MockStore) Stop()

type Notifier

type Notifier interface {
	Send(SavedChunk)
}

type NotifierHandler

type NotifierHandler interface {
	// Handle handles an incoming message
	Handle([]byte)
	// PartitionOf is used for notifiers that want to flush and need partition information for metrics
	PartitionOf(key schema.MKey) (int32, bool)
}

type PersistMessageBatch

type PersistMessageBatch struct {
	Instance    string       `json:"instance"`
	SavedChunks []SavedChunk `json:"saved_chunks"`
}
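Given the struct tags above, a batch serializes to JSON as sketched below. The struct definitions are local copies so the example runs without the package; `encodeBatch`, the instance name, and the key value are hypothetical. How the bytes are framed on the wire, including any use of the PersistMessageBatchV1 constant, is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented structs, so the sketch is self-contained.
type SavedChunk struct {
	Key string `json:"key"`
	T0  uint32 `json:"t0"`
}

type PersistMessageBatch struct {
	Instance    string       `json:"instance"`
	SavedChunks []SavedChunk `json:"saved_chunks"`
}

// encodeBatch is a hypothetical helper that returns the JSON form of a batch.
func encodeBatch(b PersistMessageBatch) (string, error) {
	data, err := json.Marshal(b)
	return string(data), err
}

func main() {
	batch := PersistMessageBatch{
		Instance:    "node-a", // hypothetical instance name
		SavedChunks: []SavedChunk{{Key: "example-key", T0: 7200}},
	}
	s, err := encodeBatch(batch)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// prints {"instance":"node-a","saved_chunks":[{"key":"example-key","t0":7200}]}
}
```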

type ReorderBuffer

type ReorderBuffer struct {
	// contains filtered or unexported fields
}

ReorderBuffer keeps a window of data during which it is ok to send data out of order. The reorder buffer itself is not thread safe because it is only used by AggMetric, which is thread safe, so there is no locking in the buffer.

newest=0 may mean that no points have been added yet, or that the newest point is at position 0. We use the Ts of points in the buffer to check for valid points: Ts == 0 means no point; in particular, newest.Ts == 0 means the buffer is empty. The buffer is evenly spaced (points are `interval` apart) and may be sparsely populated.

func NewReorderBuffer

func NewReorderBuffer(reorderWindow, interval uint32, allowUpdate bool) *ReorderBuffer

func (*ReorderBuffer) Add

func (rob *ReorderBuffer) Add(ts uint32, val float64) ([]schema.Point, error)

Add adds the point if it falls within the window. It returns the points that have been purged out of the buffer, and an error if the add did not succeed.
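The contract of Add (accept points inside the window, return the points pushed out of it, report an error otherwise) can be sketched with a sorted slice. This is not the package's data structure: the documentation describes an evenly spaced buffer, whereas this sketch keeps a sorted slice for brevity. The names `rob`, `point`, `errTooOld`, and `errDuplicate` are hypothetical, and the window is assumed to cover the timestamps in (newest - reorderWindow*interval, newest].

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

type point struct {
	Ts  uint32
	Val float64
}

var (
	errTooOld    = errors.New("point is older than the reorder window")
	errDuplicate = errors.New("duplicate timestamp and updates are not allowed")
)

// rob is a hypothetical reorder buffer backed by a slice sorted by Ts.
type rob struct {
	window, interval uint32
	allowUpdate      bool
	buf              []point
}

// add inserts the point and returns the points that left the window, in order.
func (r *rob) add(ts uint32, val float64) ([]point, error) {
	span := r.window * r.interval
	if n := len(r.buf); n > 0 {
		if newest := r.buf[n-1].Ts; newest >= span && ts <= newest-span {
			return nil, errTooOld
		}
	}
	i := sort.Search(len(r.buf), func(i int) bool { return r.buf[i].Ts >= ts })
	if i < len(r.buf) && r.buf[i].Ts == ts {
		if !r.allowUpdate {
			return nil, errDuplicate
		}
		r.buf[i].Val = val
		return nil, nil
	}
	// Insert at position i, keeping the slice sorted.
	r.buf = append(r.buf, point{})
	copy(r.buf[i+1:], r.buf[i:])
	r.buf[i] = point{Ts: ts, Val: val}

	// Purge everything that is now outside the window.
	newest := r.buf[len(r.buf)-1].Ts
	cut := 0
	for cut < len(r.buf) && newest >= span && r.buf[cut].Ts <= newest-span {
		cut++
	}
	var purged []point
	purged = append(purged, r.buf[:cut]...)
	r.buf = r.buf[cut:]
	return purged, nil
}

func main() {
	r := &rob{window: 3, interval: 10}
	for _, ts := range []uint32{10, 30, 20, 50} {
		purged, err := r.add(ts, float64(ts))
		fmt.Println(ts, purged, err)
	}
	_, err := r.add(15, 1)
	fmt.Println(err)
}
```

Points arriving out of order (20 after 30) are sorted into place, and they are handed back to the caller in timestamp order once a newer point moves the window past them.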

func (*ReorderBuffer) Flush

func (rob *ReorderBuffer) Flush() []schema.Point

func (*ReorderBuffer) Get

func (rob *ReorderBuffer) Get() []schema.Point

Get returns the points in the buffer

func (*ReorderBuffer) IsEmpty

func (rob *ReorderBuffer) IsEmpty() bool

func (*ReorderBuffer) Reset

func (rob *ReorderBuffer) Reset()

type Result

type Result struct {
	Points []schema.Point
	Iters  []tsz.Iter
	Oldest uint32 // timestamp of oldest point we have, to know when and when not we may need to query slower storage
}

type SavedChunk

type SavedChunk struct {
	Key string `json:"key"`
	T0  uint32 `json:"t0"`
}

SavedChunk represents a chunk persisted to the store. Key is a stringified schema.AMKey.

type Store

type Store interface {
	Add(cwr *ChunkWriteRequest)
	Search(ctx context.Context, key schema.AMKey, ttl, from, to uint32) ([]chunk.IterGen, error)
	Stop()
	SetTracer(t opentracing.Tracer)
}

Directories

Path Synopsis
package chunk encodes timeseries in chunks of data see devdocs/chunk-format.md for more information.
tsz
Package tsz implements time-series compression it is a fork of https://github.com/dgryski/go-tsz which implements http://www.vldb.org/pvldb/vol8/p1816-teller.pdf see devdocs/chunk-format.md for more info