Documentation ¶
Index ¶
- func DataFromError(err error) map[string]string
- type Consumer
- func NewOffsetConsumer(messageHandler MessageHandler, client sarama.Client, topic string, ...) Consumer
- func NewPartitionConsumer(messageHandler MessageHandler, client sarama.Client, topic string, ...) Consumer
- func NewSimpleConsumer(messageHandler MessageHandler, client sarama.Client, topic string) Consumer
- type DataError
- type HasCause
- type HasData
- type MessageHandler
- func NewMessageHandlerHook(preMessageHandler MessageHandler, messageHandler MessageHandler, ...) MessageHandler
- func NewMetricsMessageHandler(namespace string, subsystem string, messageHandler MessageHandler) MessageHandler
- func NewRetryMessageHandler(messageHandler MessageHandler, producer sarama.SyncProducer, retries int) MessageHandler
- func SendErrorsToSentry(messageHandler MessageHandler, ravenClient RavenClient) MessageHandler
- type MessageHandlerFunc
- type RavenClient
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DataFromError ¶
Types ¶
type Consumer ¶
type Consumer interface { // Consume messages until context is canceled. Consume(ctx context.Context) error }
Consumer is the common interface for all consumers.
func NewOffsetConsumer ¶
func NewOffsetConsumer( messageHandler MessageHandler, client sarama.Client, topic string, group string, ) Consumer
NewOffsetConsumer return an consumer with offset tracking.
func NewPartitionConsumer ¶
func NewPartitionConsumer( messageHandler MessageHandler, client sarama.Client, topic string, partition int32, offset int64, ) Consumer
NewPartitionConsumer returns a consumer for the given topic, partition and offset.
func NewSimpleConsumer ¶
func NewSimpleConsumer( messageHandler MessageHandler, client sarama.Client, topic string, ) Consumer
NewSimpleConsumer consumes messages in all partitions.
type MessageHandler ¶
type MessageHandler interface { // ConsumeMessage is called for each message. ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error }
MessageHandler is responsible for handling arriving Kakfa messages.
func NewMessageHandlerHook ¶
func NewMessageHandlerHook( preMessageHandler MessageHandler, messageHandler MessageHandler, postMessageHandler MessageHandler, ) MessageHandler
func NewMetricsMessageHandler ¶
func NewMetricsMessageHandler( namespace string, subsystem string, messageHandler MessageHandler, ) MessageHandler
NewMetricsMessageHandler is a MessageHandler adapter that create Prometheus metrics for started, completed and failed.
func NewRetryMessageHandler ¶
func NewRetryMessageHandler( messageHandler MessageHandler, producer sarama.SyncProducer, retries int, ) MessageHandler
func SendErrorsToSentry ¶
func SendErrorsToSentry(messageHandler MessageHandler, ravenClient RavenClient) MessageHandler
type MessageHandlerFunc ¶
type MessageHandlerFunc func(ctx context.Context, msg *sarama.ConsumerMessage) error
MessageHandlerFunc allow use a function as MessageHandler.
func (MessageHandlerFunc) ConsumeMessage ¶
func (m MessageHandlerFunc) ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
ConsumeMessage forward to the function.
Click to show internal directories.
Click to hide internal directories.