kafka

package
v0.0.0-...-959073d
Warning

This package is not in the latest version of its module.

Published: Aug 4, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetTransferChan

func GetTransferChan(consumerBuffer int) chan *sarama.ProducerMessage

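GetTransferChan returns a channel of *sarama.ProducerMessage that can be used to hand messages from one stage to another, for example from a consumer to a producer. The consumerBuffer argument presumably sets the channel's buffer size.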
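
As an illustration of how such a transfer channel behaves, here is a minimal sketch using only the standard library. It assumes that consumerBuffer is used as the channel capacity, which the documentation above does not confirm. The message type and the newTransferChan function are hypothetical stand-ins for *sarama.ProducerMessage and GetTransferChan.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ProducerMessage so that
// this sketch has no external dependencies.
type message struct {
	Topic string
	Value string
}

// newTransferChan is a hypothetical equivalent of GetTransferChan. It assumes
// consumerBuffer is the capacity of the returned channel.
func newTransferChan(consumerBuffer int) chan *message {
	return make(chan *message, consumerBuffer)
}

func main() {
	ch := newTransferChan(2)

	// With a capacity of 2, both sends complete without a waiting receiver.
	ch <- &message{Topic: "events", Value: "a"}
	ch <- &message{Topic: "events", Value: "b"}
	close(ch)

	// Drain the channel in the order the messages were sent.
	for m := range ch {
		fmt.Println(m.Topic, m.Value)
	}
}
```

A buffered channel lets the sending side run ahead of the receiving side by up to consumerBuffer messages before it blocks.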

func LogRPS

func LogRPS(tag, topic string, influxAccessor influxlogger.InfluxD, rps []*uint64)

LogRPS aggregates the transaction counts for this process, read from the counters in rps.
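
One way to aggregate a slice of shared counters like rps is sketched below. This is not the package's implementation: the drainCounters function is hypothetical, and resetting each counter to zero after reading it is an assumption made so that every call reports only the transactions since the previous call.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// drainCounters is a hypothetical helper. It atomically reads each counter,
// resets it to zero, and returns the sum of the values it read.
// Nil entries are skipped.
func drainCounters(counters []*uint64) uint64 {
	var total uint64
	for _, c := range counters {
		if c == nil {
			continue
		}
		// SwapUint64 stores 0 and returns the previous value in one step,
		// so increments made by other goroutines are never lost.
		total += atomic.SwapUint64(c, 0)
	}
	return total
}

func main() {
	consumed, produced := uint64(120), uint64(80)
	total := drainCounters([]*uint64{&consumed, &produced})
	fmt.Println("transactions this interval:", total)
}
```

Calling such a helper once per second would yield a per-second rate directly; with a longer interval the total would need to be divided by the interval length.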

Types

type Config

type Config struct {
	Topic                string
	Zookeepers           []string
	ConsumerGroupName    string
	ConsumerBuffer       int
	MaxErrors            int
	MaxRetry             int
	BatchSize            int
	FlushInterval        int
	ConsumerTransactions *uint64
	ProducerTransactions *uint64
}

Config bundles the configuration values needed by InitKafka.
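
The following sketch shows how a Config value might be populated. The struct is copied locally so the example compiles on its own; in real code it would come from this package. All values are illustrative assumptions: the hostnames are placeholders, and the unit of FlushInterval is not stated in the documentation above.

```go
package main

import "fmt"

// Config is a local copy of the documented struct, declared here only so
// that the sketch is self-contained.
type Config struct {
	Topic                string
	Zookeepers           []string
	ConsumerGroupName    string
	ConsumerBuffer       int
	MaxErrors            int
	MaxRetry             int
	BatchSize            int
	FlushInterval        int
	ConsumerTransactions *uint64
	ProducerTransactions *uint64
}

// exampleConfig returns a Config filled with placeholder values.
func exampleConfig() Config {
	// The transaction counters are pointers, presumably so that several
	// goroutines can update the same counters.
	var consumed, produced uint64
	return Config{
		Topic:                "events",
		Zookeepers:           []string{"zk1.example.com:2181", "zk2.example.com:2181"},
		ConsumerGroupName:    "events-mirror",
		ConsumerBuffer:       1000,
		MaxErrors:            10,
		MaxRetry:             3,
		BatchSize:            500,
		FlushInterval:        1, // unit not documented; an assumption
		ConsumerTransactions: &consumed,
		ProducerTransactions: &produced,
	}
}

func main() {
	conf := exampleConfig()
	fmt.Println(conf.Topic, conf.ConsumerGroupName, len(conf.Zookeepers))
}
```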

type Kafka

type Kafka struct {
	// Config
	Conf Config

	Brokers []string

	// Kafka stuff
	Producer sarama.AsyncProducer
	Consumer *consumergroup.ConsumerGroup

	ProducerChan chan *sarama.ProducerMessage
	TransferChan chan *sarama.ProducerMessage

	// other stuff
	WaitGroup *sync.WaitGroup
	Shutdown  chan struct{}
	// contains filtered or unexported fields
}

Kafka bundles a sarama producer, a consumer group, and the channels that connect them, and provides helper methods for working with them.

func InitKafka

func InitKafka(conf Config, influxAccessor influxlogger.InfluxD, signalChan chan struct{}, wg *sync.WaitGroup) (*Kafka, error)

InitKafka initializes a Kafka object and creates its helper clients.
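
InitKafka takes a signalChan of type chan struct{} and a *sync.WaitGroup. A common Go pattern with these two arguments is to close the channel to request shutdown and then wait on the WaitGroup until every worker has returned. The sketch below shows that pattern with the standard library only. Whether this package uses the arguments in exactly this way is an assumption, and the worker and runWorker functions are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// worker receives from in until shutdown is closed (or in is closed),
// counting the messages it handled. It signals completion through wg.
func worker(in <-chan string, shutdown <-chan struct{}, wg *sync.WaitGroup, handled *int) {
	defer wg.Done()
	for {
		select {
		case <-shutdown:
			return
		case _, ok := <-in:
			if !ok {
				return
			}
			*handled++
		}
	}
}

// runWorker sends n messages to a worker, requests shutdown, waits for the
// worker to exit, and returns how many messages it handled.
func runWorker(n int) int {
	in := make(chan string) // unbuffered: each send waits for the worker
	shutdown := make(chan struct{})
	var wg sync.WaitGroup
	handled := 0

	wg.Add(1)
	go worker(in, shutdown, &wg, &handled)

	for i := 0; i < n; i++ {
		in <- fmt.Sprintf("message %d", i)
	}

	close(shutdown) // closing wakes every goroutine that receives from it
	wg.Wait()       // after Wait returns it is safe to read handled
	return handled
}

func main() {
	fmt.Println("handled:", runWorker(3))
}
```

Closing the channel, rather than sending a value on it, notifies any number of goroutines at once, which is why chan struct{} is the usual type for a shutdown signal.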

func (*Kafka) Close

func (k *Kafka) Close()

Close closes all of the associated connections.

func (*Kafka) GetConsumerErrors

func (k *Kafka) GetConsumerErrors()

GetConsumerErrors logs any errors reported by the consumer.

func (*Kafka) InitConsumer

func (k *Kafka) InitConsumer(transferChan chan *sarama.ProducerMessage, reset bool) error

InitConsumer sets up the Kafka consumer.

func (*Kafka) InitProducerFromConsumer

func (k *Kafka) InitProducerFromConsumer(transferChan chan *sarama.ProducerMessage) error

InitProducerFromConsumer sets up the Kafka producer, using a consumer's transfer channel as its input.

func (*Kafka) Monitor

func (k *Kafka) Monitor()

Monitor monitors the transfer channel.

func (*Kafka) Pull

func (k *Kafka) Pull()

Pull pulls messages from the topic partition.

func (*Kafka) Push

func (k *Kafka) Push()

Push pushes messages to the topic.

func (*Kafka) RPSTicker

func (k *Kafka) RPSTicker()

RPSTicker reads from Successes and increments the RPS counter for each message received.
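
The counting loop described above can be sketched as follows. In sarama, an AsyncProducer exposes a Successes() channel that delivers messages only when Producer.Return.Successes is enabled in the sarama configuration. This sketch replaces that channel with a plain channel of empty structs so that it runs without external dependencies. The countSuccesses function is hypothetical and is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// countSuccesses increments counter once for every value received from
// successes and returns when the channel is closed. The struct{} element
// type stands in for *sarama.ProducerMessage.
func countSuccesses(successes <-chan struct{}, counter *uint64) {
	for range successes {
		// An atomic add keeps the counter safe to read from another
		// goroutine, such as one that periodically logs the rate.
		atomic.AddUint64(counter, 1)
	}
}

func main() {
	successes := make(chan struct{}, 4)
	for i := 0; i < 4; i++ {
		successes <- struct{}{}
	}
	close(successes)

	var count uint64
	countSuccesses(successes, &count)
	fmt.Println("successful sends:", atomic.LoadUint64(&count))
}
```

Draining the success channel matters in its own right: when success reporting is enabled, sarama requires the channel to be read, otherwise the producer eventually blocks.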
