Documentation ¶
Index ¶
- type Config
- type ConsumerGroup
- func NewConsumerGroup(config *Config) (*ConsumerGroup, error)
- func (cg *ConsumerGroup) CommitOffset(topic string, partition int32, offset int64) error
- func (cg *ConsumerGroup) ExitGroup()
- func (cg *ConsumerGroup) GetErrors(topic string) (<-chan *sarama.ConsumerError, bool)
- func (cg *ConsumerGroup) GetMessages(topic string) (<-chan *sarama.ConsumerMessage, bool)
- func (cg *ConsumerGroup) IsStopped() bool
- func (cg *ConsumerGroup) JoinGroup() error
- func (cg *ConsumerGroup) SetLogger(logger Logger)
- type Logger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
    // ZkList is required: the list of ZooKeeper addresses.
    ZkList []string

    // ZooKeeper session timeout; default is 6s.
    ZkSessionTimeout time.Duration

    // GroupID is required: the identifier that determines which consumer group is joined.
    GroupID string

    // TopicList is required: the topics this ConsumerGroup consumes.
    TopicList []string

    // SaramaConfig exposes the underlying Sarama configuration.
    SaramaConfig *sarama.Config

    // Size of the error channel; default is 1024.
    ErrorChannelBufferSize int

    // Whether to commit offsets automatically; default is true.
    OffsetAutoCommitEnable bool

    // Interval between automatic offset commits; default is 10s.
    OffsetAutoCommitInterval time.Duration

    // Where to start fetching messages when no offset is found; default is newest.
    OffsetAutoReset int64

    // When ClaimPartitionRetryTimes > 0, claiming a partition gives up after that many retries.
    // When ClaimPartitionRetryTimes <= 0, claiming retries until it succeeds or a stop signal is received.
    ClaimPartitionRetryTimes int

    // Interval between retries after failing to claim a partition.
    ClaimPartitionRetryInterval time.Duration
}
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup consumes messages from Kafka with rebalancing support.
func NewConsumerGroup ¶
func NewConsumerGroup(config *Config) (*ConsumerGroup, error)
NewConsumerGroup creates a ConsumerGroup instance from the given config.
func (*ConsumerGroup) CommitOffset ¶
func (cg *ConsumerGroup) CommitOffset(topic string, partition int32, offset int64) error
CommitOffset commits an offset manually; use it when auto commit is disabled.
func (*ConsumerGroup) ExitGroup ¶
func (cg *ConsumerGroup) ExitGroup()
ExitGroup unregisters the ConsumerGroup, which triggers a rebalance. The partitions consumed by this ConsumerGroup are reassigned to other consumers.
func (*ConsumerGroup) GetErrors ¶ added in v0.2.0
func (cg *ConsumerGroup) GetErrors(topic string) (<-chan *sarama.ConsumerError, bool)
GetErrors returns an unbuffered error channel for the specified topic.
func (*ConsumerGroup) GetMessages ¶ added in v0.2.0
func (cg *ConsumerGroup) GetMessages(topic string) (<-chan *sarama.ConsumerMessage, bool)
GetMessages returns an unbuffered message channel for the specified topic.
func (*ConsumerGroup) IsStopped ¶
func (cg *ConsumerGroup) IsStopped() bool
IsStopped reports whether the ConsumerGroup has stopped.
func (*ConsumerGroup) JoinGroup ¶
func (cg *ConsumerGroup) JoinGroup() error
JoinGroup registers the ConsumerGroup, which triggers a rebalance. The ConsumerGroup computes which partitions it should consume based on the number of consumers, then starts fetching messages.
func (*ConsumerGroup) SetLogger ¶
func (cg *ConsumerGroup) SetLogger(logger Logger)
SetLogger lets the caller supply a custom logger; if none is set, the default logger prints to stdout.
type Logger ¶
type Logger interface {
    Debug(args ...interface{})
    Debugf(format string, args ...interface{})
    Info(args ...interface{})
    Infof(format string, args ...interface{})
    Warn(args ...interface{})
    Warnf(format string, args ...interface{})
    Error(args ...interface{})
    Errorf(format string, args ...interface{})
}
Logger is a simple logging interface. The default implementation prints to stdout.