flexqueue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2022 License: MIT Imports: 3 Imported by: 0

README

flexqueue

The flexqueue package provides some hybrid data structures which are intended to cover the most common use cases for highly performant single lane message queues and ordered lists. No single use case is likely to use all of the provided features or methods, but integral to the design is that no arbitrary limitations were imposed.

Documentation

Design Considerations

No external dependencies, this package only uses internal go libraries.

Care was taken to implement all methods with a constant 0(1) time signature. This means that performance will be effected linearly as the collection size grows, no exponential degradation.

Care was taken to be as non-intrusive and non-prescriptive as possible. These data structures therefore do not spawn additional go routines or dictate any item interface. All features are optional and no major performance penalty is incurred by opting out of a feature.

Because these are hybrid data structures which blend several different concepts, I've maintained a coherent and consistent naming convention across everything. Depending on your existing bias of naming conventions, you may be required to adapt the language you are used to.

FlexList

FlexList is a high performance ordered map. Internally it uses a combination of a double linked list via container/list for item order and a map of strings for an index.

FlexQueue

FlexQueue is a combined FIFO/LIFO single lane queue. FlexQueue is essentially a wrapper for FlexList but provides the following additional queue concepts:

  • Thread safety via mutex
  • Max queue length
  • Message de-duplication
  • Message TTL/expiration with callback

FIFI/LIFO

  • For a FIFO (first-in first-out) queue use PushBack for insertions and PullFront for extractions.
  • For a LIFO (last-in first-out) queue use PushFront for insertions and PullFront for extractions.
  • To leave a message in a FIFO queue while it is being processed use ReadFront and Remove rather than PullFront.

De-Duplication

  • To utilize message de-duplication provide a digest value based on a hash of message contents. You implement the digest algorithm.
  • To avoid message de-duplication provide a unique digest value for every message.

TTL

  • TTL is optional, and the configuration is handled on each message insertion with a time.Duration and a callback function.
  • A queue may contain a mix of messages with and without a TTL.
  • TTL uses time.Duration to guarantee the expiration accuracy regardless of server time zone settings.
  • If you messages use expiration dates then you should map them to a time.Duration at the time of insertion.
  • All read/write functions which access a message in the queue will transparently perform a TTL analysis and if the message is expired it will be automatically removed from the queue and the access method will behave as if the message had not existed. The only exceptions to this are the Len, Empty and Full methods which do not perform TTL analysis and can therefore count expired messages. We did this to keep these counting methods performant. If you want to take the performance hit for better accuracy then call Prune first.

Documentation

Index

Constants

View Source
const (
	NoMax = -1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type FlexList

type FlexList struct {
	// contains filtered or unexported fields
}

FlexList is a high performance ordered map that maintains constant amortized time O(1) for all read/write operations. Internally it uses a combination of a double linked list for item order and a map of strings for an index.

func NewFlexList

func NewFlexList() *FlexList

NewFlexList factory func should always be used to instantiate a new FlexList

func (*FlexList) Has

func (l *FlexList) Has(index string) bool

Has will return true if the list contains the given index and false if it does not.

func (*FlexList) Len

func (l *FlexList) Len() int

Len will return the number of items in the list

func (*FlexList) Pull

func (l *FlexList) Pull(index string) (interface{}, bool)

Pull will return an item from the list based on the index and then remove it. This is an alias for Read() + Remove() Returns: * interface{}: The item * bool: true if the item was found

func (*FlexList) PullBack

func (l *FlexList) PullBack() (string, interface{}, bool)

PullBack will return an item from the back of the list and then remove it. This is an alias for ReadBack() + Remove(). Returns: * string: The index of the item * interface{}: The item * bool: true if an item was found and false for an empty list

func (*FlexList) PullFront

func (l *FlexList) PullFront() (string, interface{}, bool)

PullFront will remove an item from the front of the list and return it. This is an alias for ReadFront() + Remove(). Returns: * string: The index of the item * interface{}: The item * bool: true if an item was found and false for an empty list

func (*FlexList) PushBack

func (l *FlexList) PushBack(index string, item interface{}) bool

PushBack will create the index and add the item to the back of the list. If the index already exists then the operation is ignored. Returns: * bool: true if a new item was inserted or false if it already existed

func (*FlexList) PushFront

func (l *FlexList) PushFront(index string, item interface{}) bool

PushFront will create the index and add the item to the front of the list. If the index already exists then the operation is ignored. Returns: * bool: true if a new item was inserted or false if it already existed

func (*FlexList) Read

func (l *FlexList) Read(index string) (interface{}, bool)

Read will return an item from the list based on the index without removing it. Returns: * interface{}: The item * bool: true if the item was found

func (*FlexList) ReadBack

func (l *FlexList) ReadBack() (string, interface{}, bool)

ReadBack will return an item from the back of the list without removing it. Returns: * string: The index of the item * interface{}: The item * bool: true if an item was found and false for an empty list

func (*FlexList) ReadFront

func (l *FlexList) ReadFront() (string, interface{}, bool)

ReadFront will return an item from the front of the list without removing it. Returns: * string: The index of the item * interface{}: The item * bool: true if an item was found and false for an empty list

func (*FlexList) Remove

func (l *FlexList) Remove(index string) bool

Remove will delete an item from the list based on the index. Returns: * bool: true if the item was removed and false if not found

func (*FlexList) Update

func (l *FlexList) Update(index string, item interface{}) bool

Update will update an item in the list based on its index without changing the order. The operation is ignored if the item does not already exist. Returns: * bool: true if the item was updated and false if not found

type FlexQueue

type FlexQueue struct {
	sync.RWMutex // Shared mutex for locking
	// contains filtered or unexported fields
}

FlexQueue is a combined FIFO/LIFO single lane queue with all the features of FlexList but also supporting mutex thread safety, max queue length, message de-duplication and ttl/expiration.

func NewFlexQueue

func NewFlexQueue() *FlexQueue

NewFlexQueue is a factory method for creating a new flex queue. It is important to use this method to properly initialize the internal structs.

func (*FlexQueue) Has

func (q *FlexQueue) Has(digest string) bool

Has returns true if the message with the given digest is in the queue. Expired messages will be removed and this will return false.

func (*FlexQueue) IsEmpty

func (q *FlexQueue) IsEmpty() bool

IsEmpty returns true if the queue is empty and false if its not

func (*FlexQueue) IsFull

func (q *FlexQueue) IsFull() bool

IsFull returns true if the queue is full and false if its not

func (*FlexQueue) Len

func (q *FlexQueue) Len() int

Len returns the number of messages currently in the queue

func (*FlexQueue) Max

func (q *FlexQueue) Max() int

Max returns the maximum number of messages the queue can hold. If there is no message limit then this will return -1.

func (*FlexQueue) Prune

func (q *FlexQueue) Prune() bool

Prune will scan all messages and remove any with an expired ttl. This function is meant to be used on an interval by the caller in the case that the automatic removal of expired messages by Pull, Read, or Has methods is insufficient. Returns true if any expired messages were found and removed.

func (*FlexQueue) Pull

func (q *FlexQueue) Pull(digest string) (interface{}, bool)

Pull will return the message with the given digest and remove it from the queue. Messages with an expired ttl are automatically removed. Returns: * interface{}: The message * bool: true if a message was found or false if not found or expired/removed

func (*FlexQueue) PullBack

func (q *FlexQueue) PullBack() (string, interface{}, bool)

PullBack will remove a message from the end of the queue and return a reference to it. Messages with an expired ttl are automatically removed. Returns: * string: The message digest * interface{}: The message * bool: true if a message was found or false if empty queue

func (*FlexQueue) PullFront

func (q *FlexQueue) PullFront() (string, interface{}, bool)

PullFront will remove a message from the beginning of the queue and return a reference to it. Messages with an expired ttl are automatically removed. Returns: * string: The message digest * interface{}: The message * bool: true if a message was found or false if empty queue

func (*FlexQueue) PushBack

func (q *FlexQueue) PushBack(digest string, message interface{}) bool

PushBack will add a new message to the end of the queue. It returns true if the message was added or if it already existed in the queue based on the digest value (automatic de-duping), and false if the message was not added because the queue was full. If de-dupe occurs then the message will not be updated.

func (*FlexQueue) PushBackTTL

func (q *FlexQueue) PushBackTTL(digest string, message interface{}, ttl time.Duration, callback func(digest string, message interface{})) bool

PushBackTTL will add a new message to the back of the queue. It behaves identical to PushBack expect that it attaches a TTL and expiration callback to the message.

func (*FlexQueue) PushFront

func (q *FlexQueue) PushFront(digest string, message interface{}) bool

PushFront will add a new message to the front of the queue. It returns true if the message was added or if it already existed in the queue based on the digest value (automatic de-duping), and false if the message was not added because the queue was full. If de-dupe occurs then the message will not be updated.

func (*FlexQueue) PushFrontTTL

func (q *FlexQueue) PushFrontTTL(digest string, message interface{}, ttl time.Duration, callback func(digest string, message interface{})) bool

PushFrontTTL will add a new message to the front of the queue. It behaves identical to PushFront expect that it attaches a TTL and expiration callback to the message.

func (*FlexQueue) Read

func (q *FlexQueue) Read(digest string) (interface{}, bool)

Read will return the message with the given digest without removing it. Messages with an expired ttl are automatically removed. Returns: * interface{}: The message * bool: true if a message was found or false if not found or expired/removed

func (*FlexQueue) ReadBack

func (q *FlexQueue) ReadBack() (string, interface{}, bool)

ReadBack will return a message from the end of the queue without removing it. Messages with an expired ttl are automatically removed. Returns: * string: The message digest * interface{}: The message * bool: true if a message was found or false if empty queue

func (*FlexQueue) ReadFront

func (q *FlexQueue) ReadFront() (string, interface{}, bool)

ReadFront will return a message from the beginning of the queue without removing it. Messages with an expired ttl are automatically removed. Returns: * string: The message digest * interface{}: The message * bool: true if a message was found or false if empty queue

func (*FlexQueue) Remove

func (q *FlexQueue) Remove(digest string) bool

Remove will delete the message from the queue. Returns true if the message was found and deleted or false if not found.

func (*FlexQueue) SetMax

func (q *FlexQueue) SetMax(max int) *FlexQueue

func (*FlexQueue) Update

func (q *FlexQueue) Update(digest string, message interface{}) bool

Update will update a message already in the queue based on its digest without changing the order. Returns: * bool: true if the item was updated and false if not found

type ItemWrapper

type ItemWrapper struct {
	// contains filtered or unexported fields
}

ItemWrapper retains the relationship between the linked list and the index map

type TTL

type TTL struct {
	Expires  time.Time
	Callback func(digest string, message interface{})
}

TTL is an expiration control that applies to a single message.

func NewTTL

func NewTTL(ttl time.Duration, callback func(digest string, message interface{})) *TTL

NewTTL creates a new TTL control for the duration based on now

func (*TTL) Expired

func (ttl *TTL) Expired() bool

Expired will check the ttl expires time against now and return true if it is expired and false if not.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL