kafka

package
v0.0.0-...-b7e27bc Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2024 License: Apache-2.0 Imports: 17 Imported by: 1

Documentation

Index

Constants

View Source
const (
	SpecifiedPartitionKey = "specifiedPartition"
)

Variables

View Source
var ErrInvalidArgument = errors.New("invalid argument")

Functions

func NewMQ

func NewMQ(network string, address []string) (mq.MQ, error)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(address []string, topic, groupID string) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) (*mq.Message, error)

func (*Consumer) ConsumeChan

func (c *Consumer) ConsumeChan(ctx context.Context) (<-chan *mq.Message, error)

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

func (*MQ) Close

func (m *MQ) Close() error

func (*MQ) Consumer

func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error)

func (*MQ) CreateTopic

func (m *MQ) CreateTopic(ctx context.Context, name string, partitions int) error

func (*MQ) DeleteTopics

func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error

DeleteTopics 删除指定的一个或多个 topic。

func (*MQ) Producer

func (m *MQ) Producer(topic string) (mq.Producer, error)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(address []string, topic string, balancer kafkago.Balancer) *Producer

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, m *mq.Message) (*mq.ProducerResult, error)

func (*Producer) ProduceWithPartition

func (p *Producer) ProduceWithPartition(ctx context.Context, m *mq.Message, partition int) (*mq.ProducerResult, error)

ProduceWithPartition 并没有校验 partition 的正确性。

type SpecifiedPartitionBalancer

type SpecifiedPartitionBalancer struct {
	// contains filtered or unexported fields
}

SpecifiedPartitionBalancer 借助 kafka 客户端提供的 Balancer 接口，实现向指定分区生产消息。如果在 message.WriterData 中找到指定的 partition 信息，则直接用其作为目标 partition；如果没有找到，则使用默认负载均衡器计算目标 partition。

func NewSpecifiedPartitionBalancer

func NewSpecifiedPartitionBalancer(defaultBalancer kafkago.Balancer) (*SpecifiedPartitionBalancer, error)

func (*SpecifiedPartitionBalancer) Balance

func (b *SpecifiedPartitionBalancer) Balance(msg kafkago.Message, partitions ...int) (partition int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL