kafka

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2021 License: MIT Imports: 13 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultConsumerConfig

func DefaultConsumerConfig() *consumergroup.Config

func DefaultProducerConfig

func DefaultProducerConfig() *sarama.Config

Types

type Config

type Config struct {
	ZookeeperConnectionString string
	ConsumerGroup             string
	Topics                    []string
	ConsumerGroupConfig       *consumergroup.Config
	Err                       chan error
	Logger                    *logger.UPPLogger
}

type Consumer

type Consumer interface {
	StartListening(messageHandler func(message FTMessage) error)
	Shutdown()
	ConnectivityCheck() error
}

func NewConsumer

func NewConsumer(config Config) (Consumer, error)

func NewPerseverantConsumer

func NewPerseverantConsumer(zookeeperConnectionString string, consumerGroup string, topics []string, config *consumergroup.Config, retryInterval time.Duration, errCh *chan error, logger *logger.UPPLogger) (Consumer, error)

type ConsumerGrouper

type ConsumerGrouper interface {
	Errors() <-chan error
	Messages() <-chan *sarama.ConsumerMessage
	CommitUpto(message *sarama.ConsumerMessage) error
	Close() error
	Closed() bool
}

type FTMessage

type FTMessage struct {
	Headers map[string]string
	Body    string
}

func NewFTMessage

func NewFTMessage(headers map[string]string, body string) FTMessage

func (*FTMessage) Build

func (m *FTMessage) Build() string

type MessageConsumer

type MessageConsumer struct {
	// contains filtered or unexported fields
}

func (*MessageConsumer) ConnectivityCheck

func (c *MessageConsumer) ConnectivityCheck() error

func (*MessageConsumer) Shutdown

func (c *MessageConsumer) Shutdown()

func (*MessageConsumer) StartListening

func (c *MessageConsumer) StartListening(messageHandler func(message FTMessage) error)

type MessageProducer

type MessageProducer struct {
	// contains filtered or unexported fields
}

func (*MessageProducer) ConnectivityCheck

func (p *MessageProducer) ConnectivityCheck() error

func (*MessageProducer) SendMessage

func (p *MessageProducer) SendMessage(message FTMessage) error

func (*MessageProducer) Shutdown

func (p *MessageProducer) Shutdown()

type Producer

type Producer interface {
	SendMessage(message FTMessage) error
	ConnectivityCheck() error
	Shutdown()
}

func NewPerseverantProducer

func NewPerseverantProducer(brokers string, topic string, config *sarama.Config, initialDelay time.Duration, retryInterval time.Duration, logger *logger.UPPLogger) (Producer, error)

func NewProducer

func NewProducer(brokers string, topic string, config *sarama.Config, logger *logger.UPPLogger) (Producer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL