veneur: github.com/stripe/veneur/sinks/kafka

package kafka

import "github.com/stripe/veneur/sinks/kafka"

Index

Package Files

kafka.go

Constants

const IngestTimeout = 5 * time.Second

Variables

var IngestTimeoutError = errors.New("Timed out writing to Kafka producer")
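The following is a minimal sketch of how a timeout constant and a sentinel error like the two above are commonly paired in Go: a channel send races against a timer in a select statement. It does not reproduce this package's implementation. The helper sendWithTimeout and the string channel are hypothetical, and the constant and variable are local stand-ins that mirror the documented declarations.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins mirroring the declarations documented above.
const IngestTimeout = 5 * time.Second

var IngestTimeoutError = errors.New("Timed out writing to Kafka producer")

// sendWithTimeout is a hypothetical helper, not part of the package.
// It tries to hand msg to input and gives up once timeout has elapsed.
func sendWithTimeout(input chan<- string, msg string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case input <- msg:
		return nil
	case <-timer.C:
		return IngestTimeoutError
	}
}

func main() {
	// A buffered channel has room, so the send completes immediately.
	ready := make(chan string, 1)
	fmt.Println("ready channel error:", sendWithTimeout(ready, "span-1", IngestTimeout))

	// An unbuffered channel with no reader blocks until the timer fires.
	blocked := make(chan string)
	err := sendWithTimeout(blocked, "span-2", 10*time.Millisecond)
	fmt.Println("timed out:", errors.Is(err, IngestTimeoutError))
}
```

Because IngestTimeoutError is a package-level sentinel, callers can compare against it with errors.Is (or ==) to tell a timeout apart from other failures.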

type KafkaMetricSink

type KafkaMetricSink struct {
    // contains filtered or unexported fields
}

func NewKafkaMetricSink

func NewKafkaMetricSink(logger *logrus.Logger, cl *trace.Client, brokers string, checkTopic string, eventTopic string, metricTopic string, ackRequirement string, partitioner string, retries int, bufferBytes int, bufferMessages int, bufferDuration string) (*KafkaMetricSink, error)

NewKafkaMetricSink creates a new Kafka metric sink.

func (*KafkaMetricSink) Flush

func (k *KafkaMetricSink) Flush(ctx context.Context, interMetrics []samplers.InterMetric) error

Flush sends a slice of metrics to Kafka.

func (*KafkaMetricSink) FlushOtherSamples

func (k *KafkaMetricSink) FlushOtherSamples(ctx context.Context, samples []ssf.SSFSample)

FlushOtherSamples flushes non-metric, non-span samples.

func (*KafkaMetricSink) Name

func (k *KafkaMetricSink) Name() string

Name returns the name of this sink.

func (*KafkaMetricSink) Start

func (k *KafkaMetricSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

type KafkaSpanSink

type KafkaSpanSink struct {
    // contains filtered or unexported fields
}

func NewKafkaSpanSink

func NewKafkaSpanSink(logger *logrus.Logger, cl *trace.Client, brokers string, topic string, partitioner string, ackRequirement string, retries int, bufferBytes int, bufferMessages int, bufferDuration string, serializationFormat string, sampleTag string, sampleRatePercentage float64) (*KafkaSpanSink, error)

NewKafkaSpanSink creates a new Kafka span sink.

func (*KafkaSpanSink) Flush

func (k *KafkaSpanSink) Flush()

Flush only emits metrics; the spans have already been ingested and are being sent asynchronously.

func (*KafkaSpanSink) Ingest

func (k *KafkaSpanSink) Ingest(span *ssf.SSFSpan) error

Ingest takes the span and adds it to the Kafka producer for asynchronous flushing. Flushing is driven by the buffer settings passed to NewKafkaSpanSink (bufferBytes, bufferMessages and bufferDuration); tune them to suit your workload.
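To make the interplay of the three buffer settings concrete, here is a rough sketch of a flush decision that triggers as soon as any one threshold is reached. It illustrates the general technique rather than this package's code. The flushPolicy type and its shouldFlush method are hypothetical. Two further assumptions: that bufferDuration is a Go duration string accepted by time.ParseDuration, and that a zero value disables a threshold.

```go
package main

import (
	"fmt"
	"time"
)

// flushPolicy is a hypothetical type; its fields are named after the
// constructor parameters bufferBytes, bufferMessages and bufferDuration.
type flushPolicy struct {
	bufferBytes    int
	bufferMessages int
	bufferDuration time.Duration
}

// shouldFlush reports whether any enabled threshold has been reached.
// In this sketch a zero threshold means "disabled" (an assumption).
func (p flushPolicy) shouldFlush(pendingBytes, pendingMessages int, sinceLastFlush time.Duration) bool {
	if p.bufferBytes > 0 && pendingBytes >= p.bufferBytes {
		return true
	}
	if p.bufferMessages > 0 && pendingMessages >= p.bufferMessages {
		return true
	}
	if p.bufferDuration > 0 && sinceLastFlush >= p.bufferDuration {
		return true
	}
	return false
}

func main() {
	// Assumption: the duration setting is written as a Go duration string.
	d, err := time.ParseDuration("500ms")
	if err != nil {
		fmt.Println("invalid duration:", err)
		return
	}
	p := flushPolicy{bufferBytes: 1024, bufferMessages: 10, bufferDuration: d}

	fmt.Println("small and recent:", p.shouldFlush(100, 3, 50*time.Millisecond))
	fmt.Println("byte threshold hit:", p.shouldFlush(2048, 3, 50*time.Millisecond))
	fmt.Println("interval elapsed:", p.shouldFlush(100, 3, time.Second))
}
```

Under this model, larger byte and message thresholds favor throughput (fewer, bigger batches), while a shorter interval bounds how long a span can wait in the buffer before it is sent.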

func (*KafkaSpanSink) Name

func (k *KafkaSpanSink) Name() string

Name returns the name of this sink.

func (*KafkaSpanSink) Start

func (k *KafkaSpanSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

Package kafka imports 20 packages and is imported by 3 packages. Updated 2019-12-25.