kafka

package
v3.1.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2018 License: MIT Imports: 4 Imported by: 17

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateKeyMessage

func CreateKeyMessage(topic string, key string, value []byte) *sarama.ProducerMessage

CreateKeyMessage creates a producer-formatted message with the given key.

func CreateMessage

func CreateMessage(topic string, value []byte) *sarama.ProducerMessage

CreateMessage creates a keyless producer-formatted message.
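As an illustration of the difference between the two helpers, here is a minimal stdlib-only sketch. The types `encoder`, `stringEncoder`, `byteEncoder` and `producerMessage`, and the lowercase functions, are hypothetical local stand-ins that mirror the shape of Sarama's `Encoder`, `StringEncoder`, `ByteEncoder` and `ProducerMessage`. How this package actually encodes keys and values is an assumption, not something the documentation above states.

```go
package main

import "fmt"

// encoder mirrors the shape of sarama's Encoder interface (local stand-in).
type encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// stringEncoder and byteEncoder mirror sarama.StringEncoder / sarama.ByteEncoder.
type stringEncoder string

func (s stringEncoder) Encode() ([]byte, error) { return []byte(s), nil }
func (s stringEncoder) Length() int             { return len(s) }

type byteEncoder []byte

func (b byteEncoder) Encode() ([]byte, error) { return b, nil }
func (b byteEncoder) Length() int             { return len(b) }

// producerMessage is a trimmed-down stand-in for sarama.ProducerMessage.
type producerMessage struct {
	Topic string
	Key   encoder // nil for keyless messages
	Value encoder
}

// createMessage sketches a keyless message: Key is left nil.
// ASSUMPTION: the real CreateMessage may build its message differently.
func createMessage(topic string, value []byte) *producerMessage {
	return &producerMessage{Topic: topic, Value: byteEncoder(value)}
}

// createKeyMessage sketches a keyed message: the key is string-encoded.
// ASSUMPTION: the real CreateKeyMessage may encode the key differently.
func createKeyMessage(topic, key string, value []byte) *producerMessage {
	return &producerMessage{
		Topic: topic,
		Key:   stringEncoder(key),
		Value: byteEncoder(value),
	}
}

func main() {
	keyed := createKeyMessage("orders", "customer-42", []byte(`{"id":1}`))
	keyless := createMessage("orders", []byte("ping"))
	fmt.Println(keyed.Topic, keyed.Key != nil, keyless.Key == nil)
}
```

In Kafka generally, a message key is what a hash-based partitioner uses to route related messages to the same partition, so the keyed variant is the natural choice when per-key ordering matters.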

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps Sarama Consumer-Group.

func NewConsumer

func NewConsumer(config *ConsumerConfig) (*Consumer, error)

NewConsumer returns a Consumer that wraps a Sarama Consumer-Group configured from config.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the ConsumerGroup and detaches any running sessions. Call it before the Consumer goes out of scope; otherwise it will leak memory.

func (*Consumer) Consume

func (c *Consumer) Consume(
	ctx context.Context,
	handler sarama.ConsumerGroupHandler,
) error

Consume starts consuming messages using the provided Context and ConsumerGroupHandler.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns the error channel for the Consumer-Group.
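The following stdlib-only sketch shows one way Consume, Errors and Close might be combined: drain the error channel in a goroutine, consume until the context ends, then close. The `groupConsumer` interface and `fakeConsumer` type are hypothetical stand-ins, and the handler argument of the real Consume is omitted. It assumes that Close closes the error channel, which this documentation does not confirm. In Sarama itself, consumer errors are only delivered on the channel when `Consumer.Return.Errors` is enabled in the config; whether this wrapper enables it is not stated here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// groupConsumer is a hypothetical stand-in for the parts of *Consumer used here.
// The real Consume also takes a sarama.ConsumerGroupHandler, omitted for brevity.
type groupConsumer interface {
	Consume(ctx context.Context) error
	Errors() <-chan error
	Close() error
}

// fakeConsumer simulates one asynchronous error, then blocks until ctx ends.
type fakeConsumer struct{ errs chan error }

func (f *fakeConsumer) Consume(ctx context.Context) error {
	f.errs <- errors.New("simulated broker error")
	<-ctx.Done()
	return ctx.Err()
}

func (f *fakeConsumer) Errors() <-chan error { return f.errs }

// Close closes the error channel. ASSUMPTION: the real Close does the same.
func (f *fakeConsumer) Close() error { close(f.errs); return nil }

// run drains Errors in the background, consumes until ctx ends, then closes.
func run(ctx context.Context, c groupConsumer) ([]error, error) {
	var seen []error
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range c.Errors() {
			seen = append(seen, err)
		}
	}()

	consumeErr := c.Consume(ctx)
	closeErr := c.Close() // ends the range loop above by closing the channel
	<-done                // wait for the drain goroutine before reading seen

	if closeErr != nil {
		return seen, closeErr
	}
	return seen, consumeErr
}

// demo runs the lifecycle against the fake with a short timeout.
func demo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	seen, err := run(ctx, &fakeConsumer{errs: make(chan error, 1)})
	return len(seen), err
}

func main() {
	n, err := demo()
	fmt.Println("errors drained:", n, "consume returned:", err)
}
```

Draining in a separate goroutine keeps a blocking Consume call from stalling on an unread error channel. If the real Close does not close the channel, the drain loop would need another stop signal, such as the context.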

func (*Consumer) SaramaConsumer

func (c *Consumer) SaramaConsumer() *sarama.ConsumerGroup

SaramaConsumer returns the wrapped Sarama Consumer-Group. Only use this when you really have to.

type ConsumerConfig

type ConsumerConfig struct {
	// Name for ConsumerGroup
	GroupName    string
	KafkaBrokers []string
	// Overwrites the default sarama-config
	SaramaConfig *sarama.Config
	Topics       []string
}

ConsumerConfig wraps configuration for Sarama Consumer-Group.

type Producer

type Producer struct {
	sarama.AsyncProducer
}

Producer wraps Sarama's AsyncProducer.

func NewProducer

func NewProducer(config *ProducerConfig) (*Producer, error)

NewProducer returns a Producer that wraps a configured Sarama AsyncProducer.
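Because Producer embeds sarama.AsyncProducer, the embedded interface's methods are promoted and can be called directly on a Producer value. The sketch below demonstrates that Go mechanism with stdlib only. The `asyncProducer` interface, `chanProducer` type and `sendAll` helper are hypothetical, heavily simplified stand-ins (the real AsyncProducer input channel carries `*sarama.ProducerMessage` values, not strings).

```go
package main

import "fmt"

// asyncProducer is a hypothetical, trimmed-down stand-in for sarama.AsyncProducer.
type asyncProducer interface {
	Input() chan<- string
	Close() error
}

// chanProducer is a toy implementation backed by a buffered channel.
type chanProducer struct{ in chan string }

func (p *chanProducer) Input() chan<- string { return p.in }
func (p *chanProducer) Close() error         { close(p.in); return nil }

// producer embeds the interface, as Producer embeds sarama.AsyncProducer.
type producer struct {
	asyncProducer
}

// sendAll pushes values through the promoted Input method, closes the
// producer, and returns what the backing channel received.
func sendAll(values ...string) []string {
	backing := &chanProducer{in: make(chan string, len(values))}
	p := &producer{asyncProducer: backing}

	for _, v := range values {
		p.Input() <- v // Input is promoted from the embedded interface
	}
	_ = p.Close() // Close is promoted as well

	var got []string
	for v := range backing.in {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(sendAll("a", "b"))
}
```

With the real package, the same promotion means the wrapped producer's channels and close methods are available on the Producer without any forwarding code.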

type ProducerConfig

type ProducerConfig struct {
	KafkaBrokers []string
	// Allow overwriting default sarama-config
	SaramaConfig *sarama.Config
}

ProducerConfig wraps configuration for the Producer.
