sarama-cluster: github.com/bsm/sarama-cluster

package cluster

import "github.com/bsm/sarama-cluster"

Package cluster provides cluster extensions for Sarama, enabling users to consume topics across multiple, balanced nodes.

It requires Kafka v0.9+ and follows the steps described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Package Files

balancer.go client.go cluster.go config.go consumer.go doc.go offsets.go partitions.go util.go

type Client

type Client struct {
    sarama.Client
    // contains filtered or unexported fields
}

Client is a group client

func NewClient

func NewClient(addrs []string, config *Config) (*Client, error)

NewClient creates a new client instance

func (*Client) ClusterConfig

func (c *Client) ClusterConfig() *Config

ClusterConfig returns the cluster configuration.

type Config

type Config struct {
    sarama.Config

    // Group is the namespace for group management properties
    Group struct {

        // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
        PartitionStrategy Strategy

        // By default, messages and errors from the subscribed topics and partitions are all multiplexed and
        // made available through the consumer's Messages() and Errors() channels.
        //
        // Users who require low-level access can enable ConsumerModePartitions where individual partitions
        // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
        // themselves.
        Mode ConsumerMode

        Offsets struct {
            Retry struct {
                // The number of retries when committing offsets (defaults to 3).
                Max int
            }
            Synchronization struct {
                // The duration allowed for other clients to commit their offsets before this client resumes, e.g. during a rebalance.
                // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration.
                DwellTime time.Duration
            }
        }

        Session struct {
            // The allowed session timeout for registered consumers (defaults to 30s).
            // Must be within the allowed server range.
            Timeout time.Duration
        }

        Heartbeat struct {
            // Interval between each heartbeat (defaults to 3s). It should be no more
            // than 1/3rd of the Group.Session.Timeout setting
            Interval time.Duration
        }

        // Return specifies which group channels will be populated. If they are set to true,
        // you must read from the respective channels to prevent deadlock.
        Return struct {
            // If enabled, rebalance notifications will be returned on the
            // Notifications channel (default disabled).
            Notifications bool
        }

        Topics struct {
            // An additional whitelist of topics to subscribe to.
            Whitelist *regexp.Regexp
            // An additional blacklist of topics to avoid. If set, this takes precedence over
            // the Whitelist setting.
            Blacklist *regexp.Regexp
        }

        Member struct {
            // Custom metadata to include when joining the group. The user data for all joined members
            // can be retrieved by sending a DescribeGroupRequest to the broker that is the
            // coordinator for the group.
            UserData []byte
        }
    }
}

Config extends sarama.Config with a Group-specific namespace.

This example shows how to use the consumer with topic whitelists.

Code:


// init (custom) config, subscribe to all topics matching the whitelist
config := cluster.NewConfig()
config.Group.Topics.Whitelist = regexp.MustCompile(`myservice.*`)

// init consumer
consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", nil, config)
if err != nil {
    panic(err)
}
defer consumer.Close()

// consume messages
msg := <-consumer.Messages()
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)

func NewConfig

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type Consumer

type Consumer struct {
    // contains filtered or unexported fields
}

Consumer is a cluster group consumer

This example shows how to use the consumer to read messages from multiple topics through a multiplexed channel.

Code:


// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true

// init consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"my_topic", "other_topic"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
    panic(err)
}
defer consumer.Close()

// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// consume errors
go func() {
    for err := range consumer.Errors() {
        log.Printf("Error: %s\n", err.Error())
    }
}()

// consume notifications
go func() {
    for ntf := range consumer.Notifications() {
        log.Printf("Rebalanced: %+v\n", ntf)
    }
}()

// consume messages, watch signals
for {
    select {
    case msg, ok := <-consumer.Messages():
        if !ok {
            return // Messages() was closed, the consumer has shut down
        }
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    case <-signals:
        return
    }
}

func NewConsumer

func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error)

NewConsumer initializes a new consumer

func NewConsumerFromClient

func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error)

NewConsumerFromClient initializes a new consumer from an existing client.

Please note that clients cannot be shared between consumers (due to Kafka internals); they can only be re-used, which requires the user to call Close() on the first consumer before using this method again to initialize another one. Attempts to use a client with more than one consumer at a time will return errors.

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close safely closes the consumer and releases all resources

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets() error

CommitOffsets manually commits previously marked offsets. By default there is no need to call this function, as the consumer commits offsets automatically at the interval set by Config.Consumer.Offsets.CommitInterval.

Please be aware that calling this function during an internal rebalance cycle may return broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns a read channel of errors that occur during offset management, if enabled. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.

func (*Consumer) HighWaterMarks

func (c *Consumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks returns the current high water marks for each topic and partition. Consistency between partitions is not guaranteed, since high water marks are updated separately.
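A common use of high water marks is estimating consumer lag. This sketch assumes Sarama's convention that a partition's high water mark is the offset the next produced message will receive, so lag is the high water mark minus (last processed offset + 1). partitionLag and the lastProcessed map are hypothetical; the hwms argument has the same shape that HighWaterMarks() returns. Because high water marks are updated separately, the result is approximate.

```go
package main

import "fmt"

// partitionLag estimates, per topic/partition, how many messages remain after
// the last processed offset. lastProcessed is a hypothetical map maintained by
// the application itself.
func partitionLag(hwms, lastProcessed map[string]map[int32]int64) map[string]map[int32]int64 {
	lag := make(map[string]map[int32]int64, len(hwms))
	for topic, parts := range hwms {
		lag[topic] = make(map[int32]int64, len(parts))
		for partition, hwm := range parts {
			next := int64(0) // nothing processed yet: count from offset 0 (assumption)
			if off, ok := lastProcessed[topic][partition]; ok {
				next = off + 1
			}
			if l := hwm - next; l > 0 {
				lag[topic][partition] = l
			} else {
				lag[topic][partition] = 0
			}
		}
	}
	return lag
}

func main() {
	// In real code hwms would come from consumer.HighWaterMarks().
	hwms := map[string]map[int32]int64{"my_topic": {0: 100, 1: 50}}
	processed := map[string]map[int32]int64{"my_topic": {0: 89, 1: 49}}
	fmt.Println(partitionLag(hwms, processed)) // map[my_topic:map[0:10 1:0]]
}
```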

func (*Consumer) MarkOffset

func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.

Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.
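One way to make processing idempotent is to remember the highest offset already applied per partition and skip anything at or below it when it is redelivered. The stdlib-only sketch below uses hypothetical message and dedupProcessor types in place of *sarama.ConsumerMessage and real side effects. It keeps applied offsets in memory only; surviving a crash would require storing them atomically together with the side effect (for example, in the same database transaction).

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     string
}

type partitionKey struct {
	topic     string
	partition int32
}

// dedupProcessor skips messages at or below the highest offset it has already
// applied for that partition, so a redelivered message is not applied twice.
type dedupProcessor struct {
	applied map[partitionKey]int64
	total   int // stands in for the real side effect
}

func newDedupProcessor() *dedupProcessor {
	return &dedupProcessor{applied: make(map[partitionKey]int64)}
}

// handle applies msg at most once and reports whether it was new.
func (p *dedupProcessor) handle(msg message) bool {
	k := partitionKey{msg.Topic, msg.Partition}
	if last, ok := p.applied[k]; ok && msg.Offset <= last {
		return false // already applied; safe to mark again and move on
	}
	p.total++
	p.applied[k] = msg.Offset
	return true
}

func main() {
	p := newDedupProcessor()
	batch := []message{
		{"my_topic", 0, 10, "a"},
		{"my_topic", 0, 11, "b"},
		{"my_topic", 0, 10, "a"}, // redelivered after a restart
	}
	for _, m := range batch {
		fmt.Println(m.Offset, p.handle(m))
		// real code would call consumer.MarkOffset(msg, "") here
	}
	fmt.Println("applied:", p.total) // applied: 2
}
```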

func (*Consumer) MarkOffsets

func (c *Consumer) MarkOffsets(s *OffsetStash)

MarkOffsets marks stashed offsets as processed. See MarkOffset for additional explanation.

func (*Consumer) MarkPartitionOffset

func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage

Messages returns the read channel for the messages that are returned by the broker.

This channel only delivers messages if the Config.Group.Mode option is set to ConsumerModeMultiplex (the default).

func (*Consumer) Notifications

func (c *Consumer) Notifications() <-chan *Notification

Notifications returns a channel of Notifications that occur during consumer rebalancing. Notifications are only emitted over this channel if your config's Group.Return.Notifications setting is true.

func (*Consumer) Partitions

func (c *Consumer) Partitions() <-chan PartitionConsumer

Partitions returns a channel that emits a PartitionConsumer for each individual partition claimed by this consumer.

This channel only emits partitions if the Config.Group.Mode option is set to ConsumerModePartitions.

The Partitions() channel must be listened to for the life of this consumer; when a rebalance happens old partitions will be closed (naturally come to completion) and new ones will be emitted. The returned channel will only close when the consumer is completely shut down.

This example shows how to use the consumer to read messages through individual partitions.

Code:


// init (custom) config, set mode to ConsumerModePartitions
config := cluster.NewConfig()
config.Group.Mode = cluster.ConsumerModePartitions

// init consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"my_topic", "other_topic"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
    panic(err)
}
defer consumer.Close()

// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// consume partitions
for {
    select {
    case part, ok := <-consumer.Partitions():
        if !ok {
            return
        }

        // start a separate goroutine to consume messages
        go func(pc cluster.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg, "") // mark message as processed
            }
        }(part)
    case <-signals:
        return
    }
}
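The example above returns on SIGINT without waiting for its per-partition goroutines. The stdlib-only model below shows a variant that tracks them with a sync.WaitGroup and waits for every partition to drain once the partitions channel closes. fakePartition is a hypothetical stand-in for cluster.PartitionConsumer that carries only offsets.

```go
package main

import (
	"fmt"
	"sync"
)

// fakePartition is a hypothetical stand-in for cluster.PartitionConsumer:
// its messages channel closes when the partition is released.
type fakePartition struct {
	partition int32
	messages  chan int64 // offsets only, for brevity
}

// consumeAll starts one goroutine per emitted partition and waits for all of
// them to finish once the partitions channel is closed. It returns the number
// of messages handled per partition.
func consumeAll(partitions <-chan fakePartition) map[int32]int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		counts = make(map[int32]int)
	)
	for pc := range partitions {
		wg.Add(1)
		go func(pc fakePartition) {
			defer wg.Done()
			for range pc.messages { // ends when this partition is released
				mu.Lock()
				counts[pc.partition]++
				mu.Unlock()
			}
		}(pc)
	}
	wg.Wait() // do not return while partition goroutines are still running
	return counts
}

func main() {
	partitions := make(chan fakePartition)
	go func() {
		defer close(partitions) // simulates consumer shutdown
		for p := int32(0); p < 2; p++ {
			pc := fakePartition{partition: p, messages: make(chan int64, 3)}
			for off := int64(0); off < 3; off++ {
				pc.messages <- off
			}
			close(pc.messages) // simulates release on rebalance
			partitions <- pc
		}
	}()
	fmt.Println(consumeAll(partitions)) // map[0:3 1:3]
}
```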

func (*Consumer) ResetOffset

func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string)

ResetOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.

The difference between ResetOffset and MarkOffset is that ResetOffset allows rewinding to an earlier offset.
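The difference can be pictured with a single-partition model: marking never moves the offset backwards, while resetting is how you rewind. The partitionOffset type is hypothetical, and making reset ignore offsets ahead of the current one is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// partitionOffset is a hypothetical model of one partition's marked offset.
type partitionOffset struct {
	offset   int64
	metadata string
}

// mark only moves forward: an offset not above the current one is ignored.
func (p *partitionOffset) mark(offset int64, metadata string) {
	if offset > p.offset {
		p.offset, p.metadata = offset, metadata
	}
}

// reset only moves backward (or stays put): it is the way to rewind.
func (p *partitionOffset) reset(offset int64, metadata string) {
	if offset <= p.offset {
		p.offset, p.metadata = offset, metadata
	}
}

func main() {
	var p partitionOffset
	p.mark(42, "")
	p.mark(10, "")         // ignored: earlier than 42
	fmt.Println(p.offset) // 42
	p.reset(10, "replay")
	fmt.Println(p.offset, p.metadata) // 10 replay
}
```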

func (*Consumer) ResetOffsets

func (c *Consumer) ResetOffsets(s *OffsetStash)

ResetOffsets marks stashed offsets as processed. See ResetOffset for additional explanation.

func (*Consumer) ResetPartitionOffset

func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string)

ResetPartitionOffset marks an offset of the provided topic/partition as processed. See ResetOffset for additional explanation.

func (*Consumer) Subscriptions

func (c *Consumer) Subscriptions() map[string][]int32

Subscriptions returns the consumed topics and partitions

type ConsumerMode

type ConsumerMode uint8
const (
    ConsumerModeMultiplex ConsumerMode = iota
    ConsumerModePartitions
)

type Error

type Error struct {
    Ctx string
    // contains filtered or unexported fields
}

Error instances are wrappers for internal errors with a context and may be returned through the consumer's Errors() channel

type Notification

type Notification struct {
    // Type exposes the notification type
    Type NotificationType

    // Claimed contains topic/partitions that were claimed by this rebalance cycle
    Claimed map[string][]int32

    // Released contains topic/partitions that were released as part of this rebalance cycle
    Released map[string][]int32

    // Current are topic/partitions that are currently claimed by the consumer
    Current map[string][]int32
}

Notifications are state events emitted by the consumer on rebalance.
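Claimed and Released relate the previous Current assignment to the new one: claimed partitions are in the new set but not the old, and released partitions are the reverse. The sketch below models that set difference with hypothetical helpers; it illustrates the field semantics and is not the library's implementation, which may, for example, represent empty differences differently.

```go
package main

import (
	"fmt"
	"sort"
)

// diffAssignments derives Claimed (next minus prev) and Released (prev minus
// next) from two Current-style assignments.
func diffAssignments(prev, next map[string][]int32) (claimed, released map[string][]int32) {
	return subtract(next, prev), subtract(prev, next)
}

// subtract returns the topic/partitions in a that are not in b, sorted;
// topics with no remaining partitions are omitted.
func subtract(a, b map[string][]int32) map[string][]int32 {
	out := make(map[string][]int32)
	for topic, parts := range a {
		seen := make(map[int32]bool, len(b[topic]))
		for _, p := range b[topic] {
			seen[p] = true
		}
		var diff []int32
		for _, p := range parts {
			if !seen[p] {
				diff = append(diff, p)
			}
		}
		if len(diff) > 0 {
			sort.Slice(diff, func(i, j int) bool { return diff[i] < diff[j] })
			out[topic] = diff
		}
	}
	return out
}

func main() {
	prev := map[string][]int32{"my_topic": {0, 1, 2}}
	next := map[string][]int32{"my_topic": {2, 3}, "other_topic": {0}}
	claimed, released := diffAssignments(prev, next)
	fmt.Println("claimed:", claimed)   // claimed: map[my_topic:[3] other_topic:[0]]
	fmt.Println("released:", released) // released: map[my_topic:[0 1]]
}
```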

type NotificationType

type NotificationType uint8

NotificationType defines the type of notification

const (
    UnknownNotification NotificationType = iota
    RebalanceStart
    RebalanceOK
    RebalanceError
)

func (NotificationType) String

func (t NotificationType) String() string

String describes the notification type

type OffsetStash

type OffsetStash struct {
    // contains filtered or unexported fields
}

OffsetStash allows accumulating offsets and marking them as processed in bulk.

func NewOffsetStash

func NewOffsetStash() *OffsetStash

NewOffsetStash initializes a blank stash.

func (*OffsetStash) MarkOffset

func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset stashes the provided message offset

func (*OffsetStash) MarkPartitionOffset

func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset stashes the offset for the provided topic/partition combination

func (*OffsetStash) Offsets

func (s *OffsetStash) Offsets() map[string]int64

Offsets returns the latest stashed offsets by topic-partition
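The following simplified model shows what a stash does: it keeps the highest marked offset per topic/partition so that a whole batch can be handed over in one call (with the real type, consumer.MarkOffsets(stash)). The stash type and its "topic-partition" string keys are assumptions of this sketch; the key format returned by the real Offsets() may differ.

```go
package main

import "fmt"

// stash is a hypothetical, simplified model of OffsetStash.
type stash struct {
	offsets map[string]int64
}

func newStash() *stash { return &stash{offsets: make(map[string]int64)} }

// markPartitionOffset records offset unless a higher one is already stashed.
func (s *stash) markPartitionOffset(topic string, partition int32, offset int64) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	if cur, ok := s.offsets[key]; !ok || offset > cur {
		s.offsets[key] = offset
	}
}

// snapshot returns a copy, so callers cannot mutate the stash.
func (s *stash) snapshot() map[string]int64 {
	out := make(map[string]int64, len(s.offsets))
	for k, v := range s.offsets {
		out[k] = v
	}
	return out
}

func main() {
	s := newStash()
	for _, off := range []int64{3, 7, 5} {
		s.markPartitionOffset("my_topic", 0, off)
	}
	s.markPartitionOffset("my_topic", 1, 2)
	fmt.Println(s.snapshot()) // map[my_topic-0:7 my_topic-1:2]
}
```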

func (*OffsetStash) ResetOffset

func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string)

ResetOffset stashes the provided message offset. See ResetPartitionOffset for an explanation.

func (*OffsetStash) ResetPartitionOffset

func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string)

ResetPartitionOffset stashes the offset for the provided topic/partition combination. The difference between ResetPartitionOffset and MarkPartitionOffset is that ResetPartitionOffset supports earlier offsets.

type PartitionConsumer

type PartitionConsumer interface {
    sarama.PartitionConsumer

    // Topic returns the consumed topic name
    Topic() string

    // Partition returns the consumed partition
    Partition() int32

    // InitialOffset returns the offset used for creating the PartitionConsumer instance.
    // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
    InitialOffset() int64

    // MarkOffset marks the offset of a message as processed.
    MarkOffset(offset int64, metadata string)

    // ResetOffset resets the offset to a previously processed message.
    ResetOffset(offset int64, metadata string)
}

PartitionConsumer allows code to consume individual partitions from the cluster.

See docs for Consumer.Partitions() for more on how to implement this.

type Strategy

type Strategy string

Strategy for partition to consumer assignment

const (
    // StrategyRange is the default and assigns partition ranges to consumers.
    // Example with six partitions and two consumers:
    //   C1: [0, 1, 2]
    //   C2: [3, 4, 5]
    StrategyRange Strategy = "range"

    // StrategyRoundRobin assigns partitions by alternating over consumers.
    // Example with six partitions and two consumers:
    //   C1: [0, 2, 4]
    //   C2: [1, 3, 5]
    StrategyRoundRobin Strategy = "roundrobin"
)

Package cluster imports 9 packages and is imported by 124 packages. Updated 2019-06-18.