gpq

v0.0.0-...-3db37ce

Warning: This package is not in the latest version of its module.

Published: Apr 18, 2024 License: MIT Imports: 14 Imported by: 0

README

GPQ is an extremely fast and flexible priority queue. It handles a few million transactions per second when run in RAM and hundreds of thousands of transactions per second when synced to disk. GPQ supports a complex "Double Priority Queue": priorities are distributed across N buckets, and each bucket holds a second priority queue. That inner queue lets items be escalated or timed out internally. This behavior depends on two things: the parameters the user specifies when submitting an item, and how frequently GPQ is asked to prioritize the queue.
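The "Double Priority Queue" idea is easier to see in a small sketch. It has a fixed number of buckets (the first priority level), and each bucket is its own min-heap ordered by a secondary key, here the enqueue sequence (the second level). This is an illustration under assumptions, not GPQ's implementation:

- Lower bucket numbers are assumed to mean higher priority.
- The dequeue step scans buckets linearly to find the first non-empty one. GPQ instead tracks non-empty buckets with BucketPriorityQueue.
- The names twoLevelQueue, entry, and itemHeap are hypothetical.

```go
package main

import (
	"container/heap"
	"errors"
	"fmt"
)

// entry is a hypothetical queued item: its payload plus a secondary key
// (insertion order) used to order items inside one bucket.
type entry[T any] struct {
	value T
	seq   uint64
}

// itemHeap is a min-heap of entries ordered by seq; it implements heap.Interface.
type itemHeap[T any] []entry[T]

func (h itemHeap[T]) Len() int           { return len(h) }
func (h itemHeap[T]) Less(i, j int) bool { return h[i].seq < h[j].seq }
func (h itemHeap[T]) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *itemHeap[T]) Push(x any)        { *h = append(*h, x.(entry[T])) }
func (h *itemHeap[T]) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// twoLevelQueue spreads items across a fixed number of buckets (first level);
// each bucket is its own heap (second level).
type twoLevelQueue[T any] struct {
	buckets []itemHeap[T]
	nextSeq uint64
}

func newTwoLevelQueue[T any](numBuckets int) *twoLevelQueue[T] {
	return &twoLevelQueue[T]{buckets: make([]itemHeap[T], numBuckets)}
}

// enqueue places value in the given bucket; bucket 0 is assumed to be the highest priority.
func (q *twoLevelQueue[T]) enqueue(value T, bucket int) error {
	if bucket < 0 || bucket >= len(q.buckets) {
		return fmt.Errorf("bucket %d out of range [0, %d)", bucket, len(q.buckets))
	}
	heap.Push(&q.buckets[bucket], entry[T]{value: value, seq: q.nextSeq})
	q.nextSeq++
	return nil
}

// dequeue pops from the first non-empty bucket (a linear scan, for simplicity).
func (q *twoLevelQueue[T]) dequeue() (bucket int, value T, err error) {
	for b := range q.buckets {
		if q.buckets[b].Len() > 0 {
			e := heap.Pop(&q.buckets[b]).(entry[T])
			return b, e.value, nil
		}
	}
	var zero T
	return -1, zero, errors.New("queue is empty")
}

func main() {
	q := newTwoLevelQueue[string](3)
	_ = q.enqueue("low", 2)
	_ = q.enqueue("high-1", 0)
	_ = q.enqueue("high-2", 0)
	for {
		b, v, err := q.dequeue()
		if err != nil {
			break
		}
		fmt.Println(b, v) // prints, in order: 0 high-1, 0 high-2, 2 low
	}
}
```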

Background

GPQ was written as an experiment while I was playing with Fibonacci heaps and wanted to find something faster. I was disappointed by the state of research and by the libraries used in most common applications, so GPQ is meant to be a highly flexible framework that can support a multitude of workloads.

Benchmarks

Most operations run in constant time, O(1), or logarithmic time, O(log n). The exception is the prioritize function, which runs in linear time, O(n). As a result, all GPQ operations are extremely fast. A single GPQ can handle a few million transactions per second and can be tuned for your workload. I have included some basic benchmarks in C++, Rust, Zig, and Go that measure GPQ's performance against the standard priority queue implementations of those languages.

While not a direct comparison, fully enqueuing and dequeuing 10 million entries (without concurrency) takes about 3.5 seconds with Go/GPQ, 5.5 seconds with Zig, 6 seconds with Rust, and about 9 seconds with C++. (If you know C++, Rust, or Zig well, I'd be happy to have you comment here and update what I have in bench.)
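To reproduce the shape of this single-goroutine measurement, the sketch below runs the same workload: N enqueues followed by N dequeues on one goroutine. It uses Go's standard container/heap as the baseline. It is not the code in GPQ's bench directory. The intHeap type, the runEnqueueDequeue function, and the choice of 100 priority levels are hypothetical, and absolute timings will vary by machine.

```go
package main

import (
	"container/heap"
	"fmt"
	"math/rand"
	"time"
)

// intHeap is a min-heap of int64 priorities built on container/heap.
type intHeap []int64

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *intHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// runEnqueueDequeue pushes n random priorities in [0, 100), then pops them all
// on a single goroutine. It returns the elapsed time and whether the pops came
// out in non-decreasing order. The order check runs inside the timed loop, so it
// adds a small overhead to the measurement.
func runEnqueueDequeue(n int, seed int64) (time.Duration, bool) {
	r := rand.New(rand.NewSource(seed))
	h := &intHeap{}
	start := time.Now()
	for i := 0; i < n; i++ {
		heap.Push(h, r.Int63n(100))
	}
	ordered := true
	prev := int64(-1)
	for h.Len() > 0 {
		p := heap.Pop(h).(int64)
		if p < prev {
			ordered = false
		}
		prev = p
	}
	return time.Since(start), ordered
}

func main() {
	elapsed, ordered := runEnqueueDequeue(10_000_000, 1)
	fmt.Printf("10M enqueue+dequeue: %v (ordered=%v)\n", elapsed, ordered)
}
```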

Server Usage

GPQ includes a fully featured server implementation that supports users, topics, token auth, batching through Avro or Arrow, settings and state recovery, and graceful exits, among many other features.

The server comes with its own documentation, which you can read here: ReadMe

Direct Usage

Prerequisites

You will need Go 1.22 or later. GPQ itself depends on hashmap and BadgerDB.

Import Directly

At its core, GPQ is an embeddable priority queue meant for critical workloads that require complex queueing and delivery-order guarantees. The best way to use it is simply to import it:

import "github.com/JustinTimperio/gpq"

API Reference

  1. NewGPQ[d any](Options schema.GPQOptions) (uint64, *GPQ[d], error) - Creates a new GPQ from the given options
    1. EnQueue(data d, priorityBucket int64, options schema.EnQueueOptions) error - Adds a piece of data to the queue with a priority bucket and per-item escalation and timeout options
    2. DeQueue() (priority int64, data d, err error) - Retrieves the highest priority item in the queue along with its priority
    3. Peek() (data d, err error) - Returns the highest priority item without removing it
    4. Prioritize() (timedOutItems uint64, escalatedItems uint64, errs []error) - Locks each bucket to reshuffle the priorities within it, according to the escalation and timeout options given at EnQueue time, and reports how many items timed out or were escalated
    5. Close() - Closes the GPQ and its disk cache

Submitting Items to the Queue

The following example initializes a queue and submits an item:


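package main

import (
	"log"
	"time"

	"github.com/JustinTimperio/gpq"
	"github.com/JustinTimperio/gpq/schema" // assumed import path of the schema package
)

func main() {
	opts := schema.GPQOptions{
		NumberOfBatches:       10,
		DiskCacheEnabled:      true,
		DiskCachePath:         "/tmp/gpq/queue",
		DiskCacheCompression:  true,
		DiskEncryptionEnabled: true,
		DiskEncryptionKey:     []byte("12345678901234567890123456789012"), // 32-byte key
		LazyDiskCacheEnabled:  true,
		LazyDiskBatchSize:     1000,
	}

	// Per-item options: escalate every second, time out after 10 seconds.
	defaultEnqueueOptions := schema.EnQueueOptions{
		ShouldEscalate: true,
		EscalationRate: time.Second,
		CanTimeout:     true,
		Timeout:        10 * time.Second,
	}

	// NewGPQ returns (uint64, *GPQ[d], error); the uint64 is ignored here.
	_, queue, err := gpq.NewGPQ[int](opts)
	if err != nil {
		log.Fatal(err)
	}
	defer queue.Close()

	var (
		data     int   = 1
		priority int64 = 5
	)

	if err := queue.EnQueue(data, priority, defaultEnqueueOptions); err != nil {
		log.Fatal(err)
	}
}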

When you submit an item, you can choose whether it should escalate over time if it has not been dequeued. Conversely, you can choose whether it should time out once it has been enqueued too long to still be relevant.

Contributing

GPQ is actively looking for maintainers, so feel free to help out by:

  • Reporting a bug
  • Discussing the current state of the code
  • Submitting a fix
  • Proposing new features

We Develop with GitHub

We use GitHub to host code, to track issues and feature requests, and to accept pull requests.

All Code Changes Happen Through Pull Requests

  1. Fork the repo and create your branch from master.
  2. If you've added code that should be tested, add tests.
  3. If you've changed APIs, update the documentation.
  4. Ensure the test suite passes.
  5. Make sure your code lints.
  6. Issue that pull request!

Any contributions you make will be under the MIT Software License

In short, when you submit code changes, your submissions are understood to be under the same MIT License that covers the project. Feel free to contact the maintainers if that's a concern.

Report bugs using GitHub's Issues

We use GitHub issues to track public bugs. Report a bug by opening a new issue; it's that easy!

Write bug reports with detail, background, and sample code

Great Bug Reports tend to have:

  • A quick summary and/or background
  • Steps to reproduce
    • Be specific!
    • Give sample code if you can.
  • What you expected would happen
  • What actually happens
  • Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)

License

By contributing, you agree that your contributions will be licensed under its MIT License.

License

All code here was originally written by me, Justin Timperio, under an MIT license with the exception of some code directly forked under a BSD license from the Go maintainers.

Documentation

Types

type Bucket

type Bucket struct {
	BucketID   int64
	Prev, Next *Bucket
}

type BucketPriorityQueue

type BucketPriorityQueue struct {
	ActiveBuckets  int64
	BucketIDs      map[int64]*Bucket
	First, Last    *Bucket
	LastRemoved    int64
	ObjectsInQueue uint64
	// contains filtered or unexported fields
}

Bucket priority queue implementation. It is used to keep track of non-empty buckets in the GPQ. It combines a HashSet, a doubly linked list, and a priority queue to allow O(1) removal of buckets and of items from the buckets, and O(1) addition of buckets and of items to the buckets.
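The structure described above can be sketched with two parts. A map from bucket ID to list node is the hash-set part and gives O(1) Contains and Remove. A doubly linked list kept in ascending ID order lets Peek read the head in O(1). This is an assumption-based sketch, not the package's code:

- The node and bucketSet names are hypothetical.
- Lower IDs are assumed to be higher priority.
- In this simple version Add walks the list to find its sorted position, so it is linear in the number of active buckets rather than O(1).

```go
package main

import "fmt"

// node is one non-empty bucket in the ordered list.
type node struct {
	id         int64
	prev, next *node
}

// bucketSet tracks non-empty bucket IDs: the map gives O(1) membership and
// unlinking; the list keeps IDs sorted so the head is the minimum.
type bucketSet struct {
	nodes       map[int64]*node
	first, last *node
}

func newBucketSet() *bucketSet { return &bucketSet{nodes: make(map[int64]*node)} }

func (s *bucketSet) Contains(id int64) bool {
	_, ok := s.nodes[id]
	return ok
}

// Add inserts id in sorted position; it is a no-op if id is already present.
func (s *bucketSet) Add(id int64) {
	if s.Contains(id) {
		return
	}
	n := &node{id: id}
	s.nodes[id] = n
	cur := s.first
	for cur != nil && cur.id < id { // find the first larger ID
		cur = cur.next
	}
	if cur == nil { // append at the tail
		n.prev = s.last
		if s.last != nil {
			s.last.next = n
		} else {
			s.first = n
		}
		s.last = n
		return
	}
	n.next, n.prev = cur, cur.prev // insert before cur
	if cur.prev != nil {
		cur.prev.next = n
	} else {
		s.first = n
	}
	cur.prev = n
}

// Remove unlinks id in O(1) using the map lookup.
func (s *bucketSet) Remove(id int64) {
	n, ok := s.nodes[id]
	if !ok {
		return
	}
	if n.prev != nil {
		n.prev.next = n.next
	} else {
		s.first = n.next
	}
	if n.next != nil {
		n.next.prev = n.prev
	} else {
		s.last = n.prev
	}
	delete(s.nodes, id)
}

// Peek returns the smallest (assumed highest-priority) bucket ID.
func (s *bucketSet) Peek() (int64, bool) {
	if s.first == nil {
		return 0, false
	}
	return s.first.id, true
}

func main() {
	s := newBucketSet()
	s.Add(7)
	s.Add(2)
	s.Add(5)
	s.Remove(2)
	fmt.Println(s.Peek()) // 5 true
}
```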

func NewBucketPriorityQueue

func NewBucketPriorityQueue() *BucketPriorityQueue

NewBucketPriorityQueue creates a new BucketPriorityQueue

func (*BucketPriorityQueue) Add

func (pq *BucketPriorityQueue) Add(bucketID int64)

func (*BucketPriorityQueue) Contains

func (pq *BucketPriorityQueue) Contains(bucketID int64) bool

func (*BucketPriorityQueue) Len

func (pq *BucketPriorityQueue) Len() *int64

func (*BucketPriorityQueue) Peek

func (pq *BucketPriorityQueue) Peek() (bucketID int64, exists bool)

func (*BucketPriorityQueue) Remove

func (pq *BucketPriorityQueue) Remove(bucketID int64)

type CorePriorityQueue

type CorePriorityQueue[T any] struct {
	// contains filtered or unexported fields
}

CorePriorityQueue implements heap.Interface and holds Items.

func NewCorePriorityQueue

func NewCorePriorityQueue[T any](bpq *BucketPriorityQueue) CorePriorityQueue[T]

NewCorePriorityQueue creates a new CorePriorityQueue

func (*CorePriorityQueue[T]) DeQueue

func (pq *CorePriorityQueue[T]) DeQueue() (wasRecoverd bool, batchNumber uint64, diskUUID []byte, priority int64, data T, err error)

DeQueue removes the first item from the heap

func (*CorePriorityQueue[T]) EnQueue

func (pq *CorePriorityQueue[T]) EnQueue(data schema.Item[T])

EnQueue adds an item to the heap and the end of the array

func (CorePriorityQueue[T]) Len

func (pq CorePriorityQueue[T]) Len() int

Len returns the length of the heap. It is needed to implement heap.Interface.

func (CorePriorityQueue[T]) Less

func (pq CorePriorityQueue[T]) Less(i, j int) bool

Less compares the priority of two items. It is needed to implement heap.Interface.

func (*CorePriorityQueue[T]) NoLockDeQueue

func (pq *CorePriorityQueue[T]) NoLockDeQueue()

NoLockDeQueue removes the first item from the heap without locking the queue. It is used for nested calls to avoid deadlocks.
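Go's sync.Mutex is not reentrant. A method that already holds the lock will deadlock if it calls another method that tries to lock it again. A locked/unlocked method pair such as DeQueue and NoLockDeQueue is a common way around this. The sketch below shows the pattern using a hypothetical lockedQueue type, not the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedQueue is a hypothetical FIFO guarded by a mutex.
type lockedQueue struct {
	mu    sync.Mutex
	items []int
}

// DeQueue is the public entry point: it takes the lock, then delegates.
func (q *lockedQueue) DeQueue() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.noLockDeQueue()
}

// noLockDeQueue assumes the caller already holds q.mu.
func (q *lockedQueue) noLockDeQueue() (int, bool) {
	if len(q.items) == 0 {
		return 0, false
	}
	x := q.items[0]
	q.items = q.items[1:]
	return x, true
}

// DrainBelow holds the lock once and reuses the no-lock variant. Calling
// q.DeQueue here instead would try to lock q.mu again and deadlock.
func (q *lockedQueue) DrainBelow(limit int) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	removed := 0
	for len(q.items) > 0 && q.items[0] < limit {
		q.noLockDeQueue()
		removed++
	}
	return removed
}

func main() {
	q := &lockedQueue{items: []int{1, 2, 9, 3}}
	fmt.Println(q.DrainBelow(5)) // 2
	fmt.Println(q.DeQueue())     // 9 true
}
```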

func (CorePriorityQueue[T]) Peek

func (pq CorePriorityQueue[T]) Peek() (data T, err error)

Peek returns the first item in the heap without removing it

func (CorePriorityQueue[T]) ReadPointers

func (pq CorePriorityQueue[T]) ReadPointers() []*schema.Item[T]

ReadPointers exposes the raw pointers to the items in the queue so that reprioritization can be done.

func (*CorePriorityQueue[T]) Remove

func (pq *CorePriorityQueue[T]) Remove(item *schema.Item[T])

Remove removes an item from the queue

func (CorePriorityQueue[T]) Swap

func (pq CorePriorityQueue[T]) Swap(i, j int)

Swap swaps two items in the heap. It is needed to implement heap.Interface.

func (*CorePriorityQueue[T]) UpdatePriority

func (pq *CorePriorityQueue[T]) UpdatePriority(item *schema.Item[T], priority int64)

UpdatePriority modifies the priority of an Item in the queue.

type GPQ

type GPQ[d any] struct {
	// BucketCount is the number of priority buckets
	BucketCount int64
	// NonEmptyBuckets is a priority queue of non-empty buckets
	NonEmptyBuckets *BucketPriorityQueue
	// contains filtered or unexported fields
}

GPQ is a generic priority queue that supports priority levels and timeouts. It is implemented using a heap for each priority level and a priority queue of non-empty buckets. It also supports disk caching using BadgerDB, with the option to perform disk writes and deletes lazily. GPQ is thread-safe and supports concurrent access.

func NewGPQ

func NewGPQ[d any](Options schema.GPQOptions) (uint64, *GPQ[d], error)

NewGPQ creates a new GPQ with the given number of buckets. The number of buckets is the number of priority levels you want to support. You must provide the number of buckets ahead of time, and all priorities you submit must be within the range of 0 to NumOfBuckets.

func (*GPQ[d]) Close

func (g *GPQ[d]) Close()

Close closes the GPQ and the disk cache

func (*GPQ[d]) DeQueue

func (g *GPQ[d]) DeQueue() (priority int64, data d, err error)

DeQueue removes and returns the item with the highest priority from the GPQ. It returns the priority of the item, the data associated with it, and an error if the queue is empty or if any internal data structures are missing.

func (*GPQ[d]) EnQueue

func (g *GPQ[d]) EnQueue(data d, priorityBucket int64, options schema.EnQueueOptions) error

EnQueue adds an item to the GPQ. The priorityBucket is the priority level of the item. The escalation rate, set in options, is the amount of time before the item is escalated to the next priority level. The data is the data you want to store in the GPQ item.

func (*GPQ[d]) Peek

func (g *GPQ[d]) Peek() (data d, err error)

Peek returns the item with the highest priority from the GPQ. It returns the data associated with the item and an error if the queue is empty.

func (*GPQ[d]) Prioritize

func (g *GPQ[d]) Prioritize() (timedOutItems uint64, escalatedItems uint64, errs []error)

Prioritize reprioritizes items within each bucket's heap. It iterates over each bucket in the GPQ, locks the corresponding mutex, and checks whether there are items to prioritize. If there are, it calculates how many escalation periods have passed since the last escalation and updates each item's priority accordingly. It returns the number of timed-out and escalated items, along with a slice of errors if any of the required data structures are missing or if there are no items to prioritize.

Directories

Path Synopsis
bench
go
Package heap provides heap operations for any type that implements heap.Interface.
docs
Package docs Code generated by swaggo/swag.
ws
