queue

package
v0.0.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComputeMetrics

func ComputeMetrics(ctx context.Context, pool *Queue)

func ProcessLockPool

func ProcessLockPool(ctx context.Context, queue *Queue)

ProcessLockPool moves messages from the lock message pool to the message pool.

func ProcessTimeoutMessages

func ProcessTimeoutMessages(ctx context.Context, queue *Queue) error

ProcessTimeoutMessages processes timed-out messages, i.e. messages that have not been acked (or nacked) for more than 5 minutes, returning them to the active queue with the maximum score. TODO: allow each queue to have its own timeout deadline. TODO: change this behavior so it doesn't need to load all queue names into memory; we could use the storage to list queues with a cursor. TODO: we could even change the timeout mechanism so that it is not based on the queue name.

func RecoveryMessagesPool

func RecoveryMessagesPool(ctx context.Context, pool *Queue) (metrify bool)

RecoveryMessagesPool recovers the message pool by sending all storage data to the cache.

func RemoveExceedingMessages

func RemoveExceedingMessages(ctx context.Context, pool *Queue) (bool, error)

RemoveExceedingMessages checks whether any queue has a max_elements configuration and, if so, removes every exceeding message, using expiry_date to order which elements are removed. TODO: manage message pool updates individually by queue to avoid future bottlenecks.

func RemoveTTLMessages

func RemoveTTLMessages(ctx context.Context, pool *Queue, filterDate *time.Time) (bool, error)

RemoveTTLMessages removes 10000 expired elements, ordered by expiration date ascending, for each queue.

Types

type DeckardQueue

type DeckardQueue interface {
	AddMessagesToCache(ctx context.Context, messages ...*message.Message) (int64, error)
	AddMessagesToStorage(ctx context.Context, messages ...*message.Message) (inserted int64, updated int64, err error)
	Nack(ctx context.Context, message *message.Message, timestamp time.Time, reason string) (bool, error)
	Ack(ctx context.Context, message *message.Message, reason string) (bool, error)
	TimeoutMessages(ctx context.Context, queue string) ([]string, error)
	Pull(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64, ackDeadlineMs int64) (*[]message.Message, error)
	Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error)
	Count(ctx context.Context, opts *storage.FindOptions) (int64, error)
	GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]message.Message, error)

	// Flushes all deckard content from cache and storage.
	// Used only for memory instance.
	Flush(ctx context.Context) (bool, error)
}

type DefaultQueueConfigurationService

type DefaultQueueConfigurationService struct {
	// contains filtered or unexported fields
}

func NewQueueConfigurationService

func NewQueueConfigurationService(_ context.Context, storage storage.Storage) *DefaultQueueConfigurationService

func (*DefaultQueueConfigurationService) EditQueueConfiguration

func (queueService *DefaultQueueConfigurationService) EditQueueConfiguration(ctx context.Context, cfg *configuration.QueueConfiguration) error

func (*DefaultQueueConfigurationService) GetQueueConfiguration

func (queueService *DefaultQueueConfigurationService) GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)

type Queue

type Queue struct {
	QueueConfigurationService QueueConfigurationService
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(auditor audit.Auditor, storageImpl storage.Storage, queueService QueueConfigurationService, cache cache.Cache) *Queue

func (*Queue) Ack

func (pool *Queue) Ack(ctx context.Context, msg *message.Message, reason string) (bool, error)

func (*Queue) AddMessagesToCache

func (pool *Queue) AddMessagesToCache(ctx context.Context, messages ...*message.Message) (int64, error)

func (*Queue) AddMessagesToCacheWithAuditReason

func (pool *Queue) AddMessagesToCacheWithAuditReason(ctx context.Context, reason string, messages ...*message.Message) (int64, error)

func (*Queue) AddMessagesToStorage

func (pool *Queue) AddMessagesToStorage(ctx context.Context, messages ...*message.Message) (inserted int64, updated int64, err error)

func (*Queue) Count

func (pool *Queue) Count(ctx context.Context, opts *storage.FindOptions) (int64, error)

func (*Queue) Flush

func (pool *Queue) Flush(ctx context.Context) (bool, error)

func (*Queue) GetStorageMessages

func (pool *Queue) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]message.Message, error)

func (*Queue) Nack

func (pool *Queue) Nack(ctx context.Context, msg *message.Message, timestamp time.Time, reason string) (bool, error)

func (*Queue) Pull

func (pool *Queue) Pull(ctx context.Context, queue string, n int64, minScore *float64, maxScore *float64, ackDeadlineMs int64) (*[]message.Message, error)

func (*Queue) Remove

func (pool *Queue) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error)

func (*Queue) TimeoutMessages

func (pool *Queue) TimeoutMessages(ctx context.Context, queue string) ([]string, error)

type QueueConfigurationService

type QueueConfigurationService interface {
	EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error
	GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL