group

package
v0.73.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package group provides kafka consumer group component implementation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component is a kafka consumer implementation that processes messages in batch.

func New

func New(name, group string, brokers, topics []string, proc kafka.BatchProcessorFunc, saramaCfg *sarama.Config, oo ...OptionFunc) (*Component, error)

New initializes a new kafka consumer component with support for functional configuration. The default failure strategy is the ExitStrategy. The default batch size is 1 and the batch timeout is 100ms. The default number of retries is 0 and the retry wait is 0.

func (*Component) Run

func (c *Component) Run(ctx context.Context) error

Run starts the consumer processing loop to process messages from Kafka.

type OptionFunc

type OptionFunc func(*Component) error

OptionFunc definition for configuring the component in a functional way.

func BatchMessageDeduplication added in v0.64.0

func BatchMessageDeduplication() OptionFunc

BatchMessageDeduplication enables the deduplication of messages based on the message's key. This implementation does not do additional sorting, but instead relies on the ordering guarantees that Kafka gives within partitions of a topic. Don't use this functionality if you've changed your producer's partition hashing behaviour to a nondeterministic way.

func BatchSize

func BatchSize(size uint) OptionFunc

BatchSize sets the message batch size the component should process at once.

func BatchTimeout

func BatchTimeout(timeout time.Duration) OptionFunc

BatchTimeout sets the message batch timeout. If the desired batch size is not reached and if the timeout elapses without new messages coming in, the messages in the buffer would get processed as a batch.

func CheckTopic added in v0.62.0

func CheckTopic() OptionFunc

CheckTopic checks whether the component-configured topics exist in the broker.

func CommitSync

func CommitSync() OptionFunc

CommitSync instructs the consumer to commit offsets in a blocking operation after processing every batch of messages.

func FailureStrategy

func FailureStrategy(fs kafka.FailStrategy) OptionFunc

FailureStrategy sets the strategy to follow for the component when it encounters an error. The kafka.ExitStrategy will fail the component, if there are Retries > 0 then the component will reconnect and retry the failed message. The kafka.SkipStrategy will skip the message on failure. If a client wants to retry a message before failing then this needs to be handled in the kafka.BatchProcessorFunc.

func NewSessionCallback added in v0.65.0

func NewSessionCallback(sessionCallback func(sarama.ConsumerGroupSession) error) OptionFunc

NewSessionCallback adds a callback when a new consumer group session is created (e.g., rebalancing).

func Retries

func Retries(count uint) OptionFunc

Retries sets the number of time a component should retry in case of an error. These retries are depleted in these cases: * when there are temporary connection issues * a message batch fails to be processed through the user-defined processing function and the failure strategy is set to kafka.ExitStrategy * any other reason for which the component needs to reconnect.

func RetryWait

func RetryWait(interval time.Duration) OptionFunc

RetryWait sets the wait period for the component retry.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL