Documentation ¶
Index ¶
- Constants
- Variables
- type Message
- type MessageProcessor
- type MessageStore
- type PollingProcessor
- type PollingProcessorOption
- func WithPollingProcessorLogger(logger edatlog.Logger) PollingProcessorOption
- func WithPollingProcessorMessagesPerPolling(messagePerPolling int) PollingProcessorOption
- func WithPollingProcessorPollingInterval(pollingInterval time.Duration) PollingProcessorOption
- func WithPollingProcessorPurgeInterval(purgeInterval time.Duration) PollingProcessorOption
- func WithPollingProcessorPurgeOlderThan(purgeOtherThan time.Duration) PollingProcessorOption
- func WithPollingProcessorRetryer(retryer retry.Retryer) PollingProcessorOption
Constants ¶
const ( DefaultMessagesPerPolling = 500 DefaultPollingInterval = 500 * time.Millisecond DefaultPurgeOlderThan = 60 * time.Second DefaultPurgeInterval = 30 * time.Second DefaultMaxRetries = 100 DefaultRetryMultiplier = 1.25 DefaultRetryRandomizationFactor = 0.33 )
Default values used by the package for polling, purging, and retrying.
Variables ¶
var DefaultRetryer = retry.NewExponentialBackoff( retry.WithBackoffInitialInterval(DefaultPollingInterval), retry.WithBackoffMaxRetries(DefaultMaxRetries), retry.WithBackoffMultiplier(DefaultRetryMultiplier), retry.WithBackoffRandomizationFactor(DefaultRetryRandomizationFactor), )
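DefaultRetryer is the default retry strategy: an exponential backoff configured with the package defaults for initial interval, maximum retries, multiplier, and randomization factor.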
Functions ¶
This section is empty.
Types ¶
type MessageProcessor ¶
type MessageProcessor interface { Start(ctx context.Context) error Stop(ctx context.Context) error }
MessageProcessor is the interface for a processor that can be started and stopped.
type MessageStore ¶
type MessageStore interface { Fetch(ctx context.Context, limit int) ([]Message, error) Save(ctx context.Context, message Message) error MarkPublished(ctx context.Context, messageIDs []string) error PurgePublished(ctx context.Context, olderThan time.Duration) error }
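MessageStore is the interface for a store from which messages can be fetched, to which messages can be saved, and in which messages can be marked as published and published messages purged.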
type PollingProcessor ¶
type PollingProcessor struct {
// contains filtered or unexported fields
}
PollingProcessor implements MessageProcessor
func NewPollingProcessor ¶
func NewPollingProcessor(in MessageStore, out msg.MessagePublisher, options ...PollingProcessorOption) *PollingProcessor
NewPollingProcessor constructs a new PollingProcessor
type PollingProcessorOption ¶
type PollingProcessorOption func(processor *PollingProcessor)
PollingProcessorOption is a function that configures a PollingProcessor; options are passed to NewPollingProcessor.
func WithPollingProcessorLogger ¶
func WithPollingProcessorLogger(logger edatlog.Logger) PollingProcessorOption
WithPollingProcessorLogger sets the edatlog.Logger for PollingProcessor
func WithPollingProcessorMessagesPerPolling ¶
func WithPollingProcessorMessagesPerPolling(messagePerPolling int) PollingProcessorOption
WithPollingProcessorMessagesPerPolling sets the number of messages PollingProcessor fetches per polling attempt
func WithPollingProcessorPollingInterval ¶
func WithPollingProcessorPollingInterval(pollingInterval time.Duration) PollingProcessorOption
WithPollingProcessorPollingInterval sets the interval between attempts to fetch new messages for PollingProcessor
func WithPollingProcessorPurgeInterval ¶
func WithPollingProcessorPurgeInterval(purgeInterval time.Duration) PollingProcessorOption
WithPollingProcessorPurgeInterval sets the interval between attempts to purge published messages for PollingProcessor
func WithPollingProcessorPurgeOlderThan ¶
func WithPollingProcessorPurgeOlderThan(purgeOtherThan time.Duration) PollingProcessorOption
WithPollingProcessorPurgeOlderThan sets the age beyond which published messages are purged by PollingProcessor
func WithPollingProcessorRetryer ¶
func WithPollingProcessorRetryer(retryer retry.Retryer) PollingProcessorOption
WithPollingProcessorRetryer sets the retry strategy for failed calls for PollingProcessor