veneur: github.com/stripe/veneur Index | Files | Directories

package veneur

import "github.com/stripe/veneur"

Index

Package Files

config.go config_parse.go config_proxy.go consul.go datadog_trace_span.go discoverer.go file_create.go flusher.go generate.go handlers_global.go http.go kubernetes.go networking.go proxy.go sentry.go server.go socket_linux.go worker.go

Constants

const REDACTED = "REDACTED"

REDACTED is used to replace values that we don't want to leak into loglines (e.g., credentials)

Variables

var BUILD_DATE = defaultLinkValue
var VERSION = defaultLinkValue

VERSION stores the current veneur version. It must be a var so it can be set at link time.

func CalculateTickDelay Uses

func CalculateTickDelay(interval time.Duration, t time.Time) time.Duration

CalculateTickDelay takes the provided time, `Truncate`s it a rounded-down multiple of `interval`, then adds `interval` back to find the "next" tick.

func ConsumePanic Uses

func ConsumePanic(sentry *raven.Client, cl *trace.Client, hostname string, err interface{})

ConsumePanic is intended to be called inside a deferred function when recovering from a panic. It accepts the value of recover() as its only argument, and reports the panic to Sentry, prints the stack, and then repanics (to ensure your program terminates)

func NewSocket Uses

func NewSocket(addr *net.UDPAddr, recvBuf int, reuseport bool) (net.PacketConn, error)

see also https://github.com/jbenet/go-reuseport/blob/master/impl_unix.go#L279

func SetLogger Uses

func SetLogger(logger *logrus.Logger)

SetLogger sets the default logger in veneur to the passed value.

func StartSSF Uses

func StartSSF(s *Server, a net.Addr, tracePool *sync.Pool) net.Addr

StartSSF starts listening for SSF on an address a, and returns the concrete address that the server is listening on.

func StartStatsd Uses

func StartStatsd(s *Server, a net.Addr, packetPool *sync.Pool) net.Addr

StartStatsd spawns a goroutine that listens for metrics in statsd format on the address a, and returns the concrete listening address. As this is a setup routine, if any error occurs, it panics.

type Config Uses

type Config struct {
    Aggregates                                []string  `yaml:"aggregates"`
    AwsAccessKeyID                            string    `yaml:"aws_access_key_id"`
    AwsRegion                                 string    `yaml:"aws_region"`
    AwsS3Bucket                               string    `yaml:"aws_s3_bucket"`
    AwsSecretAccessKey                        string    `yaml:"aws_secret_access_key"`
    BlockProfileRate                          int       `yaml:"block_profile_rate"`
    CountUniqueTimeseries                     bool      `yaml:"count_unique_timeseries"`
    DatadogAPIHostname                        string    `yaml:"datadog_api_hostname"`
    DatadogAPIKey                             string    `yaml:"datadog_api_key"`
    DatadogFlushMaxPerBody                    int       `yaml:"datadog_flush_max_per_body"`
    DatadogSpanBufferSize                     int       `yaml:"datadog_span_buffer_size"`
    DatadogTraceAPIAddress                    string    `yaml:"datadog_trace_api_address"`
    Debug                                     bool      `yaml:"debug"`
    DebugFlushedMetrics                       bool      `yaml:"debug_flushed_metrics"`
    DebugIngestedSpans                        bool      `yaml:"debug_ingested_spans"`
    EnableProfiling                           bool      `yaml:"enable_profiling"`
    FalconerAddress                           string    `yaml:"falconer_address"`
    FlushFile                                 string    `yaml:"flush_file"`
    FlushMaxPerBody                           int       `yaml:"flush_max_per_body"`
    FlushWatchdogMissedFlushes                int       `yaml:"flush_watchdog_missed_flushes"`
    ForwardAddress                            string    `yaml:"forward_address"`
    ForwardUseGrpc                            bool      `yaml:"forward_use_grpc"`
    GrpcAddress                               string    `yaml:"grpc_address"`
    Hostname                                  string    `yaml:"hostname"`
    HTTPAddress                               string    `yaml:"http_address"`
    HTTPQuit                                  bool      `yaml:"http_quit"`
    IndicatorSpanTimerName                    string    `yaml:"indicator_span_timer_name"`
    Interval                                  string    `yaml:"interval"`
    KafkaBroker                               string    `yaml:"kafka_broker"`
    KafkaCheckTopic                           string    `yaml:"kafka_check_topic"`
    KafkaEventTopic                           string    `yaml:"kafka_event_topic"`
    KafkaMetricBufferBytes                    int       `yaml:"kafka_metric_buffer_bytes"`
    KafkaMetricBufferFrequency                string    `yaml:"kafka_metric_buffer_frequency"`
    KafkaMetricBufferMessages                 int       `yaml:"kafka_metric_buffer_messages"`
    KafkaMetricRequireAcks                    string    `yaml:"kafka_metric_require_acks"`
    KafkaMetricTopic                          string    `yaml:"kafka_metric_topic"`
    KafkaPartitioner                          string    `yaml:"kafka_partitioner"`
    KafkaRetryMax                             int       `yaml:"kafka_retry_max"`
    KafkaSpanBufferBytes                      int       `yaml:"kafka_span_buffer_bytes"`
    KafkaSpanBufferFrequency                  string    `yaml:"kafka_span_buffer_frequency"`
    KafkaSpanBufferMesages                    int       `yaml:"kafka_span_buffer_mesages"`
    KafkaSpanRequireAcks                      string    `yaml:"kafka_span_require_acks"`
    KafkaSpanSampleRatePercent                float64   `yaml:"kafka_span_sample_rate_percent"`
    KafkaSpanSampleTag                        string    `yaml:"kafka_span_sample_tag"`
    KafkaSpanSerializationFormat              string    `yaml:"kafka_span_serialization_format"`
    KafkaSpanTopic                            string    `yaml:"kafka_span_topic"`
    LightstepAccessToken                      string    `yaml:"lightstep_access_token"`
    LightstepCollectorHost                    string    `yaml:"lightstep_collector_host"`
    LightstepMaximumSpans                     int       `yaml:"lightstep_maximum_spans"`
    LightstepNumClients                       int       `yaml:"lightstep_num_clients"`
    LightstepReconnectPeriod                  string    `yaml:"lightstep_reconnect_period"`
    MetricMaxLength                           int       `yaml:"metric_max_length"`
    MutexProfileFraction                      int       `yaml:"mutex_profile_fraction"`
    NumReaders                                int       `yaml:"num_readers"`
    NumSpanWorkers                            int       `yaml:"num_span_workers"`
    NumWorkers                                int       `yaml:"num_workers"`
    ObjectiveSpanTimerName                    string    `yaml:"objective_span_timer_name"`
    OmitEmptyHostname                         bool      `yaml:"omit_empty_hostname"`
    Percentiles                               []float64 `yaml:"percentiles"`
    ReadBufferSizeBytes                       int       `yaml:"read_buffer_size_bytes"`
    SentryDsn                                 string    `yaml:"sentry_dsn"`
    SignalfxAPIKey                            string    `yaml:"signalfx_api_key"`
    SignalfxDynamicPerTagAPIKeysEnable        bool      `yaml:"signalfx_dynamic_per_tag_api_keys_enable"`
    SignalfxDynamicPerTagAPIKeysRefreshPeriod string    `yaml:"signalfx_dynamic_per_tag_api_keys_refresh_period"`
    SignalfxEndpointAPI                       string    `yaml:"signalfx_endpoint_api"`
    SignalfxEndpointBase                      string    `yaml:"signalfx_endpoint_base"`
    SignalfxFlushMaxPerBody                   int       `yaml:"signalfx_flush_max_per_body"`
    SignalfxHostnameTag                       string    `yaml:"signalfx_hostname_tag"`
    SignalfxMetricNamePrefixDrops             []string  `yaml:"signalfx_metric_name_prefix_drops"`
    SignalfxMetricTagPrefixDrops              []string  `yaml:"signalfx_metric_tag_prefix_drops"`
    SignalfxPerTagAPIKeys                     []struct {
        APIKey string `yaml:"api_key"`
        Name   string `yaml:"name"`
    }   `yaml:"signalfx_per_tag_api_keys"`
    SignalfxVaryKeyBy                 string   `yaml:"signalfx_vary_key_by"`
    SpanChannelCapacity               int      `yaml:"span_channel_capacity"`
    SplunkHecAddress                  string   `yaml:"splunk_hec_address"`
    SplunkHecBatchSize                int      `yaml:"splunk_hec_batch_size"`
    SplunkHecConnectionLifetimeJitter string   `yaml:"splunk_hec_connection_lifetime_jitter"`
    SplunkHecIngestTimeout            string   `yaml:"splunk_hec_ingest_timeout"`
    SplunkHecMaxConnectionLifetime    string   `yaml:"splunk_hec_max_connection_lifetime"`
    SplunkHecSendTimeout              string   `yaml:"splunk_hec_send_timeout"`
    SplunkHecSubmissionWorkers        int      `yaml:"splunk_hec_submission_workers"`
    SplunkHecTLSValidateHostname      string   `yaml:"splunk_hec_tls_validate_hostname"`
    SplunkHecToken                    string   `yaml:"splunk_hec_token"`
    SplunkSpanSampleRate              int      `yaml:"splunk_span_sample_rate"`
    SsfBufferSize                     int      `yaml:"ssf_buffer_size"`
    SsfListenAddresses                []string `yaml:"ssf_listen_addresses"`
    StatsAddress                      string   `yaml:"stats_address"`
    StatsdListenAddresses             []string `yaml:"statsd_listen_addresses"`
    SynchronizeWithInterval           bool     `yaml:"synchronize_with_interval"`
    Tags                              []string `yaml:"tags"`
    TagsExclude                       []string `yaml:"tags_exclude"`
    TLSAuthorityCertificate           string   `yaml:"tls_authority_certificate"`
    TLSCertificate                    string   `yaml:"tls_certificate"`
    TLSKey                            string   `yaml:"tls_key"`
    TraceLightstepAccessToken         string   `yaml:"trace_lightstep_access_token"`
    TraceLightstepCollectorHost       string   `yaml:"trace_lightstep_collector_host"`
    TraceLightstepMaximumSpans        int      `yaml:"trace_lightstep_maximum_spans"`
    TraceLightstepNumClients          int      `yaml:"trace_lightstep_num_clients"`
    TraceLightstepReconnectPeriod     string   `yaml:"trace_lightstep_reconnect_period"`
    TraceMaxLengthBytes               int      `yaml:"trace_max_length_bytes"`
    VeneurMetricsAdditionalTags       []string `yaml:"veneur_metrics_additional_tags"`
    VeneurMetricsScopes               struct {
        Counter   string `yaml:"counter"`
        Gauge     string `yaml:"gauge"`
        Histogram string `yaml:"histogram"`
        Set       string `yaml:"set"`
        Status    string `yaml:"status"`
    }   `yaml:"veneur_metrics_scopes"`
    XrayAddress          string   `yaml:"xray_address"`
    XrayAnnotationTags   []string `yaml:"xray_annotation_tags"`
    XraySamplePercentage int      `yaml:"xray_sample_percentage"`
}

func ReadConfig Uses

func ReadConfig(path string) (c Config, err error)

ReadConfig unmarshals the config file and slurps in its data. ReadConfig can return an error of type *UnknownConfigKeys, which means that the file is usable, but contains unknown fields.

func (Config) ParseInterval Uses

func (c Config) ParseInterval() (time.Duration, error)

ParseInterval handles parsing the flush interval as a time.Duration

type Consul Uses

type Consul struct {
    ConsulHealth *api.Health
}

Consul is a Discoverer that uses Consul to find healthy instances of a given name.

func NewConsul Uses

func NewConsul(config *api.Config) (*Consul, error)

NewConsul creates a new instance of a Consul Discoverer

func (*Consul) GetDestinationsForService Uses

func (c *Consul) GetDestinationsForService(serviceName string) ([]string, error)

GetDestinationsForService updates the list of destinations based on healthy nodes found via Consul. It returns destinations in the form "<host>:<port>".

type DatadogTraceSpan Uses

type DatadogTraceSpan struct {
    Duration int64              `json:"duration"`
    Error    int64              `json:"error"`
    Meta     map[string]string  `json:"meta"`
    Metrics  map[string]float64 `json:"metrics"`
    Name     string             `json:"name"`
    ParentID int64              `json:"parent_id,omitempty"`
    Resource string             `json:"resource,omitempty"`
    Service  string             `json:"service"`
    SpanID   int64              `json:"span_id"`
    Start    int64              `json:"start"`
    TraceID  int64              `json:"trace_id"`
    Type     string             `json:"type"`
}

DatadogTraceSpan represents a trace span as JSON for the Datadog tracing API.

type Discoverer Uses

type Discoverer interface {
    GetDestinationsForService(string) ([]string, error)
}

Discoverer is an interface for various service discovery mechanisms. You could implement your own by implementing this method! See consul.go

type EventWorker Uses

type EventWorker struct {
    // contains filtered or unexported fields
}

EventWorker is similar to a Worker but it collects events and service checks instead of metrics.

func NewEventWorker Uses

func NewEventWorker(cl *trace.Client, stats scopedstatsd.Client) *EventWorker

NewEventWorker creates an EventWorker ready to collect events and service checks.

func (*EventWorker) Flush Uses

func (ew *EventWorker) Flush() []ssf.SSFSample

Flush returns the EventWorker's stored events and service checks and resets the stored contents.

func (*EventWorker) Work Uses

func (ew *EventWorker) Work()

Work will start the EventWorker listening for events and service checks. This function will never return.

type KubernetesDiscoverer Uses

type KubernetesDiscoverer struct {
    // contains filtered or unexported fields
}

func NewKubernetesDiscoverer Uses

func NewKubernetesDiscoverer() (*KubernetesDiscoverer, error)

func (*KubernetesDiscoverer) GetDestinationsForService Uses

func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([]string, error)

type Proxy Uses

type Proxy struct {
    Sentry                     *raven.Client
    Hostname                   string
    ForwardDestinations        *consistent.Consistent
    TraceDestinations          *consistent.Consistent
    ForwardGRPCDestinations    *consistent.Consistent
    Discoverer                 Discoverer
    ConsulForwardService       string
    ConsulTraceService         string
    ConsulForwardGRPCService   string
    ConsulInterval             time.Duration
    MetricsInterval            time.Duration
    ForwardDestinationsMtx     sync.Mutex
    TraceDestinationsMtx       sync.Mutex
    ForwardGRPCDestinationsMtx sync.Mutex
    HTTPAddr                   string
    HTTPClient                 *http.Client
    AcceptingForwards          bool
    AcceptingTraces            bool
    AcceptingGRPCForwards      bool
    ForwardTimeout             time.Duration

    TraceClient *trace.Client
    // contains filtered or unexported fields
}

func NewProxyFromConfig Uses

func NewProxyFromConfig(logger *logrus.Logger, conf ProxyConfig) (p Proxy, err error)

func (*Proxy) HTTPServe Uses

func (p *Proxy) HTTPServe()

HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.

func (*Proxy) Handler Uses

func (p *Proxy) Handler() http.Handler

Handler returns the Handler responsible for routing request processing.

func (*Proxy) ProxyMetrics Uses

func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric, origin string)

ProxyMetrics takes a slice of JSONMetrics and breaks them up into multiple HTTP requests by MetricKey using the hash ring.

func (*Proxy) ProxyTraces Uses

func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)

func (*Proxy) RefreshDestinations Uses

func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)

RefreshDestinations updates the server's list of valid destinations for flushing. This should be called periodically to ensure we have the latest data.

func (*Proxy) ReportRuntimeMetrics Uses

func (p *Proxy) ReportRuntimeMetrics()

func (*Proxy) Serve Uses

func (p *Proxy) Serve()

Start all of the the configured servers (gRPC or HTTP) and block until one of them exist. At that point, stop them both.

func (*Proxy) Shutdown Uses

func (p *Proxy) Shutdown()

Shutdown signals the server to shut down after closing all current connections.

func (*Proxy) Start Uses

func (p *Proxy) Start()

Start fires up the various goroutines that run on behalf of the server. This is separated from the constructor for testing convenience.

type ProxyConfig Uses

type ProxyConfig struct {
    ConsulForwardGrpcServiceName string `yaml:"consul_forward_grpc_service_name"`
    ConsulForwardServiceName     string `yaml:"consul_forward_service_name"`
    ConsulRefreshInterval        string `yaml:"consul_refresh_interval"`
    ConsulTraceServiceName       string `yaml:"consul_trace_service_name"`
    Debug                        bool   `yaml:"debug"`
    EnableProfiling              bool   `yaml:"enable_profiling"`
    ForwardAddress               string `yaml:"forward_address"`
    ForwardTimeout               string `yaml:"forward_timeout"`
    GrpcAddress                  string `yaml:"grpc_address"`
    GrpcForwardAddress           string `yaml:"grpc_forward_address"`
    HTTPAddress                  string `yaml:"http_address"`
    IdleConnectionTimeout        string `yaml:"idle_connection_timeout"`
    MaxIdleConns                 int    `yaml:"max_idle_conns"`
    MaxIdleConnsPerHost          int    `yaml:"max_idle_conns_per_host"`
    RuntimeMetricsInterval       string `yaml:"runtime_metrics_interval"`
    SentryDsn                    string `yaml:"sentry_dsn"`
    SsfDestinationAddress        string `yaml:"ssf_destination_address"`
    StatsAddress                 string `yaml:"stats_address"`
    TraceAddress                 string `yaml:"trace_address"`
    TraceAPIAddress              string `yaml:"trace_api_address"`
    TracingClientCapacity        int    `yaml:"tracing_client_capacity"`
    TracingClientFlushInterval   string `yaml:"tracing_client_flush_interval"`
    TracingClientMetricsInterval string `yaml:"tracing_client_metrics_interval"`
}

func ReadProxyConfig Uses

func ReadProxyConfig(path string) (c ProxyConfig, err error)

ReadProxyConfig unmarshals the proxy config file and slurps in its data.

type Server Uses

type Server struct {
    Workers               []*Worker
    EventWorker           *EventWorker
    SpanChan              chan *ssf.SSFSpan
    SpanWorker            *SpanWorker
    SpanWorkerGoroutines  int
    CountUniqueTimeseries bool

    Statsd *scopedstatsd.ScopedClient
    Sentry *raven.Client

    Hostname  string
    Tags      []string
    TagsAsMap map[string]string

    HTTPClient *http.Client

    HTTPAddr string

    ForwardAddr string

    StatsdListenAddrs []net.Addr
    SSFListenAddrs    []net.Addr
    RcvbufBytes       int

    HistogramPercentiles []float64

    HistogramAggregates samplers.HistogramAggregates

    TraceClient *trace.Client
    // contains filtered or unexported fields
}

A Server is the actual veneur instance that will be run.

func NewFromConfig Uses

func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error)

NewFromConfig creates a new veneur server from a configuration specification and sets up the passed logger according to the configuration.

func (*Server) Flush Uses

func (s *Server) Flush(ctx context.Context)

Flush collects sampler's metrics and passes them to sinks.

func (*Server) FlushWatchdog Uses

func (s *Server) FlushWatchdog()

FlushWatchdog periodically checks that at most `flush_watchdog_missed_flushes` were skipped in a Server. If more than that number was skipped, it panics (assuming that flushing is stuck) with a full level of detail on that panic's backtraces.

It never terminates, so is ideally run from a goroutine in a program's main function.

func (*Server) HTTPServe Uses

func (s *Server) HTTPServe()

HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.

func (*Server) HandleMetricPacket Uses

func (s *Server) HandleMetricPacket(packet []byte) error

HandleMetricPacket processes each packet that is sent to the server, and sends to an appropriate worker (EventWorker or Worker).

func (*Server) HandleTracePacket Uses

func (s *Server) HandleTracePacket(packet []byte)

HandleTracePacket accepts an incoming packet as bytes and sends it to the appropriate worker.

func (*Server) Handler Uses

func (s *Server) Handler() http.Handler

Handler returns the Handler responsible for routing request processing.

func (*Server) ImportMetrics Uses

func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)

ImportMetrics feeds a slice of json metrics to the server's workers

func (*Server) IsLocal Uses

func (s *Server) IsLocal() bool

IsLocal indicates whether veneur is running as a local instance (forwarding non-local data to a global veneur instance) or is running as a global instance (sending all data directly to the final destination).

func (*Server) ReadMetricSocket Uses

func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)

ReadMetricSocket listens for available packets to handle.

func (*Server) ReadSSFPacketSocket Uses

func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)

ReadSSFPacketSocket reads SSF packets off a packet connection.

func (*Server) ReadSSFStreamSocket Uses

func (s *Server) ReadSSFStreamSocket(serverConn net.Conn)

ReadSSFStreamSocket reads a streaming connection in framed wire format off a streaming socket. See package github.com/stripe/veneur/protocol for details.

func (*Server) ReadStatsdDatagramSocket Uses

func (s *Server) ReadStatsdDatagramSocket(serverConn *net.UnixConn, packetPool *sync.Pool)

ReadStatsdDatagramSocket reads statsd metrics packets from connection off a unix datagram socket.

func (*Server) ReadTCPSocket Uses

func (s *Server) ReadTCPSocket(listener net.Listener)

ReadTCPSocket listens on Server.TCPAddr for new connections, starting a goroutine for each.

func (*Server) Serve Uses

func (s *Server) Serve()

Start all of the the configured servers (gRPC or HTTP) and block until one of them exist. At that point, stop them both.

func (*Server) Shutdown Uses

func (s *Server) Shutdown()

Shutdown signals the server to shut down after closing all current connections.

func (*Server) Start Uses

func (s *Server) Start()

Start spins up the Server to do actual work, firing off goroutines for various workers and utilities.

type SpanWorker Uses

type SpanWorker struct {
    SpanChan <-chan *ssf.SSFSpan
    // contains filtered or unexported fields
}

SpanWorker is similar to a Worker but it collects events and service checks instead of metrics.

func NewSpanWorker Uses

func NewSpanWorker(sinks []sinks.SpanSink, cl *trace.Client, statsd scopedstatsd.Client, spanChan <-chan *ssf.SSFSpan, commonTags map[string]string) *SpanWorker

NewSpanWorker creates a SpanWorker ready to collect events and service checks.

func (*SpanWorker) Flush Uses

func (tw *SpanWorker) Flush()

Flush invokes flush on each sink.

func (*SpanWorker) Work Uses

func (tw *SpanWorker) Work()

Work will start the SpanWorker listening for spans. This function will never return.

type UnknownConfigKeys Uses

type UnknownConfigKeys struct {
    // contains filtered or unexported fields
}

UnknownConfigKeys represents a failure to strictly parse a configuration YAML file has failed, indicating that the file contains unknown keys.

func (*UnknownConfigKeys) Error Uses

func (e *UnknownConfigKeys) Error() string

type Worker Uses

type Worker struct {
    PacketChan       chan samplers.UDPMetric
    ImportChan       chan []samplers.JSONMetric
    ImportMetricChan chan []*metricpb.Metric
    QuitChan         chan struct{}
    // contains filtered or unexported fields
}

Worker is the doodad that does work.

func NewWorker Uses

func NewWorker(id int, isLocal bool, countUniqueTimeseries bool, cl *trace.Client, logger *logrus.Logger, stats scopedstatsd.Client) *Worker

NewWorker creates, and returns a new Worker object.

func (*Worker) Flush Uses

func (w *Worker) Flush() WorkerMetrics

Flush resets the worker's internal metrics and returns their contents.

func (*Worker) ImportMetric Uses

func (w *Worker) ImportMetric(other samplers.JSONMetric)

ImportMetric receives a metric from another veneur instance

func (*Worker) ImportMetricGRPC Uses

func (w *Worker) ImportMetricGRPC(other *metricpb.Metric) (err error)

ImportMetricGRPC receives a metric from another veneur instance over gRPC.

In practice, this is only called when in the aggregation tier, so we don't handle LocalOnly scope.

func (*Worker) IngestMetrics Uses

func (w *Worker) IngestMetrics(ms []*metricpb.Metric)

func (*Worker) IngestUDP Uses

func (w *Worker) IngestUDP(metric samplers.UDPMetric)

IngestUDP on a Worker feeds the metric into the worker's PacketChan.

func (*Worker) MetricsProcessedCount Uses

func (w *Worker) MetricsProcessedCount() int64

MetricsProcessedCount is a convenince method for testing that allows us to fetch the Worker's processed count in a non-racey way.

func (*Worker) ProcessMetric Uses

func (w *Worker) ProcessMetric(m *samplers.UDPMetric)

ProcessMetric takes a Metric and samples it

func (*Worker) SampleTimeseries Uses

func (w *Worker) SampleTimeseries(m *samplers.UDPMetric)

SampleTimeseries takes a metric and counts whether the timeseries has already been seen by the worker in this flush interval.

func (*Worker) Stop Uses

func (w *Worker) Stop()

Stop tells the worker to stop listening for work requests.

Note that the worker will only stop *after* it has finished its work.

func (*Worker) Work Uses

func (w *Worker) Work()

Work will start the worker listening for metrics to process or import. It will not return until the worker is sent a message to terminate using Stop()

type WorkerMetrics Uses

type WorkerMetrics struct {
    // contains filtered or unexported fields
}

WorkerMetrics is just a plain struct bundling together the flushed contents of a worker

func NewWorkerMetrics Uses

func NewWorkerMetrics() WorkerMetrics

NewWorkerMetrics initializes a WorkerMetrics struct

func (WorkerMetrics) ForwardableMetrics Uses

func (wm WorkerMetrics) ForwardableMetrics(cl *trace.Client) []*metricpb.Metric

ForwardableMetrics converts all metrics that should be forwarded to metricpb.Metric (protobuf-compatible).

func (WorkerMetrics) Upsert Uses

func (wm WorkerMetrics) Upsert(mk samplers.MetricKey, Scope samplers.MetricScope, tags []string) bool

Upsert creates an entry on the WorkerMetrics struct for the given metrickey (if one does not already exist) and updates the existing entry (if one already exists). Returns true if the metric entry was created and false otherwise.

Directories

PathSynopsis
cmd/veneur
cmd/veneur-emit
cmd/veneur-prometheus
cmd/veneur-proxy
forwardrpc
http
importsrvPackage importsrv receives metrics over gRPC and sends them to workers.
internal/forwardtest
plugins
plugins/localfile
plugins/s3
plugins/s3/mock
protocolPackage protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium.
protocol/dogstatsd
proxysrvPackage proxysrv proxies metrics over gRPC to global veneurs using consistent hashing
samplers
samplers/metricpb
samplers/metricpb/testutils
scopedstatsd
sinks
sinks/blackhole
sinks/datadog
sinks/debug
sinks/falconer
sinks/grpsink
sinks/kafka
sinks/lightstep
sinks/signalfx
sinks/splunk
sinks/ssfmetricsPackage ssfmetrics provides sinks that are used by veneur internally.
sinks/xray
ssfPackage ssf provides an implementation of the Sensor Sensibility Format.
tdigestPackage tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications.
tdigest/analysis
testhelpers
tracePackage trace provies an API for initiating and reporting SSF traces and attaching spans to them.
trace/metricsPackage metrics provides routines for conveniently reporting metrics attached to SSF spans.
trace/testbackendPackage testbackend contains helpers to make it easier to test the tracing behavior of code using veneur's trace API.

Package veneur imports 80 packages (graph) and is imported by 2 packages. Updated 2019-11-22. Refresh now. Tools for package owners.