Documentation ¶
Index ¶
- Variables
- type Client
- type Consumer
- type ConsumerConfig
- type ConsumerMetrics
- type Decoder
- type FetchData
- type KafkaConsumer
- func (c *KafkaConsumer) Add(topic string, partition int32) error
- func (c *KafkaConsumer) AllMetrics() (*Metrics, error)
- func (c *KafkaConsumer) Assignment() map[string][]int32
- func (c *KafkaConsumer) AwaitTermination()
- func (c *KafkaConsumer) Commit(topic string, partition int32, offset int64) error
- func (c *KafkaConsumer) ConsumerMetrics() (ConsumerMetrics, error)
- func (c *KafkaConsumer) Join()
- func (c *KafkaConsumer) Lag(topic string, partition int32) (int64, error)
- func (c *KafkaConsumer) Offset(topic string, partition int32) (int64, error)
- func (c *KafkaConsumer) PartitionConsumerMetrics(topic string, partition int32) (PartitionConsumerMetrics, error)
- func (c *KafkaConsumer) Remove(topic string, partition int32) error
- func (c *KafkaConsumer) SetOffset(topic string, partition int32, offset int64) error
- func (c *KafkaConsumer) Stop()
- type KafkaConsumerMetrics
- type KafkaPartitionConsumer
- func (pc *KafkaPartitionConsumer) Commit(offset int64) error
- func (pc *KafkaPartitionConsumer) Lag() int64
- func (pc *KafkaPartitionConsumer) Metrics() (PartitionConsumerMetrics, error)
- func (pc *KafkaPartitionConsumer) Offset() int64
- func (pc *KafkaPartitionConsumer) SetOffset(offset int64)
- func (pc *KafkaPartitionConsumer) Start()
- func (pc *KafkaPartitionConsumer) Stop()
- type KafkaPartitionConsumerMetrics
- func (kpcm *KafkaPartitionConsumerMetrics) BatchDuration(f func(metrics.Timer))
- func (kpcm *KafkaPartitionConsumerMetrics) FetchDuration(f func(metrics.Timer))
- func (kpcm *KafkaPartitionConsumerMetrics) Lag(f func(metrics.Gauge))
- func (kpcm *KafkaPartitionConsumerMetrics) NumEmptyFetches(f func(metrics.Counter))
- func (kpcm *KafkaPartitionConsumerMetrics) NumFailedFetches(f func(metrics.Counter))
- func (kpcm *KafkaPartitionConsumerMetrics) NumFailedOffsetCommits(f func(metrics.Counter))
- func (kpcm *KafkaPartitionConsumerMetrics) NumFetchedMessages(f func(metrics.Counter))
- func (kpcm *KafkaPartitionConsumerMetrics) NumFetches(f func(metrics.Counter))
- func (kpcm *KafkaPartitionConsumerMetrics) NumOffsetCommits(f func(metrics.Counter))
- func (kpcm *KafkaPartitionConsumerMetrics) Registry() metrics.Registry
- func (kpcm *KafkaPartitionConsumerMetrics) Stop()
- type MessageAndMetadata
- type Metrics
- type PartitionConsumer
- type PartitionConsumerMetrics
- type Strategy
Constants ¶
This section is empty.
Variables ¶
var ByteDecoder = func(bytes []byte) (interface{}, error) { return bytes, nil }
ByteDecoder is a default decoder implementation that does nothing and just returns the input untouched. Never returns an error.
var ErrMetricsDisabled = errors.New("Metrics are disabled. Use ConsumerConfig.EnableMetrics to enable")
ErrMetricsDisabled is used when trying to get consumer metrics while they are disabled.
var ErrPartitionConsumerAlreadyExists = errors.New("Partition consumer already exists")
ErrPartitionConsumerAlreadyExists is used when trying to add a topic/partition that is already added to the given Consumer.
var ErrPartitionConsumerDoesNotExist = errors.New("Partition consumer does not exist")
ErrPartitionConsumerDoesNotExist is used when trying to perform any action on a topic/partition that is not owned by the given Consumer.
var StringDecoder = func(bytes []byte) (interface{}, error) { return string(bytes), nil }
StringDecoder converts the given bytes into a string. Never returns an error.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
	// Fetch is responsible for fetching messages for given topic, partition and offset from Kafka broker.
	// Leader change handling happens inside Fetch and is hidden from the user, who should not handle such cases.
	// Returns a fetch response and error if it occurred.
	Fetch(topic string, partition int32, offset int64) (*client.FetchResponse, error)

	// GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
	// Returns an offset for given topic, partition and offset time and an error if it occurs.
	GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

	// GetOffset gets the latest committed offset for a given group, topic and partition from Kafka.
	// Returns an offset and an error if it occurs.
	GetOffset(group string, topic string, partition int32) (int64, error)

	// CommitOffset commits the given offset for a given group, topic and partition to Kafka.
	// Returns an error if commit was unsuccessful.
	CommitOffset(group string, topic string, partition int32, offset int64) error
}
Client is an interface responsible for low-level Kafka interaction. The only supported implementation at present is github.com/serejja/kafka-client. One other implementation, MockClient, is used for testing purposes.
type Consumer ¶
type Consumer interface {
	// Add adds a topic/partition to consume for this consumer and starts consuming it immediately.
	// Returns an error if PartitionConsumer for this topic/partition already exists.
	Add(topic string, partition int32) error

	// Remove stops consuming a topic/partition by this consumer once it is done with the current batch.
	// This means the PartitionConsumer will stop accepting new batches but will have a chance to finish its current work.
	// Returns an error if PartitionConsumer for this topic/partition does not exist.
	Remove(topic string, partition int32) error

	// Assignment returns a map of topic/partitions being consumed at the moment by this consumer.
	// The keys are topic names and values are slices of partitions.
	Assignment() map[string][]int32

	// Offset returns the current consuming offset for a given topic/partition.
	// Please note that this value does not correspond to the latest committed offset but the latest fetched offset.
	// This call will return an error if the PartitionConsumer for given topic/partition does not exist.
	Offset(topic string, partition int32) (int64, error)

	// Commit commits the given offset for a given topic/partition to Kafka.
	// Returns an error if the commit was unsuccessful.
	Commit(topic string, partition int32, offset int64) error

	// SetOffset overrides the current fetch offset value for given topic/partition.
	// This does not commit offset but allows you to move back and forth throughout the partition.
	// Returns an error if the PartitionConsumer for this topic/partition does not exist.
	SetOffset(topic string, partition int32, offset int64) error

	// Lag returns the difference between the latest available offset in the partition and the
	// latest fetched offset by this consumer. This allows you to see how far behind the consumer is.
	// Returns lag value for a given topic/partition and an error if the PartitionConsumer for given
	// topic/partition does not exist.
	Lag(topic string, partition int32) (int64, error)

	// Stop stops consuming all topics and partitions with this consumer.
	Stop()

	// AwaitTermination blocks until Stop() is called.
	AwaitTermination()

	// ConsumerMetrics returns a metrics structure for this consumer. An error is returned if metrics are disabled.
	ConsumerMetrics() (ConsumerMetrics, error)

	// PartitionConsumerMetrics returns a metrics structure for a given topic and partition. An error is returned
	// if metrics are disabled or PartitionConsumer for given topic and partition does not exist.
	PartitionConsumerMetrics(topic string, partition int32) (PartitionConsumerMetrics, error)

	// AllMetrics returns metrics registries for this consumer and all its PartitionConsumers. An error is returned
	// if metrics are disabled.
	AllMetrics() (*Metrics, error)

	// Join blocks until consumer has at least one topic/partition to consume, i.e. until len(Assignment()) > 0.
	Join()
}
Consumer is essentially a collection of PartitionConsumers and exposes nearly the same API at a slightly higher level. It resembles the JVM High Level Consumer, except that load balancing is not implemented here, which keeps the Consumer independent of Zookeeper.
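The Add/Remove/Assignment contract described above can be illustrated with a small in-memory model. This is only a sketch: fakeConsumer and newFakeConsumer are hypothetical names, the type never talks to Kafka, and the two error variables are local copies of the ones listed under Variables.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Local stand-ins for the package-level errors listed under Variables.
var (
	ErrPartitionConsumerAlreadyExists = errors.New("Partition consumer already exists")
	ErrPartitionConsumerDoesNotExist  = errors.New("Partition consumer does not exist")
)

// fakeConsumer is a hypothetical in-memory model of the assignment bookkeeping.
type fakeConsumer struct {
	owned map[string]map[int32]bool
}

func newFakeConsumer() *fakeConsumer {
	return &fakeConsumer{owned: make(map[string]map[int32]bool)}
}

// Add registers a topic/partition, or fails if it is already owned.
func (c *fakeConsumer) Add(topic string, partition int32) error {
	if c.owned[topic][partition] {
		return ErrPartitionConsumerAlreadyExists
	}
	if c.owned[topic] == nil {
		c.owned[topic] = make(map[int32]bool)
	}
	c.owned[topic][partition] = true
	return nil
}

// Remove drops a topic/partition, or fails if it is not owned.
func (c *fakeConsumer) Remove(topic string, partition int32) error {
	if !c.owned[topic][partition] {
		return ErrPartitionConsumerDoesNotExist
	}
	delete(c.owned[topic], partition)
	if len(c.owned[topic]) == 0 {
		delete(c.owned, topic)
	}
	return nil
}

// Assignment returns topic names mapped to sorted partition slices.
func (c *fakeConsumer) Assignment() map[string][]int32 {
	out := make(map[string][]int32, len(c.owned))
	for topic, parts := range c.owned {
		for p := range parts {
			out[topic] = append(out[topic], p)
		}
		ps := out[topic]
		sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
	}
	return out
}

func main() {
	c := newFakeConsumer()
	fmt.Println(c.Add("events", 0))    // first Add succeeds
	fmt.Println(c.Add("events", 0))    // second Add reports the duplicate
	fmt.Println(c.Remove("events", 1)) // partition 1 was never added
	fmt.Println(c.Assignment())
}
```

Callers of the real Consumer can compare returned errors against the exported variables in the same way.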
type ConsumerConfig ¶
type ConsumerConfig struct {
	// Group is a string that uniquely identifies a set of consumers within the same consumer group.
	Group string

	// ConsumerID is a string that uniquely identifies a consumer within a consumer group.
	// Defaults to a random UUID.
	ConsumerID string

	// KeyDecoder is a function that turns plain bytes into a decoded message key.
	KeyDecoder Decoder

	// ValueDecoder is a function that turns plain bytes into a decoded message value.
	ValueDecoder Decoder

	// AutoOffsetReset defines what to do when there is no committed offset or committed offset is out of range.
	// kafka-client.EarliestTime - automatically reset the offset to the smallest offset.
	// kafka-client.LatestTime - automatically reset the offset to the largest offset.
	// Defaults to kafka-client.EarliestTime.
	AutoOffsetReset int64

	// AutoCommitEnable determines whether the consumer will automatically commit offsets after each batch
	// is finished (e.g. the call to strategy function returns). Turned off by default.
	AutoCommitEnable bool

	// EnableMetrics determines whether the consumer will collect all kinds of metrics to better understand what's
	// going on under the hood. Turned off by default as it may significantly affect performance.
	EnableMetrics bool

	// Backoff between attempts to initialize consumer offset.
	InitOffsetBackoff time.Duration
}
ConsumerConfig provides configuration options for both Consumer and PartitionConsumer.
func NewConfig ¶
func NewConfig() *ConsumerConfig
NewConfig creates a consumer config with sane defaults.
type ConsumerMetrics ¶
type ConsumerMetrics interface {
	// NumOwnedTopicPartitions is a counter whose value is the number of topic partitions currently owned by
	// the enclosing Consumer.
	NumOwnedTopicPartitions(func(metrics.Counter))

	// Registry provides access to metrics registry for enclosing Consumer.
	Registry() metrics.Registry

	// Stop unregisters all metrics from the registry.
	Stop()
}
ConsumerMetrics is an interface for accessing and modifying Consumer metrics.
type Decoder ¶
Decoder serves to decode given raw message bytes to something meaningful. Returns an error if it fails to decode the given bytes.
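A custom decoder can follow the same shape as ByteDecoder and StringDecoder. The sketch below is illustrative only: the local Decoder declaration is an assumption inferred from those two variables (the actual declaration is not shown here), and JSONDecoder is a hypothetical name that is not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder mirrors the function shape of ByteDecoder and StringDecoder.
// This local declaration is an assumption, not copied from the package.
type Decoder func(bytes []byte) (interface{}, error)

// JSONDecoder is a hypothetical decoder that parses the payload as a JSON object.
// Unlike ByteDecoder and StringDecoder, it can return an error.
var JSONDecoder Decoder = func(bytes []byte) (interface{}, error) {
	var out map[string]interface{}
	if err := json.Unmarshal(bytes, &out); err != nil {
		return nil, fmt.Errorf("decoding JSON payload: %w", err)
	}
	return out, nil
}

func main() {
	decoded, err := JSONDecoder([]byte(`{"user":"alice"}`))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(decoded.(map[string]interface{})["user"]) // prints alice
}
```

Such a function could be assigned to ConsumerConfig.KeyDecoder or ConsumerConfig.ValueDecoder, provided the real Decoder type has this signature.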
type FetchData ¶
type FetchData struct {
	// Messages is a slice of actually fetched messages from Kafka.
	Messages []*MessageAndMetadata

	// HighwaterMarkOffset is the offset at the end of the topic/partition this FetchData comes from.
	HighwaterMarkOffset int64

	// Error is an error that occurred either at the fetch level or at the topic/partition level.
	Error error
}
FetchData is a lightly processed FetchResponse that is easier to work with.
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
KafkaConsumer implements Consumer. It resembles the JVM High Level Consumer, except that load balancing is not implemented here, which keeps it independent of Zookeeper.
func (*KafkaConsumer) Add ¶
func (c *KafkaConsumer) Add(topic string, partition int32) error
Add adds a topic/partition to consume for this consumer and starts consuming it immediately. Returns an error if PartitionConsumer for this topic/partition already exists.
func (*KafkaConsumer) AllMetrics ¶
func (c *KafkaConsumer) AllMetrics() (*Metrics, error)
AllMetrics returns metrics registries for this consumer and all its PartitionConsumers. An error is returned if metrics are disabled.
func (*KafkaConsumer) Assignment ¶
func (c *KafkaConsumer) Assignment() map[string][]int32
Assignment returns a map of topic/partitions being consumed at the moment by this consumer. The keys are topic names and values are slices of partitions.
func (*KafkaConsumer) AwaitTermination ¶
func (c *KafkaConsumer) AwaitTermination()
AwaitTermination blocks until Stop() is called.
func (*KafkaConsumer) Commit ¶
func (c *KafkaConsumer) Commit(topic string, partition int32, offset int64) error
Commit commits the given offset for a given topic/partition to Kafka. Returns an error if the commit was unsuccessful.
func (*KafkaConsumer) ConsumerMetrics ¶
func (c *KafkaConsumer) ConsumerMetrics() (ConsumerMetrics, error)
ConsumerMetrics returns a metrics structure for this consumer. An error is returned if metrics are disabled.
func (*KafkaConsumer) Join ¶
func (c *KafkaConsumer) Join()
Join blocks until consumer has at least one topic/partition to consume, i.e. until len(Assignment()) > 0.
func (*KafkaConsumer) Lag ¶
func (c *KafkaConsumer) Lag(topic string, partition int32) (int64, error)
Lag returns the difference between the latest available offset in the partition and the latest fetched offset by this consumer. This allows you to see how far behind the consumer is. Returns lag value for a given topic/partition and an error if the PartitionConsumer for given topic/partition does not exist.
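As a worked example of that definition, the arithmetic can be sketched as below. The helper name lag and the sample offsets are made up for illustration; how the library obtains the latest available offset is not covered here.

```go
package main

import "fmt"

// lag follows the documented definition: the latest available offset in the
// partition minus the latest offset fetched by the consumer.
func lag(latestAvailable, lastFetched int64) int64 {
	return latestAvailable - lastFetched
}

func main() {
	// With 1500 as the latest available offset and 1420 as the latest fetched
	// offset, the consumer is 80 offsets behind.
	fmt.Println(lag(1500, 1420)) // prints 80
}
```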
func (*KafkaConsumer) Offset ¶
func (c *KafkaConsumer) Offset(topic string, partition int32) (int64, error)
Offset returns the current consuming offset for a given topic/partition. Please note that this value does not correspond to the latest committed offset but the latest fetched offset. This call will return an error if the PartitionConsumer for given topic/partition does not exist.
func (*KafkaConsumer) PartitionConsumerMetrics ¶
func (c *KafkaConsumer) PartitionConsumerMetrics(topic string, partition int32) (PartitionConsumerMetrics, error)
PartitionConsumerMetrics returns a metrics structure for a given topic and partition. An error is returned if metrics are disabled or PartitionConsumer for given topic and partition does not exist.
func (*KafkaConsumer) Remove ¶
func (c *KafkaConsumer) Remove(topic string, partition int32) error
Remove stops consuming a topic/partition by this consumer once it is done with the current batch. This means the PartitionConsumer will stop accepting new batches but will have a chance to finish its current work. Returns an error if PartitionConsumer for this topic/partition does not exist.
func (*KafkaConsumer) SetOffset ¶
func (c *KafkaConsumer) SetOffset(topic string, partition int32, offset int64) error
SetOffset overrides the current fetch offset value for given topic/partition. This does not commit offset but allows you to move back and forth throughout the partition. Returns an error if the PartitionConsumer for this topic/partition does not exist.
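The distinction between the fetch offset and the committed offset can be pictured with a tiny model. This is a sketch under stated assumptions: partitionOffsets is a hypothetical type that keeps the two positions as separate fields, which is one way to read the description above and not necessarily how KafkaConsumer stores them.

```go
package main

import "fmt"

// partitionOffsets is a hypothetical model holding the fetch position and the
// committed position as two independent values.
type partitionOffsets struct {
	fetch     int64 // where fetching continues from (what SetOffset changes)
	committed int64 // what was last committed (what Commit changes)
}

// SetOffset moves only the fetch position.
func (p *partitionOffsets) SetOffset(offset int64) { p.fetch = offset }

// Commit moves only the committed position.
func (p *partitionOffsets) Commit(offset int64) { p.committed = offset }

func main() {
	p := &partitionOffsets{fetch: 120, committed: 100}
	p.SetOffset(50) // rewind to re-read older messages
	fmt.Println(p.fetch, p.committed) // prints 50 100
}
```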
func (*KafkaConsumer) Stop ¶
func (c *KafkaConsumer) Stop()
Stop stops consuming all topics and partitions with this consumer.
type KafkaConsumerMetrics ¶
type KafkaConsumerMetrics struct {
// contains filtered or unexported fields
}
KafkaConsumerMetrics implements ConsumerMetrics and is used when ConsumerConfig.EnableMetrics is set to true.
func NewKafkaConsumerMetrics ¶
func NewKafkaConsumerMetrics(groupID string, consumerID string) *KafkaConsumerMetrics
NewKafkaConsumerMetrics creates new KafkaConsumerMetrics for a given consumer group.
func (*KafkaConsumerMetrics) NumOwnedTopicPartitions ¶
func (cm *KafkaConsumerMetrics) NumOwnedTopicPartitions(f func(metrics.Counter))
NumOwnedTopicPartitions is a counter whose value is the number of topic partitions currently owned by the enclosing Consumer.
func (*KafkaConsumerMetrics) Registry ¶
func (cm *KafkaConsumerMetrics) Registry() metrics.Registry
Registry provides access to metrics registry for enclosing Consumer.
func (*KafkaConsumerMetrics) Stop ¶
func (cm *KafkaConsumerMetrics) Stop()
Stop unregisters all metrics from the registry.
type KafkaPartitionConsumer ¶
type KafkaPartitionConsumer struct {
// contains filtered or unexported fields
}
KafkaPartitionConsumer serves to consume exactly one topic/partition from Kafka. This is very similar to JVM SimpleConsumer except the PartitionConsumer is able to handle leader changes and supports committing offsets to Kafka via kafka-client.
func (*KafkaPartitionConsumer) Commit ¶
func (pc *KafkaPartitionConsumer) Commit(offset int64) error
Commit commits the given offset to Kafka. Returns an error on unsuccessful commit.
func (*KafkaPartitionConsumer) Lag ¶
func (pc *KafkaPartitionConsumer) Lag() int64
Lag returns the difference between the latest available offset in the partition and the latest fetched offset by this consumer. This allows you to see how far behind the consumer is.
func (*KafkaPartitionConsumer) Metrics ¶
func (pc *KafkaPartitionConsumer) Metrics() (PartitionConsumerMetrics, error)
Metrics returns a metrics structure for this partition consumer. An error is returned if metrics are disabled.
func (*KafkaPartitionConsumer) Offset ¶
func (pc *KafkaPartitionConsumer) Offset() int64
Offset returns the last fetched offset for this partition consumer.
func (*KafkaPartitionConsumer) SetOffset ¶
func (pc *KafkaPartitionConsumer) SetOffset(offset int64)
SetOffset overrides the current fetch offset value for given topic/partition. This does not commit offset but allows you to move back and forth throughout the partition.
func (*KafkaPartitionConsumer) Start ¶
func (pc *KafkaPartitionConsumer) Start()
Start starts consuming a single partition from Kafka. This call blocks until Stop() is called.
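Because Start blocks until Stop is called, callers usually want it on its own goroutine. The following is a minimal sketch of that pattern using a hypothetical blockingConsumer stand-in with the same Start/Stop contract; it does not use the real KafkaPartitionConsumer.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingConsumer is a hypothetical stand-in whose Start blocks until Stop.
type blockingConsumer struct {
	stop chan struct{}
	once sync.Once
}

func newBlockingConsumer() *blockingConsumer {
	return &blockingConsumer{stop: make(chan struct{})}
}

// Start blocks until Stop is called.
func (c *blockingConsumer) Start() { <-c.stop }

// Stop unblocks Start; calling it more than once is safe in this model.
func (c *blockingConsumer) Stop() { c.once.Do(func() { close(c.stop) }) }

// runInBackground runs Start on a separate goroutine and returns a channel
// that is closed once Start has returned.
func runInBackground(c *blockingConsumer) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		c.Start()
	}()
	return done
}

func main() {
	c := newBlockingConsumer()
	done := runInBackground(c)
	c.Stop()
	<-done // wait for Start to return
	fmt.Println("consumer stopped")
}
```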
func (*KafkaPartitionConsumer) Stop ¶
func (pc *KafkaPartitionConsumer) Stop()
Stop stops consuming partition from Kafka. This means the PartitionConsumer will stop accepting new batches but will have a chance to finish its current work.
type KafkaPartitionConsumerMetrics ¶
type KafkaPartitionConsumerMetrics struct {
// contains filtered or unexported fields
}
KafkaPartitionConsumerMetrics implements PartitionConsumerMetrics and is used when ConsumerConfig.EnableMetrics is set to true.
func NewKafkaPartitionConsumerMetrics ¶
func NewKafkaPartitionConsumerMetrics(topic string, partition int32) *KafkaPartitionConsumerMetrics
NewKafkaPartitionConsumerMetrics creates new KafkaPartitionConsumerMetrics for a given topic and partition.
func (*KafkaPartitionConsumerMetrics) BatchDuration ¶
func (kpcm *KafkaPartitionConsumerMetrics) BatchDuration(f func(metrics.Timer))
BatchDuration is a timer that measures time to process a single batch of data from Kafka broker by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) FetchDuration ¶
func (kpcm *KafkaPartitionConsumerMetrics) FetchDuration(f func(metrics.Timer))
FetchDuration is a timer that measures time to fetch from Kafka broker by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) Lag ¶
func (kpcm *KafkaPartitionConsumerMetrics) Lag(f func(metrics.Gauge))
Lag is a gauge with a current lag value for enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) NumEmptyFetches ¶
func (kpcm *KafkaPartitionConsumerMetrics) NumEmptyFetches(f func(metrics.Counter))
NumEmptyFetches is a counter with a number of fetches that returned 0 messages done by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) NumFailedFetches ¶
func (kpcm *KafkaPartitionConsumerMetrics) NumFailedFetches(f func(metrics.Counter))
NumFailedFetches is a counter with a number of failed fetches done by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) NumFailedOffsetCommits ¶
func (kpcm *KafkaPartitionConsumerMetrics) NumFailedOffsetCommits(f func(metrics.Counter))
NumFailedOffsetCommits is a counter with a number of failed offset commits done by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) NumFetchedMessages ¶
func (kpcm *KafkaPartitionConsumerMetrics) NumFetchedMessages(f func(metrics.Counter))
NumFetchedMessages is a counter with a total number of fetched messages by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) NumFetches ¶
func (kpcm *KafkaPartitionConsumerMetrics) NumFetches(f func(metrics.Counter))
NumFetches is a counter with a total number of fetches done by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) NumOffsetCommits ¶
func (kpcm *KafkaPartitionConsumerMetrics) NumOffsetCommits(f func(metrics.Counter))
NumOffsetCommits is a counter with a total number of offset commits done by enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) Registry ¶
func (kpcm *KafkaPartitionConsumerMetrics) Registry() metrics.Registry
Registry provides access to metrics registry for enclosing PartitionConsumer.
func (*KafkaPartitionConsumerMetrics) Stop ¶
func (kpcm *KafkaPartitionConsumerMetrics) Stop()
Stop unregisters all metrics from the registry.
type MessageAndMetadata ¶
type MessageAndMetadata struct {
	// Key is a raw message key.
	Key []byte

	// Value is a raw message value.
	Value []byte

	// Topic is a Kafka topic this message comes from.
	Topic string

	// Partition is a Kafka partition this message comes from.
	Partition int32

	// Offset is an offset for this message.
	Offset int64

	// DecodedKey is a message key processed by KeyDecoder.
	DecodedKey interface{}

	// DecodedValue is a message value processed by ValueDecoder.
	DecodedValue interface{}
}
MessageAndMetadata is a single Kafka message and its metadata.
type Metrics ¶
type Metrics struct {
	// Consumer is all metrics for enclosing Consumer instance.
	Consumer ConsumerMetrics

	// PartitionConsumers is a map of topic/partitions to PartitionConsumer metrics.
	PartitionConsumers map[string]map[int32]PartitionConsumerMetrics
}
Metrics is a set of all metrics for one Consumer instance.
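The nested PartitionConsumers map is keyed first by topic and then by partition. A hedged sketch of walking it follows; the local Metrics and PartitionConsumerMetrics types are reduced stand-ins, and topicPartitions is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// PartitionConsumerMetrics is reduced to an empty interface for this sketch.
type PartitionConsumerMetrics interface{}

// Metrics keeps only the PartitionConsumers field of the documented struct.
type Metrics struct {
	PartitionConsumers map[string]map[int32]PartitionConsumerMetrics
}

// topicPartitions flattens the nested map into "topic/partition" labels,
// sorted as strings so the output order is deterministic.
func topicPartitions(m *Metrics) []string {
	var out []string
	for topic, parts := range m.PartitionConsumers {
		for partition := range parts {
			out = append(out, fmt.Sprintf("%s/%d", topic, partition))
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	m := &Metrics{PartitionConsumers: map[string]map[int32]PartitionConsumerMetrics{
		"events": {0: nil, 1: nil},
		"logs":   {3: nil},
	}}
	fmt.Println(topicPartitions(m)) // prints [events/0 events/1 logs/3]
}
```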
type PartitionConsumer ¶
type PartitionConsumer interface {
	// Start starts consuming given topic/partition.
	Start()

	// Stop stops consuming given topic/partition.
	Stop()

	// Offset returns the last fetched offset for this partition consumer.
	Offset() int64

	// Commit commits the given offset to Kafka. Returns an error on unsuccessful commit.
	Commit(offset int64) error

	// SetOffset overrides the current fetch offset value for given topic/partition.
	// This does not commit offset but allows you to move back and forth throughout the partition.
	SetOffset(offset int64)

	// Lag returns the difference between the latest available offset in the partition and the
	// latest fetched offset by this consumer. This allows you to see how far behind the consumer is.
	Lag() int64

	// Metrics returns a metrics structure for this partition consumer. An error is returned if metrics are disabled.
	Metrics() (PartitionConsumerMetrics, error)
}
PartitionConsumer is an interface responsible for consuming exactly one topic/partition from Kafka. Used to switch between PartitionConsumer in live mode and MockPartitionConsumer in tests.
func NewPartitionConsumer ¶
func NewPartitionConsumer(client Client, config *ConsumerConfig, topic string, partition int32, strategy Strategy) PartitionConsumer
NewPartitionConsumer creates a new PartitionConsumer for given client and config that will consume given topic and partition. The message processing logic is passed via strategy.
type PartitionConsumerMetrics ¶
type PartitionConsumerMetrics interface {
	// BatchDuration is a timer that measures time to process a single batch of data from Kafka broker by enclosing PartitionConsumer.
	BatchDuration(func(metrics.Timer))

	// FetchDuration is a timer that measures time to fetch from Kafka broker by enclosing PartitionConsumer.
	FetchDuration(func(metrics.Timer))

	// NumFetches is a counter with a total number of fetches done by enclosing PartitionConsumer.
	NumFetches(func(metrics.Counter))

	// NumFailedFetches is a counter with a number of failed fetches done by enclosing PartitionConsumer.
	NumFailedFetches(func(metrics.Counter))

	// NumEmptyFetches is a counter with a number of fetches that returned 0 messages done by enclosing PartitionConsumer.
	NumEmptyFetches(func(metrics.Counter))

	// NumFetchedMessages is a counter with a total number of fetched messages by enclosing PartitionConsumer.
	NumFetchedMessages(func(metrics.Counter))

	// NumOffsetCommits is a counter with a total number of offset commits done by enclosing PartitionConsumer.
	NumOffsetCommits(func(metrics.Counter))

	// NumFailedOffsetCommits is a counter with a number of failed offset commits done by enclosing PartitionConsumer.
	NumFailedOffsetCommits(func(metrics.Counter))

	// Lag is a gauge with a current lag value for enclosing PartitionConsumer.
	Lag(func(metrics.Gauge))

	// Registry provides access to metrics registry for enclosing PartitionConsumer.
	Registry() metrics.Registry

	// Stop unregisters all metrics from the registry.
	Stop()
}
PartitionConsumerMetrics is an interface for accessing and modifying PartitionConsumer metrics.
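Each accessor in this interface takes a callback and hands it the underlying metric instead of returning the metric directly. The sketch below shows how such an accessor might be used for both updating and reading. Counter is a reduced local stand-in with Inc and Count methods (assumed to resemble the counter type in the metrics package), and fetchMetrics, recordFetch and fetchCount are hypothetical names.

```go
package main

import "fmt"

// Counter is a reduced stand-in for metrics.Counter; only two methods are modeled.
type Counter interface {
	Inc(delta int64)
	Count() int64
}

type simpleCounter struct{ n int64 }

func (c *simpleCounter) Inc(delta int64) { c.n += delta }
func (c *simpleCounter) Count() int64    { return c.n }

// fetchMetrics is a hypothetical holder exposing one callback-style accessor.
type fetchMetrics struct{ numFetches Counter }

func newFetchMetrics() *fetchMetrics {
	return &fetchMetrics{numFetches: &simpleCounter{}}
}

// NumFetches passes the underlying counter to f instead of returning it.
func (m *fetchMetrics) NumFetches(f func(Counter)) { f(m.numFetches) }

// recordFetch increments the counter through the accessor.
func recordFetch(m *fetchMetrics) {
	m.NumFetches(func(c Counter) { c.Inc(1) })
}

// fetchCount reads the current value through the same accessor.
func fetchCount(m *fetchMetrics) int64 {
	var n int64
	m.NumFetches(func(c Counter) { n = c.Count() })
	return n
}

func main() {
	m := newFetchMetrics()
	recordFetch(m)
	recordFetch(m)
	fmt.Println(fetchCount(m)) // prints 2
}
```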
type Strategy ¶
type Strategy func(data *FetchData, consumer *KafkaPartitionConsumer)
Strategy is a function that actually processes Kafka messages. FetchData contains the actual messages, the highwater mark offset and the fetch error. The PartitionConsumer passed to this function lets you commit or rewind the offset if necessary, track offset and lag, and stop the consumer. Please note that you should NOT stop the consumer if using Consumer; use the consumer.Remove(topic, partition) call instead. Processing happens at the per-partition level: the number of strategies running simultaneously equals the number of partitions being consumed. The next batch for a topic/partition won't start until the previous one finishes.
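A strategy that checks the fetch error, processes the batch and commits once at the end might look roughly like this. It is a sketch with reduced stand-in types: committer is a hypothetical replacement for *KafkaPartitionConsumer, printAndCommit is a made-up name, and committing the offset of the last processed message is an assumption, since this page does not say whether the library expects that offset or the next one.

```go
package main

import "fmt"

// Reduced stand-ins for the documented types; only the fields used here are kept.
type MessageAndMetadata struct {
	Offset       int64
	DecodedValue interface{}
}

type FetchData struct {
	Messages []*MessageAndMetadata
	Error    error
}

// committer is a hypothetical stand-in for *KafkaPartitionConsumer that
// records the offsets passed to Commit.
type committer struct{ committed []int64 }

func (c *committer) Commit(offset int64) error {
	c.committed = append(c.committed, offset)
	return nil
}

// printAndCommit handles one batch: it skips failed or empty fetches, prints
// each message and commits once per batch.
func printAndCommit(data *FetchData, consumer *committer) {
	if data.Error != nil {
		fmt.Println("fetch failed:", data.Error)
		return
	}
	if len(data.Messages) == 0 {
		return
	}
	for _, msg := range data.Messages {
		fmt.Printf("offset %d: %v\n", msg.Offset, msg.DecodedValue)
	}
	// Assumption: commit the offset of the last processed message.
	last := data.Messages[len(data.Messages)-1].Offset
	if err := consumer.Commit(last); err != nil {
		fmt.Println("commit failed:", err)
	}
}

func main() {
	c := &committer{}
	batch := &FetchData{Messages: []*MessageAndMetadata{
		{Offset: 7, DecodedValue: "first"},
		{Offset: 8, DecodedValue: "second"},
	}}
	printAndCommit(batch, c)
	fmt.Println(c.committed) // prints [8]
}
```

Committing once per batch keeps the number of commit requests low; with ConsumerConfig.AutoCommitEnable turned on, the explicit Commit call would presumably be unnecessary.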