Documentation ¶
Index ¶
- Constants
- Variables
- type Consumer
- type ConsumerOption
- type Producer
- func (p *Producer) AsyncErrors() <-chan *sarama.ProducerError
- func (p *Producer) AsyncSendMessages(topic string, msgs ...ProducerMsg) error
- func (p *Producer) AsyncSendMsgWithHeaders(topic string, msg interface{}, headers map[string]string)
- func (p *Producer) AsyncSuccesses() <-chan *sarama.ProducerMessage
- func (p *Producer) Close() (err error)
- func (p *Producer) SyncSendMessages(topic string, msgs ...ProducerMsg) (partition int32, offset int64, err error)
- func (p *Producer) SyncSendMsgWithHeaders(topic string, msg interface{}, headers map[string]string) (partition int32, offset int64, err error)
- type ProducerMsg
- type ProducerOption
- func Async() ProducerOption
- func AsyncMonitor(success, failed bool) ProducerOption
- func Compression(compression int8) ProducerOption
- func MarshalMsgFunc(marshal codec.Marshal) ProducerOption
- func RequiredAcks(requiredAcks int16) ProducerOption
- func Sync() ProducerOption
- func TLSConfig(certFile, keyFile, caFile string, verifySsl bool) ProducerOption
- type Publication
- type PublicationHandlerFunc
Constants ¶
View Source
const (
	// RequiredAcksNoResponse doesn't send any response, the TCP ACK is all you get.
	RequiredAcksNoResponse int16 = 0
	// RequiredAcksWaitForLocal waits for only the local commit to succeed before responding.
	RequiredAcksWaitForLocal int16 = 1
	// RequiredAcksWaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	RequiredAcksWaitForAll int16 = -1
)
View Source
const (
	// CompressionNone disables compression.
	CompressionNone int8 = iota
	// CompressionGZIP compresses using GZIP.
	CompressionGZIP
	// CompressionSnappy compresses using Snappy.
	CompressionSnappy
	// CompressionLZ4 compresses using LZ4.
	CompressionLZ4
	// CompressionZSTD compresses using ZSTD.
	CompressionZSTD
)
Variables ¶
View Source
var (
	ConsumerReceiveMultipleCalledError = errors.New("The receive method can only be called once, and if you want to call it more than once, create multiple consumers")
	ConsumerCloseMultipleCalledError   = errors.New("The close method can only be called once, and if you want to call it more than once, create multiple consumers")
	ProducerClosedError                = errors.New("Producer had closed")
	SyncError                          = errors.New("this is a sync method, but Producer is async")
	AsyncError                         = errors.New("this is an async method, but Producer is sync")
	NotConsumerGroupError              = errors.New("this is not Consumer group")
	ConsumeCanceledError               = errors.New("consume was canceled")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶ added in v0.0.7
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(addresses []string, opts ...ConsumerOption) (*Consumer, error)
func (*Consumer) GroupConsume ¶ added in v0.0.7
type ConsumerOption ¶ added in v0.0.7
type ConsumerOption func(*consumerOptions)
func BalanceStrategyRange ¶ added in v0.0.6
func BalanceStrategyRange() ConsumerOption
func BalanceStrategyRoundRobin ¶ added in v0.0.6
func BalanceStrategyRoundRobin() ConsumerOption
func BalanceStrategySticky ¶ added in v0.0.6
func BalanceStrategySticky() ConsumerOption
func ChannelBufferSize ¶ added in v0.1.0
func ChannelBufferSize(size int) ConsumerOption
func Group ¶ added in v0.0.6
func Group(groupID string) ConsumerOption
func Newest ¶ added in v0.0.6
func Newest() ConsumerOption
func Oldest ¶ added in v0.0.6
func Oldest() ConsumerOption
type Producer ¶ added in v0.0.2
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(addresses []string, opts ...ProducerOption) (*Producer, error)
func (*Producer) AsyncErrors ¶ added in v0.0.2
func (p *Producer) AsyncErrors() <-chan *sarama.ProducerError
func (*Producer) AsyncSendMessages ¶ added in v0.0.3
func (p *Producer) AsyncSendMessages(topic string, msgs ...ProducerMsg) error
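func (*Producer) AsyncSendMsgWithHeaders ¶ added in v0.0.3
func (p *Producer) AsyncSendMsgWithHeaders(topic string, msg interface{}, headers map[string]string)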
func (*Producer) AsyncSuccesses ¶ added in v0.0.2
func (p *Producer) AsyncSuccesses() <-chan *sarama.ProducerMessage
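func (*Producer) SyncSendMessages ¶ added in v0.0.3
func (p *Producer) SyncSendMessages(topic string, msgs ...ProducerMsg) (partition int32, offset int64, err error)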
type ProducerMsg ¶
type ProducerOption ¶ added in v0.0.2
type ProducerOption func(*producerOptions)
func Async ¶
func Async() ProducerOption
func AsyncMonitor ¶ added in v0.1.3
func AsyncMonitor(success, failed bool) ProducerOption
func Compression ¶ added in v0.1.3
func Compression(compression int8) ProducerOption
func MarshalMsgFunc ¶
func MarshalMsgFunc(marshal codec.Marshal) ProducerOption
func RequiredAcks ¶ added in v0.1.3
func RequiredAcks(requiredAcks int16) ProducerOption
func Sync ¶
func Sync() ProducerOption
func TLSConfig ¶ added in v0.1.3
func TLSConfig(certFile, keyFile, caFile string, verifySsl bool) ProducerOption
type Publication ¶
type Publication struct {
	Msg     *sarama.ConsumerMessage
	Session sarama.ConsumerGroupSession
}
type PublicationHandlerFunc ¶ added in v0.1.3
type PublicationHandlerFunc func(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
Click to show internal directories.
Click to hide internal directories.