Documentation ¶
Index ¶
- Constants
- type Config
- type Consumer
- type ErrorTracker
- func (t *ErrorTracker) Errors() <-chan error
- func (t *ErrorTracker) Free(ctx context.Context, msg Message) error
- func (t *ErrorTracker) IsRelated(topic string, msg Message) bool
- func (t *ErrorTracker) LockMessage(ctx context.Context, m Message) error
- func (t *ErrorTracker) Redirect(ctx context.Context, msg Message) error
- func (t *ErrorTracker) ReleaseMessage(ctx context.Context, msg Message) error
- func (t *ErrorTracker) Start(ctx context.Context, topic string) error
- type ErrorTrackerConfig
- type HandlerRegistry
- type Header
- type HeaderList
- type KafkaConsumer
- func (c *KafkaConsumer) Close() error
- func (c *KafkaConsumer) Consume(ctx context.Context, topic string, messageHandler MessageHandler) error
- func (c *KafkaConsumer) Stream(ctx context.Context, topic string) (<-chan Message, <-chan error)
- func (c *KafkaConsumer) WithMiddlewares(middlewares ...Middleware) *KafkaConsumer
- type KafkaPartitionConsumer
- type KeyTracker
- type Message
- type MessageChainTracker
- type MessageHandleFunc
- type MessageHandler
- type Middleware
- type Option
- type RedirectHandler
- type RetriableError
- type Streamer
Constants ¶
const ( OffsetOldest = sarama.OffsetOldest OffsetNewest = sarama.OffsetNewest )
const (
HeaderTopic = "topic"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Base configuration. Brokers []string `envconfig:"KAFKA_BROKERS" required:"true"` Version string `envconfig:"KAFKA_VERSION" required:"true"` //Topic string `envconfig:"KAFKA_TOPIC" required:"true"` GroupID string `envconfig:"KAFKA_GROUP_ID" required:"true"` ClientID string `envconfig:"KAFKA_CLIENT_ID" default:"kafetrain-consumer"` // Security configuration. Username string `envconfig:"KAFKA_USERNAME"` Password string `envconfig:"KAFKA_PASSWORD"` CACert string `envconfig:"KAFKA_CA_CERT"` // Consumer configuration. MaxProcessingTime uint16 `envconfig:"KAFKA_MAX_PROCESSING_TIME_MS" default:"100"` InitialOffset int64 `envconfig:"KAFKA_CONSUMER_INITIAL_OFFSET" default:"-2"` Silent bool `envconfig:"KAFKA_CONSUMER_SILENT" default:"false"` // Streamer configuration. BuffSize uint16 `envconfig:"KAFKA_MAX_PROCESSING_TIME_MS" default:"256"` // Retry configuration. RetryTopicPrefix string `envconfig:"KAFKA_RETRY_TOPIC_PREFIX" default:"retry"` // topic for messages to Retry RedirectTopicPrefix string `envconfig:"KAFKA_REDIRECT_TOPIC_PREFIX" default:"redirect"` // topic with message ids that should be retried RetrySegmentSize int64 `envconfig:"KAFKA_REDIRECT_TOPIC_PREFIX" default:"redirect"` // topic with message ids that should be retried }
type ErrorTracker ¶
TODO: implement error_train
func NewTracker ¶
func NewTracker( cfg Config, logger *zap.Logger, comparator MessageChainTracker, registry *HandlerRegistry, ) (*ErrorTracker, error)
func (*ErrorTracker) Errors ¶
func (t *ErrorTracker) Errors() <-chan error
func (*ErrorTracker) LockMessage ¶
func (t *ErrorTracker) LockMessage(ctx context.Context, m Message) error
func (*ErrorTracker) Redirect ¶
func (t *ErrorTracker) Redirect(ctx context.Context, msg Message) error
func (*ErrorTracker) ReleaseMessage ¶
func (t *ErrorTracker) ReleaseMessage(ctx context.Context, msg Message) error
type ErrorTrackerConfig ¶
type ErrorTrackerConfig struct { }
type HandlerRegistry ¶
func NewHandlerRegistry ¶
func NewHandlerRegistry() *HandlerRegistry
func (*HandlerRegistry) Add ¶
func (r *HandlerRegistry) Add(topic string, handler MessageHandler)
func (*HandlerRegistry) Get ¶
func (r *HandlerRegistry) Get(topic string) (MessageHandler, bool)
type HeaderList ¶
type HeaderList []*Header
func (*HeaderList) Set ¶
func (h *HeaderList) Set(key, val string)
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
KafkaConsumer is a wrapper around sarama.ConsumerGroup. It provides a way to consume messages from a Kafka topic as part of a consumer group.
func NewKafkaConsumer ¶
func (*KafkaConsumer) Consume ¶
func (c *KafkaConsumer) Consume(ctx context.Context, topic string, messageHandler MessageHandler) error
Consume consumes messages from Kafka.
func (*KafkaConsumer) WithMiddlewares ¶
func (c *KafkaConsumer) WithMiddlewares(middlewares ...Middleware) *KafkaConsumer
type KafkaPartitionConsumer ¶
type KafkaPartitionConsumer struct {
// contains filtered or unexported fields
}
KafkaPartitionConsumer consumes a topic.
func NewKafkaPartitionConsumer ¶
func NewKafkaPartitionConsumer( cfg Config, logger *zap.Logger, middlewares []Middleware, topic string, partitions []int32, ) (*KafkaPartitionConsumer, error)
func (*KafkaPartitionConsumer) Close ¶
func (k *KafkaPartitionConsumer) Close() error
func (*KafkaPartitionConsumer) Consume ¶
func (k *KafkaPartitionConsumer) Consume(ctx context.Context, topic string, messageHandler MessageHandler) error
type KeyTracker ¶
func NewKeyTracker ¶
func NewKeyTracker() *KeyTracker
func (*KeyTracker) AddMessage ¶
func (*KeyTracker) ReleaseMessage ¶
func (kt *KeyTracker) ReleaseMessage(_ context.Context, msg Message) error
type Message ¶
type Message struct { Key []byte Payload []byte Headers HeaderList Timestamp time.Time // contains filtered or unexported fields }
Message is a generic Kafka message. TODO: add a generic type for marshaling.
type MessageChainTracker ¶
type MessageChainTracker interface { IsRelated(ctx context.Context, msg Message) bool // IsRelated returns true if message is related to error chain AddMessage(ctx context.Context, msg Message) (string, error) // AddMessage adds message to error chain ReleaseMessage(ctx context.Context, msg Message) error // ReleaseMessage removes message from error chain }
type MessageHandleFunc ¶
MessageHandleFunc handles messages.
type MessageHandler ¶
MessageHandler implements the business logic that processes a received message.
type Middleware ¶
type Middleware func(next MessageHandleFunc) MessageHandleFunc
Middleware function.
func NewErrorHandlingMiddleware ¶
func NewErrorHandlingMiddleware(t *ErrorTracker) Middleware
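NewErrorHandlingMiddleware returns a Middleware built on the given ErrorTracker. NOTE: the original description ("track message processing time") does not match the function's name or signature and looks copy-pasted; confirm the actual behavior against the implementation.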
func NewFilterMiddleware ¶
func NewFilterMiddleware(filterFunc func(msg Message) bool) Middleware
NewFilterMiddleware filters messages based on the provided filter function.
func NewLoggingMiddleware ¶
func NewLoggingMiddleware(logger *zap.Logger) Middleware
NewLoggingMiddleware returns a Middleware for logging.
func NewRetryMiddleware ¶
func NewRetryMiddleware(et *ErrorTracker) Middleware
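NewRetryMiddleware returns a Middleware built on the given ErrorTracker. NOTE: the original description ("track message processing time") does not match the function's name or signature and looks copy-pasted; confirm the actual behavior against the implementation.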
type Option ¶
type Option interface {
Apply(cfg *consumerOptionConfig)
}
Option sets a parameter for the consumer.
func WithEndsAt ¶
func WithOffset ¶
func WithStartFrom ¶
func WithoutCommit ¶
func WithoutCommit() Option
type RedirectHandler ¶
type RedirectHandler struct {
// contains filtered or unexported fields
}
func NewRedirectHandler ¶
func NewRedirectHandler(t *ErrorTracker) *RedirectHandler
type RetriableError ¶
func NewRetriableError ¶
func NewRetriableError(origin error, retry bool) *RetriableError
func (RetriableError) Error ¶
func (e RetriableError) Error() string
func (RetriableError) ShouldRetry ¶
func (e RetriableError) ShouldRetry() bool
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
pkg/logging | Package logging is a wrapper around zapctxd.Logger that adds context to the log entries. |