Documentation ¶
Overview ¶
Package statsd implements functionality for creating servers compatible with the statsd protocol. See https://github.com/etsy/statsd/blob/master/docs/metric_types.md for a description of the protocol.
The main components of the library are Receiver, Dispatcher, Aggregator and Flusher. The Receiver is responsible for receiving metrics from the socket. The Dispatcher distributes received metrics among several Aggregators, which aggregate them according to the type of each metric. Every FlushInterval, the Flusher flushes the aggregated metrics via the associated Backend objects.
Currently the library implements just a few types of Backend: one compatible with Graphite (http://graphite.wikidot.org), one for Datadog and one that writes to stdout. Any object implementing the Backend interface can be used with the library. See available backends at https://github.com/atlassian/gostatsd/tree/master/backend/backends.
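The flow from Dispatcher to Aggregators to flush can be sketched with stand-in types. This is an illustration only, not the library's code: the metric and aggregator types, the counter-only aggregation, and the choice of sharding by a hash of the metric name are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// metric is a hypothetical stand-in for the library's metric type.
type metric struct {
	Name  string
	Value float64
}

// aggregator is a hypothetical aggregator that only sums counters.
type aggregator struct {
	counters map[string]float64
}

// receive adds one metric to the aggregator's running totals.
func (a *aggregator) receive(m metric) { a.counters[m.Name] += m.Value }

// flush returns the accumulated totals and resets the aggregator.
func (a *aggregator) flush() map[string]float64 {
	out := a.counters
	a.counters = make(map[string]float64)
	return out
}

// pick chooses an aggregator index by hashing the metric name, so the same
// name always lands on the same aggregator (an assumed strategy).
func pick(name string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(name))
	return int(h.Sum32() % uint32(n))
}

// runPipeline dispatches metrics among n aggregators, then flushes every
// aggregator and merges the results, as a single flush interval would.
func runPipeline(metrics []metric, n int) map[string]float64 {
	aggs := make([]*aggregator, n)
	for i := range aggs {
		aggs[i] = &aggregator{counters: make(map[string]float64)}
	}
	for _, m := range metrics {
		aggs[pick(m.Name, n)].receive(m)
	}
	total := make(map[string]float64)
	for _, a := range aggs {
		for name, v := range a.flush() {
			total[name] += v
		}
	}
	return total
}

func main() {
	out := runPipeline([]metric{{"hits", 1}, {"hits", 2}, {"errors", 1}}, 4)
	fmt.Println(out["hits"], out["errors"])
}
```

Routing a given name to a fixed aggregator means each aggregator can keep its state without locks, which is one plausible reason to shard this way.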
As with the original etsy statsd, multiple backends can be used simultaneously.
Index ¶
- Constants
- Variables
- func AddFlags(fs *pflag.FlagSet)
- func NewCloudHandler(cloud cloudTypes.Interface, next Handler, limiter *rate.Limiter, ...) *cloudHandler
- type Aggregator
- type AggregatorFactory
- type AggregatorFactoryFunc
- type CacheOptions
- type ConsoleServer
- type Dispatcher
- type DispatcherProcessFunc
- type Flusher
- type FlusherStats
- type Handler
- type ProcessFunc
- type Receiver
- type ReceiverStats
- type Server
- type SocketFactory
Constants ¶
const (
	// DefaultCacheRefreshPeriod is the default cache refresh period.
	DefaultCacheRefreshPeriod = 1 * time.Minute
	// DefaultCacheEvictAfterIdlePeriod is the default idle cache eviction period.
	DefaultCacheEvictAfterIdlePeriod = 10 * time.Minute
	// DefaultCacheTTL is the default cache TTL for successful lookups.
	DefaultCacheTTL = 30 * time.Minute
	// DefaultCacheNegativeTTL is the default cache TTL for failed lookups (errors or when instance was not found).
	DefaultCacheNegativeTTL = 1 * time.Minute
)
const (
	// DefaultMaxCloudRequests is the maximum number of cloud provider requests per second.
	DefaultMaxCloudRequests = 40
	// DefaultBurstCloudRequests is the burst number of cloud provider requests per second.
	DefaultBurstCloudRequests = DefaultMaxCloudRequests + 20
	// DefaultExpiryInterval is the default expiry interval for metrics.
	DefaultExpiryInterval = 5 * time.Minute
	// DefaultFlushInterval is the default metrics flush interval.
	DefaultFlushInterval = 1 * time.Second
	// DefaultMetricsAddr is the default address on which to listen for metrics.
	DefaultMetricsAddr = ":8125"
	// DefaultMaxQueueSize is the default maximum number of buffered metrics per worker.
	DefaultMaxQueueSize = 10000 // arbitrary
	// DefaultMaxConcurrentEvents is the default maximum number of events sent concurrently.
	DefaultMaxConcurrentEvents = 1024 // arbitrary
)
const (
	// ParamBackends is the name of parameter with backends.
	ParamBackends = "backends"
	// ParamConsoleAddr is the name of parameter with console address.
	ParamConsoleAddr = "console-addr"
	// ParamCloudProvider is the name of parameter with the name of cloud provider.
	ParamCloudProvider = "cloud-provider"
	// ParamMaxCloudRequests is the name of parameter with maximum number of cloud provider requests per second.
	ParamMaxCloudRequests = "maxCloudRequests"
	// ParamBurstCloudRequests is the name of parameter with burst number of cloud provider requests per second.
	ParamBurstCloudRequests = "burstCloudRequests"
	// ParamDefaultTags is the name of parameter with the list of additional tags.
	ParamDefaultTags = "default-tags"
	// ParamExpiryInterval is the name of parameter with expiry interval for metrics.
	ParamExpiryInterval = "expiry-interval"
	// ParamFlushInterval is the name of parameter with metrics flush interval.
	ParamFlushInterval = "flush-interval"
	// ParamMaxReaders is the name of parameter with number of socket readers.
	ParamMaxReaders = "max-readers"
	// ParamMaxWorkers is the name of parameter with number of goroutines that aggregate metrics.
	ParamMaxWorkers = "max-workers"
	// ParamMaxQueueSize is the name of parameter with maximum number of buffered metrics per worker.
	ParamMaxQueueSize = "max-queue-size"
	// ParamMaxConcurrentEvents is the name of parameter with maximum number of events sent concurrently.
	ParamMaxConcurrentEvents = "max-concurrent-events"
	// ParamMetricsAddr is the name of parameter with address on which to listen for metrics.
	ParamMetricsAddr = "metrics-addr"
	// ParamNamespace is the name of parameter with namespace for all metrics.
	ParamNamespace = "namespace"
	// ParamPercentThreshold is the name of parameter with list of applied percentiles.
	ParamPercentThreshold = "percent-threshold"
	// ParamWebAddr is the name of parameter with the address of the web-based console.
	ParamWebAddr = "web-addr"
)
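As a rough illustration of how parameter names like these map onto command-line flags, the sketch below registers three of them with the standard library's flag package. This is an assumption made to keep the example dependency-free: the package's own AddFlags takes a *pflag.FlagSet, and the config struct and parseArgs helper here are hypothetical.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// config holds a hypothetical subset of the settings named by the constants.
type config struct {
	MetricsAddr   string
	FlushInterval time.Duration
	MaxQueueSize  int
}

// parseArgs registers three flags under the documented parameter names,
// using the documented defaults, and parses args into a config.
func parseArgs(args []string) (config, error) {
	var c config
	fs := flag.NewFlagSet("statsd-sketch", flag.ContinueOnError)
	fs.StringVar(&c.MetricsAddr, "metrics-addr", ":8125", "address on which to listen for metrics")
	fs.DurationVar(&c.FlushInterval, "flush-interval", time.Second, "metrics flush interval")
	fs.IntVar(&c.MaxQueueSize, "max-queue-size", 10000, "maximum number of buffered metrics per worker")
	err := fs.Parse(args)
	return c, err
}

func main() {
	c, err := parseArgs([]string{"--flush-interval=5s"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```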
const DefaultConsoleAddr = ":8126"
DefaultConsoleAddr is the default address on which a ConsoleServer will listen.
const DefaultWebConsoleAddr = ":8181"
DefaultWebConsoleAddr is the default address on which a WebConsoleServer will listen.
Variables ¶
var DefaultBackends = []string{"graphite"}
DefaultBackends is the list of default backends' names.
var DefaultMaxReaders = runtime.NumCPU()
DefaultMaxReaders is the default number of socket reading goroutines.
var DefaultMaxWorkers = runtime.NumCPU()
DefaultMaxWorkers is the default number of goroutines that aggregate metrics.
var DefaultPercentThreshold = []float64{90}
DefaultPercentThreshold is the default list of applied percentiles.
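To make the effect of a percent threshold concrete, the sketch below computes the upper bound and mean of the lowest pct percent of a set of timer values. It assumes the convention of the original etsy statsd (sort the values, keep round(pct/100 * n) of them); the function name upperAndMean is hypothetical and this is not necessarily how this package's Aggregator computes it.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// upperAndMean returns the largest value and the mean of the lowest pct
// percent of values. ok is false when no values fall within the threshold.
func upperAndMean(values []float64, pct float64) (upper, mean float64, ok bool) {
	if len(values) == 0 {
		return 0, 0, false
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := append([]float64(nil), values...)
	sort.Float64s(sorted)

	// Number of values inside the threshold (assumed rounding rule).
	n := int(math.Round(pct / 100 * float64(len(sorted))))
	if n < 1 {
		return 0, 0, false
	}
	if n > len(sorted) {
		n = len(sorted)
	}
	sum := 0.0
	for _, v := range sorted[:n] {
		sum += v
	}
	return sorted[n-1], sum / float64(n), true
}

func main() {
	timings := []float64{10, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	upper, mean, ok := upperAndMean(timings, 90)
	fmt.Println(upper, mean, ok)
}
```

With ten values and a threshold of 90, nine values are kept, so the single largest timing is excluded from both results.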
var DefaultTags = types.Tags{}
DefaultTags is the default list of additional tags.
Functions ¶
func NewCloudHandler ¶
func NewCloudHandler(cloud cloudTypes.Interface, next Handler, limiter *rate.Limiter, cacheOptions *CacheOptions) *cloudHandler
NewCloudHandler initialises a new cloud handler. If cacheOptions is nil, the default cache configuration is used.
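The nil fallback can be pictured as follows. This is a minimal sketch under assumptions: the lower-case cacheOptions type copies the documented CacheOptions fields, the default values are taken from the constants listed above, and the resolve helper is hypothetical rather than part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// cacheOptions mirrors the fields of the documented CacheOptions struct.
type cacheOptions struct {
	CacheRefreshPeriod        time.Duration
	CacheEvictAfterIdlePeriod time.Duration
	CacheTTL                  time.Duration
	CacheNegativeTTL          time.Duration
}

// defaultCacheOptions returns the values of the documented Default* constants.
func defaultCacheOptions() cacheOptions {
	return cacheOptions{
		CacheRefreshPeriod:        1 * time.Minute,
		CacheEvictAfterIdlePeriod: 10 * time.Minute,
		CacheTTL:                  30 * time.Minute,
		CacheNegativeTTL:          1 * time.Minute,
	}
}

// resolve returns a copy of opts, or the defaults when opts is nil.
func resolve(opts *cacheOptions) cacheOptions {
	if opts == nil {
		return defaultCacheOptions()
	}
	return *opts
}

func main() {
	fmt.Println(resolve(nil).CacheTTL)
	custom := &cacheOptions{CacheTTL: 5 * time.Minute}
	fmt.Println(resolve(custom).CacheTTL)
}
```

Note that in this sketch a non-nil value is used as given, so any field left at zero stays zero; whether the library fills in individual zero fields is not stated in the documentation.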
Types ¶
type Aggregator ¶
type Aggregator interface {
	Receive(*types.Metric, time.Time)
	Flush(interval time.Duration)
	Process(ProcessFunc)
	Reset()
}
Aggregator is an object that aggregates statsd metrics. The function NewAggregator should be used to create the objects.
Incoming metrics should be passed via the Receive function.
func NewAggregator ¶
func NewAggregator(percentThresholds []float64, expiryInterval time.Duration) Aggregator
NewAggregator creates a new Aggregator object.
type AggregatorFactory ¶
type AggregatorFactory interface {
	// Create creates Aggregator objects.
	Create() Aggregator
}
AggregatorFactory creates Aggregator objects.
type AggregatorFactoryFunc ¶
type AggregatorFactoryFunc func() Aggregator
AggregatorFactoryFunc type is an adapter to allow the use of ordinary functions as AggregatorFactory.
func (AggregatorFactoryFunc) Create ¶
func (f AggregatorFactoryFunc) Create() Aggregator
Create calls f().
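The adapter works the same way as http.HandlerFunc in the standard library: a function type gains a method so that it satisfies an interface. The sketch below reproduces the pattern with a deliberately reduced Aggregator interface (only Reset) so that it compiles on its own; countingAggregator and buildAll are hypothetical names used for illustration.

```go
package main

import "fmt"

// Aggregator is a reduced stand-in for the documented interface.
type Aggregator interface {
	Reset()
}

// AggregatorFactory creates Aggregator objects.
type AggregatorFactory interface {
	Create() Aggregator
}

// AggregatorFactoryFunc adapts an ordinary function to AggregatorFactory.
type AggregatorFactoryFunc func() Aggregator

// Create calls f().
func (f AggregatorFactoryFunc) Create() Aggregator { return f() }

// countingAggregator is a hypothetical Aggregator that counts resets.
type countingAggregator struct{ resets int }

func (c *countingAggregator) Reset() { c.resets++ }

// buildAll asks the factory for n aggregators, as a dispatcher might do
// once per worker.
func buildAll(af AggregatorFactory, n int) []Aggregator {
	aggs := make([]Aggregator, 0, n)
	for i := 0; i < n; i++ {
		aggs = append(aggs, af.Create())
	}
	return aggs
}

func main() {
	created := 0
	af := AggregatorFactoryFunc(func() Aggregator {
		created++
		return &countingAggregator{}
	})
	aggs := buildAll(af, 3)
	fmt.Println(len(aggs), created)
}
```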
type CacheOptions ¶
type CacheOptions struct {
	CacheRefreshPeriod        time.Duration
	CacheEvictAfterIdlePeriod time.Duration
	CacheTTL                  time.Duration
	CacheNegativeTTL          time.Duration
}
CacheOptions holds cache behaviour configuration.
type ConsoleServer ¶
type ConsoleServer struct {
	Addr string
	Receiver
	Dispatcher
	Flusher
}
ConsoleServer is an object that listens for telnet connections on the TCP address Addr and provides a console interface for managing the statsd server.
func (*ConsoleServer) ListenAndServe ¶
func (s *ConsoleServer) ListenAndServe(ctx context.Context) error
ListenAndServe listens on the ConsoleServer's TCP network address and then calls Serve.
type Dispatcher ¶
type Dispatcher interface {
	Run(context.Context) error
	DispatchMetric(context.Context, *types.Metric) error
	Process(context.Context, DispatcherProcessFunc) *sync.WaitGroup
}
Dispatcher is responsible for managing Aggregators' lifecycle and dispatching metrics among them.
func NewDispatcher ¶
func NewDispatcher(numWorkers int, perWorkerBufferSize int, af AggregatorFactory) Dispatcher
NewDispatcher creates a new Dispatcher with provided configuration.
type DispatcherProcessFunc ¶
type DispatcherProcessFunc func(uint16, Aggregator)
DispatcherProcessFunc is a function that gets executed by Dispatcher for each Aggregator, passing it into the function.
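One way to picture the relationship between Process and DispatcherProcessFunc is the sketch below, which runs a callback once per aggregator and returns a WaitGroup for the caller to wait on. It is an assumption-laden illustration: the aggregator struct, the processFunc type, and running each callback in a fresh goroutine are all stand-ins, and the real Dispatcher may hand the function to its existing workers instead.

```go
package main

import (
	"fmt"
	"sync"
)

// aggregator is a hypothetical stand-in holding a single count.
type aggregator struct{ count int }

// processFunc mirrors the shape of DispatcherProcessFunc: it receives the
// aggregator's index and the aggregator itself.
type processFunc func(workerID uint16, a *aggregator)

// process runs f once per aggregator, each call in its own goroutine, and
// returns a WaitGroup that is done when every call has returned.
func process(aggs []*aggregator, f processFunc) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	wg.Add(len(aggs))
	for i, a := range aggs {
		go func(id uint16, a *aggregator) {
			defer wg.Done()
			f(id, a)
		}(uint16(i), a)
	}
	return wg
}

// sumCounts uses process to add up the counts held by all aggregators.
func sumCounts(aggs []*aggregator) int {
	var mu sync.Mutex
	total := 0
	process(aggs, func(_ uint16, a *aggregator) {
		mu.Lock()
		defer mu.Unlock()
		total += a.count
	}).Wait()
	return total
}

func main() {
	aggs := []*aggregator{{count: 1}, {count: 2}, {count: 3}}
	fmt.Println(sumCounts(aggs))
}
```

Because the callbacks may run concurrently, any state they share (here, total) needs its own synchronisation.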
type Flusher ¶
type Flusher interface {
	Run(context.Context) error
	GetStats() FlusherStats
}
Flusher periodically flushes metrics from all Aggregators to the configured backends.
func NewFlusher ¶
func NewFlusher(flushInterval time.Duration, dispatcher Dispatcher, receiver Receiver, handler Handler, backends []backendTypes.Backend, selfIP types.IP, hostname string) Flusher
NewFlusher creates a new Flusher with provided configuration.
type FlusherStats ¶
type FlusherStats struct {
	LastFlush      time.Time // Last time the metrics were aggregated
	LastFlushError time.Time // Time of the last flush error
}
FlusherStats holds statistics about a Flusher.
type Handler ¶
type Handler interface {
	// DispatchMetric dispatches metric to the next step in a pipeline.
	DispatchMetric(context.Context, *types.Metric) error
	// DispatchEvent dispatches event to the next step in a pipeline.
	DispatchEvent(context.Context, *types.Event) error
	// WaitForEvents waits for all event-dispatching goroutines to finish.
	WaitForEvents()
}
Handler interface can be used to handle metrics and events.
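Because each Handler passes work to "the next step in a pipeline", handlers can be chained, much as NewCloudHandler wraps a next Handler. The sketch below shows the idea with a reduced, hypothetical handler interface (DispatchMetric only); tagHandler, sink and dispatchOne are illustrative names and not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// metric is a hypothetical stand-in for the library's metric type.
type metric struct {
	Name string
	Tags []string
}

// handler is a reduced version of the documented Handler interface.
type handler interface {
	DispatchMetric(ctx context.Context, m *metric) error
}

// tagHandler appends fixed tags to every metric, then forwards it.
type tagHandler struct {
	next handler
	tags []string
}

func (h *tagHandler) DispatchMetric(ctx context.Context, m *metric) error {
	m.Tags = append(m.Tags, h.tags...)
	return h.next.DispatchMetric(ctx, m)
}

// sink is the end of the pipeline; it records what it receives.
type sink struct{ got []metric }

func (s *sink) DispatchMetric(_ context.Context, m *metric) error {
	s.got = append(s.got, *m)
	return nil
}

// dispatchOne sends a single named metric through h.
func dispatchOne(h handler, name string) error {
	return h.DispatchMetric(context.Background(), &metric{Name: name})
}

func main() {
	s := &sink{}
	h := &tagHandler{next: s, tags: []string{"env:dev"}}
	if err := dispatchOne(h, "hits"); err != nil {
		fmt.Println("dispatch failed:", err)
		return
	}
	fmt.Println(s.got[0].Name, s.got[0].Tags)
}
```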
func NewDispatchingHandler ¶
func NewDispatchingHandler(dispatcher Dispatcher, backends []backendTypes.Backend, tags types.Tags, maxConcurrentEvents uint) Handler
NewDispatchingHandler initialises a new dispatching handler.
type ProcessFunc ¶
ProcessFunc is a function that gets executed by Aggregator with its state passed into the function.
type Receiver ¶
type Receiver interface {
	Receive(context.Context, net.PacketConn) error
	GetStats() ReceiverStats
}
Receiver receives data on its PacketConn and converts lines into Metrics. For each types.Metric it calls Handler.DispatchMetric().
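Converting lines into metrics can be sketched as below for the basic statsd line form name:value|type. This is a simplified illustration rather than the package's parser: it only accepts counters (c), gauges (g) and timers (ms), ignores any further fields such as sample rates, and the metric type, parseLine and parsePacket are hypothetical names. The bad-line count loosely corresponds to the BadLines statistic.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// metric is a hypothetical stand-in for the library's metric type.
type metric struct {
	Name  string
	Value float64
	Type  string
}

var errBadLine = errors.New("bad statsd line")

// parseLine parses a single "name:value|type" line.
func parseLine(line string) (metric, error) {
	name, rest, ok := strings.Cut(line, ":")
	if !ok || name == "" {
		return metric{}, errBadLine
	}
	fields := strings.Split(rest, "|")
	if len(fields) < 2 {
		return metric{}, errBadLine
	}
	value, err := strconv.ParseFloat(fields[0], 64)
	if err != nil {
		return metric{}, errBadLine
	}
	switch fields[1] {
	case "c", "g", "ms": // the only types handled by this sketch
	default:
		return metric{}, errBadLine
	}
	return metric{Name: name, Value: value, Type: fields[1]}, nil
}

// parsePacket splits a datagram into lines and parses each one, returning
// the parsed metrics and the number of lines that could not be parsed.
func parsePacket(packet string) (metrics []metric, badLines int) {
	for _, line := range strings.Split(packet, "\n") {
		if line == "" {
			continue
		}
		m, err := parseLine(line)
		if err != nil {
			badLines++
			continue
		}
		metrics = append(metrics, m)
	}
	return metrics, badLines
}

func main() {
	metrics, bad := parsePacket("hits:1|c\nlatency:320|ms\ngarbage\n")
	fmt.Println(len(metrics), bad)
}
```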
func NewMetricReceiver ¶
NewMetricReceiver initialises a new Receiver.
type ReceiverStats ¶
type ReceiverStats struct {
	LastPacket      time.Time
	BadLines        uint64
	PacketsReceived uint64
	MetricsReceived uint64
	EventsReceived  uint64
}
ReceiverStats holds statistics for a Receiver.
type Server ¶
type Server struct {
	Backends            []backendTypes.Backend
	ConsoleAddr         string
	CloudProvider       cloudTypes.Interface
	Limiter             *rate.Limiter
	DefaultTags         types.Tags
	ExpiryInterval      time.Duration
	FlushInterval       time.Duration
	MaxReaders          int
	MaxWorkers          int
	MaxQueueSize        int
	MaxConcurrentEvents int
	MaxEventQueueSize   int
	MetricsAddr         string
	Namespace           string
	PercentThreshold    []float64
	WebConsoleAddr      string
	Viper               *viper.Viper
}
Server encapsulates all of the parameters necessary for starting up the statsd server. These can be set either via the command line or directly.
func NewServer ¶
func NewServer() *Server
NewServer will create a new Server with the default configuration.
func (*Server) RunWithCustomSocket ¶
func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) error
RunWithCustomSocket runs the server until the context signals done. The listening socket is created using sf.
type SocketFactory ¶
type SocketFactory func() (net.PacketConn, error)
SocketFactory is an indirection layer over net.ListenPacket() to allow for different implementations.
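Two possible SocketFactory implementations are sketched below: one that binds a UDP socket through net.ListenPacket, and one that hands out a connection created elsewhere, which can be convenient in tests. The SocketFactory type is redeclared locally so the example compiles on its own; udpFactory and fixedFactory are hypothetical helper names.

```go
package main

import (
	"fmt"
	"net"
)

// SocketFactory matches the documented signature.
type SocketFactory func() (net.PacketConn, error)

// udpFactory returns a factory that binds a UDP socket on addr each time
// it is called.
func udpFactory(addr string) SocketFactory {
	return func() (net.PacketConn, error) {
		return net.ListenPacket("udp", addr)
	}
}

// fixedFactory returns a factory that always hands out an existing
// connection, for example one prepared by a test.
func fixedFactory(conn net.PacketConn) SocketFactory {
	return func() (net.PacketConn, error) {
		return conn, nil
	}
}

func main() {
	// Port 0 asks the operating system for any free port.
	conn, err := udpFactory("127.0.0.1:0")()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("listening on", conn.LocalAddr().Network())

	same, _ := fixedFactory(conn)()
	fmt.Println(same == conn)
}
```

A factory of either kind could then be passed to RunWithCustomSocket in place of the default listener.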