bigqueue: github.com/grandecola/bigqueue Index | Files

package bigqueue

import "github.com/grandecola/bigqueue"

Package bigqueue provides embedded, fast and persistent queue written in pure Go using memory mapped file. bigqueue is currently not thread safe. To use bigqueue in parallel context, a Write lock needs to be acquired (even for Read APIs).

Create or open a bigqueue:

bq, err := bigqueue.NewQueue("path/to/queue")
defer bq.Close()

bigqueue persists the data of the queue in multiple Arenas. Each Arena is a file on disk that is mapped into memory (RAM) using mmap syscall. Default size of each Arena is set to 128MB. It is possible to create a bigqueue with custom Arena size:

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024))
defer bq.Close()

bigqueue also allows setting up the maximum possible memory that it can use. By default, the maximum memory is set to [3 x Arena Size].

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), bigqueue.SetMaxInMemArenas(10))
defer bq.Close()

In this case, bigqueue will never allocate more memory than `4KB*10=40KB`. This memory is above and beyond the memory used in buffers for copying data.

Bigqueue allows to set periodic flush based on either elapsed time or number of mutate (enqueue/dequeue) operations. Flush syncs the in memory changes of all memory mapped files with disk. *This is a best effort flush*. Elapsed time and number of mutate operations are only checked upon an enqueue/dequeue.

This is how you can set these options -

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))

In this case, a flush is done after every two mutate operations.

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))

In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called.

Write to bigqueue:

err := bq.Enqueue([]byte("elem"))   // size = 1

bigqueue allows writing string data directly, avoiding conversion to `[]byte`:

err := bq.EnqueueString("elem")   // size = 2

Read from bigqueue:

elem, err := bq.Peek()        // size = 1
err := bq.Dequeue()           // size = 0

we can also read string data from bigqueue:

elem, err := bq.PeekString()  // size = 1
err := bq.Dequeue()           // size = 0

Check whether bigqueue has non zero elements:

isEmpty := bq.IsEmpty()

Index

Package Files

arena.go arenamanager.go bigqueue.go config.go doc.go index.go read.go write.go

Variables

var (
    // ErrTooSmallArenaSize is returned when arena size is smaller than OS page size
    ErrTooSmallArenaSize = errors.New("too small arena size")
    // ErrTooFewInMemArenas is returned when number of arenas allowed in memory < 3
    ErrTooFewInMemArenas = errors.New("too few in memory arenas")
    // ErrMustBeGreaterThanZero is returned when a config value has non-positive value
    ErrMustBeGreaterThanZero = errors.New("must be greater than zero")
)
var (
    // ErrEmptyQueue is returned when peek/dequeue is performed on an empty queue
    ErrEmptyQueue = errors.New("queue is empty")
)
var (
    // ErrInvalidArenaSize is returned when persisted arena size
    // doesn't match with desired arena size
    ErrInvalidArenaSize = errors.New("mismatch in arena size")
)

type MmapQueue Uses

type MmapQueue struct {
    // contains filtered or unexported fields
}

MmapQueue implements Queue interface

func (*MmapQueue) Close Uses

func (q *MmapQueue) Close() error

Close will close index and arena manager

func (*MmapQueue) Dequeue Uses

func (q *MmapQueue) Dequeue() error

Dequeue removes an element from the queue

func (*MmapQueue) Enqueue Uses

func (q *MmapQueue) Enqueue(message []byte) error

Enqueue adds a new slice of byte element to the tail of the queue

func (*MmapQueue) EnqueueString Uses

func (q *MmapQueue) EnqueueString(message string) error

EnqueueString adds a new string element to the tail of the queue

func (*MmapQueue) Flush Uses

func (q *MmapQueue) Flush() error

Flush syncs the in memory content of bigqueue to disk

func (*MmapQueue) IsEmpty Uses

func (q *MmapQueue) IsEmpty() bool

IsEmpty returns true when queue is empty

func (*MmapQueue) Peek Uses

func (q *MmapQueue) Peek() ([]byte, error)

Peek returns the head (slice of bytes) of the queue

func (*MmapQueue) PeekString Uses

func (q *MmapQueue) PeekString() (string, error)

PeekString returns the head (string) of the queue

type Option Uses

type Option func(*bqConfig) error

Option is function type that takes a bqConfig object and sets various config parameters in the object

func SetArenaSize Uses

func SetArenaSize(arenaSize int) Option

SetArenaSize returns an Option closure that sets the arena size

func SetMaxInMemArenas Uses

func SetMaxInMemArenas(maxInMemArenas int) Option

SetMaxInMemArenas returns an Option closure that sets maximum number of Arenas that could reside in memory (RAM) at any time. By default, all the arenas reside in memory and Operating System takes care of memory by paging in and out the pages from disk. A recommended value of maximum arenas that should be in memory is chosen such that -

maxInMemArenas > 2 + (maximum message size / arena size)
maxInMemArenas < (total available system memory - 1GB) / arena size

func SetPeriodicFlushDuration Uses

func SetPeriodicFlushDuration(flushPeriod time.Duration) Option

SetPeriodicFlushDuration returns an Option that sets a periodic flush every given duration after which the queue's in-memory changes will be flushed to disk. This is a best effort flush and elapsed time is checked upon an enqueue/dequeue only.

For durability this value should be low. For performance this value should be high.

func SetPeriodicFlushOps Uses

func SetPeriodicFlushOps(flushMutOps int64) Option

SetPeriodicFlushOps returns an Option that sets the number of mutate operations (enqueue/dequeue) after which the queue's in-memory changes will be flushed to disk. This is a best effort flush and number of mutate operations is checked upon an enqueue/dequeue.

For durability this value should be low. For performance this value should be high.

type Queue Uses

type Queue interface {
    IsEmpty() bool
    Dequeue() error
    Flush() error
    Close() error

    Peek() ([]byte, error)
    Enqueue([]byte) error
    PeekString() (string, error)
    EnqueueString(string) error
}

Queue provides an interface to big, fast and persistent queue

func NewMmapQueue Uses

func NewMmapQueue(dir string, opts ...Option) (Queue, error)

NewMmapQueue constructs a new persistent queue

Package bigqueue imports 9 packages (graph). Updated 2019-05-06. Refresh now. Tools for package owners.