Documentation ¶
Overview ¶
Package queue implements a queue processor for delayed events. Events are maintained in an in-memory queue, where items are in the order of when they are to be executed. Users should interact with the Processor to process events in the queue. When the queue has at least 1 item, the processor uses a single background goroutine to wait on the next item to be executed.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrProcessorStopped = errors.New("processor is stopped")
ErrProcessorStopped is returned when the processor is not running.
Functions ¶
This section is empty.
Types ¶
type Processor ¶
type Processor[K comparable, T queueable[K]] struct { // contains filtered or unexported fields }
Processor manages the queue of items and processes them at the correct time.
Example ¶
package main import ( "fmt" "time" ) // queueableItem is an item that can be queued and it's used for testing. type queueableItem struct { Name string ExecutionTime time.Time } // Key returns the key for this unique item. func (r queueableItem) Key() string { return r.Name } // ScheduledTime returns the time the item is scheduled to be executed at. // This is implemented to comply with the queueable interface. func (r queueableItem) ScheduledTime() time.Time { return r.ExecutionTime } func main() { // Method invoked when an item is to be executed executed := make(chan string, 3) executeFn := func(r *queueableItem) { executed <- "Executed: " + r.Name } // Create the processor processor := NewProcessor[string, *queueableItem](executeFn) // Add items to the processor, in any order, using Enqueue processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)}) processor.Enqueue(&queueableItem{Name: "item2", ExecutionTime: time.Now().Add(200 * time.Millisecond)}) processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(300 * time.Millisecond)}) processor.Enqueue(&queueableItem{Name: "item4", ExecutionTime: time.Now().Add(time.Second)}) // Items with the same value returned by Key() are considered the same, so will be replaced processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(100 * time.Millisecond)}) // Using Dequeue allows removing an item from the queue processor.Dequeue("item4") for i := 0; i < 3; i++ { fmt.Println(<-executed) } }
Output: Executed: item3 Executed: item2 Executed: item1
func NewProcessor ¶
func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T]
NewProcessor returns a new Processor object. executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
func (*Processor[K, T]) Close ¶
Close stops the processor. This method blocks until the processor loop returns.