opqueue

package
v1.0.0

This package is not in the latest version of its module.
Published: Jul 14, 2023 License: Apache-2.0 Imports: 20 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// PoolSize is the number of AMQP subscribers that are listening for operation messages.
	PoolSize int
	// TaskMonitorInterval is the interval (period) in which operation queue tasks from other server instances
	// are monitored.
	TaskMonitorInterval time.Duration
	// TaskExpiration is the maximum time that an operation queue task can exist in the database before it is
	// considered to have expired, at which point any other server instance may delete the task and take over
	// processing of all operations associated with the task.
	TaskExpiration time.Duration
	// MaxRetries is the maximum number of retries for a failed operation in a batch.
	MaxRetries int
	// RetriesInitialDelay is the delay for the initial retry attempt.
	RetriesInitialDelay time.Duration
	// RetriesMaxDelay is the maximum delay for a retry attempt.
	RetriesMaxDelay time.Duration
	// RetriesMultiplier is the multiplier for a retry attempt. For example, if set to 1.5 and
	// the previous retry interval was 2s, then the next retry interval is set to 3s.
	RetriesMultiplier float64
	// MaxOperationsToRepost is the maximum number of operations to repost to the queue after
	// an instance dies.
	MaxOperationsToRepost int
	// OperationLifeSpan is the maximum time that an operation can exist in the database before
	// it is deleted.
	OperationLifeSpan time.Duration
	// BatchWriterTimeout specifies the timeout for when a batch of operations is cut.
	BatchWriterTimeout time.Duration
	// MaxContiguousWithError specifies the maximum number of previously failed operations to rearrange contiguously.
	MaxContiguousWithError int
	// MaxContiguousWithoutError specifies the maximum number of operations (with no error) to rearrange contiguously.
	MaxContiguousWithoutError int
}

Config contains configuration parameters for the operation queue.

type OperationMessage added in v1.0.0

type OperationMessage struct {
	ID        string                           `json:"id"`
	Operation *operation.QueuedOperationAtTime `json:"operation"`
	Retries   int                              `json:"retries"`
	HasError  bool                             `json:"hasError,omitempty"`
}

OperationMessage contains the data that is sent to the message broker.
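
To show what the struct tags imply for the wire format, here is a small sketch that marshals a message to JSON. The operationMessage type copies the tags above, while queuedOperation and its uniqueSuffix field are hypothetical stand-ins for operation.QueuedOperationAtTime, whose real fields are not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queuedOperation is a hypothetical stand-in for
// operation.QueuedOperationAtTime; its field is an assumption.
type queuedOperation struct {
	UniqueSuffix string `json:"uniqueSuffix"`
}

// operationMessage copies the JSON tags of OperationMessage.
type operationMessage struct {
	ID        string           `json:"id"`
	Operation *queuedOperation `json:"operation"`
	Retries   int              `json:"retries"`
	HasError  bool             `json:"hasError,omitempty"`
}

// encode marshals the message and panics on failure, which is acceptable
// for a sketch but not for production code.
func encode(msg operationMessage) string {
	b, err := json.Marshal(msg)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	msg := operationMessage{ID: "msg-1", Operation: &queuedOperation{UniqueSuffix: "abc"}}
	fmt.Println(encode(msg)) // hasError is omitted while it is false

	msg.Retries = 1
	msg.HasError = true
	fmt.Println(encode(msg)) // hasError appears once it is true
}
```

Because hasError carries omitempty, a message that has never failed does not include the field, whereas retries is always present, even when it is zero.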

type Queue

type Queue struct {
	*lifecycle.Lifecycle
	// contains filtered or unexported fields
}

Queue implements an operation queue that uses a publisher/subscriber.

func New

func New(cfg *Config, pubSub pubSub, p storage.Provider, taskMgr taskManager,
	expiryService dataExpiryService, metrics metricsProvider,
) (*Queue, error)

New returns a new operation queue.

func (*Queue) Add

func (q *Queue) Add(op *operation.QueuedOperation, protocolVersion uint64) (uint, error)

Add publishes the given operation.

func (*Queue) Len

func (q *Queue) Len() uint

Len returns the length of the pending queue.

func (*Queue) Peek

Peek returns (up to) the given number of operations from the head of the queue but does not remove them.

func (*Queue) Remove

func (q *Queue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error)

Remove removes (up to) the given number of items from the head of the queue. It returns the removed operations, an ack function, a nack function that accepts an error, and an error; the uint returned by ack is presumably the new length of the queue.
