kvq

package
v0.0.0-...-bdd3411 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2015 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package kvq is a persistent transactional queue. It supports goleveldb, levigo and Bolt as backends.

To open a queue, first get a database instance, and then the queue itself. A DB can contain many queues of different names.

db, err := kvq.Open("db.db")
queue, err := db.Queue("tweets")

(Remember to Close() when finished)

To push data onto a queue, start a transaction, add values, and then commit.

t, err := q.Transaction()
defer t.Close()
t.Put([]byte("hello"))
t.Put([]byte("world"))
err = t.Commit()

To take data from the queue, start a transaction, take values, and the commit.

t, err := q.Transaction()
defer t.Close()
value, err := t.Take()
err = t.Commit()

Items on the queue are keyed by the time at which they were Put(), so the push order should generally match the pop order (barring any run-time pauses).

To retrieve a certain number of items within a timeframe, use Txn.TakeN:

values, err := t.TakeN(100, 5 * time.Second)

This will take up to 100 items within 5 seconds (total), and return the values when either of those conditions are met.

Data in transactions are only persisted when Commit() is called. Any items put in a transaction do not become available for taking until the transaction is committed. Similarly, any items taken in a transaction are made available again if the transaction is closed without being committed.

Index

Constants

View Source
const (
	// DefaultMaxQueue is the default maximum queue capacity.
	DefaultMaxQueue int = 1e6
)

Variables

View Source
var (
	// DefaultOptions holds the default settings to use when creating a queue.
	DefaultOptions = QueueOptions{
		MaxQueue: DefaultMaxQueue,
	}
	// ErrInsufficientCapacity is returned if the queue does not have enough
	// space to add the requested item(s).
	ErrInsufficientCapacity = errors.New("insufficient queue capacity")
)

Functions

func Destroy

func Destroy(path string) error

Types

type DB

type DB struct {
	backend.DB
}

DB wraps the backend being used.

func NewDB

func NewDB(db backend.DB) *DB

NewDB creates a new DB instance from a backend database.

func Open

func Open(path string) (*DB, error)

func (*DB) Queue

func (db *DB) Queue(namespace string) (*Queue, error)

Queue opens a queue within the given namespace (key prefix), whereby keys are prefixed with the namespace value and a NUL byte, followed by the ID of the queued item.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue encapsulates a namespaced queue held by a DB.

func NewQueue

func NewQueue(db backend.DB, namespace string, opts *QueueOptions) (*Queue, error)

NewQueue instantiates a new queue from the given database and namespace.

func (*Queue) Clear

func (q *Queue) Clear() error

Clear removes all entries in the DB. Do not call if any transactions are in progress.

func (Queue) Size

func (q Queue) Size() int

Size returns the number of keys currently available within the queue. This does not include keys that are in the process of being put or taken.

func (*Queue) Transaction

func (q *Queue) Transaction() *Txn

Transaction starts a new transaction on the queue.

type QueueOptions

type QueueOptions struct {
	// MaxQueue is the capacity of the queue. Items will start to be rejected
	// if the queue reaches this size.
	MaxQueue int
}

QueueOptions specifies the operational parameters of a queue

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

Txn represents a transaction on a Queue

func NewTxn

func NewTxn(q *Queue) *Txn

NewTxn returns a new Txn that operates on the given Queue.

func (*Txn) Close

func (txn *Txn) Close() error

Close reverts all changes from the transaction and releases any held resources. The Txn will remain valid for further use.

func (*Txn) Commit

func (txn *Txn) Commit() error

Commit writes transaction to storage. The Txn will remain valid for further use.

func (*Txn) Put

func (txn *Txn) Put(v []byte) error

Put inserts the data into the queue.

func (*Txn) Reset

func (txn *Txn) Reset()

Reset empties the transaction and resets it to an empty (default) state.

func (*Txn) Take

func (txn *Txn) Take() ([]byte, error)

Take gets an item from the queue, returning nil if no items are available.

func (*Txn) TakeN

func (txn *Txn) TakeN(n int, t time.Duration) ([][]byte, error)

TakeN gets `n` items from the queue, waiting at most `t` for them to all become available. If no items are available, nil is returned.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL