kq

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conf

type Conf struct {
	// kafka broker addresses
	Brokers []string `json:"brokers" yaml:"brokers"`

	// topic name
	Topic string `json:"topic" yaml:"topic"`

	// consumer group name
	Group string `json:"group" yaml:"group"`

	// whether to create topic if topic is missing, default false
	AutoCreateTopic *bool `json:"auto_create_topic" yaml:"auto_create_topic"`

	// the count of the topics to create, default 4
	Partitions int `json:"topic_partitions" yaml:"topic_partitions"`

	// the replication count of each topic partition, default 3
	Replications int `json:"replications" yaml:"replications"`

	// the count of workers that consumes synchronously, default is the count of topic partition
	Processors int `json:"processors" yaml:"processors"`

	// default 10K
	MinBytes int `json:"min_bytes" yaml:"min_bytes"`

	// default 10M
	MaxBytes int `json:"max_bytes" yaml:"max_bytes"`

	// certificate file path for connecting to kafka
	CaFile string `json:"ca_file" yaml:"ca_file"`

	// username for connecting to kafka
	Username string `json:"username" yaml:"username"`

	// password for connecting to kafka
	Password string `json:"password" yaml:"password"`
}

func LoadConfig

func LoadConfig(file string) (*Conf, error)

LoadConfig loads Conf from specified file path

func MustLoadConfig

func MustLoadConfig(file string) *Conf

MustLoadConfig loads Conf from specified file path,panics on error

type ConsumeHandler

type ConsumeHandler func(ctx context.Context, topic string, msg *model.Msg) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func MustNewConsumer

func MustNewConsumer(cfg *Conf, handler ConsumeHandler, opts ...ConsumerOption) *Consumer

MustNewConsumer returns a consumer, if it fails, panic

func NewConsumer

func NewConsumer(cfg *Conf, handler ConsumeHandler, opts ...ConsumerOption) (*Consumer, error)

NewConsumer returns a consumer and error

func (*Consumer) Consume

func (c *Consumer) Consume() error

Consume consumes once in single goroutine

func (*Consumer) LoopConsume

func (c *Consumer) LoopConsume()

LoopConsume blocks and consumes msgs in loop with multi goroutine

type ConsumerOption

type ConsumerOption func(consumer *Consumer)

func WithConsumerContext

func WithConsumerContext(ctx context.Context) ConsumerOption

func WithConsumerListener

func WithConsumerListener(listener plugins.ConsumeListener) ConsumerOption

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func MustNewProducer

func MustNewProducer(cfg *Conf, opts ...ProducerOption) *Producer

MustNewProducer returns a producer or panic if fails

func NewProducer

func NewProducer(cfg *Conf, opts ...ProducerOption) (*Producer, error)

NewProducer returns a producer and an error

func (*Producer) Send

func (p *Producer) Send(data any) (string, error)

Send sends data to message queue, the data must be Json serializable

type ProducerOption

type ProducerOption func(producer *Producer)

func WithIdCreator

func WithIdCreator(creator plugins.IdCreator) ProducerOption

func WithProducerContext

func WithProducerContext(ctx context.Context) ProducerOption

func WithProducerListener

func WithProducerListener(listener plugins.ProducerListener) ProducerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL