queue

package
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: Apache-2.0 Imports: 6 Imported by: 2

Documentation

Overview

Package queue implements a queue processor for delayed events. Events are maintained in an in-memory queue, where items are in the order of when they are to be executed. Users should interact with the Processor to process events in the queue. When the queue has at least 1 item, the processor uses a single background goroutine to wait on the next item to be executed.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrProcessorStopped = errors.New("processor is stopped")

ErrProcessorStopped is returned when the processor is not running.

Functions

This section is empty.

Types

type Processor

type Processor[K comparable, T queueable[K]] struct {
	// contains filtered or unexported fields
}

Processor manages the queue of items and processes them at the correct time.

Example
package main

import (
	"fmt"
	"time"
)

// queueableItem is an item that can be queued and it's used for testing.
type queueableItem struct {
	Name          string
	ExecutionTime time.Time
}

// Key returns the key for this unique item.
func (r queueableItem) Key() string {
	return r.Name
}

// ScheduledTime returns the time the item is scheduled to be executed at.
// This is implemented to comply with the queueable interface.
func (r queueableItem) ScheduledTime() time.Time {
	return r.ExecutionTime
}

func main() {
	// Method invoked when an item is to be executed
	executed := make(chan string, 3)
	executeFn := func(r *queueableItem) {
		executed <- "Executed: " + r.Name
	}

	// Create the processor
	processor := NewProcessor[string, *queueableItem](executeFn)

	// Add items to the processor, in any order, using Enqueue
	processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item2", ExecutionTime: time.Now().Add(200 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(300 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item4", ExecutionTime: time.Now().Add(time.Second)})

	// Items with the same value returned by Key() are considered the same, so will be replaced
	processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(100 * time.Millisecond)})

	// Using Dequeue allows removing an item from the queue
	processor.Dequeue("item4")

	for i := 0; i < 3; i++ {
		fmt.Println(<-executed)
	}
}
Output:

Executed: item3
Executed: item2
Executed: item1

func NewProcessor

func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T]

NewProcessor returns a new Processor object. executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.

func (*Processor[K, T]) Close

func (p *Processor[K, T]) Close() error

Close stops the processor. This method blocks until the processor loop returns.

func (*Processor[K, T]) Dequeue

func (p *Processor[K, T]) Dequeue(key K) error

Dequeue removes a item from the queue.

func (*Processor[K, T]) Enqueue

func (p *Processor[K, T]) Enqueue(r T) error

Enqueue adds a new item to the queue. If a item with the same ID already exists, it'll be replaced.

func (*Processor[K, T]) WithClock added in v0.12.1

func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T]

WithClock sets the clock used by the processor. Used for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL