kafka

package
v0.0.0-...-b08fcc5
Published: Jun 22, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

kafka component

An initial, simple wrapper; it will be fleshed out further later on.

Producer usage

  1. Configure the config file
kafka_producer_client:
  - producer_to: 'message'
    brokers:
      - '127.0.0.1:9092'
      - '127.0.0.1:9092'
      - '127.0.0.1:9092'
    max_attempts: 3
    async: false
    required_ack: 1
  • producer_to: the producer name (used to select this config entry)

  • brokers: broker addresses

  • async: whether to send messages asynchronously; note that async mode cannot guarantee delivery

  • max_attempts: maximum number of retries

  • required_ack: the message acknowledgement mechanism, default 1. There are three modes; going down the list, performance decreases (producer throughput drops) while data durability increases (see the sketch after this list).

    • RequiredAck = 0 (NoResponse): the producer does not wait for any acknowledgement from the broker before sending the next message (batch). This gives the lowest latency but the weakest durability guarantee (some data is lost when the server fails, e.g. the leader has died but the producer does not know it, so messages it sends never reach a broker).

    • RequiredAck = 1 (WaitForLocal): the producer sends the next message only after the leader has received the data and acknowledged it. This gives better durability, since the client waits for the server to confirm the request succeeded (the only messages lost are those written to a leader that dies before they are replicated).

    • RequiredAck = -1 (WaitForAll): a send only counts as complete after the follower replicas have confirmed receipt of the data. This gives the best durability: no message is lost as long as at least one in-sync replica stays alive.

  • read_timeout: how long to wait for a response, default 30s
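Since the method index below references sarama, the three acknowledgement modes above correspond directly to sarama's RequiredAcks constants. A minimal sketch of the mapping, assuming the wrapper converts the required_ack integer roughly like this (the toRequiredAcks helper and the Shopify/sarama import path are assumptions; the constants are real sarama identifiers):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// toRequiredAcks is a hypothetical helper; the wrapper's actual conversion
// is unexported and may differ.
func toRequiredAcks(v int) sarama.RequiredAcks {
	switch v {
	case 0:
		return sarama.NoResponse // fire-and-forget, lowest latency
	case -1:
		return sarama.WaitForAll // wait for all in-sync replicas
	default:
		return sarama.WaitForLocal // wait for the leader only (default, 1)
	}
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = toRequiredAcks(1) // required_ack from the config file
	fmt.Println(cfg.Producer.RequiredAcks)        // 1 (WaitForLocal)
}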

  2. Initialization
service := micro.NewService(ServiceName)

micro.Init(
	kafka.InitKafkaProducer, // initialize the kafka producer client here
)

defer kafka.ProducerClose() // release the producer client here
  3. Sending messages
  • Send(ctx context.Context, message *kafka.ProducerMessage) error // raw send
  • SendMsg(ctx context.Context, topic string, key string, msg []byte) error // a convenience wrapper for easy sending
err = kafka.InitProducerProxy("message").SendMsg(ctx, topic, strconv.FormatInt(req.ReceiveId, 10), sendPrivateMsgBuff)
  • The producerTo argument of InitProducerProxy("message") must match producer_to in the config file (see the end-to-end sketch below)
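A minimal end-to-end producer sketch built from the exported API above (InitProducerProxy, SendMsg, ProducerClose). The PrivateMsg payload struct and the topic name are illustrative, not part of this package, and the sketch assumes the micro.Init step from above has already run:

package main

import (
	"context"
	"encoding/json"
	"log"
	"strconv"
	// plus this kafka wrapper package; its import path is not shown on
	// this page, so it is left out here
)

// PrivateMsg is an illustrative payload type, not part of this package.
type PrivateMsg struct {
	ReceiveId int64  `json:"receive_id"`
	Content   string `json:"content"`
}

func main() {
	// Assumes micro.Init(kafka.InitKafkaProducer) has already run, as in
	// step 2 above.
	defer kafka.ProducerClose()

	ctx := context.Background()
	msg := PrivateMsg{ReceiveId: 42, Content: "hello"}

	buf, err := json.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// "message" must match producer_to in the config file; the topic name
	// is illustrative.
	err = kafka.InitProducerProxy("message").
		SendMsg(ctx, "event.message.private", strconv.FormatInt(msg.ReceiveId, 10), buf)
	if err != nil {
		log.Fatal(err)
	}
}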

Consumer usage

  1. Configure the config file
kafka_consume_client:
  - consume_from: message.job.event.message.private
    brokers:
      - '127.0.0.1:2181'
      - '127.0.0.1:2181'
      - '127.0.0.1:2181'
    topic: event.message.private
    group: message.job.event.message.private
    start_offset: -1
    commit_interval: 10
  - consume_from: message.job.event.message.broadcast
    brokers:
      - '127.0.0.1:2181'
      - '127.0.0.1:2181'
      - '127.0.0.1:2181'
    topic: event.message.broadcast
    group: message.job.event.message.broadcast
    start_offset: -1
    commit_interval: 10
  • consume_from: the consumer name (used to select this config entry)
  • brokers: ZooKeeper addresses; consuming directly from kafka brokers is not supported yet
  • topic: topic name
  • group: consumer group name
  • start_offset: initial consume offset, default -1 (consume from the newest message)
    • -1 consume from the newest message
    • -2 consume from the oldest message
  • commit_interval: interval for auto-committing offsets, default 10s (see the sketch after this list)
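The -1 and -2 values match sarama's OffsetNewest and OffsetOldest constants. A minimal sketch of how start_offset and commit_interval plausibly map onto sarama's consumer options (the wrapper's real wiring is unexported, so treat this as an assumption):

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// start_offset: -1 maps to OffsetNewest, -2 to OffsetOldest.
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest

	// commit_interval: auto-commit consumed offsets every 10 seconds.
	cfg.Consumer.Offsets.AutoCommit.Enable = true
	cfg.Consumer.Offsets.AutoCommit.Interval = 10 * time.Second

	_ = cfg // the wrapper would pass this to its consumer group
}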
  2. Consumer initialization
service := micro.NewService(serviceName)

micro.Init(
	kafka.InitKafkaConsumer, // initialize the kafka consumer client here
)

defer kafka.ConsumeClose() // release the consumer client
  3. Consuming messages
InitConsumerProxy(consumeFrom).GetMessages(ctx, func(ctx context.Context, msg *kafka.ConsumerMessage) (ack bool) {
	var req message.SendMessageReq
	logger.For(ctx, "payload", string(msg.Value)).Info("broadcast consumer success")

	err := json.Unmarshal(msg.Value, &req)
	if err != nil {
		logger.For(ctx, "payload", string(msg.Value)).Error(err)
		return true // ack malformed messages so they are not redelivered
	}

	service.NewMessage().BroadcastSend(ctx, req)
	return true
})
  • InitConsumerProxy(consumeFrom): consumeFrom must match consume_from in the config file (a shutdown sketch follows below)
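A minimal sketch of running the consumer loop with graceful shutdown. It assumes GetMessages blocks until rootCtx is cancelled, which this page does not state explicitly; the consume_from value is taken from the config example above:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	// plus this kafka wrapper package; its import path is not shown on
	// this page, so it is left out here
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the consumer context on SIGINT/SIGTERM so the loop can stop.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sig
		cancel()
	}()

	// "message.job.event.message.private" must match consume_from in the
	// config file.
	kafka.InitConsumerProxy("message.job.event.message.private").
		GetMessages(ctx, func(ctx context.Context, msg *kafka.ConsumerMessage) (ack bool) {
			// handle msg.Value here ...
			return true
		})

	kafka.ConsumeClose()
}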

Reference examples

Producer
Consumer

That's all.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumeClient

func ConsumeClient(ctx context.Context, consumeFrom string) kafka.Reader

func ConsumeClose

func ConsumeClose()

func InitKafkaConsumer

func InitKafkaConsumer(kafkaPath string) error

func InitKafkaProducer

func InitKafkaProducer(kafkaPath string) error

func ProducerClient

func ProducerClient(ctx context.Context, producerTo string) kafka.Writer

func ProducerClose

func ProducerClose()

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func InitConsumerProxy

func InitConsumerProxy(consumeFrom string) *Consumer

func (*Consumer) GetMessages

func (c *Consumer) GetMessages(rootCtx context.Context, handler func(context.Context, *kafka.ConsumerMessage) (ack bool))

type ConsumerConfig

type ConsumerConfig struct {
	ConsumeFrom    string   `json:"consume_from"`
	Brokers        []string `json:"brokers"`
	Topic          string   `json:"topic"`
	Group          string   `json:"group"`
	StartOffset    int      `json:"start_offset"`
	CommitInterval int      `json:"commit_interval"`
}

type GroupConsumer

type GroupConsumer struct {
	// contains filtered or unexported fields
}

func (*GroupConsumer) Cleanup

func (consumer *GroupConsumer) Cleanup(_ sarama.ConsumerGroupSession) error

func (*GroupConsumer) ConsumeClaim

func (consumer *GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*GroupConsumer) Setup

func (consumer *GroupConsumer) Setup(_ sarama.ConsumerGroupSession) error

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func InitProducerProxy

func InitProducerProxy(producerTo string) *Producer

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, message *kafka.ProducerMessage) error

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, topic string, key string, msg interface{}) (err error)

func (*Producer) SendMsg

func (p *Producer) SendMsg(ctx context.Context, topic string, key string, msg []byte) error

type ProductConfig

type ProductConfig struct {
	ProducerTo  string   `json:"producer_to"`
	Brokers     []string `json:"brokers"`
	Async       bool     `json:"async"`
	MaxAttempts int      `json:"max_attempts"`
	RequiredAck int      `json:"required_ack"`
	ReadTimeout int      `json:"read_timeout"`
}
