kafka

package
v0.0.0-...-3667945 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2019 License: MIT Imports: 8 Imported by: 0

README

Kafka Producer Output Plugin

This plugin writes to a Kafka Broker acting a Kafka Producer.

[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["localhost:9092"]
  ## Kafka topic for producer messages
  topic = "telegraf"

  ## Optional topic suffix configuration.
  ## If the section is omitted, no suffix is used.
  ## Following topic suffix methods are supported:
  ##   measurement - suffix equals to separator + measurement's name
  ##   tags        - suffix equals to separator + specified tags' values
  ##                 interleaved with separator

  ## Suffix equals to "_" + measurement's name
  # [outputs.kafka.topic_suffix]
  #   method = "measurement"
  #   separator = "_"

  ## Suffix equals to "__" + measurement's "foo" tag value.
  ##   If there's no such a tag, suffix equals to an empty string
  # [outputs.kafka.topic_suffix]
  #   method = "tags"
  #   keys = ["foo"]
  #   separator = "__"

  ## Suffix equals to "_" + measurement's "foo" and "bar"
  ##   tag values, separated by "_". If there is no such tags,
  ##   their values treated as empty strings.
  # [outputs.kafka.topic_suffix]
  #   method = "tags"
  #   keys = ["foo", "bar"]
  #   separator = "_"

  ## Telegraf tag to use as a routing key
  ##  ie, if this tag exists, its value will be used as the routing key
  routing_tag = "host"

  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  compression_codec = 0

  ##  RequiredAcks is used in Produce Requests to tell the broker how many
  ##  replica acknowledgements it must see before responding
  ##   0 : the producer never waits for an acknowledgement from the broker.
  ##       This option provides the lowest latency but the weakest durability
  ##       guarantees (some data will be lost when a server fails).
  ##   1 : the producer gets an acknowledgement after the leader replica has
  ##       received the data. This option provides better durability as the
  ##       client waits until the server acknowledges the request as successful
  ##       (only messages that were written to the now-dead leader but not yet
  ##       replicated will be lost).
  ##   -1: the producer gets an acknowledgement after all in-sync replicas have
  ##       received the data. This option provides the best durability, we
  ##       guarantee that no messages will be lost as long as at least one in
  ##       sync replica remains.
  required_acks = -1

  ##  The total number of times to retry sending a message
  max_retry = 3

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional SASL Config
  # sasl_username = "kafka"
  # sasl_password = "secret"

  data_format = "influx"
Required parameters:
  • brokers: List of strings, this is for speaking to a cluster of kafka brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> ["{host}:{port}","{host2}:{port2}"]
  • topic: The kafka topic to publish to.
Optional parameters:
  • routing_tag: If this tag exists, its value will be used as the routing key
  • compression_codec: What level of compression to use: 0 -> no compression, 1 -> gzip compression, 2 -> snappy compression
  • required_acks: a setting for how may acks required from the kafka broker cluster.
  • max_retry: Max number of times to retry failed write
  • ssl_ca: SSL CA
  • ssl_cert: SSL CERT
  • ssl_key: SSL key
  • insecure_skip_verify: Use SSL but skip chain & host verification (default: false)
  • data_format: About Telegraf data formats
  • topic_suffix: Which, if any, method of calculating kafka topic suffix to use. For examples, please refer to sample configuration.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ValidTopicSuffixMethods = []string{
	"",
	"measurement",
	"tags",
}

Functions

func ValidateTopicSuffixMethod

func ValidateTopicSuffixMethod(method string) error

Types

type Kafka

type Kafka struct {
	// Kafka brokers to send metrics to
	Brokers []string
	// Kafka topic
	Topic string
	// Kafka topic suffix option
	TopicSuffix TopicSuffix `toml:"topic_suffix"`
	// Routing Key Tag
	RoutingTag string `toml:"routing_tag"`
	// Compression Codec Tag
	CompressionCodec int
	// RequiredAcks Tag
	RequiredAcks int
	// MaxRetry Tag
	MaxRetry int

	// Legacy SSL config options
	// TLS client certificate
	Certificate string
	// TLS client key
	Key string
	// TLS certificate authority
	CA string

	// Path to CA file
	SSLCA string `toml:"ssl_ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl_cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl_key"`

	// Skip SSL verification
	InsecureSkipVerify bool

	// SASL Username
	SASLUsername string `toml:"sasl_username"`
	// SASL Password
	SASLPassword string `toml:"sasl_password"`
	// contains filtered or unexported fields
}

func (*Kafka) Close

func (k *Kafka) Close() error

func (*Kafka) Connect

func (k *Kafka) Connect() error

func (*Kafka) Description

func (k *Kafka) Description() string

func (*Kafka) GetTopicName

func (k *Kafka) GetTopicName(metric telegraf.Metric) string

func (*Kafka) SampleConfig

func (k *Kafka) SampleConfig() string

func (*Kafka) SetSerializer

func (k *Kafka) SetSerializer(serializer serializers.Serializer)

func (*Kafka) Write

func (k *Kafka) Write(metrics []telegraf.Metric) error

type TopicSuffix

type TopicSuffix struct {
	Method    string   `toml:"method"`
	Keys      []string `toml:"keys"`
	Separator string   `toml:"separator"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL