consumer

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrRetry error retry type representation
	ErrRetry = errors.New("retry message")
)

Functions

This section is empty.

Types

type Claimer

type Claimer interface {
	Claim(*sarama.ConsumerMessage)
}

Claimer represents a consumer message claimer struct

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client consumes kafka messages

func NewClient

func NewClient(brokers []string, group string) *Client

NewClient initializes a new consumer client and a Kafka consumer

func (*Client) Claim

func (client *Client) Claim(consumed *sarama.ConsumerMessage) (err error)

Claim consumes and emit's the given Kafka message to the subscribed subscriptions. All subscriptions are awaited untill done. An error is returned if one of the subscriptions failed to process the message.

func (*Client) Close

func (client *Client) Close() error

Close closes the Kafka consumer

func (*Client) Connect added in v0.2.1

func (client *Client) Connect(brokers []string, config *sarama.Config, initialOffset int64, ts ...types.Topic) error

Connect opens a new Kafka consumer

func (*Client) Healthy added in v0.4.0

func (client *Client) Healthy() bool

Healthy checks the health of the Kafka client

func (*Client) Subscribe

func (client *Client) Subscribe(topics ...types.Topic) (<-chan *types.Message, error)

Subscribe subscribes to the given topics and returns a message channel

func (*Client) Unsubscribe

func (client *Client) Unsubscribe(sub <-chan *types.Message) error

Unsubscribe removes the given channel from the available subscriptions. A new goroutine is spawned to avoid locking the channel.

type GroupHandle

type GroupHandle struct {
	// contains filtered or unexported fields
}

GroupHandle represents a Sarama consumer group consumer handle

func NewGroupHandle

func NewGroupHandle(client *Client) *GroupHandle

NewGroupHandle initializes a new GroupHandle

func (*GroupHandle) Cleanup

func (handle *GroupHandle) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*GroupHandle) Close

func (handle *GroupHandle) Close() error

Close closes the group consume handle and awaits till all claimed messages are processed. The consumer group get's marked for closing

func (*GroupHandle) Connect

func (handle *GroupHandle) Connect(conn sarama.Client, topics []string, group string) error

Connect initializes a new Sarama consumer group and awaits till the consumer group is set up and ready to consume messages.

func (*GroupHandle) ConsumeClaim

func (handle *GroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). When a Kafka message is claimed is it passed to the client Claim method. If an error occurred during processing of the claimed message is the message marked to be retried.

func (*GroupHandle) Setup

Setup is run at the beginning of a new session, before ConsumeClaim. This method is a implementation of the sarama consumer interface.

type Handle

type Handle interface {
	Close() error
}

Handle represents a Kafka consumer handle

type HandleType

type HandleType int8

HandleType represents the type of consumer that is adviced to use for the given connectionstring

const (
	PartitionConsumerHandle HandleType = 0
	GroupConsumerHandle     HandleType = 1
)

Plausible consumer types

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer represents a single partition consumer

type PartitionHandle

type PartitionHandle struct {
	// contains filtered or unexported fields
}

PartitionHandle represents a Sarama partition consumer

func NewPartitionHandle

func NewPartitionHandle(client *Client) *PartitionHandle

NewPartitionHandle initializes a new PartitionHandle

func (*PartitionHandle) Close

func (handle *PartitionHandle) Close() error

Close closes the given consumer and all topic partition consumers. First are all partition consumers closed before the client consumer is closed.

func (*PartitionHandle) Connect

func (handle *PartitionHandle) Connect(conn sarama.Client, topics []string, initialOffset int64) error

Connect initializes a new Sarama partition consumer and awaits till the consumer group is set up and ready to consume messages.

func (*PartitionHandle) Heartbeat

func (handle *PartitionHandle) Heartbeat()

Heartbeat set's up a new time ticker that checks every time if the partition count has changed for the consumed topics. By default does the heartbeat tick every 1500ms

func (*PartitionHandle) PartitionConsumer

func (handle *PartitionHandle) PartitionConsumer(topic string, partition int32) error

PartitionConsumer set's up a new partition consumer for the given topic and partition

func (*PartitionHandle) PullPartitions

func (handle *PartitionHandle) PullPartitions(topic string) ([]int32, error)

PullPartitions pulls the available partitions for the set topics. If the partition count has changed are the new partitions returned.

func (*PartitionHandle) Rebalance

func (handle *PartitionHandle) Rebalance() error

Rebalance pulls the latest available topics and starts new partition consumers when nessasery.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents a consumer topic(s) subscription

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic represents a thread safe list of subscriptions

func NewTopic

func NewTopic() *Topic

NewTopic constructs and returns a new Topic struct

type TopicPartitionConsumers

type TopicPartitionConsumers struct {
	// contains filtered or unexported fields
}

TopicPartitionConsumers represents a topic and it's partition consumers

func (*TopicPartitionConsumers) ClaimMessages

func (tc *TopicPartitionConsumers) ClaimMessages(topic string, partition int32, consumer sarama.PartitionConsumer)

ClaimMessages handles the claiming of consumed messages

func (*TopicPartitionConsumers) Consume

func (tc *TopicPartitionConsumers) Consume(partition int32) error

Consume opens a new consumer for the given partition

func (*TopicPartitionConsumers) Delist

func (tc *TopicPartitionConsumers) Delist(consumer *PartitionConsumer) error

Delist unlists the consumer as available

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL