package kafka

Version: v2.0.0+incompatible
Published: Jan 9, 2018 License: MIT Imports: 16 Imported by: 0

README

Kafka Sink

The Kafka sink allows flushing of metrics or spans to a Kafka topic.

Status

This sink is experimental.

TODO

  • Uses the async client, but does not currently act on delivery failures.
  • Does not currently handle writes of events or checks.

Configuration

See the various kafka_* keys in example.yaml for all available configuration options. This sink supports the following features:

  • batching
  • ack requirements
  • publishing of Protobuf or JSON formatted messages

Format

Metrics are published as JSON in the following form:

{
  "name": "some.metric",
  "timestamp": 1234567, // unix time
  "value": 1.0,
  "tags": [ "tag:value", … ],
  "type": "gauge" // counter, etc
}
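
As a rough illustration of the shape above, a consumer or test could model a metric message with a struct like the one below. This is a minimal sketch: the `kafkaMetric` type and the `encodeMetric` helper are hypothetical names introduced here, the Go field types are assumed from the sample values, and the sink's own internal type may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaMetric is a hypothetical type that mirrors the documented JSON shape.
// It is not the sink's own type; field types are assumed from the example.
type kafkaMetric struct {
	Name      string   `json:"name"`
	Timestamp int64    `json:"timestamp"` // unix time
	Value     float64  `json:"value"`
	Tags      []string `json:"tags"`
	Type      string   `json:"type"` // "gauge", "counter", etc.
}

// encodeMetric is a hypothetical helper that serializes one metric to JSON.
func encodeMetric(m kafkaMetric) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeMetric(kafkaMetric{
		Name:      "some.metric",
		Timestamp: 1234567,
		Value:     1.0,
		Tags:      []string{"tag:value"},
		Type:      "gauge",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

The same struct can be passed to `json.Unmarshal` on the consuming side to decode messages read from the metric topic, assuming the topic carries JSON in this form.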

Spans are published as either JSON or Protobuf. The form is defined in SSF's protobuf definition and its generated code. Note that it includes a version field for future compatibility.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaMetricSink

type KafkaMetricSink struct {
	// contains filtered or unexported fields
}

func NewKafkaMetricSink

func NewKafkaMetricSink(logger *logrus.Logger, brokers string, checkTopic string, eventTopic string, metricTopic string, ackRequirement string, partitioner string, retries int, bufferBytes int, bufferMessages int, bufferDuration string, stats *statsd.Client) (*KafkaMetricSink, error)

NewKafkaMetricSink creates a new Kafka Plugin.

func (*KafkaMetricSink) Flush

func (k *KafkaMetricSink) Flush(ctx context.Context, interMetrics []samplers.InterMetric) error

Flush sends a slice of metrics to Kafka.

func (*KafkaMetricSink) FlushEventsChecks

func (k *KafkaMetricSink) FlushEventsChecks(ctx context.Context, events []samplers.UDPEvent, checks []samplers.UDPServiceCheck)

FlushEventsChecks flushes Events and Checks.

func (*KafkaMetricSink) Name

func (k *KafkaMetricSink) Name() string

Name returns the name of this sink.

func (*KafkaMetricSink) Start

func (k *KafkaMetricSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

type KafkaSpanSink

type KafkaSpanSink struct {
	// contains filtered or unexported fields
}

func NewKafkaSpanSink

func NewKafkaSpanSink(logger *logrus.Logger, brokers string, topic string, partitioner string, ackRequirement string, retries int, bufferBytes int, bufferMessages int, bufferDuration string, serializationFormat string, stats *statsd.Client) (*KafkaSpanSink, error)

NewKafkaSpanSink creates a new Kafka Plugin.

func (*KafkaSpanSink) Flush

func (k *KafkaSpanSink) Flush()

Flush emits metrics, since the spans have already been ingested and are being sent asynchronously.

func (*KafkaSpanSink) Ingest

func (k *KafkaSpanSink) Ingest(span *ssf.SSFSpan) error

Ingest takes the span and adds it to Kafka producer for async flushing. The flushing is driven by the settings from KafkaSpanSink's constructor. Tune the bytes, messages and interval settings to your tastes!

func (*KafkaSpanSink) Name

func (k *KafkaSpanSink) Name() string

Name returns the name of this sink.

func (*KafkaSpanSink) Start

func (k *KafkaSpanSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.
