consumergroup

package v0.0.0-...-c4536c7
Published: Sep 25, 2017 License: MIT Imports: 10 Imported by: 6

Documentation


Constants

This section is empty.

Variables

var (
	EmptyZkAddrs           = errors.New("You need to provide at least one zookeeper node address")
	AlreadyClosing         = errors.New("The consumer group is already shutting down.")
	ConfigErrorOffset      = errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest")
	UncleanClose           = errors.New("Not all offsets were committed before shutdown was completed")
	TopicPartitionNotFound = errors.New("Never consumed this topic/partition")
	OffsetBackwardsError   = errors.New("Offset to be committed is smaller than highest processed offset")
	NoOffsetToCommit       = errors.New("No offsets to commit")
	OffsetTooLarge         = errors.New("Offset to be committed is larger than highest consumed offset")
	ErrTooManyConsumers    = errors.New("Consumers more than active partitions")
	ErrInvalidTopic        = errors.New("Invalid topic")
	ErrConsumerConflict    = errors.New("One group can only consume one topic")
	ErrConnBroken          = errors.New("Kafka connection broken")
	ErrKafkaDead           = errors.New("Kafka brokers all dead")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	*sarama.Config

	Zookeeper *kazoo.Config

	Offsets struct {
		// The initial offset to use if the consumer has no previously stored offset.
		// Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
		Initial int64

		// Resets the offsets for the consumer group so that it won't resume
		// from where it left off previously.
		ResetOffsets bool

		// Time to wait for all the offsets for a partition to be processed
		// after stopping to consume from it.
		ProcessingTimeout time.Duration

		// The interval at which the processed offsets are committed.
		CommitInterval time.Duration
	}

	// If NoDup is true, the consumer group will automatically discard duplicated messages.
	NoDup bool

	// If PermitStandby is false, the consumer group will emit ErrTooManyConsumers through the
	// error channel so the client can close the consumer group.
	PermitStandby bool

	// If OneToOne is true, a single consumer group can only consume a single topic.
	OneToOne bool
}

func NewConfig

func NewConfig() *Config

func (*Config) Validate

func (cgc *Config) Validate() error
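
A minimal configuration sketch (not taken from this package's own documentation): it uses a placeholder import path for this fork and the Shopify-era sarama import path, sets only fields documented above, and calls Validate to catch misconfiguration before joining a group. JoinConsumerGroup (below) accepts the resulting *Config as its last argument.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"

	// Placeholder import path; substitute this fork's actual module path.
	"github.com/example/kafka/consumergroup"
)

func main() {
	// Start from the defaults returned by NewConfig and override only what is needed.
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest        // start at the newest offset when none is stored
	config.Offsets.ProcessingTimeout = 10 * time.Second // wait this long for in-flight offsets on shutdown
	config.Offsets.CommitInterval = 5 * time.Second     // flush processed offsets every 5 seconds
	config.NoDup = true                                 // discard duplicated messages

	// Validate rejects invalid settings, e.g. an Offsets.Initial that is neither
	// sarama.OffsetOldest nor sarama.OffsetNewest.
	if err := config.Validate(); err != nil {
		log.Fatalln(err)
	}
}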

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.

You must call Close() on a consumer group to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.

Example
consumer, consumerErr := JoinConsumerGroup(
	"ExampleConsumerGroup",
	[]string{TopicWithSinglePartition, TopicWithMultiplePartitions},
	zookeeperPeers,
	nil)

if consumerErr != nil {
	log.Fatalln(consumerErr)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	consumer.Close()
}()

eventCount := 0

for event := range consumer.Messages() {
	// Process event
	log.Println(string(event.Value))
	eventCount++

	// Ack event
	consumer.CommitUpto(event)
}

log.Printf("Processed %d events.", eventCount)
Output:

func JoinConsumerGroup

func JoinConsumerGroup(name string, topics []string, zookeeper []string,
	config *Config) (cg *ConsumerGroup, err error)

JoinConsumerGroup connects to a consumer group, using Zookeeper for auto-discovery.

func JoinConsumerGroupRealIp

func JoinConsumerGroupRealIp(realIp string, name string, topics []string, zookeeper []string,
	config *Config) (cg *ConsumerGroup, err error)

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) CommitUpto

func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error

func (*ConsumerGroup) Errors

func (cg *ConsumerGroup) Errors() <-chan *sarama.ConsumerError

Returns a channel that you can read to obtain errors from Kafka to process.
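
Because failures (including ErrTooManyConsumers when PermitStandby is false) are reported through this channel, it should be drained concurrently with the message loop. A sketch, using the same placeholder import path as above; the assumption that the package's sentinel errors arrive in the ConsumerError's Err field is not documented on this page.

package consumerexample

import (
	"log"

	// Placeholder import path; substitute this fork's actual module path.
	"github.com/example/kafka/consumergroup"
)

// drainErrors logs consumer errors and closes the group if it reports that
// there are more consumers than active partitions.
func drainErrors(cg *consumergroup.ConsumerGroup) {
	for cerr := range cg.Errors() {
		// cerr is a *sarama.ConsumerError; Topic, Partition and Err describe the failure.
		log.Printf("consumer error on %s/%d: %v", cerr.Topic, cerr.Partition, cerr.Err)

		// Assumption: sentinel errors such as ErrTooManyConsumers arrive wrapped in Err.
		if cerr.Err == consumergroup.ErrTooManyConsumers {
			cg.Close()
			return
		}
	}
}

Run it as go drainErrors(consumer) right after JoinConsumerGroup, alongside the message loop shown in the ConsumerGroup example above.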

func (*ConsumerGroup) ID

func (cg *ConsumerGroup) ID() string

func (*ConsumerGroup) Messages

func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage

Returns a channel that you can read to obtain events from Kafka to process.

func (*ConsumerGroup) Name

func (cg *ConsumerGroup) Name() string

type OffsetManager

type OffsetManager interface {

	// InitializePartition is called when the consumergroup is starting to consume a
	// partition. It should return the last processed offset for this partition. Note:
	// the same partition can be initialized multiple times during a single run of a
	// consumer group due to other consumer instances coming online and offline.
	InitializePartition(topic string, partition int32) (int64, error)

	// MarkAsProcessed tells the offset manager that a certain message has been successfully
	// processed by the consumer, and should be committed. The implementation does not have
	// to store this offset right away, but should return true if it intends to do this at
	// some point.
	//
	// Offsets should generally be increasing if the consumer
	// processes events serially, but this cannot be guaranteed if the consumer does any
	// asynchronous processing. This can be handled in various ways, e.g. by only accepting
	// offsets that are higher than the offsets seen before for the same partition.
	MarkAsProcessed(topic string, partition int32, offset int64) error

	// MarkAsConsumed tells the offset manager that a message with this offset has been
	// consumed (read) from the partition, even if it has not been processed yet.
	MarkAsConsumed(topic string, partition int32, offset int64) error

	// FinalizePartition is called when the consumergroup is done consuming a
	// partition. In this method, the offset manager can flush any remaining offsets to its
	// backend store. It should return an error if it was not able to commit the offset.
	// Note: it's possible that the consumergroup instance will start to consume the same
	// partition again after this function is called.
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

	// Close is called when the consumergroup is shutting down. In normal circumstances, all
	// offsets are committed because FinalizePartition is called for all the running partition
	// consumers. You may want to check for this to be true, and try to commit any outstanding
	// offsets. If this doesn't succeed, it should return an error.
	Close() error
}

OffsetManager is the interface the consumergroup uses to manage and persist its partition offsets.
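
To make the contract concrete, here is a minimal, purely illustrative in-memory implementation (not part of this package, and unsuitable for real use since offsets are lost on restart). It only tracks the highest processed offset per topic/partition; its method set matches the interface above.

package consumerexample

import (
	"sync"
	"time"
)

// memoryOffsetManager is a hypothetical OffsetManager backend that keeps the
// highest processed offset per topic/partition in memory only.
type memoryOffsetManager struct {
	mu      sync.Mutex
	offsets map[string]map[int32]int64
}

func newMemoryOffsetManager() *memoryOffsetManager {
	return &memoryOffsetManager{offsets: make(map[string]map[int32]int64)}
}

// InitializePartition returns the last processed offset, or -1 as an
// "unknown" marker when the partition has never been seen before.
func (m *memoryOffsetManager) InitializePartition(topic string, partition int32) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if o, ok := m.offsets[topic][partition]; ok {
		return o, nil
	}
	return -1, nil
}

// MarkAsProcessed only accepts offsets that are higher than what was seen
// before, guarding against out-of-order acknowledgements.
func (m *memoryOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.offsets[topic] == nil {
		m.offsets[topic] = make(map[int32]int64)
	}
	if cur, ok := m.offsets[topic][partition]; !ok || offset > cur {
		m.offsets[topic][partition] = offset
	}
	return nil
}

// MarkAsConsumed is a no-op in this sketch; a real backend could track the
// highest offset handed out to the application.
func (m *memoryOffsetManager) MarkAsConsumed(topic string, partition int32, offset int64) error {
	return nil
}

// FinalizePartition would flush any remaining offsets to a backend store;
// with an in-memory map there is nothing left to flush.
func (m *memoryOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
	return nil
}

// Close would commit anything outstanding; nothing to do here.
func (m *memoryOffsetManager) Close() error {
	return nil
}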

func NewKafkaOffsetManager

func NewKafkaOffsetManager() OffsetManager

func NewZookeeperOffsetManager

func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager

NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.

type OffsetManagerConfig

type OffsetManagerConfig struct {
	CommitInterval time.Duration // Interval between offset flushes to the backend store.
	VerboseLogging bool          // Whether to enable verbose logging.
}

OffsetManagerConfig holds configuration settings on how the offset manager should behave.

func NewOffsetManagerConfig

func NewOffsetManagerConfig() *OffsetManagerConfig

NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.
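
A short sketch of constructing the Zookeeper-backed offset manager, again using the placeholder import path from the sketches above and assuming an already joined *ConsumerGroup; how the returned OffsetManager is wired into the consumer group is not documented on this page, so only its construction is shown.

package consumerexample

import (
	"time"

	// Placeholder import path; substitute this fork's actual module path.
	"github.com/example/kafka/consumergroup"
)

// newZkOffsetManager builds a Zookeeper-backed OffsetManager for cg with a
// 10-second flush interval and verbose logging enabled.
func newZkOffsetManager(cg *consumergroup.ConsumerGroup) consumergroup.OffsetManager {
	// Start from the defaults and override only what is needed.
	omConfig := consumergroup.NewOffsetManagerConfig()
	omConfig.CommitInterval = 10 * time.Second
	omConfig.VerboseLogging = true

	return consumergroup.NewZookeeperOffsetManager(cg, omConfig)
}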
