
dp-kafka

Kafka client wrapper that uses channels to abstract Kafka consumers and producers. This library is built on top of Sarama.

Configuration

By default, the library assumes plaintext connections, unless the configuration argument has a non-nil SecurityConfig field.
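For example, a minimal sketch that enables TLS via the GetSecurityConfig helper (the cfg fields shown are hypothetical):

	// a non-nil SecurityConfig switches the connection from plaintext to TLS
	pConfig := &kafka.ProducerConfig{
		SecurityConfig: kafka.GetSecurityConfig(
			cfg.CACerts,    // CA certificates (hypothetical config field)
			cfg.ClientCert, // client certificate
			cfg.ClientKey,  // client key
			false,          // do not skip TLS verification
		),
	}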

Life-cycle

Creation

Kafka producers and consumers can be created with constructors that accept the required channels and configuration. You may create the channels using CreateProducerChannels and CreateConsumerGroupChannels respectively:

	// Create Producer with channels and config
	pChannels := kafka.CreateProducerChannels()
	pConfig := &kafka.ProducerConfig{MaxMessageBytes: &cfg.KafkaMaxBytes}
	producer, err := kafka.NewProducer(ctx, cfg.Brokers, cfg.ProducedTopic, pChannels, pConfig)
	// Create ConsumerGroup with channels and config
	cgChannels := kafka.CreateConsumerGroupChannels(cfg.KafkaParallelMessages)
	cgConfig := &kafka.ConsumerGroupConfig{KafkaVersion: &cfg.KafkaVersion}
	cg, err := kafka.NewConsumerGroup(ctx, cfg.Brokers, cfg.ConsumedTopic, cfg.ConsumedGroup, cgChannels, cgConfig)

For consumers, you can specify the buffer size, which determines the number of messages that can be stored in the Upstream channel. It is recommended to provide a buffer size equal to the number of messages that are consumed in parallel.

You can provide an optional config parameter to the constructor (ProducerConfig and ConsumerGroupConfig). Any provided values will overwrite the default Sarama config; pass nil to use the default Sarama config throughout.

The constructor tries to initialise the producer/consumer by creating the underlying Sarama client, but failing to initialise it is not considered a fatal error, so the constructor will not return an error in that case.

Please note that if you do not provide the necessary channels, the constructors will return an ErrNoChannel error, which must be considered fatal.
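For example, a minimal sketch that treats the constructor error as fatal while tolerating a failed initialisation, continuing the creation example above (the logging call is illustrative):

	cg, err := kafka.NewConsumerGroup(ctx, cfg.Brokers, cfg.ConsumedTopic, cfg.ConsumedGroup, cgChannels, cgConfig)
	if err != nil {
		// missing channels (ErrNoChannel) must be treated as fatal
		log.Fatalf("failed to create consumer group: %v", err)
	}
	// cg may still be uninitialised at this point; that is not fatal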

Initialisation

If the producer/consumer can establish a connection with the Kafka cluster, it will be initialised at creation time; this is usually the case. But it might not be able to do so, for example, if the Kafka cluster is not running. If a producer/consumer is not initialised, it cannot contact the Kafka broker, and it cannot send or receive any messages. Any attempt to send a message in this state will result in an error being sent to the Errors channel.

An uninitialised producer/consumer will try to initialise later, asynchronously, in a retry loop following an exponential backoff strategy. You may also try to initialise it by calling Initialise(). In any case, when the initialisation succeeds, the initialisation loop will exit and the producer/consumer will start producing/consuming.
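For example, a minimal sketch of forcing an initialisation attempt (the logging is illustrative):

	// try to initialise now; on failure, the background retry loop keeps trying
	if err := cg.Initialise(ctx); err != nil {
		log.Printf("kafka consumer group not initialised yet: %v", err)
	}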

You can check if a producer/consumer is initialised by calling IsInitialised() or wait for it to be initialised by waiting for the Ready channel to be closed, like so:

	// wait in a parallel go-routine
	go func() {
		<-channels.Ready
		doStuff()
	}()
	// block until kafka is initialised
	<-channels.Ready
	doStuff()

Waiting for this channel is a convenient hook, but not a requirement.

Message production

Messages are sent to Kafka by sending them to the producer's Output channel as byte slices:

	// send message
	pChannels.Output <- []byte(msg)
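
Errors encountered while producing are sent to the producer's Errors channel. A minimal sketch that drains and logs them using the provided LogErrors helper:

	// log any errors from the Errors channel in a background go-routine
	pChannels.LogErrors(ctx, "kafka producer error")
	// send message
	pChannels.Output <- []byte(msg)
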
Message consumption

Messages can be consumed by creating an infinite consumption loop. Once a message has finished being processed, you need to call Commit(), so that Sarama releases the go-routine that is consuming the message and Kafka knows that the message does not need to be delivered again (it marks the message and commits the offset):

	// consumer loop
	func consume(upstream chan kafka.Message) {
		for {
			msg := <-upstream
			doStuff(msg)
			msg.Commit()
		}
	}

You may create a single go-routine to consume messages sequentially, or multiple parallel go-routines (workers) to consume them concurrently:

	// single consume go-routine
	go consume(channels.Upstream)
	// multiple workers to consume messages in parallel
	for w := 1; w <= cfg.KafkaParallelMessages; w++ {
		go consume(channels.Upstream)
	}

You can consume in parallel up to as many messages as there are partitions assigned to your consumer; more information is in the deep-dive section below.

Message consumption deep dive

Sarama creates one go-routine per partition assigned to the consumer, for the topic being consumed.

For example, if we have a topic with 60 partitions and 2 instances of a service consuming that topic running at the same time, Kafka will assign 30 partitions to each one.

Sarama will then create 30 parallel go-routines, which this library uses to send messages to the Upstream channel. Each go-routine waits for the message to finish being processed by waiting for the message-specific upstreamDone channel to be closed, like so:

	channels.Upstream <- msg
	<-msg.upstreamDone

Each Sarama consumption go-routine exists only during a particular session. Sessions are periodically destroyed and created by Sarama in response to Kafka events such as a cluster rebalance (where the number of partitions assigned to a consumer may change). It is important that messages are released as soon as possible when this happens. The default message consumption timeout in this scenario is 10 seconds (determined by config.Consumer.Group.Session.Timeout).

When a session finishes, we call Consume() again, which tries to establish a new session. If an error occurs trying to establish a new session, it will be retried following an exponential backoff strategy.

Closing

Producers can be closed by calling the Close method.

To close consumers gracefully, it is advised to call the StopListeningToConsumer method before the Close method, as sketched below. This allows in-flight messages to be completed and committed, so that they are not replayed once the application restarts.

The Closer channel is used to signal to all the loops that they need to exit because the consumer is being closed.

After successfully closing a producer or consumer, the corresponding Closed channel is closed.
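For example, a minimal graceful-shutdown sketch, assuming a configurable timeout (cfg.GracefulShutdownTimeout is hypothetical):

	ctx, cancel := context.WithTimeout(context.Background(), cfg.GracefulShutdownTimeout)
	defer cancel()

	// stop receiving new messages, allowing in-flight ones to complete and commit
	if err := cg.StopListeningToConsumer(ctx); err != nil {
		log.Printf("error stopping kafka consumer listener: %v", err)
	}
	// then close the consumer and release its resources
	if err := cg.Close(ctx); err != nil {
		log.Printf("error closing kafka consumer: %v", err)
	}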

Headers

Headers are key-value pairs that are transparently passed by Kafka between producers and consumers. By default, the predefined traceid header is added to every Kafka producer message. There is also the option to add custom headers, as follows:

	// Create Producer with channels and config
	pChannels := kafka.CreateProducerChannels()
	pConfig := &kafka.ProducerConfig{MaxMessageBytes: &cfg.KafkaMaxBytes}
	producer, err := kafka.NewProducer(ctx, cfg.Brokers, cfg.ProducedTopic, pChannels, pConfig)
	producer.AddHeader(key, value)

Consumers can then retrieve these headers via the GetHeader API, as follows:

	// consumer loop
	func consume(upstream chan kafka.Message) {
		for {
			msg := <-upstream
			value := msg.GetHeader(key)
			doStuff(value) // use the header value (unused variables do not compile in Go)
			msg.Commit()
		}
	}

Health-check

The health status of a consumer or producer can be obtained by calling the Checker method, which updates the provided CheckState structure with the relevant information:

	err := cli.Checker(ctx, checkState)

  • If a broker cannot be reached, the Status is set to CRITICAL.
  • If all brokers can be reached, but a broker does not provide the expected topic metadata, the Status is set to WARNING.
  • If all brokers can be reached and return the expected topic metadata, we try to initialise the consumer/producer. If it was already initialised, or the initialisation is successful, the Status is set to OK.
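Putting this together, a minimal sketch of running a health check (NewCheckState and Status are assumed from the dp-healthcheck library this package integrates with):

	// create a check state and let the producer update it
	checkState := health.NewCheckState("kafka-producer") // assumed dp-healthcheck constructor
	if err := producer.Checker(ctx, checkState); err != nil {
		log.Printf("checker error: %v", err)
	}
	log.Printf("kafka status: %s", checkState.Status()) // assumed accessor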

Examples

See the examples below for some typical usages of this library.

Testing

Some mocks are provided so that you can test your code's interactions with this library.

Documentation

Index

Constants

const (
	Errors       = "Errors"
	Ready        = "Ready"
	Closer       = "Closer"
	Closed       = "Closed"
	Upstream     = "Upstream"
	UpstreamDone = "UpstreamDone"
	Output       = "Output"
)

Channel names

const (
	OffsetNewest = sarama.OffsetNewest
	OffsetOldest = sarama.OffsetOldest
)

Common constants

const (
	// ServiceName is the name of this service: Kafka.
	ServiceName = "Kafka"

	// MsgHealthyProducer Check message returned when Kafka producer is healthy.
	MsgHealthyProducer = "kafka producer is healthy"

	// MsgHealthyConsumerGroup Check message returned when Kafka consumer group is healthy.
	MsgHealthyConsumerGroup = "kafka consumer group is healthy"

	// ProducerMinBrokersHealthy is the minimum number of healthy brokers required for a healthcheck to not be considered critical for a producer
	ProducerMinBrokersHealthy = 2

	// ConsumerMinBrokersHealthy is the minimum number of healthy brokers required for a healthcheck to not be considered critical for a consumer
	ConsumerMinBrokersHealthy = 1
)

const (
	TraceIDHeaderKey = string(request.RequestIdKey)
)

Variables

var ConsumeErrRetryPeriod = 250 * time.Millisecond

ConsumeErrRetryPeriod is the initial time period between consumer retries on error (for consumer groups)

var ErrInitSarama = errors.New("failed to initialise client")

ErrInitSarama is used when Sarama client cannot be initialised

var ErrShutdownTimedOut = errors.New("shutdown context timed out")

ErrShutdownTimedOut represents an error received due to the context deadline being exceeded

var ErrTLSCannotLoadCACerts = errors.New("cannot load CA Certs")

ErrTLSCannotLoadCACerts is returned when the certs file cannot be loaded

var ErrUninitialisedProducer = errors.New("producer is not initialised")

ErrUninitialisedProducer is used when a caller tries to send a message to the output channel with an uninitialised producer.

var InitRetryPeriod = 250 * time.Millisecond

InitRetryPeriod is the initial time period between initialisation retries (for producers and consumer groups)

var MaxRetryInterval = 31 * time.Second

MaxRetryInterval is the maximum time between retries (plus or minus a random amount)

Functions

func GetPrincipal added in v2.3.0

func GetPrincipal(app, subnet, domain string) string

func NewAdmin added in v2.2.1

func NewAdmin(brokerAddrs []string, pConfig *AdminConfig) (sarama.ClusterAdmin, error)

NewAdmin creates an admin-based client

func SetMaxMessageSize

func SetMaxMessageSize(maxSize int32)

SetMaxMessageSize sets the Sarama MaxRequestSize and MaxResponseSize values to the provided maxSize

func SetMaxRetryInterval added in v2.4.1

func SetMaxRetryInterval(maxPause time.Duration)

SetMaxRetryInterval sets MaxRetryInterval to its duration argument

Types

type Acls added in v2.3.0

type Acls []*sarama.AclCreation

func (Acls) Apply added in v2.3.0

func (t Acls) Apply(adm sarama.ClusterAdmin) error

type AdminConfig added in v2.3.0

type AdminConfig struct {
	KafkaVersion   *string
	KeepAlive      *time.Duration
	RetryBackoff   *time.Duration
	RetryMax       *int
	SecurityConfig *SecurityConfig
}

AdminConfig exposes the optional configurable parameters for an admin client to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer group instance.

func NewConsumerGroup

func NewConsumerGroup(ctx context.Context, brokerAddrs []string, topic, group string,
	channels *ConsumerGroupChannels, cgConfig *ConsumerGroupConfig) (*ConsumerGroup, error)

NewConsumerGroup creates a new consumer group with the provided parameters

func (*ConsumerGroup) Channels

func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels

Channels returns the ConsumerGroup channels for this consumer group

func (*ConsumerGroup) Checker

func (cg *ConsumerGroup) Checker(ctx context.Context, state *health.CheckState) error

Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close(ctx context.Context, optFuncs ...OptFunc) (err error)

Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, which is not recommended.

func (*ConsumerGroup) Initialise

func (cg *ConsumerGroup) Initialise(ctx context.Context) error

Initialise creates a new Sarama ConsumerGroup and the consumer/error loops, only if it was not already initialised.

func (*ConsumerGroup) IsInitialised

func (cg *ConsumerGroup) IsInitialised() bool

IsInitialised returns true only if Sarama ConsumerGroup has been correctly initialised.

func (*ConsumerGroup) StopListeningToConsumer

func (cg *ConsumerGroup) StopListeningToConsumer(ctx context.Context) (err error)

StopListeningToConsumer stops any more messages from being consumed from the Kafka topic

type ConsumerGroupChannels

type ConsumerGroupChannels struct {
	Upstream chan Message
	Errors   chan error
	Ready    chan struct{}
	Closer   chan struct{}
	Closed   chan struct{}
}

ConsumerGroupChannels represents the channels used by ConsumerGroup.

func CreateConsumerGroupChannels

func CreateConsumerGroupChannels(bufferSize int) *ConsumerGroupChannels

CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels. You can provide the buffer size to determine the number of messages that will be buffered in the Upstream channel (to receive messages).

func (*ConsumerGroupChannels) LogErrors

func (consumerChannels *ConsumerGroupChannels) LogErrors(ctx context.Context, errMsg string)

LogErrors creates a go-routine that waits on the Errors channel and logs any error received. It exits when the Closer channel is closed. The provided context and errMsg will be used in the log event.

func (*ConsumerGroupChannels) Validate

func (consumerChannels *ConsumerGroupChannels) Validate() error

Validate returns ErrNoChannel if any consumer channel is nil

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	KafkaVersion     *string
	KeepAlive        *time.Duration
	RetryBackoff     *time.Duration
	RetryBackoffFunc *func(retries int) time.Duration
	Offset           *int64
	SecurityConfig   *SecurityConfig
}

ConsumerGroupConfig exposes the optional configurable parameters for a consumer group, to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

type ErrBrokersNotReachable

type ErrBrokersNotReachable struct {
	Addrs []string
}

ErrBrokersNotReachable is an Error type for 'Broker Not reachable' with a list of unreachable addresses

func (*ErrBrokersNotReachable) Error

func (e *ErrBrokersNotReachable) Error() string

Error returns the error message with a list of unreachable addresses

type ErrInvalidBrokers

type ErrInvalidBrokers struct {
	Addrs []string
}

ErrInvalidBrokers is an Error type for 'Invalid topic info' with a list of invalid broker addresses

func (*ErrInvalidBrokers) Error

func (e *ErrInvalidBrokers) Error() string

Error returns the error message with a list of broker addresses that returned unexpected responses

type ErrNoChannel

type ErrNoChannel struct {
	ChannelNames []string
}

ErrNoChannel is an Error type generated when a kafka producer or consumer is created with a missing channel

func (*ErrNoChannel) Error

func (e *ErrNoChannel) Error() string

Error returns the error message with a list of missing channels

type HealthInfo added in v2.4.3

type HealthInfo struct {
	Reachable bool
	HasTopic  bool
}

HealthInfo contains the health information for one broker

type HealthInfoMap added in v2.4.3

type HealthInfoMap map[*sarama.Broker]HealthInfo

HealthInfoMap contains the health information for a set of brokers

func (*HealthInfoMap) ErrorMsg added in v2.4.3

func (h *HealthInfoMap) ErrorMsg() string

ErrorMsg returns a tailored message according to the information kept in HealthInfoMap

func (*HealthInfoMap) UpdateStatus added in v2.4.3

func (h *HealthInfoMap) UpdateStatus(state *health.CheckState, minHealthyThreshold int, msgHealthy string) error

UpdateStatus updates the provided health check state according to the provided minimum number of healthy brokers for the group to be considered healthy. If the health status is OK, the provided msgHealthy will be used as the status message.

type IConsumerGroup

type IConsumerGroup interface {
	Channels() *ConsumerGroupChannels
	IsInitialised() bool
	Initialise(ctx context.Context) error
	StopListeningToConsumer(ctx context.Context) (err error)
	Checker(ctx context.Context, state *health.CheckState) error
	Close(ctx context.Context, optFuncs ...OptFunc) (err error)
}

IConsumerGroup is an interface representing a Kafka Consumer Group.

type IProducer

type IProducer interface {
	Channels() *ProducerChannels
	IsInitialised() bool
	Initialise(ctx context.Context) error
	Checker(ctx context.Context, state *health.CheckState) error
	Close(ctx context.Context) (err error)
	AddHeader(key, value string)
}

IProducer is an interface representing a Kafka Producer

type Message

type Message interface {

	// GetData returns the message contents.
	GetData() []byte

	// GetHeader takes a key for the header and returns the value if the key exists in the header.
	GetHeader(key string) string

	// Context returns a context with traceid.
	Context() context.Context

	// Mark marks the message as consumed, but doesn't commit the offset to the backend
	Mark()

	// Commit marks the message as consumed and commits its offset to the backend
	Commit()

	// Release closes the UpstreamDone channel for this message
	Release()

	// CommitAndRelease marks a message as consumed, commits it and closes the UpstreamDone channel
	CommitAndRelease()

	// Offset returns the message offset
	Offset() int64

	// UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed
	UpstreamDone() chan struct{}
}

Message represents a single kafka message.

type OptFunc added in v2.7.0

type OptFunc func()

OptFunc is an optional function that is run after the Upstream channel has been closed and before the consumer closer is called. It is useful during a graceful shutdown, when messages may have been received but not yet processed. For example, a message may be released as soon as it is received from Upstream and added to a batch, with the messages committed back only once the whole batch is processed after a certain time. If that batch is not processed during the graceful shutdown, it can create a lag in the consumer group.

Optional functions can handle this for you: during the graceful shutdown you can pass, for the case above, the batch processing, so that the unprocessed batch is processed and its messages are committed back to the consumer group. The optional function is run after the Upstream channel is closed, to make sure no new messages are received, and before the consumer is closed, so that the remaining messages can be processed and committed.
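For example, a minimal sketch that passes an OptFunc to Close so that a partially-filled batch is flushed before the consumer shuts down (processRemainingBatch is a hypothetical helper):

	err := cg.Close(ctx, func() {
		// hypothetical: process and commit any messages still buffered in the batch
		processRemainingBatch()
	})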

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a producer of Kafka messages

func NewProducer

func NewProducer(ctx context.Context, brokerAddrs []string, topic string,
	channels *ProducerChannels, pConfig *ProducerConfig) (producer *Producer, err error)

NewProducer returns a new producer instance using the provided config and channels. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.

func (*Producer) AddHeader added in v2.6.0

func (p *Producer) AddHeader(key, value string)

func (*Producer) Channels

func (p *Producer) Channels() *ProducerChannels

Channels returns the Producer channels for this producer

func (*Producer) Checker

func (p *Producer) Checker(ctx context.Context, state *health.CheckState) error

Checker checks health of Kafka producer and updates the provided CheckState accordingly

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) (err error)

Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, which is not recommended.

func (*Producer) Initialise

func (p *Producer) Initialise(ctx context.Context) error

Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.

func (*Producer) IsInitialised

func (p *Producer) IsInitialised() bool

IsInitialised returns true only if Sarama producer has been correctly initialised.

type ProducerChannels

type ProducerChannels struct {
	Output chan []byte
	Errors chan error
	Ready  chan struct{}
	Closer chan struct{}
	Closed chan struct{}
}

ProducerChannels represents the channels used by Producer.

func CreateProducerChannels

func CreateProducerChannels() *ProducerChannels

CreateProducerChannels initialises a ProducerChannels with new channels.

func (*ProducerChannels) LogErrors

func (producerChannels *ProducerChannels) LogErrors(ctx context.Context, errMsg string)

LogErrors creates a go-routine that waits on the Errors channel and logs any error received. It exits when the Closer channel is closed. The provided context and errMsg will be used in the log event.

func (*ProducerChannels) Validate

func (producerChannels *ProducerChannels) Validate() error

Validate returns ErrNoChannel if any producer channel is nil

type ProducerConfig

type ProducerConfig struct {
	KafkaVersion     *string
	MaxMessageBytes  *int
	RetryMax         *int
	KeepAlive        *time.Duration
	RetryBackoff     *time.Duration
	RetryBackoffFunc *func(retries, maxRetries int) time.Duration
	SecurityConfig   *SecurityConfig
}

ProducerConfig exposes the optional configurable parameters for a producer to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

type SaramaAsyncProducer

type SaramaAsyncProducer = sarama.AsyncProducer

SaramaAsyncProducer is a wrapper around sarama.AsyncProducer

type SaramaConsumerGroup

type SaramaConsumerGroup = sarama.ConsumerGroup

SaramaConsumerGroup is a wrapper around sarama.ConsumerGroup

type SaramaConsumerGroupClaim

type SaramaConsumerGroupClaim = sarama.ConsumerGroupClaim

SaramaConsumerGroupClaim is a wrapper around sarama.ConsumerGroupClaim

type SaramaConsumerGroupSession

type SaramaConsumerGroupSession = sarama.ConsumerGroupSession

SaramaConsumerGroupSession is a wrapper around sarama.ConsumerGroupSession

type SaramaMessage

type SaramaMessage struct {
	// contains filtered or unexported fields
}

SaramaMessage represents a Sarama specific Kafka message

func (SaramaMessage) Commit

func (M SaramaMessage) Commit()

Commit marks the message as consumed, and then commits the offset to the backend

func (SaramaMessage) CommitAndRelease added in v2.1.0

func (M SaramaMessage) CommitAndRelease()

CommitAndRelease marks the message as consumed, commits the offset to the backend and releases the UpstreamDone channel

func (SaramaMessage) Context added in v2.6.0

func (M SaramaMessage) Context() context.Context

Context returns a context with traceid.

func (SaramaMessage) GetData

func (M SaramaMessage) GetData() []byte

GetData returns the message contents.

func (SaramaMessage) GetHeader added in v2.6.0

func (M SaramaMessage) GetHeader(key string) string

GetHeader takes a key for the header and returns the value if the key exists in the header.

func (SaramaMessage) Mark added in v2.1.0

func (M SaramaMessage) Mark()

Mark marks the message as consumed, but doesn't commit the offset to the backend

func (SaramaMessage) Offset

func (M SaramaMessage) Offset() int64

Offset returns the message offset

func (SaramaMessage) Release added in v2.1.0

func (M SaramaMessage) Release()

Release closes the UpstreamDone channel, but doesn't mark the message or commit the offset

func (SaramaMessage) UpstreamDone

func (M SaramaMessage) UpstreamDone() chan struct{}

UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed (same effect as calling Release)

type SecurityConfig added in v2.2.0

type SecurityConfig struct {
	RootCACerts        string
	ClientCert         string
	ClientKey          string
	InsecureSkipVerify bool
}

SecurityConfig is common to the producer and consumer configs above

func GetSecurityConfig added in v2.3.0

func GetSecurityConfig(caCerts, clientCert, clientKey string, skipVerify bool) *SecurityConfig

type TopicAuth added in v2.3.0

type TopicAuth struct {
	App        string
	Subnets    []string
	Topic      string
	Operations []sarama.AclOperation
	Hosts      []string
}

func (TopicAuth) GetAcls added in v2.3.0

func (t TopicAuth) GetAcls(domain string) Acls

type TopicAuthList added in v2.3.0

type TopicAuthList struct {
	Domain  string
	Brokers []string
	Acls    []TopicAuth
}

func (TopicAuthList) Apply added in v2.3.0

func (t TopicAuthList) Apply(adm sarama.ClusterAdmin) error

Directories

Path Synopsis
Package avro provides functionality to return the Avro encoding of s.
examples
