kafka

package module
v1.1.7
Published: Aug 13, 2020 License: MIT Imports: 10 Imported by: 3

README

dp-kafka

Kafka client wrapper using channels to abstract kafka consumers and producers.

Life-cycle

Creation

Kafka producers and consumers can be created with constructors that accept the required channels and configuration. You may create the channels using CreateProducerChannels and CreateConsumerGroupChannels respectively.

Example: create a kafka producer

pChannels := kafka.CreateProducerChannels()
producer, err := kafka.NewProducer(
	ctx, cfg.Brokers, cfg.ProducedTopic, cfg.KafkaMaxBytes, pChannels)

Example: create a kafka consumer

cgChannels := kafka.CreateConsumerGroupChannels(cfg.KafkaSync)
consumer, err := kafka.NewConsumerGroup(
	ctx, cfg.Brokers, cfg.ConsumedTopic, cfg.ConsumedGroup, kafka.OffsetNewest, cfg.KafkaSync, cgChannels)

For consumers, it is recommended to use sync=true: once you have read a message from Incoming(), the listener will block (and not read the next message from kafka) until you signal that the message has been consumed (typically with CommitAndRelease(msg)). Otherwise, if the application is shut down (e.g. on an interrupt signal), the consumer may not shut down in a timely manner, because it is blocked sending the already-read message to Incoming().
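
For example, a minimal consumption loop for a sync consumer could look like the following sketch (handleMessage is a hypothetical handler for your payload):

	go func() {
		for {
			select {
			case msg := <-consumer.Channels().Upstream:
				handleMessage(msg.GetData()) // hypothetical processing of the payload
				consumer.CommitAndRelease(msg)
			case <-consumer.Channels().Closer:
				// stop the loop once the consumer begins closing
				return
			}
		}
	}()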

Please note that if you do not provide the necessary channels, the constructors will return an ErrNoChannel error, which must be considered fatal.

The constructor tries to initialise the producer/consumer by creating the underlying client. If the initialisation fails, a non-fatal error is returned; you can try to initialise it again later.

Initialisation

A producer/consumer might not have been successfully initialised at creation time. If this is the case, you can try to initialise it at any later point by calling Initialise. To check the initialisation state, call IsInitialised.
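
For example (a sketch; a real service might retry this on a timer until it succeeds):

	if !producer.IsInitialised() {
		if err := producer.Initialise(ctx); err != nil {
			// non-fatal: the producer can be initialised again later
		}
	}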

If a producer/consumer is not initialised, it cannot contact the kafka broker.

An uninitialised kafka producer cannot send messages, and any attempt to do so will result in an error being sent to the Errors channel.
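
Once initialised, a message is sent by writing the marshalled payload to the producer's Output channel; a minimal sketch, reusing the producer created earlier:

	producer.Channels().Output <- []byte(`{"hello":"kafka"}`)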

An uninitialised kafka consumer group will not receive any message.

When a producer/consumer is successfully initialised, it closes its Init channel. You can trigger an event on kafka initialisation by waiting for this channel to be closed. For example:

	go func() {
		<-channels.Init
		doKafkaStuff()
	}()

Waiting for this channel is a convenient hook, but not a requirement: the other channels will send/receive data once Sarama is initialised in any case.

Closing

Producers can be closed by calling the Close method.

For graceful closing of consumers, it is advised to call the StopListeningToConsumer method before the Close method. This allows in-flight messages to be completed and committed, so that they are not replayed once the application restarts.
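
For example (a sketch, assuming ctx carries a shutdown timeout):

	if err := consumer.StopListeningToConsumer(ctx); err != nil {
		// handle/log the error
	}
	// in-flight messages can now be completed and committed
	if err := consumer.Close(ctx); err != nil {
		// handle/log the error
	}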

After successfully closing a producer or consumer, the corresponding Closed channel is closed.

Health-check

The health status of a consumer or producer can be obtained by calling the Checker method, which updates the provided CheckState structure with the relevant information:

err = cli.Checker(ctx, check)
  • If a broker cannot be reached, the Status is set to CRITICAL.
  • If all brokers can be reached, but a broker does not provide the expected topic metadata, the Status is set to WARNING.
  • If all brokers can be reached and return the expected topic metadata, we try to initialise the consumer/producer. If it was already initialised, or the initialisation is successful, the Status is set to OK.
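
A minimal sketch of a direct call, assuming a *health.CheckState obtained from the dp-healthcheck library (its NewCheckState constructor is an assumption about that library's API):

	checkState := health.NewCheckState(kafka.ServiceName)
	if err := consumer.Checker(ctx, checkState); err != nil {
		// handle/log the error
	}
	// checkState now reflects OK, WARNING or CRITICAL as described above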

Example

See the example source file for typical usage.

Testing

Some mocks are provided so that you can test your code's interactions with this library.

Documentation

Index

Constants

const (
	Errors       = "Errors"
	Init         = "Init"
	Closer       = "Closer"
	Closed       = "Closed"
	Upstream     = "Upstream"
	UpstreamDone = "UpstreamDone"
	Output       = "Output"
)

channel names

const (
	OffsetNewest = sarama.OffsetNewest
	OffsetOldest = sarama.OffsetOldest
)
const MsgHealthyConsumerGroup = "kafka consumer group is healthy"

MsgHealthyConsumerGroup Check message returned when Kafka consumer group is healthy.

const MsgHealthyProducer = "kafka producer is healthy"

MsgHealthyProducer Check message returned when Kafka producer is healthy.

const ServiceName = "Kafka"

ServiceName is the name of this service: Kafka.

Variables

var ErrInitSarama = errors.New("Failed to initialise client")

ErrInitSarama is used when Sarama client cannot be initialised

var ErrShutdownTimedOut = errors.New("Shutdown context timed out")

ErrShutdownTimedOut represents an error received due to the context deadline being exceeded

var ErrUninitialisedProducer = errors.New("Producer is not initialised")

ErrUninitialisedProducer is used when a caller tries to send a message to the output channel with an uninitialised producer.

Functions

func SetMaxMessageSize

func SetMaxMessageSize(maxSize int32)

Types

type AsyncProducer

type AsyncProducer = sarama.AsyncProducer

AsyncProducer is a wrapper around sarama.AsyncProducer

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer group instance.

func NewConsumerGroup

func NewConsumerGroup(
	ctx context.Context, brokerAddrs []string, topic string, group string, offset int64, sync bool,
	channels *ConsumerGroupChannels) (*ConsumerGroup, error)

NewConsumerGroup returns a new consumer group using default configuration and provided channels

func NewConsumerWithClusterClient

func NewConsumerWithClusterClient(
	ctx context.Context, brokerAddrs []string, topic string, group string, offset int64, syncConsumer bool,
	channels *ConsumerGroupChannels, cli SaramaCluster) (cg *ConsumerGroup, err error)

NewConsumerWithClusterClient returns a new consumer group with the provided sarama cluster client

func (*ConsumerGroup) Channels added in v1.1.0

func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels

Channels returns the ConsumerGroup channels for this consumer group

func (*ConsumerGroup) Checker

func (cg *ConsumerGroup) Checker(ctx context.Context, state *health.CheckState) error

Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close(ctx context.Context) (err error)

Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context provides no timeout, which is not recommended.

func (*ConsumerGroup) CommitAndRelease

func (cg *ConsumerGroup) CommitAndRelease(msg Message)

CommitAndRelease commits the consumed message and releases the consumer listener to read another message

func (*ConsumerGroup) Initialise added in v1.1.0

func (cg *ConsumerGroup) Initialise(ctx context.Context) error

Initialise creates a new Sarama Consumer and the channel redirection, only if it was not already initialised.

func (*ConsumerGroup) IsInitialised

func (cg *ConsumerGroup) IsInitialised() bool

IsInitialised returns true only if Sarama consumer has been correctly initialised.

func (*ConsumerGroup) Release

func (cg *ConsumerGroup) Release()

Release signals that upstream has completed an incoming message, i.e. the listener may move on to read the next message

func (*ConsumerGroup) StopListeningToConsumer

func (cg *ConsumerGroup) StopListeningToConsumer(ctx context.Context) (err error)

StopListeningToConsumer stops any further messages from being consumed from the kafka topic

type ConsumerGroupChannels

type ConsumerGroupChannels struct {
	Upstream     chan Message
	Errors       chan error
	Init         chan struct{}
	Closer       chan struct{}
	Closed       chan struct{}
	UpstreamDone chan bool
}

ConsumerGroupChannels represents the channels used by ConsumerGroup.

func CreateConsumerGroupChannels

func CreateConsumerGroupChannels(sync bool) *ConsumerGroupChannels

CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels according to sync

func (*ConsumerGroupChannels) LogErrors added in v1.1.0

func (consumerChannels *ConsumerGroupChannels) LogErrors(ctx context.Context, errMsg string)

LogErrors creates a go-routine that waits on chErrors channel and logs any error received. It exits on chCloser channel event. Provided context and errMsg will be used in the log Event.
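
For example, a hedged usage sketch: start the error logger right after creating the channels, before the main processing loop:

	cgChannels.LogErrors(ctx, "kafka consumer group error")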

func (*ConsumerGroupChannels) Validate

func (consumerChannels *ConsumerGroupChannels) Validate() error

Validate returns ErrNoChannel if any consumer channel is nil

type ErrBrokersNotReachable

type ErrBrokersNotReachable struct {
	Addrs []string
}

ErrBrokersNotReachable is an Error type for 'Broker Not reachable' with a list of unreachable addresses

func (*ErrBrokersNotReachable) Error

func (e *ErrBrokersNotReachable) Error() string

Error returns the error message with a list of unreachable addresses

type ErrInvalidBrokers

type ErrInvalidBrokers struct {
	Addrs []string
}

ErrInvalidBrokers is an Error type for 'Invalid topic info' with a list of invalid broker addresses

func (*ErrInvalidBrokers) Error

func (e *ErrInvalidBrokers) Error() string

Error returns the error message with a list of broker addresses that returned unexpected responses

type ErrNoChannel

type ErrNoChannel struct {
	ChannelNames []string
}

ErrNoChannel is an Error type generated when a kafka producer or consumer is created with a missing channel

func (*ErrNoChannel) Error

func (e *ErrNoChannel) Error() string

Error returns the error message with a list of missing channels

type IConsumerGroup added in v1.1.3

type IConsumerGroup interface {
	Channels() *ConsumerGroupChannels
	IsInitialised() bool
	Initialise(ctx context.Context) error
	Release()
	CommitAndRelease(msg Message)
	StopListeningToConsumer(ctx context.Context) (err error)
	Checker(ctx context.Context, state *health.CheckState) error
	Close(ctx context.Context) (err error)
}

IConsumerGroup is an interface representing a Kafka Consumer Group.
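
Depending on this interface rather than on the concrete ConsumerGroup makes it straightforward to substitute a mock in tests; a hypothetical sketch:

	// hypothetical service struct accepting any IConsumerGroup implementation
	type Service struct {
		Consumer kafka.IConsumerGroup
	}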

type IProducer added in v1.1.3

type IProducer interface {
	Channels() *ProducerChannels
	IsInitialised() bool
	Initialise(ctx context.Context) error
	Checker(ctx context.Context, state *health.CheckState) error
	Close(ctx context.Context) (err error)
}

IProducer is an interface representing a Kafka Producer

type Message

type Message interface {

	// GetData returns the message contents.
	GetData() []byte

	// Commit the message's offset.
	Commit()

	// Offset returns the message offset
	Offset() int64
}

Message represents a single kafka message.

type MessageConsumer

type MessageConsumer interface {
	Incoming() chan Message
	Closer() chan bool
	Errors() chan error
}

MessageConsumer provides a generic interface for consuming []byte messages

type MessageProducer

type MessageProducer interface {
	Output() chan []byte
	Closer() chan bool
	Errors() chan error
}

MessageProducer provides a generic interface for producing []byte messages

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a producer of Kafka messages

func NewProducer

func NewProducer(
	ctx context.Context, brokerAddrs []string, topic string, envMax int, channels *ProducerChannels) (*Producer, error)

NewProducer returns a new producer instance using the provided config and channels. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.

func NewProducerWithSaramaClient

func NewProducerWithSaramaClient(
	ctx context.Context, brokerAddrs []string, topic string, envMax int,
	channels *ProducerChannels, cli Sarama) (producer *Producer, err error)

NewProducerWithSaramaClient returns a new producer with a provided Sarama client

func (*Producer) Channels added in v1.1.0

func (p *Producer) Channels() *ProducerChannels

Channels returns the Producer channels for this producer

func (*Producer) Checker

func (p *Producer) Checker(ctx context.Context, state *health.CheckState) error

Checker checks health of Kafka producer and updates the provided CheckState accordingly

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) (err error)

Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context provides no timeout, which is not recommended.

func (*Producer) Initialise added in v1.1.0

func (p *Producer) Initialise(ctx context.Context) error

Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.

func (*Producer) IsInitialised

func (p *Producer) IsInitialised() bool

IsInitialised returns true only if Sarama producer has been correctly initialised.

type ProducerChannels

type ProducerChannels struct {
	Output chan []byte
	Errors chan error
	Init   chan struct{}
	Closer chan struct{}
	Closed chan struct{}
}

ProducerChannels represents the channels used by Producer.

func CreateProducerChannels

func CreateProducerChannels() *ProducerChannels

CreateProducerChannels initialises a ProducerChannels with new channels.

func (*ProducerChannels) LogErrors added in v1.1.0

func (producerChannels *ProducerChannels) LogErrors(ctx context.Context, errMsg string)

LogErrors creates a go-routine that waits on chErrors channel and logs any error received. It exits on chCloser channel event. Provided context and errMsg will be used in the log Event.

func (*ProducerChannels) Validate

func (producerChannels *ProducerChannels) Validate() error

Validate returns ErrNoChannel if any producer channel is nil

type Sarama

type Sarama interface {
	NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
}

Sarama is an interface representing the Sarama library.

type SaramaClient

type SaramaClient struct{}

SaramaClient implements Sarama interface and wraps the real calls to Sarama library.

func (*SaramaClient) NewAsyncProducer

func (s *SaramaClient) NewAsyncProducer(addrs []string, conf *sarama.Config) (AsyncProducer, error)

NewAsyncProducer creates a new sarama.AsyncProducer using the given broker addresses and configuration.

type SaramaCluster

type SaramaCluster interface {
	NewConsumer(addrs []string, groupID string, topics []string, config *cluster.Config) (SaramaClusterConsumer, error)
}

SaramaCluster is an interface representing the bsm sarama-cluster library.

type SaramaClusterClient

type SaramaClusterClient struct{}

SaramaClusterClient implements SaramaCluster interface and wraps the real calls to bsm sarama-cluster library.

func (*SaramaClusterClient) NewConsumer

func (c *SaramaClusterClient) NewConsumer(addrs []string, groupID string, topics []string, config *cluster.Config) (SaramaClusterConsumer, error)

NewConsumer creates a new sarama cluster consumer.

type SaramaClusterConsumer

type SaramaClusterConsumer interface {
	Close() (err error)
	Messages() <-chan *sarama.ConsumerMessage
	CommitOffsets() error
	Errors() <-chan error
	Notifications() <-chan *cluster.Notification
	MarkOffset(msg *sarama.ConsumerMessage, metadata string)
}

SaramaClusterConsumer is an interface representing the bsm sarama-cluster Consumer struct

type SaramaMessage

type SaramaMessage struct {
	// contains filtered or unexported fields
}

SaramaMessage represents a Sarama specific Kafka message

func (SaramaMessage) Commit

func (M SaramaMessage) Commit()

Commit the message's offset.

func (SaramaMessage) GetData

func (M SaramaMessage) GetData() []byte

GetData returns the message contents.

func (SaramaMessage) Offset

func (M SaramaMessage) Offset() int64

Offset returns the message offset

Directories

Path Synopsis
cmd
