statsd

package
v0.0.0-...-2b0a6a7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2016 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package statsd implements functionality for creating servers compatible with the statsd protocol. See https://github.com/etsy/statsd/blob/master/docs/metric_types.md for a description of the protocol.

The main components of the library are Receiver, Dispatcher, Aggregator and Flusher. Receiver is responsible for receiving metrics from the socket. Dispatcher dispatches received metrics among several Aggregators, which do aggregation based on type of the metric. At every FlushInterval Flusher flushes metrics via associated Backend objects.

Currently the library implements just a few types of Backend, one compatible with Graphite (http://graphite.wikidot.org), one for Datadog and one just for stdout, but any object implementing the Backend interface can be used with the library. See available backends at https://github.com/atlassian/gostatsd/tree/master/backend/backends.

As with the original etsy statsd, multiple backends can be used simultaneously.

Index

Constants

View Source
const (

	// DefaultCacheRefreshPeriod is the default cache refresh period.
	DefaultCacheRefreshPeriod = 1 * time.Minute
	// DefaultCacheEvictAfterIdlePeriod is the default idle cache eviction period.
	DefaultCacheEvictAfterIdlePeriod = 10 * time.Minute
	// DefaultCacheTTL is the default cache TTL for successful lookups.
	DefaultCacheTTL = 30 * time.Minute
	// DefaultCacheNegativeTTL is the default cache TTL for failed lookups (errors or when instance was not found).
	DefaultCacheNegativeTTL = 1 * time.Minute
)
View Source
const (
	// DefaultMaxCloudRequests is the maximum number of cloud provider requests per second.
	DefaultMaxCloudRequests = 40
	// DefaultBurstCloudRequests is the burst number of cloud provider requests per second.
	DefaultBurstCloudRequests = DefaultMaxCloudRequests + 20
	// DefaultExpiryInterval is the default expiry interval for metrics.
	DefaultExpiryInterval = 5 * time.Minute
	// DefaultFlushInterval is the default metrics flush interval.
	DefaultFlushInterval = 1 * time.Second
	// DefaultMetricsAddr is the default address on which to listen for metrics.
	DefaultMetricsAddr = ":8125"
	// DefaultMaxQueueSize is the default maximum number of buffered metrics per worker.
	DefaultMaxQueueSize = 10000 // arbitrary
	// DefaultMaxConcurrentEvents is the default maximum number of events sent concurrently.
	DefaultMaxConcurrentEvents = 1024 // arbitrary
)
View Source
const (
	// ParamBackends is the name of parameter with backends.
	ParamBackends = "backends"
	// ParamConsoleAddr is the name of parameter with console address.
	ParamConsoleAddr = "console-addr"
	// ParamCloudProvider is the name of parameter with the name of cloud provider.
	ParamCloudProvider = "cloud-provider"
	// ParamMaxCloudRequests is the name of parameter with maximum number of cloud provider requests per second.
	ParamMaxCloudRequests = "maxCloudRequests"
	// ParamBurstCloudRequests is the name of parameter with burst number of cloud provider requests per second.
	ParamBurstCloudRequests = "burstCloudRequests"
	// ParamDefaultTags is the name of parameter with the list of additional tags.
	ParamDefaultTags = "default-tags"
	// ParamExpiryInterval is the name of parameter with expiry interval for metrics.
	ParamExpiryInterval = "expiry-interval"
	// ParamFlushInterval is the name of parameter with metrics flush interval.
	ParamFlushInterval = "flush-interval"
	// ParamMaxReaders is the name of parameter with number of socket readers.
	ParamMaxReaders = "max-readers"
	// ParamMaxWorkers is the name of parameter with number of goroutines that aggregate metrics.
	ParamMaxWorkers = "max-workers"
	// ParamMaxQueueSize is the name of parameter with maximum number of buffered metrics per worker.
	ParamMaxQueueSize = "max-queue-size"
	// ParamMaxConcurrentEvents is the name of parameter with maximum number of events sent concurrently.
	ParamMaxConcurrentEvents = "max-concurrent-events"
	// ParamMetricsAddr is the name of parameter with address on which to listen for metrics.
	ParamMetricsAddr = "metrics-addr"
	// ParamNamespace is the name of parameter with namespace for all metrics.
	ParamNamespace = "namespace"
	// ParamPercentThreshold is the name of parameter with list of applied percentiles.
	ParamPercentThreshold = "percent-threshold"
	// ParamWebAddr is the name of parameter with the address of the web-based console.
	ParamWebAddr = "web-addr"
)
View Source
const DefaultConsoleAddr = ":8126"

DefaultConsoleAddr is the default address on which a ConsoleServer will listen.

View Source
const DefaultWebConsoleAddr = ":8181"

DefaultWebConsoleAddr is the default address on which a WebConsoleServer will listen.

Variables

View Source
var DefaultBackends = []string{"graphite"}

DefaultBackends is the list of default backends' names.

View Source
var DefaultMaxReaders = runtime.NumCPU()

DefaultMaxReaders is the default number of socket reading goroutines.

View Source
var DefaultMaxWorkers = runtime.NumCPU()

DefaultMaxWorkers is the default number of goroutines that aggregate metrics.

View Source
var DefaultPercentThreshold = []float64{90}

DefaultPercentThreshold is the default list of applied percentiles.

View Source
var DefaultTags = types.Tags{}

DefaultTags is the default list of additional tags.

Functions

func AddFlags

func AddFlags(fs *pflag.FlagSet)

AddFlags adds flags to the specified FlagSet.

func NewCloudHandler

func NewCloudHandler(cloud cloudTypes.Interface, next Handler, limiter *rate.Limiter, cacheOptions *CacheOptions) *cloudHandler

NewCloudHandler initialises a new cloud handler. If cacheOptions is nil default cache configuration is used.

Types

type Aggregator

type Aggregator interface {
	Receive(*types.Metric, time.Time)
	Flush(interval time.Duration)
	Process(ProcessFunc)
	Reset()
}

Aggregator is an object that aggregates statsd metrics. The function NewAggregator should be used to create the objects.

Incoming metrics should be passed via Receive function.

func NewAggregator

func NewAggregator(percentThresholds []float64, expiryInterval time.Duration) Aggregator

NewAggregator creates a new Aggregator object.

type AggregatorFactory

type AggregatorFactory interface {
	// Create creates Aggregator objects.
	Create() Aggregator
}

AggregatorFactory creates Aggregator objects.

type AggregatorFactoryFunc

type AggregatorFactoryFunc func() Aggregator

AggregatorFactoryFunc type is an adapter to allow the use of ordinary functions as AggregatorFactory.

func (AggregatorFactoryFunc) Create

func (f AggregatorFactoryFunc) Create() Aggregator

Create calls f().

type CacheOptions

type CacheOptions struct {
	CacheRefreshPeriod        time.Duration
	CacheEvictAfterIdlePeriod time.Duration
	CacheTTL                  time.Duration
	CacheNegativeTTL          time.Duration
}

CacheOptions holds cache behaviour configuration.

type ConsoleServer

type ConsoleServer struct {
	Addr string
	Receiver
	Dispatcher
	Flusher
}

ConsoleServer is an object that listens for telnet connection on a TCP address Addr and provides a console interface to manage statsd server.

func (*ConsoleServer) ListenAndServe

func (s *ConsoleServer) ListenAndServe(ctx context.Context) error

ListenAndServe listens on the ConsoleServer's TCP network address and then calls Serve.

func (*ConsoleServer) Serve

func (s *ConsoleServer) Serve(ctx context.Context, l net.Listener) error

Serve accepts incoming connections on the listener and serves them a console interface to the Dispatcher and Receiver.

type Dispatcher

type Dispatcher interface {
	Run(context.Context) error
	DispatchMetric(context.Context, *types.Metric) error
	Process(context.Context, DispatcherProcessFunc) *sync.WaitGroup
}

Dispatcher is responsible for managing Aggregators' lifecycle and dispatching metrics among them.

func NewDispatcher

func NewDispatcher(numWorkers int, perWorkerBufferSize int, af AggregatorFactory) Dispatcher

NewDispatcher creates a new Dispatcher with provided configuration.

type DispatcherProcessFunc

type DispatcherProcessFunc func(uint16, Aggregator)

DispatcherProcessFunc is a function that gets executed by Dispatcher for each Aggregator, passing it into the function.

type Flusher

type Flusher interface {
	Run(context.Context) error
	GetStats() FlusherStats
}

Flusher periodically flushes metrics from all Aggregators to Senders.

func NewFlusher

func NewFlusher(flushInterval time.Duration, dispatcher Dispatcher, receiver Receiver, handler Handler, backends []backendTypes.Backend, selfIP types.IP, hostname string) Flusher

NewFlusher creates a new Flusher with provided configuration.

type FlusherStats

type FlusherStats struct {
	LastFlush      time.Time // Last time the metrics where aggregated
	LastFlushError time.Time // Time of the last flush error
}

FlusherStats holds statistics about a Flusher.

type Handler

type Handler interface {
	// DispatchMetric dispatches metric to the next step in a pipeline.
	DispatchMetric(context.Context, *types.Metric) error
	// DispatchEvent dispatches event to the next step in a pipeline.
	DispatchEvent(context.Context, *types.Event) error
	// WaitForEvents waits for all event-dispatching goroutines to finish.
	WaitForEvents()
}

Handler interface can be used to handle metrics and events.

func NewDispatchingHandler

func NewDispatchingHandler(dispatcher Dispatcher, backends []backendTypes.Backend, tags types.Tags, maxConcurrentEvents uint) Handler

NewDispatchingHandler initialises a new dispatching handler.

type ProcessFunc

type ProcessFunc func(*types.MetricMap)

ProcessFunc is a function that gets executed by Aggregator with its state passed into the function.

type Receiver

type Receiver interface {
	Receive(context.Context, net.PacketConn) error
	GetStats() ReceiverStats
}

Receiver receives data on its PacketConn and converts lines into Metrics. For each types.Metric it calls Handler.HandleMetric()

func NewMetricReceiver

func NewMetricReceiver(ns string, handler Handler) Receiver

NewMetricReceiver initialises a new Receiver.

type ReceiverStats

type ReceiverStats struct {
	LastPacket      time.Time
	BadLines        uint64
	PacketsReceived uint64
	MetricsReceived uint64
	EventsReceived  uint64
}

ReceiverStats holds statistics for a Receiver.

type Server

type Server struct {
	Backends            []backendTypes.Backend
	ConsoleAddr         string
	CloudProvider       cloudTypes.Interface
	Limiter             *rate.Limiter
	DefaultTags         types.Tags
	ExpiryInterval      time.Duration
	FlushInterval       time.Duration
	MaxReaders          int
	MaxWorkers          int
	MaxQueueSize        int
	MaxConcurrentEvents int
	MaxEventQueueSize   int
	MetricsAddr         string
	Namespace           string
	PercentThreshold    []float64
	WebConsoleAddr      string
	Viper               *viper.Viper
}

Server encapsulates all of the parameters necessary for starting up the statsd server. These can either be set via command line or directly.

func NewServer

func NewServer() *Server

NewServer will create a new Server with the default configuration.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run runs the server until context signals done.

func (*Server) RunWithCustomSocket

func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) error

RunWithCustomSocket runs the server until context signals done. Listening socket is created using sf.

type SocketFactory

type SocketFactory func() (net.PacketConn, error)

SocketFactory is an indirection layer over net.ListenPacket() to allow for different implementations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL