
README

sarama-cg

[WIP] consumer group for Kafka using Sarama

why

I need to be able to provide custom topic-partition assignment protocols, e.g. hashring assignment instead of round-robin, to minimize partition ownership changes when group membership changes.

I need to be able to customize how I read a partition from a last-known offset, e.g. when handling a new partition, rewind 2 hours of history to build context up to the last offset.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

License

Apache License 2.0, see LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Assignment

type Assignment map[string]TopicPartitions

Assignment is the result of a Protocol determining which TopicPartitions from Candidates should be assigned to specific MemberIDs.
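
For illustration, a hypothetical Assignment in which member "m1" owns partitions 0 and 1 of topic "events" and member "m2" owns partition 2 (member IDs and topic name are made up):

assignment := Assignment{
	"m1": TopicPartitions{"events": {0, 1}},
	"m2": TopicPartitions{"events": {2}},
}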

type CachingCommitter

type CachingCommitter struct {
	// contains filtered or unexported fields
}

CachingCommitter is a helper for consumers that need to implement committing offsets.

func NewCachingCommitter

func NewCachingCommitter(cfg *CachingCommitterConfig) (*CachingCommitter, error)

NewCachingCommitter helps consumers implement their CommitOffset function by wrapping the Coordinator.CommitOffset function with a cache so we don't hit the database too frequently.

func (*CachingCommitter) CommitOffset

func (cc *CachingCommitter) CommitOffset(offset int64) error

CommitOffset writes the provided offset to the database as long as the offset is higher than what we've already committed and the time since our last commit is greater than the configured duration.

type CachingCommitterConfig

type CachingCommitterConfig struct {
	Coordinator *Coordinator
	Duration    time.Duration
	Partition   int32
	Topic       string
}

CachingCommitterConfig is required to create a new CachingCommitter.
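
A minimal usage sketch, assuming coord is an existing *Coordinator (see NewCoordinator below) and msg is a *sarama.ConsumerMessage from topic "events", partition 0:

cc, err := NewCachingCommitter(&CachingCommitterConfig{
	Coordinator: coord,
	Duration:    10 * time.Second, // commit at most once per 10s
	Partition:   0,
	Topic:       "events",
})
if err != nil {
	log.Fatal(err)
}
// Safe to call for every message: the underlying commit is skipped unless
// the offset is higher than the last commit and Duration has elapsed.
if err := cc.CommitOffset(msg.Offset); err != nil {
	log.Print(err)
}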

type Candidates

type Candidates struct {
	Members         []Member
	TopicPartitions TopicPartitions
}

Candidates represents all members of the consumer group, which need to decide how to divide all the TopicPartitions among themselves.

type Consume

type Consume func(ctx context.Context, topic string, partition int32)

Consume is a function the Coordinator will call when it becomes responsible for a new topic-partition. The provided context will be canceled when the Coordinator is no longer responsible for the topic-partition. This function is expected not to block. If you encounter an error while reading the topic-partition, you are still seen as responsible for that topic-partition in the Kafka consumer group, so you must either recover or stop the Coordinator to remove yourself from the consumer group.
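
A minimal sketch of a Consume implementation: it returns immediately, runs its read loop in a goroutine, and stops when the Coordinator cancels the context. The actual message reading is elided:

consume := func(ctx context.Context, topic string, partition int32) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				// The Coordinator no longer owns this topic-partition.
				return
			default:
				// Read and process the next message for topic/partition
				// here; a real implementation would block on a fetch. On
				// a read error, recover or stop the Coordinator, since
				// the group still considers this member responsible.
				time.Sleep(time.Second) // placeholder for a blocking read
			}
		}
	}()
}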

type Consumer

type Consumer interface {
	CommitOffset(offset int64) error
	Consume() <-chan *sarama.ConsumerMessage
	Err() error
	HighWaterMarkOffset() int64
}

Consumer reads from Kafka via the channel returned by Consume() and commits offsets for the topic-partition it represents.
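
A typical read loop over a Consumer, assuming c is a value satisfying the interface (this package's index shows no constructor; see the examples directory):

for msg := range c.Consume() {
	// Process msg, then record progress. Committing msg.Offset+1 follows
	// the Kafka convention of committing the next offset to read.
	if err := c.CommitOffset(msg.Offset + 1); err != nil {
		log.Print(err)
	}
}
if err := c.Err(); err != nil {
	log.Fatal(err)
}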

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator implements a Kafka GroupConsumer with semantics available after Kafka 0.9.

func NewCoordinator

func NewCoordinator(cfg *CoordinatorConfig) *Coordinator

NewCoordinator creates a Kafka GroupConsumer.

func (*Coordinator) CommitOffset

func (c *Coordinator) CommitOffset(topic string, partition int32, offset int64) error

CommitOffset writes the provided offset for the topic-partition on behalf of the consumer group this Coordinator participates in.

func (*Coordinator) GetOffset

func (c *Coordinator) GetOffset(topic string, partition int32) (int64, error)

GetOffset returns the current committed offset for the topic-partition for the group id provided to the coordinator.
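
Together, CommitOffset and GetOffset let a Consume function resume from the group's last committed position. A sketch, assuming coord is the running *Coordinator and topic/partition come from the Consume arguments:

offset, err := coord.GetOffset(topic, partition)
if err != nil {
	return // hypothetical error handling inside a Consume goroutine
}
// ... read from offset onward; after processing, record progress:
if err := coord.CommitOffset(topic, partition, offset); err != nil {
	log.Print(err)
}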

func (*Coordinator) Run

func (c *Coordinator) Run(consume Consume) error

Run executes the Coordinator until an error occurs or the context provided at creation time is canceled.

type CoordinatorConfig

type CoordinatorConfig struct {
	Client         sarama.Client
	Context        context.Context
	GroupID        string
	Protocols      []ProtocolKey
	RetentionTime  time.Duration
	SessionTimeout time.Duration
	Heartbeat      time.Duration
	Topics         []string
}

CoordinatorConfig is used to create a new Coordinator.
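
A minimal wiring sketch. Broker address, group ID, topic, and timeouts are made-up values; the import path is assumed from the README; proto stands in for any Protocol implementation (one is sketched under Protocol below) and consume for a Consume function like the one sketched above:

package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"
	cg "github.com/supershabam/sarama-cg" // import path assumed
)

var proto cg.Protocol  // e.g. the roundRobin sketch under Protocol below
var consume cg.Consume // e.g. the Consume sketch above

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Canceling ctx stops the Coordinator and removes this member from
	// the consumer group.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coord := cg.NewCoordinator(&cg.CoordinatorConfig{
		Client:         client,
		Context:        ctx,
		GroupID:        "example-group",
		Protocols:      []cg.ProtocolKey{{Key: "example", Protocol: proto}},
		RetentionTime:  24 * time.Hour,
		SessionTimeout: 30 * time.Second,
		Heartbeat:      3 * time.Second,
		Topics:         []string{"events"},
	})

	// Run blocks until an error occurs or ctx is canceled.
	if err := coord.Run(consume); err != nil {
		log.Fatal(err)
	}
}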

type Member

type Member struct {
	MemberID string
	UserData []byte
}

Member represents a consumer in a consumer group. The MemberID is assigned by Kafka, and the UserData is provided by the consumer when it joins the group via the Protocol common to all consumers.

type Protocol

type Protocol interface {
	Assign(Candidates) Assignment
	UserData() []byte
}

Protocol is an agreed-upon method between all consumer group members of how to divide TopicPartitions amongst themselves.
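
A minimal sketch of a Protocol: a naive round-robin that deals candidate partitions out to members in turn. Unlike the hashring approach mentioned in the README, it makes no attempt to minimize ownership changes when membership changes:

type roundRobin struct{}

// UserData is sent to the group when joining; this protocol needs none.
func (roundRobin) UserData() []byte { return nil }

// Assign deals every candidate topic-partition to members in turn. Only the
// group leader computes the assignment, so the randomized map iteration
// order here does not cause members to disagree.
func (roundRobin) Assign(c Candidates) Assignment {
	a := Assignment{}
	if len(c.Members) == 0 {
		return a
	}
	i := 0
	for topic, partitions := range c.TopicPartitions {
		for _, p := range partitions {
			id := c.Members[i%len(c.Members)].MemberID
			if a[id] == nil {
				a[id] = TopicPartitions{}
			}
			a[id][topic] = append(a[id][topic], p)
			i++
		}
	}
	return a
}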

type ProtocolKey

type ProtocolKey struct {
	Protocol Protocol
	Key      string
}

ProtocolKey is an implementation of a protocol that will be announced to the Kafka cluster under the given Key. All nodes in the group must agree on a common protocol.

type TopicPartitions

type TopicPartitions map[string][]int32

TopicPartitions is a map of topics and the partition numbers in that topic.

Directories

Path Synopsis
examples
