kafkax

package
v1.0.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultRetryMax 默认重试时间, 单位秒.
	DefaultRetryMax = 10
	// DefaultFlushFrequency 默认刷新频率, 500毫秒.
	DefaultFlushFrequency = 500
	// DefaultVersion 默认卡夫卡版本
	DefaultVersion = "2.8.0"
)

Variables

View Source
var (
	ProducerEnableErr      = errno.NewError("cfg.Producer.Enable is false")      //nolint:nolintlint,errname
	AsyncProducerEnableErr = errno.NewError("cfg.AsyncProducer.Enable is false") //nolint:nolintlint,errname
)

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer struct {
	Client sarama.AsyncProducer
	// contains filtered or unexported fields
}

func (*AsyncProducer) AsyncClose

func (p *AsyncProducer) AsyncClose()

func (*AsyncProducer) Close

func (p *AsyncProducer) Close() error

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before process shutting down, or you may lose messages. You must call this before calling Close on the underlying client.

func (*AsyncProducer) Errors

func (p *AsyncProducer) Errors() <-chan *ProducerError

Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock when the channel is full. Alternatively, you can set Producer.Return.Errors in your config to false, which prevents errors to be returned.

func (*AsyncProducer) GetClient

func (p *AsyncProducer) GetClient() *AsyncProducer

func (*AsyncProducer) SendMessage

func (p *AsyncProducer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)

func (*AsyncProducer) SendMessageByte

func (p *AsyncProducer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)

func (*AsyncProducer) SendMessages

func (p *AsyncProducer) SendMessages(topic Topic,
	key string, values ...interface{}) (msgList []*ProducerMessage, err error)

func (*AsyncProducer) SendMessagesByte

func (p *AsyncProducer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)

func (*AsyncProducer) Successes

func (p *AsyncProducer) Successes() <-chan *ProducerMessage

Successes is the success output channel back to the user when Return.Successes is enabled. If Return.Successes is true, you MUST read from this channel or the Producer will deadlock. It is suggested that you send and read messages together in a single select statement.

type AsyncProducerRepo

type AsyncProducerRepo interface {
	GetClient() *AsyncProducer

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before process
	// shutting down, or you may lose messages. You must call this before calling
	// Close on the underlying client.
	Close() error
	// SendMessage 发送一条消息, 返回一个消息指针. 关注 msg.PartitionConsumer, msg.Offset 两个变量.
	SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)
	SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)
	// SendMessages 批量发送消息, 返回消息指针数组. 关注 msg.PartitionConsumer, msg.Offset 两个变量.
	SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)
	SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)
	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors to be returned.
	Errors() <-chan *ProducerError
}

func DefaultAsyncProducer

func DefaultAsyncProducer() AsyncProducerRepo

func NewAsyncProducer

func NewAsyncProducer(cfg Info, optionHandlers ...OptionHandler) (AsyncProducerRepo, error)

type CloseFunc

type CloseFunc func() error

type ConsumeGroupHandler

type ConsumeGroupHandler func(msg *ConsumerMessage) (error, bool)

type ConsumeHandler

type ConsumeHandler func(msgChan <-chan *ConsumerMessage,
	errChan <-chan *ConsumerError, markFunc MarkMessageFunc) error

type ConsumerError

type ConsumerError struct {
	sarama.ConsumerError
}

type ConsumerGroup

type ConsumerGroup struct {
	Client sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func (*ConsumerGroup) Close

func (g *ConsumerGroup) Close() (err error)

func (*ConsumerGroup) Consume

func (g *ConsumerGroup) Consume(topics []Topic, handler ConsumeGroupHandler) error

Consume 消费者, 这个是一个阻塞的动作. 应该包裹在一个for循环中. for循环结束记得调用cancel.

func (*ConsumerGroup) Errors

func (g *ConsumerGroup) Errors() <-chan error

Errors returns a read channel of errors that occurred during the Consumer life-cycle. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.

func (*ConsumerGroup) GetClient

func (g *ConsumerGroup) GetClient() *ConsumerGroup

func (*ConsumerGroup) GetConfig

func (p *ConsumerGroup) GetConfig() Info

type ConsumerGroupHandler

type ConsumerGroupHandler interface {
	sarama.ConsumerGroupHandler
}

type ConsumerGroupRepo

type ConsumerGroupRepo interface {
	Close() error
	GetClient() *ConsumerGroup
	// Consume 消费者, 这个是一个阻塞的动作. 应该包裹在一个for循环中.
	Consume(topics []Topic, handler ConsumeGroupHandler) error
	Errors() <-chan error
	GetConfig() Info
}

func DefaultConsumerGroup

func DefaultConsumerGroup() ConsumerGroupRepo

func NewConsumerGroup

func NewConsumerGroup(cfg Info, optionHandlers ...OptionHandler) (ConsumerGroupRepo, error)

type ConsumerMessage

type ConsumerMessage struct {
	*sarama.ConsumerMessage
}

type GroupConfigInfo

type GroupConfigInfo struct {
	GroupID  string `toml:"GroupID" json:"GroupID"`
	Assignor string `toml:"Assignor" json:"Assignor"`
	Oldest   bool   `toml:"Oldest" json:"Oldest"`
}

type Info

type Info struct {
	// 连接地址, 集群则配置多个. 必填.
	BrokerList []string `toml:"BrokerList" json:"BrokerList"`
	// 生产者相关配置.
	Producer      ProducerConfigInfo `toml:"Producer" json:"Producer"`
	AsyncProducer ProducerConfigInfo `toml:"AsyncProducer" json:"AsyncProducer"`
	TLS           TLSConfigInfo      `toml:"TLS" json:"TLS"`
	Group         GroupConfigInfo    `toml:"Group" json:"Group"`
	Version       string             `toml:"Version" json:"Version"`
}

type MarkMessageFunc

type MarkMessageFunc func(msg *ConsumerMessage, metadata string)

type OptionHandler

type OptionHandler func(*option)

type PartitionConsumerRepo

type PartitionConsumerRepo interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close before a Consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a Consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}

type Producer

type Producer struct {
	Client sarama.SyncProducer
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) GetClient

func (p *Producer) GetClient() *Producer

func (*Producer) GetConfig

func (p *Producer) GetConfig() Info

func (*Producer) SendMessage

func (p *Producer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)

SendMessage 发送一条消息, 返回一个消息指针. 关注 msg.PartitionConsumer, msg.Offset 两个变量.

func (*Producer) SendMessageByte

func (p *Producer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)

func (*Producer) SendMessages

func (p *Producer) SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)

SendMessages 批量发送消息, 返回消息指针数组. 关注 msg.PartitionConsumer, msg.Offset 两个变量.

func (*Producer) SendMessagesByte

func (p *Producer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)

type ProducerConfigInfo

type ProducerConfigInfo struct {
	Enable bool `toml:"Enable" json:"Enable"`
	// RequiredAcks 两种模式都要配置.要求填数字. 因为有0值, 所以不用int型, 以免产生歧义. 不填有默认值.
	RequiredAcks string `toml:"RequiredAcks" json:"RequiredAcks"`

	// RetryMax 同步模式时的重试次数. 要求填数字, 不填有默认值
	RetryMax int `toml:"RetryMax" json:"RetryMax"`

	// Compression 异步模式时配置 要求填数字. 因为有0值, 所以不用int型, 以免产生歧义. 不填有默认值.
	Compression string `toml:"Compression" json:"Compression"`
	// 异步模式时刷新频率, 单位毫秒. 不填有默认值
	FlushFrequency int64 `toml:"FlushFrequency" json:"FlushFrequency"`
}

type ProducerError

type ProducerError struct {
	sarama.ProducerError
}

type ProducerMessage

type ProducerMessage struct {
	sarama.ProducerMessage
}

type ProducerRepo

type ProducerRepo interface {

	// Close shuts down the producer; you must call this function before a producer
	// object passes out of scope, as it may otherwise leak memory.
	// You must call this before calling Close on the underlying client.
	Close() error
	GetClient() *Producer
	// SendMessage 发送一条消息, 返回一个消息指针. 关注 msg.PartitionConsumer, msg.Offset 两个变量.
	SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)
	SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)
	// SendMessages 批量发送消息, 返回消息指针数组. 关注 msg.PartitionConsumer, msg.Offset 两个变量.
	SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)
	SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)
}

func DefaultProducer

func DefaultProducer() ProducerRepo

func NewProducer

func NewProducer(cfg Info, optionHandlers ...OptionHandler) (ProducerRepo, error)

type TLSConfigInfo

type TLSConfigInfo struct {
	CertFile  string `toml:"CertFile" json:"CertFile"`
	KeyFile   string `toml:"KeyFile" json:"KeyFile"`
	CaFile    string `toml:"CaFile" json:"CaFile"`
	VerifySsl bool   `toml:"VerifySsl" json:"VerifySsl"`
}

type Topic

type Topic interface {
	String() (topic string)
}

Topic 卡夫卡的主题名称, 都用这个结构体封装

type Trace

type Trace = trace.T

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL