Documentation ¶
Overview ¶
Package kvq is a persistent transactional queue. It supports goleveldb, levigo and Bolt as backends.
To open a queue, first get a database instance, and then the queue itself. A DB can contain many queues of different names.
    db, err := kvq.Open("db.db")
    q, err := db.Queue("tweets")
(Remember to Close() when finished)
To push data onto a queue, start a transaction, add values, and then commit.
    t, err := q.Transaction()
    defer t.Close()
    t.Put([]byte("hello"))
    t.Put([]byte("world"))
    err = t.Commit()

To take data from the queue, start a transaction, take values, and then commit.

    t, err := q.Transaction()
    defer t.Close()
    value, err := t.Take()
    err = t.Commit()
Items on the queue are keyed by the time at which they were Put(), so the push order should generally match the pop order (barring any run-time pauses).
To retrieve a certain number of items within a timeframe, use Txn.TakeN:
    values, err := t.TakeN(100, 5*time.Second)

This will take up to 100 items within 5 seconds (total), and return the values as soon as either condition is met.
Data in a transaction is persisted only when Commit() is called. Items put in a transaction do not become available for taking until the transaction is committed. Similarly, items taken in a transaction become available again if the transaction is closed without being committed.
Constants ¶
const (
	// DefaultMaxQueue is the default maximum queue capacity.
	DefaultMaxQueue int = 1e6
)
Variables ¶
var (
	// DefaultOptions holds the default settings to use when creating a queue.
	DefaultOptions = QueueOptions{
		MaxQueue: DefaultMaxQueue,
	}

	// ErrInsufficientCapacity is returned if the queue does not have enough
	// space to add the requested item(s).
	ErrInsufficientCapacity = errors.New("insufficient queue capacity")
)
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue encapsulates a namespaced queue held by a DB.
func (*Queue) Clear ¶
Clear removes all entries in the DB. Do not call if any transactions are in progress.
func (Queue) Size ¶
Size returns the number of keys currently available within the queue. This does not include keys that are in the process of being put or taken.
func (*Queue) Transaction ¶
Transaction starts a new transaction on the queue.
type QueueOptions ¶
type QueueOptions struct {
	// MaxQueue is the capacity of the queue. Items will start to be rejected
	// if the queue reaches this size.
	MaxQueue int
}
QueueOptions specifies the operational parameters of a queue.
type Txn ¶
type Txn struct {
// contains filtered or unexported fields
}
Txn represents a transaction on a Queue.
func (*Txn) Close ¶
Close reverts all changes from the transaction and releases any held resources. The Txn will remain valid for further use.
func (*Txn) Commit ¶
Commit writes the transaction to storage. The Txn will remain valid for further use.
func (*Txn) Reset ¶
func (txn *Txn) Reset()
Reset empties the transaction and resets it to an empty (default) state.