Documentation ¶
Index ¶
- type Bucket
- type BucketPriorityQueue
- type CorePriorityQueue
- func (pq *CorePriorityQueue[T]) DeQueue() (wasRecoverd bool, batchNumber uint64, diskUUID []byte, priority int64, data T, ...)
- func (pq *CorePriorityQueue[T]) EnQueue(data schema.Item[T])
- func (pq CorePriorityQueue[T]) Len() int
- func (pq CorePriorityQueue[T]) Less(i, j int) bool
- func (pq *CorePriorityQueue[T]) NoLockDeQueue()
- func (pq CorePriorityQueue[T]) Peek() (data T, err error)
- func (pq CorePriorityQueue[T]) ReadPointers() []*schema.Item[T]
- func (pq *CorePriorityQueue[T]) Remove(item *schema.Item[T])
- func (pq CorePriorityQueue[T]) Swap(i, j int)
- func (pq *CorePriorityQueue[T]) UpdatePriority(item *schema.Item[T], priority int64)
- type GPQ
- func (g *GPQ[d]) Close()
- func (g *GPQ[d]) DeQueue() (priority int64, data d, err error)
- func (g *GPQ[d]) EnQueue(data d, priorityBucket int64, options schema.EnQueueOptions) error
- func (g *GPQ[d]) Peek() (data d, err error)
- func (g *GPQ[d]) Prioritize() (timedOutItems uint64, escalatedItems uint64, errs []error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BucketPriorityQueue ¶
type BucketPriorityQueue struct {
	ActiveBuckets  int64
	BucketIDs      map[int64]*Bucket
	First, Last    *Bucket
	LastRemoved    int64
	ObjectsInQueue uint64
	// contains filtered or unexported fields
}
BucketPriorityQueue keeps track of the non-empty buckets in the GPQ. It combines a hash set, a doubly linked list, and a priority queue to allow O(1) addition and removal of buckets, and O(1) addition and removal of items within the buckets.
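The hash set plus doubly linked list combination described above can be pictured with a small sketch. This is not the package's implementation: the bucketSet type and its methods are hypothetical names, and the sketch keeps buckets in insertion order rather than priority order. It only shows why the map lookup makes removal from the list O(1).

```go
package main

import "fmt"

// node is one entry in the doubly linked list of non-empty buckets.
type node struct {
	id         int64
	prev, next *node
}

// bucketSet is a hypothetical, simplified stand-in for BucketPriorityQueue.
type bucketSet struct {
	byID        map[int64]*node // hash set: bucket ID -> list node
	first, last *node
}

func newBucketSet() *bucketSet {
	return &bucketSet{byID: make(map[int64]*node)}
}

// add appends the bucket to the list tail unless it is already tracked.
func (s *bucketSet) add(id int64) {
	if _, ok := s.byID[id]; ok {
		return
	}
	n := &node{id: id, prev: s.last}
	if s.last != nil {
		s.last.next = n
	} else {
		s.first = n
	}
	s.last = n
	s.byID[id] = n
}

// remove finds the node through the map and unlinks it in O(1).
func (s *bucketSet) remove(id int64) {
	n, ok := s.byID[id]
	if !ok {
		return
	}
	if n.prev != nil {
		n.prev.next = n.next
	} else {
		s.first = n.next
	}
	if n.next != nil {
		n.next.prev = n.prev
	} else {
		s.last = n.prev
	}
	delete(s.byID, id)
}

// contains reports whether the bucket is currently tracked.
func (s *bucketSet) contains(id int64) bool {
	_, ok := s.byID[id]
	return ok
}

// peek returns the bucket at the head of the list, if any.
func (s *bucketSet) peek() (int64, bool) {
	if s.first == nil {
		return 0, false
	}
	return s.first.id, true
}

func main() {
	s := newBucketSet()
	s.add(3)
	s.add(7)
	s.remove(3)
	id, ok := s.peek()
	fmt.Println(id, ok, s.contains(3)) // prints "7 true false"
}
```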
func NewBucketPriorityQueue ¶
func NewBucketPriorityQueue() *BucketPriorityQueue
NewBucketPriorityQueue creates a new BucketPriorityQueue.
func (*BucketPriorityQueue) Add ¶
func (pq *BucketPriorityQueue) Add(bucketID int64)
func (*BucketPriorityQueue) Contains ¶
func (pq *BucketPriorityQueue) Contains(bucketID int64) bool
func (*BucketPriorityQueue) Len ¶
func (pq *BucketPriorityQueue) Len() *int64
func (*BucketPriorityQueue) Peek ¶
func (pq *BucketPriorityQueue) Peek() (bucketID int64, exists bool)
func (*BucketPriorityQueue) Remove ¶
func (pq *BucketPriorityQueue) Remove(bucketID int64)
type CorePriorityQueue ¶
type CorePriorityQueue[T any] struct {
	// contains filtered or unexported fields
}
CorePriorityQueue implements heap.Interface and holds Items.
func NewCorePriorityQueue ¶
func NewCorePriorityQueue[T any](bpq *BucketPriorityQueue) CorePriorityQueue[T]
NewCorePriorityQueue creates a new CorePriorityQueue.
func (*CorePriorityQueue[T]) DeQueue ¶
func (pq *CorePriorityQueue[T]) DeQueue() (wasRecoverd bool, batchNumber uint64, diskUUID []byte, priority int64, data T, err error)
DeQueue removes the first item from the heap.
func (*CorePriorityQueue[T]) EnQueue ¶
func (pq *CorePriorityQueue[T]) EnQueue(data schema.Item[T])
EnQueue adds an item to the heap and to the end of the array.
func (CorePriorityQueue[T]) Len ¶
func (pq CorePriorityQueue[T]) Len() int
Len returns the length of the heap. It is needed to implement heap.Interface.
func (CorePriorityQueue[T]) Less ¶
func (pq CorePriorityQueue[T]) Less(i, j int) bool
Less compares the priority of two items. It is needed to implement heap.Interface.
func (*CorePriorityQueue[T]) NoLockDeQueue ¶
func (pq *CorePriorityQueue[T]) NoLockDeQueue()
NoLockDeQueue removes the first item from the heap without locking the queue. It is used for nested calls to avoid deadlocks.
func (CorePriorityQueue[T]) Peek ¶
func (pq CorePriorityQueue[T]) Peek() (data T, err error)
Peek returns the first item in the heap without removing it
func (CorePriorityQueue[T]) ReadPointers ¶
func (pq CorePriorityQueue[T]) ReadPointers() []*schema.Item[T]
ReadPointers exposes the raw pointers to the items in the queue so that reprioritization can be done.
func (*CorePriorityQueue[T]) Remove ¶
func (pq *CorePriorityQueue[T]) Remove(item *schema.Item[T])
Remove removes an item from the queue.
func (CorePriorityQueue[T]) Swap ¶
func (pq CorePriorityQueue[T]) Swap(i, j int)
Swap swaps two items in the heap. It is needed to implement heap.Interface.
func (*CorePriorityQueue[T]) UpdatePriority ¶
func (pq *CorePriorityQueue[T]) UpdatePriority(item *schema.Item[T], priority int64)
UpdatePriority modifies the priority of an Item in the queue.
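As an illustration of how a type like this can satisfy heap.Interface and still support Remove and UpdatePriority, here is a minimal sketch built on the standard container/heap package. The item, itemHeap, and queue types are hypothetical stand-ins, not the package's own types, and the sketch assumes that a lower priority value is dequeued first. It leaves out locking, disk recovery, and bucket bookkeeping.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for schema.Item[T].
type item[T any] struct {
	data     T
	priority int64
	index    int // current position in the heap, kept up to date by Swap, Push, and Pop
}

// itemHeap implements heap.Interface. Assumption: lower priority values sort first.
type itemHeap[T any] []*item[T]

func (h itemHeap[T]) Len() int           { return len(h) }
func (h itemHeap[T]) Less(i, j int) bool { return h[i].priority < h[j].priority }
func (h itemHeap[T]) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index, h[j].index = i, j
}

// Push appends to the end of the slice; container/heap then sifts the item up.
func (h *itemHeap[T]) Push(x any) {
	it := x.(*item[T])
	it.index = len(*h)
	*h = append(*h, it)
}

// Pop removes the last slice element; container/heap has already moved the root there.
func (h *itemHeap[T]) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // let the garbage collector reclaim the slot
	it.index = -1
	*h = old[:n-1]
	return it
}

// queue wraps the heap so callers never call container/heap directly.
type queue[T any] struct{ h itemHeap[T] }

func (q *queue[T]) enqueue(data T, priority int64) *item[T] {
	it := &item[T]{data: data, priority: priority}
	heap.Push(&q.h, it)
	return it
}

func (q *queue[T]) dequeue() (T, bool) {
	if q.h.Len() == 0 {
		var zero T
		return zero, false
	}
	return heap.Pop(&q.h).(*item[T]).data, true
}

// updatePriority changes the priority and restores heap order with heap.Fix.
func (q *queue[T]) updatePriority(it *item[T], priority int64) {
	it.priority = priority
	heap.Fix(&q.h, it.index)
}

// remove deletes an item that is still in the queue, wherever it sits in the heap.
func (q *queue[T]) remove(it *item[T]) {
	heap.Remove(&q.h, it.index)
}

func main() {
	q := &queue[string]{}
	a := q.enqueue("a", 5)
	q.enqueue("b", 2)
	c := q.enqueue("c", 9)
	q.updatePriority(c, 1) // c now sorts ahead of b
	q.remove(a)            // drop a without dequeuing it
	for v, ok := q.dequeue(); ok; v, ok = q.dequeue() {
		fmt.Println(v) // prints "c" then "b"
	}
}
```

Tracking each item's index inside Swap is what lets heap.Fix and heap.Remove run in O(log n) without searching the slice first.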
type GPQ ¶
type GPQ[d any] struct {
	// BucketCount is the number of priority buckets
	BucketCount int64
	// NonEmptyBuckets is a priority queue of non-empty buckets
	NonEmptyBuckets *BucketPriorityQueue
	// contains filtered or unexported fields
}
GPQ is a generic priority queue that supports priority levels and timeouts. It is implemented using a heap for each priority level and a priority queue of non-empty buckets. It also supports disk caching using badgerDB, with the option of lazy disk writes and deletes. The GPQ is thread-safe and supports concurrent access.
func NewGPQ ¶
NewGPQ creates a new GPQ with the given number of buckets. The number of buckets is the number of priority levels you want to support. You must provide the number of buckets ahead of time, and all priorities you submit must be within the range of 0 to NumOfBuckets.
func (*GPQ[d]) DeQueue ¶
func (g *GPQ[d]) DeQueue() (priority int64, data d, err error)
DeQueue removes and returns the item with the highest priority from the GPQ. It returns the priority of the item, the data associated with it, and an error if the queue is empty or if any internal data structures are missing.
func (*GPQ[d]) EnQueue ¶
func (g *GPQ[d]) EnQueue(data d, priorityBucket int64, options schema.EnQueueOptions) error
EnQueue adds an item to the GPQ. The priorityBucket is the priority level of the item. The escalationRate is the amount of time before the item is escalated to the next priority level. The data is the data you want to store in the GPQ item.
func (*GPQ[d]) Peek ¶
func (g *GPQ[d]) Peek() (data d, err error)
Peek returns the item with the highest priority from the GPQ. It returns the data associated with the item and an error if the queue is empty.
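The EnQueue, DeQueue, and Peek behaviour described above can be sketched with a small in-memory model. Everything in it is an assumption made for illustration: the bucketQueue type and its methods are hypothetical, bucket 0 is treated as the most urgent level, the valid range is taken to be 0 up to but not including the bucket count, and each level is a plain FIFO slice scanned linearly, where the package is described as using a heap per level and a queue of non-empty buckets. Disk caching and timeouts are left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// bucketQueue is a hypothetical in-memory sketch; it is not the GPQ type.
type bucketQueue[T any] struct {
	mu      sync.Mutex
	buckets [][]T // one FIFO slice per priority level
}

func newBucketQueue[T any](numBuckets int) *bucketQueue[T] {
	return &bucketQueue[T]{buckets: make([][]T, numBuckets)}
}

// enqueue rejects priorities outside [0, numBuckets).
func (q *bucketQueue[T]) enqueue(data T, priority int64) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if priority < 0 || priority >= int64(len(q.buckets)) {
		return errors.New("priority out of range")
	}
	q.buckets[priority] = append(q.buckets[priority], data)
	return nil
}

// dequeue removes the oldest item from the first non-empty bucket.
func (q *bucketQueue[T]) dequeue() (int64, T, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for p, b := range q.buckets {
		if len(b) > 0 {
			q.buckets[p] = b[1:]
			return int64(p), b[0], nil
		}
	}
	var zero T
	return 0, zero, errors.New("queue is empty")
}

// peek returns the item dequeue would return, without removing it.
func (q *bucketQueue[T]) peek() (T, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, b := range q.buckets {
		if len(b) > 0 {
			return b[0], nil
		}
	}
	var zero T
	return zero, errors.New("queue is empty")
}

func main() {
	q := newBucketQueue[string](3)
	_ = q.enqueue("low", 2)
	_ = q.enqueue("high", 0)
	fmt.Println(q.enqueue("bad", 5)) // prints "priority out of range"

	v, _ := q.peek()
	fmt.Println(v) // prints "high"

	p, v, _ := q.dequeue()
	fmt.Println(p, v) // prints "0 high"
}
```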
func (*GPQ[d]) Prioritize ¶
func (g *GPQ[d]) Prioritize() (timedOutItems uint64, escalatedItems uint64, errs []error)
Prioritize is a method of the GPQ type that prioritizes items within a heap. It iterates over each bucket in the GPQ, locks the corresponding mutex, and checks if there are items to prioritize. If there are items, it calculates the number of durations that have passed since the last escalation and updates the priority accordingly. It returns a slice of errors if any of the required data structures are missing or if there are no items to prioritize.
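The escalation arithmetic described above, counting how many escalation periods have elapsed and adjusting the priority by that amount, might look like the sketch below. The escalate function is hypothetical, and it assumes that a lower number means a higher priority and that each elapsed period moves an item one level toward 0. The package's actual rules may differ.

```go
package main

import (
	"fmt"
	"time"
)

// escalate returns the new priority and the number of levels the item moved.
// Assumption: priority 0 is the most urgent level and cannot be escalated further.
func escalate(priority int64, sinceLast, rate time.Duration) (int64, int64) {
	if priority <= 0 || rate <= 0 || sinceLast < rate {
		return priority, 0
	}
	steps := int64(sinceLast / rate) // whole escalation periods that have elapsed
	if steps > priority {
		steps = priority // clamp so the priority never drops below 0
	}
	return priority - steps, steps
}

func main() {
	p, steps := escalate(5, 25*time.Second, 10*time.Second)
	fmt.Println(p, steps) // prints "3 2"

	p, steps = escalate(1, time.Minute, 10*time.Second)
	fmt.Println(p, steps) // prints "0 1"
}
```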