kafkax

package
v0.0.0-...-cd938e1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2020 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer struct {
	sarama.AsyncProducer
}

异步生产者

func (AsyncProducer) SendMessage

func (producer AsyncProducer) SendMessage(topic, text string)

异步发送消息

type Consumer

type Consumer struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

消费者

func (*Consumer) Cleanup

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Consume

func (consumer *Consumer) Consume(topics string, callback func(*ConsumerMessage) bool) (err error)

开始消费

func (*Consumer) ConsumeClaim

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type ConsumerConf

type ConsumerConf struct {
	Brokers          string `yaml:"brokers"      json:"brokers"`
	Version          string `yaml:"version"      json:"version"`
	Group            string `yaml:"group"        json:"group"`
	ClientID         string `yaml:"clientID"     json:"clientID"`
	Oldest           bool   `yaml:"oldest"       json:"oldest"`
	MaxWaitTimeMs    int64  `yaml:"maxWaitTimeMs"    json:"maxWaitTimeMs"`    //fetch.wait.max.ms
	SessionTimeoutMs int64  `yaml:"sessionTimeoutMs" json:"sessionTimeoutMs"` //session.timeout.ms
	KeepAliveMs      int64  `yaml:"keepAliveMs"  json:"keepAliveMs"`

	Logger sarama.StdLogger
}

消费者配置

func NewConsumerConf

func NewConsumerConf() *ConsumerConf

func (ConsumerConf) NewConsumer

func (x ConsumerConf) NewConsumer() *Consumer

创建消费者

type ConsumerMessage

type ConsumerMessage struct {
	*sarama.ConsumerMessage
}

type ProducerConf

type ProducerConf struct {
	Hosts        string `yaml:"hosts"        json:"hosts"`
	ClientID     string `yaml:"clientID"     json:"clientID"`
	KeepAliveMs  int64  `yaml:"keepAliveMs"  json:"keepAliveMs"`
	ReqTimeoutMs int64  `yaml:"reqTimeoutMs" json:"reqTimeoutMs"`
	RetSuccesses bool   `yaml:"retSuccesses" json:"retSuccesses"`
}

生产者配置

func NewProducerConf

func NewProducerConf() *ProducerConf

创建生产者配置

func (ProducerConf) NewAsyncProducer

func (x ProducerConf) NewAsyncProducer() *AsyncProducer

创建异步生产者

func (ProducerConf) NewClient

func (x ProducerConf) NewClient() (client sarama.Client)

创建Kafka客户端

func (ProducerConf) NewSyncProducer

func (x ProducerConf) NewSyncProducer() *SyncProducer

创建同步生产者

type SyncProducer

type SyncProducer struct {
	sarama.SyncProducer
}

同步生产者

func (SyncProducer) SendMessage

func (producer SyncProducer) SendMessage(topic, text string) (partition int32, offset int64, err error)

同步发送消息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL