consumer

package
v0.0.0-...-a36dcc1 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 24, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup represents a Sarama consumer group consumer.

func NewConsumerGroup

func NewConsumerGroup(name string, msg IConsumerMsg, authOpts []auth.Option, options ...Option) (consumer *ConsumerGroup, err error)

NewConsumerGroup creates a ConsumerGroup. The caller only needs to pass the options they want to set.

func (*ConsumerGroup) Cleanup

func (consumer *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*ConsumerGroup) Close

func (consumer *ConsumerGroup) Close()

func (*ConsumerGroup) ConsumeClaim

func (consumer *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
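
As a rough illustration of the consumer loop described for ConsumeClaim, the sketch below uses local stand-in types (Message, Claim, Session) in place of sarama.ConsumerMessage, sarama.ConsumerGroupClaim and sarama.ConsumerGroupSession, so that it runs without the Sarama dependency. The handle callback, the in-memory fakes and the policy of marking a message only after it was handled successfully are assumptions, not this package's confirmed behavior.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a hypothetical stand-in for sarama.ConsumerMessage.
type Message struct {
	Topic  string
	Offset int64
	Value  []byte
}

// Claim and Session are hypothetical stand-ins that mirror only the parts of
// sarama.ConsumerGroupClaim and sarama.ConsumerGroupSession a loop needs.
type Claim interface {
	Messages() <-chan *Message
}

type Session interface {
	Context() context.Context
	MarkMessage(msg *Message, metadata string)
}

// consumeClaim reads messages until the channel closes or the session ends.
// A message is marked only after handle returns nil (an assumed policy).
func consumeClaim(s Session, c Claim, handle func(*Message) error) error {
	for {
		select {
		case msg, ok := <-c.Messages():
			if !ok {
				return nil // claim closed, for example on a rebalance
			}
			if err := handle(msg); err != nil {
				return err
			}
			s.MarkMessage(msg, "")
		case <-s.Context().Done():
			return nil
		}
	}
}

// In-memory fakes so the sketch can run on its own.
type fakeClaim struct{ ch chan *Message }

func (f fakeClaim) Messages() <-chan *Message { return f.ch }

type fakeSession struct{ marked []int64 }

func (f *fakeSession) Context() context.Context { return context.Background() }

func (f *fakeSession) MarkMessage(m *Message, _ string) {
	f.marked = append(f.marked, m.Offset)
}

// runDemo feeds n messages through consumeClaim and returns the marked offsets.
func runDemo(n int) []int64 {
	ch := make(chan *Message, n)
	for i := 0; i < n; i++ {
		ch <- &Message{Topic: "test1", Offset: int64(i), Value: []byte("v")}
	}
	close(ch)
	s := &fakeSession{}
	_ = consumeClaim(s, fakeClaim{ch: ch}, func(*Message) error { return nil })
	return s.marked
}

func main() {
	fmt.Println(runDemo(3)) // [0 1 2]
}
```

Selecting on the session context alongside the message channel lets the loop exit promptly when the session is torn down, rather than waiting for the next message.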

func (*ConsumerGroup) Setup

func (consumer *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*ConsumerGroup) Start

func (consumer *ConsumerGroup) Start()

func (*ConsumerGroup) StartWithDeadline

func (consumer *ConsumerGroup) StartWithDeadline(time time.Time)

func (*ConsumerGroup) StartWithTimeOut

func (consumer *ConsumerGroup) StartWithTimeOut(timeout time.Duration)
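
The source does not document Start, StartWithDeadline or StartWithTimeOut. If, as the names suggest, the latter two bound how long consumption runs, the relationship between a timeout and a deadline can be sketched with the standard context package. Everything below (runUntil, runWithDeadline, runWithTimeout) is hypothetical and not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runUntil is a hypothetical stand-in for a consume loop: it counts ticks
// until ctx is cancelled or its deadline passes.
func runUntil(ctx context.Context, tick time.Duration) int {
	t := time.NewTicker(tick)
	defer t.Stop()
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case <-t.C:
			n++
		}
	}
}

// runWithDeadline stops the loop at an absolute point in time.
func runWithDeadline(deadline time.Time) error {
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	defer cancel()
	runUntil(ctx, 5*time.Millisecond)
	return ctx.Err()
}

// runWithTimeout expresses a timeout as a deadline relative to now.
func runWithTimeout(timeout time.Duration) error {
	return runWithDeadline(time.Now().Add(timeout))
}

func main() {
	fmt.Println(runWithTimeout(30 * time.Millisecond)) // context deadline exceeded
}
```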

type ConsumerGroupOptions

type ConsumerGroupOptions struct {
	*auth.AuthOptions
	// contains filtered or unexported fields
}

type IConsumerMsg

type IConsumerMsg interface {
	Consumer(msg *sarama.ConsumerMessage) error
}
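
A minimal sketch of a type that satisfies this interface follows. To keep it runnable without Sarama, it declares a local ConsumerMessage with a subset of the fields of sarama.ConsumerMessage; in real code the parameter would be *sarama.ConsumerMessage. The printHandler type is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerMessage is a stand-in carrying a subset of the fields of
// sarama.ConsumerMessage.
type ConsumerMessage struct {
	Topic      string
	Partition  int32
	Offset     int64
	Key, Value []byte
}

// IConsumerMsg mirrors the documented interface, using the stand-in type.
type IConsumerMsg interface {
	Consumer(msg *ConsumerMessage) error
}

// printHandler is a hypothetical implementation that records what it sees.
type printHandler struct{ seen []string }

func (h *printHandler) Consumer(msg *ConsumerMessage) error {
	if msg == nil {
		return errors.New("nil message")
	}
	line := fmt.Sprintf("%s[%d]@%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
	h.seen = append(h.seen, line)
	return nil
}

// Compile-time check that *printHandler satisfies the interface.
var _ IConsumerMsg = (*printHandler)(nil)

func main() {
	h := &printHandler{}
	_ = h.Consumer(&ConsumerMessage{Topic: "test1", Offset: 42, Value: []byte("hello")})
	fmt.Println(h.seen[0]) // test1[0]@42: hello
}
```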

type Option

type Option interface {
	// contains filtered or unexported methods
}
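
An interface with only unexported methods is a common way to implement functional options while preventing other packages from defining their own. The sketch below shows one way such an Option could work. The options struct, the apply method, the optionFunc adapter, the build function and the default values are all assumptions, since the real fields and methods are unexported; WithBrokerList and WithGroupId are re-declared locally for illustration only.

```go
package main

import "fmt"

// options is a hypothetical settings struct; the real ConsumerGroupOptions
// fields are unexported and not shown in the documentation.
type options struct {
	brokers string
	groupID string
}

// Option has an unexported method, so only this package can implement it.
type Option interface {
	apply(*options)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*options)

func (f optionFunc) apply(o *options) { f(o) }

// WithBrokerList and WithGroupId are local re-implementations for the sketch.
func WithBrokerList(brokers string) Option {
	return optionFunc(func(o *options) { o.brokers = brokers })
}

func WithGroupId(groupId string) Option {
	return optionFunc(func(o *options) { o.groupID = groupId })
}

// build starts from assumed defaults and applies each option in order,
// so later options override earlier ones.
func build(opts ...Option) options {
	o := options{brokers: "127.0.0.1:9092", groupID: "default"}
	for _, opt := range opts {
		opt.apply(&o)
	}
	return o
}

func main() {
	o := build(WithGroupId("orders"))
	fmt.Println(o.brokers, o.groupID) // 127.0.0.1:9092 orders
}
```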

func WithBrokerList

func WithBrokerList(brokers string) Option

WithBrokerList sets the Kafka brokers as a comma-separated list of host:port addresses, for example "ip:port,ip:port".
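
Because the broker list is passed as a single comma-separated string, it can help to validate it before handing it over. The helper below is a hypothetical sketch using only the standard library; how this package actually parses the string is not documented.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseBrokers splits a comma-separated "host:port,host:port" string and
// checks that every entry has both a host and a port.
func parseBrokers(s string) ([]string, error) {
	var out []string
	for _, part := range strings.Split(s, ",") {
		addr := strings.TrimSpace(part)
		if addr == "" {
			continue // tolerate stray or trailing commas
		}
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, fmt.Errorf("broker %q: %w", addr, err)
		}
		if host == "" || port == "" {
			return nil, fmt.Errorf("broker %q: host and port are required", addr)
		}
		out = append(out, addr)
	}
	if len(out) == 0 {
		return nil, fmt.Errorf("no brokers in %q", s)
	}
	return out, nil
}

func main() {
	brokers, err := parseBrokers("10.0.0.1:9092, 10.0.0.2:9092")
	fmt.Println(brokers, err) // [10.0.0.1:9092 10.0.0.2:9092] <nil>
}
```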

func WithGroupId

func WithGroupId(groupId string) Option

WithGroupId sets the consumer group ID.

func WithInitialOffset

func WithInitialOffset(initialOffset string) Option

WithInitialOffset sets the initial offset to consume from: "oldest" or "newest".
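
In Sarama the two choices correspond to the constants sarama.OffsetOldest (-2) and sarama.OffsetNewest (-1). The sketch below shows one plausible mapping from the option string to those values; the function name and the case-insensitive matching are assumptions, and the constants are re-declared locally so the example runs without Sarama.

```go
package main

import (
	"fmt"
	"strings"
)

// These values match sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// parseInitialOffset is a hypothetical helper that maps the option string
// to the corresponding offset constant.
func parseInitialOffset(s string) (int64, error) {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "oldest":
		return offsetOldest, nil
	case "newest":
		return offsetNewest, nil
	default:
		return 0, fmt.Errorf("initial offset must be \"oldest\" or \"newest\", got %q", s)
	}
}

func main() {
	off, err := parseInitialOffset("oldest")
	fmt.Println(off, err) // -2 <nil>
}
```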

func WithReBalanceStrategy

func WithReBalanceStrategy(reBalanceStrategy string) Option

WithReBalanceStrategy sets the consumer group partition assignment strategy: "range", "roundrobin", or "sticky".
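
To make the difference between the first two strategies concrete, the sketch below assigns the partitions of a single topic to a fixed number of group members. It is a simplified illustration of the general range and round-robin schemes, not Sarama's implementation, and it leaves out sticky assignment, which additionally tries to preserve earlier assignments across rebalances. Both functions assume at least one member.

```go
package main

import "fmt"

// assignRange gives each of n members a contiguous block of partitions;
// the first (partitions % n) members receive one extra. Requires n > 0.
func assignRange(partitions, n int) [][]int {
	out := make([][]int, n)
	base, extra := partitions/n, partitions%n
	next := 0
	for m := 0; m < n; m++ {
		size := base
		if m < extra {
			size++
		}
		for i := 0; i < size; i++ {
			out[m] = append(out[m], next)
			next++
		}
	}
	return out
}

// assignRoundRobin deals partitions to members one at a time. Requires n > 0.
func assignRoundRobin(partitions, n int) [][]int {
	out := make([][]int, n)
	for p := 0; p < partitions; p++ {
		out[p%n] = append(out[p%n], p)
	}
	return out
}

func main() {
	fmt.Println(assignRange(5, 2))      // [[0 1 2] [3 4]]
	fmt.Println(assignRoundRobin(5, 2)) // [[0 2 4] [1 3]]
}
```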

func WithTopicList

func WithTopicList(topics string) Option

WithTopicList sets the topics to subscribe to as a comma-separated list, for example "test1,test2".

func WithVersion

func WithVersion(version string) Option

WithVersion sets the Kafka version.
