Documentation ¶
Index ¶
- func MetricsMiddleware(cfg MetricsConfig) hevent.Middleware
- func NewConfig(options ...ConfigOption) *sarama.Config
- func NewConfigWithDefaults(version sarama.KafkaVersion, initialOffset int64) *sarama.Config
- func NewEmitter(o EmitterOptions) (hevent.Emitter, error)
- func NewLogger(l hlog.Logger) sarama.StdLogger
- func NewReceiver(o ReceiverOptions) (hevent.Receiver, error)
- func NewSubscriptionOptions(o ConsumerOptions) *hevent.SubscriptionOptions
- type ConfigOption
- type ConsumerGroup
- type ConsumerGroupHandler
- type ConsumerGroupHandlerOptions
- type ConsumerOption
- type ConsumerOptions
- type EmitterOptions
- type MessageConverter
- type MetricsConfig
- type QueueManager
- type ReceiverOptions
- type RetryPolicy
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MetricsMiddleware ¶
func MetricsMiddleware(cfg MetricsConfig) hevent.Middleware
func NewConfig ¶
func NewConfig(options ...ConfigOption) *sarama.Config
func NewConfigWithDefaults ¶
func NewConfigWithDefaults(version sarama.KafkaVersion, initialOffset int64) *sarama.Config
func NewEmitter ¶
func NewEmitter(o EmitterOptions) (hevent.Emitter, error)
func NewReceiver ¶
func NewReceiver(o ReceiverOptions) (hevent.Receiver, error)
func NewSubscriptionOptions ¶
func NewSubscriptionOptions(o ConsumerOptions) *hevent.SubscriptionOptions
Types ¶
type ConfigOption ¶
func WithInitialOffset ¶
func WithInitialOffset(offset int64) ConfigOption
func WithVersion ¶
func WithVersion(v sarama.KafkaVersion) ConfigOption
type ConsumerGroup ¶
type ConsumerGroupHandler ¶
type ConsumerGroupHandler interface { sarama.ConsumerGroupHandler Ready() <-chan struct{} }
type ConsumerGroupHandlerOptions ¶
type ConsumerGroupHandlerOptions struct { ConsumerOptions *ConsumerOptions QueueManager QueueManager Handler hevent.EventHandler MessageConverter MessageConverter Producer sarama.AsyncProducer }
type ConsumerOption ¶
type ConsumerOption func(o *ConsumerOptions)
type ConsumerOptions ¶
type ConsumerOptions struct { BootstrapServers []string Config *sarama.Config Topic string RetryTopic string // The topic name that we push the message to retry it. Group string // consumer group name RetryPolicy RetryPolicy Handler hevent.EventHandler }
func (ConsumerOptions) Validate ¶
func (o ConsumerOptions) Validate() error
type EmitterOptions ¶
type EmitterOptions struct { Client sarama.Client ContextPropagator hexa.ContextPropagator Encoder hevent.Encoder }
func (EmitterOptions) Validate ¶
func (o EmitterOptions) Validate() error
type MessageConverter ¶
type MessageConverter interface { EventToProducerMessage(context.Context, *hevent.Event) (*sarama.ProducerMessage, error) ConsumerMessageToEventMessage(msg *sarama.ConsumerMessage) (context.Context, hevent.Message, error) ConsumerToProducerMessage(newTopic string, msg *sarama.ConsumerMessage) *sarama.ProducerMessage }
type MetricsConfig ¶
type MetricsConfig struct { MeterProvider metric.MeterProvider ServerName string }
type QueueManager ¶
type QueueManager interface { // RetryTopics returns the retry topics list. // e.g., order.created.retry.1 order.created.retry.2 order.created.retry.3 RetryTopics() []string // DeadLetterQueue returns the dead letter queue name for the topic // e.g., order.created.dlq DeadLetterQueue() string // NextTopic returns the next topic name for failed message. // e.g., retriedCount is 2 and your max attempt value is 3 or more, // so it will returns order.created.retry.3 otherwise // it should return "order.created.dlq". NextTopic(retriedCount int) string // RetryNumberFromTopic returns the number from the retry topci. // e.g., If you provide a.retry.4 as topic name, it will returns 4. // for default topic name without number it will should returns 0. RetryNumberFromTopic(topic string) (int, error) // RetryAfter send duration which we need to wait for the next retry. // If it's not the first retry,it's algorithm can be // "diff LastRetry and Min(BackoffCoefficient^(retriedCount-1)*InitialInterval,MaximumInterval)" // but for the fist time should return the InitialInterval. RetryAfter(retriedCount int, lastRetry time.Time) time.Duration }
type ReceiverOptions ¶
type ReceiverOptions struct { ContextPropagator hexa.ContextPropagator // We use this client to crate producer to push // messages retry queues. Client sarama.Client Middlewares []hevent.Middleware }
type RetryPolicy ¶
type RetryPolicy struct { // Backoff interval for the first retry. If BackoffCoefficient is 1.0 then it is used for all retries. // Required. // Default is 1. InitialInterval time.Duration // Coefficient used to calculate the next retry backoff interval. // The next retry interval is previous interval multiplied by this coefficient. // Must be 1 or larger. // Default is 2.0. BackoffCoefficient float64 // Maximum backoff interval between retries. Exponential backoff leads to interval increase. // This value is the cap of the interval. Default is 100x of initial interval. MaximumInterval time.Duration // Maximum number of attempts. When exceeded the retries stop even if not expired yet. // If not set or set to 0, it means without retry. default is 3. MaximumAttempts int // default is value of MaximumAttempts, when you decrease // MaximumAttempts value, so you have old topics which has // topics which do not get more failed events but // has old failed evnets, using this field you can // listen to old topics. e.g., let's say count of MaximumAttempts // is 4, you change to 3 , so you has three topics, but you // need to listen to Topic 4 also for old failed evnets in // that Topic, until be sure that Topic is empty, then you // can change this field's value to 3. // must be equal or more than MaximumAttempts. RetryTopicsCount int // default is value of MaxRetry }
RetryPolicy defines the retry policy.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
func (RetryPolicy) Validate ¶
func (rp RetryPolicy) Validate() error
Click to show internal directories.
Click to hide internal directories.