producer

package
v0.0.0-...-a36dcc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithBrokerList

func WithBrokerList(brokers string) Option

kafka brokers ip:port,ip:port

func WithChannelBufferSize

func WithChannelBufferSize(channelBufferSize int) Option

func WithClientID

func WithClientID(clientId string) Option

The client ID sent with every request to the brokers.

func WithCompression

func WithCompression(compression string) Option

func WithFlushBytes

func WithFlushBytes(flushBytes int) Option

func WithFlushFrequencyMs

func WithFlushFrequencyMs(flushFrequencyMs int) Option

func WithFlushMaxMessages

func WithFlushMaxMessages(flushMaxMessages int) Option

func WithFlushMessages

func WithFlushMessages(flushMessages int) Option

func WithMaxMessageBytes

func WithMaxMessageBytes(maxMessageBytes int) Option

func WithMaxOpenRequests

func WithMaxOpenRequests(maxOpenRequests int) Option

func WithPartitioning

func WithPartitioning(partitioning string) Option

key partition: partition(manual),hash,random - manual partitioning if a partition number is provided - hash partitioning by msg key - random partitioning otherwise.

func WithRequiredAcks

func WithRequiredAcks(requiredAcks int) Option

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration). 0: NoResponse doesn't send any response, the TCP ACK is all you get. 1: WaitForLocal waits for only the local commit to succeed before responding. -1: WaitForAll waits for all in-sync replicas to commit before responding. The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration key.

func WithRetryMaxCn

func WithRetryMaxCn(retryMaxCn int) Option

func WithTimeOut

func WithTimeOut(timeOut time.Duration) Option

The duration the producer will wait to receive -required-acks

func WithVersion

func WithVersion(version string) Option

kafka version support kafka min version 0.8.2.0

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(topic string, pType string, authOpts []auth.Option, options ...Option) (p *Producer)

new sync/async producer to topic with option(requiredAcks,retryMaxCn,partitioning,compressions,TLS...etc)

func (*Producer) Close

func (p *Producer) Close()

close sync/async producer

func (*Producer) Send

func (p *Producer) Send(val string)

send string msg no key

func (*Producer) SendByKey

func (p *Producer) SendByKey(key, val string)

send string msg by string key

type ProducerOptions

type ProducerOptions struct {
	*auth.AuthOptions
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL