statsd

package
v0.0.0-...-035455f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: MIT Imports: 39 Imported by: 13

Documentation

Overview

Package statsd implements functionality for creating servers compatible with the statsd protocol. See https://github.com/etsy/statsd/blob/master/docs/metric_types.md for a description of the protocol.

The main components of the library are Receiver, Dispatcher, Aggregator and Flusher. Receiver is responsible for receiving metrics from the socket. Dispatcher dispatches received metrics among several Aggregators, which do aggregation based on type of the metric. At every FlushInterval Flusher flushes metrics via associated Backend objects.

Currently the library implements just a few types of Backend, one compatible with Graphite (http://graphite.wikidot.org), one for Datadog and one just for stdout, but any object implementing the Backend interface can be used with the library. See available backends at https://github.com/atlassian/gostatsd/tree/master/backend/backends.

As with the original etsy statsd, multiple backends can be used simultaneously.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregateProcesser

type AggregateProcesser interface {
	Process(ctx context.Context, fn DispatcherProcessFunc) gostatsd.Wait
}

AggregateProcesser is an interface to run a function against each Aggregator, in the goroutine context of that Aggregator.

type Aggregator

type Aggregator interface {
	ReceiveMap(mm *gostatsd.MetricMap)
	Flush(interval time.Duration)
	Process(ProcessFunc)
	Reset()
}

Aggregator is an object that aggregates statsd metrics. The function NewAggregator should be used to create the objects.

Incoming metrics should be passed via ReceiveMap function.

type AggregatorFactory

type AggregatorFactory interface {
	// Create creates Aggregator objects.
	Create() Aggregator
}

AggregatorFactory creates Aggregator objects.

type AggregatorFactoryFunc

type AggregatorFactoryFunc func() Aggregator

AggregatorFactoryFunc type is an adapter to allow the use of ordinary functions as AggregatorFactory.

func (AggregatorFactoryFunc) Create

func (f AggregatorFactoryFunc) Create() Aggregator

Create calls f().

type BackendHandler

type BackendHandler struct {
	// contains filtered or unexported fields
}

BackendEventHandler dispatches metrics and events to all configured backends (via Aggregators)

func NewBackendHandler

func NewBackendHandler(backends []gostatsd.Backend, maxConcurrentEvents uint, numWorkers int, perWorkerBufferSize int, af AggregatorFactory) *BackendHandler

NewBackendHandler initialises a new Handler which sends metrics and events to all backends

func (*BackendHandler) DispatchEvent

func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)

func (*BackendHandler) DispatchMetricMap

func (bh *BackendHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)

DispatchMetricMap splits a MetricMap in to per-aggregator buckets and distributes it.

func (*BackendHandler) EstimatedTags

func (bh *BackendHandler) EstimatedTags() int

EstimatedTags returns a guess for how many tags to pre-allocate

func (*BackendHandler) Process

Process concurrently executes provided function in goroutines that own Aggregators. DispatcherProcessFunc function may be executed zero or up to numWorkers times. It is executed less than numWorkers times if the context signals "done".

func (*BackendHandler) Run

func (bh *BackendHandler) Run(ctx context.Context)

Run runs the BackendHandler workers until the Context is closed.

func (*BackendHandler) RunMetrics

func (bh *BackendHandler) RunMetrics(ctx context.Context, statser stats.Statser)

RunMetrics attaches a Statser to the BackendHandler. Stops when the context is closed.

func (*BackendHandler) RunMetricsContext

func (bh *BackendHandler) RunMetricsContext(ctx context.Context)

RunMetricsContext pulls a Statser from the Context and invokes RunMetrics. Allows a BackendHandler to still conform to MetricEmitter.

func (*BackendHandler) WaitForEvents

func (bh *BackendHandler) WaitForEvents()

WaitForEvents waits for all event-dispatching goroutines to finish.

type BatchReader

type BatchReader interface {
	ReadBatch(ms []Message) (int, error)
}

func NewBatchReader

func NewBatchReader(conn net.PacketConn) BatchReader

type CloudHandler

type CloudHandler struct {
	// contains filtered or unexported fields
}

CloudHandler enriches metrics and events with additional information fetched from cloud provider.

func NewCloudHandler

func NewCloudHandler(cachedInstances gostatsd.CachedInstances, handler gostatsd.PipelineHandler) *CloudHandler

NewCloudHandler initialises a new cloud handler.

func (*CloudHandler) DispatchEvent

func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)

func (*CloudHandler) DispatchMetricMap

func (ch *CloudHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)

func (*CloudHandler) EstimatedTags

func (ch *CloudHandler) EstimatedTags() int

EstimatedTags returns a guess for how many tags to pre-allocate

func (*CloudHandler) Run

func (ch *CloudHandler) Run(ctx context.Context)

func (*CloudHandler) RunMetrics

func (ch *CloudHandler) RunMetrics(ctx context.Context, statser stats.Statser)

func (*CloudHandler) WaitForEvents

func (ch *CloudHandler) WaitForEvents()

WaitForEvents waits for all event-dispatching goroutines to finish.

type Datagram

type Datagram struct {
	IP        gostatsd.Source
	Msg       []byte
	Timestamp gostatsd.Nanotime
	DoneFunc  func() // to be called once the datagram has been parsed and msg can be freed
}

Datagram is a received UDP datagram that has not been parsed into Metric/Event(s)

type DatagramParser

type DatagramParser struct {
	// contains filtered or unexported fields
}

DatagramParser receives datagrams and parses them into Metrics/Events For each Metric/Event it calls Handler.HandleMetric/Event()

func NewDatagramParser

func NewDatagramParser(
	in <-chan []*Datagram,
	ns string,
	ignoreHost bool,
	estimatedTags int,
	handler gostatsd.PipelineHandler,
	badLineRateLimitPerSecond rate.Limit,
	logRawMetric bool,
	logger logrus.FieldLogger,
) *DatagramParser

NewDatagramParser initialises a new DatagramParser.

func (*DatagramParser) Run

func (dp *DatagramParser) Run(ctx context.Context)

func (*DatagramParser) RunMetricsContext

func (dp *DatagramParser) RunMetricsContext(ctx context.Context)

type DatagramReceiver

type DatagramReceiver struct {
	// contains filtered or unexported fields
}

DatagramReceiver receives datagrams on its PacketConn and passes them off to be parsed

func NewDatagramReceiver

func NewDatagramReceiver(out chan<- []*Datagram, sf SocketFactory, numReaders, receiveBatchSize int) *DatagramReceiver

NewDatagramReceiver initialises a new DatagramReceiver.

func (*DatagramReceiver) Receive

func (dr *DatagramReceiver) Receive(ctx context.Context, c net.PacketConn)

Receive accepts incoming datagrams on c, and passes them off to be parsed

func (*DatagramReceiver) Run

func (dr *DatagramReceiver) Run(ctx context.Context)

func (*DatagramReceiver) RunMetricsContext

func (dr *DatagramReceiver) RunMetricsContext(ctx context.Context)

type DispatcherProcessFunc

type DispatcherProcessFunc func(int, Aggregator)

DispatcherProcessFunc is a function that gets executed by Dispatcher for each Aggregator, passing it into the function.

type Filter

type Filter struct {
	MatchMetrics   gostatsd.StringMatchList // Name must match
	ExcludeMetrics gostatsd.StringMatchList // Name must not match
	MatchTags      gostatsd.StringMatchList // Any tag must match
	DropTags       gostatsd.StringMatchList // Any tag matching anything will be dropped
	DropMetric     bool                     // Drop the entire metric
	DropHost       bool                     // Clears Hostname if present
}

func NewFilterFromViper

func NewFilterFromViper(v *viper.Viper) Filter

NewFilterFromViper creates a new Filter given a *viper.Viper

type GenericBatchReader

type GenericBatchReader struct {
	// contains filtered or unexported fields
}

func (*GenericBatchReader) ReadBatch

func (gbr *GenericBatchReader) ReadBatch(ms []Message) (int, error)

type HttpForwarderHandlerV2

type HttpForwarderHandlerV2 struct {
	// contains filtered or unexported fields
}

HttpForwarderHandlerV2 is a PipelineHandler which sends metrics to another gostatsd instance

func NewHttpForwarderHandlerV2

func NewHttpForwarderHandlerV2(
	logger logrus.FieldLogger,
	transport,
	apiEndpoint string,
	consolidatorSlots,
	maxRequests int,
	concurrentMerge int,
	compress bool,
	maxRequestElapsedTime time.Duration,
	flushInterval time.Duration,
	xheaders map[string]string,
	dynHeaderNames []string,
	pool *transport.TransportPool,
	fc flush.Coordinator,
) (*HttpForwarderHandlerV2, error)

NewHttpForwarderHandlerV2 returns a new handler which dispatches metrics over http to another gostatsd server.

func NewHttpForwarderHandlerV2FromViper

func NewHttpForwarderHandlerV2FromViper(logger logrus.FieldLogger, v *viper.Viper, pool *transport.TransportPool, fc flush.Coordinator) (*HttpForwarderHandlerV2, error)

NewHttpForwarderHandlerV2FromViper returns a new http API client.

func (*HttpForwarderHandlerV2) Close

func (hfh *HttpForwarderHandlerV2) Close()

func (*HttpForwarderHandlerV2) DeepChecks

func (*HttpForwarderHandlerV2) DispatchEvent

func (hfh *HttpForwarderHandlerV2) DispatchEvent(ctx context.Context, e *gostatsd.Event)

func (*HttpForwarderHandlerV2) DispatchMetricMap

func (hfh *HttpForwarderHandlerV2) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)

DispatchMetricMap dispatches a metric map to the MetricConsolidator

func (*HttpForwarderHandlerV2) EstimatedTags

func (hfh *HttpForwarderHandlerV2) EstimatedTags() int

func (*HttpForwarderHandlerV2) Run

func (hfh *HttpForwarderHandlerV2) Run(ctx context.Context)

func (*HttpForwarderHandlerV2) RunMetricsContext

func (hfh *HttpForwarderHandlerV2) RunMetricsContext(ctx context.Context)

func (*HttpForwarderHandlerV2) WaitForEvents

func (hfh *HttpForwarderHandlerV2) WaitForEvents()

type Message

type Message struct {
	Buffers [][]byte
	Addr    net.Addr
	N       int
}

type MetricAggregator

type MetricAggregator struct {
	// contains filtered or unexported fields
}

MetricAggregator aggregates metrics.

func NewMetricAggregator

func NewMetricAggregator(
	percentThresholds []float64,
	expiryIntervalCounter time.Duration,
	expiryIntervalGauge time.Duration,
	expiryIntervalSet time.Duration,
	expiryIntervalTimer time.Duration,
	disabled gostatsd.TimerSubtypes,
	histogramLimit uint32,
) *MetricAggregator

NewMetricAggregator creates a new MetricAggregator object.

func (*MetricAggregator) Flush

func (a *MetricAggregator) Flush(flushInterval time.Duration)

Flush prepares the contents of a MetricAggregator for sending via the Sender.

func (*MetricAggregator) Process

func (a *MetricAggregator) Process(f ProcessFunc)

func (*MetricAggregator) ReceiveMap

func (a *MetricAggregator) ReceiveMap(mm *gostatsd.MetricMap)

ReceiveMap takes a single metric map and will aggregate the values

func (*MetricAggregator) Reset

func (a *MetricAggregator) Reset()

Reset clears the contents of a MetricAggregator.

func (*MetricAggregator) RunMetrics

func (a *MetricAggregator) RunMetrics(ctx context.Context, statser stats.Statser)

type MetricEmitter

type MetricEmitter interface {
	RunMetrics(ctx context.Context, statser stats.Statser)
}

MetricEmitter is an object that emits metrics. Used to pass a Statser to the object after initialization, as Statsers may be created after MetricEmitters

type MetricFlusher

type MetricFlusher struct {
	// contains filtered or unexported fields
}

MetricFlusher periodically flushes metrics from all Aggregators to Senders.

func NewMetricFlusher

func NewMetricFlusher(flushInterval, flushOffset time.Duration, aligned bool, aggregateProcesser AggregateProcesser, backends []gostatsd.Backend) *MetricFlusher

NewMetricFlusher creates a new MetricFlusher with provided configuration.

func (*MetricFlusher) Run

func (f *MetricFlusher) Run(ctx context.Context)

Run runs the MetricFlusher.

type ProcessFunc

type ProcessFunc func(*gostatsd.MetricMap)

ProcessFunc is a function that gets executed by Aggregator with its state passed into the function.

type Server

type Server struct {
	Runnables                 []gostatsd.Runnable
	Backends                  []gostatsd.Backend
	CachedInstances           gostatsd.CachedInstances
	InternalTags              gostatsd.Tags
	InternalNamespace         string
	DefaultTags               gostatsd.Tags
	ExpiryIntervalCounter     time.Duration
	ExpiryIntervalGauge       time.Duration
	ExpiryIntervalSet         time.Duration
	ExpiryIntervalTimer       time.Duration
	ForwarderFlushCoordinator flush.Coordinator
	FlushInterval             time.Duration
	FlushOffset               time.Duration
	FlushAligned              bool
	MaxReaders                int
	MaxParsers                int
	MaxWorkers                int
	MaxQueueSize              int
	MaxConcurrentEvents       int
	MaxEventQueueSize         int
	EstimatedTags             int
	MetricsAddr               string
	Namespace                 string
	StatserType               string
	PercentThreshold          []float64
	IgnoreHost                bool
	ConnPerReader             bool
	HeartbeatEnabled          bool
	HeartbeatTags             gostatsd.Tags
	ReceiveBatchSize          int
	DisabledSubTypes          gostatsd.TimerSubtypes
	HistogramLimit            uint32
	BadLineRateLimitPerSecond rate.Limit
	ServerMode                string
	Hostname                  gostatsd.Source
	LogRawMetric              bool
	DisableInternalEvents     bool
	Viper                     *viper.Viper
	TransportPool             *transport.TransportPool
}

Server encapsulates all of the parameters necessary for starting up the statsd server. These can either be set via command line or directly.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run runs the server until context signals done.

func (*Server) RunWithCustomSocket

func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) error

RunWithCustomSocket runs the server until context signals done. Listening socket is created using sf.

type SocketFactory

type SocketFactory func() (net.PacketConn, error)

SocketFactory is an indirection layer over net.ListenPacket() to allow for different implementations.

type TagChanger

type TagChanger interface {
	AddTagsSetSource(additionalTags gostatsd.Tags, newSource gostatsd.Source)
}

TagChanger is an interface that Metric/Event can implement to update their tags and source. It is so the CloudHandler can change the tags without worrying about the TagsKey cache.

type TagHandler

type TagHandler struct {
	// contains filtered or unexported fields
}

func NewTagHandler

func NewTagHandler(handler gostatsd.PipelineHandler, tags gostatsd.Tags, filters []Filter) *TagHandler

NewTagHandler initialises a new handler which adds unique tags, and sends metrics/events to the next handler based on filter rules.

func NewTagHandlerFromViper

func NewTagHandlerFromViper(v *viper.Viper, handler gostatsd.PipelineHandler, tags gostatsd.Tags) *TagHandler

func (*TagHandler) DispatchEvent

func (th *TagHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)

DispatchEvent adds the unique tags from the TagHandler to the event and passes it to the next stage in the pipeline

func (*TagHandler) DispatchMetricMap

func (th *TagHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)

DispatchMetricMap adds the unique tags from the TagHandler to each consolidated metric in the map and passes it to the next stage in the pipeline

There is potential to optimize here: if the tagsKey doesn't change, we don't need to re-calculate it. But we're keeping things simple for now.

func (*TagHandler) EstimatedTags

func (th *TagHandler) EstimatedTags() int

EstimatedTags returns a guess for how many tags to pre-allocate

func (*TagHandler) WaitForEvents

func (th *TagHandler) WaitForEvents()

WaitForEvents waits for all event-dispatching goroutines to finish.

type V6BatchReader

type V6BatchReader struct {
	// contains filtered or unexported fields
}

func (*V6BatchReader) ReadBatch

func (br *V6BatchReader) ReadBatch(ms []Message) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL