aggregator

package
v0.0.0-...-44638ef Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

README

package aggregator

The Aggregator is the first thing a metric hits during its journey towards the intake. This package is responsible for metrics reception and aggregation before passing them to the forwarder. It computes rates and histograms and passes them to the Serializer.

For now sources of metrics are DogStatsD and Python/Go checks. DogStatsD directly send MetricSample to the Aggregator while checks use the sender to do so.

MetricSample are the raw metric value that flow from our 2 sources to the different metric types (Gauge, Count, ...).

     +===========+                       +===============+
     + DogStatsD +                       +    checks     +
     +===========+                       | Python and Go |
          ++                             +===============+
          ||                                    ++
          ||                                    vv
          ||                                .+------+.
          ||                                . Sender .
          ||                                '+---+--+'
          ||                                     |
          vv                                     v
       .+----------------------------------------+--+.
       +                 Aggregator                  +
       '+--+-------------------------------------+--+'
           |                                     |
           |                                     |
           v                                     v
    .+-----+-----+.                       .+-----+------+.
    + TimeSampler +                       + CheckSampler +
    '+-----+-----+'                       '+-----+------+'
           |                                     |
           |                                     |
           +         .+---------------+.         +
           '+------->+ ContextMetrics  +<-------+'
                     '+-------+-------+'
                              |
                              v
                     .+-------+-------+.
                     +     Metrics     +
                     | Gauge           |
                     | Count           |
                     | Histogram       |
                     | Rate            |
                     | Set             |
                     + ...             +
                     '+--------+------+'
                              ||               +=================+
                              ++==============>+  Serializer     |
                                               +=================+

Sender

The Sender is used by calling code (namely: checks) that wants to send metric samples upstream. Sender exposes a high level interface mapping to different metric types supported upstream (Gauges, Counters, etc). To get an instance of the global default sender, call GetDefaultSender, the function will take care of initialising everything, Aggregator included.

Aggregator

For now the package provides only one Aggregator implementation, the BufferedAggregator, named after its capabilities of storing in memory the samples it receives. The Aggregator should be used as a singleton, the function InitAggregator takes care of this and should be considered the right way to get an Aggregator instance at any time. An Aggregator has its own loop that needs to be started with the run method, in the case of the BufferedAggregator the buffer is flushed at defined intervals. An Aggregator receives metric samples using one or more channels and those samples are processed by different samplers (TimeSampler or CheckSampler).

Sampler

Metrics come this way as samples (e.g. in case of rates, the actual metric is computed over samples in a given time) and samplers take care of store and process them depending on where samples come from. We currently use two different samplers, one for samples coming from Dogstatsd, the other one for samples coming from checks. In the latter case, we have one sampler instance per check instance (this is to support running the same check at different intervals).

Metric

We have different kind of metrics (Gauge, Count, ...). Those are responsible to compute final Serie (set of points) to forwarde the the Datadog backend.

Documentation

Index

Constants

View Source
const DefaultFlushInterval = 15 * time.Second // flush interval

DefaultFlushInterval aggregator default flush interval

View Source
const MetricSamplePoolBatchSize = 32

MetricSamplePoolBatchSize is the batch size of the metric sample pool.

Variables

This section is empty.

Functions

func AddRecurrentSeries

func AddRecurrentSeries(newSerie *metrics.Serie)

AddRecurrentSeries adds a serie to the series that are sent at every flush

func DestroySender

func DestroySender(id check.ID)

DestroySender frees up the resources used by the sender with passed ID (by deregistering it from the aggregator) Should be called when no sender with this ID is used anymore The metrics of this (these) sender(s) that haven't been flushed yet will be lost

func SetDefaultAggregator

func SetDefaultAggregator(agg *BufferedAggregator)

SetDefaultAggregator allows to force a custom Aggregator as the default one and run it. This is useful for testing or benchmarking.

func SetSender

func SetSender(sender Sender, id check.ID) error

SetSender returns the passed sender with the passed ID. This is largely for testing purposes

func StopDefaultAggregator

func StopDefaultAggregator()

StopDefaultAggregator stops the default aggregator. Based on 'flushData' waiting metrics (from checks or closed dogstatsd buckets) will be sent to the serializer before stopping.

Types

type BufferedAggregator

type BufferedAggregator struct {

	// metricSamplePool is a pool of slices of metric sample to avoid allocations.
	// Used by the Dogstatsd Batcher.
	MetricSamplePool *metrics.MetricSamplePool

	TickerChan          <-chan time.Time // For test/benchmark purposes: it allows the flush to be controlled from the outside
	ServerlessFlush     chan bool
	ServerlessFlushDone chan struct{}

	// [sts]
	MetricPrefix string // The prefix used for metrics generated in the aggregator.
	// contains filtered or unexported fields
}

BufferedAggregator aggregates metrics in buckets for dogstatsd Metrics

func InitAggregator

func InitAggregator(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string) *BufferedAggregator

InitAggregator returns the Singleton instance

func InitAggregatorWithFlushInterval

func InitAggregatorWithFlushInterval(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string, flushInterval time.Duration) *BufferedAggregator

InitAggregatorWithFlushInterval returns the Singleton instance with a configured flush interval

func NewBufferedAggregator

func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string, flushInterval time.Duration) *BufferedAggregator

NewBufferedAggregator instantiates a BufferedAggregator

func (*BufferedAggregator) AddAgentStartupTelemetry

func (agg *BufferedAggregator) AddAgentStartupTelemetry(agentVersion string)

AddAgentStartupTelemetry adds a startup event and count to be sent on the next flush

func (*BufferedAggregator) Flush

func (agg *BufferedAggregator) Flush(start time.Time, waitForSerializer bool)

Flush flushes the data contained in the BufferedAggregator into the Forwarder. This method can be called from multiple routines.

func (*BufferedAggregator) GetBufferedChannels

func (agg *BufferedAggregator) GetBufferedChannels() (chan []metrics.MetricSample, chan []*metrics.Event, chan []*metrics.ServiceCheck)

GetBufferedChannels returns a channel which can be subsequently used to send MetricSamples, Event or ServiceCheck

func (*BufferedAggregator) GetBufferedMetricsWithTsChannel

func (agg *BufferedAggregator) GetBufferedMetricsWithTsChannel() chan []metrics.MetricSample

GetBufferedMetricsWithTsChannel returns the channel to send MetricSamples containing their timestamp.

func (*BufferedAggregator) GetChannels

func (agg *BufferedAggregator) GetChannels() (chan *metrics.MetricSample, chan metrics.Event, chan metrics.ServiceCheck)

GetChannels returns a channel which can be subsequently used to send MetricSamples, Event or ServiceCheck

func (*BufferedAggregator) GetEventPlatformEvents

func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message

GetEventPlatformEvents grabs the event platform events from the queue and clears them. Note that this works only if using the 'noop' event platform forwarder

func (*BufferedAggregator) GetEvents

func (agg *BufferedAggregator) GetEvents() metrics.Events

GetEvents grabs the events from the queue and clears it

func (*BufferedAggregator) GetSeriesAndSketches

func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)

GetSeriesAndSketches grabs all the series & sketches from the queue and clears the queue The parameter `before` is used as an end interval while retrieving series and sketches from the time sampler. Metrics and sketches before this timestamp should be returned.

func (*BufferedAggregator) GetServiceChecks

func (agg *BufferedAggregator) GetServiceChecks() metrics.ServiceChecks

GetServiceChecks grabs all the service checks from the queue and clears the queue

func (*BufferedAggregator) IsInputQueueEmpty

func (agg *BufferedAggregator) IsInputQueueEmpty() bool

IsInputQueueEmpty returns true if every input channel for the aggregator are empty. This is mainly useful for tests and benchmark

func (*BufferedAggregator) SetHostname

func (agg *BufferedAggregator) SetHostname(hostname string)

SetHostname sets the hostname that the aggregator uses by default on all the data it sends Blocks until the main aggregator goroutine has finished handling the update

func (*BufferedAggregator) Stop

func (agg *BufferedAggregator) Stop()

Stop stops the aggregator. Based on 'flushData' waiting metrics (from checks or closed dogstatsd buckets) will be sent to the serializer before stopping.

type CheckSampler

type CheckSampler struct {
	// contains filtered or unexported fields
}

CheckSampler aggregates metrics from one Check instance

type Context

type Context struct {
	Name string
	Tags []string
	Host string
}

Context holds the elements that form a context, and can be serialized into a context key

type RawSender

type RawSender interface {
	SendRawMetricSample(sample *metrics.MetricSample)
	SendRawServiceCheck(sc *metrics.ServiceCheck)
	Event(e metrics.Event)
}

RawSender interface to submit samples to aggregator directly

type Sender

type Sender interface {
	Commit()
	Gauge(metric string, value float64, hostname string, tags []string)
	Rate(metric string, value float64, hostname string, tags []string)
	Count(metric string, value float64, hostname string, tags []string)
	MonotonicCount(metric string, value float64, hostname string, tags []string)
	MonotonicCountWithFlushFirstValue(metric string, value float64, hostname string, tags []string, flushFirstValue bool)
	Counter(metric string, value float64, hostname string, tags []string)
	Histogram(metric string, value float64, hostname string, tags []string)
	Historate(metric string, value float64, hostname string, tags []string)
	ServiceCheck(checkName string, status metrics.ServiceCheckStatus, hostname string, tags []string, message string)
	HistogramBucket(metric string, value int64, lowerBound, upperBound float64, monotonic bool, hostname string, tags []string, flushFirstValue bool)
	Event(e metrics.Event)
	EventPlatformEvent(rawEvent string, eventType string)
	GetSenderStats() check.SenderStats
	DisableDefaultHostname(disable bool)
	SetCheckCustomTags(tags []string)
	SetCheckService(service string)
	FinalizeCheckServiceTag()
	OrchestratorMetadata(msgs []serializer.ProcessMessageBody, clusterID string, nodeType int)
}

Sender allows sending metrics from checks/a check

func GetDefaultSender

func GetDefaultSender() (Sender, error)

GetDefaultSender returns the default sender

func GetSender

func GetSender(id check.ID) (Sender, error)

GetSender returns a Sender with passed ID, properly registered with the aggregator If no error is returned here, DestroySender must be called with the same ID once the sender is not used anymore

type SerieSignature

type SerieSignature struct {
	// contains filtered or unexported fields
}

SerieSignature holds the elements that allow to know whether two similar `Serie`s from the same bucket can be merged into one

type Stats

type Stats struct {
	Flushes    [32]int64 // circular buffer of recent flushes stat
	FlushIndex int       // last flush position in circular buffer
	LastFlush  int64     // most recent flush stat, provided for convenience
	Name       string
	// contains filtered or unexported fields
}

Stats stores a statistic from several past flushes allowing computations like median or percentiles

type TimeSampler

type TimeSampler struct {
	// contains filtered or unexported fields
}

TimeSampler aggregates metrics by buckets of 'interval' seconds

func NewTimeSampler

func NewTimeSampler(interval int64) *TimeSampler

NewTimeSampler returns a newly initialized TimeSampler

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL