kafka

package module
v2.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2024 License: MIT Imports: 22 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateTopic added in v2.0.7

func CreateTopic(addr string, topic string, numPartitions, replicationFactor int) error

func DeleteTopic added in v2.0.7

func DeleteTopic(addr string, topics ...string) error

func NewBroker

func NewBroker(opts ...broker.Option) broker.Broker

NewBroker returns a broker.Broker configured with the given options.

func WithAllowPublishAutoTopicCreation added in v2.0.7

func WithAllowPublishAutoTopicCreation(enable bool) broker.Option

WithAllowPublishAutoTopicCreation enables or disables automatic topic creation when publishing.

func WithAsync

func WithAsync(enable bool) broker.Option

WithAsync enables or disables asynchronous publishing. Default: true.

func WithBatchBytes

func WithBatchBytes(by int64) broker.Option

WithBatchBytes default:1048576 bytes

func WithBatchSize

func WithBatchSize(size int) broker.Option

WithBatchSize batch.size default:100

func WithBatchTimeout

func WithBatchTimeout(timeout time.Duration) broker.Option

WithBatchTimeout linger.ms default:10ms

func WithCommitInterval

func WithCommitInterval(interval time.Duration) broker.Option

WithCommitInterval .

func WithCrc32Balancer

func WithCrc32Balancer(consistent bool) broker.PublishOption

WithCrc32Balancer .

func WithDialer

func WithDialer(cfg *kafkaGo.Dialer) broker.Option

WithDialer .

func WithDialerTimeout

func WithDialerTimeout(tm time.Duration) broker.Option

WithDialerTimeout .

func WithEnableErrorLogger

func WithEnableErrorLogger(enable bool) broker.Option

WithEnableErrorLogger enables or disables the go-micro error logger.

func WithEnableLogger

func WithEnableLogger(enable bool) broker.Option

WithEnableLogger enables or disables the go-micro info logger.

func WithEnableOneTopicOneWriter

func WithEnableOneTopicOneWriter(enable bool) broker.Option

WithEnableOneTopicOneWriter .

func WithErrorLogger

func WithErrorLogger(l kafkaGo.Logger) broker.Option

WithErrorLogger injects a custom error logger.

func WithHashBalancer

func WithHashBalancer(hasher hash.Hash32) broker.PublishOption

WithHashBalancer .

func WithHeaders

func WithHeaders(headers map[string]interface{}) broker.PublishOption

WithHeaders .

func WithHeartbeatInterval

func WithHeartbeatInterval(interval time.Duration) broker.Option

WithHeartbeatInterval .

func WithLeastBytesBalancer

func WithLeastBytesBalancer() broker.PublishOption

WithLeastBytesBalancer .

func WithLogger

func WithLogger(l kafkaGo.Logger) broker.Option

WithLogger injects a custom info logger.

func WithMaxAttempts

func WithMaxAttempts(cnt int) broker.Option

WithMaxAttempts .

func WithMaxBytes

func WithMaxBytes(bytes int) broker.Option

WithMaxBytes .

func WithMaxWait

func WithMaxWait(time time.Duration) broker.Option

WithMaxWait fetch.max.wait.ms

func WithMessageKey

func WithMessageKey(key []byte) broker.PublishOption

WithMessageKey .

func WithMessageOffset

func WithMessageOffset(offset int64) broker.PublishOption

WithMessageOffset .

func WithMinBytes

func WithMinBytes(bytes int) broker.Option

WithMinBytes fetch.min.bytes

func WithMurmur2Balancer

func WithMurmur2Balancer(consistent bool) broker.PublishOption

WithMurmur2Balancer .

func WithPartitionWatchInterval

func WithPartitionWatchInterval(interval time.Duration) broker.Option

WithPartitionWatchInterval .

func WithPlainMechanism

func WithPlainMechanism(username, password string) broker.Option

WithPlainMechanism .

func WithPublishMaxAttempts

func WithPublishMaxAttempts(cnt int) broker.Option

WithPublishMaxAttempts .

func WithQueueCapacity

func WithQueueCapacity(cap int) broker.Option

WithQueueCapacity .

func WithReadLagInterval

func WithReadLagInterval(interval time.Duration) broker.Option

WithReadLagInterval .

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) broker.Option

WithReadTimeout default:10s

func WithReaderConfig

func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option

WithReaderConfig .

func WithRebalanceTimeout

func WithRebalanceTimeout(timeout time.Duration) broker.Option

WithRebalanceTimeout .

func WithReferenceHashBalancer

func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption

WithReferenceHashBalancer .

func WithRetentionTime

func WithRetentionTime(time time.Duration) broker.Option

WithRetentionTime .

func WithRetries

func WithRetries(cnt int) broker.Option

WithRetries .

func WithRoundRobinBalancer

func WithRoundRobinBalancer() broker.PublishOption

WithRoundRobinBalancer .

func WithScramMechanism added in v2.0.7

func WithScramMechanism(algoName ScramAlgorithm, username, password string) broker.Option

WithScramMechanism .

func WithSessionTimeout

func WithSessionTimeout(timeout time.Duration) broker.Option

WithSessionTimeout .

func WithStartOffset

func WithStartOffset(offset int64) broker.Option

WithStartOffset .

func WithSubscribeAutoCreateTopic added in v2.0.7

func WithSubscribeAutoCreateTopic(topic string, numPartitions, replicationFactor int) broker.SubscribeOption

func WithWatchPartitionChanges

func WithWatchPartitionChanges(enable bool) broker.Option

WithWatchPartitionChanges .

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) broker.Option

WithWriteTimeout default:10s

func WithWriterConfig

func WithWriterConfig(cfg WriterConfig) broker.Option

WithWriterConfig .

Types

type BalancerName added in v2.0.7

// BalancerName is a string name identifying a balancer.
type BalancerName string
// Declared BalancerName values.
//
// NOTE(review): the Crc32Balancer and Murmur2Balancer string values carry a
// "Balancer" suffix that the other four values do not; confirm this is
// intentional before relying on the string forms (e.g. in configuration).
const (
	LeastBytesBalancer    BalancerName = "LeastBytes"
	RoundRobinBalancer    BalancerName = "RoundRobin"
	HashBalancer          BalancerName = "Hash"
	ReferenceHashBalancer BalancerName = "ReferenceHash"
	Crc32Balancer         BalancerName = "CRC32Balancer"
	Murmur2Balancer       BalancerName = "Murmur2Balancer"
)

type ErrorLogger

// ErrorLogger is a field-less type whose Printf method takes a format string
// and arguments. Presumably it backs the go-micro error logger that
// WithEnableErrorLogger switches on — confirm against the implementation.
type ErrorLogger struct {
}

func (ErrorLogger) Printf

func (l ErrorLogger) Printf(msg string, args ...interface{})

Printf logs msg, formatted with args, as an error-level message.

type Logger

// Logger is a field-less type whose Printf method takes a format string and
// arguments. Presumably it backs the go-micro info logger that
// WithEnableLogger switches on — confirm against the implementation.
type Logger struct {
}

func (Logger) Printf

func (l Logger) Printf(msg string, args ...interface{})

Printf logs msg, formatted with args, as an info-level message.

type MessageCarrier

// MessageCarrier is built from a *kafkaGo.Message by NewMessageCarrier and
// exposes string key/value access through its Get, Set and Keys methods.
//
// NOTE(review): presumably the keys and values map onto the message's
// headers — confirm against the implementation.
type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier

NewMessageCarrier returns a MessageCarrier created from the given Kafka message.

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

Get returns the value associated with key.

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

Keys returns the keys held by the carrier.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set stores val under key.

type ScramAlgorithm added in v2.0.7

// ScramAlgorithm is a string name for a SCRAM algorithm; it is the type of the
// algoName parameter of WithScramMechanism.
type ScramAlgorithm string
// Declared ScramAlgorithm values.
const (
	ScramAlgorithmSHA256 ScramAlgorithm = "SHA256"
	ScramAlgorithmSHA512 ScramAlgorithm = "SHA512"
)

type Writer

// Writer groups the kafka-go writers held by this package. Instances are
// created with NewWriter.
type Writer struct {
	// Writer is a single kafka-go writer.
	// NOTE(review): presumably the shared writer used when
	// EnableOneTopicOneWriter is false — confirm.
	Writer                  *kafkaGo.Writer
	// Writers maps a string key to a kafka-go writer.
	// NOTE(review): presumably keyed by topic name and used when
	// EnableOneTopicOneWriter is true — confirm.
	Writers                 map[string]*kafkaGo.Writer
	// EnableOneTopicOneWriter is a boolean switch.
	// NOTE(review): presumably set from NewWriter's argument of the same
	// name — confirm.
	EnableOneTopicOneWriter bool
}

func NewWriter

func NewWriter(enableOneTopicOneWriter bool) *Writer

NewWriter creates a new Writer.

func (*Writer) Close

func (w *Writer) Close()

Close flushes pending writes, and waits for all writes to complete before returning

func (*Writer) CreateProducer

func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer

CreateProducer creates a kafka-go Writer from the given writer configuration, SASL mechanism, and TLS configuration.

type WriterConfig

// WriterConfig holds the settings passed to Writer.CreateProducer to build a
// kafka-go Writer.
type WriterConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer kafkaGo.Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int64

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	//
	// NOTE(review): WithBatchTimeout documents a default of 10ms; confirm
	// which default actually applies.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// Number of acknowledges from partition replicas required before receiving
	// a response to a produce request. The default is -1, which means to wait for
	// all replicas, and a value above 0 is required to indicate how many replicas
	// should acknowledge a message to be considered successful.
	//
	// This version of kafka-go (v0.3) does not support 0 required acks, due to
	// some internal complexity implementing this with the Kafka protocol. If you
	// need that functionality specifically, you'll need to upgrade to v0.4.
	//
	// NOTE(review): the version note above looks inherited from an older
	// kafka-go release; confirm it still applies to the kafka-go version this
	// package is built against.
	RequiredAcks kafkaGo.RequiredAcks

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	//
	// NOTE(review): WithAsync documents a default of true; confirm the
	// effective default for this field.
	Async bool

	// If not nil, specifies a logger used to report internal changes within the
	// Writer.
	Logger kafkaGo.Logger

	// ErrorLogger is the logger used to report errors. If nil, the Writer falls
	// back to using Logger instead.
	ErrorLogger kafkaGo.Logger

	// AllowAutoTopicCreation notifies Writer to create topic if missing.
	AllowAutoTopicCreation bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL