gonsumer

package module
v0.0.0-...-ae462fb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2016 License: Apache-2.0 Imports: 9 Imported by: 1

README

Gonsumer

GoDoc Build Status Go Report Card Coverage Status

  1. Installation
  2. Examples

Installation

Go 1.4 or higher is required (Get Golang)

# go get github.com/serejja/gonsumer

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ByteDecoder = func(bytes []byte) (interface{}, error) {
	return bytes, nil
}

ByteDecoder is a default decoder implementation that does nothing and just returns the input untouched. Never returns an error.

View Source
var ErrMetricsDisabled = errors.New("Metrics are disabled. Use ConsumerConfig.EnableMetrics to enable")

ErrMetricsDisabled is used when trying to get consumer metrics while they are disabled.

View Source
var ErrPartitionConsumerAlreadyExists = errors.New("Partition consumer already exists")

ErrPartitionConsumerAlreadyExists is used when trying to add a topic/partition that is already added to the given Consumer.

View Source
var ErrPartitionConsumerDoesNotExist = errors.New("Partition consumer does not exist")

ErrPartitionConsumerDoesNotExist is used when trying to perform any action on a topic/partition that is not owned by the given Consumer.

View Source
var StringDecoder = func(bytes []byte) (interface{}, error) {
	return string(bytes), nil
}

StringDecoder converts the given bytes into a string. Never returns an error.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Fetch is responsible for fetching messages for given topic, partition and offset from Kafka broker.
	// Leader change handling happens inside Fetch and is hidden from user so he should not handle such cases.
	// Returns a fetch response and error if it occurred.
	Fetch(topic string, partition int32, offset int64) (*client.FetchResponse, error)

	// GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
	// Returns an offet for given topic, partition and offset time and an error if it occurs.
	GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

	// GetOffset gets the latest committed offset for a given group, topic and partition from Kafka.
	// Returns an offset and an error if it occurs.
	GetOffset(group string, topic string, partition int32) (int64, error)

	// CommitOffset commits the given offset for a given group, topic and partition to Kafka.
	// Returns an error if commit was unsuccessful.
	CommitOffset(group string, topic string, partition int32, offset int64) error
}

Client is an interface responsible for low-level Kafka interaction. The only supported implmentation now is github.com/serejja/kafka-client. One other implementation, MockClient, is used for testing purposes.

type Consumer

type Consumer interface {
	// Add adds a topic/partition to consume for this consumer and starts consuming it immediately.
	// Returns an error if PartitionConsumer for this topic/partition already exists.
	Add(topic string, partition int32) error

	// Remove stops consuming a topic/partition by this consumer once it is done with the current batch.
	// This means the PartitionConsumer will stop accepting new batches but will have a chance to finish its current work.
	// Returns an error if PartitionConsumer for this topic/partition does not exist.
	Remove(topic string, partition int32) error

	// Assignment returns a map of topic/partitions being consumed at the moment by this consumer.
	// The keys are topic names and values are slices of partitions.
	Assignment() map[string][]int32

	// Offset returns the current consuming offset for a given topic/partition.
	// Please note that this value does not correspond to the latest committed offset but the latest fetched offset.
	// This call will return an error if the PartitionConsumer for given topic/partition does not exist.
	Offset(topic string, partition int32) (int64, error)

	// Commit commits the given offset for a given topic/partition to Kafka.
	// Returns an error if the commit was unsuccessful.
	Commit(topic string, partition int32, offset int64) error

	// SetOffset overrides the current fetch offset value for given topic/partition.
	// This does not commit offset but allows you to move back and forth throughout the partition.
	// Returns an error if the PartitionConsumer for this topic/partition does not exist.
	SetOffset(topic string, partition int32, offset int64) error

	// Lag returns the difference between the latest available offset in the partition and the
	// latest fetched offset by this consumer. This allows you to see how much behind the consumer is.
	// Returns lag value for a given topic/partition and an error if the PartitionConsumer for given
	// topic/partition does not exist.
	Lag(topic string, partition int32) (int64, error)

	// Stop stops consuming all topics and partitions with this consumer.
	Stop()

	// AwaitTermination blocks until Stop() is called.
	AwaitTermination()

	// ConsumerMetrics returns a metrics structure for this consumer. An error is returned if metrics are disabled.
	ConsumerMetrics() (ConsumerMetrics, error)

	// PartitionConsumerMetrics returns a metrics structure for a given topic and partition. An error is returned
	// if metrics are disabled or PartitionConsumer for given topic and partition does not exist
	PartitionConsumerMetrics(topic string, partition int32) (PartitionConsumerMetrics, error)

	// AllMetrics returns metrics registries for this consumer and all its PartitionConsumers. An error is returned
	// if metrics are disabled.
	AllMetrics() (*Metrics, error)

	// Join blocks until consumer has at least one topic/partition to consume, e.g. until len(Assignment()) > 0.
	Join()
}

Consumer is essentially a collection of PartitionConsumers and exposes nearly the same API but on a bit higher level. Consumer is something similar to JVM High Level Consumer except the load balancing functionality is not implemented here thus allowing the Consumer to be independent from Zookeeper.

func New

func New(client Client, config *ConsumerConfig, strategy Strategy) Consumer

New creates a new Consumer using the given client and config. The message processing logic is passed via strategy.

type ConsumerConfig

type ConsumerConfig struct {
	// Group is a string that uniquely identifies a set of consumers within the same consumer group.
	Group string

	// ConsumerID is a string that uniquely identifies a consumer within a consumer group.
	// Defaults to a random UUID.
	ConsumerID string

	// KeyDecoder is a function that turns plain bytes into a decoded message key.
	KeyDecoder Decoder

	// ValueDecoder is a function that turns plain bytes into a decoded message value.
	ValueDecoder Decoder

	// AutoOffsetReset defines what to do when there is no committed offset or committed offset is out of range.
	// kafka-client.EarliestTime - automatically reset the offset to the smallest offset.
	// kafka-client.LatestTime - automatically reset the offset to the largest offset.
	// Defaults to kafka-client.EarliestTime.
	AutoOffsetReset int64

	// AutoCommitEnable determines whether the consumer will automatically commit offsets after each batch
	// is finished (e.g. the call to strategy function returns). Turned off by default.
	AutoCommitEnable bool

	// EnableMetrics determines whether the consumer will collect all kinds of metrics to better understand what's
	// going on under the hood. Turned off by default as it may significantly affect performance.
	EnableMetrics bool

	// Backoff between attempts to initialize consumer offset.
	InitOffsetBackoff time.Duration
}

ConsumerConfig provides configuration options for both Consumer and PartitionConsumer.

func NewConfig

func NewConfig() *ConsumerConfig

NewConfig creates a consumer config with sane defaults.

type ConsumerMetrics

type ConsumerMetrics interface {
	// NumOwnedTopicPartitions is a counter which value is the number of currently owned topic partitions by
	// enclosing Consumer.
	NumOwnedTopicPartitions(func(metrics.Counter))

	// Registry provides access to metrics registry for enclosing Consumer.
	Registry() metrics.Registry

	// Stop unregisters all metrics from the registry.
	Stop()
}

ConsumerMetrics is an interface for accessing and modifying Consumer metrics.

type Decoder

type Decoder func(raw []byte) (interface{}, error)

Decoder serves to decode given raw message bytes to something meaningful. Returns an error if fails to decoder given bytes.

type FetchData

type FetchData struct {
	// Messages is a slice of actually fetched messages from Kafka.
	Messages []*MessageAndMetadata

	// HighwaterMarkOffset is an offset in the end of topic/partition this FetchData comes from.
	HighwaterMarkOffset int64

	// Error is an error that occurs both on fetch level or topic/partition level.
	Error error
}

FetchData is a slightly processed FetchResponse to be more user-friendly to use.

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

KafkaConsumer implements Consumer and is something similar to JVM High Level Consumer except the load balancing functionality is not implemented here thus allowing to be independent from Zookeeper.

func (*KafkaConsumer) Add

func (c *KafkaConsumer) Add(topic string, partition int32) error

Add adds a topic/partition to consume for this consumer and starts consuming it immediately. Returns an error if PartitionConsumer for this topic/partition already exists.

func (*KafkaConsumer) AllMetrics

func (c *KafkaConsumer) AllMetrics() (*Metrics, error)

AllMetrics returns metrics registries for this consumer and all its PartitionConsumers. An error is returned if metrics are disabled.

func (*KafkaConsumer) Assignment

func (c *KafkaConsumer) Assignment() map[string][]int32

Assignment returns a map of topic/partitions being consumer at the moment by this consumer. The keys are topic names and values are slices of partitions.

func (*KafkaConsumer) AwaitTermination

func (c *KafkaConsumer) AwaitTermination()

AwaitTermination blocks until Stop() is called.

func (*KafkaConsumer) Commit

func (c *KafkaConsumer) Commit(topic string, partition int32, offset int64) error

Commit commits the given offset for a given topic/partition to Kafka. Returns an error if the commit was unsuccessful.

func (*KafkaConsumer) ConsumerMetrics

func (c *KafkaConsumer) ConsumerMetrics() (ConsumerMetrics, error)

ConsumerMetrics returns a metrics structure for this consumer. An error is returned if metrics are disabled.

func (*KafkaConsumer) Join

func (c *KafkaConsumer) Join()

Join blocks until consumer has at least one topic/partition to consume, e.g. until len(Assignment()) > 0.

func (*KafkaConsumer) Lag

func (c *KafkaConsumer) Lag(topic string, partition int32) (int64, error)

Lag returns the difference between the latest available offset in the partition and the latest fetched offset by this consumer. This allows you to see how much behind the consumer is. Returns lag value for a given topic/partition and an error if the PartitionConsumer for given topic/partition does not exist.

func (*KafkaConsumer) Offset

func (c *KafkaConsumer) Offset(topic string, partition int32) (int64, error)

Offset returns the current consuming offset for a given topic/partition. Please note that this value does not correspond to the latest committed offset but the latest fetched offset. This call will return an error if the PartitionConsumer for given topic/partition does not exist.

func (*KafkaConsumer) PartitionConsumerMetrics

func (c *KafkaConsumer) PartitionConsumerMetrics(topic string, partition int32) (PartitionConsumerMetrics, error)

PartitionConsumerMetrics returns a metrics structure for a given topic and partition. An error is returned if metrics are disabled or PartitionConsumer for given topic and partition does not exist

func (*KafkaConsumer) Remove

func (c *KafkaConsumer) Remove(topic string, partition int32) error

Remove stops consuming a topic/partition by this consumer once it is done with the current batch. This means the PartitionConsumer will stop accepting new batches but will have a chance to finish its current work. Returns an error if PartitionConsumer for this topic/partition does not exist.

func (*KafkaConsumer) SetOffset

func (c *KafkaConsumer) SetOffset(topic string, partition int32, offset int64) error

SetOffset overrides the current fetch offset value for given topic/partition. This does not commit offset but allows you to move back and forth throughout the partition. Returns an error if the PartitionConsumer for this topic/partition does not exist.

func (*KafkaConsumer) Stop

func (c *KafkaConsumer) Stop()

Stop stops consuming all topics and partitions with this consumer.

type KafkaConsumerMetrics

type KafkaConsumerMetrics struct {
	// contains filtered or unexported fields
}

KafkaConsumerMetrics implements ConsumerMetrics and is used when ConsumerConfig.EnableMetrics is set to true.

func NewKafkaConsumerMetrics

func NewKafkaConsumerMetrics(groupID string, consumerID string) *KafkaConsumerMetrics

NewKafkaConsumerMetrics creates new KafkaConsumerMetrics for a given consumer group.

func (*KafkaConsumerMetrics) NumOwnedTopicPartitions

func (cm *KafkaConsumerMetrics) NumOwnedTopicPartitions(f func(metrics.Counter))

NumOwnedTopicPartitions is a counter which value is the number of currently owned topic partitions by enclosing Consumer.

func (*KafkaConsumerMetrics) Registry

func (cm *KafkaConsumerMetrics) Registry() metrics.Registry

Registry provides access to metrics registry for enclosing Consumer.

func (*KafkaConsumerMetrics) Stop

func (cm *KafkaConsumerMetrics) Stop()

Stop unregisters all metrics from the registry.

type KafkaPartitionConsumer

type KafkaPartitionConsumer struct {
	// contains filtered or unexported fields
}

KafkaPartitionConsumer serves to consume exactly one topic/partition from Kafka. This is very similar to JVM SimpleConsumer except the PartitionConsumer is able to handle leader changes and supports committing offsets to Kafka via kafka-client.

func (*KafkaPartitionConsumer) Commit

func (pc *KafkaPartitionConsumer) Commit(offset int64) error

Commit commits the given offset to Kafka. Returns an error on unsuccessful commit.

func (*KafkaPartitionConsumer) Lag

func (pc *KafkaPartitionConsumer) Lag() int64

Lag returns the difference between the latest available offset in the partition and the latest fetched offset by this consumer. This allows you to see how much behind the consumer is.

func (*KafkaPartitionConsumer) Metrics

Metrics returns a metrics structure for this partition consumer. An error is returned if metrics are disabled.

func (*KafkaPartitionConsumer) Offset

func (pc *KafkaPartitionConsumer) Offset() int64

Offset returns the last fetched offset for this partition consumer.

func (*KafkaPartitionConsumer) SetOffset

func (pc *KafkaPartitionConsumer) SetOffset(offset int64)

SetOffset overrides the current fetch offset value for given topic/partition. This does not commit offset but allows you to move back and forth throughout the partition.

func (*KafkaPartitionConsumer) Start

func (pc *KafkaPartitionConsumer) Start()

Start starts consuming a single partition from Kafka. This call blocks until Stop() is called.

func (*KafkaPartitionConsumer) Stop

func (pc *KafkaPartitionConsumer) Stop()

Stop stops consuming partition from Kafka. This means the PartitionConsumer will stop accepting new batches but will have a chance to finish its current work.

type KafkaPartitionConsumerMetrics

type KafkaPartitionConsumerMetrics struct {
	// contains filtered or unexported fields
}

KafkaPartitionConsumerMetrics implements PartitionConsumerMetrics and is used when ConsumerConfig.EnableMetrics is set to true.

func NewKafkaPartitionConsumerMetrics

func NewKafkaPartitionConsumerMetrics(topic string, partition int32) *KafkaPartitionConsumerMetrics

NewKafkaPartitionConsumerMetrics creates new KafkaPartitionConsumerMetrics for a given topic and partition.

func (*KafkaPartitionConsumerMetrics) BatchDuration

func (kpcm *KafkaPartitionConsumerMetrics) BatchDuration(f func(metrics.Timer))

BatchDuration is a timer that measures time to process a single batch of data from Kafka broker by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) FetchDuration

func (kpcm *KafkaPartitionConsumerMetrics) FetchDuration(f func(metrics.Timer))

FetchDuration is a timer that measures time to fetch from Kafka broker by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) Lag

func (kpcm *KafkaPartitionConsumerMetrics) Lag(f func(metrics.Gauge))

Lag is a gauge with a current lag value for enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) NumEmptyFetches

func (kpcm *KafkaPartitionConsumerMetrics) NumEmptyFetches(f func(metrics.Counter))

NumEmptyFetches is a counter with a number of fetches that returned 0 messages done by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) NumFailedFetches

func (kpcm *KafkaPartitionConsumerMetrics) NumFailedFetches(f func(metrics.Counter))

NumFailedFetches is a counter with a number of failed fetches done by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) NumFailedOffsetCommits

func (kpcm *KafkaPartitionConsumerMetrics) NumFailedOffsetCommits(f func(metrics.Counter))

NumFailedOffsetCommits is a counter with a number of failed offset commits done by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) NumFetchedMessages

func (kpcm *KafkaPartitionConsumerMetrics) NumFetchedMessages(f func(metrics.Counter))

NumFetchedMessages is a counter with a total number of fetched messages by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) NumFetches

func (kpcm *KafkaPartitionConsumerMetrics) NumFetches(f func(metrics.Counter))

NumFetches is a counter with a total number of fetches done by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) NumOffsetCommits

func (kpcm *KafkaPartitionConsumerMetrics) NumOffsetCommits(f func(metrics.Counter))

NumOffsetCommits is a counter with a total number of offset commits done by enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) Registry

func (kpcm *KafkaPartitionConsumerMetrics) Registry() metrics.Registry

Registry provides access to metrics registry for enclosing PartitionConsumer.

func (*KafkaPartitionConsumerMetrics) Stop

func (kpcm *KafkaPartitionConsumerMetrics) Stop()

Stop unregisters all metrics from the registry.

type MessageAndMetadata

type MessageAndMetadata struct {
	// Key is a raw message key.
	Key []byte

	// Value is a raw message value.
	Value []byte

	// Topic is a Kafka topic this message comes from.
	Topic string

	// Partition is a Kafka partition this message comes from.
	Partition int32

	// Offset is an offset for this message.
	Offset int64

	// DecodedKey is a message key processed by KeyDecoder.
	DecodedKey interface{}

	// DecodedValue is a message value processed by ValueDecoder.
	DecodedValue interface{}
}

MessageAndMetadata is a single Kafka message and its metadata.

type Metrics

type Metrics struct {
	// Consumer is all metrics for enclosing Consumer instance.
	Consumer ConsumerMetrics

	// PartitionConsumers is a map of topic/partitions to PartitionConsumer metrics.
	PartitionConsumers map[string]map[int32]PartitionConsumerMetrics
}

Metrics is a set of all metrics for one Consumer instance.

type PartitionConsumer

type PartitionConsumer interface {
	// Start starts consuming given topic/partition.
	Start()

	// Stop stops consuming given topic/partition.
	Stop()

	// Offset returns the last fetched offset for this partition consumer.
	Offset() int64

	// Commit commits the given offset to Kafka. Returns an error on unsuccessful commit.
	Commit(offset int64) error

	// SetOffset overrides the current fetch offset value for given topic/partition.
	// This does not commit offset but allows you to move back and forth throughout the partition.
	SetOffset(offset int64)

	// Lag returns the difference between the latest available offset in the partition and the
	// latest fetched offset by this consumer. This allows you to see how much behind the consumer is.
	Lag() int64

	// Metrics returns a metrics structure for this partition consumer. An error is returned if metrics are disabled.
	Metrics() (PartitionConsumerMetrics, error)
}

PartitionConsumer is an interface responsible for consuming exactly one topic/partition from Kafka. Used to switch between PartitionConsumer in live mode and MockPartitionConsumer in tests.

func NewPartitionConsumer

func NewPartitionConsumer(client Client, config *ConsumerConfig, topic string, partition int32, strategy Strategy) PartitionConsumer

NewPartitionConsumer creates a new PartitionConsumer for given client and config that will consume given topic and partition. The message processing logic is passed via strategy.

type PartitionConsumerMetrics

type PartitionConsumerMetrics interface {
	// BatchDuration is a timer that measures time to process a single batch of data from Kafka broker by enclosing PartitionConsumer.
	BatchDuration(func(metrics.Timer))

	// FetchDuration is a timer that measures time to fetch from Kafka broker by enclosing PartitionConsumer.
	FetchDuration(func(metrics.Timer))

	// NumFetches is a counter with a total number of fetches done by enclosing PartitionConsumer.
	NumFetches(func(metrics.Counter))

	// NumFailedFetches is a counter with a number of failed fetches done by enclosing PartitionConsumer.
	NumFailedFetches(func(metrics.Counter))

	// NumEmptyFetches is a counter with a number of fetches that returned 0 messages done by enclosing PartitionConsumer.
	NumEmptyFetches(func(metrics.Counter))

	// NumFetchedMessages is a counter with a total number of fetched messages by enclosing PartitionConsumer.
	NumFetchedMessages(func(metrics.Counter))

	// NumOffsetCommits is a counter with a total number of offset commits done by enclosing PartitionConsumer.
	NumOffsetCommits(func(metrics.Counter))

	// NumFailedOffsetCommits is a counter with a number of failed offset commits done by enclosing PartitionConsumer.
	NumFailedOffsetCommits(func(metrics.Counter))

	// Lag is a gauge with a current lag value for enclosing PartitionConsumer.
	Lag(func(metrics.Gauge))

	// Registry provides access to metrics registry for enclosing PartitionConsumer.
	Registry() metrics.Registry

	// Stop unregisters all metrics from the registry.
	Stop()
}

PartitionConsumerMetrics is an interface for accessing and modifying PartitionConsumer metrics.

type Strategy

type Strategy func(data *FetchData, consumer *KafkaPartitionConsumer)

Strategy is a function that actually processes Kafka messages. FetchData contains actual messages, highwater mark offset and fetch error. PartitionConsumer which is passed to this function allows to commit/rewind offset if necessary, track offset/lag, stop the consumer. Please note that you should NOT stop the consumer if using Consumer but rather use consumer.Remove(topic, partition) call. The processing happens on per-partition level - the amount of strategies running simultaneously is defined by the number of partitions being consumed. The next batch for topic/partition won't start until the previous one finishes.

Directories

Path Synopsis
communication
tcp
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL