metrictank: github.com/grafana/metrictank/mdata

package mdata

import "github.com/grafana/metrictank/mdata"

Package mdata stands for "managed data" or, if you will, "metrics data". It has everything needed to keep metric data in memory, store it, and synchronize save states over the network.

Index

Package Files

aggmetric.go aggmetrics.go aggregation.go aggregator.go cwr.go cwr_gen.go helper.go ifaces.go init.go notifier.go reorder_buffer.go result.go schema.go store_mock.go

Constants

const PersistMessageBatchV1 = 1

PersistMessage format version

Variables

var (

    // set either via ConfigProcess or from the unit tests. other code should not touch
    Aggregations conf.Aggregations
    Schemas      conf.Schemas

    PromDiscardedSamples = promauto.NewCounterVec(prometheus.CounterOpts{
        Namespace: "metrictank",
        Name:      "discarded_samples_total",
        Help:      "Total # of samples that were discarded",
    }, []string{"reason", "org"})
)
var ErrInvalidRange = errors.New("AggMetric: invalid range: from must be less than to")

func AggBoundary Uses

func AggBoundary(ts uint32, span uint32) uint32

AggBoundary returns ts if it is a boundary, or the next boundary otherwise. See the description of Aggregator and the unit tests for more details.
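
A test-style sketch of that rule with worked example values; the expected results follow from the rule as documented, and the import path is assumed:

import (
    "testing"

    "github.com/grafana/metrictank/mdata"
)

func TestAggBoundaryDoc(t *testing.T) {
    if got := mdata.AggBoundary(120, 60); got != 120 { // 120 is already a 60s boundary
        t.Fatalf("expected 120, got %d", got)
    }
    if got := mdata.AggBoundary(121, 60); got != 180 { // next 60s boundary after 121
        t.Fatalf("expected 180, got %d", got)
    }
}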

func ConfigProcess Uses

func ConfigProcess()

func ConfigSetup Uses

func ConfigSetup()

func InitPersistNotifier Uses

func InitPersistNotifier(not ...Notifier)

func MatchAgg Uses

func MatchAgg(key string) (uint16, conf.Aggregation)

MatchAgg returns the aggregation definition for the given metric key, along with its index (so it can be referenced efficiently). It always finds a definition because Aggregations has a catch-all default.

func MatchSchema Uses

func MatchSchema(key string, interval int) (uint16, conf.Schema)

MatchSchema returns the schema for the given metric key, along with the schema's index (so it can be referenced efficiently). It always finds a schema because Schemas has a catch-all default.
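
A hypothetical usage sketch (the helper and metric name are made up); it assumes Schemas and Aggregations have already been populated via ConfigSetup/ConfigProcess or the Set* test helpers:

import "github.com/grafana/metrictank/mdata"

// lookupStorageConf is a hypothetical helper, not part of the package.
func lookupStorageConf(name string, interval int) (schemaID, aggID uint16) {
    schemaID, _ = mdata.MatchSchema(name, interval) // index + matched conf.Schema
    aggID, _ = mdata.MatchAgg(name)                 // index + matched conf.Aggregation
    return schemaID, aggID
}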

func MaxChunkSpan Uses

func MaxChunkSpan() uint32

func SendPersistMessage Uses

func SendPersistMessage(key string, t0 uint32)

func SetSingleAgg Uses

func SetSingleAgg(met ...conf.Method)

func SetSingleSchema Uses

func SetSingleSchema(ret conf.Retentions)

func TS Uses

func TS(ts interface{}) string

func TTLs Uses

func TTLs() []uint32

TTLs returns the full set of unique TTLs (in seconds) used by the current schema config.

type AggMetric Uses

type AggMetric struct {
    sync.RWMutex
    // contains filtered or unexported fields
}

AggMetric takes in new values, updates the in-memory data and streams the points to aggregators. It uses a circular buffer of chunks; each chunk starts at its respective t0, a timestamp divisible by chunkSpan without a remainder (e.g. 2-hour boundaries). firstT0's data is held at index 0, and indexes go up and wrap around from numChunks-1 back to 0. Keep in mind that the last chunk is always a work in progress and not usable for aggregation. AggMetric is concurrency-safe.
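
The layout above can be summarized with two small helpers. This is an illustrative sketch of the description, not the package's actual code:

// chunkT0 rounds a point's timestamp down to its chunk's t0 (a chunkSpan boundary).
func chunkT0(ts, chunkSpan uint32) uint32 {
    return ts - (ts % chunkSpan)
}

// ringIndex maps a chunk's t0 to a slot in the circular buffer: firstT0 lands at
// index 0 and later chunks wrap around after numChunks-1.
func ringIndex(t0, firstT0, chunkSpan, numChunks uint32) uint32 {
    return ((t0 - firstT0) / chunkSpan) % numChunks
}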

func NewAggMetric Uses

func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow, interval uint32, agg *conf.Aggregation, reorderAllowUpdate, dropFirstChunk bool, ingestFrom int64) *AggMetric

NewAggMetric creates a metric with the given key. It retains the given number of chunks, each chunkSpan seconds long, and optionally also creates aggregations with the given settings. The 0th retention is the native archive of this metric; if there are several others, we create aggregators using agg (it is the caller's responsibility to make sure agg is not nil in that case!). If reorderWindow is greater than 0, a reorder buffer is enabled; in that case, how data points with duplicate timestamps are handled is defined by reorderAllowUpdate.

func (*AggMetric) Add Uses

func (a *AggMetric) Add(ts uint32, val float64)

Don't ever call Add with a ts of 0, because we use 0 to mean "not initialized"!

func (*AggMetric) GC Uses

func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) (uint32, bool)

GC returns whether this AggMetric is stale and can be removed, and if so, its pointcount.
chunkMinTs -> a chunk older than this timestamp is considered stale and is persisted to Cassandra
metricMinTs -> a metric older than this timestamp is considered stale and is purged from the tank

func (*AggMetric) Get Uses

func (a *AggMetric) Get(from, to uint32) (Result, error)

Get returns all data in the requested time range. From is inclusive, to is exclusive: from <= x < to. More data than what was requested may be included. Specifically, it returns:
* points from the ROB (if enabled)
* iters from matching chunks
* the oldest point we have, so that if your query needs data from before it, the caller knows to also query the store
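
A hypothetical helper showing how a caller might consume a Result. It assumes the tsz fork keeps the upstream go-tsz iterator API (Next/Values) and that schema.Point has Ts/Val fields; both should be verified:

import (
    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/schema"
)

// readRange is a hypothetical helper, not part of the package.
func readRange(m mdata.Metric, from, to uint32) ([]schema.Point, error) {
    res, err := m.Get(from, to)
    if err != nil {
        return nil, err // e.g. ErrInvalidRange when from >= to
    }
    points := res.Points // points still in the reorder buffer, if enabled
    for _, it := range res.Iters {
        for it.Next() {
            ts, val := it.Values()
            points = append(points, schema.Point{Val: val, Ts: ts})
        }
    }
    if res.Oldest > from {
        // data before res.Oldest is not in memory; the caller should also query the Store
    }
    return points, nil
}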

func (*AggMetric) GetAggregated Uses

func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)

func (*AggMetric) SyncAggregatedChunkSaveState Uses

func (a *AggMetric) SyncAggregatedChunkSaveState(ts uint32, consolidator consolidation.Consolidator, aggSpan uint32)

Sync the saved state of a chunk by its T0.

func (*AggMetric) SyncChunkSaveState Uses

func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback

Sync the saved state of a chunk by its T0.

type AggMetrics Uses

type AggMetrics struct {
    sync.RWMutex
    Metrics map[uint32]map[schema.Key]*AggMetric
    // contains filtered or unexported fields
}

AggMetrics is an in-memory store of AggMetric objects. Note: they are keyed by MKey here because each AggMetric manages access to, and references of, its rollup archives itself.

func NewAggMetrics Uses

func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bool, ingestFrom map[uint32]int64, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics

func (*AggMetrics) GC Uses

func (ms *AggMetrics) GC()

GC periodically scans chunks and closes any that have not received data in a while.

func (*AggMetrics) Get Uses

func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool)

func (*AggMetrics) GetOrCreate Uses

func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric
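
A hypothetical wiring sketch of the normal ingestion path (GetOrCreate, then Add). The argument values are made up, and the nil cache pusher is only tolerable here because no chunk ever completes in this sketch; real code passes a cache.CachePusher:

import (
    "time"

    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/schema"
)

// ingest is a hypothetical helper, not part of the package.
func ingest(mkey schema.MKey, schemaID, aggID uint16, ts uint32, val float64) mdata.Metric {
    store := mdata.NewMockStore()
    // dropFirstChunk=false, no per-org ingestFrom, chunks stale after 1h,
    // metrics stale after 6h, GC every hour
    metrics := mdata.NewAggMetrics(store, nil, false, nil, 3600, 21600, time.Hour)
    m := metrics.GetOrCreate(mkey, schemaID, aggID, 10) // interval of 10s; ids from MatchSchema/MatchAgg
    m.Add(ts, val)
    return m
}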

type Aggregation Uses

type Aggregation struct {
    Min float64
    Max float64
    Sum float64
    Cnt float64
    Lst float64
}

Aggregation is a container for all summary statistics / aggregated data for 1 metric, in 1 time frame. If Cnt is 0, the other numbers don't necessarily make sense.

func NewAggregation Uses

func NewAggregation() *Aggregation

func (*Aggregation) Add Uses

func (a *Aggregation) Add(val float64)

func (*Aggregation) Reset Uses

func (a *Aggregation) Reset()

type Aggregator Uses

type Aggregator struct {
    // contains filtered or unexported fields
}

Aggregator receives data and builds aggregations. Note: all points with timestamps t1, t2, t3, t4, [t5] get aggregated into a point with ts t5, where t5 % span == 0. In other words:
* an aggregation point reflects the data in the timeframe preceding it.
* the timestamps of the aggregated series are quantized to the given span, unlike the raw series, which may have an offset (be non-quantized).
For example, with span = 60, raw points at ts 61, 70, 119 and 120 all roll up into the aggregation point with ts 120.

func NewAggregator Uses

func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey, retOrig string, ret conf.Retention, agg conf.Aggregation, dropFirstChunk bool, ingestFrom int64) *Aggregator

func (*Aggregator) Add Uses

func (agg *Aggregator) Add(ts uint32, val float64)

Add adds the point to the in-progress aggregation and flushes it if we reached the boundary. Points going back in time are accepted, unless they fall into a previous bucket, in which case they are ignored.

func (*Aggregator) GC Uses

func (agg *Aggregator) GC(now, chunkMinTs, metricMinTs, lastWriteTime uint32) (uint32, bool)

GC returns whether all of the associated series are stale and can be removed, and if so, their combined pointcount.

type ChunkSaveCallback Uses

type ChunkSaveCallback func()

type ChunkWriteRequest Uses

type ChunkWriteRequest struct {
    ChunkWriteRequestPayload
    Callback ChunkSaveCallback
    Key      schema.AMKey
}

ChunkWriteRequest is a request to write a chunk into a store

func NewChunkWriteRequest Uses

func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest

type ChunkWriteRequestPayload Uses

type ChunkWriteRequestPayload struct {
    TTL       uint32
    T0        uint32
    Data      []byte
    Timestamp time.Time
}

func (*ChunkWriteRequestPayload) DecodeMsg Uses

func (z *ChunkWriteRequestPayload) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*ChunkWriteRequestPayload) EncodeMsg Uses

func (z *ChunkWriteRequestPayload) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*ChunkWriteRequestPayload) MarshalMsg Uses

func (z *ChunkWriteRequestPayload) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*ChunkWriteRequestPayload) Msgsize Uses

func (z *ChunkWriteRequestPayload) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*ChunkWriteRequestPayload) UnmarshalMsg Uses

func (z *ChunkWriteRequestPayload) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type DefaultNotifierHandler Uses

type DefaultNotifierHandler struct {
    // contains filtered or unexported fields
}

func NewDefaultNotifierHandler Uses

func NewDefaultNotifierHandler(metrics Metrics, idx idx.MetricIndex) DefaultNotifierHandler

func (DefaultNotifierHandler) Handle Uses

func (dn DefaultNotifierHandler) Handle(data []byte)

func (DefaultNotifierHandler) PartitionOf Uses

func (dn DefaultNotifierHandler) PartitionOf(key schema.MKey) (int32, bool)

type Metric Uses

type Metric interface {
    Add(ts uint32, val float64)
    Get(from, to uint32) (Result, error)
    GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
}

type Metrics Uses

type Metrics interface {
    Get(key schema.MKey) (Metric, bool)
    GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric
}

type MockStore Uses

type MockStore struct {

    // dont save any data.
    Drop bool
    // contains filtered or unexported fields
}

MockStore is an in-memory Store implementation for unit tests

func NewMockStore Uses

func NewMockStore() *MockStore

func (*MockStore) Add Uses

func (c *MockStore) Add(cwr *ChunkWriteRequest)

Add adds a chunk to the store

func (*MockStore) Items Uses

func (c *MockStore) Items() int

func (*MockStore) Reset Uses

func (c *MockStore) Reset()

func (*MockStore) Search Uses

func (c *MockStore) Search(ctx context.Context, metric schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)

Search searches through the mock results and returns the matching ones according to start / end.
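
A hypothetical usage sketch for tests; the chunk payload is a placeholder (not a validly encoded chunk) and the exact matching rules of the mock's Search should be checked against store_mock.go:

import (
    "context"
    "time"

    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/mdata/chunk"
    "github.com/grafana/metrictank/schema"
)

// exerciseMockStore is a hypothetical helper, not part of the package.
func exerciseMockStore(ctx context.Context, key schema.AMKey, t0 uint32, data []byte) ([]chunk.IterGen, error) {
    store := mdata.NewMockStore()
    defer store.Stop()

    cwr := mdata.NewChunkWriteRequest(nil, key, 3600, t0, data, time.Now())
    store.Add(&cwr)

    // return whatever the mock considers to fall within [t0, t0+3600)
    return store.Search(ctx, key, 3600, t0, t0+3600)
}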

func (*MockStore) SetTracer Uses

func (c *MockStore) SetTracer(t opentracing.Tracer)

func (*MockStore) Stop Uses

func (c *MockStore) Stop()

type Notifier Uses

type Notifier interface {
    Send(SavedChunk)
}

type NotifierHandler Uses

type NotifierHandler interface {
    // Handle handles an incoming message
    Handle([]byte)
    // PartitionOf is used for notifiers that want to flush and need partition information for metrics
    PartitionOf(key schema.MKey) (int32, bool)
}

type PersistMessageBatch Uses

type PersistMessageBatch struct {
    Instance    string       `json:"instance"`
    SavedChunks []SavedChunk `json:"saved_chunks"`
}
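
A sketch of the JSON shape implied by the struct tags; the instance name and chunk key below are made-up examples:

import (
    "encoding/json"

    "github.com/grafana/metrictank/mdata"
)

// examplePersistPayload is illustrative only, not part of the package.
func examplePersistPayload() ([]byte, error) {
    return json.Marshal(mdata.PersistMessageBatch{
        Instance:    "metrictank-01",
        SavedChunks: []mdata.SavedChunk{{Key: "1.0123456789abcdef0123456789abcdef", T0: 1589760000}},
    })
    // -> {"instance":"metrictank-01","saved_chunks":[{"key":"...","t0":1589760000}]}
}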

type ReorderBuffer Uses

type ReorderBuffer struct {
    // contains filtered or unexported fields
}

ReorderBuffer keeps a window of data during which it is ok to send data out of order. The reorder buffer itself is not thread safe because it is only used by AggMetric, which is thread safe, so there is no locking in the buffer.

newest == 0 may mean that no points have been added yet, or that the newest point is at position 0. We use the Ts of points in the buffer to check for valid points: Ts == 0 means there is no point in that slot, and newest.Ts == 0 means the buffer is empty. The buffer is evenly spaced (points are `interval` apart) and may be sparsely populated.

func NewReorderBuffer Uses

func NewReorderBuffer(reorderWindow, interval uint32, allowUpdate bool) *ReorderBuffer

func (*ReorderBuffer) Add Uses

func (rob *ReorderBuffer) Add(ts uint32, val float64) ([]schema.Point, error)

Add adds the point if it falls within the window. It returns the points that have been purged out of the buffer, as well as whether the add succeeded.
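
A hypothetical sketch of driving a reorder buffer directly (normally AggMetric does this): a 2-point window at a 10s interval; which points are accepted depends on the window relative to the newest point:

import (
    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/schema"
)

// reorder is a hypothetical helper, not part of the package. It returns the
// points released in order, plus the points the buffer refused.
func reorder(in []schema.Point) (out, rejected []schema.Point) {
    rob := mdata.NewReorderBuffer(2, 10, false) // window=2 points, interval=10s, updates disallowed
    for _, p := range in {
        flushed, err := rob.Add(p.Ts, p.Val)
        if err != nil {
            rejected = append(rejected, p) // too old for the window, or a disallowed duplicate
            continue
        }
        out = append(out, flushed...)
    }
    return append(out, rob.Flush()...), rejected // Flush drains whatever is still buffered
}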

func (*ReorderBuffer) Flush Uses

func (rob *ReorderBuffer) Flush() []schema.Point

func (*ReorderBuffer) Get Uses

func (rob *ReorderBuffer) Get() []schema.Point

Get returns the points in the buffer

func (*ReorderBuffer) IsEmpty Uses

func (rob *ReorderBuffer) IsEmpty() bool

func (*ReorderBuffer) Reset Uses

func (rob *ReorderBuffer) Reset()

type Result Uses

type Result struct {
    Points []schema.Point
    Iters  []tsz.Iter
    Oldest uint32 // timestamp of oldest point we have, to know when and when not we may need to query slower storage
}

type SavedChunk Uses

type SavedChunk struct {
    Key string `json:"key"`
    T0  uint32 `json:"t0"`
}

SavedChunk represents a chunk persisted to the store. Key is a stringified schema.AMKey.

type Store Uses

type Store interface {
    Add(cwr *ChunkWriteRequest)
    Search(ctx context.Context, key schema.AMKey, ttl, from, to uint32) ([]chunk.IterGen, error)
    Stop()
    SetTracer(t opentracing.Tracer)
}
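
Any chunk backend just has to satisfy this interface. A minimal no-op sketch, e.g. for wiring tests that don't care about persistence (MockStore above is the package's own test implementation; the import paths shown are assumptions):

import (
    "context"

    opentracing "github.com/opentracing/opentracing-go"

    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/mdata/chunk"
    "github.com/grafana/metrictank/schema"
)

// nopStore discards writes and never returns data.
type nopStore struct{}

func (nopStore) Add(cwr *mdata.ChunkWriteRequest) {}
func (nopStore) Search(ctx context.Context, key schema.AMKey, ttl, from, to uint32) ([]chunk.IterGen, error) {
    return nil, nil
}
func (nopStore) Stop()                          {}
func (nopStore) SetTracer(t opentracing.Tracer) {}

// compile-time check that nopStore satisfies mdata.Store
var _ mdata.Store = nopStore{}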

Directories

Path            Synopsis
cache
cache/accnt
chunk           Package chunk encodes timeseries in chunks of data. See devdocs/chunk-format.md for more information.
chunk/archive
chunk/tsz       Package tsz implements time-series compression. It is a fork of https://github.com/dgryski/go-tsz, which implements http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. See devdocs/chunk-format.md for more info.
errors
importer
notifierKafka

Package mdata imports 29 packages and is imported by 31 packages. Updated 2020-05-18.