kag

v0.0.0-...-bc43a78
Published: Jan 13, 2018 License: Apache-2.0 Imports: 13 Imported by: 0

README

kag

A Kafka lag meter written in pure Go with no external dependencies (except Kafka).

Motivations

We use Kafka extensively, but unlike many organizations, our Kafka clusters are managed by a hosting provider. Consequently, when we looked for Kafka monitoring tools, we found a dearth of options:

  • Burrow looks like a fantastic tool, but it appears to require a connection to ZooKeeper, which our hosting provider doesn't expose.
  • Datadog provides an agent to extract stats from Kafka via JMX. Again, with a hosted Kafka, we don't have JMX access. Also, it appears that the agent should be colocated with the Kafka instances.
Installation
go get github.com/savaki/kag/cmd/kag
Usage
NAME:
   kag - A new cli application

USAGE:
   main [global options] command [command options] [arguments...]

VERSION:
   0.0.0

COMMANDS:
     help, h  Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --brokers value            comma separated list of brokers e.g. localhost:9092 (default: "localhost:9092") [$KAG_BROKERS]
   --interval value           interval between polling (default: 1m0s) [$KAG_INTERVAL]
   --observer value           observer for stdout; stdout, datadog (default: "stdout") [$KAG_OBSERVER]
   --datadog-addr value       statsd host and port; require --observer datadog (default: "127.0.0.1:8125") [$KAG_DATADOG_ADDR]
   --datadog-namespace value  optional datadog namespace [$KAG_DATADOG_NAMESPACE]
   --datadog-tags value       comma separated list of datadog tags [$KAG_DATADOG_TAGS]
   --tls-cert value           tls certificate [$KAG_TLS_CERT]
   --tls-key value            tls private key [$KAG_TLS_KEY]
   --tls-ca value             tls ca certificate [$KAG_TLS_CA]
   --debug                    display additional debugging info [$KAG_DEBUG]
   --ecs                      use the address of the ecs host [$KAG_ECS]
   --help, -h                 show help
   --version, -v              print the version
Datadog

kag has a built-in Datadog metrics publisher:

kag --observer datadog 
Configuration

kag can be configured entirely from environment variables

Name                   Default Value    Description
KAG_BROKERS            localhost:9092   comma separated list of kafka brokers
KAG_INTERVAL           1m               polling interval; examples: 5m, 90s, 1h
KAG_OBSERVER           stdout           where metrics should be published: stdout or datadog
KAG_DATADOG_ADDR       127.0.0.1:8125   statsd host and port when using the datadog observer
KAG_DATADOG_NAMESPACE  -                optional datadog namespace
KAG_DATADOG_TAGS       -                comma separated list of datadog tags
KAG_DEBUG              -                true to include additional debug data
KAG_ECS                -                true to use the AWS ECS host as the base address for the observer, e.g. for datadog {host}:8125
KAG_TLS_CERT           -                optional tls cert pem
KAG_TLS_KEY            -                optional tls private key pem for cert
KAG_TLS_CA             -                optional tls ca certificate
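
The interval examples above (5m, 90s, 1h) match the syntax accepted by Go's time.ParseDuration. The sketch below shows how such a value could be read and validated. It assumes, without confirmation from the source, that kag parses KAG_INTERVAL this way; parseInterval and defaultInterval are hypothetical names that are not part of kag.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// defaultInterval mirrors the documented default of one minute.
const defaultInterval = time.Minute

// parseInterval is a hypothetical helper (not part of kag). It returns the
// default when the value is empty and otherwise parses the value with
// time.ParseDuration, which accepts strings such as "5m", "90s" and "1h".
func parseInterval(value string) (time.Duration, error) {
	if value == "" {
		return defaultInterval, nil
	}
	return time.ParseDuration(value)
}

func main() {
	// The first entry is whatever KAG_INTERVAL holds in this environment;
	// the rest are the examples from the configuration table.
	for _, v := range []string{os.Getenv("KAG_INTERVAL"), "5m", "90s", "1h"} {
		d, err := parseInterval(v)
		if err != nil {
			fmt.Println("invalid interval:", err)
			continue
		}
		fmt.Printf("%q => %v\n", v, d)
	}
}
```

A value without a unit, such as 60, is rejected by time.ParseDuration, so under this assumption the unit suffix is always required.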

Documentation

Index

Constants

const (
	DefaultInterval = time.Minute
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Brokers contains the initial list of kafka brokers to connect to
	Brokers []string

	// Unique identifier for client connections established by this Config.
	ClientID string

	// Observer publishes lag
	Observer Observer

	// Interval specifies the rate the kafka brokers should be polled
	Interval time.Duration

	// Timeout is the maximum amount of time a dial will wait for a connect to
	// complete. If Deadline is also set, it may fail earlier.
	//
	// The default is no timeout.
	//
	// When dialing a name with multiple IP addresses, the timeout may be
	// divided between them.
	//
	// With or without a timeout, the operating system may impose its own
	// earlier timeout. For instance, TCP timeouts are often around 3 minutes.
	Timeout time.Duration

	// Deadline is the absolute point in time after which dials will fail.
	// If Timeout is set, it may fail earlier.
	// Zero means no deadline, or dependent on the operating system as with the
	// Timeout option.
	Deadline time.Time

	// LocalAddr is the local address to use when dialing an address.
	// The address must be of a compatible type for the network being dialed.
	// If nil, a local address is automatically chosen.
	LocalAddr net.Addr

	// DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the
	// network is "tcp" and the destination is a host name with both IPv4 and
	// IPv6 addresses. This allows a client to tolerate networks where one
	// address family is silently broken.
	DualStack bool

	// FallbackDelay specifies the length of time to wait before spawning a
	// fallback connection, when DualStack is enabled.
	// If zero, a default delay of 300ms is used.
	FallbackDelay time.Duration

	// KeepAlive specifies the keep-alive period for an active network
	// connection.
	// If zero, keep-alives are not enabled. Network protocols that do not
	// support keep-alives ignore this field.
	KeepAlive time.Duration

	// Resolver optionally specifies an alternate resolver to use.
	Resolver Resolver

	// TLS enables Config to open secure connections.  If nil, standard net.Conn
	// will be used.
	TLS *tls.Config

	// Debug writer for optional debug messages
	Debug io.Writer
}

The Config type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

func New

func New(config Config) *Monitor

func NewContext

func NewContext(ctx context.Context, config Config) *Monitor

func (*Monitor) Close

func (m *Monitor) Close() error

type Observer

type Observer interface {
	Observe(groupID, topic string, partition int32, lag int64)
}
var (
	Stdout Observer = ObserverFunc(func(groupID, topic string, partition int32, lag int64) {
		fmt.Printf("%v/%v/%v => %v\n", groupID, topic, partition, lag)
	})
	Nop Observer = ObserverFunc(func(groupID, topic string, partition int32, lag int64) {})
)

type ObserverFunc

type ObserverFunc func(groupID, topic string, partition int32, lag int64)

func (ObserverFunc) Observe

func (fn ObserverFunc) Observe(groupID, topic string, partition int32, lag int64)
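
Because Observer is a single-method interface, custom sinks are easy to write and combine. The sketch below copies the Observer and ObserverFunc declarations from this page so that it compiles on its own; the body of ObserverFunc.Observe is an assumption, since only its signature is documented. The maxLag type and the tee function are hypothetical helpers, not part of kag.

```go
package main

import (
	"fmt"
	"sync"
)

// Observer and ObserverFunc are reproduced from the documentation above so
// the sketch is self-contained. In real use they come from the kag package.
type Observer interface {
	Observe(groupID, topic string, partition int32, lag int64)
}

type ObserverFunc func(groupID, topic string, partition int32, lag int64)

// Observe calls the underlying function. This body is assumed; the package
// documentation lists only the signature.
func (fn ObserverFunc) Observe(groupID, topic string, partition int32, lag int64) {
	fn(groupID, topic, partition, lag)
}

// maxLag is a hypothetical observer that remembers the largest lag reported
// for each consumer group. The mutex guards against concurrent callers.
type maxLag struct {
	mu      sync.Mutex
	byGroup map[string]int64
}

func newMaxLag() *maxLag {
	return &maxLag{byGroup: map[string]int64{}}
}

func (m *maxLag) Observe(groupID, topic string, partition int32, lag int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if cur, ok := m.byGroup[groupID]; !ok || lag > cur {
		m.byGroup[groupID] = lag
	}
}

// tee is a hypothetical combinator that forwards every observation to each
// of the given observers in order.
func tee(observers ...Observer) Observer {
	return ObserverFunc(func(groupID, topic string, partition int32, lag int64) {
		for _, o := range observers {
			o.Observe(groupID, topic, partition, lag)
		}
	})
}

func main() {
	stdout := ObserverFunc(func(groupID, topic string, partition int32, lag int64) {
		fmt.Printf("%v/%v/%v => %v\n", groupID, topic, partition, lag)
	})
	tracker := newMaxLag()
	observer := tee(stdout, tracker)

	// Sample observations with made-up group and topic names.
	observer.Observe("billing", "orders", 0, 5)
	observer.Observe("billing", "orders", 1, 12)
	fmt.Println("max lag for billing:", tracker.byGroup["billing"])
}
```

A value built this way could be assigned to the Observer field of Config, which accepts any Observer.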

type Resolver

type Resolver interface {
	// LookupHost looks up the given host using the local resolver.
	// It returns a slice of that host's addresses.
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.
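
As a rough illustration of the service discovery hook, the sketch below implements Resolver with a fixed lookup table. The Resolver declaration is copied from this page so the code compiles on its own; staticResolver, the lookup helper, and the host name kafka.internal are hypothetical. The standard library's net.DefaultResolver has a LookupHost method with the same signature, so it satisfies the interface as well.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// Resolver is reproduced from the documentation above so the sketch is
// self-contained. In real use it comes from the kag package.
type Resolver interface {
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

// Compile-time check: the standard library resolver satisfies the interface.
var _ Resolver = net.DefaultResolver

// staticResolver is a hypothetical Resolver backed by a fixed table that
// maps a host name to its addresses.
type staticResolver map[string][]string

func (r staticResolver) LookupHost(ctx context.Context, host string) ([]string, error) {
	// Honor cancellation even though the lookup itself never blocks.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	addrs, ok := r[host]
	if !ok {
		return nil, fmt.Errorf("static resolver: unknown host %q", host)
	}
	// Return a copy so callers cannot modify the table.
	return append([]string(nil), addrs...), nil
}

// lookup is a small convenience wrapper that uses a background context.
func lookup(r Resolver, host string) ([]string, error) {
	return r.LookupHost(context.Background(), host)
}

func main() {
	r := staticResolver{"kafka.internal": {"10.0.0.1", "10.0.0.2"}}
	addrs, err := lookup(r, "kafka.internal")
	fmt.Println(addrs, err)
}
```

A resolver like this could be assigned to the Resolver field of Config. How kag combines the returned addresses with broker ports is not described in the documentation.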

type ScanOut

type ScanOut struct {
	Offsets map[string]map[int32]int64
}
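
ScanOut is not documented, so the following is only a guess at how such a structure could feed a lag calculation. It assumes Offsets is keyed by topic and then by partition, and it uses the common definition of consumer lag: the newest offset in a partition minus the offset the consumer group has committed. The ScanOut declaration is copied from this page; computeLag and the sample data are hypothetical.

```go
package main

import "fmt"

// ScanOut is reproduced from the documentation above. The meaning of the
// map keys (topic, then partition) is an assumption.
type ScanOut struct {
	Offsets map[string]map[int32]int64
}

// computeLag is a hypothetical helper (not part of kag). For every partition
// present in both inputs it returns latest minus committed, clamped at zero.
// Partitions without a committed offset are skipped.
func computeLag(latest, committed ScanOut) map[string]map[int32]int64 {
	out := map[string]map[int32]int64{}
	for topic, partitions := range latest.Offsets {
		for partition, end := range partitions {
			// Indexing a missing inner map is safe in Go and yields ok == false.
			c, ok := committed.Offsets[topic][partition]
			if !ok {
				continue
			}
			d := end - c
			if d < 0 {
				d = 0
			}
			if out[topic] == nil {
				out[topic] = map[int32]int64{}
			}
			out[topic][partition] = d
		}
	}
	return out
}

func main() {
	// Made-up offsets for a single topic with two partitions.
	latest := ScanOut{Offsets: map[string]map[int32]int64{"orders": {0: 100, 1: 50}}}
	committed := ScanOut{Offsets: map[string]map[int32]int64{"orders": {0: 90}}}
	fmt.Println(computeLag(latest, committed))
}
```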

Directories

Path     Synopsis
cmd/kag