statsd

package
v0.0.0-...-170def7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2018 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package statsd implements functionality for creating servers compatible with the statsd protocol. See https://github.com/etsy/statsd/blob/master/docs/metric_types.md for a description of the protocol.

The main components of the library are Receiver, Dispatcher, Aggregator and Flusher. Receiver is responsible for receiving metrics from the socket. Dispatcher dispatches received metrics among several Aggregators, which do aggregation based on type of the metric. At every FlushInterval Flusher flushes metrics via associated Backend objects.

Currently the library implements just a few types of Backend, one compatible with Graphite (http://graphite.wikidot.org), one for Datadog and one just for stdout, but any object implementing the Backend interface can be used with the library. See available backends at https://github.com/atlassian/gostatsd/tree/master/backend/backends.

As with the original etsy statsd, multiple backends can be used simultaneously.

Index

Constants

View Source
const (
	// StatserInternal is the name used to indicate the use of the internal statser.
	StatserInternal = "internal"
	// StatserLogging is the name used to indicate the use of the logging statser.
	StatserLogging = "logging"
	// StatserNull is the name used to indicate the use of the null statser.
	StatserNull = "null"
	// StatserTagged is the name used to indicate the use of the tagged statser.
	StatserTagged = "tagged"
)
View Source
const (
	// DefaultMaxCloudRequests is the maximum number of cloud provider requests per second.
	DefaultMaxCloudRequests = 10
	// DefaultBurstCloudRequests is the burst number of cloud provider requests per second.
	DefaultBurstCloudRequests = DefaultMaxCloudRequests + 5
	// DefaultExpiryInterval is the default expiry interval for metrics.
	DefaultExpiryInterval = 5 * time.Minute
	// DefaultFlushInterval is the default metrics flush interval.
	DefaultFlushInterval = 1 * time.Second
	// DefaultIgnoreHost is the default value for whether the source should be used as the host
	DefaultIgnoreHost = false
	// DefaultMetricsAddr is the default address on which to listen for metrics.
	DefaultMetricsAddr = ":8125"
	// DefaultMaxQueueSize is the default maximum number of buffered metrics per worker.
	DefaultMaxQueueSize = 10000 // arbitrary
	// DefaultMaxConcurrentEvents is the default maximum number of events sent concurrently.
	DefaultMaxConcurrentEvents = 1024 // arbitrary
	// DefaultCacheRefreshPeriod is the default cache refresh period.
	DefaultCacheRefreshPeriod = 1 * time.Minute
	// DefaultCacheEvictAfterIdlePeriod is the default idle cache eviction period.
	DefaultCacheEvictAfterIdlePeriod = 10 * time.Minute
	// DefaultCacheTTL is the default cache TTL for successful lookups.
	DefaultCacheTTL = 30 * time.Minute
	// DefaultCacheNegativeTTL is the default cache TTL for failed lookups (errors or when instance was not found).
	DefaultCacheNegativeTTL = 1 * time.Minute
	// DefaultInternalNamespace is the default internal namespace
	DefaultInternalNamespace = "statsd"
	// DefaultHeartbeatEnabled is the default heartbeat enabled flag
	DefaultHeartbeatEnabled = false
	// DefaultReceiveBatchSize is the number of datagrams to read in each receive batch
	DefaultReceiveBatchSize = 50
	// DefaultEstimatedTags is the estimated number of expected tags on an individual metric submitted externally
	DefaultEstimatedTags = 4
	// DefaultConnPerReader is the default for whether to create a connection per reader
	DefaultConnPerReader = false
	// DefaultStatserType is the default statser type
	DefaultStatserType = StatserInternal
	// DefaultBadLinesPerMinute is the default number of bad lines to allow to log per minute
	DefaultBadLinesPerMinute = 0
)
View Source
const (
	// ParamBackends is the name of parameter with backends.
	ParamBackends = "backends"
	// ParamCloudProvider is the name of parameter with the name of cloud provider.
	ParamCloudProvider = "cloud-provider"
	// ParamMaxCloudRequests is the name of parameter with maximum number of cloud provider requests per second.
	ParamMaxCloudRequests = "max-cloud-requests"
	// ParamBurstCloudRequests is the name of parameter with burst number of cloud provider requests per second.
	ParamBurstCloudRequests = "burst-cloud-requests"
	// ParamDefaultTags is the name of parameter with the list of additional tags.
	ParamDefaultTags = "default-tags"
	// ParamInternalTags is the name of parameter with the list of tags for internal metrics.
	ParamInternalTags = "internal-tags"
	// ParamInternalNamespace is the name of parameter with the namespace for internal metrics.
	ParamInternalNamespace = "internal-namespace"
	// ParamExpiryInterval is the name of parameter with expiry interval for metrics.
	ParamExpiryInterval = "expiry-interval"
	// ParamFlushInterval is the name of parameter with metrics flush interval.
	ParamFlushInterval = "flush-interval"
	// ParamIgnoreHost is the name of parameter indicating if the source should be used as the host
	ParamIgnoreHost = "ignore-host"
	// ParamMaxReaders is the name of parameter with number of socket readers.
	ParamMaxReaders = "max-readers"
	// ParamMaxParsers is the name of the parameter with the number of goroutines that parse datagrams into metrics.
	ParamMaxParsers = "max-parsers"
	// ParamMaxWorkers is the name of parameter with number of goroutines that aggregate metrics.
	ParamMaxWorkers = "max-workers"
	// ParamMaxQueueSize is the name of parameter with maximum number of buffered metrics per worker.
	ParamMaxQueueSize = "max-queue-size"
	// ParamMaxConcurrentEvents is the name of parameter with maximum number of events sent concurrently.
	ParamMaxConcurrentEvents = "max-concurrent-events"
	// ParamEstimatedTags is the name of parameter with estimated number of tags per metric
	ParamEstimatedTags = "estimated-tags"
	// ParamCacheRefreshPeriod is the name of parameter with cache refresh period.
	ParamCacheRefreshPeriod = "cloud-cache-refresh-period"
	// ParamCacheEvictAfterIdlePeriod is the name of parameter with idle cache eviction period.
	ParamCacheEvictAfterIdlePeriod = "cloud-cache-evict-after-idle-period"
	// ParamCacheTTL is the name of parameter with cache TTL for successful lookups.
	ParamCacheTTL = "cloud-cache-ttl"
	// ParamCacheNegativeTTL is the name of parameter with cache TTL for failed lookups (errors or when instance was not found).
	ParamCacheNegativeTTL = "cloud-cache-negative-ttl"
	// ParamMetricsAddr is the name of parameter with address on which to listen for metrics.
	ParamMetricsAddr = "metrics-addr"
	// ParamNamespace is the name of parameter with namespace for all metrics.
	ParamNamespace = "namespace"
	// ParamStatserType is the name of parameter with type of statser.
	ParamStatserType = "statser-type"
	// ParamPercentThreshold is the name of parameter with list of applied percentiles.
	ParamPercentThreshold = "percent-threshold"
	// ParamHeartbeatEnabled is the name of the parameter with the heartbeat enabled
	ParamHeartbeatEnabled = "heartbeat-enabled"
	// ParamReceiveBatchSize is the name of the parameter with the number of datagrams to read in each receive batch
	ParamReceiveBatchSize = "receive-batch-size"
	// ParamConnPerReader is the name of the parameter indicating whether to create a connection per reader
	ParamConnPerReader = "conn-per-reader"
	// ParamBadLineRateLimitPerMinute is the name of the parameter indicating how many bad lines can be logged per minute
	ParamBadLinesPerMinute = "bad-lines-per-minute"
)

Variables

View Source
var DefaultBackends = []string{"graphite"}

DefaultBackends is the list of default backends' names.

View Source
var DefaultInternalTags = gostatsd.Tags{}

DefaultInternalTags is the default list of additional tags on internal metrics

View Source
var DefaultMaxParsers = runtime.NumCPU()

DefaultMaxParsers is the default number of goroutines that parse datagrams into metrics.

View Source
var DefaultMaxReaders = minInt(8, runtime.NumCPU())

DefaultMaxReaders is the default number of socket reading goroutines.

View Source
var DefaultMaxWorkers = runtime.NumCPU()

DefaultMaxWorkers is the default number of goroutines that aggregate metrics.

View Source
var DefaultPercentThreshold = []float64{90}

DefaultPercentThreshold is the default list of applied percentiles.

View Source
var DefaultTags = gostatsd.Tags{}

DefaultTags is the default list of additional tags.

Functions

func AddFlags

func AddFlags(fs *pflag.FlagSet)

AddFlags adds flags to the specified FlagSet.

Types

type AggregateProcesser

type AggregateProcesser interface {
	Process(ctx context.Context, fn DispatcherProcessFunc) gostatsd.Wait
}

AggregateProcesser is an interface to run a function against each Aggregator, in the goroutine context of that Aggregator.

type Aggregator

type Aggregator interface {
	Receive(*gostatsd.Metric, time.Time)
	Flush(interval time.Duration)
	Process(ProcessFunc)
	Reset()
}

Aggregator is an object that aggregates statsd metrics. The function NewAggregator should be used to create the objects.

Incoming metrics should be passed via Receive function.

type AggregatorFactory

type AggregatorFactory interface {
	// Create creates Aggregator objects.
	Create() Aggregator
}

AggregatorFactory creates Aggregator objects.

type AggregatorFactoryFunc

type AggregatorFactoryFunc func() Aggregator

AggregatorFactoryFunc type is an adapter to allow the use of ordinary functions as AggregatorFactory.

func (AggregatorFactoryFunc) Create

func (f AggregatorFactoryFunc) Create() Aggregator

Create calls f().

type BackendHandler

type BackendHandler struct {
	// contains filtered or unexported fields
}

BackendEventHandler dispatches metrics and events to all configured backends (via Aggregators)

func NewBackendHandler

func NewBackendHandler(backends []gostatsd.Backend, maxConcurrentEvents uint, numWorkers int, perWorkerBufferSize int, af AggregatorFactory) *BackendHandler

NewBackendHandler initialises a new Handler which sends metrics and events to all backends

func (*BackendHandler) DispatchEvent

func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) error

func (*BackendHandler) DispatchMetric

func (bh *BackendHandler) DispatchMetric(ctx context.Context, m *gostatsd.Metric) error

DispatchMetric dispatches metric to a corresponding Aggregator.

func (*BackendHandler) EstimatedTags

func (bh *BackendHandler) EstimatedTags() int

EstimatedTags returns a guess for how many tags to pre-allocate

func (*BackendHandler) Process

Process concurrently executes provided function in goroutines that own Aggregators. DispatcherProcessFunc function may be executed zero or up to numWorkers times. It is executed less than numWorkers times if the context signals "done".

func (*BackendHandler) Run

func (bh *BackendHandler) Run(ctx context.Context)

Run runs the BackendHandler workers until the Context is closed.

func (*BackendHandler) RunMetrics

func (bh *BackendHandler) RunMetrics(ctx context.Context, statser stats.Statser)

RunMetrics attaches a Statser to the BackendHandler. Stops when the context is closed.

func (*BackendHandler) WaitForEvents

func (bh *BackendHandler) WaitForEvents()

WaitForEvents waits for all event-dispatching goroutines to finish.

type BatchReader

type BatchReader interface {
	ReadBatch(ms []Message) (int, error)
}

func NewBatchReader

func NewBatchReader(conn net.PacketConn) BatchReader

type CacheOptions

type CacheOptions struct {
	CacheRefreshPeriod        time.Duration
	CacheEvictAfterIdlePeriod time.Duration
	CacheTTL                  time.Duration
	CacheNegativeTTL          time.Duration
}

CacheOptions holds cache behaviour configuration.

type CloudHandler

type CloudHandler struct {
	// contains filtered or unexported fields
}

CloudHandler enriches metrics and events with additional information fetched from cloud provider.

func NewCloudHandler

func NewCloudHandler(cloud gostatsd.CloudProvider, metrics MetricHandler, events EventHandler, logger logrus.FieldLogger, limiter *rate.Limiter, cacheOptions *CacheOptions) *CloudHandler

NewCloudHandler initialises a new cloud handler.

func (*CloudHandler) DispatchEvent

func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) error

func (*CloudHandler) DispatchMetric

func (ch *CloudHandler) DispatchMetric(ctx context.Context, m *gostatsd.Metric) error

func (*CloudHandler) EstimatedTags

func (ch *CloudHandler) EstimatedTags() int

EstimatedTags returns a guess for how many tags to pre-allocate

func (*CloudHandler) Run

func (ch *CloudHandler) Run(ctx context.Context)

func (*CloudHandler) RunMetrics

func (ch *CloudHandler) RunMetrics(ctx context.Context, statser stats.Statser)

func (*CloudHandler) WaitForEvents

func (ch *CloudHandler) WaitForEvents()

WaitForEvents waits for all event-dispatching goroutines to finish.

type Datagram

type Datagram struct {
	IP       gostatsd.IP
	Msg      []byte
	DoneFunc func() // to be called once the datagram has been parsed and msg can be freed
}

Datagram is a received UDP datagram that has not been parsed into Metric/Event(s)

type DatagramParser

type DatagramParser struct {
	// contains filtered or unexported fields
}

DatagramParser receives datagrams and parses them into Metrics/Events For each Metric/Event it calls Handler.HandleMetric/Event()

func NewDatagramParser

func NewDatagramParser(in <-chan []*Datagram, ns string, ignoreHost bool, estimatedTags int, metrics MetricHandler, events EventHandler, statser statser.Statser, badLineLimiter *rate.Limiter) *DatagramParser

NewDatagramParser initialises a new DatagramParser.

func (*DatagramParser) Run

func (dp *DatagramParser) Run(ctx context.Context)

func (*DatagramParser) RunMetrics

func (dp *DatagramParser) RunMetrics(ctx context.Context)

type DatagramReceiver

type DatagramReceiver struct {
	// contains filtered or unexported fields
}

DatagramReceiver receives datagrams on its PacketConn and passes them off to be parsed

func NewDatagramReceiver

func NewDatagramReceiver(out chan<- []*Datagram, receiveBatchSize int) *DatagramReceiver

NewDatagramReceiver initialises a new DatagramReceiver.

func (*DatagramReceiver) Receive

func (dr *DatagramReceiver) Receive(ctx context.Context, c net.PacketConn)

Receive accepts incoming datagrams on c, and passes them off to be parsed

func (*DatagramReceiver) RunMetrics

func (dr *DatagramReceiver) RunMetrics(ctx context.Context, statser stats.Statser)

type DispatcherProcessFunc

type DispatcherProcessFunc func(int, Aggregator)

DispatcherProcessFunc is a function that gets executed by Dispatcher for each Aggregator, passing it into the function.

type EventHandler

type EventHandler interface {
	// DispatchEvent dispatches event to the next step in a pipeline.
	DispatchEvent(ctx context.Context, e *gostatsd.Event) error
	// WaitForEvents waits for all event-dispatching goroutines to finish.
	WaitForEvents()
}

EventHandler can be used to handle events

type GenericBatchReader

type GenericBatchReader struct {
	// contains filtered or unexported fields
}

func (*GenericBatchReader) ReadBatch

func (gbr *GenericBatchReader) ReadBatch(ms []Message) (int, error)

type Message

type Message struct {
	Buffers [][]byte
	Addr    net.Addr
	N       int
}

type MetricAggregator

type MetricAggregator struct {
	gostatsd.MetricMap
	// contains filtered or unexported fields
}

MetricAggregator aggregates metrics.

func NewMetricAggregator

func NewMetricAggregator(percentThresholds []float64, expiryInterval time.Duration, disabled gostatsd.TimerSubtypes) *MetricAggregator

NewMetricAggregator creates a new MetricAggregator object.

func (*MetricAggregator) Flush

func (a *MetricAggregator) Flush(flushInterval time.Duration)

Flush prepares the contents of a MetricAggregator for sending via the Sender.

func (*MetricAggregator) Process

func (a *MetricAggregator) Process(f ProcessFunc)

func (*MetricAggregator) Receive

func (a *MetricAggregator) Receive(m *gostatsd.Metric, now time.Time)

Receive aggregates an incoming metric.

func (*MetricAggregator) Reset

func (a *MetricAggregator) Reset()

Reset clears the contents of a MetricAggregator.

func (*MetricAggregator) RunMetrics

func (a *MetricAggregator) RunMetrics(ctx context.Context, statser statser.Statser)

type MetricEmitter

type MetricEmitter interface {
	RunMetrics(ctx context.Context, statser statser.Statser)
}

MetricEmitter is an object that emits metrics. Used to pass a Statser to the object after initialization, as Statsers may be created after MetricEmitters

type MetricFlusher

type MetricFlusher struct {
	// contains filtered or unexported fields
}

MetricFlusher periodically flushes metrics from all Aggregators to Senders.

func NewMetricFlusher

func NewMetricFlusher(flushInterval time.Duration, aggregateProcesser AggregateProcesser, backends []gostatsd.Backend, hostname string, statser statser.Statser) *MetricFlusher

NewMetricFlusher creates a new MetricFlusher with provided configuration.

func (*MetricFlusher) Run

func (f *MetricFlusher) Run(ctx context.Context)

Run runs the MetricFlusher.

type MetricHandler

type MetricHandler interface {
	// EstimatedTags returns a guess for how many tags to pre-allocate
	EstimatedTags() int
	// DispatchMetric dispatches a metric to the next step in a pipeline.
	DispatchMetric(ctx context.Context, m *gostatsd.Metric) error
}

MetricHandler can be used to handle metrics

type ProcessFunc

type ProcessFunc func(*gostatsd.MetricMap)

ProcessFunc is a function that gets executed by Aggregator with its state passed into the function.

type Server

type Server struct {
	Backends                  []gostatsd.Backend
	CloudProvider             gostatsd.CloudProvider
	Limiter                   *rate.Limiter
	InternalTags              gostatsd.Tags
	InternalNamespace         string
	DefaultTags               gostatsd.Tags
	ExpiryInterval            time.Duration
	FlushInterval             time.Duration
	MaxReaders                int
	MaxParsers                int
	MaxWorkers                int
	MaxQueueSize              int
	MaxConcurrentEvents       int
	MaxEventQueueSize         int
	EstimatedTags             int
	MetricsAddr               string
	Namespace                 string
	StatserType               string
	PercentThreshold          []float64
	IgnoreHost                bool
	ConnPerReader             bool
	HeartbeatEnabled          bool
	HeartbeatTags             gostatsd.Tags
	ReceiveBatchSize          int
	DisabledSubTypes          gostatsd.TimerSubtypes
	BadLineRateLimitPerSecond rate.Limit
	CacheOptions
	Viper *viper.Viper
}

Server encapsulates all of the parameters necessary for starting up the statsd server. These can either be set via command line or directly.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run runs the server until context signals done.

func (*Server) RunWithCustomSocket

func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) error

RunWithCustomSocket runs the server until context signals done. Listening socket is created using sf.

type SocketFactory

type SocketFactory func() (net.PacketConn, error)

SocketFactory is an indirection layer over net.ListenPacket() to allow for different implementations.

type TagHandler

type TagHandler struct {
	// contains filtered or unexported fields
}

func NewTagHandler

func NewTagHandler(metrics MetricHandler, events EventHandler, tags gostatsd.Tags) *TagHandler

NewTagHandler initialises a new handler which adds unique tags and sends metrics/events to the next handler

func (*TagHandler) DispatchEvent

func (th *TagHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) error

DispatchEvent adds the unique tags from the TagHandler to the event and passes it to the next stage in the pipeline

func (*TagHandler) DispatchMetric

func (th *TagHandler) DispatchMetric(ctx context.Context, m *gostatsd.Metric) error

DispatchMetric adds the unique tags from the TagHandler to the metric and passes it to the next stage in the pipeline

func (*TagHandler) EstimatedTags

func (th *TagHandler) EstimatedTags() int

EstimatedTags returns a guess for how many tags to pre-allocate

func (*TagHandler) WaitForEvents

func (th *TagHandler) WaitForEvents()

WaitForEvents waits for all event-dispatching goroutines to finish.

type V6BatchReader

type V6BatchReader struct {
	// contains filtered or unexported fields
}

func (*V6BatchReader) ReadBatch

func (br *V6BatchReader) ReadBatch(ms []Message) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL