Documentation ¶
Overview ¶
Package queue implements a few types of queues. Notably an asynchronous queue for non-blocking data processing and a SQLite3 queue.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrEmpty = errors.New("queue: queue is empty")
ErrEmpty is returned when dequeuing from an empty queue.
Functions ¶
This section is empty.
Types ¶
type AsyncQueue ¶
type AsyncQueue interface { // Add data to the queue. Safe for concurrent use. Enqueue(data []byte) error // Close the queue. Can be done at any point after the queue is // constructed. io.Closer }
AsyncQueue processes queue data asynchronously.
Example ¶
An example of using the async queue to process data.
package main import ( "fmt" "log" "time" "github.com/esote/queue" ) func main() { sqlite3, err := queue.NewSqlite3Queue("test.db") if err != nil { log.Fatal(err) } defer sqlite3.Close() handler := func(data []byte, err error) { if err != nil { log.Println(err) return } // Note printing via fmt is racey with multiple workers. fmt.Println(string(data)) } // q is an async queue, backed by SQLite3, with one worker. It is // generally recommended to have multiple workers. q, err := queue.NewAsyncQueue(sqlite3, handler, 1) if err != nil { log.Fatal(err) } defer q.Close() msgs := []string{"hi", "hello", "hey", "hiya"} for _, msg := range msgs { if err = q.Enqueue([]byte(msg)); err != nil { log.Fatal(err) } } // In an async queue, data will be dequeued "eventually." For the // purposes of this example we wait for all of it to be processed. time.Sleep(10 * time.Millisecond) }
Output: hi hello hey hiya
func NewAsyncQueue ¶
func NewAsyncQueue(q Queue, handler Handler, workers int) (AsyncQueue, error)
NewAsyncQueue creates an async queue that processes inner queue data through a handler and worker pool. Closing the async queue does NOT close the inner queue.
type Handler ¶
Handler operates on data from the async queue. Err comes from the inner queue's dequeue operation when err is not ErrEmpty.
type Queue ¶
type Queue interface { // Add data to the queue. Safe for concurrent use. Enqueue(data []byte) error // Remove data from the queue. Safe for concurrent use. Returns ErrEmpty // if the queue contains no data. Dequeue() ([]byte, error) // Close the queue. io.Closer }
Queue contains data.
func NewSqlite3Queue ¶
NewSqlite3Queue creates an SQLite3-backed queue with ACID properties.