consumergroup

Version: v0.2.0
Published: Jul 26, 2017 | License: MIT | Imports: 13 | Imported by: 0

README

go-consumergroup

Go-consumergroup is a Kafka consumer library written in Go with support for consumer groups and rebalancing.

Requirements

  • Apache Kafka 0.8.x, 0.9.x, 0.10.x

Dependencies

Getting started

  • API documentation and examples are available via godoc.
  • The example directory contains more elaborate example applications.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// ZkList is required: the list of ZooKeeper addresses
	ZkList []string
	// ZooKeeper session timeout, default is 6s
	ZkSessionTimeout time.Duration
	// GroupID is required: the identifier that determines which ConsumerGroup to join
	GroupID string
	// TopicList is required: the topics that the ConsumerGroup consumes
	TopicList []string
	// SaramaConfig exposes the underlying Sarama config
	SaramaConfig *sarama.Config
	// Size of the error channel, default is 1024
	ErrorChannelBufferSize int
	// Whether to auto commit offsets, default is true
	OffsetAutoCommitEnable bool
	// Offset auto commit interval, default is 10s
	OffsetAutoCommitInterval time.Duration
	// Where to start fetching messages when no offset is found, default is newest
	OffsetAutoReset int64
	// Claiming a partition gives up after ClaimPartitionRetryTimes (> 0) retries;
	// ClaimPartitionRetryTimes <= 0 retries until success or until a stop signal is received
	ClaimPartitionRetryTimes int
	// Retry interval when claiming a partition fails
	ClaimPartitionRetryInterval time.Duration
}
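
The retry rule described for ClaimPartitionRetryTimes and ClaimPartitionRetryInterval can be sketched with the standard library alone. This is an illustration of the documented semantics, not the package's implementation: claimWithRetry, errBusy, errStopped and the stop channel are hypothetical names, and counting the first attempt separately from the retries is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errBusy    = errors.New("partition still owned by another consumer")
	errStopped = errors.New("stopped before the claim succeeded")
)

// claimWithRetry is a hypothetical helper, not part of the package API.
// claim stands in for whatever operation registers partition ownership.
func claimWithRetry(claim func() error, retryTimes int, interval time.Duration, stop <-chan struct{}) error {
	for attempt := 0; ; attempt++ {
		err := claim()
		if err == nil {
			return nil
		}
		// A positive retryTimes bounds the retries that follow the first attempt.
		if retryTimes > 0 && attempt >= retryTimes {
			return err
		}
		// Otherwise wait for the interval, unless a stop signal arrives first.
		select {
		case <-stop:
			return errStopped
		case <-time.After(interval):
		}
	}
}

func main() {
	calls := 0
	failTwice := func() error {
		calls++
		if calls <= 2 {
			return errBusy
		}
		return nil
	}
	// retryTimes <= 0 means "keep trying"; here the third call succeeds.
	err := claimWithRetry(failTwice, 0, time.Millisecond, nil)
	fmt.Println(err, calls)
}
```

A bounded setting suits services that would rather fail fast and surface the error; the unbounded setting suits long-running consumers that expect the previous owner to release the partition eventually.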

func NewConfig

func NewConfig() *Config

NewConfig returns a new Config populated with default values.
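
To make the documented defaults and required fields concrete, here is a minimal sketch that uses a local stand-in for Config so it compiles without third-party imports. The config type, newConfig and validate are hypothetical; SaramaConfig is left out, offsetNewest mirrors the value of sarama.OffsetNewest, and whether the real package validates required fields this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// offsetNewest mirrors the value of sarama.OffsetNewest.
const offsetNewest int64 = -1

// config is a local stand-in for the documented Config struct.
// SaramaConfig and the claim retry fields are omitted for brevity.
type config struct {
	ZkList                   []string
	ZkSessionTimeout         time.Duration
	GroupID                  string
	TopicList                []string
	ErrorChannelBufferSize   int
	OffsetAutoCommitEnable   bool
	OffsetAutoCommitInterval time.Duration
	OffsetAutoReset          int64
}

// newConfig fills in the defaults listed in the field comments.
func newConfig() *config {
	return &config{
		ZkSessionTimeout:         6 * time.Second,
		ErrorChannelBufferSize:   1024,
		OffsetAutoCommitEnable:   true,
		OffsetAutoCommitInterval: 10 * time.Second,
		OffsetAutoReset:          offsetNewest,
	}
}

// validate checks the three fields the documentation marks as required.
func (c *config) validate() error {
	switch {
	case len(c.ZkList) == 0:
		return errors.New("ZkList is required")
	case c.GroupID == "":
		return errors.New("GroupID is required")
	case len(c.TopicList) == 0:
		return errors.New("TopicList is required")
	}
	return nil
}

func main() {
	c := newConfig()
	c.ZkList = []string{"127.0.0.1:2181"} // placeholder address
	c.GroupID = "example-group"           // placeholder group
	c.TopicList = []string{"example-topic"}
	fmt.Println(c.validate(), c.ZkSessionTimeout, c.OffsetAutoCommitInterval)
}
```

With the real package the same three assignments would be made on the value returned by NewConfig before passing it to NewConsumerGroup.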

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup consumes messages from Kafka with rebalancing support.

func NewConsumerGroup

func NewConsumerGroup(config *Config) (*ConsumerGroup, error)

NewConsumerGroup creates a ConsumerGroup instance from the given config.

func (*ConsumerGroup) CommitOffset

func (cg *ConsumerGroup) CommitOffset(topic string, partition int32, offset int64) error

CommitOffset commits the offset for a topic partition manually; use it when auto commit is disabled.
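
A manual commit loop might look like the following sketch. It is written against local stand-ins so it runs on its own: message carries a subset of the fields of sarama.ConsumerMessage, offsetCommitter copies the CommitOffset signature shown above, and processAndCommit and recorder are hypothetical. Whether the package expects the offset of the last processed message or the next offset to read is not stated here, so the sketch simply passes the message's own offset.

```go
package main

import "fmt"

// message carries the subset of sarama.ConsumerMessage fields used here.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// offsetCommitter matches the documented CommitOffset method signature.
type offsetCommitter interface {
	CommitOffset(topic string, partition int32, offset int64) error
}

// processAndCommit handles each message and commits its offset only after
// the handler succeeds, so a failed message is not marked as consumed.
func processAndCommit(msgs <-chan *message, c offsetCommitter, handle func(*message) error) error {
	for msg := range msgs {
		if err := handle(msg); err != nil {
			return fmt.Errorf("handle %s/%d@%d: %w", msg.Topic, msg.Partition, msg.Offset, err)
		}
		if err := c.CommitOffset(msg.Topic, msg.Partition, msg.Offset); err != nil {
			return fmt.Errorf("commit %s/%d@%d: %w", msg.Topic, msg.Partition, msg.Offset, err)
		}
	}
	return nil
}

// recorder is an in-memory offsetCommitter used only for this demo.
type recorder struct{ last map[string]int64 }

func (r *recorder) CommitOffset(topic string, partition int32, offset int64) error {
	r.last[fmt.Sprintf("%s/%d", topic, partition)] = offset
	return nil
}

func main() {
	msgs := make(chan *message, 2)
	msgs <- &message{Topic: "example-topic", Partition: 0, Offset: 41}
	msgs <- &message{Topic: "example-topic", Partition: 0, Offset: 42}
	close(msgs)

	r := &recorder{last: map[string]int64{}}
	err := processAndCommit(msgs, r, func(m *message) error { return nil })
	fmt.Println(err, r.last)
}
```

In real use a *ConsumerGroup would satisfy offsetCommitter, with OffsetAutoCommitEnable set to false in the Config.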

func (*ConsumerGroup) ExitGroup

func (cg *ConsumerGroup) ExitGroup()

ExitGroup unregisters the ConsumerGroup, which triggers a rebalance. The partitions consumed by this ConsumerGroup are then assigned to the other members.

func (*ConsumerGroup) GetErrors added in v0.2.0

func (cg *ConsumerGroup) GetErrors(topic string) (<-chan *sarama.ConsumerError, bool)

GetErrors returns an unbuffered error channel for the specified topic.

func (*ConsumerGroup) GetMessages added in v0.2.0

func (cg *ConsumerGroup) GetMessages(topic string) (<-chan *sarama.ConsumerMessage, bool)

GetMessages returns an unbuffered message channel for the specified topic.
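
Because the channels are described as unbuffered, a sender blocks until something receives, so both the message and the error channel need a reader. The sketch below shows one way to service both with a single select loop. The message and consumerError types are local stand-ins for a subset of sarama.ConsumerMessage and sarama.ConsumerError, drain is a hypothetical helper, and the assumption that the channels are eventually closed is not confirmed by the documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the fields of the Sarama types used in this sketch.
type message struct {
	Topic  string
	Offset int64
}

type consumerError struct {
	Topic string
	Err   error
}

// drain services both channels until each one has been closed.
func drain(msgs <-chan *message, errs <-chan *consumerError) (handled, failed int) {
	for msgs != nil || errs != nil {
		select {
		case m, ok := <-msgs:
			if !ok {
				msgs = nil // a nil channel is never ready, which disables this case
				continue
			}
			fmt.Printf("message %s@%d\n", m.Topic, m.Offset)
			handled++
		case e, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Printf("error on %s: %v\n", e.Topic, e.Err)
			failed++
		}
	}
	return handled, failed
}

func main() {
	msgs := make(chan *message)
	errs := make(chan *consumerError)
	go func() {
		msgs <- &message{Topic: "example-topic", Offset: 1}
		close(msgs)
	}()
	go func() {
		errs <- &consumerError{Topic: "example-topic", Err: errors.New("fetch failed")}
		close(errs)
	}()
	fmt.Println(drain(msgs, errs))
}
```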

func (*ConsumerGroup) IsStopped

func (cg *ConsumerGroup) IsStopped() bool

IsStopped reports whether the ConsumerGroup has been stopped.

func (*ConsumerGroup) JoinGroup

func (cg *ConsumerGroup) JoinGroup() error

JoinGroup registers the ConsumerGroup, which triggers a rebalance. The ConsumerGroup computes which partitions it should consume based on the number of consumers, then starts fetching messages.
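
The documentation does not spell out how partitions are divided among consumers. One common scheme, shown here purely as an illustration and not as the package's algorithm, sorts the consumer IDs and gives each one a contiguous range of partitions, with the first few consumers taking one extra when the split is uneven. The assign function and the consumer IDs are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// assign returns the partitions that the consumer named self would own
// under a range-style split across all registered consumers.
func assign(self string, consumers []string, partitions []int32) []int32 {
	ids := append([]string(nil), consumers...)
	sort.Strings(ids)
	idx := sort.SearchStrings(ids, self)
	if idx == len(ids) || ids[idx] != self {
		return nil // self is not a registered member
	}
	parts := append([]int32(nil), partitions...)
	sort.Slice(parts, func(i, j int) bool { return parts[i] < parts[j] })

	// Every consumer gets per partitions; the first extra consumers get one more.
	per, extra := len(parts)/len(ids), len(parts)%len(ids)
	start, size := idx*per+extra, per
	if idx < extra {
		start, size = idx*(per+1), per+1
	}
	return parts[start : start+size]
}

func main() {
	consumers := []string{"consumer-b", "consumer-a"}
	partitions := []int32{0, 1, 2, 3, 4}
	for _, c := range consumers {
		fmt.Println(c, assign(c, consumers, partitions))
	}
}
```

Because every member sorts the same inputs, each one can compute its own share independently and the shares never overlap, which is why a rebalance is needed whenever the membership changes.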

func (*ConsumerGroup) SetLogger

func (cg *ConsumerGroup) SetLogger(logger Logger)

SetLogger lets the caller supply a custom logger; otherwise the default logger prints to stdout.

type Logger

type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}

Logger is a simple logging interface. The default implementation prints to stdout.
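
As an example of satisfying this interface, the sketch below adapts the standard library's *log.Logger. The Logger interface is copied locally so the example compiles on its own; stdLogger, line and the level prefixes are hypothetical choices, not something the package prescribes.

```go
package main

import (
	"fmt"
	"log"
	"os"
)

// Logger is copied from the documented interface so the sketch is self-contained.
type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}

// stdLogger adapts a *log.Logger and tags each line with its level.
type stdLogger struct{ l *log.Logger }

// line prefixes a formatted message with its level.
func line(level, msg string) string { return "[" + level + "] " + msg }

func (s stdLogger) Debug(args ...interface{}) { s.l.Print(line("DEBUG", fmt.Sprint(args...))) }
func (s stdLogger) Info(args ...interface{})  { s.l.Print(line("INFO", fmt.Sprint(args...))) }
func (s stdLogger) Warn(args ...interface{})  { s.l.Print(line("WARN", fmt.Sprint(args...))) }
func (s stdLogger) Error(args ...interface{}) { s.l.Print(line("ERROR", fmt.Sprint(args...))) }

func (s stdLogger) Debugf(format string, args ...interface{}) {
	s.l.Print(line("DEBUG", fmt.Sprintf(format, args...)))
}
func (s stdLogger) Infof(format string, args ...interface{}) {
	s.l.Print(line("INFO", fmt.Sprintf(format, args...)))
}
func (s stdLogger) Warnf(format string, args ...interface{}) {
	s.l.Print(line("WARN", fmt.Sprintf(format, args...)))
}
func (s stdLogger) Errorf(format string, args ...interface{}) {
	s.l.Print(line("ERROR", fmt.Sprintf(format, args...)))
}

// Compile-time check that stdLogger implements Logger.
var _ Logger = stdLogger{}

func main() {
	var lg Logger = stdLogger{l: log.New(os.Stderr, "consumergroup ", log.LstdFlags)}
	lg.Infof("joined group %q", "example-group")
	lg.Warn("rebalance in progress")
}
```

With the real package, a value like lg would be handed to SetLogger on the ConsumerGroup, here routing output to stderr instead of stdout.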
