kafka: github.com/optiopay/kafka

package kafka

import "github.com/optiopay/kafka"

Package kafka provides a high-level client API for Apache Kafka.

Use 'Broker' for node connection management, 'Producer' for sending messages, and 'Consumer' for fetching. All these structures implement the Client, Consumer, and Producer interfaces, which are also implemented by the kafkatest package.

Package Files

broker.go connection.go distributing_producer.go doc.go log.go multiplexer.go

Constants

const (
    // StartOffsetNewest configures the consumer to fetch messages produced
    // after creating the consumer.
    StartOffsetNewest = -1

    // StartOffsetOldest configures the consumer to fetch starting from the
    // oldest message available.
    StartOffsetOldest = -2
)

Variables

var ErrClosed = errors.New("closed")

ErrClosed is returned as the result of any request made over a closed connection.

var ErrMxClosed = errors.New("closed")

ErrMxClosed is returned when consuming from a closed multiplexer.

var (
    // ErrNoData is returned by consumers on Fetch when the retry limit is set
    // and exceeded.
    ErrNoData = errors.New("no data")
)

type BatchConsumer

type BatchConsumer interface {
    ConsumeBatch() ([]*proto.Message, error)
}

BatchConsumer is the interface that wraps the ConsumeBatch method.

ConsumeBatch reads a batch of messages from a consumer, returning an error when encountered.

type Broker

type Broker struct {
    // contains filtered or unexported fields
}

Broker is an abstract connection to a kafka cluster, managing connections to all kafka nodes.

func Dial

func Dial(nodeAddresses []string, conf BrokerConf) (*Broker, error)

Dial connects to any node from the given list of kafka addresses and, after a successful metadata fetch, returns a broker.

The returned broker is not initially connected to any kafka node.

func (*Broker) BatchConsumer

func (b *Broker) BatchConsumer(conf ConsumerConf) (BatchConsumer, error)

BatchConsumer creates a new BatchConsumer instance, bound to the broker.

func (*Broker) Close

func (b *Broker) Close()

Close closes the broker and all active connections to kafka nodes.

func (*Broker) Consumer

func (b *Broker) Consumer(conf ConsumerConf) (Consumer, error)

Consumer creates a new consumer instance, bound to the broker.

func (*Broker) CreateTopic

func (b *Broker) CreateTopic(topics []proto.TopicInfo, timeout time.Duration, validateOnly bool) (*proto.CreateTopicsResp, error)

CreateTopic requests topic creation.

func (*Broker) Metadata

func (b *Broker) Metadata() (*proto.MetadataResp, error)

Metadata requests metadata information from any node.

func (*Broker) OffsetCoordinator

func (b *Broker) OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)

OffsetCoordinator returns an offset management coordinator for a single consumer group, bound to the broker.

func (*Broker) OffsetEarliest

func (b *Broker) OffsetEarliest(topic string, partition int32) (offset int64, err error)

OffsetEarliest returns the oldest offset available on the given partition.

func (*Broker) OffsetLatest

func (b *Broker) OffsetLatest(topic string, partition int32) (offset int64, err error)

OffsetLatest returns the offset of the next message to be produced in the given partition.

func (*Broker) PartitionCount

func (b *Broker) PartitionCount(topic string) (int32, error)

PartitionCount returns how many partitions a given topic has. If a topic is not known, 0 and an error are returned.

func (*Broker) Producer

func (b *Broker) Producer(conf ProducerConf) Producer

Producer returns a new producer instance, bound to the broker.

type BrokerConf

type BrokerConf struct {
    // Kafka client ID.
    ClientID string

    // LeaderRetryLimit limits the number of connection attempts to a single
    // node before failing. Use LeaderRetryWait to control the wait time
    // between retries.
    //
    // Defaults to 10.
    LeaderRetryLimit int

    // LeaderRetryWait sets a limit to the waiting time when trying to connect
    // to a single node after failure.
    //
    // Defaults to 500ms.
    //
    // Timeout on a connection is controlled by the DialTimeout setting.
    LeaderRetryWait time.Duration

    // AllowTopicCreation enables a last-ditch "send produce request" which
    // happens if we do not know about a topic. This enables topic creation
    // if your Kafka cluster is configured to allow it.
    //
    // Defaults to false.
    AllowTopicCreation bool

    // Any new connection dial timeout.
    //
    // Default is 10 seconds.
    DialTimeout time.Duration

    // DialRetryLimit limits the number of connection attempts to every node in
    // cluster before failing. Use DialRetryWait to control the wait time
    // between retries.
    //
    // Defaults to 10.
    DialRetryLimit int

    // DialRetryWait sets a limit to the waiting time when trying to establish a
    // broker connection to a single node to fetch cluster metadata.
    //
    // Defaults to 500ms.
    DialRetryWait time.Duration

    // ReadTimeout is the TCP read timeout.
    //
    // Default is 30 seconds.
    ReadTimeout time.Duration

    // RetryErrLimit limits the number of retry attempts when an error is
    // encountered.
    //
    // Default is 10.
    RetryErrLimit int

    // RetryErrWait controls the wait duration between retries after failed
    // fetch request.
    //
    // Default is 500ms.
    RetryErrWait time.Duration

    // DEPRECATED 2015-07-10 - use Logger instead
    //
    // TODO(husio) remove
    //
    // Logger used by the broker.
    Log interface {
        Print(...interface{})
        Printf(string, ...interface{})
    }

    // Logger is general logging interface that can be provided by popular
    // logging frameworks. Used to notify and as replacement for stdlib `log`
    // package.
    Logger Logger

    // TLS CA PEM.
    TLSCa []byte
    // TLS certificate.
    TLSCert []byte
    // TLS key.
    TLSKey []byte
}

BrokerConf represents the configuration of a broker.

func NewBrokerConf

func NewBrokerConf(clientID string) BrokerConf

NewBrokerConf returns the default broker configuration.

type Client

type Client interface {
    Producer(conf ProducerConf) Producer
    Consumer(conf ConsumerConf) (Consumer, error)
    OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)
    OffsetEarliest(topic string, partition int32) (offset int64, err error)
    OffsetLatest(topic string, partition int32) (offset int64, err error)
    Close()
}

Client is the interface implemented by Broker.

type Consumer

type Consumer interface {
    Consume() (*proto.Message, error)
}

Consumer is the interface that wraps the Consume method.

Consume reads a message from a consumer, returning an error when encountered.

Code:

// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
    panic(err)
}
defer broker.Close()

// create new consumer
conf := NewConsumerConf("my-messages", 0)
conf.StartOffset = StartOffsetNewest
consumer, err := broker.Consumer(conf)
if err != nil {
    panic(err)
}

// read all messages
for {
    msg, err := consumer.Consume()
    if err != nil {
        if err == ErrNoData {
            break
        }
        panic(err)
    }

    fmt.Printf("message: %#v\n", msg)
}

type ConsumerConf

type ConsumerConf struct {
    // Topic name that should be consumed
    Topic string

    // Partition ID that should be consumed.
    Partition int32

    // RequestTimeout controls fetch request timeout. This operation is
    // blocking the whole connection, so it should always be set to a small
    // value. By default it's set to 50ms.
    // To control fetch function timeout use RetryLimit and RetryWait.
    RequestTimeout time.Duration

    // RetryLimit limits how many times messages are fetched before the
    // ErrNoData error is returned.
    //
    // Default is -1, which turns this limit off.
    RetryLimit int

    // RetryWait controls the duration of wait between fetch request calls,
    // when no data was returned.
    //
    // Default is 50ms.
    RetryWait time.Duration

    // RetryErrLimit limits the number of retry attempts when an error is
    // encountered.
    //
    // Default is 10.
    RetryErrLimit int

    // RetryErrWait controls the wait duration between retries after failed
    // fetch request.
    //
    // Default is 500ms.
    RetryErrWait time.Duration

    // MinFetchSize is the minimum size of messages to fetch in bytes.
    //
    // Default is 1 to fetch any message available.
    MinFetchSize int32

    // MaxFetchSize is the maximum size of data which can be sent by kafka node
    // to consumer.
    //
    // Default is 4MB.
    MaxFetchSize int32

    // Consumer cursor starting point. Set to StartOffsetNewest to receive only
    // newly created messages or StartOffsetOldest to read everything. Assign
    // any offset value to manually set cursor -- consuming starts with the
    // message whose offset is equal to given value (including first message).
    //
    // Default is StartOffsetOldest.
    StartOffset int64

    // Logger used by consumer. By default, reuse logger assigned to broker.
    Logger Logger
}

ConsumerConf represents the configuration of a consumer.

func NewConsumerConf

func NewConsumerConf(topic string, partition int32) ConsumerConf

NewConsumerConf returns the default consumer configuration.

type DistributingProducer

type DistributingProducer interface {
    Distribute(topic string, messages ...*proto.Message) (offset int64, err error)
}

DistributingProducer is an interface similar to Producer, but it never requires the partition to be specified explicitly.

Distribute writes messages to the given topic, automatically choosing a partition, and returns the post-commit offset and any error encountered. The offset of each message is also updated accordingly.

func NewHashProducer

func NewHashProducer(p Producer, numPartitions int32) DistributingProducer

NewHashProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, computing the partition number from the hash of the message key, using the fnv hash and the [0, numPartitions) range.

func NewRandomProducer

func NewRandomProducer(p Producer, numPartitions int32) DistributingProducer

NewRandomProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, randomly picking the partition number from the range [0, numPartitions).

func NewRoundRobinProducer

func NewRoundRobinProducer(p Producer, numPartitions int32) DistributingProducer

NewRoundRobinProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, choosing the destination partition from a cycle built from the range [0, numPartitions).

type Logger

type Logger interface {
    Debug(msg string, args ...interface{})
    Info(msg string, args ...interface{})
    Warn(msg string, args ...interface{})
    Error(msg string, args ...interface{})
}

Logger is general logging interface that can be provided by popular logging frameworks.

* https://github.com/go-kit/kit/tree/master/log
* https://github.com/husio/log

type Mx

type Mx struct {
    // contains filtered or unexported fields
}

Mx is a multiplexer that combines any number of consumers into a single stream.

It is the responsibility of the user of the multiplexer and of the consumer implementation to handle errors. ErrNoData returned by a consumer is not passed through by the multiplexer; instead, the consumer that returned ErrNoData is removed from the merged set. When all consumers have been removed (the set is empty), Mx is automatically closed and any further Consume call results in an ErrMxClosed error.

It is important to remember that, because fetching from every consumer is done by a separate worker, most of the time each worker holds one already-consumed message in memory while waiting for the opportunity to return it from a Consume call on the multiplexer. Closing the multiplexer may therefore discard some messages that were already read and are waiting inside the workers for delivery.

func Merge

func Merge(consumers ...Consumer) *Mx

Merge merges the consume results of any number of consumers into a single stream and exposes them through the returned multiplexer.

Code:

// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
    panic(err)
}
defer broker.Close()

topics := []string{"fruits", "vegetables"}
fetchers := make([]Consumer, len(topics))

// create consumers for different topics
for i, topic := range topics {
    conf := NewConsumerConf(topic, 0)
    conf.RetryLimit = 20
    conf.StartOffset = StartOffsetNewest
    consumer, err := broker.Consumer(conf)
    if err != nil {
        panic(err)
    }
    fetchers[i] = consumer
}

// merge all created consumers (they don't even have to belong to the same broker!)
mx := Merge(fetchers...)
defer mx.Close()

// consume messages from all sources
for {
    msg, err := mx.Consume()
    if err != nil {
        panic(err)
    }
    fmt.Printf("message: %#v\n", msg)
}

func (*Mx) Close

func (p *Mx) Close()

Close closes the multiplexer, stopping all underlying workers.

Closing the multiplexer stops all workers as soon as possible, but any consume-in-progress action performed by a worker has to finish first. Any consumption result received after the multiplexer is closed is ignored.

Close returns without waiting for all the workers to finish.

Closing an already closed multiplexer has no effect.

func (*Mx) Consume

func (p *Mx) Consume() (*proto.Message, error)

Consume returns a consume result from any of the merged consumers.

func (*Mx) Workers

func (p *Mx) Workers() int

Workers returns the number of active consumer workers that are pushing messages to the multiplexer's consumer queue.

type OffsetCoordinator

type OffsetCoordinator interface {
    Commit(topic string, partition int32, offset int64) error
    Offset(topic string, partition int32) (offset int64, metadata string, err error)
}

OffsetCoordinator is the interface which wraps the Commit and Offset methods.

Code:

// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
    panic(err)
}
defer broker.Close()

// create offset coordinator and customize configuration
conf := NewOffsetCoordinatorConf("my-consumer-group")
conf.RetryErrLimit = 20
coordinator, err := broker.OffsetCoordinator(conf)
if err != nil {
    panic(err)
}

// write consumed message offset for topic/partition
if err := coordinator.Commit("my-topic", 0, 12); err != nil {
    panic(err)
}

// get latest consumed offset for given topic/partition
off, _, err := coordinator.Offset("my-topic", 0)
if err != nil {
    panic(err)
}

if off != 12 {
    panic(fmt.Sprintf("offset is %d, not 12", off))
}

type OffsetCoordinatorConf

type OffsetCoordinatorConf struct {
    ConsumerGroup string

    // RetryErrLimit limits the number of fetch retries upon failure. By default 10.
    RetryErrLimit int

    // RetryErrWait controls wait duration between retries after failed fetch
    // request. By default 500ms.
    RetryErrWait time.Duration

    // Logger used by consumer. By default, reuse logger assigned to broker.
    Logger Logger
}

OffsetCoordinatorConf represents the configuration of an offset coordinator.

func NewOffsetCoordinatorConf

func NewOffsetCoordinatorConf(consumerGroup string) OffsetCoordinatorConf

NewOffsetCoordinatorConf returns default OffsetCoordinator configuration.

type Producer

type Producer interface {
    Produce(topic string, partition int32, messages ...*proto.Message) (offset int64, err error)
}

Producer is the interface that wraps the Produce method.

Produce writes the messages to the given topic and partition. It returns the offset of the first message and any error encountered. The offset of each message is also updated accordingly.

Code:

// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
    panic(err)
}
defer broker.Close()

// create new producer
conf := NewProducerConf()
conf.RequiredAcks = proto.RequiredAcksLocal

// write two messages to kafka using single call to make it atomic
producer := broker.Producer(conf)
messages := []*proto.Message{
    {Value: []byte("first")},
    {Value: []byte("second")},
}
if _, err := producer.Produce("my-messages", 0, messages...); err != nil {
    panic(err)
}

type ProducerConf

type ProducerConf struct {
    // Compression method to use, defaulting to proto.CompressionNone.
    Compression proto.Compression

    // Message ACK configuration. Use proto.RequiredAcksAll to require all
    // servers to write, proto.RequiredAcksLocal to wait only for leader node
    // answer or proto.RequiredAcksNone to not wait for any response.
    // Setting this to any other value greater than zero makes the producer
    // wait for that number of servers to confirm the write before returning.
    RequiredAcks int16

    // Timeout of single produce request. By default, 5 seconds.
    RequestTimeout time.Duration

    // RetryLimit specifies how many times producing a message should be
    // retried in case of failure, before returning the error to the caller. By
    // default set to 10.
    RetryLimit int

    // RetryWait specifies the wait duration before retrying a produce after
    // failure. By default set to 200ms.
    RetryWait time.Duration

    // Logger used by producer. By default, reuse logger assigned to broker.
    Logger Logger
}

ProducerConf represents the configuration of a producer.

func NewProducerConf

func NewProducerConf() ProducerConf

NewProducerConf returns a default producer configuration.

Directories

Path         Synopsis
integration
kafkatest    Package kafkatest provides mock objects for high level kafka interface.
proto        Package proto provides kafka binary protocol implementation.

Package kafka imports 15 packages and is imported by 36 packages. Updated 2019-05-23.