consumergroup

package
v0.0.0-...-7809f0a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2014 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	REBALANCE_START uint8 = iota + 1
	REBALANCE_OK
	REBALANCE_ERROR
)

Variables

View Source
var (
	DiscardCommit = errors.New("sarama: commit discarded")
	NoCheckout    = errors.New("sarama: not checkout")
)

Functions

This section is empty.

Types

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

A ConsumerGroup operates on all partitions of a single topic. The goal is to ensure each topic message is consumed only once, no matter of the number of consumer instances within a cluster, as described in: http://kafka.apache.org/documentation.html#distributionimpl.

The ConsumerGroup internally creates multiple Consumer instances. It uses Zookkeper and follows a simple consumer rebalancing algorithm which allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Each ConsumerGroup can 'claim' 0-n partitions and will consume their messages until another ConsumerGroup instance with the same name joins or leaves the cluster.

Unlike stated in the Kafka documentation, consumer rebalancing is *only* triggered on each addition or removal of consumers within the same group, while the addition of broker nodes and/or partition *does currently not trigger* a rebalancing cycle.

Example
consumerGroupName := "my_consumer_group_name"
kafkaTopic := "my_topic"
zookeeper := []string{"localhost:2181"}

consumer, consumerErr := JoinConsumerGroup(consumerGroupName, kafkaTopic, zookeeper, nil)
if consumerErr != nil {
	log.Fatalln(consumerErr)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	consumer.Close()
}()

eventCount := 0

stream := consumer.Stream()
for {
	event, ok := <-stream
	if !ok {
		break
	}

	// Process event
	log.Println(string(event.Value))

	eventCount += 1
}

log.Printf("Processed %d events.", eventCount)
Output:

func JoinConsumerGroup

func JoinConsumerGroup(name string, topic string, zookeeper []string, config *ConsumerGroupConfig) (cg *ConsumerGroup, err error)

Connects to a consumer group, using Zookeeper for auto-discovery

func NewConsumerGroup

func NewConsumerGroup(client *sarama.Client, zoo *ZK, name string, topic string, listener chan *Notification, config *ConsumerGroupConfig) (group *ConsumerGroup, err error)

NewConsumerGroup creates a new consumer group for a given topic.

You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

func (*ConsumerGroup) Checkout

func (cg *ConsumerGroup) Checkout(callback func(*PartitionConsumer) error) error

Checkout applies a callback function to a single partition consumer. The latest consumer offset is automatically comitted to zookeeper if successful. The callback may return a DiscardCommit error to skip the commit silently. Returns an error if any, but may also return a NoCheckout error to indicate that no partition was available. You should add an artificial delay keep your CPU cool.

func (*ConsumerGroup) Claims

func (cg *ConsumerGroup) Claims() []int32

Claims returns the claimed partitions

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

Close closes the consumer group

func (*ConsumerGroup) Commit

func (cg *ConsumerGroup) Commit(partition int32, offset int64) error

Commit manually commits an offset for a partition

func (*ConsumerGroup) EventsBehindLatest

func (cg *ConsumerGroup) EventsBehindLatest() (map[int32]int64, error)

func (*ConsumerGroup) Offset

func (cg *ConsumerGroup) Offset(partition int32) (int64, error)

Offset manually retrives an offset for a partition

func (*ConsumerGroup) Stream

func (cg *ConsumerGroup) Stream() <-chan *sarama.ConsumerEvent

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	// The Zookeeper read timeout
	ZookeeperTimeout time.Duration

	// Zookeeper chroot to use. Should not include a trailing slash.
	// Leave this empty for to not set a chroot.
	ZookeeperChroot string

	// The preempt interval when listening to a single partition of a topic.
	// After this interval, the current offset will be committed to Zookeeper,
	// and a different partition will be checked out to consume next.
	CheckoutInterval time.Duration

	KafkaClientConfig   *sarama.ClientConfig   // This will be passed to Sarama when creating a new Client
	KafkaConsumerConfig *sarama.ConsumerConfig // This will be passed to Sarama when creating a new Consumer
}

func NewConsumerGroupConfig

func NewConsumerGroupConfig() *ConsumerGroupConfig

func (*ConsumerGroupConfig) Validate

func (cgc *ConsumerGroupConfig) Validate() error

type EventBatch

type EventBatch struct {
	Topic     string
	Partition int32
	Events    []sarama.ConsumerEvent
}

EventBatch is a batch of events from a single topic/partition

type EventStream

type EventStream interface {
	Events() <-chan *sarama.ConsumerEvent
	Close() error
}

EventStream is an abstraction of a sarama.Consumer

type Notification

type Notification struct {
	Type uint8
	Src  *ConsumerGroup
	Err  error
}

A subscribable notification

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer can consume a single partition of a single topic

func NewPartitionConsumer

func NewPartitionConsumer(group *ConsumerGroup, partition int32) (*PartitionConsumer, error)

NewPartitionConsumer creates a new partition consumer instance

func (*PartitionConsumer) Close

func (p *PartitionConsumer) Close() error

Close closes a partition consumer

func (*PartitionConsumer) Fetch

func (p *PartitionConsumer) Fetch(stream chan *sarama.ConsumerEvent, duration time.Duration, stopper chan bool) error

Fetch returns a batch of events WARNING: may return nil if not events are available

type ZK

type ZK struct {
	*zk.Conn
	// contains filtered or unexported fields
}

ZK wraps a zookeeper connection

func NewZK

func NewZK(servers []string, chroot string, recvTimeout time.Duration) (*ZK, error)

NewZK creates a new connection instance

func (*ZK) Brokers

func (z *ZK) Brokers() ([]string, error)

func (*ZK) Claim

func (z *ZK) Claim(group, topic string, partition int32, id string) (err error)

Claim claims a topic/partition ownership for a consumer ID within a group

func (*ZK) Commit

func (z *ZK) Commit(group, topic string, partition int32, offset int64) (err error)

Commit commits an offset to a group/topic/partition

func (*ZK) Consumers

func (z *ZK) Consumers(group string) ([]string, <-chan zk.Event, error)

Consumers returns all active consumers within a group

func (*ZK) Create

func (z *ZK) Create(node string, value []byte, ephemeral bool) (err error)

Create stores a new value at node. Fails if already set.

func (*ZK) DeleteAll

func (z *ZK) DeleteAll(node string) (err error)

DeleteAll deletes a node recursively

func (*ZK) Exists

func (z *ZK) Exists(node string) (ok bool, err error)

Exists checks existence of a node

func (*ZK) MkdirAll

func (z *ZK) MkdirAll(node string) (err error)

MkdirAll creates a directory recursively

func (*ZK) Offset

func (z *ZK) Offset(group, topic string, partition int32) (int64, error)

Offset retrieves an offset to a group/topic/partition

func (*ZK) RegisterConsumer

func (z *ZK) RegisterConsumer(group, id, topic string) error

CreateConsumer registers a new consumer within a group

func (*ZK) RegisterGroup

func (z *ZK) RegisterGroup(group string) error

RegisterGroup creates/updates a group directory

func (*ZK) Release

func (z *ZK) Release(group, topic string, partition int32, id string) error

Release releases a claim

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL