queue

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2020 License: AGPL-3.0 Imports: 8 Imported by: 0

README

Package queue implements a few types of queues. Notably an asynchronous queue
for non-blocking data processing and a SQLite3 queue.

Package httpq is a specialization of queue.AsyncQueue for HTTP requests.

Documentation

Overview

Package queue implements a few types of queues. Notably an asynchronous queue for non-blocking data processing and a SQLite3 queue.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrEmpty = errors.New("queue: queue is empty")

ErrEmpty is returned when dequeuing from an empty queue.

Functions

This section is empty.

Types

type AsyncQueue

type AsyncQueue interface {
	// Add data to the queue. Safe for concurrent use.
	Enqueue(data []byte) error

	// Close the queue. Can be done at any point after the queue is
	// constructed.
	io.Closer
}

AsyncQueue processes queue data asynchronously.

Example

An example of using the async queue to process data.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/esote/queue"
)

func main() {
	sqlite3, err := queue.NewSqlite3Queue("test.db")
	if err != nil {
		log.Fatal(err)
	}
	defer sqlite3.Close()

	handler := func(data []byte, err error) {
		if err != nil {
			log.Println(err)
			return
		}
		// Note printing via fmt is racey with multiple workers.
		fmt.Println(string(data))
	}
	// q is an async queue, backed by SQLite3, with one worker. It is
	// generally recommended to have multiple workers.
	q, err := queue.NewAsyncQueue(sqlite3, handler, 1)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	msgs := []string{"hi", "hello", "hey", "hiya"}
	for _, msg := range msgs {
		if err = q.Enqueue([]byte(msg)); err != nil {
			log.Fatal(err)
		}
	}

	// In an async queue, data will be dequeued "eventually." For the
	// purposes of this example we wait for all of it to be processed.
	time.Sleep(10 * time.Millisecond)

}
Output:

hi
hello
hey
hiya

func NewAsyncQueue

func NewAsyncQueue(q Queue, handler Handler, workers int) (AsyncQueue, error)

NewAsyncQueue creates an async queue that processes inner queue data through a handler and worker pool. Closing the async queue does NOT close the inner queue.

type Handler

type Handler func(data []byte, err error)

Handler operates on data from the async queue. Err comes from the inner queue's dequeue operation when err is not ErrEmpty.

type Queue

type Queue interface {
	// Add data to the queue. Safe for concurrent use.
	Enqueue(data []byte) error

	// Remove data from the queue. Safe for concurrent use. Returns ErrEmpty
	// if the queue contains no data.
	Dequeue() ([]byte, error)

	// Close the queue.
	io.Closer
}

Queue contains data.

func NewMemoryQueue

func NewMemoryQueue() Queue

NewMemoryQueue creates an in-memory queue.

func NewSqlite3Queue

func NewSqlite3Queue(file string) (Queue, error)

NewSqlite3Queue creates an SQLite3-backed queue with ACID properties.

Directories

Path Synopsis
internal
pkg
httpq
Package httpq is a specialization of queue.AsyncQueue for HTTP requests.
Package httpq is a specialization of queue.AsyncQueue for HTTP requests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL