Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assignment ¶
type Assignment map[string]TopicPartitions
Assignment is the result of a Protocol determining which TopicPartitions from Candidates should be assigned to specific MembersIDs.
type CachingCommitter ¶
type CachingCommitter struct {
// contains filtered or unexported fields
}
CachingCommitter is a helper for consumers that need to implement committing offsets.
func NewCachingCommitter ¶
func NewCachingCommitter(cfg *CachingCommitterConfig) (*CachingCommitter, error)
NewCachingCommitter helps consumers implement their CommittOffset function by wrapping the Coordinator.CommitOffset function with a cache so we don't hit the database too frequently.
func (*CachingCommitter) CommitOffset ¶
func (cc *CachingCommitter) CommitOffset(offset int64) error
CommitOffset writes the provided offset to the database solang as the offset is higher than what we've already committed and the time since our last commit is greater than the configured duration.
type CachingCommitterConfig ¶
type CachingCommitterConfig struct { Coordinator *Coordinator Duration time.Duration Partition int32 Topic string }
CachingCommitterConfig is required to create a new CachingCommitter.
type Candidates ¶
type Candidates struct { Members []Member TopicPartitions TopicPartitions }
Candidates represents all members of the Consumer Group which need to decide how to devide up all the TopicPartitions among themselves.
type Consume ¶
Consume is a function the Coordinator will call when it becomes responsible for a new topic-partition. The provided context will be canceled when the Coordinator is no longer responsible for the topic-partition. This function is expected not to block. If you encounter an error while reading the topic- partition, you are still seen as responsible for that topic-partition in the kafka consumer group, so you must either recover or stop the Coordinator to remove yourself from the consumer group.
type Consumer ¶
type Consumer interface { CommitOffset(offset int64) error Consume() <-chan *sarama.ConsumerMessage Err() error HighWaterMarkOffset() int64 }
Consumer can read from Kafka on the returned Consume() channel and commit offsets for the topic-partition it represents.
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator implements a Kafka GroupConsumer with semantics available after Kafka 0.9.
func NewCoordinator ¶
func NewCoordinator(cfg *CoordinatorConfig) *Coordinator
NewCoordinator creates a Kafka GroupConsumer.
func (*Coordinator) CommitOffset ¶
func (c *Coordinator) CommitOffset(topic string, partition int32, offset int64) error
CommitOffset writes the provided offset for the topic-partition for the consumer group that this coordinator is participating as.
func (*Coordinator) GetOffset ¶
func (c *Coordinator) GetOffset(topic string, partition int32) (int64, error)
GetOffset returns the current committed offset for the topic-partition for the group id provided to the coordinator.
func (*Coordinator) Run ¶
func (c *Coordinator) Run(consume Consume) error
Run executes the Coordinator until an error or the context provided at create time closes.
type CoordinatorConfig ¶
type CoordinatorConfig struct { Client sarama.Client Context context.Context GroupID string Protocols []ProtocolKey RetentionTime time.Duration SessionTimeout time.Duration Heartbeat time.Duration Topics []string }
CoordinatorConfig is used to create a new Coordinator.
type Member ¶
Member represents a consumer in a consumer group. The MemberID is assigned by Kafka and the UserData is provided by the consumer when it joins the group via the Protocol that was common between all consumers.
type Protocol ¶
type Protocol interface { Assign(Candidates) Assignment UserData() []byte }
Protocol is an agreed-upon method between all consumer group members of how to divide TopicPartitions amongst themselves.
type ProtocolKey ¶
ProtocolKey is an implementation of a protocol that will be announced to the Kafka cluster under the given Key. All nodes in the group must agree on a common protocol.
type TopicPartitions ¶
TopicPartitions is a map of topics and the partition numbers in that topic.