kage

v1.0.4
Published: Nov 30, 2017 | License: MIT | Imports: 3 | Imported by: 0

README

Kage

Synopsis

Kage (as in "Kafka AGEnt") reads offset and lag metrics from Kafka and writes them to InfluxDB.

Motivation

When you're running Kafka, you probably want monitoring as well.
You can, of course, query the beans directly via JMX and work with that, but that requires another JVM to collect the data.
If you're a Java shop anyway and have all that available, give it a try. We decided that we wanted to get the metrics straight out of Kafka and feed them into InfluxDB in a configurable way, and here we are.

Basic Installation

Grab a binary from the Releases page, or clone the repo and build it yourself.
Run the binary with the configuration options found below.

kage agent <CONFIG OPTIONS>

Advanced Installation

There's systemd configuration magic in examples/systemd/.
Put the files in the appropriate directories on your machine (on Debian/Ubuntu that should be /lib/systemd/system and /lib/systemd/system-generators), chmod 0755 the generator, create /etc/kage/, and run systemctl daemon-reload. You should then get one service per configuration file in /etc/kage/.

Configuration

Kage can be configured with command line flags and environment variables.

Command Line Flags

| Flag | Options | Multiple Allowed | Description | Environment Variable |
|------|---------|------------------|-------------|----------------------|
| --log | stdout, file | No | The type of log to use. | KAGE_LOG |
| --log.file | | No | The path to the file to log to. | KAGE_LOG_FILE |
| --log.level | debug, info, warn, error | No | The log level to use. | KAGE_LOG_LEVEL |
| --kafka.brokers | | Yes | The Kafka seed brokers to connect to. Format: 'ip:port'. | KAGE_KAFKA_BROKERS |
| --kafka.ignore-topics | | Yes | The Kafka topic patterns to ignore. This may contain wildcards. | KAGE_KAFKA_IGNORE_TOPICS |
| --kafka.ignore-groups | | Yes | The Kafka consumer group patterns to ignore. This may contain wildcards. | KAGE_KAFKA_IGNORE_GROUPS |
| --reporters | influx, stdout | Yes | The reporters to use. | KAGE_REPORTERS |
| --influx | | No | The DSN of the InfluxDB server to report to. Format: 'http://user:pass@ip:port/database'. | KAGE_INFLUX |
| --influx.metric | | No | The measurement name to report statistics under. | KAGE_INFLUX_METRIC |
| --influx.policy | | No | The retention policy to report statistics under. | KAGE_INFLUX_POLICY |
| --influx.tags | | Yes | Additional tags to add to the statistics. Format: 'key=value'. | KAGE_INFLUX_TAGS |
| --server | | No | Start the HTTP server. | KAGE_SERVER |
| --port | | No | The address to bind to for the HTTP server. | KAGE_PORT |

Multi value environment variables

When using environment variables where multiple values are allowed, the values should be comma separated. E.g. --reporters=stdout --reporters=influx should become KAGE_REPORTERS=stdout,influx.
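
The comma-separated convention can be sketched in a few lines of Go. The splitMultiValue helper is a hypothetical name, and trimming whitespace and dropping empty entries are assumptions of this sketch rather than documented Kage behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// splitMultiValue turns a comma-separated environment value into a slice.
// Trimming whitespace and skipping empty entries are choices made for this
// sketch; Kage itself may be stricter.
func splitMultiValue(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if v := strings.TrimSpace(part); v != "" {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	// Equivalent to passing --reporters=stdout --reporters=influx.
	reporters := splitMultiValue("stdout,influx")
	fmt.Println(len(reporters), reporters)
}
```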

HTTP Endpoints

Kage has an optional HTTP server that can be enabled with the --server flag. This allows health checking as well as fetching broker and consumer group information. The endpoints are as follows:

GET /health

Gets the current health status of Kage. Returns a 200 status code if Kage is healthy, otherwise a 500 status code.

GET /brokers

Gets the state of all known brokers.

GET /brokers/health

Gets the current Kafka health status. Returns a 200 status code if all brokers are connected, otherwise a 500 status code.

GET /topics

Gets topic offset information in JSON format.

GET /metadata

Gets topic metadata information in JSON format.

GET /consumers

Gets consumer group offset information in JSON format.

GET /consumers/$group

Gets consumer group offset information for the specified consumer group in JSON format, or returns a 404 status code if the group is not found.

Contributors

We're supposed to tell you how to contribute to Kage here.
Since this is GitHub: You know the drill - open issues, fork, create PRs, ...

TODO

  • provide Ansible templates and examples
  • set up Debian packaging

License

MIT-License. As is. No warranties whatsoever. Mileage may vary. Batteries not included.

Documentation

Types

type Application

type Application struct {
	Store     Store
	Reporters *Reporters
	Monitor   Monitor

	Logger log15.Logger
}

Application represents the kage application.

func NewApplication

func NewApplication() *Application

NewApplication creates an instance of Application.

func (*Application) Close

func (a *Application) Close()

Close gracefully shuts down the application.

func (*Application) Collect added in v1.0.0

func (a *Application) Collect()

Collect collects the current state of the Kafka cluster.

func (*Application) IsHealthy

func (a *Application) IsHealthy() bool

IsHealthy checks the health of the Application.

func (*Application) Report

func (a *Application) Report()

Report reports the current state of the MemoryStore to the Reporters.

type Monitor added in v1.0.0

type Monitor interface {
	// Brokers returns a list of Kafka brokers.
	Brokers() []kafka.Broker

	// Collect collects the state of Monitor.
	Collect()

	// IsHealthy checks the health of the Monitor.
	IsHealthy() bool

	// Close gracefully stops the Monitor client.
	Close()
}

Monitor represents a Kafka monitor.

type Reporter

type Reporter interface {
	// ReportBrokerOffsets reports a snapshot of the broker offsets.
	ReportBrokerOffsets(o *store.BrokerOffsets)

	// ReportBrokerMetadata reports a snapshot of the broker metadata.
	ReportBrokerMetadata(o *store.BrokerMetadata)

	// ReportConsumerOffsets reports a snapshot of the consumer group offsets.
	ReportConsumerOffsets(o *store.ConsumerOffsets)
}

Reporter represents an offset reporter.

type Reporters

type Reporters map[string]Reporter

Reporters represents a set of reporters.

func (*Reporters) Add

func (rs *Reporters) Add(key string, r Reporter)

Add adds a Reporter to the set.

func (*Reporters) ReportBrokerMetadata added in v1.0.0

func (rs *Reporters) ReportBrokerMetadata(v *store.BrokerMetadata)

ReportBrokerMetadata reports a snapshot of the broker metadata on all reporters.

func (*Reporters) ReportBrokerOffsets

func (rs *Reporters) ReportBrokerOffsets(v *store.BrokerOffsets)

ReportBrokerOffsets reports a snapshot of the broker offsets on all reporters.

func (*Reporters) ReportConsumerOffsets

func (rs *Reporters) ReportConsumerOffsets(v *store.ConsumerOffsets)

ReportConsumerOffsets reports a snapshot of the consumer group offsets on all reporters.
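
The fan-out behaviour described for Reporters can be sketched with stand-in types. Everything below is hypothetical: snapshot replaces the store.* types (whose fields are not shown in this document), reporter is reduced to one method, and allocating the map lazily in Add is a choice made for the sketch.

```go
package main

import "fmt"

// snapshot stands in for the store.* snapshot types.
type snapshot struct {
	Topic  string
	Offset int64
}

// reporter is a one-method stand-in for the Reporter interface.
type reporter interface {
	ReportBrokerOffsets(s *snapshot)
}

// reporters mirrors the shape of Reporters: a map type with pointer receivers.
type reporters map[string]reporter

// Add registers r under key, allocating the map on first use.
func (rs *reporters) Add(key string, r reporter) {
	if *rs == nil {
		*rs = reporters{}
	}
	(*rs)[key] = r
}

// ReportBrokerOffsets forwards the snapshot to every registered reporter.
func (rs *reporters) ReportBrokerOffsets(s *snapshot) {
	for _, r := range *rs {
		r.ReportBrokerOffsets(s)
	}
}

// recorder is a toy reporter that remembers what it was given.
type recorder struct{ seen []string }

func (r *recorder) ReportBrokerOffsets(s *snapshot) {
	r.seen = append(r.seen, fmt.Sprintf("%s=%d", s.Topic, s.Offset))
}

func main() {
	var rs reporters
	a, b := &recorder{}, &recorder{}
	rs.Add("a", a)
	rs.Add("b", b)

	rs.ReportBrokerOffsets(&snapshot{Topic: "events", Offset: 42})
	fmt.Println(a.seen, b.seen)
}
```

Because map iteration order is not defined in Go, reporters written this way should not depend on being called in any particular order.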

type Store

type Store interface {
	// SetState adds a state into the store.
	SetState(interface{}) error

	// BrokerOffsets returns a snapshot of the current broker offsets.
	BrokerOffsets() store.BrokerOffsets

	// ConsumerOffsets returns a snapshot of the current consumer group offsets.
	ConsumerOffsets() store.ConsumerOffsets

	// BrokerMetadata returns a snapshot of the current broker metadata.
	BrokerMetadata() store.BrokerMetadata

	// Channel gets the offset channel.
	Channel() chan interface{}

	// Close gracefully stops the Store.
	Close()
}

Store represents an offset store.
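
To show how SetState, Channel and Close could fit together, here is a minimal in-memory sketch. It is not Kage's MemoryStore: the memStore type, the brokerOffset message, and the map returned by BrokerOffsets are hypothetical simplifications, since the real store.* types are not shown in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// brokerOffset is a hypothetical state message sent to the store.
type brokerOffset struct {
	Topic  string
	Offset int64
}

// memStore keeps the latest offset per topic, guarded by a mutex.
type memStore struct {
	mu      sync.RWMutex
	offsets map[string]int64
	ch      chan interface{}
	done    chan struct{}
}

func newMemStore() *memStore {
	s := &memStore{
		offsets: map[string]int64{},
		ch:      make(chan interface{}, 16),
		done:    make(chan struct{}),
	}
	// Drain the channel in the background and apply each state update.
	go func() {
		defer close(s.done)
		for v := range s.ch {
			_ = s.SetState(v) // a real store would log the error
		}
	}()
	return s
}

// SetState applies one state message, rejecting types it does not know.
func (s *memStore) SetState(v interface{}) error {
	switch st := v.(type) {
	case brokerOffset:
		s.mu.Lock()
		s.offsets[st.Topic] = st.Offset
		s.mu.Unlock()
		return nil
	default:
		return fmt.Errorf("unknown state type %T", v)
	}
}

// Channel returns the channel that producers send state messages on.
func (s *memStore) Channel() chan interface{} { return s.ch }

// BrokerOffsets returns a copy so callers cannot mutate the store.
func (s *memStore) BrokerOffsets() map[string]int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]int64, len(s.offsets))
	for k, v := range s.offsets {
		out[k] = v
	}
	return out
}

// Close stops accepting state and waits for queued messages to be applied.
func (s *memStore) Close() {
	close(s.ch)
	<-s.done
}

func main() {
	s := newMemStore()
	s.Channel() <- brokerOffset{Topic: "events", Offset: 42}
	s.Close()
	fmt.Println(s.BrokerOffsets())
}
```

Returning copies from the snapshot methods matches the interface's use of value types (store.BrokerOffsets rather than a pointer), although whether Kage copies internally is an assumption.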

Directories

Path Synopsis
cmd