metrictank: github.com/grafana/metrictank/input/kafkamdm

package kafkamdm

import "github.com/grafana/metrictank/input/kafkamdm"

Index

Package Files

kafkamdm.go lag_monitor.go

Variables

var Enabled bool

func ConfigProcess

func ConfigProcess(instance string)

func ConfigSetup

func ConfigSetup()

type Explanation

type Explanation struct {
    Status   map[int32]Status
    Priority int
    Updated  time.Time
}

type KafkaMdm

type KafkaMdm struct {
    input.Handler
    // contains filtered or unexported fields
}

func New

func New() *KafkaMdm

func (*KafkaMdm) ExplainPriority

func (k *KafkaMdm) ExplainPriority() interface{}

func (*KafkaMdm) MaintainPriority

func (k *KafkaMdm) MaintainPriority()

func (*KafkaMdm) Name

func (k *KafkaMdm) Name() string

func (*KafkaMdm) Start

func (k *KafkaMdm) Start(handler input.Handler, cancel context.CancelFunc) error

func (*KafkaMdm) Stop

func (k *KafkaMdm) Stop()

Stop initiates a graceful, permanent stop of the consumer and blocks until it has stopped.
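The stop-and-block behavior described for Stop is commonly built from a closed channel plus a sync.WaitGroup. The following is a minimal sketch of that general pattern, not code taken from the metrictank source; the consumer type, its fields, and the start and stop methods are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// consumer is a hypothetical stand-in for a type that runs
// one goroutine per partition.
type consumer struct {
	stopChan chan struct{}  // closed once to signal all workers
	wg       sync.WaitGroup // tracks running workers
}

func newConsumer() *consumer {
	return &consumer{stopChan: make(chan struct{})}
}

// start launches the given number of worker goroutines.
func (c *consumer) start(workers int) {
	for i := 0; i < workers; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			ticker := time.NewTicker(time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-c.stopChan:
					return // stop requested: exit the loop cleanly
				case <-ticker.C:
					// a real worker would consume messages here
				}
			}
		}()
	}
}

// stop signals every worker and blocks until all have returned.
// Closing the channel is permanent, so stop must be called only once
// and the consumer cannot be restarted afterwards.
func (c *consumer) stop() {
	close(c.stopChan)
	c.wg.Wait()
}

func main() {
	c := newConsumer()
	c.start(3)
	c.stop()
	fmt.Println("all workers stopped")
}
```

Closing the channel, rather than sending on it, wakes every worker at once regardless of how many there are, which is why the stop is permanent in this sketch.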

type LagMonitor

type LagMonitor struct {
    sync.Mutex
    // contains filtered or unexported fields
}

LagMonitor determines how up to date this node is. For each partition, we periodically collect:

    * the consumption lag (we keep the last N measurements)
    * the ingest rate

We then combine this data into a score; see the Metric() method.

func NewLagMonitor

func NewLagMonitor(size int, partitions []int32) *LagMonitor

func (*LagMonitor) Explain

func (l *LagMonitor) Explain() interface{}

func (*LagMonitor) GetPartitionPriority

func (l *LagMonitor) GetPartitionPriority(partition int32) int

func (*LagMonitor) Metric

func (l *LagMonitor) Metric() int

Metric computes the overall score of up-to-date-ness of this node, as an estimated number of seconds behind kafka. We first compute the score for each partition like so: (minimum lag seen in last N measurements) / input rate. Example:

    lag (in messages/metrics)    input rate           score (seconds behind)
    10k                          1k/second            10
    200                          1k/second            0 (less than 1s behind)
    0                            *                    0 (perfectly in sync)
    anything                     0 (after startup)    same as lag

The returned total score for the node is the max of the scores of individual partitions. Note that one or more StoreOffset() (rate) calls may have been made but no StoreLag(). This can happen in 3 cases:

    - we're not consuming yet
    - trouble querying the partition for latest offset
    - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag()
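The scoring rule can be sketched in a few lines of Go. This is an illustration of the table as documented, not the package's actual code: partitionScore and nodeScore are hypothetical helpers, and integer division is assumed so that a lag of under one second rounds down to 0.

```go
package main

import "fmt"

// partitionScore estimates how many seconds behind a partition is,
// given the minimum lag in the measurement window (in messages) and
// the input rate (in messages per second). Hypothetical helper.
func partitionScore(minLag, rate int) int {
	if minLag == 0 {
		return 0 // perfectly in sync, whatever the rate
	}
	if rate == 0 {
		return minLag // no rate known (e.g. after startup): same as lag
	}
	return minLag / rate // integer division: under 1s behind becomes 0
}

// nodeScore returns the max of the individual partition scores.
func nodeScore(scores []int) int {
	max := 0
	for _, s := range scores {
		if s > max {
			max = s
		}
	}
	return max
}

func main() {
	scores := []int{
		partitionScore(10000, 1000), // 10k lag at 1k/second
		partitionScore(200, 1000),   // less than 1s behind
		partitionScore(0, 500),      // in sync
		partitionScore(42, 0),       // rate unknown
	}
	fmt.Println(scores, nodeScore(scores)) // prints: [10 0 0 42] 42
}
```

Taking the max means a single lagging partition determines the score of the whole node.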

func (*LagMonitor) StoreOffsets

func (l *LagMonitor) StoreOffsets(partition int32, readOffset, highWaterMark int64, ts time.Time)

type Status

type Status struct {
    Lag      int
    Rate     int
    Priority int
}

Package kafkamdm imports 18 packages. Updated 2020-05-07.