kafka

package
v1.14.4
Published: Feb 24, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Logger *zap.SugaredLogger

	UseOldestOnFail bool
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(lg *zap.SugaredLogger, brokersAddr, topic, clientId string) (*Consumer, error)

func (Consumer) Ack

func (c Consumer) Ack(partition int32, offset int64)

Ack marks the given offset as processed with Kafka's offset manager. Note: sarama's MarkOffset is buffered (flushed every 1s by default), so after an unclean shutdown roughly the last second of messages may be processed twice.
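
Because acknowledged offsets may be lost on an unclean shutdown, callers may want to tolerate replays. The following is a minimal sketch of one way to do that, assuming offsets within a partition only increase. The offsetTracker type and its shouldProcess method are hypothetical helpers and not part of this package; a real service would have to persist the map durably for it to survive a restart.

```go
package main

import "fmt"

// offsetTracker remembers the highest offset handled per partition.
// Hypothetical helper for illustration; not part of the kafka package.
type offsetTracker struct {
	seen map[int32]int64
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{seen: make(map[int32]int64)}
}

// shouldProcess reports whether (partition, offset) has not been handled
// yet, and records it as handled when it is new.
func (t *offsetTracker) shouldProcess(partition int32, offset int64) bool {
	if last, ok := t.seen[partition]; ok && offset <= last {
		return false // replayed message, skip it
	}
	t.seen[partition] = offset
	return true
}

func main() {
	t := newOffsetTracker()
	// The second delivery of offset 11 simulates a replay after a restart.
	for _, off := range []int64{10, 11, 11, 12} {
		fmt.Printf("offset %d: process=%v\n", off, t.shouldProcess(0, off))
	}
}
```

An in-memory map only illustrates the check itself; making the handler idempotent is another option when durable tracking is impractical.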

func (Consumer) Errors

func (c Consumer) Errors() <-chan error

Errors returns a channel of errors encountered while consuming Kafka messages.

func (Consumer) LastOffset

func (c Consumer) LastOffset() map[int32]int64

LastOffset returns the client's last offset for each partition, keyed by partition ID. It uses Kafka's offset manager API. If no offset is found for a partition, it returns the oldest offset for that partition.

func (Consumer) Messages

func (c Consumer) Messages() <-chan Message

Messages returns the channel on which new Message instances are published.

func (Consumer) Start

func (c Consumer) Start(offsets map[int32]int64) error
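
Start takes the same map type that LastOffset returns, so one plausible pattern is to resume from the stored offsets. The sketch below assumes the two methods are meant to be combined this way, which the documentation does not confirm. The offsetStarter interface, the resume function, and fakeConsumer are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// offsetStarter mirrors the two documented Consumer methods used here.
type offsetStarter interface {
	LastOffset() map[int32]int64
	Start(offsets map[int32]int64) error
}

// resume starts consumption from whatever LastOffset reports.
func resume(c offsetStarter) (map[int32]int64, error) {
	offsets := c.LastOffset()
	if len(offsets) == 0 {
		return nil, errors.New("no partitions reported by LastOffset")
	}
	if err := c.Start(offsets); err != nil {
		return nil, fmt.Errorf("start consumer: %w", err)
	}
	return offsets, nil
}

// fakeConsumer is a test double that records the offsets passed to Start.
type fakeConsumer struct {
	last    map[int32]int64
	started map[int32]int64
}

func (f *fakeConsumer) LastOffset() map[int32]int64 { return f.last }

func (f *fakeConsumer) Start(offsets map[int32]int64) error {
	f.started = offsets
	return nil
}

func main() {
	c := &fakeConsumer{last: map[int32]int64{0: 42, 1: 7}}
	offsets, err := resume(c)
	if err != nil {
		fmt.Println("resume failed:", err)
		return
	}
	fmt.Println("started at", offsets)
}
```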

func (Consumer) Stop

func (c Consumer) Stop()

Stop stops consuming.

type HighLevelConsumer

type HighLevelConsumer struct {
	Consumer *Consumer
	Ctx      context.Context
}

func NewHighLevelConsumer

func NewHighLevelConsumer(ctx context.Context, lg *zap.SugaredLogger, addr string, topic string, app string) *HighLevelConsumer

func (*HighLevelConsumer) StartConsumption

func (con *HighLevelConsumer) StartConsumption(handler func(value []byte) error)
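
StartConsumption accepts a handler of type func(value []byte) error. A minimal sketch of such a handler follows, assuming JSON payloads. The orderEvent type and newOrderHandler function are hypothetical, and how StartConsumption reacts to a non-nil error from the handler is not documented here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// orderEvent is a hypothetical payload shape used only for this example.
type orderEvent struct {
	ID     string `json:"id"`
	Amount int    `json:"amount"`
}

// newOrderHandler returns a function with the signature StartConsumption
// expects. Decoded events are passed to sink.
func newOrderHandler(sink func(orderEvent)) func(value []byte) error {
	return func(value []byte) error {
		var ev orderEvent
		if err := json.Unmarshal(value, &ev); err != nil {
			return fmt.Errorf("decode order event: %w", err)
		}
		if ev.ID == "" {
			return errors.New("order event is missing id")
		}
		sink(ev)
		return nil
	}
}

func main() {
	handler := newOrderHandler(func(ev orderEvent) {
		fmt.Printf("order %s for %d\n", ev.ID, ev.Amount)
	})
	// With the real package this would be passed to StartConsumption;
	// here the handler is called directly.
	if err := handler([]byte(`{"id":"o-1","amount":5}`)); err != nil {
		fmt.Println("handler error:", err)
	}
}
```

Returning a wrapped error keeps the decoding failure inspectable with errors.Is and errors.As by whatever code receives it.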

type Message

type Message struct {
	Offset    int64
	Partition int32
	Message   *sarama.ConsumerMessage
}
