kafetrain

package module
v0.0.0-...-0293b04 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2022 License: MIT Imports: 12 Imported by: 0

README

kafetrain

Kafka error handling implementation

Documentation

Index

Constants

View Source
// Offset constants that mirror the sarama constants of the same names.
const (
	// OffsetOldest has the same value as sarama.OffsetOldest.
	OffsetOldest = sarama.OffsetOldest
	// OffsetNewest has the same value as sarama.OffsetNewest.
	OffsetNewest = sarama.OffsetNewest
)
View Source
const (
	// HeaderTopic is the header key "topic".
	// NOTE(review): presumably the key under which a topic name is stored in
	// a message's HeaderList — confirm against the code that uses it.
	HeaderTopic = "topic"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config holds the settings for the Kafka consumer, streamer and retry logic.
//
// Each field carries an `envconfig` tag naming the environment variable it is
// read from, plus an optional `default` value or a `required` marker. Every
// field must use its own variable name, and every default must parse as the
// field's type.
type Config struct {
	// Base configuration.
	Brokers []string `envconfig:"KAFKA_BROKERS" required:"true"`
	Version string   `envconfig:"KAFKA_VERSION" required:"true"`
	//Topic    string   `envconfig:"KAFKA_TOPIC" required:"true"`
	GroupID  string `envconfig:"KAFKA_GROUP_ID" required:"true"`
	ClientID string `envconfig:"KAFKA_CLIENT_ID" default:"kafetrain-consumer"`

	// Security configuration.
	Username string `envconfig:"KAFKA_USERNAME"`
	Password string `envconfig:"KAFKA_PASSWORD"`
	CACert   string `envconfig:"KAFKA_CA_CERT"`

	// Consumer configuration.
	MaxProcessingTime uint16 `envconfig:"KAFKA_MAX_PROCESSING_TIME_MS" default:"100"`
	InitialOffset     int64  `envconfig:"KAFKA_CONSUMER_INITIAL_OFFSET" default:"-2"`
	Silent            bool   `envconfig:"KAFKA_CONSUMER_SILENT" default:"false"`

	// Streamer configuration.
	//
	// BuffSize used to be bound to KAFKA_MAX_PROCESSING_TIME_MS, the same
	// variable as MaxProcessingTime, so the two could not be set independently.
	// It now has its own variable; the default of 256 is unchanged.
	BuffSize uint16 `envconfig:"KAFKA_STREAM_BUFF_SIZE" default:"256"`

	// Retry configuration.
	RetryTopicPrefix    string `envconfig:"KAFKA_RETRY_TOPIC_PREFIX" default:"retry"`       // topic for messages to Retry
	RedirectTopicPrefix string `envconfig:"KAFKA_REDIRECT_TOPIC_PREFIX" default:"redirect"` // topic with message ids that should be retried

	// RetrySegmentSize used to be bound to KAFKA_REDIRECT_TOPIC_PREFIX with the
	// default "redirect", which cannot be parsed as an int64. It now has its
	// own variable and no default, so it stays 0 when the variable is unset.
	// NOTE(review): the intended default and unit are not visible here — confirm.
	RetrySegmentSize int64 `envconfig:"KAFKA_RETRY_SEGMENT_SIZE"`
}

type Consumer

// Consumer is the interface for consuming a topic with a MessageHandler.
// KafkaConsumer and KafkaPartitionConsumer both declare methods with these
// signatures.
type Consumer interface {
	// Consume takes the topic to read and the handler to use for its messages.
	// NOTE(review): whether it blocks until ctx is cancelled is not visible
	// here — confirm against the implementations.
	Consume(ctx context.Context, topic string, messageHandler MessageHandler) error
	// Close shuts the consumer down and reports any error from doing so.
	Close() error
}

type ErrorTracker

type ErrorTracker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TODO: implement error_train

func NewTracker

func NewTracker(
	cfg Config,
	logger *zap.Logger,
	comparator MessageChainTracker,
	registry *HandlerRegistry,
) (*ErrorTracker, error)

func (*ErrorTracker) Errors

func (t *ErrorTracker) Errors() <-chan error

func (*ErrorTracker) Free

func (t *ErrorTracker) Free(ctx context.Context, msg Message) error

func (*ErrorTracker) IsRelated

func (t *ErrorTracker) IsRelated(topic string, msg Message) bool

func (*ErrorTracker) LockMessage

func (t *ErrorTracker) LockMessage(ctx context.Context, m Message) error

func (*ErrorTracker) Redirect

func (t *ErrorTracker) Redirect(ctx context.Context, msg Message) error

func (*ErrorTracker) ReleaseMessage

func (t *ErrorTracker) ReleaseMessage(ctx context.Context, msg Message) error

func (*ErrorTracker) Start

func (t *ErrorTracker) Start(ctx context.Context, topic string) error

type ErrorTrackerConfig

// ErrorTrackerConfig currently declares no fields.
// NOTE(review): NewTracker takes a Config, not this type — it looks like an
// unused placeholder; confirm before relying on it.
type ErrorTrackerConfig struct {
}

type HandlerRegistry

type HandlerRegistry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewHandlerRegistry

func NewHandlerRegistry() *HandlerRegistry

func (*HandlerRegistry) Add

func (r *HandlerRegistry) Add(topic string, handler MessageHandler)

func (*HandlerRegistry) Get

func (r *HandlerRegistry) Get(topic string) (MessageHandler, bool)
// Header is a single key/value pair, with both parts held as raw bytes.
type Header struct {
	Key   []byte
	Value []byte
}

type HeaderList

// HeaderList is a slice of Header pointers. Its Get and Set methods work with
// string keys and values.
type HeaderList []*Header

func (*HeaderList) Get

func (h *HeaderList) Get(key string) (string, bool)

func (*HeaderList) Set

func (h *HeaderList) Set(key, val string)

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

KafkaConsumer is a wrapper around sarama.ConsumerGroup. It provides a way to consume messages from a Kafka topic as part of a consumer group.

func NewKafkaConsumer

func NewKafkaConsumer(
	cfg Config,
	logger *zap.Logger,
	options ...Option,
) (*KafkaConsumer, error)

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

Close stops the consumer group.

func (*KafkaConsumer) Consume

func (c *KafkaConsumer) Consume(ctx context.Context, topic string, messageHandler MessageHandler) error

Consume consumes messages from the given Kafka topic and passes them to messageHandler.

func (*KafkaConsumer) Stream

func (c *KafkaConsumer) Stream(ctx context.Context, topic string) (<-chan Message, <-chan error)

func (*KafkaConsumer) WithMiddlewares

func (c *KafkaConsumer) WithMiddlewares(middlewares ...Middleware) *KafkaConsumer

type KafkaPartitionConsumer

type KafkaPartitionConsumer struct {
	// contains filtered or unexported fields
}

KafkaPartitionConsumer consumes a topic.

func NewKafkaPartitionConsumer

func NewKafkaPartitionConsumer(
	cfg Config,
	logger *zap.Logger,
	middlewares []Middleware,
	topic string,
	partitions []int32,
) (*KafkaPartitionConsumer, error)

func (*KafkaPartitionConsumer) Close

func (k *KafkaPartitionConsumer) Close() error

func (*KafkaPartitionConsumer) Consume

func (k *KafkaPartitionConsumer) Consume(ctx context.Context, topic string, messageHandler MessageHandler) error

type KeyTracker

type KeyTracker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewKeyTracker

func NewKeyTracker() *KeyTracker

func (*KeyTracker) AddMessage

func (kt *KeyTracker) AddMessage(_ context.Context, msg Message) (string, error)

func (*KeyTracker) IsRelated

func (kt *KeyTracker) IsRelated(_ context.Context, msg Message) bool

func (*KeyTracker) ReleaseMessage

func (kt *KeyTracker) ReleaseMessage(_ context.Context, msg Message) error

type Message

type Message struct {
	Key     []byte
	Payload []byte
	Headers HeaderList

	Timestamp time.Time
	// contains filtered or unexported fields
}

Message is a generic Kafka message. TODO: add a generic type for marshaling.

type MessageChainTracker

// MessageChainTracker tracks which messages belong to an error chain.
// KeyTracker declares methods with these signatures.
type MessageChainTracker interface {
	IsRelated(ctx context.Context, msg Message) bool             // IsRelated returns true if message is related to error chain
	AddMessage(ctx context.Context, msg Message) (string, error) // AddMessage adds message to error chain
	ReleaseMessage(ctx context.Context, msg Message) error       // ReleaseMessage removes message from error chain
}

type MessageHandleFunc

type MessageHandleFunc func(ctx context.Context, message Message) error

MessageHandleFunc handles messages.

func (MessageHandleFunc) Handle

func (f MessageHandleFunc) Handle(ctx context.Context, message Message) error

Handle makes MessageHandleFunc satisfy MessageHandler, so that ordinary functions can be used as a MessageHandler.

type MessageHandler

type MessageHandler interface {
	Handle(ctx context.Context, msg Message) error
}

MessageHandler is implemented by the business logic that processes a received message.

type Middleware

type Middleware func(next MessageHandleFunc) MessageHandleFunc

Middleware wraps a MessageHandleFunc and returns a new MessageHandleFunc.

func NewErrorHandlingMiddleware

func NewErrorHandlingMiddleware(t *ErrorTracker) Middleware

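NewErrorHandlingMiddleware returns a Middleware that uses the given ErrorTracker. NOTE(review): the previous description, "track message processing time", appears to have been copied from another middleware; confirm the intended behavior.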

func NewFilterMiddleware

func NewFilterMiddleware(filterFunc func(msg Message) bool) Middleware

NewFilterMiddleware returns a Middleware that filters messages based on the provided filter function.

func NewLoggingMiddleware

func NewLoggingMiddleware(logger *zap.Logger) Middleware

NewLoggingMiddleware returns a Middleware for logging.

func NewRetryMiddleware

func NewRetryMiddleware(et *ErrorTracker) Middleware

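NewRetryMiddleware returns a Middleware that uses the given ErrorTracker. NOTE(review): the previous description, "track message processing time", appears to have been copied from another middleware; confirm the intended behavior.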

type Option

type Option interface {
	Apply(cfg *consumerOptionConfig)
}

Option sets a parameter for the consumer.

func WithEndsAt

func WithEndsAt(endsAt time.Time) Option

func WithLimit

func WithLimit(limit int64) Option

func WithOffset

func WithOffset(offset int64) Option

func WithStartFrom

func WithStartFrom(from time.Time) Option

func WithoutCommit

func WithoutCommit() Option

type RedirectHandler

type RedirectHandler struct {
	// contains filtered or unexported fields
}

func NewRedirectHandler

func NewRedirectHandler(t *ErrorTracker) *RedirectHandler

func (*RedirectHandler) Handle

func (r *RedirectHandler) Handle(ctx context.Context, msg Message) error

type RetriableError

// RetriableError pairs an underlying error with a retry flag.
type RetriableError struct {
	// Origin is the underlying error.
	Origin error
	// Retry is the retry flag.
	// NOTE(review): presumably the value ShouldRetry reports — confirm.
	Retry bool
}

func NewRetriableError

func NewRetriableError(origin error, retry bool) *RetriableError

func (RetriableError) Error

func (e RetriableError) Error() string

func (RetriableError) ShouldRetry

func (e RetriableError) ShouldRetry() bool

type Streamer

// Streamer exposes a topic as channels instead of a handler callback.
// KafkaConsumer declares methods with these signatures.
type Streamer interface {
	// Stream returns a receive-only channel of Messages for the topic and a
	// receive-only channel of errors.
	Stream(ctx context.Context, topic string) (<-chan Message, <-chan error)
	// Close shuts the streamer down and reports any error from doing so.
	Close() error
}

Directories

Path Synopsis
pkg/logging
Package logging is a wrapper around zapctxd.Logger that adds context to the log entries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL