consumer

v3.3.0
Published: Jan 2, 2020 License: MIT Imports: 9 Imported by: 0

README

sarama-consumer --- a kafka consumer group client

This is as simple a kafka consumer-group client package as I can create. I deliberately expose sarama types like sarama.ConsumerMessage and sarama.Client. I don't think wrapping APIs in what is basically a fancy utility API is wise, and each wrapper wastes CPU and creates more garbage for the gc to collect.

The assignment of partitions to consumers is pluggable. Two partitioners are included: the trivial round-robin, and a stable & consistent partitioner. The stable partitioner keeps the assignment of partitions to consumers as stable as it can across time. With it, restarting a consumer within the kafka heartbeat timeout (typically tens of seconds) does not disturb the partition assignments of the other consumers, and adding and removing consumers reassigns the minimum number of partitions.

The stable partitioner can optionally also be consistent across topics. When enabled, all topics with the same number of partitions and the same consumer group members are assigned the same partition->consumer mapping. This is useful if the messages in the topics use the same partitioning key, and the consumers benefit from having messages with the same key (but in different topics) be processed in the same consumer.

Simplest usage, a perpetual consumer of a single topic with default (round-robin) partitioning:

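import (
  "log"

  "github.com/Shopify/sarama"
  consumer "github.com/mistsys/sarama-consumer/v3"
)

func main() {
  cfg := sarama.NewConfig()
  cfg.Version = consumer.MinVersion // needed until sarama defaults to >= 0.9
  sclient, err := sarama.NewClient(..., cfg) // broker address list elided
  if err != nil {
    log.Fatal(err)
  }
  // NewClient returns (Client, error), so it cannot be chained with Consume
  client, err := consumer.NewClient("my group", nil, sclient)
  if err != nil {
    log.Fatal(err)
  }
  topic, err := client.Consume("my topic")
  if err != nil {
    log.Fatal(err)
  }
  for {
    select {
    case msg := <-topic.Messages():
      // process the *sarama.ConsumerMessage 'msg'
      topic.Done(msg)
    case err := <-client.Errors():
      log.Println(err) // log, whatever
    }
  }
}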

Documentation

Overview

Package consumer provides kafka 0.9 consumer groups on top of the low level Sarama kafka package.

Consumer groups distribute topics' partitions dynamically across group members, and restart at the last committed offset of each partition.

This requires Kafka v0.9+ and follows the steps described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

CONFIGURATION

Three customization APIs may be set in the Config:

Config.OffsetOutOfRange func(topic, partition, sarama.Client) (restart_offset, error) allows users to decide how to react to falling off the tail of the kafka log. The default is to restart at the newest offset. However, depending on the use case, restarting at an offset some time T in the past, or even at the oldest offset, may make more sense.

Config.StartingOffset func(topic, partition, committed_offset, sarama.Client) (starting_offset, error) allows users to decide where to restart when consuming a partition. The default is to restart at the committed offset, or at sarama.Config.Consumer.Offsets.Initial if the committed offset is -1 (indicating no committed offset could be found).
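The default behaviour described above can be sketched in a few lines. This is an illustration only, not the package's code: the startingOffset type here drops the sarama.Client argument, and the initial offset is captured in a closure instead of being read from sarama.Config.Consumer.Offsets.Initial. Both simplifications, and the name committedOrInitial, are assumptions made to keep the example self-contained.

```go
package main

import "fmt"

// noCommittedOffset is the value passed in when no committed offset was found.
const noCommittedOffset int64 = -1

// startingOffset is a simplified form of the StartingOffset type
// (the sarama.Client argument is omitted in this sketch).
type startingOffset func(topic string, partition int32, committed int64) (int64, error)

// committedOrInitial resumes at the committed offset when there is one and
// falls back to the supplied initial offset otherwise.
func committedOrInitial(initial int64) startingOffset {
	return func(topic string, partition int32, committed int64) (int64, error) {
		if committed == noCommittedOffset {
			return initial, nil
		}
		return committed, nil
	}
}

func main() {
	start := committedOrInitial(-2) // -2 is the value of sarama.OffsetOldest
	fresh, _ := start("topic1", 0, noCommittedOffset)
	resumed, _ := start("topic1", 0, 4711)
	fmt.Println(fresh, resumed)
}
```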

Config.Partitioner interface allows users to control how the consumer group distributes partitions across the group members. The default is to distribute the partitions of each topic in a round-robin fashion across the available members. This is good for basic load balancing, but round-robin is a poor choice if it is desirable that partitions stay with the same consumer during repartitioning.
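To make the stability concern concrete, here is a minimal sketch of a round-robin assignment for one topic. It is not the package's implementation; roundRobin is a hypothetical helper, and sorting the members first is an assumption made so that the result is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// roundRobin deals the partitions of one topic out to the members in turn.
// Members are sorted first so the same inputs always give the same answer.
func roundRobin(members []string, partitions []int32) map[string][]int32 {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)
	out := make(map[string][]int32, len(sorted))
	if len(sorted) == 0 {
		return out
	}
	for i, p := range partitions {
		m := sorted[i%len(sorted)]
		out[m] = append(out[m], p)
	}
	return out
}

func main() {
	parts := []int32{0, 1, 2, 3, 4, 5}
	fmt.Println(roundRobin([]string{"a", "b", "c"}, parts))
	// After "a" leaves, "b" and "c" each keep only one of their old partitions.
	fmt.Println(roundRobin([]string{"b", "c"}, parts))
}
```

In this sketch member "b" goes from partitions 1 and 4 to partitions 0, 2 and 4 when "a" leaves, so partitions move between members that did not change at all.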

A stable partitioner is provided by the stable package. It keeps the partition->consumer mapping stable as best it can. When one consumer restarts quickly enough (within the kafka consumer heartbeat timeout) the partition mapping of the rest of the consumers is not altered. When consumers are added to the group only a minimum number of partitions are reassigned from existing consumers to the new consumers.

Using the stable partitioner means setting

Config.Partitioner = stable.New(false)

Passing true to stable.New() returns a stable & consistent partitioner. See the documentation of the stable package.

More complex partitioners, for example one which did some sort of weighted balancing, are yours to implement.

PHILOSOPHY

The consumer API has three rules the calling code must abide by: each message must be passed to Consumer.Done() once it no longer needs to be replayed, Client.Errors() must be consumed, and Client.Close() or Consumer.AsyncClose() must be called to clean up resources if your code wishes to stop consuming messages.

Kafka's rule that [if consumers keep up] all messages will be seen at least once, and possibly many times always applies.

The API of this package deliberately does not wrap or otherwise hide the underlying sarama API. I believe doing so is a waste of CPU time, generates more work for the gc, and makes building on top of a package harder than it should be. It also makes no assumptions about how the caller's work should be done. There are no requirements to process messages in order, nor does it impose a goroutine organization on the caller. I've applied RFC1925 #5 and #12 as best I can.

I've used other kafka APIs which did wrap and impose structure and found them difficult to really use, and as a reaction I try not to impose such APIs on others (nor on myself) even if it means the calling code is a little more complex.

(For example you have to create a suitably configured sarama.Client yourself before calling NewClient. That's 3 more lines of code, but it also lets you tune the sarama.Client's config just as you need it to be, or even mock the client for test.)

The simple use case of this package is shown in the NewClient example code.

Constants

This section is empty.

Variables

var Logf func(fmt string, args ...interface{}) = log.Printf

Low-level logging function. Replace it with your own, if desired, before making any calls to the rest of the API.

var MinVersion = sarama.V0_9_0_0

Minimum kafka API version required. Use this when setting sarama.Config.Version in the sarama.Config used to construct the sarama.Client.

Functions

func DefaultOffsetOutOfRange

func DefaultOffsetOutOfRange(topic string, partition int32, client sarama.Client) (int64, error)

default implementation of Config.OffsetOutOfRange jumps to the current head of the partition.

func DefaultStartingOffset

func DefaultStartingOffset(topic string, partition int32, offset int64, client sarama.Client) (int64, error)

default implementation of Config.StartingOffset starts at the committed offset, or at sarama.Config.Consumer.Offsets.Initial if there is no committed offset.

Types

type AssignmentNotification

type AssignmentNotification func(assignments map[string][]int32) // assignments is a map from topic -> list of partitions
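A callback of this type might simply log each new assignment. The sketch below redeclares the type locally so it runs without the package; summarize is a hypothetical helper that sorts the topics so the log line is stable from one call to the next.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// AssignmentNotification is redeclared here so the sketch is self-contained.
type AssignmentNotification func(assignments map[string][]int32)

// summarize renders an assignment as "topic[partitions] ..." with topics sorted.
func summarize(assignments map[string][]int32) string {
	topics := make([]string, 0, len(assignments))
	for t := range assignments {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	parts := make([]string, 0, len(topics))
	for _, t := range topics {
		parts = append(parts, fmt.Sprintf("%s%v", t, assignments[t]))
	}
	return strings.Join(parts, " ")
}

func main() {
	var notify AssignmentNotification = func(a map[string][]int32) {
		fmt.Println("new assignment:", summarize(a))
	}
	notify(map[string][]int32{"topic2": {1}, "topic1": {0, 3}})
}
```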

type Client

type Client interface {
	// Consume returns a consumer of the given topic
	Consume(topic string) (Consumer, error)

	// ConsumeMany starts consuming many topics at once. It is much more efficient than calling Consume
	// repeatedly because kafka brokers serialize joining topics
	ConsumeMany(topics []string) ([]Consumer, error)

	// Close closes the client. It must be called to shutdown
	// the client. It cleans up any unclosed topic Consumers created by this Client.
	// It does NOT close the inner sarama.Client.
	// Calling twice is NOT supported.
	Close()

	// Errors returns a channel which can (should) be monitored
	// for errors. callers should probably log or otherwise report
	// the returned errors. The channel closes when the client
	// is closed.
	Errors() <-chan error
}

Client is a kafka client belonging to a consumer group. It is created by NewClient.

func NewClient

func NewClient(group_name string, config *Config, sarama_client sarama.Client) (Client, error)

NewClient creates a new consumer group client on top of an existing sarama.Client.

After this call the contents of config should be treated as read-only. config can be nil if the defaults are acceptable.

The consumer group name is used to match this client with other instances running elsewhere, but connected to the same cluster of kafka brokers and using the same consumer group name.

The supplied sarama.Client should have been constructed with a sarama.Config where sarama.Config.Version is >= consumer.MinVersion, and if full handling of ErrOffsetOutOfRange is desired, sarama.Config.Consumer.Return.Errors = true.

In addition, this package uses the settings in sarama.Config.Consumer.Offsets and sarama.Config.Metadata.RefreshFrequency

Example
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	consumer "github.com/mistsys/sarama-consumer/v3"
	"github.com/mistsys/sarama-consumer/v3/offsets"
	"github.com/mistsys/sarama-consumer/v3/stable"
)

func main() {
	// create a suitable sarama.Client
	sconfig := sarama.NewConfig()
	sconfig.Version = consumer.MinVersion // consumer requires at least 0.9
	sconfig.Consumer.Return.Errors = true // needed if asynchronous ErrOffsetOutOfRange handling is desired (it's a good idea)
	sclient, _ := sarama.NewClient([]string{"kafka-broker:9092"}, sconfig)

	// from that, create a consumer.Config with some fancy options
	config := consumer.NewConfig()
	config.Partitioner = stable.New(false)                                                 // use a stable (but inconsistent) partitioner
	config.StartingOffset, config.OffsetOutOfRange = offsets.NoOlderThan(time.Second * 30) // always start and restart no more than 30 seconds in the past (NOTE: requires kafka 0.10 brokers to work properly)

	// and finally a consumer Client
	client, _ := consumer.NewClient("group_name", config, sclient)
	defer client.Close() // not strictly necessary, since we don't exit, but this is example code and someone might C&V it and exit

	// consume and print errors
	go func() {
		for err := range client.Errors() {
			fmt.Println(err)
		}
	}()

	// consume a topic
	topic_consumer, _ := client.Consume("topic1")
	defer topic_consumer.AsyncClose() // same comment as for client.Close() above

	// process messages
	for msg := range topic_consumer.Messages() {
		fmt.Println("processing message", msg)
		topic_consumer.Done(msg) // required
	}
}

type Config

type Config struct {
	Session struct {
		// The allowed session timeout for registered consumers (defaults to 30s).
		// Must be within the allowed server range.
		Timeout time.Duration
	}
	Rebalance struct {
		// The allowed rebalance timeout for registered consumers (defaults to 30s).
		// Must be within the allowed server range. Only functions if sarama.Config.Version >= 0.10.1
		// Otherwise Session.Timeout is used for rebalancing too.
		Timeout time.Duration
	}
	Heartbeat struct {
		// Interval between each heartbeat (defaults to 3s). It should be no more
		// than 1/3rd of the Session.Timeout setting
		Interval time.Duration
	}

	// the partitioner used to map partitions to consumer group members (defaults to a round-robin partitioner)
	Partitioner Partitioner

	// OffsetOutOfRange is the handler for sarama.ErrOffsetOutOfRange errors (defaults to sarama.OffsetNewest,nil).
	// Implementations must return the new starting offset in the partition, or an error. The sarama.Client is included
	// for convenience, since handling this might involve querying the partition's current offsets.
	OffsetOutOfRange OffsetOutOfRange

	// StartingOffset is a hook to allow modifying the starting offset when a Consumer begins to consume
	// a partition. (defaults to returning the last committed offset). Some consumers might want to jump
	// ahead to fresh messages. The sarama.Client is included for convenience, since handling this might involve
	// looking up a partition's offset by time. When no committed offset could be found -1 (sarama.OffsetNewest)
	// is passed in. An implementation might want to return client.Config().Consumer.Offsets.Initial in that case.
	StartingOffset StartingOffset

	// SidechannelTopic is a kafka topic used to exchange partition offsets between dying and rebalancing consumers.
	// It defaults to "sarama-consumer-sidechannel-offsets". If SidechannelTopic is "" then this feature is disabled,
	// and consumers can rewind as much as Config.Rebalance.Timeout + sarama.Config.Consumer.Offsets.CommitInterval
	// when a partition is reassigned. That's always possible (kafka only promises at-least-once), but in high frequency
	// topics rewinding the default 30 seconds creates a measurable burst.
	// We can't commit offsets normally during a rebalance because at that point in time we still belong to the old generation,
	// but the broker belongs to the new generation. Hence this side channel.
	SidechannelTopic string

	// AssignmentNotification is an optional callback to inform the client code whenever the client gets a new
	// partition assignment.
	AssignmentNotification AssignmentNotification
}

Config is the configuration of a Client. Typically you'd create a default configuration with NewConfig, modify any fields of interest, and pass it to NewClient. Once passed to NewClient the Config must not be modified. (Doing so leads to data races, and may cause bugs as well.)

In addition to this config, consumer's code also looks at the sarama.Config of the sarama.Client supplied to NewClient, especially at the Consumer.Offsets settings, Version, Metadata.Retry.Backoff, Metadata.RefreshFrequency and ChannelBufferSize.

func NewConfig

func NewConfig() *Config

NewConfig constructs a default configuration.

type Consumer

type Consumer interface {
	// Messages returns the channel of messages arriving from kafka. It always
	// returns the same result, so it is safe to call once and store the result.
	// Every message read from the channel should be passed to Done when processing
	// of the message is complete.
	// It is not necessary to call Done in the same order as messages are received
	// from this channel.
	Messages() <-chan *sarama.ConsumerMessage

	// Done indicates the processing of the message is complete, and its offset can
	// be committed to kafka. Calling Done twice with the same message, or with a
	// garbage message, can cause trouble.
	Done(*sarama.ConsumerMessage)

	// AsyncClose terminates the consumer cleanly. Callers can continue to read from
	// Messages channel until it is closed, or not, as they wish.
	// Calling Client.Close() performs an AsyncClose() on any remaining consumers.
	// Calling AsyncClose multiple times is permitted. Only the first call has any effect.
	// Never calling AsyncClose is also permitted. Client.Close() implies Consumer.AsyncClose.
	AsyncClose()

	// Close terminates the consumer and waits for it to be finished committing the current
	// offsets to kafka. Calling twice happens to work at the moment, but let's not encourage it.
	Close()
}

Consumer is a consumer of a topic.

Messages from any partition assigned to this client arrive on the channel returned by Messages.

Every message read from the Messages channel must be eventually passed to Done. Calling Done is the signal that that message has been consumed and the offset of that message can be committed back to kafka.

Of course this requires that the message's Partition and Offset fields not be altered. Then again for what possible reason would you do such a thing?
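Because Done need not be called in arrival order, messages can be processed by several workers at once. The sketch below shows one possible arrangement and uses stand-ins so it runs without the package: message is a hypothetical substitute for sarama.ConsumerMessage and doner for the Done method of Consumer. The documentation above does not say whether Done may be called from several goroutines concurrently, so this sketch funnels every Done call through a single goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for sarama.ConsumerMessage.
type message struct {
	Partition int32
	Offset    int64
}

// doner is a hypothetical stand-in for the Done method of Consumer.
type doner interface {
	Done(*message)
}

// processConcurrently hands each message to one of n workers. Finished
// messages are passed back and marked done in completion order, which may
// differ from arrival order.
func processConcurrently(msgs <-chan *message, c doner, n int, handle func(*message)) {
	finished := make(chan *message)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				handle(m)
				finished <- m
			}
		}()
	}
	// Close finished once every worker has drained msgs and returned.
	go func() {
		wg.Wait()
		close(finished)
	}()
	for m := range finished {
		c.Done(m) // every message is passed to Done exactly once
	}
}

// recorder is a test double that counts Done calls per offset.
type recorder struct {
	seen map[int64]int
}

func (r *recorder) Done(m *message) { r.seen[m.Offset]++ }

func main() {
	msgs := make(chan *message, 10)
	for off := int64(0); off < 10; off++ {
		msgs <- &message{Partition: 0, Offset: off}
	}
	close(msgs)

	rec := &recorder{seen: map[int64]int{}}
	processConcurrently(msgs, rec, 4, func(*message) {})
	fmt.Println(len(rec.seen), "messages marked done")
}
```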

type Error

type Error struct {
	Err       error    // underlying error
	Context   string   // description of the context surrounding the error
	Consumer  Consumer // nil, or Consumer which produced the error
	Topic     string   // "", or the topic which had the error
	Partition int32    // -1, or the partition which had the error
	// contains filtered or unexported fields
}

Error holds the errors generated by this package

func (*Error) Error

func (err *Error) Error() string

type OffsetOutOfRange

type OffsetOutOfRange func(topic string, partition int32, client sarama.Client) (offset int64, err error)

type Partitioner

type Partitioner interface {
	// name this partitioner (used for log messages)
	Name() string

	// PrepareJoin prepares a JoinGroupRequest given the topics supplied.
	// The simplest implementation would be something like
	//   join_req.AddGroupProtocolMetadata("<partitioner name>", &sarama.ConsumerGroupMemberMetadata{ Version: 1, Topics:  topics, })
	PrepareJoin(join_req *sarama.JoinGroupRequest, topics []string, current_assignments map[string][]int32)

	// Partition performs the partitioning. Given the requested
	// memberships from the JoinGroupResponse, it adds the results
	// to the SyncGroupRequest. Returning an error cancels everything.
	// The sarama.Client supplied to NewClient is included for convenience,
	// since performing the partitioning probably requires looking at each
	// topic's metadata, especially its list of partitions.
	Partition(*sarama.SyncGroupRequest, *sarama.JoinGroupResponse, sarama.Client) error

	// ParseSync parses the SyncGroupResponse and returns the map of topics
	// to partitions assigned to this client, or an error if the information
	// is not parsable.
	ParseSync(*sarama.SyncGroupResponse) (map[string][]int32, error)
}

Partitioner maps partitions to consumer group members.

When the user wants control over the partitioning they should set Config.Partitioner to their implementation of Partitioner.

type SidechannelMsg

type SidechannelMsg struct {
	Ver           int                            // should be 1
	ConsumerGroup string                         // name of the consumer group sending the offsets (also used as the kafka message key)
	Offsets       map[string][]SidechannelOffset // map from topic to list of <partition,offset> pairs
}

SidechannelMsg is what is published to and read from the Config.SidechannelTopic

type SidechannelOffset

type SidechannelOffset struct {
	Partition int32 `json:"p"` // use short field names in JSON to keep the size of the messages low
	Offset    int64 `json:"o"` // since there can be a lot of SidechannelOffsets in a SidechannelMsg

}

SidechannelOffset contains the offset of a single partition

type StartingOffset

type StartingOffset func(topic string, partition int32, committed_offset int64, client sarama.Client) (offset int64, err error)

types of the functions in the Config
