Documentation ¶
Overview ¶
Package rrd contains the logic for updating in-memory partial Round-Robin Archives from incoming, usually unevenly spaced data points. It converts the input to a rate, then consolidates and aggregates the data across a list of evenly spaced series of pre-defined resolution and time span.
Throughout this documentation and code the following terms are used (sometimes as abbreviations, listed in parentheses):
Round-Robin Database (RRD): Collectively all the logic in this package and an instance of the data it maintains is referred to as an RRD.
Data Point (DP): There actually isn't a data structure representing a data point. A data point is just a float64.
Round-Robin Archive (RRA): An array of data points at a specific resolution and going back a pre-defined duration of time.
Primary Data Point (PDP): A conceptual value which represents a step-sized time slot in a series. Many (or none) actual data points can come in and fall into a PDP. Each DS and each RRA maintain a current (not yet complete) PDP, whose value upon completion is used to update PDPs of lower resolution (i.e. larger Step/PDP) RRAs.
Data Source (DS): A Data Source loosely represents a "time series": the smallest resolution (PDP size) and other parameters, as well as the data. A DS should have at least one, but usually several RRAs.
Step: The smallest unit of time of a DS or RRA, in milliseconds. RRA resolutions and sizes must be multiples of the step of the DS they belong to. In this implementation a step cannot be smaller than a millisecond.
DS Heartbeat (HB): Duration of time that can pass without data. A gap in data which exceeds HB is filled with NaNs.
Note that this package does not concern itself with loading a series from storage for analysis.
Index ¶
- func IndexDistance(i, j, size int64) int64
- func SlotIndex(slotEnd time.Time, step time.Duration, size int64) int64
- func SlotTime(n int64, latest time.Time, step time.Duration, size int64) time.Time
- type ClockPdp
- type Consolidation
- type DSSpec
- type DataSource
- func (ds *DataSource) BestRRA(start, end time.Time, points int64) RoundRobinArchiver
- func (ds *DataSource) ClearRRAs()
- func (ds *DataSource) Copy() DataSourcer
- func (ds *DataSource) Heartbeat() time.Duration
- func (ds *DataSource) LastUpdate() time.Time
- func (ds *DataSource) PointCount() int
- func (ds *DataSource) ProcessDataPoint(value float64, ts time.Time) error
- func (ds *DataSource) RRAs() []RoundRobinArchiver
- func (ds *DataSource) SetRRAs(rras []RoundRobinArchiver)
- func (ds *DataSource) Spec() DSSpec
- func (ds *DataSource) Step() time.Duration
- type DataSourcer
- type Pdp
- func (p *Pdp) AddValue(val float64, dur time.Duration)
- func (p *Pdp) AddValueLast(val float64, dur time.Duration)
- func (p *Pdp) AddValueMax(val float64, dur time.Duration)
- func (p *Pdp) AddValueMin(val float64, dur time.Duration)
- func (p *Pdp) Duration() time.Duration
- func (p *Pdp) Reset() float64
- func (p *Pdp) SetValue(val float64, dur time.Duration)
- func (p *Pdp) Value() float64
- type Pdper
- type RRASpec
- type RoundRobinArchive
- func (rra *RoundRobinArchive) Begins(now time.Time) time.Time
- func (rra *RoundRobinArchive) Copy() RoundRobinArchiver
- func (rra *RoundRobinArchive) DPs() map[int64]float64
- func (rra *RoundRobinArchive) Latest() time.Time
- func (rra *RoundRobinArchive) PointCount() int
- func (rra *RoundRobinArchive) Size() int64
- func (rra *RoundRobinArchive) Spec() RRASpec
- func (rra *RoundRobinArchive) Step() time.Duration
- type RoundRobinArchiver
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IndexDistance ¶
Distance between i and j indexes in an RRA. If i > j (the RRA wraps around) then it is the sum of the distance from i to the end and the beginning to j. Size of 0 causes a division by zero panic.
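The wrap-around arithmetic described above can be sketched with a single modulo operation. The function below is a hypothetical re-implementation for illustration, not necessarily how the package computes it, and it assumes both indexes are in the range [0, size).

```go
package main

import "fmt"

// indexDistance is a hypothetical sketch of the distance from index i to
// index j in a round-robin array of the given size. When i > j the result is
// (size - i) + j, i.e. the distance to the end plus the distance from the
// beginning. A size of 0 panics with an integer divide by zero.
func indexDistance(i, j, size int64) int64 {
	return (size + j - i) % size
}

func main() {
	fmt.Println(indexDistance(2, 5, 10)) // no wrap-around
	fmt.Println(indexDistance(8, 2, 10)) // wraps: 2 slots to the end, then 2 more
}
```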
Types ¶
type Consolidation ¶
type Consolidation int
const (
	WMEAN Consolidation = iota // Time-weighted average
	MAX                        // Max
	MIN                        // Min
	LAST                       // Last
)
type DSSpec ¶
type DSSpec struct {
	Step      time.Duration
	Heartbeat time.Duration
	RRAs      []RRASpec

	// These can be used to fill the initial value
	LastUpdate time.Time
	Value      float64
	Duration   time.Duration
}
DSSpec describes a DataSource. DSSpec is a schema that is used to create the DataSource, as an argument to NewDataSource(). DSSpec is used in configuration describing how a DataSource must be created on-the-fly.
type DataSource ¶
type DataSource struct {
	Pdp
	// contains filtered or unexported fields
}
DataSource contains a time series and its parameters, RRAs and intermediate state (PDP). The DS PDP is the smallest unit of accumulation for this series; all RRAs should have PDPs that are a multiple of the DS PDP. The DS PDP supports only weighted mean as its consolidation function. The reason for not supporting min/max/last/avg (at least initially) is that additional state is required to maintain those, and that seems better done in other places, e.g. the Aggregator.
func NewDataSource ¶
func NewDataSource(spec DSSpec) *DataSource
NewDataSource returns a new DataSource in accordance with the passed in DSSpec.
func (*DataSource) BestRRA ¶
func (ds *DataSource) BestRRA(start, end time.Time, points int64) RoundRobinArchiver
BestRRA examines the RRAs and returns the one that best matches the given start, end and resolution (as number of points).
func (*DataSource) ClearRRAs ¶
func (ds *DataSource) ClearRRAs()
ClearRRAs clears the data in all RRAs. It is meant to be called immediately after flushing the DS to permanent storage.
func (*DataSource) Copy ¶
func (ds *DataSource) Copy() DataSourcer
Returns a complete copy of this Data Source
func (*DataSource) Heartbeat ¶
func (ds *DataSource) Heartbeat() time.Duration
Heartbeat returns the heartbeat (HB): the longest interval that may pass without any data before the data for that interval is rendered NaN. Another way of looking at HB is as how far back we go to connect adjacent data points: if two points are further apart than HB, the value in between becomes NaN.
A special value of 0 changes the behavior to be closer to that of Whisper files. Whisper logic assigns data points to slots and the last data point to arrive overwrites any previous value in the slot. The duration assigned to the data point is the PDP step, which causes it to be immediately moved to the RRAs. Note that multiple data points in the same PDP will cause multiple RRA updates, and the resulting RRA value is subject to whatever consolidation function the RRA uses. In the case of HB 0, MAX might be more appropriate than WMEAN (the default).
The rationale for giving HB 0 this meaning is that an HB of 0 doesn't make much sense otherwise: if a gap larger than HB is filled with NaNs (or just ignored, an implementation detail), then with an HB of zero any incoming value, strictly speaking, ought to become NaN. We could treat 0 HB as "no limit to how far we go", but "we just store the value without going back" is a nice compromise, and it is actually useful when we want to mimic Whisper-like behavior. One example is using data points to denote that something happened, e.g. "deploy success".
func (*DataSource) LastUpdate ¶
func (ds *DataSource) LastUpdate() time.Time
LastUpdate returns the timestamp of the last Data Point processed
func (*DataSource) PointCount ¶
func (ds *DataSource) PointCount() int
PointCount returns the sum of all point counts of every RRA in this DS.
func (*DataSource) ProcessDataPoint ¶
func (ds *DataSource) ProcessDataPoint(value float64, ts time.Time) error
ProcessDataPoint checks the values and updates the DS PDP. If this is the very first call for this DS (lastUpdate is 0), then it only sets lastUpdate and returns.
func (*DataSource) RRAs ¶
func (ds *DataSource) RRAs() []RoundRobinArchiver
List of Round Robin Archives this Data Source has
func (*DataSource) SetRRAs ¶
func (ds *DataSource) SetRRAs(rras []RoundRobinArchiver)
SetRRAs provides a way to set the RRAs (which may contain data)
func (*DataSource) Spec ¶
func (ds *DataSource) Spec() DSSpec
Return a DSSpec corresponding to this DS
func (*DataSource) Step ¶
func (ds *DataSource) Step() time.Duration
Step returns the step, i.e. the size of the PDP. All RRAs this DS has must have steps that are a multiple of this Step.
type DataSourcer ¶
type DataSourcer interface {
	Pdper
	Step() time.Duration
	Heartbeat() time.Duration
	LastUpdate() time.Time
	RRAs() []RoundRobinArchiver
	SetRRAs(rras []RoundRobinArchiver)
	Copy() DataSourcer
	BestRRA(start, end time.Time, points int64) RoundRobinArchiver
	PointCount() int
	ClearRRAs()
	ProcessDataPoint(value float64, ts time.Time) error
	Spec() DSSpec
}
DataSourcer is a DataSource as an interface.
type Pdp ¶
type Pdp struct {
// contains filtered or unexported fields
}
Pdp is a Primary Data Point. It provides intermediate state and logic to interpolate and store incoming DP data in a consolidated way using weighted mean.
This is an illustration of how incoming data points are consolidated into a PDP using weighted mean. The PDP below is 4 units long (the actual unit is not relevant). It shows a time period during which 3 values (measurements) arrived: 1.0 at 1, 3.0 at 3 and 2.0 at 4. The final value of this PDP is 2.25.
||    +---------+    ||
||    |   3.0   +----||
||----+         | 2.0||
|| 1.0|         |    ||
||====+====+====+====||
 0    1    2    3    4  ---> time
In this PDP 0.25 of the value is 1.0, 0.50 is 3.0 and 0.25 is 2.0, for a total of 0.25*1 + 0.50*3 + 0.25*2 = 2.25.
If a part of the data point is NaN, then that part does not count. Even if NaN is at the end:
||    +---------+    ||
||    |      3.0|    ||
||----+         | NaN||
|| 1.0|         |    ||
||====+====+====+====||
 0    1    2    3    4  ---> time
In the above PDP, the size is what is taken up by 1.0 and 3.0, without the NaN. Thus 1/3 of the value is 1.0 and 2/3 of the value is 3.0, for a total of 1/3*1 + 2/3*3 = 2.33333.
An alternative way of looking at the above data point is that it is simply shorter or has a shorter duration:
||    +---------||
||    |      3.0||
||----+         ||
|| 1.0|         ||
||====+====+====||
 0    1    2    3    4  ---> time
A datapoint must be all NaN for its value to be NaN. If duration is 0, then the value is irrelevant.
To create an "empty" Pdp, simply use its zero value.
func (*Pdp) AddValueLast ¶
AddValueLast replaces the current value. This is different from SetValue in that it's a noop if val is NaN or dur is 0.
func (*Pdp) AddValueMax ¶
AddValueMax adds a value using max. A non-NaN value is considered greater than zero value (duration 0) or NaN.
func (*Pdp) AddValueMin ¶
AddValueMin adds a value using min. A non-NaN value is considered lesser than zero value (duration 0) or NaN.
func (*Pdp) Reset ¶
Reset sets the value to zero value and returns the value of the PDP before Reset.
type RRASpec ¶
type RRASpec struct {
	Function Consolidation
	Step     time.Duration // duration of a single step
	Span     time.Duration // duration of the whole series (should be multiple of step)
	Xff      float32

	// These can be used to fill the initial value
	Latest   time.Time
	Value    float64
	Duration time.Duration
	DPs      map[int64]float64 // Careful, these are round-robin
}
RRASpec is the RRA definition for NewRoundRobinArchive.
type RoundRobinArchive ¶
type RoundRobinArchive struct {
	// Each RRA has its own PDP (duration and value). Note that
	// whatever fits in the DS PDP step will reside there, but
	// anything exceeding the time period that DS PDP can hold will
	// trickle down to RRA PDPs, until they are added to DPs.
	Pdp
	// contains filtered or unexported fields
}
A Round Robin Archive and all its parameters.
func NewRoundRobinArchive ¶
func NewRoundRobinArchive(spec RRASpec) *RoundRobinArchive
Returns a new RRA in accordance with the provided RRASpec.
func (*RoundRobinArchive) Begins ¶
func (rra *RoundRobinArchive) Begins(now time.Time) time.Time
Begins returns the timestamp of the beginning of this RRA assuming that the argument "now" is within it. This will be a time approximately but not exactly the RRA length ago, because it is aligned on the RRA step boundary.
func (*RoundRobinArchive) Copy ¶
func (rra *RoundRobinArchive) Copy() RoundRobinArchiver
Returns a complete copy of the RRA.
func (*RoundRobinArchive) DPs ¶
func (rra *RoundRobinArchive) DPs() map[int64]float64
DPs returns data points as a map of floats. It's a map rather than a slice to be more space-efficient for sparse series.
func (*RoundRobinArchive) Latest ¶
func (rra *RoundRobinArchive) Latest() time.Time
Latest returns the time on which the last slot ends.
func (*RoundRobinArchive) PointCount ¶
func (rra *RoundRobinArchive) PointCount() int
PointCount returns the number of points in this RRA.
func (*RoundRobinArchive) Size ¶
func (rra *RoundRobinArchive) Size() int64
Number of data points in this RRA
func (*RoundRobinArchive) Spec ¶
func (rra *RoundRobinArchive) Spec() RRASpec
Spec matching this RRA
func (*RoundRobinArchive) Step ¶
func (rra *RoundRobinArchive) Step() time.Duration
Step of this RRA
type RoundRobinArchiver ¶
type RoundRobinArchiver interface {
	Pdper
	Latest() time.Time
	Step() time.Duration
	Size() int64
	PointCount() int
	DPs() map[int64]float64
	Copy() RoundRobinArchiver
	Begins(now time.Time) time.Time
	Spec() RRASpec
	// contains filtered or unexported methods
}
RoundRobinArchive as an interface