kafka

package module
v2.2.0
Published: Apr 4, 2024 License: MIT Imports: 15 Imported by: 0

README

GO-KAFKA

Go-kafka provides an easy way to use Kafka listeners and producers with only a few lines of code. The listener can consume from multiple topics and executes a separate handler for each topic.

Quick start

Simple consumer

// topic-specific handlers
var handler1 kafka.Handler
var handler2 kafka.Handler

// map your topics to their handlers
handlers := map[string]kafka.Handler{
    "topic-1": handler1,
    "topic-2": handler2,
}

// define your listener
kafka.Brokers = []string{"localhost:9092"}
listener, _ := kafka.NewListener("my-consumer-group", handlers)
defer listener.Close()

// listen and enjoy
errc <- listener.Listen(ctx)
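
Each handler above is a plain function matching the kafka.Handler signature. A minimal sketch of filling one in (the logging is illustrative; any processing logic fits here):

handler1 = func(ctx context.Context, msg *sarama.ConsumerMessage) error {
	// a non-nil error triggers the retry policy described below
	log.Printf("received on %s: %s", msg.Topic, string(msg.Value))
	return nil
}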

Simple producer

// define your producer
kafka.Brokers = []string{"localhost:9092"}
producer, _ := kafka.NewProducer()

// send your message
message := &sarama.ProducerMessage{
	Topic: "my-topic",
	Value: sarama.StringEncoder("my-message"),
}
_ = producer.Produce(message)

Features

  • Create a listener on multiple topics
  • Retry policy on message handling
  • Create a producer
  • Prometheus instrumenting

Consumer error handling

You can customize the error handling of the consumer. If an error remains after all possible retries (3 by default), it is logged and the faulty event can be pushed to a deadletter topic.

Deadletter

By default, events that have exceeded the maximum number of retries will be pushed to a dead letter topic. This behaviour can be disabled through the PushConsumerErrorsToTopic property.

PushConsumerErrorsToTopic = false

The name of the deadletter topic is dynamically generated based on the original topic name and the consumer group. For example, if the original topic is my-topic and the consumer group is my-consumer-group, the deadletter topic will be my-consumer-group-my-topic-error. This pattern can be overridden through the ErrorTopicPattern property.

ErrorTopicPattern = "custom-deadletter-topic"
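
To keep the dynamic naming while changing its layout, the documented $$CG$$ (consumer group) and $$T$$ (original topic) placeholders can be used; the "-deadletter" suffix below is only illustrative:

ErrorTopicPattern = "$$CG$$-$$T$$-deadletter"
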
Retries

By default, failed event consumptions are retried 3 times (each attempt is separated by 2 seconds). This can be configured through the following properties:

  • ConsumerMaxRetries
  • DurationBeforeRetry

If you want to achieve a blocking retry pattern (i.e. continuously retrying until the event is successfully consumed), you can set ConsumerMaxRetries to InfiniteRetries (-1), as sketched below.
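
For example, a sketch of tuning these package-level settings before creating the listener (the values are illustrative):

kafka.ConsumerMaxRetries = 5
kafka.DurationBeforeRetry = 10 * time.Second

// or retry forever until the event is successfully consumed
kafka.ConsumerMaxRetries = kafka.InfiniteRetries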

If you do not want specific errors to be retried, you can wrap them in a kafka.ErrEventUnretriable error before returning them, or return kafka.ErrEventUnretriable directly.

// This error will not be retried
err := errors.New("my error")
return errors.Wrap(kafka.ErrEventUnretriable, err.Error())

// This error will also not be retried
return kafka.ErrEventUnretriable

Omitting specific errors

In certain scenarios, you might want to omit some errors. For example, you might want to discard outdated events that are not relevant anymore. Such events increase a separate, dedicated metric instead of the error one and are not retried. To do so, wrap the errors that should lead to omitted events in a kafka.ErrEventOmitted, or return kafka.ErrEventOmitted directly.

// This error will be omitted
err := errors.New("my error")
return errors.Wrap(kafka.ErrEventOmitted, err.Error())

// This error will also be omitted
return kafka.ErrEventOmitted

Instrumenting

Metrics for the listener and the producer can be exported to Prometheus. The following metrics are available:

Metric name | Labels | Description
kafka_consumer_record_consumed_total | kafka_topic, consumer_group | Number of messages consumed
kafka_consumer_record_latency_seconds | kafka_topic, consumer_group | Latency of consuming a message
kafka_consumer_record_omitted_total | kafka_topic, consumer_group | Number of messages omitted
kafka_consumer_record_error_total | kafka_topic, consumer_group | Number of errors when consuming a message
kafka_consumergroup_current_message_timestamp | kafka_topic, consumer_group, partition, type | Timestamp of the current message being processed. Type can be either LogAppendTime or CreateTime.
kafka_producer_record_send_total | kafka_topic | Number of messages sent
kafka_producer_dead_letter_created_total | kafka_topic | Number of messages sent to a dead letter topic
kafka_producer_record_error_total | kafka_topic | Number of errors when sending a message

To activate the metrics instrumenting on go-kafka:

// define your listener with the instrumenting option
kafka.Brokers = []string{"localhost:9092"}
listener, _ := kafka.NewListener("my-consumer-group", handlers, kafka.WithInstrumenting())
defer listener.Close()

// start a new HTTP server exposing the metrics to Prometheus
go func() {
	httpAddr := ":8080"
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	errc <- http.ListenAndServe(httpAddr, mux)
}()
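
The producer can be instrumented in the same way through its own option; a minimal sketch, reusing the message from the producer example above:

// define your producer with the instrumenting option
producer, _ := kafka.NewProducer(kafka.WithProducerInstrumenting())
_ = producer.Produce(message)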

Default configuration

The consumer/producer configuration is opinionated. It aims to simply solve problems that have taken us by surprise in the past. For this reason (see the sketch after this list if you need to override these defaults):

  • the default partitioner is based on murmur2 instead of the one Sarama uses by default
  • offset retention is set to 30 days
  • initial offset is oldest
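
If these defaults do not fit your use case, the exported sarama Config can be adjusted before creating a listener or producer. A minimal sketch (the values are illustrative, and the fields are standard sarama options):

kafka.Config.Consumer.Offsets.Initial = sarama.OffsetNewest     // go-kafka default: oldest
kafka.Config.Consumer.Offsets.Retention = 7 * 24 * time.Hour    // go-kafka default: 30 days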

License

go-kafka is licensed under the MIT license. (http://opensource.org/licenses/MIT)

Contributing

Pull requests are the best way to help us here; we will be really grateful.

Documentation

Overview

Package kafka. The partitioner code is copied from https://github.com/burdiyan/kafkautil/blob/master/partitioner.go and kept here to ensure it stays available.

Constants

const (
	TimestampTypeLogAppendTime = "LogAppendTime"
	TimestampTypeCreateTime    = "CreateTime"
)
const InfiniteRetries = -1

InfiniteRetries is a constant used to define infinite retries. Setting ConsumerMaxRetries to it enables a blocking retry process.

Variables

var (
	ErrEventUnretriable = errors.New("the event will not be retried")
	ErrEventOmitted     = errors.New("the event will be omitted")
)
var Brokers []string

Brokers is the list of Kafka brokers to connect to.

var Config = sarama.NewConfig()

Config is the sarama (cluster) config used for the consumer and producer.

var ConsumerMaxRetries = 3

ConsumerMaxRetries is the maximum number of times we retry processing an event before returning the error. 3 by default.

var DurationBeforeRetry = 2 * time.Second

DurationBeforeRetry is the duration we wait between process retries. By default 2 seconds.

var ErrorTopicPattern = "$$CG$$-$$T$$-error"

ErrorTopicPattern is the error topic name pattern. By default: "consumergroup-topicname-error". Use $$CG$$ as the consumer group placeholder and $$T$$ as the original topic name placeholder.

var PushConsumerErrorsToTopic = true

PushConsumerErrorsToTopic is a boolean that defines whether messages in error should be pushed to an error topic.

Functions

func DefaultTracing

func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)

DefaultTracing implements TracingFunc. It fetches opentracing headers from the kafka message headers, then creates a span using opentracing.GlobalTracer(). Usage: `listener, err = kafka.NewListener(appName, handlers, kafka.WithTracing(kafka.DefaultTracing))`

func DeserializeContextFromKafkaHeaders

func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)

DeserializeContextFromKafkaHeaders fetches tracing headers from a JSON-encoded carrier and returns the resulting context.

func GetContextFromKafkaMessage

func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)

GetContextFromKafkaMessage fetches tracing headers from the kafka message

func GetKafkaHeadersFromContext

func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader

GetKafkaHeadersFromContext fetches tracing metadata from the context and returns it as a []sarama.RecordHeader.
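
For example, a sketch of propagating tracing metadata from a handler's context to an outgoing message (the topic and value are placeholders):

producer, _ := kafka.NewProducer()

msg := &sarama.ProducerMessage{
	Topic: "my-topic",
	Value: sarama.StringEncoder("my-message"),
}
// attach the tracing metadata carried by ctx to the outgoing message headers
msg.Headers = kafka.GetKafkaHeadersFromContext(ctx)
_ = producer.Produce(msg)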

func MurmurHasher

func MurmurHasher() hash.Hash32

MurmurHasher creates murmur2 hasher implementing hash.Hash32 interface. The implementation is not full and does not support streaming. It only implements the interface to comply with sarama.NewCustomHashPartitioner signature. But Sarama only uses Write method once, when writing keys and values of the message, so streaming support is not necessary.

func NewJVMCompatiblePartitioner

func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner

NewJVMCompatiblePartitioner creates a Sarama partitioner that uses the same hashing algorithm as JVM Kafka clients.
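
go-kafka already uses this murmur2-based partitioner by default (see the default configuration above); if you need to set it explicitly on a sarama config, a minimal sketch:

// use the JVM-compatible murmur2 partitioner on the exported sarama config
kafka.Config.Producer.Partitioner = kafka.NewJVMCompatiblePartitioner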

func SerializeKafkaHeadersFromContext

func SerializeKafkaHeadersFromContext(ctx context.Context) (string, error)

SerializeKafkaHeadersFromContext fetches tracing metadata from the context and serializes it into a JSON-encoded map[string]string.

Types

type ConsumerMetricsService

type ConsumerMetricsService struct {
	// contains filtered or unexported fields
}

ConsumerMetricsService object represents consumer metrics

func NewConsumerMetricsService

func NewConsumerMetricsService(groupID string) *ConsumerMetricsService

NewConsumerMetricsService creates a service layer that adds metrics capabilities

func (*ConsumerMetricsService) Instrumentation

func (c *ConsumerMetricsService) Instrumentation(next Handler) Handler

Instrumentation is a middleware used to add metrics

type Handler

type Handler func(ctx context.Context, msg *sarama.ConsumerMessage) error

Handler is a function that handles received Kafka messages

type Handlers

type Handlers map[string]Handler

Handlers maps a topic name to its Handler

type Listener

type Listener interface {
	Listen(ctx context.Context) error
	Close()
}

Listener is able to listen to multiple topics, with one handler per topic

func NewListener

func NewListener(groupID string, handlers Handlers, options ...ListenerOption) (Listener, error)

NewListener creates a new instance of Listener

type ListenerOption

type ListenerOption func(l *listener)

ListenerOption adds an option to the listener

func WithInstrumenting

func WithInstrumenting() ListenerOption

WithInstrumenting adds the instrumenting layer on a listener.

func WithTracing

func WithTracing(tracer TracingFunc) ListenerOption

WithTracing accepts a TracingFunc to execute before each message

type Producer

type Producer interface {
	Produce(msg *sarama.ProducerMessage) error
}

func NewProducer

func NewProducer(options ...ProducerOption) (Producer, error)

NewProducer creates a new producer that uses the default sarama client.

type ProducerHandler

type ProducerHandler func(p *producer, msg *sarama.ProducerMessage) error

ProducerHandler is a function that handles the production of a message. It is exposed to allow for easy middleware building.

type ProducerMetricsService

type ProducerMetricsService struct {
	// contains filtered or unexported fields
}

ProducerMetricsService is a service that provides metrics for the producer.

func NewDeadletterProducerMetricsService

func NewDeadletterProducerMetricsService() *ProducerMetricsService

func NewProducerMetricsService

func NewProducerMetricsService() *ProducerMetricsService

func (*ProducerMetricsService) DeadletterInstrumentation

func (p *ProducerMetricsService) DeadletterInstrumentation(next ProducerHandler) ProducerHandler

DeadletterInstrumentation is a middleware that provides metrics for a deadletter producer.

func (*ProducerMetricsService) Instrumentation

func (p *ProducerMetricsService) Instrumentation(next ProducerHandler) ProducerHandler

Instrumentation is a middleware that provides metrics for the producer.

type ProducerOption

type ProducerOption func(p *producer)

ProducerOption is a function that is passed to the producer constructor to configure it.

func WithDeadletterProducerInstrumenting

func WithDeadletterProducerInstrumenting() ProducerOption

WithDeadletterProducerInstrumenting adds the instrumenting layer on a deadletter producer.

func WithProducerInstrumenting

func WithProducerInstrumenting() ProducerOption

WithProducerInstrumenting adds the instrumenting layer on a producer.

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log messages.

var ErrorLogger StdLogger = log.New(os.Stderr, "[Go-Kafka] ", log.LstdFlags)

ErrorLogger is an instance of the StdLogger interface. By default it outputs all log messages to stderr, but you can redirect it wherever you want.

var Logger StdLogger = log.New(ioutil.Discard, "[Go-Kafka] ", log.LstdFlags)

Logger is an instance of the StdLogger interface. By default it discards all log messages via ioutil.Discard, but you can redirect it wherever you want.
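
For example, a sketch of redirecting the package logs to stdout:

kafka.Logger = log.New(os.Stdout, "[Go-Kafka] ", log.LstdFlags)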

type TracingFunc

type TracingFunc func(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)

TracingFunc is used to create tracing spans and/or propagate the tracing context from each message to the Go context.
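
A minimal sketch of a custom TracingFunc based on opentracing (the span name is illustrative):

tracing := kafka.TracingFunc(func(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context) {
	// start or continue a span from the incoming context
	return opentracing.StartSpanFromContext(ctx, "kafka-consume")
})

listener, _ := kafka.NewListener("my-consumer-group", handlers, kafka.WithTracing(tracing))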
