Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeDatapoint(mName []byte, tags *TagsHolder, dp float64, timestamp uint32, nBytes uint64, ...) error
- func ExtractInfluxPayload(rawCSV []byte, tags *TagsHolder) ([]byte, float64, uint32, error)
- func ExtractOTSDBPayload(rawJson []byte, tags *TagsHolder) ([]byte, float64, uint32, error)
- func ForceFlushMetricsBlock()
- func GetFinalTagsTreeDir(mid string, suffix uint64) string
- func GetTotalEncodedSize() uint64
- func GetUnrotatedMetricStats(orgid uint64) (uint64, uint64, uint64)
- func GetUnrotatedMetricsSegmentRequests(metricName string, tRange *dtu.MetricsTimeRange, ...) (map[string][]*structs.MetricsSearchRequest, error)
- func InitMetricsSegStore()
- func InitTestingConfig()
- func ReturnTagsHolder(th *TagsHolder)
- type MetricsAndTagsHolder
- type MetricsBlock
- type MetricsSegment
- type TagTree
- type TagsHolder
- type TagsTreeHolder
- type TimeSeries
Constants ¶
const METRICS_BLK_FLUSH_SLEEP_DURATION = 60 // 1 min
const METRICS_BLK_ROTATE_SLEEP_DURATION = 10 // 10 seconds
Variables ¶
var OrgMetricsAndTags map[uint64]*MetricsAndTagsHolder = make(map[uint64]*MetricsAndTagsHolder)
var TAGS_TREE_FLUSH_SLEEP_DURATION = 60 // 1 min
Functions ¶
func EncodeDatapoint ¶
func EncodeDatapoint(mName []byte, tags *TagsHolder, dp float64, timestamp uint32, nBytes uint64, orgid uint64) error
For a given metric name, tags, datapoint (dp), and timestamp, adds the datapoint to the corresponding in-memory series.
Internally, this function tries to find the series and then encodes the datapoint into it. If it cannot find the series, or no space exists in the metrics segment, it returns an error.
Returns any error encountered.
func ExtractInfluxPayload ¶
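func ExtractInfluxPayload(rawCSV []byte, tags *TagsHolder) ([]byte, float64, uint32, error)
For an input raw CSV row ([]byte), returns the metric name, datapoint value, timestamp (ignored), all tags, and any error that occurred. The metric name is returned as a raw []byte. The tags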
func ExtractOTSDBPayload ¶
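func ExtractOTSDBPayload(rawJson []byte, tags *TagsHolder) ([]byte, float64, uint32, error)
For an input raw JSON ([]byte), returns the metric name, datapoint value, timestamp, all tags, and any error that occurred. The metric name is returned as a raw []byte. The tags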
func ForceFlushMetricsBlock ¶
func ForceFlushMetricsBlock()
func GetFinalTagsTreeDir ¶
func GetFinalTagsTreeDir(mid string, suffix uint64) string
func GetTotalEncodedSize ¶
func GetTotalEncodedSize() uint64
func GetUnrotatedMetricStats ¶
func GetUnrotatedMetricStats(orgid uint64) (uint64, uint64, uint64)
Returns the total incoming bytes, the total on-disk bytes, and the approximate number of datapoints across all metric segments.
func GetUnrotatedMetricsSegmentRequests ¶
func GetUnrotatedMetricsSegmentRequests(metricName string, tRange *dtu.MetricsTimeRange, querySummary *summary.QuerySummary, orgid uint64) (map[string][]*structs.MetricsSearchRequest, error)
func InitMetricsSegStore ¶
func InitMetricsSegStore()
TODO: pre-allocate as many metrics buffers as can fit and set the hash range. To evenly distribute metric names, the hash range can simply be metricsId mod numMetricsBuffers.
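The modulo distribution mentioned in the TODO could look roughly like the sketch below. It assumes the metrics id is available as a number; `bufferIndex` and its parameters are hypothetical names that mirror the TODO and are not part of this package.

```go
package main

import "fmt"

// bufferIndex maps a numeric metrics id onto one of numMetricsBuffers
// pre-allocated buffers. Both names are hypothetical and only mirror the TODO.
func bufferIndex(metricsID uint64, numMetricsBuffers uint64) (uint64, error) {
	if numMetricsBuffers == 0 {
		return 0, fmt.Errorf("numMetricsBuffers must be greater than zero")
	}
	return metricsID % numMetricsBuffers, nil
}

func main() {
	for _, id := range []uint64{0, 7, 8, 15} {
		idx, err := bufferIndex(id, 8)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("metricsId %d -> buffer %d\n", id, idx)
	}
}
```

The spread is only even if the ids themselves are well distributed, which is why a hash of the metric name is a natural input.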
func InitTestingConfig ¶
func InitTestingConfig()
func ReturnTagsHolder ¶
func ReturnTagsHolder(th *TagsHolder)
Returns allocated tags holder memory back to the pool
Types ¶
type MetricsAndTagsHolder ¶
type MetricsAndTagsHolder struct {
	MetricSegments map[string]*MetricsSegment
	TagHolders     map[string]*TagsTreeHolder
}
type MetricsBlock ¶
type MetricsBlock struct {
// contains filtered or unexported fields
}
A metrics buffer represents a 15 minute (or 1GB size) window of encoded series.
A metrics buffer's suffix determines the path of the generated files in relation to the metrics segment.
Every 5s, this metrics buffer should persist to disk and will create or update two files:
- Raw TS encoded file. Format [tsid][packed-len][raw-values]
- TSID offset file. Format [tsid][soff]
func (*MetricsBlock) FlushTSOAndTSGFiles ¶
func (mb *MetricsBlock) FlushTSOAndTSGFiles(file string) error
Format of TSO file: [version - 1 byte][number of tsids - 2 bytes][tsid - 8 bytes][offset - 4 bytes][tsid - 8 bytes]...
Format of TSG file: [version - 1 byte][tsid - 8 bytes][len - 4 bytes][raw series - n bytes][tsid - 8 bytes]...
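To make the TSO layout concrete, here is a minimal sketch that packs a version byte, a 2-byte count, and repeated (tsid, offset) pairs into a byte slice. The byte order is not stated in this documentation, so little-endian is an assumption, and `tsoEntry` and `encodeTSO` are hypothetical names rather than package APIs.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// tsoEntry pairs a tsid with its byte offset into the TSG file.
// Hypothetical type used only for this sketch.
type tsoEntry struct {
	tsid   uint64
	offset uint32
}

// encodeTSO lays out entries as
// [version - 1 byte][number of tsids - 2 bytes]([tsid - 8 bytes][offset - 4 bytes])...
// Little-endian byte order is an assumption.
func encodeTSO(version byte, entries []tsoEntry) ([]byte, error) {
	if len(entries) > 0xFFFF {
		return nil, fmt.Errorf("too many tsids for a 2-byte count: %d", len(entries))
	}
	buf := make([]byte, 3+12*len(entries))
	buf[0] = version
	binary.LittleEndian.PutUint16(buf[1:3], uint16(len(entries)))
	pos := 3
	for _, e := range entries {
		binary.LittleEndian.PutUint64(buf[pos:pos+8], e.tsid)
		binary.LittleEndian.PutUint32(buf[pos+8:pos+12], e.offset)
		pos += 12
	}
	return buf, nil
}

func main() {
	buf, err := encodeTSO(1, []tsoEntry{{tsid: 10, offset: 1}, {tsid: 11, offset: 64}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("encoded bytes:", len(buf)) // 3 header bytes + 2 * 12 entry bytes
}
```

Each entry occupies a fixed 12 bytes, so a reader can locate the i-th entry at byte 3 + 12*i without scanning.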
func (*MetricsBlock) GetTimeSeries ¶
func (mb *MetricsBlock) GetTimeSeries(tsid uint64) (*TimeSeries, bool, error)
Returns:
- The *TimeSeries corresponding to tsid, if found
- A bool indicating whether the tsid was found
- Any error encountered
This will create the time series if it doesn't already exist.
func (*MetricsBlock) InsertTimeSeries ¶
func (mb *MetricsBlock) InsertTimeSeries(tsid uint64, ts *TimeSeries) (bool, int, error)
Inserts a time series for the given tsid
The caller is responsible for acquiring and releasing the required locks.
Returns a bool indicating whether the tsid already existed, the index it exists at, and any error.
type MetricsSegment ¶
type MetricsSegment struct {
	Suffix uint64 // current suffix
	Mid    string // metrics id for this metric segment
	Orgid  uint64
	// contains filtered or unexported fields
}
A metrics segment represents a 2hr window and consists of many metrics blocks and tagTrees.
Only a single metrics buffer per metrics segment can be in memory at a time. Prior metrics buffers will be flushed to disk.
The tagsTree is shared across all metrics in this metrics segment.
A metrics segment generates the following set of files:
- A tagTree file for each incoming tagKey seen across this segment
- A metricsBlock file for each incoming 15-minute window
- A bloomfilter for all metric names in the metrics segment
TODO: this metrics segment should reject samples that are not in its 2hr window
func GetAllMetricsSegments ¶
func GetAllMetricsSegments() []*MetricsSegment
func GetMetricSegments ¶
func GetMetricSegments(orgid uint64) []*MetricsSegment
func InitMetricsSegment ¶
func InitMetricsSegment(orgid uint64, mId string) (*MetricsSegment, error)
func (*MetricsSegment) CheckAndRotate ¶
func (ms *MetricsSegment) CheckAndRotate(forceRotate bool) error
Wrapper function to check and rotate the current metrics block or the metrics segment
Caller is responsible for acquiring locks
type TagTree ¶
type TagTree struct {
// contains filtered or unexported fields
}
TagTree is a two-level tree containing the metricName at level 1 and a tagValue at level 2. The leaf nodes store the tsids that match a given tagValue.
The tags for a metric are inserted via xxhash to allow for O(log n) search.
TODO: how to make flushes write only the updates
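The two-level shape can be sketched as follows. This is an illustration only: it uses nested Go maps (average constant-time lookup) rather than whatever ordered structure backs the O(log n) search, and it substitutes the standard library's FNV-1a for xxhash to stay dependency-free. `sketchTagTree` and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashBytes stands in for xxhash; FNV-1a from the standard library is used
// here only to keep the sketch free of third-party imports.
func hashBytes(b []byte) uint64 {
	h := fnv.New64a()
	h.Write(b)
	return h.Sum64()
}

// sketchTagTree is a hypothetical two-level structure: level 1 is keyed by the
// metric name hash, level 2 by the tag value hash, and each leaf holds the
// tsids that carry that tag value.
type sketchTagTree struct {
	levels map[uint64]map[uint64][]uint64
}

func newSketchTagTree() *sketchTagTree {
	return &sketchTagTree{levels: make(map[uint64]map[uint64][]uint64)}
}

// add records tsid under the (metric name, tag value) pair.
func (t *sketchTagTree) add(mName, tagValue []byte, tsid uint64) {
	mh, vh := hashBytes(mName), hashBytes(tagValue)
	if t.levels[mh] == nil {
		t.levels[mh] = make(map[uint64][]uint64)
	}
	t.levels[mh][vh] = append(t.levels[mh][vh], tsid)
}

// lookup returns the tsids stored for the pair, or nil if none exist.
func (t *sketchTagTree) lookup(mName, tagValue []byte) []uint64 {
	return t.levels[hashBytes(mName)][hashBytes(tagValue)]
}

func main() {
	tree := newSketchTagTree()
	tree.add([]byte("cpu.usage"), []byte("host-a"), 1)
	tree.add([]byte("cpu.usage"), []byte("host-a"), 2)
	fmt.Println(tree.lookup([]byte("cpu.usage"), []byte("host-a")))
}
```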
func InitTagsTree ¶
func InitTagsTree() *TagTree
type TagsHolder ¶
type TagsHolder struct {
// contains filtered or unexported fields
}
func GetTagsHolder ¶
func GetTagsHolder() *TagsHolder
Allocates and returns a TagsHolder
Caller is responsible for calling ReturnTagsHolder
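The get/return contract resembles the usual object-pool pattern. The sketch below shows that pattern with `sync.Pool`; whether this package uses `sync.Pool` internally is an assumption, and `sketchHolder`, `getHolder`, and `returnHolder` are hypothetical stand-ins for TagsHolder, GetTagsHolder, and ReturnTagsHolder.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchHolder is a hypothetical stand-in for TagsHolder.
type sketchHolder struct {
	keys []string
}

// holderPool recycles holders so repeated ingestion avoids fresh allocations.
var holderPool = sync.Pool{
	New: func() interface{} { return &sketchHolder{} },
}

// getHolder hands out a holder from the pool, allocating if the pool is empty.
func getHolder() *sketchHolder {
	return holderPool.Get().(*sketchHolder)
}

// returnHolder resets the holder and puts it back so later callers can reuse it.
func returnHolder(h *sketchHolder) {
	h.keys = h.keys[:0]
	holderPool.Put(h)
}

func main() {
	h := getHolder()
	defer returnHolder(h) // pair every get with a return
	h.keys = append(h.keys, "host", "region")
	fmt.Println(len(h.keys), "tag keys held")
}
```

Pairing the get with a deferred return keeps the holder from leaking out of the pool on early error paths.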
func (*TagsHolder) GetTSID ¶
func (th *TagsHolder) GetTSID(mName []byte) (uint64, error)
Gets the TSID given a metric name
Internally, ensures the tag keys are sorted
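Sorting the tag keys before hashing is what makes an id independent of the order in which tags were inserted. A minimal sketch of that idea follows; the hash function (FNV-1a from the standard library), the separator byte, and the names `tagPair` and `sketchTSID` are all assumptions made for illustration and do not describe how GetTSID is actually implemented.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// tagPair is a hypothetical key/value tag used only in this sketch.
type tagPair struct {
	key, value string
}

// sketchTSID hashes the metric name followed by the tags in sorted key order,
// so the same tag set yields the same id regardless of insertion order.
func sketchTSID(mName []byte, tags []tagPair) uint64 {
	sorted := make([]tagPair, len(tags))
	copy(sorted, tags) // leave the caller's slice untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].key < sorted[j].key })

	h := fnv.New64a()
	h.Write(mName)
	for _, t := range sorted {
		h.Write([]byte{0}) // separator so adjacent fields cannot run together
		h.Write([]byte(t.key))
		h.Write([]byte{0})
		h.Write([]byte(t.value))
	}
	return h.Sum64()
}

func main() {
	a := sketchTSID([]byte("cpu.usage"), []tagPair{{"host", "a"}, {"region", "us"}})
	b := sketchTSID([]byte("cpu.usage"), []tagPair{{"region", "us"}, {"host", "a"}})
	fmt.Println("same id for both orders:", a == b)
}
```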
func (*TagsHolder) Insert ¶
func (th *TagsHolder) Insert(key string, value []byte, vType jp.ValueType)
func (*TagsHolder) String ¶
func (th *TagsHolder) String() string
type TagsTreeHolder ¶
type TagsTreeHolder struct {
// contains filtered or unexported fields
}
Holder struct for all tagTrees
Internally, will expose functions to check and add tags to the tree
func GetAllTagsTreeHolders ¶
func GetAllTagsTreeHolders() []*TagsTreeHolder
func GetTagsTreeHolder ¶
func GetTagsTreeHolder(orgid uint64, mid string) *TagsTreeHolder
func InitTagsTreeHolder ¶
func InitTagsTreeHolder(mid string) (*TagsTreeHolder, error)
func (*TagsTreeHolder) AddTagsForTSID ¶
func (tth *TagsTreeHolder) AddTagsForTSID(mName []byte, tags *TagsHolder, tsid uint64) error
Adds the input tags to the corresponding tagsTree.
Internally, uses the internal bloom to check whether the tsid has already been added.
Returns any error encountered.
func (*TagsTreeHolder) EncodeTagsTreeHolder ¶
func (tt *TagsTreeHolder) EncodeTagsTreeHolder() error
type TimeSeries ¶
type TimeSeries struct {
// contains filtered or unexported fields
}
Represents a single timeseries
func (*TimeSeries) AddSingleEntry ¶
func (ts *TimeSeries) AddSingleEntry(dpVal float64, dpTS uint32) (uint64, error)
Adds a single datapoint value and timestamp entry to the time series. dpVal and dpTS are encoded using delta-of-delta (dod) and floating point compression. Every 15 mins, if a series was updated, it needs to be flushed.
Returns the number of bytes written and any error encountered.
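For readers unfamiliar with delta-of-delta, the sketch below shows the arithmetic on timestamps only. It deliberately leaves out the bit-level packing and the floating point value compression, and `deltaOfDelta` and `restore` are hypothetical helpers, not functions of this package.

```go
package main

import "fmt"

// deltaOfDelta returns the first timestamp, then the first delta, then the
// difference between each delta and the one before it.
func deltaOfDelta(ts []uint32) []int64 {
	out := make([]int64, 0, len(ts))
	var prev, prevDelta int64
	for i, t := range ts {
		cur := int64(t)
		switch i {
		case 0:
			out = append(out, cur)
		case 1:
			prevDelta = cur - prev
			out = append(out, prevDelta)
		default:
			delta := cur - prev
			out = append(out, delta-prevDelta)
			prevDelta = delta
		}
		prev = cur
	}
	return out
}

// restore reverses deltaOfDelta and rebuilds the original timestamps.
func restore(enc []int64) []uint32 {
	out := make([]uint32, 0, len(enc))
	var prev, prevDelta int64
	for i, v := range enc {
		switch i {
		case 0:
			prev = v
		case 1:
			prevDelta = v
			prev += prevDelta
		default:
			prevDelta += v
			prev += prevDelta
		}
		out = append(out, uint32(prev))
	}
	return out
}

func main() {
	ts := []uint32{1000, 1010, 1020, 1031}
	enc := deltaOfDelta(ts)
	fmt.Println(enc)          // prints [1000 10 0 1]
	fmt.Println(restore(enc)) // prints [1000 1010 1020 1031]
}
```

Regularly spaced samples produce long runs of zeros after the first two values, which is what makes the scheme compress well once the values are bit-packed.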