kafka

package
v1.6.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2023 License: GPL-3.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
// Connection-kind names exported by the package.
// NOTE(review): ConnMQ accepts a connType string; presumably these are the
// values it expects — confirm against the ConnMQ implementation.
const (
	// PubConn is the string "Publish".
	PubConn = "Publish"
	// SubConn is the string "Subscribe".
	SubConn = "Subscribe"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnParams

// ConnParams holds the parameters describing a Kafka connection.
// NOTE(review): the per-field notes below are inferred from the field names
// and the original "for pub"/"for sub" remarks; confirm against the code
// that reads them.
type ConnParams struct {
	// Addr is the address string; format (single host vs. list) is not shown here — TODO confirm.
	Addr    string
	NeedPub bool // publish side; presumably requests a producer — TODO confirm
	NeedSub bool // subscribe side; presumably requests a consumer — TODO confirm
	AutoAck bool // subscribe side; presumably toggles automatic acknowledgement — TODO confirm
	// Group is presumably the consumer group name (ConnMQ also takes a group argument) — TODO confirm.
	Group   string
}

type Kafka

// Kafka groups together connection parameters, a broker list, a sarama
// config, a cluster config, a synchronous producer and a cluster consumer.
// ConnMQ returns a *Kafka.
// NOTE(review): which fields are populated for publish vs. subscribe
// connections is not visible here — confirm against ConnMQ.
type Kafka struct {
	// Params points at the ConnParams associated with this value.
	Params     *ConnParams
	// Brokers is a list of broker strings; presumably addresses — TODO confirm.
	Brokers    []string
	// ConnConfig is the sarama configuration.
	ConnConfig *sarama.Config
	// SubConfig is the cluster (consumer-group) configuration.
	SubConfig  *cluster.Config
	// Pub is a sarama synchronous producer; presumably used by Publish — TODO confirm.
	Pub        sarama.SyncProducer
	// Sub is a cluster consumer; presumably used on the subscribe side — TODO confirm.
	Sub        *cluster.Consumer
}

func ConnMQ

func ConnMQ(connType, addr, group string) (*Kafka, error)

连接 Kafka 服务

func (*Kafka) Ack

func (k *Kafka) Ack(msg *sarama.ConsumerMessage)

Ack 手动提交ack

func (*Kafka) AddConsumer

func (k *Kafka) AddConsumer(topic string) (consumer *cluster.Consumer, err error)

func (*Kafka) Close

func (k *Kafka) Close()

func (*Kafka) Publish

func (k *Kafka) Publish(topic string, body []byte, key string) (partition int32, offset int64, err error)

发布消息

func (*Kafka) ResetOffset

func (k *Kafka) ResetOffset(topic string) error

重置 offset，让同一消费组的消费者重新消费

func (*Kafka) ResetOffsetAppoint

func (k *Kafka) ResetOffsetAppoint(topic string, partition int32, offset int64) error

重置 offset，指定分区和偏移量

func (*Kafka) Subscribe

func (k *Kafka) Subscribe(consumer *cluster.Consumer) (message *sarama.ConsumerMessage, err error)

订阅消息

type Msg

// Msg is a message type whose fields are all unexported.
// The package lists these methods on *Msg: Ack, GetBody, GetID, GetOffset,
// GetPid and GetTopic.
// NOTE(review): presumably wraps a consumed Kafka message — confirm against
// the implementation.
type Msg struct {
	// contains filtered or unexported fields
}

func (*Msg) Ack

func (m *Msg) Ack() error

func (*Msg) GetBody

func (m *Msg) GetBody() []byte

func (*Msg) GetID

func (m *Msg) GetID() string

func (*Msg) GetOffset

func (m *Msg) GetOffset() int64

func (*Msg) GetPid

func (m *Msg) GetPid() int32

func (*Msg) GetTopic

func (m *Msg) GetTopic() string

type MsgCb

// MsgCb is a function type that takes a *Msg and returns an error.
// NOTE(review): the name suggests a per-message callback; where it is invoked
// and how a non-nil error is handled is not shown here — confirm at call sites.
type MsgCb func(m *Msg) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL