consumer

package
v1.0.22
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DialerSCRAM512

func DialerSCRAM512(username string, password string) (*kafka.Dialer, error)

DialerSCRAM512 returns a Kafka dialer configured with SASL authentication to securely transmit the provided credentials to Kafka using SCRAM-SHA-512.

func NonStopExponentialBackOff

func NonStopExponentialBackOff() backoff.BackOff

NonStopExponentialBackOff is the suggested backoff retry strategy for consumer groups handling messages where ordering matters, e.g. a data-capture stream from a database. It retries endlessly, which prevents Kafka from rebalancing the group and, in turn, each consumer from eventually hitting the same error.

Retry intervals: 500ms, 4s, 32s, 4m, 34m, 4.5h, 5h (max).

The max interval of 5 hours is intended to leave enough time for manual intervention if necessary.

Types

type ClientLogger

type ClientLogger interface {
	// Kafka-go Logger interface
	Infof(string, ...interface{})
	Errorf(string, ...interface{})
}

type Config

type Config struct {
	ID      string // Default: UUID
	Brokers []string
	Topic   string

	MinBytes      int           // Default: 1MB
	MaxBytes      int           // Default: 10MB
	MaxWait       time.Duration // Default: 250ms
	QueueCapacity int           // Default: 100
	// contains filtered or unexported fields
}

Config is a configuration object used to create a new Consumer.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides a high level API for consuming and handling messages from a Kafka topic.

Publishing failed messages to a dead letter queue is not supported. If you need it, implement it in your handler.

func NewConsumer

func NewConsumer(config Config, opts ...Option) *Consumer

NewConsumer returns a new Consumer configured with the provided config and options.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, handler Handler) error

Run consumes and handles messages from the topic. The method call blocks until the context is canceled, the consumer is closed, or an error occurs.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop stops the consumer. It waits for the current message (if any) to finish being handled before closing the reader stream, preventing the consumer from reading any more messages.

type Group

type Group struct {
	ID string
	// contains filtered or unexported fields
}

Group groups consumers together to concurrently consume and handle messages from a Kafka topic. Multiple groups can safely share the same group ID, which is particularly useful when groups run on separate instances.

Publishing failed messages to a dead letter queue is not supported. If you need it, implement it in your handler.

func NewGroup

func NewGroup(config GroupConfig, opts ...Option) *Group

NewGroup returns a new Group configured with the provided config and options.

func (*Group) Run

func (g *Group) Run(ctx context.Context, handler Handler) <-chan error

Run concurrently consumes and handles messages from the topic across all consumers in the group. The method call returns an error channel that is used to receive any consumer errors. The run process is only stopped if the context is canceled, the consumer has been closed, or all consumers in the group have errored.

func (*Group) Stop

func (g *Group) Stop()

Stop stops the group. It waits for the current message (if any) in each consumer to finish being handled before closing the reader streams, preventing each consumer from reading any more messages.

type GroupConfig

type GroupConfig struct {
	Count   int
	Brokers []string
	Topic   string
	GroupID string

	MinBytes      int           // Default: 1MB
	MaxBytes      int           // Default: 10MB
	MaxWait       time.Duration // Default: 250ms
	QueueCapacity int           // Default: 100
}

GroupConfig is a configuration object used to create a new Group. The default consumer count in a group is 1 unless specified otherwise.

type Handler

type Handler func(ctx context.Context, msg Message) error

Handler specifies how a consumer should handle a received Kafka message.

type HandlerRetryBackOffConstructor

type HandlerRetryBackOffConstructor func() backoff.BackOff

type Message

type Message struct {
	kafka.Message
	Metadata
}

type Metadata

type Metadata struct {
	GroupID    string
	ConsumerID string
	Attempt    int
}

Metadata contains relevant handler metadata for received Kafka messages.

type NotifyError

type NotifyError func(ctx context.Context, err error, msg Message)

NotifyError is a notify-on-error function used to report consumer handler errors.

type Option

type Option func(consumer *Consumer)
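
Option follows the functional options pattern: each option is a function that mutates the consumer while it is being constructed. The sketch below shows the general mechanism using stand-in types. It is not the package's source; the lower-case consumer, withQueueCapacity and newConsumer are hypothetical, and only the default capacity of 100 is taken from the documentation.

```go
package main

import "fmt"

// consumer is a stand-in with a single illustrative field.
type consumer struct {
	queueCapacity int
}

// option has the same shape as the package's Option type.
type option func(c *consumer)

// withQueueCapacity returns an option that overrides the queue capacity.
func withQueueCapacity(queueCapacity int) option {
	return func(c *consumer) {
		c.queueCapacity = queueCapacity
	}
}

// newConsumer starts from defaults and then applies the options in order,
// so later options win over earlier ones.
func newConsumer(opts ...option) *consumer {
	c := &consumer{queueCapacity: 100} // documented default
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newConsumer().queueCapacity)
	fmt.Println(newConsumer(withQueueCapacity(500)).queueCapacity)
}
```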

func WithDataDogTracing

func WithDataDogTracing() Option

WithDataDogTracing adds Datadog tracing to the consumer.

A span is started each time a Kafka message is read and finished when the offset is committed. The consumer span can also be retrieved from within your handler using tracer.SpanFromContext.

func WithExplicitCommit

func WithExplicitCommit() Option

WithExplicitCommit commits a message's offset only after the message has been handled successfully.

Do not use this option if you need the default behaviour, in which offsets are auto-committed on the initial read (before the message is handled).

func WithGroupBalancers

func WithGroupBalancers(groupBalancers ...kafka.GroupBalancer) Option

WithGroupBalancers adds a priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader.

Default: [Range, RoundRobin]

Only used by consumer groups.

func WithHandlerBackOffRetry

func WithHandlerBackOffRetry(backOffConstructor HandlerRetryBackOffConstructor) Option

WithHandlerBackOffRetry adds a back off retry policy on the consumer handler.

func WithKafkaDialer

func WithKafkaDialer(dialer *kafka.Dialer) Option

func WithKafkaReader

func WithKafkaReader(readerFn func() Reader) Option

WithKafkaReader allows a custom reader to be injected into the Consumer/Group. When it is used, any other reader-specific options passed in are ignored.

Avoid this option unless you are injecting a mock reader implementation for testing.

func WithLogger

func WithLogger(logger ClientLogger) Option

WithLogger specifies a logger used to report internal consumer reader changes.

func WithNotifyError

func WithNotifyError(notifier NotifyError) Option

WithNotifyError adds the NotifyError function to the consumer for it to be invoked on each consumer handler error.

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) Option

WithQueueCapacity sets the internal message queue capacity. Defaults to 100 if none is set.

type Reader

type Reader interface {
	ReadMessage(ctx context.Context) (kafka.Message, error)
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

Reader fetches and commits messages from a Kafka topic.

Directories

Path Synopsis
example
