Stop initiates a graceful, permanent stop of the Consumer and blocks until it has stopped.
LagMonitor determines how up to date this node is. For each partition, we periodically collect:
* the consumption lag (we keep the last N measurements)
* the ingest rate
We then combine this data into a score; see the Metric() method.
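As a rough illustration of "keep the last N measurements", the sketch below stores lag values for one partition in a fixed-size ring buffer and reports the minimum. The names lagRing, newLagRing, Store and Min are hypothetical and chosen for this example only; the actual LagMonitor may store its measurements differently.

```go
package main

// lagRing keeps the last N lag measurements for a single partition.
// Hypothetical type for illustration; not the package's real storage.
type lagRing struct {
	measurements []int // at most N values, overwritten oldest-first
	pos          int   // index of the oldest value once the buffer is full
}

// newLagRing returns a ring that retains the last n measurements.
// Assumption: n below 1 is treated as 1.
func newLagRing(n int) *lagRing {
	if n < 1 {
		n = 1
	}
	return &lagRing{measurements: make([]int, 0, n)}
}

// Store records a measurement, overwriting the oldest one when full.
func (l *lagRing) Store(lag int) {
	if len(l.measurements) < cap(l.measurements) {
		l.measurements = append(l.measurements, lag)
		return
	}
	l.measurements[l.pos] = lag
	l.pos = (l.pos + 1) % len(l.measurements)
}

// Min returns the smallest retained lag. The second return value is
// false when no measurement has been stored yet.
func (l *lagRing) Min() (int, bool) {
	if len(l.measurements) == 0 {
		return 0, false
	}
	min := l.measurements[0]
	for _, m := range l.measurements[1:] {
		if m < min {
			min = m
		}
	}
	return min, true
}
```

Taking the minimum rather than the latest value means a single slow measurement does not make the node look further behind than it has recently been.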
Metric computes the overall score of up-to-date-ness of this node, as an estimated number of seconds behind kafka. We first compute the score for each partition like so: (minimum lag seen in last N measurements) / input rate.
example:
lag (in messages/metrics)    input rate           --->  score (seconds behind)
                      10k    1k/second                  10
                      200    1k/second                  0 (less than 1s behind)
                        0    *                          0 (perfectly in sync)
                 anything    0 (after startup)          same as lag
The returned total score for the node is the max of the scores of the individual partitions. Note that one or more StoreOffset() (rate) calls may have been made without a StoreLag() call. This can happen in 3 cases:
- we're not consuming yet
- there is trouble querying the partition for the latest offset
- consumePartition() has called StoreOffset() but the code hasn't yet advanced to StoreLag()
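The scoring rule described above can be sketched as follows. This is a minimal illustration, not the actual Metric() implementation: the partitionStats type and the partitionScore and nodeScore functions are hypothetical. How the real code treats a partition that has a rate but no lag measurement is not stated here, so the sketch only reports that case through a boolean and leaves the policy to the caller.

```go
package main

// partitionStats is a hypothetical snapshot of one partition.
type partitionStats struct {
	minLag int  // minimum lag over the last N measurements, in messages
	hasLag bool // false if StoreLag() has not been called yet
	rate   int  // input rate in messages per second; 0 right after startup
}

// partitionScore estimates how many seconds a partition is behind.
// The second return value is false when no lag measurement exists.
func partitionScore(p partitionStats) (int, bool) {
	if !p.hasLag {
		return 0, false
	}
	if p.rate == 0 {
		// Rate not known yet: the score is the same as the lag.
		return p.minLag, true
	}
	// Integer division: anything under one second behind scores 0.
	return p.minLag / p.rate, true
}

// nodeScore returns the max score over all partitions that have a lag
// measurement. complete is false if any partition had none
// (an assumption of this sketch, so callers can choose a policy).
func nodeScore(parts []partitionStats) (score int, complete bool) {
	complete = true
	for _, p := range parts {
		s, ok := partitionScore(p)
		if !ok {
			complete = false
			continue
		}
		if s > score {
			score = s
		}
	}
	return score, complete
}
```

With the example values, a partition with a lag of 10k at 1k/second scores 10, one with a lag of 200 at the same rate scores 0, and a node consisting of both scores 10.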