kafka

package module
v2.3.2
Published: Apr 2, 2024 License: MIT Imports: 27 Imported by: 0

README

Kafka Konsumer


Description

Kafka Konsumer provides an easy implementation of a Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).

Migration Guide

V2 Release Notes
  • Added the ability to manipulate Kafka message headers.
  • Added a transactional retry feature. Set it to false if you want the exception/retry strategy to apply only to failed messages.
  • Enabled manual commit in both single and batch consuming modes.
  • Enabled consumer pause/resume functionality. Please refer to its example and the how-it-works documentation.
  • Bumped kafka-cronsumer to the latest version:
    • Backoff strategy support (linear and exponential options)
    • Added a message key for retried messages
    • Added an x-error-message header to expose the error that occurred while processing a message
  • Reduced memory allocations.
  • Increased throughput by changing the internal concurrency structure.
How to migrate from v1 to v2?

You can get the latest version via go get github.com/Trendyol/kafka-konsumer/v2@latest

  • You need to change the import path from github.com/Trendyol/kafka-konsumer to github.com/Trendyol/kafka-konsumer/v2

  • You need to change your consume function to take a pointer: func(*kafka.Message) error instead of func(kafka.Message) error (see the sketch below).

  • We moved messageGroupDuration from batchConfiguration.messageGroupDuration to the root level, because this field is also used by the single (non-batch) consumer.
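For illustration, here is the signature change in miniature (a minimal sketch; the handler name is your own):

// v1: value signature
// func consumeFn(message kafka.Message) error

// v2: pointer signature, so mutations such as added headers are visible to the library
func consumeFn(message *kafka.Message) error {
    return nil
}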

Installation
go get github.com/Trendyol/kafka-konsumer/v2@latest
Examples

You can find a number of ready-to-run examples in the examples directory.

After running the docker-compose up command, you can run any of the example applications.

Simple Consumer
package main

import (
    "fmt"

    kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn:    consumeFn,
        RetryEnabled: false,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

// Note the pointer signature required by v2.
func consumeFn(message *kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
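The snippets in this README discard the error returned by kafka.NewConsumer for brevity. In a real service you would check it and make sure Stop is called on shutdown, since the Consumer docs below note that Stop is required to avoid data loss. A minimal sketch of one way to wire that up, assuming Consume starts its work in background goroutines as in the repository's runnable examples:

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"

    kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn: func(m *kafka.Message) error { return nil },
    })
    if err != nil {
        log.Fatalf("failed to create consumer: %v", err)
    }

    consumer.Consume()

    // Block until SIGINT/SIGTERM, then stop gracefully to avoid data loss.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs

    if err := consumer.Stop(); err != nil {
        log.Printf("error while stopping consumer: %v", err)
    }
}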
Simple Consumer With Retry/Exception Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        ConsumeFn: consumeFn,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message *kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
With Batch Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        MessageGroupDuration: time.Second,
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit: 1000,
            BatchConsumeFn:    batchConsumeFn,
        },
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
    fmt.Printf("%d messages received; first message value: %s\n", len(messages), messages[0].Value)
    return nil
}
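batchConfiguration also accepts an optional preBatchFn (see the configuration table below) for transforming a batch before batchConsumeFn runs. A minimal sketch that deduplicates a batch by message key; the dedupe logic is illustrative, not part of the library:

// dedupeByKey keeps only the last message seen for each key, so
// batchConsumeFn processes each key at most once per batch.
// Note: map iteration does not preserve the original batch order.
func dedupeByKey(messages []*kafka.Message) []*kafka.Message {
    lastByKey := make(map[string]*kafka.Message, len(messages))
    for _, message := range messages {
        lastByKey[string(message.Key)] = message
    }

    result := make([]*kafka.Message, 0, len(lastByKey))
    for _, message := range lastByKey {
        result = append(result, message)
    }
    return result
}

// Wire it up via BatchConfiguration:
//   BatchConfiguration: kafka.BatchConfiguration{
//       MessageGroupLimit: 1000,
//       BatchConsumeFn:    batchConsumeFn,
//       PreBatchFn:        dedupeByKey,
//   },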
With Disabling Transactional Retry
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        LogLevel:           kafka.LogLevelDebug,
        RetryEnabled:       true,
        TransactionalRetry: kafka.NewBoolPtr(false),
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        MessageGroupDuration: time.Second,
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit: 1000,
            BatchConsumeFn:    batchConsumeFn,
        },
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
    // you can add custom error handling here & flag messages
    for i := range messages {
        if i%2 == 0 {
            messages[i].IsFailed = true
        }
    }

    // you must return an error here so that the failed messages are retried
    return errors.New("err")
}
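When transactional retry is disabled you can also set ErrDescription on a failed message; if present, kafka-konsumer writes it into the retried message's headers under the x-error-message key (see the Message type below). A sketch, where process is a hypothetical handler of your own:

func batchConsumeFn(messages []*kafka.Message) error {
    failed := false
    for i := range messages {
        if err := process(messages[i]); err != nil {
            messages[i].IsFailed = true
            // Surfaces as the `x-error-message` header on the retried message.
            messages[i].ErrDescription = err.Error()
            failed = true
        }
    }

    if failed {
        // Returning an error triggers the retry flow for the flagged messages.
        return errors.New("some messages in the batch failed")
    }
    return nil
}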
With Distributed Tracing Support

Please refer to the Tracing Example.

With Pause & Resume Consumer

Please refer to the Pause & Resume Example.
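As a rough sketch of the idea (Pause and Resume are documented as idempotent and safe to call from multiple goroutines):

// Temporarily stop fetching new messages, e.g. while a downstream
// dependency is unavailable, then pick up where we left off.
consumer.Pause()
// ... wait for the dependency to recover ...
consumer.Resume()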

With Grafana & Prometheus

This example demonstrates how to create a Grafana dashboard and how to define alerts in Prometheus. To try it, go to the with-grafana folder under examples, bring up the infrastructure with docker compose up, and then start the application.


With SASL-PLAINTEXT Authentication

Under the examples/with-sasl-plaintext folder, you can find an example of consumer integration with the SASL/PLAIN mechanism. To try it, run docker compose up in that folder and then start the application.
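A minimal configuration sketch based on the SASLConfig type documented below; the broker address and credentials are placeholders, and consumeFn is the handler from the earlier examples:

consumerCfg := &kafka.ConsumerConfig{
    Reader: kafka.ReaderConfig{
        Brokers: []string{"localhost:29092"},
        Topic:   "standart-topic",
        GroupID: "standart-cg",
    },
    SASL: &kafka.SASLConfig{
        Type:     kafka.MechanismPlain, // or kafka.MechanismScram
        Username: "user",
        Password: "pass",
    },
    ConsumeFn: consumeFn,
}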

Configurations

| config | description | default |
| --- | --- | --- |
| reader | Describes all segmentio kafka reader configurations | |
| consumeFn | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages | |
| skipMessageByHeaderFn | Function to filter messages based on their headers; return true to skip the message | nil |
| logLevel | Describes the log level; valid options are debug, info, warn, and error | info |
| concurrency | Number of goroutines used by the listeners | 1 |
| retryEnabled | Whether the retry/exception consumer is enabled | false |
| transactionalRetry | Set to false if you want the exception/retry strategy to apply only to failed messages | true |
| commitInterval | Interval at which offsets are committed to the broker | 1s |
| rack | see doc | |
| clientId | see doc | |
| messageGroupDuration | Maximum time to wait for a batch | 1s |
| metricPrefix | Prefix for Prometheus fully qualified metric names. Two metrics are currently exposed; with the default prefix their names are kafka_konsumer_processed_messages_total_current and kafka_konsumer_unprocessed_messages_total_current | kafka_konsumer |
| dial.Timeout | see doc | no timeout |
| dial.KeepAlive | see doc | not enabled |
| transport.DialTimeout | see doc | 5s |
| transport.IdleTimeout | see doc | 30s |
| transport.MetadataTTL | see doc | 6s |
| transport.MetadataTopics | see doc | all topics in the cluster |
| distributedTracingEnabled | Enables OpenTelemetry support for consume and produce operations | false |
| distributedTracingConfiguration.TracerProvider | see doc | otel.GetTracerProvider() |
| distributedTracingConfiguration.Propagator | see doc | otel.GetTextMapPropagator() |
| retryConfiguration.clientId | see doc | |
| retryConfiguration.startTimeCron | Cron expression defining when the retry consumer (kafka-cronsumer) starts to work | |
| retryConfiguration.metricPrefix | Prefix for Prometheus fully qualified metric names. Two metrics are currently exposed; with the default prefix their names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer |
| retryConfiguration.workDuration | Duration during which the exception consumer actively consumes messages | |
| retryConfiguration.topic | Retry/exception topic name | |
| retryConfiguration.brokers | Retry topic broker URLs | |
| retryConfiguration.maxRetry | Maximum number of retry attempts for a message | 3 |
| retryConfiguration.tls.rootCAPath | see doc | "" |
| retryConfiguration.tls.intermediateCAPath | Same as rootCA; use it together with rootCAPath if you need to specify two root CAs | "" |
| retryConfiguration.sasl.authType | SCRAM or PLAIN | |
| retryConfiguration.sasl.username | SCRAM or PLAIN username | |
| retryConfiguration.sasl.password | SCRAM or PLAIN password | |
| retryConfiguration.skipMessageByHeaderFn | Function to filter messages based on their headers; return true to skip the message | nil |
| batchConfiguration.messageGroupLimit | Maximum number of messages in a batch | |
| batchConfiguration.batchConsumeFn | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages | |
| batchConfiguration.preBatchFn | Function for transforming messages before batch consuming starts | |
| batchConfiguration.balancer | see doc | leastBytes |
| tls.rootCAPath | see doc | "" |
| tls.intermediateCAPath | Same as rootCA; use it together with rootCAPath if you need to specify two root CAs | "" |
| sasl.authType | SCRAM or PLAIN | |
| sasl.username | SCRAM or PLAIN username | |
| sasl.password | SCRAM or PLAIN password | |
| logger | If you want to use a custom logger | info |
| apiEnabled | Enables the API that exposes metrics | false |
| apiConfiguration.port | API port | 8090 |
| apiConfiguration.healthCheckPath | Health check path | /healthcheck |
| metricConfiguration.path | Metrics endpoint path | /metrics |
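For example, skipMessageByHeaderFn can drop messages before they ever reach consumeFn. A sketch, assuming a hypothetical x-skip header set by producers:

func skipMessageByHeaderFn(headers []kafka.Header) bool {
    for _, header := range headers {
        // Returning true skips the message entirely.
        if header.Key == "x-skip" {
            return true
        }
    }
    return false
}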

Monitoring

Kafka Konsumer offers an API that exposes several metrics.

Exposed Metrics
| Metric Name | Description | Value Type |
| --- | --- | --- |
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
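If you prefer serving metrics from your own HTTP server instead of the built-in API, the Consumer interface below exposes GetMetricCollectors. A sketch using the standard Prometheus client; the path and port are placeholders:

import (
	"net/http"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func serveMetrics(consumer kafka.Consumer) error {
	// Register the consumer's collectors on a dedicated registry.
	registry := prometheus.NewRegistry()
	registry.MustRegister(consumer.GetMetricCollectors()...)

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	return http.ListenAndServe(":8080", nil)
}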

Documentation


Constants

const (
	MechanismScram = "scram"
	MechanismPlain = "plain"
)
const Name = "kafka_konsumer"

Variables

This section is empty.

Functions

func NewBoolPtr

func NewBoolPtr(value bool) *bool

func NewMetricMiddleware

func NewMetricMiddleware(cfg *ConsumerConfig,
	app *fiber.App,
	consumerMetric *ConsumerMetric,
	metricCollectors ...prometheus.Collector,
) (func(ctx *fiber.Ctx) error, error)

Types

type API

type API interface {
	Start()
	Stop()
}

func NewAPI

func NewAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) API

type APIConfiguration

type APIConfiguration struct {
	// Port default is 8090
	Port *int

	// HealthCheckPath default is /healthcheck
	HealthCheckPath *string
}

type Balancer added in v2.3.1

type Balancer kafka.Balancer

func GetBalancerCRC32 added in v2.3.1

func GetBalancerCRC32() Balancer

func GetBalancerHash added in v2.3.1

func GetBalancerHash() Balancer

func GetBalancerLeastBytes added in v2.3.1

func GetBalancerLeastBytes() Balancer

func GetBalancerMurmur2Balancer added in v2.3.1

func GetBalancerMurmur2Balancer() Balancer

func GetBalancerReferenceHash added in v2.3.1

func GetBalancerReferenceHash() Balancer

func GetBalancerRoundRobin added in v2.3.1

func GetBalancerRoundRobin() Balancer
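These getters return ready-made balancers for retryConfiguration.balancer (which defaults to leastBytes, per the configuration table above). A small sketch:

retryCfg := kafka.RetryConfiguration{
	Topic:    "retry-topic",
	Balancer: kafka.GetBalancerRoundRobin(), // overrides the leastBytes default
}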

type BatchConfiguration

type BatchConfiguration struct {
	BatchConsumeFn    BatchConsumeFn
	PreBatchFn        PreBatchFn
	MessageGroupLimit int
}

type BatchConsumeFn

type BatchConsumeFn func([]*Message) error

type ConsumeFn

type ConsumeFn func(*Message) error

type Consumer

type Consumer interface {
	// Consume starts consuming
	Consume()

	// Pause pauses the consumer so that it stops fetching new messages.
	// It is idempotent under the hood.
	// Calling it from multiple goroutines is safe.
	Pause()

	// Resume resumes a paused consumer so that it starts working again.
	// It is idempotent under the hood.
	// Calling it from multiple goroutines is safe.
	Resume()

	// GetMetricCollectors for the purpose of making metric collectors available.
	// You can register these collectors on your own http server.
	// Please look at the examples/with-metric-collector directory.
	GetMetricCollectors() []prometheus.Collector

	// WithLogger for injecting custom log implementation
	WithLogger(logger LoggerInterface)

	// Stop for graceful shutdown. In order to avoid data loss, you have to call it!
	Stop() error
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) (Consumer, error)

type ConsumerConfig

type ConsumerConfig struct {
	DistributedTracingConfiguration DistributedTracingConfiguration
	Logger                          LoggerInterface
	APIConfiguration                APIConfiguration
	MetricConfiguration             MetricConfiguration
	SASL                            *SASLConfig
	TLS                             *TLSConfig
	Dial                            *DialConfig
	BatchConfiguration              *BatchConfiguration
	ConsumeFn                       ConsumeFn
	SkipMessageByHeaderFn           SkipMessageByHeaderFn
	TransactionalRetry              *bool
	RetryConfiguration              RetryConfiguration
	LogLevel                        LogLevel
	Rack                            string
	ClientID                        string
	Reader                          ReaderConfig
	CommitInterval                  time.Duration
	MessageGroupDuration            time.Duration
	Concurrency                     int
	DistributedTracingEnabled       bool
	RetryEnabled                    bool
	APIEnabled                      bool

	// MetricPrefix is used for prometheus fq name prefix.
	// If not provided, default metric prefix value is `kafka_konsumer`.
	// Currently, there are two exposed prometheus metrics. `processed_messages_total_current` and `unprocessed_messages_total_current`.
	// So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and
	// `kafka_konsumer_unprocessed_messages_total_current`.
	MetricPrefix string
}

type ConsumerMetric

type ConsumerMetric struct {
	TotalUnprocessedMessagesCounter int64
	TotalProcessedMessagesCounter   int64
}

type DialConfig

type DialConfig struct {
	Timeout   time.Duration
	KeepAlive time.Duration
}

type Dialer

type Dialer struct {
	*kafka.Dialer
}

func (*Dialer) SetSASL

func (t *Dialer) SetSASL(mechanism sasl.Mechanism)

func (*Dialer) SetTLSConfig

func (t *Dialer) SetTLSConfig(config *tls.Config)

type DistributedTracingConfiguration

type DistributedTracingConfiguration struct {
	TracerProvider trace.TracerProvider
	Propagator     propagation.TextMapPropagator
}

type Header = protocol.Header

type IncomingMessage added in v2.1.8

type IncomingMessage struct {
	// contains filtered or unexported fields
}

type Layer

type Layer interface {
	SetTLSConfig(config *tls.Config)
	SetSASL(mechanism sasl.Mechanism)
}

type LogLevel

type LogLevel string

const (
	LogLevelDebug LogLevel = "debug"
	LogLevelInfo  LogLevel = "info"
	LogLevelWarn  LogLevel = "warn"
	LogLevelError LogLevel = "error"
)

type LoggerInterface

type LoggerInterface interface {
	// With returns a logger based off the root logger and decorates it with the given context and arguments.
	With(args ...interface{}) LoggerInterface

	// Debug uses fmt.Sprint to construct and log a message at DEBUG level
	Debug(args ...interface{})
	// Info uses fmt.Sprint to construct and log a message at INFO level
	Info(args ...interface{})
	// Warn uses fmt.Sprint to construct and log a message at WARN level
	Warn(args ...interface{})
	// Error uses fmt.Sprint to construct and log a message at ERROR level
	Error(args ...interface{})

	// Debugf uses fmt.Sprintf to construct and log a message at DEBUG level
	Debugf(format string, args ...interface{})
	// Infof uses fmt.Sprintf to construct and log a message at INFO level
	Infof(format string, args ...interface{})
	// Warnf uses fmt.Sprintf to construct and log a message at WARN level
	Warnf(format string, args ...interface{})
	// Errorf uses fmt.Sprintf to construct and log a message at ERROR level
	Errorf(format string, args ...interface{})

	Infow(msg string, keysAndValues ...interface{})
	Errorw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
}

LoggerInterface is a logger that supports log levels, context and structured logging.

func NewZapLogger

func NewZapLogger(level LogLevel) LoggerInterface
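WithLogger on the Consumer interface accepts any LoggerInterface implementation; NewZapLogger provides a ready-made one. A sketch:

consumer, err := kafka.NewConsumer(consumerCfg)
if err != nil {
	panic(err)
}

// Swap the default logger for a zap-backed one at debug level.
consumer.WithLogger(kafka.NewZapLogger(kafka.LogLevelDebug))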

type Mechanism

type Mechanism string

type Message

type Message struct {
	Time       time.Time
	WriterData interface{}

	// Context To enable distributed tracing support
	Context       context.Context
	Topic         string
	Key           []byte
	Value         []byte
	Headers       []Header
	Partition     int
	Offset        int64
	HighWaterMark int64

	// IsFailed is only used when transactional retry is disabled
	IsFailed bool

	// If available, kafka-konsumer writes this description into the failed message's
	// headers as the `x-error-message` key when producing to the retry topic
	ErrDescription string
}

func (*Message) AddHeader

func (m *Message) AddHeader(header Header)

AddHeader works as an idempotent function

func (*Message) Header

func (m *Message) Header(key string) *kafka.Header

func (*Message) RemoveHeader

func (m *Message) RemoveHeader(header Header)
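A sketch of header manipulation inside a consume function; the header keys are illustrative, and key-only matching for RemoveHeader is an assumption here:

func consumeFn(message *kafka.Message) error {
	// AddHeader is documented as idempotent.
	message.AddHeader(kafka.Header{Key: "x-processed-by", Value: []byte("my-service")})

	// Header returns nil when the key is absent.
	if correlation := message.Header("x-correlation-id"); correlation != nil {
		fmt.Println(string(correlation.Value))
	}

	message.RemoveHeader(kafka.Header{Key: "x-temporary"})
	return nil
}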

type MetricCollector added in v2.3.0

type MetricCollector struct {
	// contains filtered or unexported fields
}

func NewMetricCollector added in v2.3.0

func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector

func (*MetricCollector) Collect added in v2.3.0

func (s *MetricCollector) Collect(ch chan<- prometheus.Metric)

func (*MetricCollector) Describe added in v2.3.0

func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc)

type MetricConfiguration

type MetricConfiguration struct {
	// Path default is /metrics
	Path *string
}

type OtelKafkaKonsumerWriter

type OtelKafkaKonsumerWriter interface {
	WriteMessage(ctx context.Context, msg segmentio.Message) error
	WriteMessages(ctx context.Context, msgs []segmentio.Message) error
	Close() error
}

type PreBatchFn added in v2.0.6

type PreBatchFn func([]*Message) []*Message

type Producer

type Producer interface {
	Produce(ctx context.Context, message Message) error
	ProduceBatch(ctx context.Context, messages []Message) error
	Close() error
}

func NewProducer

func NewProducer(cfg *ProducerConfig) (Producer, error)

type ProducerConfig

type ProducerConfig struct {
	DistributedTracingConfiguration DistributedTracingConfiguration
	Transport                       *TransportConfig
	SASL                            *SASLConfig
	TLS                             *TLSConfig
	ClientID                        string
	Writer                          WriterConfig
	DistributedTracingEnabled       bool
}
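Although this README focuses on consuming, the package also ships a producer. A minimal usage sketch based on the Producer interface above; the broker and topic are placeholders:

func produceExample() error {
	producer, err := kafka.NewProducer(&kafka.ProducerConfig{
		Writer: kafka.WriterConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
		},
	})
	if err != nil {
		return err
	}
	defer producer.Close()

	// Produce a single message; ProduceBatch is the multi-message variant.
	return producer.Produce(context.Background(), kafka.Message{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
}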

type Reader

type Reader interface {
	FetchMessage(ctx context.Context, msg *kafka.Message) error
	Close() error
	CommitMessages(messages []kafka.Message) error
}

func NewOtelReaderWrapper

func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error)

func NewReaderWrapper

func NewReaderWrapper(reader *segmentio.Reader) Reader

type ReaderConfig

type ReaderConfig kafka.ReaderConfig

type RetryConfiguration

type RetryConfiguration struct {
	// MetricPrefix is used for prometheus fq name prefix.
	// If not provided, default metric prefix value is `kafka_cronsumer`.
	// Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`.
	// So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and
	// `kafka_cronsumer_discarded_messages_total_current`.
	MetricPrefix string

	SASL                  *SASLConfig
	TLS                   *TLSConfig
	ClientID              string
	StartTimeCron         string
	Topic                 string
	DeadLetterTopic       string
	Rack                  string
	LogLevel              LogLevel
	Brokers               []string
	Balancer              Balancer
	MaxRetry              int
	WorkDuration          time.Duration
	SkipMessageByHeaderFn SkipMessageByHeaderFn
}

type SASLConfig

type SASLConfig struct {
	Type     Mechanism
	Username string
	Password string
}

func (*SASLConfig) IsEmpty

func (s *SASLConfig) IsEmpty() bool

func (*SASLConfig) Mechanism

func (s *SASLConfig) Mechanism() (sasl.Mechanism, error)

type SkipMessageByHeaderFn added in v2.2.9

type SkipMessageByHeaderFn func(header []kafka.Header) bool

type TLSConfig

type TLSConfig struct {
	RootCAPath         string
	IntermediateCAPath string
}

func (*TLSConfig) IsEmpty

func (c *TLSConfig) IsEmpty() bool

func (*TLSConfig) TLSConfig

func (c *TLSConfig) TLSConfig() (*tls.Config, error)

type Transport

type Transport struct {
	*kafka.Transport
}

func (*Transport) SetSASL

func (t *Transport) SetSASL(mechanism sasl.Mechanism)

func (*Transport) SetTLSConfig

func (t *Transport) SetTLSConfig(config *tls.Config)

type TransportConfig

type TransportConfig struct {
	MetadataTopics []string
	DialTimeout    time.Duration
	IdleTimeout    time.Duration
	MetadataTTL    time.Duration
}

type Writer

type Writer interface {
	WriteMessages(context.Context, ...kafka.Message) error
	Close() error
}

func NewOtelProducer

func NewOtelProducer(cfg *ProducerConfig, writer *segmentio.Writer) (Writer, error)

type WriterConfig

type WriterConfig struct {
	ErrorLogger            kafka.Logger
	Logger                 kafka.Logger
	Balancer               kafka.Balancer
	Completion             func(messages []kafka.Message, err error)
	Topic                  string
	Brokers                []string
	ReadTimeout            time.Duration
	BatchTimeout           time.Duration
	BatchBytes             int64
	WriteTimeout           time.Duration
	RequiredAcks           kafka.RequiredAcks
	BatchSize              int
	WriteBackoffMax        time.Duration
	WriteBackoffMin        time.Duration
	MaxAttempts            int
	Async                  bool
	Compression            kafka.Compression
	AllowAutoTopicCreation bool
}
