kafka

package
v0.0.0-...-7d43bac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2017 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package kafka provides support for Kafka message broker.

Index

Constants

View Source
const (
	// Key defines Kafka message key.
	Key = "key"
	// Offset defines Kafka message offset.
	Offset = "offset"
	// Partition defines Kafka message partition.
	Partition = "partition"
	// Timestamp defines Kafka message timestamp.
	Timestamp = "timestamp"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type MessageBroker

type MessageBroker struct {
	Consumer sarama.Consumer     // message consumer
	Producer sarama.SyncProducer // message producer
}

MessageBroker represents message broker based on Kafka.

func NewMessageBroker

func NewMessageBroker(addrs []string, cfg *sarama.Config, opts ...cb.Option) *MessageBroker

NewMessageBroker creates instance of Kafka client message broker which is connected to provided addresses. Additional options passed as arguments are used to configure Kafka client and circuit breaker pattern to connect to Kafka instance. Panics if cannot create an instance (producer and/or consumer).

func (*MessageBroker) Dispose

func (b *MessageBroker) Dispose()

Dispose closes Kafka client instance.

func (MessageBroker) PublishMessage

func (b MessageBroker) PublishMessage(ctx context.Context, m broker.Message) error

PublishMessage publishes message to Kafka broker.

func (MessageBroker) Subscribe

func (b MessageBroker) Subscribe(ctx context.Context, topic string) (<-chan broker.Message, error)

Subscribe subscribes to specified topic in Kafka broker. Using context (ctx) parameter it is possible to pass additional arguments such as kafka.Partition (int32) and kafka.Offset (int64).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL