kfk

package module
v0.0.0-...-7193066 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumErr

type ConsumErr struct {
	Err error
	Msg kafka.Message
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conf *KfkConsumCfg) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, fn Handler)

type Handler

type Handler func(kafka.Message) error

type KfkConsumCfg

type KfkConsumCfg struct {
	Brokers     []string `yaml:"brokers"`
	GroupTopics []string `yaml:"group_topics"`
	GroupID     string   `yaml:"group_id"`
	BatchSize   int      `yaml:"batch_size"`
	MinBytes    int      `yaml:"min_bytes"`
	MaxBytes    int      `yaml:"max_bytes"`
	Sasl        SaslCfg  `yaml:"sasl"`
}

type KfkProducerCfg

type KfkProducerCfg struct {
	Brokers                []string `yaml:"brokers"`
	Topic                  string   `yaml:"topic"`
	Acks                   int      `yaml:"acks"`
	Async                  bool     `yaml:"async"`
	Timeout                int      `yaml:"timeout"`
	Sasl                   SaslCfg  `yaml:"sasl"`
	AllowAutoTopicCreation bool     `yaml:"allow_auto_topic_creation"`
	BatchSize              int      `yaml:"batch_size"`
	BatchBytes             int      `yaml:"batch_bytes"`
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cfg *KfkProducerCfg) (*Producer, error)

func (*Producer) AsynStart

func (p *Producer) AsynStart()

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Pub

func (p *Producer) Pub(key, value []byte)

func (*Producer) Send

func (p *Producer) Send(msgList ...kafka.Message)

type SaslCfg

type SaslCfg struct {
	Enabled  bool   `yaml:"enabled"` //密码开关
	Algo     string `yaml:"algo"`    // 身份验证加密机制
	Username string `yaml:"username"`
	Password string `yaml:"password"`
}

func (*SaslCfg) Mechanism

func (sc *SaslCfg) Mechanism() (sasl.Mechanism, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL