kafka

package module
v0.0.0-...-f89596c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

#### references: - https://medium.com/event-driven-utopia/understanding-kafka-topic-partitions-ae40f80552e8 - https://silverback-messaging.net/concepts/broker/kafka/kafka-partitioning.html?tabs=destination-partition-fluent%2Cenricher-fluent%2Cconcurrency-fluent%2Cassignment-fluent - https://www.youtube.com/watch?v=JalUUBKdcA0

Index

Constants

View Source
const (
	RFC3339ms = "2006-01-02T15:04:05.000Z07:00"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Addrs   []string `mapstructure:"addrs"`
	Version string   `mapstructure:"version"` // 3.4.0
	Topic   string   `mapstructure:"topic"`

	// consumer
	GroupId string `mapstructure:"group_id"` // default

	// producer
	Key string `mapstructure:"key"`
}

func NewConfigFromViper

func NewConfigFromViper(vp *viper.Viper, field string) (
	config *Config, scfg *sarama.Config, err error)

type Handle

type Handle func(msg *sarama.ConsumerMessage)

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func HandlerFromConfig

func HandlerFromConfig(ctx context.Context, vp *viper.Viper, field string) (
	handler *Handler, err error)

func NewHandler

func NewHandler(ctx context.Context, group sarama.ConsumerGroup, topics []string) (
	handler *Handler)

func (*Handler) Cleanup

func (handler *Handler) Cleanup(sess sarama.ConsumerGroupSession) (err error)

func (*Handler) Close

func (handler *Handler) Close() error

func (*Handler) Consume

func (handler *Handler) Consume()

func (*Handler) ConsumeClaim

func (handler *Handler) ConsumeClaim(sess sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error

func (*Handler) Ok

func (handler *Handler) Ok() (err error)

func (*Handler) Setup

func (handler *Handler) Setup(sess sarama.ConsumerGroupSession) (err error)

func (*Handler) WithHandle

func (handler *Handler) WithHandle(handle Handle) *Handler

func (*Handler) WithLogger

func (handler *Handler) WithLogger(logger *zap.Logger) *Handler

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(vp *viper.Viper, field string) (producer *KafkaProducer, err error)

func (*KafkaProducer) Close

func (producer *KafkaProducer) Close() (err error)

func (*KafkaProducer) SendMsg

func (producer *KafkaProducer) SendMsg(ctx context.Context, bts []byte) (msg *sarama.ProducerMessage)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL