kafka

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
// LogDebug is the string "debug".
// NOTE(review): presumably the debug value accepted by Config.LogLevel — confirm against its use.
const LogDebug = "debug"

Variables

This section is empty.

Functions

func NewConsumerGroup

func NewConsumerGroup(handler func(*sarama.ConsumerMessage)) sarama.ConsumerGroupHandler

Types

type AsyncProducer

// AsyncProducer is the interface for an asynchronous Kafka message producer.
type AsyncProducer interface {
	RunAsyncProducer()                                        // run the asynchronous producer thread
	Produce(topic string, value []byte, keys ...string) error // produce a message
	ProducerErrors() <-chan *sarama.ProducerError             // return the channel carrying messages the producer failed to send
	CloseProducer()                                           // shut down the thread
	IsRunning() bool                                          // running state
}

type AsyncProducerClient

// AsyncProducerClient exposes, on its pointer receiver, the methods
// RunAsyncProducer, Produce, ProducerErrors, CloseProducer and IsRunning,
// which is the same method set the AsyncProducer interface declares.
type AsyncProducerClient struct {
	// contains filtered or unexported fields
}

func (*AsyncProducerClient) CloseProducer

func (p *AsyncProducerClient) CloseProducer()

func (*AsyncProducerClient) IsRunning

func (p *AsyncProducerClient) IsRunning() bool

func (*AsyncProducerClient) Produce

func (p *AsyncProducerClient) Produce(topic string, value []byte, keys ...string) error

Produce sends a message to the queue. Use the keys parameter only when message ordering must be guaranteed, and pass only a single key.

func (*AsyncProducerClient) ProducerErrors

func (p *AsyncProducerClient) ProducerErrors() <-chan *sarama.ProducerError

func (*AsyncProducerClient) RunAsyncProducer

func (p *AsyncProducerClient) RunAsyncProducer()

type Cli

// Cli is the interface of a Kafka client: it reports the Kafka address and
// creates consumers, asynchronous producers and synchronous producers.
type Cli interface {
	Address() string                       // Address returns the kafka address
	NewConsumer() (*ConsumerClient, error) // NewConsumer creates a new consumer; returns an error if the kafka connection fails
	NewAsyncProducerClient() (AsyncProducer, error)
	NewSyncProducerClient() (sarama.SyncProducer, error)
}

type CliCfg

// CliCfg is the type returned by Default and by Config.BuildKafka. Its pointer
// receiver exposes Address, NewConsumer, NewAsyncProducerClient and
// NewSyncProducerClient, which is the same method set the Cli interface declares.
type CliCfg struct {
	// contains filtered or unexported fields
}

func Default

func Default() *CliCfg

For use in singleton mode only.

func (*CliCfg) Address

func (k *CliCfg) Address() string

func (*CliCfg) NewAsyncProducerClient

func (k *CliCfg) NewAsyncProducerClient() (AsyncProducer, error)

func (*CliCfg) NewConsumer

func (k *CliCfg) NewConsumer() (*ConsumerClient, error)

func (*CliCfg) NewSyncProducerClient

func (k *CliCfg) NewSyncProducerClient() (sarama.SyncProducer, error)

type Config

// Config holds the Kafka settings, loadable from YAML and (for most fields)
// from environment variables, as declared in the struct tags.
//
//   - Addr: address of the kafka cluster (env KafkaAddr).
//   - QueueLength: YAML key queue_length; no env tag.
//     NOTE(review): presumably the length of an internal message queue — confirm.
//   - KafkaVersion: version of the kafka cluster (env KafkaVersion).
//   - EnableLog: whether kafka logging is enabled (env KafkaEnableLog).
//   - LogLevel: level at which logs are recorded; only debug/info are
//     supported (env KafkaLogLevel).
type Config struct {
	Addr         string `yaml:"addr" env:"KafkaAddr" env-description:"address of kafka cluster"`
	QueueLength  int    `yaml:"queue_length"`
	KafkaVersion string `yaml:"kafka_version" env:"KafkaVersion" env-description:"version of kafka cluster"`
	EnableLog    bool   `yaml:"enable_log" env:"KafkaEnableLog" env-description:"enable kafka log or not"`
	LogLevel     string `yaml:"log_level" env:"KafkaLogLevel" env-description:"record logs in which level, only support debug/info"`
}

func (*Config) BuildKafka

func (c *Config) BuildKafka(ctx context.Context) (*CliCfg, error)

BuildKafka creates a Kafka client instance.

type Consumer

// Consumer is the interface for a Kafka consumer that is run against a
// consumer group and a list of topics with a sarama.ConsumerGroupHandler.
type Consumer interface {
	RunConsumer(group string, topic []string, handler sarama.ConsumerGroupHandler) error // run the consumer thread
	Close()                                                                              // shut down the thread
	IsRunning() bool                                                                     // running state
}

type ConsumerClient

// ConsumerClient is the consumer type returned by Cli.NewConsumer. Its pointer
// receiver exposes RunConsumer, Close and IsRunning, which is the same method
// set the Consumer interface declares.
type ConsumerClient struct {
	// contains filtered or unexported fields
}

func (*ConsumerClient) Close

func (cc *ConsumerClient) Close()

func (*ConsumerClient) IsRunning

func (cc *ConsumerClient) IsRunning() bool

func (*ConsumerClient) RunConsumer

func (cc *ConsumerClient) RunConsumer(group string, topic []string, groupHandler sarama.ConsumerGroupHandler) error

type ConsumerGroup

// ConsumerGroup has the value-receiver methods Setup, Cleanup and ConsumeClaim.
// NOTE(review): presumably the sarama.ConsumerGroupHandler implementation
// returned by NewConsumerGroup — confirm against the source.
type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func (ConsumerGroup) Cleanup

func (ConsumerGroup) ConsumeClaim

func (ConsumerGroup) Setup

Directories

Path Synopsis
examples
cli

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL