kafka: github.com/wvanbergen/kafka/consumergroup

package consumergroup

import "github.com/wvanbergen/kafka/consumergroup"



Package Files

consumer_group.go offset_manager.go utils.go


var (
    AlreadyClosing = errors.New("The consumer group is already shutting down.")
    UncleanClose   = errors.New("Not all offsets were committed before shutdown was completed")
)

type Config

type Config struct {

    Zookeeper *kazoo.Config

    Offsets struct {
        Initial           int64         // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
        ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
        CommitInterval    time.Duration // The interval at which processed offsets are committed.
        ResetOffsets      bool          // Resets the offsets for the consumergroup so that it won't resume from where it left off previously.
    }
}

func NewConfig

func NewConfig() *Config
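NewConfig returns a Config populated with the defaults described above. A hedged sketch of adjusting those defaults (the specific values are illustrative assumptions; the offset constants come from the sarama package):

```go
// Sketch: tweaking the defaults returned by NewConfig.
// Field names match the Config struct above; values are examples only.
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest        // start at the newest message if no stored offset exists
config.Offsets.ProcessingTimeout = 10 * time.Second // instead of the 1 minute default
config.Offsets.CommitInterval = 5 * time.Second     // commit processed offsets every 5 seconds
```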

func (*Config) Validate

func (cgc *Config) Validate() error

type ConsumerGroup

type ConsumerGroup struct {
    // contains filtered or unexported fields
}
The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.


consumer, consumerErr := JoinConsumerGroup(
    "ExampleConsumerGroup", // consumer group name (placeholder)
    []string{TopicWithSinglePartition, TopicWithMultiplePartitions},
    []string{"localhost:2181"}, // Zookeeper nodes (placeholder)
    nil)                        // nil uses the default configuration

if consumerErr != nil {
    log.Fatalln(consumerErr)
}

// Close the consumer cleanly on interrupt.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
    <-c
    consumer.Close()
}()

eventCount := 0

for event := range consumer.Messages() {
    // Process event
    eventCount += 1

    // Ack event
    consumer.CommitUpto(event)
}

log.Printf("Processed %d events.", eventCount)

func JoinConsumerGroup

func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error)

Connects to a consumer group, using Zookeeper for auto-discovery.

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) Closed

func (cg *ConsumerGroup) Closed() bool

func (*ConsumerGroup) CommitUpto

func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error

func (*ConsumerGroup) Errors

func (cg *ConsumerGroup) Errors() <-chan error

Returns a channel that you can read to obtain errors that occur while consuming.

func (*ConsumerGroup) FlushOffsets

func (cg *ConsumerGroup) FlushOffsets() error

func (*ConsumerGroup) InstanceRegistered

func (cg *ConsumerGroup) InstanceRegistered() (bool, error)

func (*ConsumerGroup) Logf

func (cg *ConsumerGroup) Logf(format string, args ...interface{})

func (*ConsumerGroup) Messages

func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage

Returns a channel that you can read to obtain events from Kafka to process.
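Since both Messages() and Errors() should be drained, a common Go pattern is a select loop over the two channels. Below is a minimal, runnable sketch of that pattern; it uses plain string and error channels as stand-ins for a live ConsumerGroup's channels, and consumeAll is a hypothetical helper, not part of this package:

```go
package main

import "fmt"

// consumeAll drains a message channel and an error channel until both are
// closed, counting what it saw. With a real ConsumerGroup, the message
// channel would carry *sarama.ConsumerMessage values instead of strings.
func consumeAll(messages <-chan string, errors <-chan error) (processed, failures int) {
	for messages != nil || errors != nil {
		select {
		case msg, ok := <-messages:
			if !ok {
				messages = nil // channel closed; a nil channel blocks, dropping it from the select
				continue
			}
			_ = msg // process the message here
			processed++
		case err, ok := <-errors:
			if !ok {
				errors = nil
				continue
			}
			_ = err // log or handle the consumer error here
			failures++
		}
	}
	return processed, failures
}

func main() {
	msgs := make(chan string, 3)
	errs := make(chan error, 1)
	msgs <- "a"
	msgs <- "b"
	msgs <- "c"
	close(msgs)
	close(errs)
	processed, failures := consumeAll(msgs, errs)
	fmt.Println(processed, failures) // prints: 3 0
}
```

Setting a received-from channel variable to nil after it closes is the idiomatic way to stop selecting on it without busy-looping.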

type OffsetManager

type OffsetManager interface {

    // InitializePartition is called when the consumergroup is starting to consume a
    // partition. It should return the last processed offset for this partition. Note:
    // the same partition can be initialized multiple times during a single run of a
    // consumer group due to other consumer instances coming online and offline.
    InitializePartition(topic string, partition int32) (int64, error)

    // MarkAsProcessed tells the offset manager that a certain message has been successfully
    // processed by the consumer, and should be committed. The implementation does not have
    // to store this offset right away, but should return true if it intends to do this at
    // some point.
    // Offsets should generally be increasing if the consumer
    // processes events serially, but this cannot be guaranteed if the consumer does any
    // asynchronous processing. This can be handled in various ways, e.g. by only accepting
    // offsets that are higher than the offsets seen before for the same partition.
    MarkAsProcessed(topic string, partition int32, offset int64) bool

    // Flush tells the offset manager to immediately commit offsets synchronously and to
    // return any errors that may have occurred during the process.
    Flush() error

    // FinalizePartition is called when the consumergroup is done consuming a
    // partition. In this method, the offset manager can flush any remaining offsets to its
    // backend store. It should return an error if it was not able to commit the offset.
    // Note: it's possible that the consumergroup instance will start to consume the same
    // partition again after this function is called.
    FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

    // Close is called when the consumergroup is shutting down. In normal circumstances, all
    // offsets are committed because FinalizePartition is called for all the running partition
    // consumers. You may want to check for this to be true, and try to commit any outstanding
    // offsets. If this doesn't succeed, it should return an error.
    Close() error
}

OffsetManager is the main interface consumergroup requires to manage offsets of the consumergroup.
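To illustrate the contract, here is a minimal in-memory sketch of the same method set. memoryOffsetManager is hypothetical and not part of this package; a real implementation would persist offsets to a backend store such as Zookeeper. It demonstrates the MarkAsProcessed guard against out-of-order acknowledgements described above:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// topicPartition identifies a single partition of a topic.
type topicPartition struct {
	topic     string
	partition int32
}

// memoryOffsetManager is a toy offset manager that keeps offsets in a map
// in process memory. It mirrors the OffsetManager method set but commits
// nothing durably, so it is only useful as an illustration.
type memoryOffsetManager struct {
	mu      sync.Mutex
	offsets map[topicPartition]int64
}

func newMemoryOffsetManager() *memoryOffsetManager {
	return &memoryOffsetManager{offsets: make(map[topicPartition]int64)}
}

// InitializePartition returns the last processed offset for the partition,
// or -1 if the partition has never been seen before.
func (m *memoryOffsetManager) InitializePartition(topic string, partition int32) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if offset, ok := m.offsets[topicPartition{topic, partition}]; ok {
		return offset, nil
	}
	return -1, nil
}

// MarkAsProcessed records the offset, but only accepts offsets higher than
// the one seen before for the same partition, guarding against out-of-order
// acknowledgements from asynchronous processing.
func (m *memoryOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := topicPartition{topic, partition}
	if current, ok := m.offsets[key]; ok && offset <= current {
		return false
	}
	m.offsets[key] = offset
	return true
}

// Flush is a no-op here; a real implementation would commit synchronously.
func (m *memoryOffsetManager) Flush() error { return nil }

// FinalizePartition would flush remaining offsets for the partition;
// there is nothing left to do for an in-memory store.
func (m *memoryOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
	return nil
}

// Close would commit any outstanding offsets on shutdown.
func (m *memoryOffsetManager) Close() error { return nil }

func main() {
	om := newMemoryOffsetManager()
	om.MarkAsProcessed("events", 0, 41)
	om.MarkAsProcessed("events", 0, 42)
	om.MarkAsProcessed("events", 0, 40) // rejected: lower than 42
	offset, _ := om.InitializePartition("events", 0)
	fmt.Println(offset) // prints: 42
}
```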

func NewZookeeperOffsetManager

func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager

NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.

type OffsetManagerConfig

type OffsetManagerConfig struct {
    CommitInterval time.Duration // Interval between offset flushes to the backend store.
    VerboseLogging bool          // Whether to enable verbose logging.
}

OffsetManagerConfig holds configuration settings on how the offset manager should behave.

func NewOffsetManagerConfig

func NewOffsetManagerConfig() *OffsetManagerConfig

NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.

Package consumergroup imports 10 packages and is imported by 107 packages. Updated 2018-09-17.