kafka

package
v5.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadMessage

func LoadMessage(src *kafka.Message) (*utils.Message, error)

Types

type KafkaConsumerConfig

type KafkaConsumerConfig struct {
	*KafkaCredConfig
	GroupID                string
	AutoCommit             bool
	MaxBuffer              int
	AutoCommitIntervalInMs uint64
	EnableLog              bool
}

type KafkaCredConfig

type KafkaCredConfig struct {
	Brokers       []string
	ServiceName   string
	SASLType      string
	SASLMechanism sasl.Mechanism
	TLSConfig     *tls.Config
}

type KafkaProducerConfig

type KafkaProducerConfig struct {
	*KafkaCredConfig
	Acknowledge            int
	BatchMaxBuffer         int
	BatchFlushIntervalInMs uint64
	Async                  bool
	Batch                  bool
	Topic                  string
	EnableLog              bool
	Name                   string
}

type Message

type Message struct {
	*kafka.Message
	// contains filtered or unexported fields
}

func (*Message) GetBody

func (m *Message) GetBody() string

func (*Message) GetHeaders

func (m *Message) GetHeaders() map[string]string

func (*Message) GetKey

func (m *Message) GetKey() string

func (*Message) GetMeta

func (m *Message) GetMeta() map[string]any

func (*Message) LoadBody

func (m *Message) LoadBody(v any) error

type Poller

type Poller struct {
	*api.Reader
	// contains filtered or unexported fields
}

func NewPoller

func NewPoller(ctx context.Context, logger log.Log, config KafkaConsumerConfig, tr api.ConsumerTracer, topics ...string) (*Poller, error)

func (*Poller) Close

func (k *Poller) Close(ctx context.Context) error

func (*Poller) Poll

func (k *Poller) Poll(ctx context.Context, ch chan<- *kafka.Message) error

type Producer

type Producer struct {
	*api.Writer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(ctx context.Context, logger log.Log, config *KafkaProducerConfig, tr api.ProduceTracer) (*Producer, error)

func (*Producer) Close

func (k *Producer) Close(ctx context.Context) error

func (*Producer) HealthCheck

func (k *Producer) HealthCheck(ctx context.Context) error

func (*Producer) Name

func (k *Producer) Name(ctx context.Context) string

func (*Producer) ProduceMessage

func (k *Producer) ProduceMessage(ctx context.Context, key string, message *utils.Message, headers map[string]string) (err error)

func (*Producer) ProduceMessageWithTopic

func (k *Producer) ProduceMessageWithTopic(ctx context.Context, topic, key string, message *utils.Message, headers map[string]string) (err error)

func (*Producer) ProduceToTopic

func (k *Producer) ProduceToTopic(ctx context.Context, topic, key string, message []byte, headers map[string]string) (err error)

func (*Producer) Shutdown

func (k *Producer) Shutdown(ctx context.Context) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL