dque: github.com/joncrlsn/dque Index | Examples | Files

package dque

import "github.com/joncrlsn/dque"

Package dque is a fast embedded durable queue for Go

Index

Examples

Package Files

queue.go segment.go util.go

Variables

var (

    // ErrEmpty is returned when attempting to dequeue from an empty queue.
    ErrEmpty = errors.New("dque is empty")
)
var ErrQueueClosed = errors.New("queue is closed")

ErrQueueClosed is the error returned when a queue is closed.

type DQue Uses

type DQue struct {
    Name    string
    DirPath string
    // contains filtered or unexported fields
}

DQue is the in-memory representation of a queue on disk. You must never have two *active* DQue instances pointing at the same path on disk. It is acceptable to reconstitute a new instance from disk, but make sure the old instance is never enqueued to (or dequeued from) again.

ExampleDQue shows how the queue works

Code:

package main

//
// Example usage
// Run with: go test -v example_test.go
//

import (
    "fmt"
    "log"

    "github.com/joncrlsn/dque"
)

// Item is what we'll be storing in the queue.  It can be any struct
// as long as the fields you want stored are public.
type Item struct {
    Name string
    Id   int
}

// ItemBuilder creates a new item and returns a pointer to it.
// This is used when we load a segment of the queue from disk.
func ItemBuilder() interface{} {
    return &Item{}
}

// ExampleDQue shows how the queue works
func main() {
    qName := "item-queue"
    qDir := "/tmp"
    segmentSize := 50

    // Create a new queue with segment size of 50
    q, err := dque.NewOrOpen(qName, qDir, segmentSize, ItemBuilder)
    if err != nil {
        log.Fatal("Error creating new dque ", err)
    }

    // Add an item to the queue
    if err := q.Enqueue(&Item{"Joe", 1}); err != nil {
        log.Fatal("Error enqueueing item ", err)
    }
    log.Println("Size should be 1:", q.Size())

    // Properly close a queue
    q.Close()

    // You can reconsitute the queue from disk at any time
    q, err = dque.Open(qName, qDir, segmentSize, ItemBuilder)
    if err != nil {
        log.Fatal("Error opening existing dque ", err)
    }

    // Peek at the next item in the queue
    var iface interface{}
    if iface, err = q.Peek(); err != nil {
        if err != dque.ErrEmpty {
            log.Fatal("Error peeking at item", err)
        }
    }
    log.Println("Peeked at:", iface)

    // Dequeue the next item in the queue
    if iface, err = q.Dequeue(); err != nil && err != dque.ErrEmpty {
        log.Fatal("Error dequeuing item:", err)
    }
    log.Println("Dequeued an interface:", iface)
    log.Println("Size should be zero:", q.Size())

    go func() {
        err := q.Enqueue(&Item{"Joe", 1})
        log.Println("Enqueued from goroutine", err == nil)
    }()

    // Dequeue the next item in the queue and block until one is available
    if iface, err = q.DequeueBlock(); err != nil {
        log.Fatal("Error dequeuing item ", err)
    }

    // Assert type of the response to an Item pointer so we can work with it
    item, ok := iface.(*Item)
    if !ok {
        log.Fatal("Dequeued object is not an Item pointer")
    }

    doSomething(item)
}

func doSomething(item *Item) {
    fmt.Println("Dequeued:", item)
}

func New Uses

func New(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error)

New creates a new durable queue

func NewOrOpen Uses

func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error)

NewOrOpen either creates a new queue or opens an existing durable queue.

func Open Uses

func Open(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error)

Open opens an existing durable queue.

func (*DQue) Close Uses

func (q *DQue) Close() error

Close releases the lock on the queue rendering it unusable for further usage by this instance. Close will return an error if it has already been called.

func (*DQue) Dequeue Uses

func (q *DQue) Dequeue() (interface{}, error)

Dequeue removes and returns the first item in the queue. When the queue is empty, nil and dque.ErrEmpty are returned.

func (*DQue) DequeueBlock Uses

func (q *DQue) DequeueBlock() (interface{}, error)

DequeueBlock behaves similar to Dequeue, but is a blocking call until an item is available.

func (*DQue) Enqueue Uses

func (q *DQue) Enqueue(obj interface{}) error

Enqueue adds an item to the end of the queue

func (*DQue) Peek Uses

func (q *DQue) Peek() (interface{}, error)

Peek returns the first item in the queue without dequeueing it. When the queue is empty, nil and dque.ErrEmpty are returned. Do not use this method with multiple dequeueing threads or you may regret it.

func (*DQue) PeekBlock Uses

func (q *DQue) PeekBlock() (interface{}, error)

PeekBlock behaves similar to Peek, but is a blocking call until an item is available.

func (*DQue) SegmentNumbers Uses

func (q *DQue) SegmentNumbers() (int, int)

SegmentNumbers returns the number of both the first last segmment. There is likely no use for this information other than testing.

func (*DQue) Size Uses

func (q *DQue) Size() int

Size locks things up while calculating so you are guaranteed an accurate size... unless you have changed the itemsPerSegment value since the queue was last empty. Then it could be wildly inaccurate.

func (*DQue) SizeUnsafe Uses

func (q *DQue) SizeUnsafe() int

SizeUnsafe returns the approximate number of items in the queue. Use Size() if having the exact size is important to your use-case.

The return value could be wildly inaccurate if the itemsPerSegment value has changed since the queue was last empty. Also, because this method is not synchronized, the size may change after entering this method.

func (*DQue) Turbo Uses

func (q *DQue) Turbo() bool

Turbo returns true if the turbo flag is on. Having turbo on speeds things up significantly.

func (*DQue) TurboOff Uses

func (q *DQue) TurboOff() error

TurboOff re-enables the "safety" mode that syncs every file change to disk as they happen. If turbo is already off an error is returned

func (*DQue) TurboOn Uses

func (q *DQue) TurboOn() error

TurboOn allows the filesystem to decide when to sync file changes to disk. Throughput is greatly increased by turning turbo on, however there is some risk of losing data if a power-loss occurs. If turbo is already on an error is returned

func (*DQue) TurboSync Uses

func (q *DQue) TurboSync() error

TurboSync allows you to fsync changes to disk, but only if turbo is on. If turbo is off an error is returned

type ErrCorruptedSegment Uses

type ErrCorruptedSegment struct {
    Path string
    Err  error
}

ErrCorruptedSegment is returned when a segment file cannot be opened due to inconsistent formatting. Recovery may be possible by clearing or deleting the file, then reloading using dque.New().

func (ErrCorruptedSegment) Error Uses

func (e ErrCorruptedSegment) Error() string

Error returns a string describing ErrCorruptedSegment

func (ErrCorruptedSegment) Unwrap Uses

func (e ErrCorruptedSegment) Unwrap() error

Unwrap returns the wrapped error

type ErrUnableToDecode Uses

type ErrUnableToDecode struct {
    Path string
    Err  error
}

ErrUnableToDecode is returned when an object cannot be decoded.

func (ErrUnableToDecode) Error Uses

func (e ErrUnableToDecode) Error() string

Error returns a string describing ErrUnableToDecode error

func (ErrUnableToDecode) Unwrap Uses

func (e ErrUnableToDecode) Unwrap() error

Unwrap returns the wrapped error

Package dque imports 14 packages (graph). Updated 2020-05-15. Refresh now. Tools for package owners.