kafka

package module
v4.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2023 License: MIT Imports: 14 Imported by: 7

README

kafka-client-go

CircleCI Go Report Card Coverage Status

Description

Library for producing and consuming messages directly from Kafka.

The library is NOT using Zookeeper to connect to Kafka under the hood.

Usage

Importing:

    import "github.com/Financial-Times/kafka-client-go/v4"
Producer

Creating a producer:

    config := kafka.ProducerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        Topic:                   "", // Topic to publish to 
    }
	
    producer, err := kafka.NewProducer(config) 
    // Error handling

Failing to establish a connection to Kafka will result in an error.

Sending a message:

    headers := map[string]string{}
    body := ""
    message := kafka.NewFTMessage(headers, body)
    
    err := producer.SendMessage(message)
    // Error handling

The health of the Kafka cluster can be checked by attempting to establish separate connection with the provided configuration:

   err := producer.ConnectivityCheck()
   // Error handling

Connections should be closed by the client:

    err := producer.Close()
    // Error handling
Consumer

Creating a consumer:

    config := kafka.ConsumerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        ConsumerGroup:           "", // Unique name of a consumer group
    }

    topics := []*kafka.Topic{
        kafka.NewTopic(""),                             // Topic to consume from
        kafka.NewTopic("", kafka.WithLagTolerance(50)), // Topic to consume from with custom lag tolerance used for monitoring
    }
	
    logger := logger.NewUPPLogger(...)

    consumer, err := kafka.NewConsumer(config, topics, logger)
    // Error handling

Failing to establish a connection to Kafka will result in an error.

Consuming messages:

    handler := func(message kafka.FTMessage) {
        // Message handling
    }
    
    consumer.Start(handler)

The health of the Kafka cluster can be checked by attempting to establish separate connection with the provided configuration:

   err := consumer.ConnectivityCheck()
   // Error handling

The health of the consumer process is also being monitored and its status can be accessed:

   err := consumer.MonitorCheck()
   // Error handling

Connections should be closed by the client:

    err := consumer.Close()
    // Error handling

Testing

    go test --race -v ./...

NB: Some tests in this project require a local Kafka (port 29092). Use the -short flag in order to omit those.

Documentation

Index

Constants

View Source
const LagTechnicalSummary string = "Messages awaiting handling exceed the configured lag tolerance. Check if Kafka consumer is stuck."

LagTechnicalSummary is used as technical summary in consumer monitoring healthchecks.

Variables

View Source
var ErrConnectivityTimedOut = fmt.Errorf("kafka connectivity timed out")
View Source
var ErrUnknownConsumerStatus = fmt.Errorf("consumer status is unknown")

Functions

func DefaultConsumerOptions

func DefaultConsumerOptions() *sarama.Config

DefaultConsumerOptions returns a new sarama configuration with predefined default settings.

func DefaultProducerOptions

func DefaultProducerOptions() *sarama.Config

DefaultProducerOptions creates a new Sarama producer configuration with default values.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config ConsumerConfig, topics []*Topic, log *logger.UPPLogger) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the connection to Kafka.

func (*Consumer) ConnectivityCheck

func (c *Consumer) ConnectivityCheck() error

ConnectivityCheck checks whether a connection to Kafka can be established.

func (*Consumer) MonitorCheck

func (c *Consumer) MonitorCheck() error

MonitorCheck checks whether the consumer group is lagging behind when reading messages.

func (*Consumer) Start

func (c *Consumer) Start(messageHandler func(message FTMessage))

Start will start the message consumption and consumer monitoring processes.

Each message will be handled using the provided handler.

The consumer monitoring process is using a separate Kafka connection and will:

  1. Request the offsets for a topic and the respective claimed partitions on a given time interval from the Kafka broker;
  2. Deduce the message lag by subtracting the last committed consumer group offset from the next topic offset;
  3. Store the partition lag if such is present;
  4. Report a status error on MonitorCheck() calls.

Close() calls will terminate both the message consumption and the consumer monitoring processes.

type ConsumerConfig

type ConsumerConfig struct {
	ClusterArn              *string
	BrokersConnectionString string
	ConsumerGroup           string
	// Time interval between each offset fetching request.
	// Default value (3 minutes) would be used if not set or exceeds 10 minutes.
	OffsetFetchInterval time.Duration
	// Total count of offset fetching request failures until consumer status is marked as unknown.
	// Default value (5) would be used if not set or exceeds 10.
	// Note: A single failure will result in follow-up requests to be sent on
	// shorter interval than the value of OffsetFetchInterval until successful.
	OffsetFetchMaxFailureCount int
	// Whether to disable the automatic reset of the sarama.ClusterAdmin
	// monitoring connection upon exceeding the OffsetFetchMaxFailureCount threshold.
	// Default value is false.
	// Note: Resetting the connection is necessary for the current version of Sarama (1.37.2)
	// due to numerous issues originating from the library. This flag is currently only used in tests.
	DisableMonitoringConnectionReset bool
	Options                          *sarama.Config
}

type FTMessage

type FTMessage struct {
	Headers map[string]string
	Body    string
	Topic   string
}

func NewFTMessage

func NewFTMessage(headers map[string]string, body string) FTMessage

func (*FTMessage) Build

func (m *FTMessage) Build() string

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(config ProducerConfig) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close() error

Close closes the connection to Kafka.

func (*Producer) ConnectivityCheck

func (p *Producer) ConnectivityCheck() error

ConnectivityCheck checks whether a connection to Kafka can be established.

func (*Producer) SendMessage

func (p *Producer) SendMessage(message FTMessage) error

SendMessage publishes a message to Kafka.

type ProducerConfig

type ProducerConfig struct {
	ClusterArn              *string
	BrokersConnectionString string
	Topic                   string
	Options                 *sarama.Config
}

type Topic

type Topic struct {
	Name string
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(name string, opts ...TopicOption) *Topic

type TopicOption

type TopicOption func(topic *Topic)

func WithLagTolerance

func WithLagTolerance(tolerance int64) TopicOption

WithLagTolerance sets custom lag tolerance threshold used for monitoring. Consumer lagging behind with more messages than the configured tolerance will be reported as unhealthy. Default is 500 messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL