kafka

package
v14.2.0
Published: May 9, 2022 License: MIT Imports: 21 Imported by: 0

README

Kafka Sink

The Kafka sink allows flushing of metrics or spans to a Kafka topic.

Configuration

See the various kafka_* keys in example.yaml for all available configuration options. This sink supports the following features:

  • batching

  • ack requirements

  • publishing of Protobuf or JSON formatted messages

Status

This sink is stable. Some encodings or options may change. This sink is in active development.

TODO

  • Uses the async client, but doesn't currently do anything on failure.

  • Does not currently handle writes of events or checks.

Span Sampling

The Kafka sink supports span sampling! By default, setting kafka_span_sample_rate_percent less than 100 will sample based off of traceId (meaning that if one span with a particular traceId is selected, all spans with that traceId will be selected), but that behavior can be configured to use a tag instead via kafka_span_sample_tag. For example,

kafka_span_sample_tag: "request_id"
kafka_span_sample_rate_percent: 75

With this configuration, spans without the "request_id" tag are rejected, and spans with the "request_id" tag are sampled at 75% based on a hash of their "request_id" value. Because the decision depends only on the tag value, all spans that share a given "request_id" are kept or dropped together.
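The sampling rule described above can be sketched in a few lines of Go. This is only an illustration: the helper names sampleKey and keep are hypothetical, and FNV-1a is an assumed hash function, since this README does not say which hash the sink uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
)

// sampleKey returns the string to hash for a span: the value of sampleTag when
// one is configured, or the trace ID otherwise. ok is false when a tag is
// configured but the span does not carry it; such spans are rejected.
func sampleKey(traceID int64, tags map[string]string, sampleTag string) (key string, ok bool) {
	if sampleTag == "" {
		return strconv.FormatInt(traceID, 10), true
	}
	v, found := tags[sampleTag]
	return v, found
}

// keep reports whether a span with the given key falls inside ratePercent.
// FNV-1a is an assumption here; the sink's real hash may differ.
func keep(key string, ratePercent float64) bool {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Map the hash onto [0, 100) and compare it with the threshold.
	bucket := float64(h.Sum32()%10000) / 100.0
	return bucket < ratePercent
}

func main() {
	tags := map[string]string{"request_id": "abc123"}
	if key, ok := sampleKey(42, tags, "request_id"); ok {
		fmt.Println("keep:", keep(key, 75))
	}
	_, ok := sampleKey(42, map[string]string{}, "request_id")
	fmt.Println("untagged span accepted:", ok)
}
```

Because the decision is a pure function of the key, every span with the same trace ID (or the same tag value) gets the same answer without any shared state between processes.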

Format

Metrics are published in JSON in the form of:

{
  "name": "some.metric",
  "timestamp": 1234567, // unix time
  "value": 1.0,
  "tags": [ "tag:value", … ],
  "type": "gauge" // counter, etc
}
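A consumer reading from the metric topic might decode each message along these lines. This is a minimal sketch: the kafkaMetric struct and decodeMetric helper are hypothetical, the Go field types are inferred from the example above rather than taken from the sink's source, and the sketch assumes the // comments and the … in the example are annotations that do not appear in real payloads.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaMetric mirrors the JSON fields shown in the README example.
// The field types are assumptions inferred from that example.
type kafkaMetric struct {
	Name      string   `json:"name"`
	Timestamp int64    `json:"timestamp"` // unix time
	Value     float64  `json:"value"`
	Tags      []string `json:"tags"` // "key:value" strings
	Type      string   `json:"type"` // "gauge", "counter", etc.
}

// decodeMetric parses one Kafka message payload into a kafkaMetric.
func decodeMetric(payload []byte) (kafkaMetric, error) {
	var m kafkaMetric
	err := json.Unmarshal(payload, &m)
	return m, err
}

func main() {
	payload := []byte(`{"name":"some.metric","timestamp":1234567,"value":1.0,"tags":["tag:value"],"type":"gauge"}`)
	m, err := decodeMetric(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s %s=%v at %d tags=%v\n", m.Type, m.Name, m.Value, m.Timestamp, m.Tags)
}
```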

Spans are published as either JSON or Protobuf. The format is defined by SSF's protobuf definition and its generated code. Note that it includes a version field for future compatibility.

Documentation

Constants

const IngestTimeout = 5 * time.Second

Variables

var ErrIngestTimeout = errors.New("timed out writing to Kafka producer")

Functions

func CreateMetricSink added in v14.2.0

func CreateMetricSink(
	server *veneur.Server, name string, logger *logrus.Entry,
	config veneur.Config, sinkConfig veneur.MetricSinkConfig,
) (sinks.MetricSink, error)

CreateMetricSink creates a new Kafka sink for metrics. This function should match the signature of a value in veneur.MetricSinkTypes, and is intended to be passed into veneur.NewFromConfig to be called based on the provided configuration.

func CreateSpanSink added in v14.2.0

func CreateSpanSink(
	server *veneur.Server, name string, logger *logrus.Entry,
	config veneur.Config, sinkConfig veneur.SpanSinkConfig,
) (sinks.SpanSink, error)

CreateSpanSink creates a new Kafka sink for spans. This function should match the signature of a value in veneur.SpanSinkTypes, and is intended to be passed into veneur.NewFromConfig to be called based on the provided configuration.

func MigrateConfig added in v14.2.0

func MigrateConfig(conf *veneur.Config) error

func ParseMetricConfig added in v14.2.0

func ParseMetricConfig(
	name string, config interface{},
) (veneur.MetricSinkConfig, error)

ParseMetricConfig decodes the map config for a Kafka metric sink into a KafkaMetricSinkConfig struct.

func ParseSpanConfig added in v14.2.0

func ParseSpanConfig(
	name string, config interface{},
) (veneur.SpanSinkConfig, error)

ParseSpanConfig decodes the map config for a Kafka span sink into a KafkaSpanSinkConfig struct.

Types

type KafkaMetricSink

type KafkaMetricSink struct {
	// contains filtered or unexported fields
}

func (*KafkaMetricSink) Flush

func (k *KafkaMetricSink) Flush(ctx context.Context, interMetrics []samplers.InterMetric) error

Flush sends a slice of metrics to Kafka.

func (*KafkaMetricSink) FlushOtherSamples

func (k *KafkaMetricSink) FlushOtherSamples(ctx context.Context, samples []ssf.SSFSample)

FlushOtherSamples flushes non-metric, non-span samples.

func (*KafkaMetricSink) Name

func (k *KafkaMetricSink) Name() string

Name returns the name of this sink.

func (*KafkaMetricSink) Start

func (k *KafkaMetricSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

type KafkaMetricSinkConfig added in v14.2.0

type KafkaMetricSinkConfig struct {
	Broker                string        `yaml:"broker"`
	CheckTopic            string        `yaml:"check_topic"`
	EventTopic            string        `yaml:"event_topic"`
	MetricBufferBytes     int           `yaml:"metric_buffer_bytes"`
	MetricBufferFrequency time.Duration `yaml:"metric_buffer_frequency"`
	MetricBufferMessages  int           `yaml:"metric_buffer_messages"`
	MetricRequireAcks     string        `yaml:"metric_require_acks"`
	MetricTopic           string        `yaml:"metric_topic"`
	Partitioner           string        `yaml:"partitioner"`
	RetryMax              int           `yaml:"retry_max"`
}

type KafkaSpanSink

type KafkaSpanSink struct {
	// contains filtered or unexported fields
}

func (*KafkaSpanSink) Flush

func (k *KafkaSpanSink) Flush()

Flush emits metrics only, since the spans have already been ingested and are being sent asynchronously.

func (*KafkaSpanSink) Ingest

func (k *KafkaSpanSink) Ingest(span *ssf.SSFSpan) error

Ingest takes the span and adds it to the Kafka producer for async flushing. The flushing is driven by the settings from KafkaSpanSink's constructor. Tune the bytes, messages and interval settings to your tastes!

func (*KafkaSpanSink) Name

func (k *KafkaSpanSink) Name() string

Name returns the name of this sink.

func (*KafkaSpanSink) Start

func (k *KafkaSpanSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

type KafkaSpanSinkConfig added in v14.2.0

type KafkaSpanSinkConfig struct {
	Broker                  string        `yaml:"broker"`
	Partitioner             string        `yaml:"partitioner"`
	RetryMax                int           `yaml:"retry_max"`
	SpanBufferBytes         int           `yaml:"span_buffer_bytes"`
	SpanBufferFrequency     time.Duration `yaml:"span_buffer_frequency"`
	SpanBufferMesages       int           `yaml:"span_buffer_mesages"`
	SpanRequireAcks         string        `yaml:"span_require_acks"`
	SpanSampleRatePercent   float64       `yaml:"span_sample_rate_percent"`
	SpanSampleTag           string        `yaml:"span_sample_tag"`
	SpanSerializationFormat string        `yaml:"span_serialization_format"`
	SpanTopic               string        `yaml:"span_topic"`
}
