metrics

package
v0.0.0-...-69f0413 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2024 License: AGPL-3.0 Imports: 31 Imported by: 3

Documentation

Index

Constants

View Source
const METRICS_BLK_FLUSH_SLEEP_DURATION = 60 // 1 min
View Source
const METRICS_BLK_ROTATE_SLEEP_DURATION = 10 // 10 seconds

Variables

View Source
var TAGS_TREE_FLUSH_SLEEP_DURATION = 60 // 1 min

Functions

func EncodeDatapoint

func EncodeDatapoint(mName []byte, tags *TagsHolder, dp float64, timestamp uint32, nBytes uint64, orgid uint64) error

For a given metricName, tags, dp, and timestamp, add it to the respective in memory series

Internally, this function will try to find the series then will encode it. If it cannot find the series or no space exists in the metrics segment, it will return an error

Return number of bytes written and any error encountered

func ExtractInfluxPayload

func ExtractInfluxPayload(rawCSV []byte, tags *TagsHolder) ([]byte, float64, uint32, error)

for an input raw csv row []byte, return the metric name, datapoint value, timestamp (ignored), all tags, and any errors occurred The metric name is returned as a raw []byte The tags

func ExtractOTSDBPayload

func ExtractOTSDBPayload(rawJson []byte, tags *TagsHolder) ([]byte, float64, uint32, error)

for an input raw json []byte, return the metric name, datapoint value, timestamp, all tags, and any errors occurred The metric name is returned as a raw []byte The tags

func ForceFlushMetricsBlock

func ForceFlushMetricsBlock()

func GetFinalTagsTreeDir

func GetFinalTagsTreeDir(mid string, suffix uint64) string

func GetTotalEncodedSize

func GetTotalEncodedSize() uint64

func GetUnrotatedMetricStats

func GetUnrotatedMetricStats(orgid uint64) (uint64, uint64, uint64)

Returns the total incoming bytes, total on disk bytes, approx number of datapoints across all metric segments

func GetUnrotatedMetricsSegmentRequests

func GetUnrotatedMetricsSegmentRequests(metricName string, tRange *dtu.MetricsTimeRange, querySummary *summary.QuerySummary, orgid uint64) (map[string][]*structs.MetricsSearchRequest, error)

func InitMetricsSegStore

func InitMetricsSegStore()

TODO: pre-allocates as many metricsbuffers that can fix and sets hash range To evenly distribute metric names, hash range can simply metricsId mod numMetricsBuffers

func InitTestingConfig

func InitTestingConfig()

func ReturnTagsHolder

func ReturnTagsHolder(th *TagsHolder)

Returns allocated tags holder memory back to the pool

Types

type MetricsAndTagsHolder

type MetricsAndTagsHolder struct {
	MetricSegments map[string]*MetricsSegment
	TagHolders     map[string]*TagsTreeHolder
}

type MetricsBlock

type MetricsBlock struct {
	// contains filtered or unexported fields
}

A metrics buffer represent a 15 minute (or 1GB size) window of encoded series

A metrics buffer's suffix determines the path of the generated files in relation to the metricssegment

Every 5s, this metrics buffer should persist to disk and will create / update two file:

  1. Raw TS encoded file. Format [tsid][packed-len][raw-values]
  2. TSID offset file. Format [tsid][soff]

func (*MetricsBlock) FlushTSOAndTSGFiles

func (mb *MetricsBlock) FlushTSOAndTSGFiles(file string) error

Format of TSO file: [version - 1 byte][number of tsids - 2 bytes][tsid - 8bytes][offset - 4 bytes][tsid - 8bytes]... Formar of TSG file: [version - 1 byte][tsid - 8bytes][len - 4 bytes][raw series - n bytes][tsid - 8 bytes]...

func (*MetricsBlock) GetTimeSeries

func (mb *MetricsBlock) GetTimeSeries(tsid uint64) (*TimeSeries, bool, error)

returns:

*TimeSeries corresponding to tsid if found
bool indicating if the tsid was found

This will create the time series if it doesn't exist already

func (*MetricsBlock) InsertTimeSeries

func (mb *MetricsBlock) InsertTimeSeries(tsid uint64, ts *TimeSeries) (bool, int, error)

Inserts a time series for the given tsid

The caller is responsible for acquiring and releasing the the required locks

Returns bool if the tsid already existed, the idx it exists at, or any errors

type MetricsSegment

type MetricsSegment struct {
	Suffix uint64 // current suffix
	Mid    string // metrics id for this metric segment

	Orgid uint64
	// contains filtered or unexported fields
}

A metrics segment represents a 2hr window and consists of many metrics blocks and tagTrees.

Only a single metrics buffer per metrics segment can be in memory at a time. Prior metrics buffers will be flushed to disk.

The tagsTree will be shared across metrics this metrics segment.

A metrics segment generate the following set of files:

  • A tagTree file for each incoming tagKey seen across this segment
  • A metricsBlock file for each incoming 15minute window
  • A bloomfilter for all metric names in the metrics segment

TODO: this metrics segment should reject samples not in 2hr window

func GetAllMetricsSegments

func GetAllMetricsSegments() []*MetricsSegment

func GetMetricSegments

func GetMetricSegments(orgid uint64) []*MetricsSegment

func InitMetricsSegment

func InitMetricsSegment(orgid uint64, mId string) (*MetricsSegment, error)

func (*MetricsSegment) CheckAndRotate

func (ms *MetricsSegment) CheckAndRotate(forceRotate bool) error

Wrapper function to check and rotate the current metrics block or the metrics segment

Caller is responsible for acquiring locks

type TagTree

type TagTree struct {
	// contains filtered or unexported fields
}

TagTree is a two level tree, containing metricName at level 1 and a tagValue at level 2 The leaf nodes stores the tsids that match certain tagValue

TODO: how to flushes to just write updates

func InitTagsTree

func InitTagsTree() *TagTree

func (*TagTree) AddTagValue

func (tt *TagTree) AddTagValue(mName, val []byte, valueType jp.ValueType, tsid uint64) error

type TagsHolder

type TagsHolder struct {
	// contains filtered or unexported fields
}

func GetTagsHolder

func GetTagsHolder() *TagsHolder

Allocates and returns a TagsHolder

Caller is responsible for calling ReturnTagsHolder

func (*TagsHolder) GetTSID

func (th *TagsHolder) GetTSID(mName []byte) (uint64, error)

Gets the TSID given a metric name

Internally, will make sure the tags keys are sorted

func (*TagsHolder) Insert

func (th *TagsHolder) Insert(key string, value []byte, vType jp.ValueType)

func (*TagsHolder) String

func (th *TagsHolder) String() string

type TagsTreeHolder

type TagsTreeHolder struct {
	// contains filtered or unexported fields
}

Holder struct for all tagTrees

Internally, will expose functions to check and add tags to the tree

func GetAllTagsTreeHolders

func GetAllTagsTreeHolders() []*TagsTreeHolder

func GetTagsTreeHolder

func GetTagsTreeHolder(orgid uint64, mid string) *TagsTreeHolder

func InitTagsTreeHolder

func InitTagsTreeHolder(mid string) (*TagsTreeHolder, error)

func (*TagsTreeHolder) AddTagsForTSID

func (tth *TagsTreeHolder) AddTagsForTSID(mName []byte, tags *TagsHolder, tsid uint64) error

Returns a bool indicating if this tsid is new

Adds the inputed tags into corresponding tagsTree

Internally, will use the internal bloom to check if the tsid has already been added or not

func (*TagsTreeHolder) EncodeTagsTreeHolder

func (tt *TagsTreeHolder) EncodeTagsTreeHolder() error

type TimeSeries

type TimeSeries struct {
	// contains filtered or unexported fields
}

Represents a single timeseries

func (*TimeSeries) AddSingleEntry

func (ts *TimeSeries) AddSingleEntry(dpVal float64, dpTS uint32) (uint64, error)

adds this single dp and time entry to the time series encode dpVal & dpTs using dod / floating point compression every 15 mins, if a series was updated, we need to flush it

Returns number of bytes written, or any errors encoundered

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL