consumer

package
v0.0.0-...-0bb0fe6
Warning

This package is not in the latest version of its module.

Published: Oct 24, 2019 License: BSD-2-Clause Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DataFromError

func DataFromError(err error) map[string]string

Types

type Consumer

type Consumer interface {

	// Consume messages until context is canceled.
	Consume(ctx context.Context) error
}

Consumer is the common interface for all consumers.

func NewOffsetConsumer

func NewOffsetConsumer(
	messageHandler MessageHandler,
	client sarama.Client,
	topic string,
	group string,
) Consumer

NewOffsetConsumer returns a consumer with offset tracking.

func NewPartitionConsumer

func NewPartitionConsumer(
	messageHandler MessageHandler,
	client sarama.Client,
	topic string,
	partition int32,
	offset int64,
) Consumer

NewPartitionConsumer returns a consumer for the given topic, partition and offset.

func NewSimpleConsumer

func NewSimpleConsumer(
	messageHandler MessageHandler,
	client sarama.Client,
	topic string,
) Consumer

NewSimpleConsumer returns a consumer that consumes messages from all partitions of the given topic.

type DataError

type DataError interface {
	error
	HasData
	HasCause
}

func AddDataToError

func AddDataToError(err error, data map[string]string) DataError

type HasCause

type HasCause interface {
	Cause() error
}

type HasData

type HasData interface {
	Data() map[string]string
}

type MessageHandler

type MessageHandler interface {

	// ConsumeMessage is called for each message.
	ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
}

MessageHandler is responsible for handling arriving Kafka messages.

func NewMessageHandlerHook

func NewMessageHandlerHook(
	preMessageHandler MessageHandler,
	messageHandler MessageHandler,
	postMessageHandler MessageHandler,
) MessageHandler

func NewMetricsMessageHandler

func NewMetricsMessageHandler(
	namespace string,
	subsystem string,
	messageHandler MessageHandler,
) MessageHandler

NewMetricsMessageHandler is a MessageHandler adapter that creates Prometheus metrics for started, completed and failed messages.

func NewRetryMessageHandler

func NewRetryMessageHandler(
	messageHandler MessageHandler,
	producer sarama.SyncProducer,
	retries int,
) MessageHandler

func SendErrorsToSentry

func SendErrorsToSentry(messageHandler MessageHandler, ravenClient RavenClient) MessageHandler

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, msg *sarama.ConsumerMessage) error

MessageHandlerFunc allows a plain function to be used as a MessageHandler.

func (MessageHandlerFunc) ConsumeMessage

func (m MessageHandlerFunc) ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error

ConsumeMessage forwards the call to the underlying function.

type RavenClient

type RavenClient interface {
	CaptureErrorAndWait(err error, tags map[string]string, interfaces ...raven.Interface) string
}
