edatkafka

package
v1.2.19 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2024 | License: MIT | Imports: 6 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultAckWait = time.Second * 30

DefaultAckWait is a time.Duration (30 seconds by default) representing the maximum amount of time for a consumer to finish handling a message.

View Source
var DefaultSerializer = KafkaSerializer{}

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements msg.Consumer

func NewConsumer

func NewConsumer(brokers []string, groupID string, options ...ConsumerOption) *Consumer

NewConsumer constructs a new instance of Consumer

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) Listen

func (c *Consumer) Listen(ctx context.Context, channel string, consumer msg.ReceiveMessageFunc) error

type ConsumerOption

type ConsumerOption func(consumer *Consumer)

func WithConsumerAckWait

func WithConsumerAckWait(ackWait time.Duration) ConsumerOption

func WithConsumerDialer

func WithConsumerDialer(dialer *kafka.Dialer) ConsumerOption

func WithConsumerSerializer

func WithConsumerSerializer(serializer Serializer) ConsumerOption

type KafkaSerializer

type KafkaSerializer struct{}

func (KafkaSerializer) Deserialize

func (KafkaSerializer) Deserialize(message kafka.Message) (msg.Message, error)

func (KafkaSerializer) Serialize

func (KafkaSerializer) Serialize(message msg.Message) (kafka.Message, error)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer implements msg.Producer

func NewProducer

func NewProducer(brokers []string, options ...ProducerOption) *Producer

NewProducer constructs a new instance of Producer

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) error

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, channel string, message msg.Message) error

type ProducerOption

type ProducerOption func(producer *Producer)

func WithProducerLogger

func WithProducerLogger(logger edatlog.Logger) ProducerOption

func WithProducerSerializer

func WithProducerSerializer(serializer Serializer) ProducerOption

func WithProducerTransport

func WithProducerTransport(transport *kafka.Transport) ProducerOption

type Serializer

type Serializer interface {
	Serialize(message msg.Message) (kafka.Message, error)
	Deserialize(message kafka.Message) (msg.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL