Documentation ¶
Overview ¶
Package receiver manages the receiving end of the data. All of the queueing, caching, periodic flushing and cluster forwarding logic is here.
Index ¶
- Variables
- func ReportRcacheStats(ndsf dsl.NamedDSFetcher, sr statReporter)
- type MatchingDSSpecFinder
- type Receiver
- func (r *Receiver) ClusterReady(ready bool)
- func (r *Receiver) Drain()
- func (r *Receiver) DsCache() *dsCache
- func (r *Receiver) QueueAggregatorCommand(agg *aggregator.Command)
- func (r *Receiver) QueueDataPoint(ident serde.Ident, ts time.Time, v float64)
- func (r *Receiver) QueueGauge(ident serde.Ident, v float64)
- func (r *Receiver) QueueSum(ident serde.Ident, v float64)
- func (r *Receiver) SetCluster(c clusterer)
- func (r *Receiver) Start()
- func (r *Receiver) Stop()
- type SimpleDSFinder
Constants ¶
This section is empty.
Variables ¶
var DftDSSPec = &rrd.DSSpec{
	Step:      10 * time.Second,
	Heartbeat: 2 * time.Hour,
	RRAs: []rrd.RRASpec{
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 10 * time.Second,
			Span: 6 * time.Hour,
		},
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 1 * time.Minute,
			Span: 24 * time.Hour,
		},
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 10 * time.Minute,
			Span: 93 * 24 * time.Hour,
		},
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 24 * time.Hour,
			Span: 1825 * 24 * time.Hour,
		},
	},
}
A default "reasonable" spec for those who do not want to think about it.
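To get a feel for what this default costs in storage, the sketch below estimates how many slots each archive holds. It assumes an archive keeps one slot per Step across its Span, which is the usual round-robin layout but is not stated on this page. The `rraSpec` type and the `slots` and `defaultRRAs` helpers are hypothetical stand-ins, not part of the rrd package.

```go
package main

import (
	"fmt"
	"time"
)

// rraSpec is a hypothetical stand-in for rrd.RRASpec that keeps only
// the two fields this sketch needs.
type rraSpec struct {
	Step time.Duration
	Span time.Duration
}

// slots returns how many data points an archive holds, assuming one
// slot per Step across the whole Span.
func slots(r rraSpec) int64 {
	return int64(r.Span / r.Step)
}

// defaultRRAs mirrors the Step and Span values listed in DftDSSPec.
func defaultRRAs() []rraSpec {
	return []rraSpec{
		{Step: 10 * time.Second, Span: 6 * time.Hour},
		{Step: 1 * time.Minute, Span: 24 * time.Hour},
		{Step: 10 * time.Minute, Span: 93 * 24 * time.Hour},
		{Step: 24 * time.Hour, Span: 1825 * 24 * time.Hour},
	}
}

func main() {
	for _, r := range defaultRRAs() {
		fmt.Printf("step=%v span=%v slots=%d\n", r.Step, r.Span, slots(r))
	}
}
```

Under that assumption the four archives hold 2160, 1440, 13392 and 1825 slots respectively.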
Functions ¶
func ReportRcacheStats ¶
func ReportRcacheStats(ndsf dsl.NamedDSFetcher, sr statReporter)
TODO: This is a hack, there should be a better way to report stats from outside the receiver package.
Types ¶
type MatchingDSSpecFinder ¶
A DSSpec Finder can find a DSSpec for a name. For previously unknown DS names that need to be created on-the-fly this interface provides a mechanism for specifying DS/RRA configurations based on the name.
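As an illustration of name-based matching, here is a minimal sketch of a finder that picks a spec by name prefix. `Ident` and `DSSpec` are local stand-ins for serde.Ident and rrd.DSSpec so the example compiles on its own; the `"name"` key, the `prefixFinder` type and its fields are assumptions made for this sketch. Only the method name and shape follow the FindMatchingDSSpec signature shown for SimpleDSFinder.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Ident and DSSpec are hypothetical stand-ins for serde.Ident and
// rrd.DSSpec; the real types live in the library.
type Ident map[string]string

type DSSpec struct {
	Step      time.Duration
	Heartbeat time.Duration
}

// prefixFinder returns match for names starting with prefix and
// fallback for everything else.
type prefixFinder struct {
	prefix   string
	match    *DSSpec
	fallback *DSSpec
}

// FindMatchingDSSpec looks at the "name" key of the ident (an assumed
// key) and selects a spec based on its prefix.
func (f *prefixFinder) FindMatchingDSSpec(ident Ident) *DSSpec {
	if strings.HasPrefix(ident["name"], f.prefix) {
		return f.match
	}
	return f.fallback
}

func main() {
	fast := &DSSpec{Step: time.Second, Heartbeat: time.Minute}
	slow := &DSSpec{Step: 10 * time.Second, Heartbeat: 2 * time.Hour}
	f := &prefixFinder{prefix: "fast.", match: fast, fallback: slow}

	fmt.Println(f.FindMatchingDSSpec(Ident{"name": "fast.requests"}).Step)
	fmt.Println(f.FindMatchingDSSpec(Ident{"name": "disk.used"}).Step)
}
```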
type Receiver ¶
type Receiver struct {
	// Smallest step
	MinStep time.Duration
	// MaxReceiverQueueSize is the limit on the receiver queue. Points
	// are sent to /dev/null when this size is exceeded. Zero or a
	// negative value means unlimited.
	MaxReceiverQueueSize int
	// MaxMemoryBytes is the limit after which points are
	// discarded. It is based on runtime.ReadMemStats() and is rough
	// and approximate, but better than nothing.
	MaxMemoryBytes uint64

	StatFlushDuration time.Duration // Period after which stats are flushed
	StatsNamePrefix   string        // Stat names are prefixed with this
	ReportStats       bool          // report internal stats?
	ReportStatsPrefix string        // prefix for internal stats
	// Number of workers and flushers
	NWorkers int
	Blaster  *blaster.Blaster
	// contains filtered or unexported fields
}
Receiver receives and directs incoming datapoints to one of n workers, which is done to provide some parallelism, especially when it comes to flushing data to the database. The job of the workers is to update and maintain an in-memory RRD, and the job of the flushers is to persist the data to a database. The Receiver orchestrates this flow, providing a caching layer which reduces the database I/O.
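How a point is assigned to a worker is not described here. One common approach, shown below purely as an assumption, is to hash the data source name so that the same DS always lands on the same worker. `workerFor` is a hypothetical helper and is not part of this package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor maps a data source name to a worker index in [0, nWorkers).
// Hashing the name is an assumption of this sketch, not something the
// package documentation states. nWorkers must be greater than zero.
func workerFor(name string, nWorkers int) int {
	h := fnv.New32a()
	h.Write([]byte(name)) // Write on an fnv hash never returns an error
	return int(h.Sum32() % uint32(nWorkers))
}

func main() {
	for _, name := range []string{"cpu.load", "mem.free", "net.rx"} {
		fmt.Printf("%s -> worker %d\n", name, workerFor(name, 4))
	}
}
```

Keeping a given DS on one worker means its in-memory RRD is only ever updated from a single goroutine, which avoids locking around individual data sources.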
The Receiver is cluster-aware. In a clustered setup, points are forwarded to the node responsible for a particular DS.
The Receiver also creates an Aggregator which can aggregate metrics and send them as aggregated data points periodically. In a clustered setup there is one Aggregator per cluster. The default aggregation period is 10 seconds.
Receiver also handles paced metrics. A paced metric is a metric that can come in at a very fast rate (e.g. counting function calls within a process). Paced metrics are similar to aggregator metrics, but in a clustered setup they are accumulated locally in the process, and then sent to the aggregator (counter) or to the receiver (gauge), at which point they may end up getting forwarded to the appropriate node for handling. By default metrics are paced to be sent once per second.
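The idea behind pacing counters can be sketched as a local accumulator that is drained at a fixed interval. This is a simplified illustration under stated assumptions, not the package's implementation: the `pacer` type and its methods are hypothetical, and the flush is triggered by hand here, where a real pacer would call it from something like a time.Ticker once per second.

```go
package main

import (
	"fmt"
	"sync"
)

// pacer accumulates sums locally so that many fast updates turn into
// one value per flush interval. It is a hypothetical type for this
// sketch only.
type pacer struct {
	mu   sync.Mutex
	sums map[string]float64
}

func newPacer() *pacer {
	return &pacer{sums: make(map[string]float64)}
}

// add records v against name without sending anything downstream.
func (p *pacer) add(name string, v float64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sums[name] += v
}

// flush returns everything accumulated so far and starts a new, empty
// accumulation period.
func (p *pacer) flush() map[string]float64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	out := p.sums
	p.sums = make(map[string]float64)
	return out
}

func main() {
	p := newPacer()
	for i := 0; i < 1000; i++ {
		p.add("calls", 1) // e.g. one call per function invocation
	}
	// A thousand updates leave the process as a single value.
	fmt.Println(p.flush()["calls"])
}
```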
func New ¶
func New(serde serde.SerDe, finder MatchingDSSpecFinder) *Receiver
Create a Receiver. The first argument is a SerDe, the second is a MatchingDSSpecFinder used to match previously unknown DS names to a DSSpec with which the DS is to be created. If you pass nil, the default SimpleDSFinder is used, which always returns DftDSSPec.
func NewWithMaxQueue ¶
func NewWithMaxQueue(db serde.SerDe, finder MatchingDSSpecFinder, maxQueue int) *Receiver
func (*Receiver) ClusterReady ¶
In a clustered setup, informs other nodes that we are ready to handle data.
func (*Receiver) Drain ¶
func (r *Receiver) Drain()
Marks the receiver as stopped and waits for the channel to empty.
func (*Receiver) QueueAggregatorCommand ¶
func (r *Receiver) QueueAggregatorCommand(agg *aggregator.Command)
Sends a data point (in the form of an aggregator.Command) to the aggregator.
func (*Receiver) QueueDataPoint ¶
Sends a data point to the receiver channel. A Data Source PDP always treats incoming data as a rate; it is the responsibility of the caller to present non-rate values such as counters as a rate. Consider using the Aggregator (QueueAggregatorCommand) or paced metrics (QueueSum/QueueGauge) for non-rate data.
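For callers that hold a monotonically increasing counter, converting it to a rate before queueing might look like the sketch below. The `sample` type and the `at` and `rate` helpers are hypothetical and not part of this package; the resulting per-second value is what a caller would then pass as `v` to QueueDataPoint.

```go
package main

import (
	"fmt"
	"time"
)

// sample is one reading of a counter at a point in time.
type sample struct {
	ts    time.Time
	value float64
}

// at builds a sample taken sec seconds after the Unix epoch.
func at(sec int64, v float64) sample {
	return sample{ts: time.Unix(sec, 0), value: v}
}

// rate returns the per-second rate between two counter samples. It
// reports false when no time has elapsed or when the counter went
// backwards (for example after a restart), since no meaningful rate
// exists in either case.
func rate(prev, cur sample) (float64, bool) {
	elapsed := cur.ts.Sub(prev.ts).Seconds()
	if elapsed <= 0 || cur.value < prev.value {
		return 0, false
	}
	return (cur.value - prev.value) / elapsed, true
}

func main() {
	// The counter grew by 50 over 10 seconds.
	if r, ok := rate(at(0, 100), at(10, 150)); ok {
		fmt.Printf("%.1f per second\n", r)
	}
}
```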
func (*Receiver) QueueGauge ¶
Send a gauge (i.e. a rate). This is a paced metric.
func (*Receiver) QueueSum ¶
Send a counter/sum. This is a paced metric which will periodically be passed to the aggregator and from the aggregator to the data source as a rate.
func (*Receiver) SetCluster ¶
func (r *Receiver) SetCluster(c clusterer)
Make the receiver clustered. It will also cause internal stats to be prefixed with the node address by setting ReportStatsPrefix.
type SimpleDSFinder ¶
A simple DS finder always returns itself as the only DSSpec it knows.
func (*SimpleDSFinder) FindMatchingDSSpec ¶
func (s *SimpleDSFinder) FindMatchingDSSpec(ident serde.Ident) *rrd.DSSpec