koff

v0.0.0-...-bc1f5cf
Published: Jul 30, 2018 License: MIT Imports: 7 Imported by: 0

README

Koff

Kafka consumer (group) offset tracker.

WIP

Work in progress, there's plenty of stuff left to figure out. You have been warned.

Usage

So far it is only capable of printing parsed messages. For a usage example, take a look at the main command.

go run cmd/printer/main.go -brokers 127.0.0.1:9092

Design

Starting with Kafka version 0.9, consumer offsets are stored and managed by the Kafka server. Internally, offsets are stored in the __consumer_offsets topic. The topic is not designed to be used by third-party software, but nothing really stops us from reading it.

This topic stores more than individual consumer offsets. It also contains consumer group metadata: the list of group members with their subscriptions and assignments, leader details, and plenty of other things. Because the topic provides real-time updates on consumer offsets and on consumer group structure and state, it is a very convenient foundation for consumer and group tracking and monitoring.
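To make the tracking idea concrete, here is a minimal sketch of keeping the latest committed offset per consumer, topic, and partition. The OffsetMessage and Message types below are local stand-ins that copy a subset of the documented fields so the example compiles on its own. Tracker, offsetKey, and their methods are hypothetical helpers, not part of this package, and treating Message.Consumer as the group name is an assumption.

```go
package main

import "fmt"

// OffsetMessage and Message are local stand-ins copying a subset of the
// documented fields; they are not imported from the package.
type OffsetMessage struct {
	Topic     string
	Partition int32
	Offset    int64
}

type Message struct {
	Consumer      string // assumed to hold the consumer group name
	OffsetMessage *OffsetMessage
}

// offsetKey identifies one committed position (hypothetical helper type).
type offsetKey struct {
	Consumer  string
	Topic     string
	Partition int32
}

// Tracker keeps the most recently observed offset per key (hypothetical).
type Tracker struct {
	offsets map[offsetKey]int64
}

// NewTracker returns an empty Tracker.
func NewTracker() *Tracker {
	return &Tracker{offsets: make(map[offsetKey]int64)}
}

// Observe records the offset carried by m and ignores messages without one.
func (t *Tracker) Observe(m Message) {
	if m.OffsetMessage == nil {
		return
	}
	k := offsetKey{m.Consumer, m.OffsetMessage.Topic, m.OffsetMessage.Partition}
	t.offsets[k] = m.OffsetMessage.Offset
}

// Offset returns the last observed offset and whether one has been seen.
func (t *Tracker) Offset(consumer, topic string, partition int32) (int64, bool) {
	v, ok := t.offsets[offsetKey{consumer, topic, partition}]
	return v, ok
}

func main() {
	tr := NewTracker()
	tr.Observe(Message{Consumer: "billing", OffsetMessage: &OffsetMessage{Topic: "orders", Partition: 0, Offset: 41}})
	tr.Observe(Message{Consumer: "billing", OffsetMessage: &OffsetMessage{Topic: "orders", Partition: 0, Offset: 42}})
	off, ok := tr.Offset("billing", "orders", 0)
	fmt.Println(off, ok)
}
```

Later observations overwrite earlier ones, so the map always reflects the newest commit seen for each key.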

Licence

MIT

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads messages from the __consumer_offsets topic and decodes them into Messages.

func NewConsumer

func NewConsumer(brokers []string, watch bool) (*Consumer, error)

NewConsumer creates a new Kafka offsets topic consumer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close shuts down the consumer.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan Message

Messages returns a read-only channel of offset messages.
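The following sketch shows one way the Messages and Close methods might be combined into a read loop. Because the module's import path is not shown here, the example runs against a fake: source is a hypothetical interface listing the two method signatures documented on *Consumer, and fakeSource and consume are made up for illustration. Whether the real channel is closed when the consumer shuts down is not stated in the documentation, so the loop handles both cases.

```go
package main

import "fmt"

// Message is a trimmed local stand-in for the documented Message type.
type Message struct {
	Consumer string
}

// source lists the two methods documented on *Consumer. It is a hypothetical
// interface that lets the sketch run without a Kafka broker.
type source interface {
	Messages() <-chan Message
	Close() error
}

// fakeSource is a hypothetical in-memory replacement for a real consumer.
type fakeSource struct {
	ch chan Message
}

func (f *fakeSource) Messages() <-chan Message {
	return f.ch
}

func (f *fakeSource) Close() error {
	return nil
}

// consume reads up to limit messages, passes each one to handle, and then
// closes src. It stops early if the channel is closed.
func consume(src source, limit int, handle func(Message)) (int, error) {
	n := 0
	for n < limit {
		m, ok := <-src.Messages()
		if !ok {
			break
		}
		handle(m)
		n++
	}
	return n, src.Close()
}

func main() {
	src := &fakeSource{ch: make(chan Message, 2)}
	src.ch <- Message{Consumer: "billing"}
	src.ch <- Message{Consumer: "search"}
	n, err := consume(src, 2, func(m Message) { fmt.Println(m.Consumer) })
	fmt.Println(n, err)
}
```

With the real package, a value returned by NewConsumer would presumably take the place of fakeSource, since its documented methods match the interface.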

type GroupMember

type GroupMember struct {
	ID               string
	ClientID         string
	ClientHost       string
	SessionTimeout   time.Duration
	RebalanceTimeout time.Duration
	Subscription     []TopicAndPartition
	Assignment       []TopicAndPartition
}

GroupMember contains metadata for a consumer group member.

type GroupMessage

type GroupMessage struct {
	ProtocolType string
	GenerationID int32
	LeaderID     string
	Protocol     string
	Members      []GroupMember
}

GroupMessage contains consumer group metadata.

func (GroupMessage) Complete

func (gm GroupMessage) Complete() bool

Complete returns true if message is complete.

type Message

type Message struct {
	Consumer      string
	OffsetMessage *OffsetMessage
	GroupMessage  *GroupMessage
}

Message is the main structure that wraps a consumer offsets topic message.
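Since Message carries two optional pointers, a caller will probably want to branch on which one is set. The sketch below does that with local stand-in types that copy a subset of the documented fields. The describe function is hypothetical, and the assumption that at most one of the two pointers is non-nil for a given message is not confirmed by the documentation.

```go
package main

import "fmt"

// Local stand-ins copying a subset of the documented fields.
type OffsetMessage struct {
	Topic     string
	Partition int32
	Offset    int64
}

type GroupMessage struct {
	GenerationID int32
	LeaderID     string
}

type Message struct {
	Consumer      string
	OffsetMessage *OffsetMessage
	GroupMessage  *GroupMessage
}

// describe returns a one-line summary of m (hypothetical helper). An offset
// payload is checked first; a message with neither payload is reported as empty.
func describe(m Message) string {
	switch {
	case m.OffsetMessage != nil:
		o := m.OffsetMessage
		return fmt.Sprintf("%s offset %s/%d=%d", m.Consumer, o.Topic, o.Partition, o.Offset)
	case m.GroupMessage != nil:
		g := m.GroupMessage
		return fmt.Sprintf("%s group generation=%d leader=%s", m.Consumer, g.GenerationID, g.LeaderID)
	default:
		return m.Consumer + " empty"
	}
}

func main() {
	fmt.Println(describe(Message{Consumer: "billing", OffsetMessage: &OffsetMessage{Topic: "orders", Offset: 42}}))
	fmt.Println(describe(Message{Consumer: "billing", GroupMessage: &GroupMessage{GenerationID: 3, LeaderID: "m-1"}}))
}
```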

func Decode

func Decode(ctx context.Context, key, val []byte) Message

Decode decodes a message key and value into a Message.

type OffsetMessage

type OffsetMessage struct {
	Topic       string
	Partition   int32
	Offset      int64
	Metadata    string
	CommittedAt time.Time
	ExpiresAt   time.Time
}

OffsetMessage is a kind of message that carries an individual consumer offset.
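For monitoring, the CommittedAt field could be used to spot consumers that have stopped committing. The sketch below is one possible approach, using a local stand-in for OffsetMessage. The commitAge and stale helpers and the one-minute threshold are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// OffsetMessage is a trimmed local stand-in for the documented type.
type OffsetMessage struct {
	Topic       string
	Partition   int32
	Offset      int64
	CommittedAt time.Time
}

// commitAge reports how long before now the offset was committed.
func commitAge(o OffsetMessage, now time.Time) time.Duration {
	return now.Sub(o.CommittedAt)
}

// stale reports whether the commit is older than maxAge (hypothetical threshold).
func stale(o OffsetMessage, now time.Time, maxAge time.Duration) bool {
	return commitAge(o, now) > maxAge
}

func main() {
	now := time.Date(2018, time.July, 30, 12, 0, 0, 0, time.UTC)
	o := OffsetMessage{Topic: "orders", Offset: 42, CommittedAt: now.Add(-90 * time.Second)}
	fmt.Println(commitAge(o, now), stale(o, now, time.Minute))
}
```

Passing now as a parameter instead of calling time.Now inside the helpers keeps them easy to test.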

type TopicAndPartition

type TopicAndPartition struct {
	Topic     string
	Partition int32
}

TopicAndPartition is a tuple of topic and partition.
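Because TopicAndPartition contains only comparable fields, it can serve directly as a map key. As a small illustration, the sketch below builds a lookup from each assigned partition to the ID of the member that owns it. The types are local stand-ins copying a subset of the documented fields, and owners is a hypothetical helper.

```go
package main

import "fmt"

// Local stand-ins copying a subset of the documented fields.
type TopicAndPartition struct {
	Topic     string
	Partition int32
}

type GroupMember struct {
	ID         string
	Assignment []TopicAndPartition
}

// owners maps every assigned topic partition to the ID of the member holding
// it (hypothetical helper). If two members list the same partition, the later
// member in the slice wins.
func owners(members []GroupMember) map[TopicAndPartition]string {
	out := make(map[TopicAndPartition]string)
	for _, m := range members {
		for _, tp := range m.Assignment {
			out[tp] = m.ID
		}
	}
	return out
}

func main() {
	members := []GroupMember{
		{ID: "m-1", Assignment: []TopicAndPartition{{Topic: "orders", Partition: 0}}},
		{ID: "m-2", Assignment: []TopicAndPartition{{Topic: "orders", Partition: 1}}},
	}
	fmt.Println(owners(members)[TopicAndPartition{Topic: "orders", Partition: 1}])
}
```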

Directories

Path Synopsis
cmd
