hafka

package
v0.0.0-...-81c1d68 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 28, 2022 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MetricsMiddleware

func MetricsMiddleware(cfg MetricsConfig) hevent.Middleware

func NewConfig

func NewConfig(options ...ConfigOption) *sarama.Config

func NewConfigWithDefaults

func NewConfigWithDefaults(version sarama.KafkaVersion, initialOffset int64) *sarama.Config

func NewEmitter

func NewEmitter(o EmitterOptions) (hevent.Emitter, error)

func NewLogger

func NewLogger(l hlog.Logger) sarama.StdLogger

func NewReceiver

func NewReceiver(o ReceiverOptions) (hevent.Receiver, error)

func NewSubscriptionOptions

func NewSubscriptionOptions(o ConsumerOptions) *hevent.SubscriptionOptions

Types

type ConfigOption

type ConfigOption func(cfg *sarama.Config)

func WithInitialOffset

func WithInitialOffset(offset int64) ConfigOption

func WithVersion

func WithVersion(v sarama.KafkaVersion) ConfigOption

type ConsumerGroup

type ConsumerGroup interface {
	Consume() error
	Close() error
}

type ConsumerGroupHandler

type ConsumerGroupHandler interface {
	sarama.ConsumerGroupHandler
	Ready() <-chan struct{}
}

type ConsumerGroupHandlerOptions

type ConsumerGroupHandlerOptions struct {
	ConsumerOptions  *ConsumerOptions
	QueueManager     QueueManager
	Handler          hevent.EventHandler
	MessageConverter MessageConverter
	Producer         sarama.AsyncProducer
}

type ConsumerOption

type ConsumerOption func(o *ConsumerOptions)

type ConsumerOptions

type ConsumerOptions struct {
	BootstrapServers []string
	Config           *sarama.Config
	Topic            string
	RetryTopic       string // The topic name that we push the message to retry it.
	Group            string // consumer group name
	RetryPolicy      RetryPolicy

	Handler hevent.EventHandler
}

func (ConsumerOptions) Validate

func (o ConsumerOptions) Validate() error

type EmitterOptions

type EmitterOptions struct {
	Client            sarama.Client
	ContextPropagator hexa.ContextPropagator
	Encoder           hevent.Encoder
}

func (EmitterOptions) Validate

func (o EmitterOptions) Validate() error

type MessageConverter

type MessageConverter interface {
	EventToProducerMessage(context.Context, *hevent.Event) (*sarama.ProducerMessage, error)
	ConsumerMessageToEventMessage(msg *sarama.ConsumerMessage) (context.Context, hevent.Message, error)
	ConsumerToProducerMessage(newTopic string, msg *sarama.ConsumerMessage) *sarama.ProducerMessage
}

type MetricsConfig

type MetricsConfig struct {
	MeterProvider metric.MeterProvider
	ServerName    string
}

type QueueManager

type QueueManager interface {
	// RetryTopics returns the retry topics list.
	// e.g., order.created.retry.1 order.created.retry.2 order.created.retry.3
	RetryTopics() []string

	// DeadLetterQueue returns the dead letter queue name for the topic
	// e.g., order.created.dlq
	DeadLetterQueue() string

	// NextTopic returns the next topic name for failed message.
	// e.g., retriedCount is 2 and your max attempt value is 3 or more,
	// so it will returns order.created.retry.3 otherwise
	// it should return "order.created.dlq".
	NextTopic(retriedCount int) string

	// RetryNumberFromTopic returns the number from the retry topci.
	// e.g., If you provide a.retry.4 as topic name, it will returns 4.
	// for default topic name without number it will should returns 0.
	RetryNumberFromTopic(topic string) (int, error)

	// RetryAfter send duration which we need to wait for the next retry.
	// If it's not the first retry,it's algorithm can be
	// "diff LastRetry and Min(BackoffCoefficient^(retriedCount-1)*InitialInterval,MaximumInterval)"
	// but for the fist time should return the InitialInterval.
	RetryAfter(retriedCount int, lastRetry time.Time) time.Duration
}

type ReceiverOptions

type ReceiverOptions struct {
	ContextPropagator hexa.ContextPropagator

	// We use this client to crate producer to push
	// messages retry queues.
	Client sarama.Client

	Middlewares []hevent.Middleware
}

type RetryPolicy

type RetryPolicy struct {
	// Backoff interval for the first retry. If BackoffCoefficient is 1.0 then it is used for all retries.
	// Required.
	// Default is 1.
	InitialInterval time.Duration

	// Coefficient used to calculate the next retry backoff interval.
	// The next retry interval is previous interval multiplied by this coefficient.
	// Must be 1 or larger.
	// Default is 2.0.
	BackoffCoefficient float64

	// Maximum backoff interval between retries. Exponential backoff leads to interval increase.
	// This value is the cap of the interval. Default is 100x of initial interval.
	MaximumInterval time.Duration

	// Maximum number of attempts. When exceeded the retries stop even if not expired yet.
	// If not set or set to 0, it means without retry. default is 3.
	MaximumAttempts int

	// default is value of MaximumAttempts, when you decrease
	// MaximumAttempts value, so you have old topics which has
	// topics which do not get more failed events but
	// has old failed evnets, using this field you can
	// listen to old topics. e.g., let's say count of MaximumAttempts
	// is 4, you change to 3 , so you has three topics, but you
	// need to listen to Topic 4 also for old failed evnets in
	// that Topic, until be sure that Topic is empty, then you
	// can change this field's value to 3.
	// must be equal or more than MaximumAttempts.
	RetryTopicsCount int // default is value of MaxRetry
}

RetryPolicy defines the retry policy.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

func (RetryPolicy) Validate

func (rp RetryPolicy) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL