qr

v0.0.0-...-63849de
Published: Feb 8, 2016 License: MIT Imports: 12 Imported by: 8

README

In-process queue with disk-based overflow.

When everything is fine, elements flow over Qr.q, a simple channel connecting the producer(s) and the consumer(s). If that channel is full, elements are written to the Qr.planb channel instead. swapout() writes all elements from Qr.planb to disk, starting a new file every timeout. At the same time, swapin() deals with completed files: it opens the oldest file and writes its elements to Qr.q.

  ---> Enqueue()   ------   .q   ----->    merge() -> .out -> Dequeue() --->
           \                                 ^
         .planb                         .confluence
            \                               /
             \--> swapout()     swapin() --/
                     \             ^
                      \--> fs() --/
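
The "try the fast channel, fall back to the overflow channel" step can be sketched with a select statement that has a default case. This is only an illustration under stated assumptions, not the package's actual code: the overflowQueue type and its enqueue method are hypothetical, and the overflow channel is simply buffered here instead of being drained to disk.

```go
package main

import "fmt"

// overflowQueue is a hypothetical stand-in for the two channels described
// above: a bounded fast path and an overflow path.
type overflowQueue struct {
	q     chan interface{} // fast path between producers and consumers
	planb chan interface{} // overflow path; a real queue would drain this to disk
}

// enqueue tries the fast path first and falls back to the overflow channel
// when the fast path is full. It reports which path was taken.
func (o *overflowQueue) enqueue(e interface{}) string {
	select {
	case o.q <- e:
		return "q"
	default:
		// In this sketch the send blocks if planb is full as well.
		o.planb <- e
		return "planb"
	}
}

func main() {
	o := &overflowQueue{
		q:     make(chan interface{}, 2),
		planb: make(chan interface{}, 8),
	}
	for _, e := range []string{"aap", "noot", "mies"} {
		fmt.Printf("%s -> %s\n", e, o.enqueue(e))
	}
}
```

With a fast channel of size 2 and no consumer running, the first two elements take the fast path and the third one overflows.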

Gob is used to serialize entries; custom types should be registered using gob.Register().
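
Why registration matters can be shown with encoding/gob alone. The sketch below encodes elements as interface{} values, which is presumably how a queue of arbitrary elements has to store them; the job and unregistered types and the roundTrip helper are hypothetical names used only for this illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// job is a hypothetical custom element type.
type job struct {
	ID   int
	Name string
}

// unregistered is a second hypothetical type that is never registered.
type unregistered struct {
	N int
}

func init() {
	// Without this call gob cannot encode a job stored in an interface{}.
	gob.Register(job{})
}

// roundTrip encodes e as an interface value and decodes it again.
func roundTrip(e interface{}) (interface{}, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(&e); err != nil {
		return nil, err
	}
	var out interface{}
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	got, err := roundTrip(job{ID: 1, Name: "aap"})
	fmt.Println(got, err)

	// The unregistered type fails at encode time.
	_, err = roundTrip(unregistered{N: 1})
	fmt.Println(err)
}
```

Basic types such as string and int are known to gob already, so only custom types need the gob.Register() call.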

Same idea as https://github.com/alicebob/q but cleaner, and this queue doesn't care about keeping things ordered.

&c.

Documentation

Overview

Package qr is an in-process queue with disk-based overflow. Element order is not strictly preserved.

Example
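package main

import (
	"fmt"

	"github.com/alicebob/qr"
)

func main() {
	// Queue files are stored in /tmp/ with the prefix "example".
	q, err := qr.New(
		"/tmp/",
		"example",
		// Keep an in-memory buffer of 100.
		qr.OptionBuffer(100),
		// Check up front that a sample value can be serialized.
		qr.OptionTest("your datatype"),
	)
	if err != nil {
		panic(err)
	}

	// Consume until Close() closes the Dequeue() channel.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for e := range q.Dequeue() {
			fmt.Printf("We got: %v\n", e)
		}
	}()

	// elsewhere:
	q.Enqueue("aap")
	q.Enqueue("noot")

	// Close() writes in-flight entries to disk, so not every element is
	// guaranteed to have been printed when the consumer stops.
	q.Close()
	<-done
}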

Constants

const (
	// DefaultTimeout can be changed with OptionTimeout.
	DefaultTimeout = 10 * time.Second
	// DefaultBuffer can be changed with OptionBuffer.
	DefaultBuffer = 1000
)

Variables

var (
	// ErrInvalidPrefix is potentially returned by New.
	ErrInvalidPrefix = errors.New("invalid prefix")
)

Functions

This section is empty.

Types

type Option

type Option func(qr *Qr) error

Option is an option to New(), which can change some settings.
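
The shape of Option is the functional options pattern. Since Qr's fields are unexported, the sketch below uses a hypothetical queue struct and hypothetical withBuffer and withTimeout helpers to show how such options are typically applied by a constructor. The defaults mirror DefaultBuffer and DefaultTimeout, and the negative-buffer check is an assumption made for the example, not documented behaviour of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// queue and its fields are hypothetical; they only illustrate the pattern.
type queue struct {
	buffer  int
	timeout time.Duration
}

// option mirrors the shape of Option: a function that adjusts the value
// under construction and may reject bad input.
type option func(*queue) error

func withBuffer(n int) option {
	return func(q *queue) error {
		if n < 0 {
			return errors.New("negative buffer")
		}
		q.buffer = n
		return nil
	}
}

func withTimeout(t time.Duration) option {
	return func(q *queue) error {
		q.timeout = t
		return nil
	}
}

// newQueue starts from defaults and applies the options in order,
// stopping at the first error.
func newQueue(options ...option) (*queue, error) {
	q := &queue{buffer: 1000, timeout: 10 * time.Second}
	for _, o := range options {
		if err := o(q); err != nil {
			return nil, err
		}
	}
	return q, nil
}

func main() {
	q, err := newQueue(withBuffer(100), withTimeout(time.Minute))
	if err != nil {
		panic(err)
	}
	fmt.Println(q.buffer, q.timeout)
}
```

Because every option returns an error, a constructor built this way can refuse to start when a setting is invalid instead of failing later.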

func OptionBuffer

func OptionBuffer(n int) Option

OptionBuffer is an option for New(). It specifies the in-memory size of the queue; the default is DefaultBuffer. A smaller value means the disk will be used sooner; a larger value means more memory is used.

func OptionLogger

func OptionLogger(l func(string, ...interface{})) Option

OptionLogger is an option for New(). It sets the logger. The default is log.Printf, but glog.Errorf would also work.
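
Any function with the signature func(string, ...interface{}) fits, so a logger can be wrapped before it is handed over. The sketch below is standalone: logf, prefixed and capture are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"log"
)

// logf has the same signature as the argument of OptionLogger.
type logf func(string, ...interface{})

// prefixed wraps a logger so that every message carries a fixed prefix.
func prefixed(prefix string, next logf) logf {
	return func(format string, args ...interface{}) {
		next(prefix+format, args...)
	}
}

// capture returns a logger that appends formatted messages to dst,
// which is handy in tests.
func capture(dst *[]string) logf {
	return func(format string, args ...interface{}) {
		*dst = append(*dst, fmt.Sprintf(format, args...))
	}
}

func main() {
	// log.Printf fits the signature as-is.
	l := prefixed("qr: ", log.Printf)
	l("%d files on disk", 3)
}
```

A value built this way should be usable as OptionLogger(l), since its underlying type matches the parameter type.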

func OptionTest

func OptionTest(t interface{}) Option

OptionTest is an option for New(). It tests that the given sample item can be serialized to disk and deserialized successfully. This verifies that disk access works, and that the type can be fully serialized and deserialized with gob. The option can be repeated.
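
The kind of check described here can be approximated with the standard library. This is a sketch of the idea, not the code behind OptionTest: checkRoundTrip is a hypothetical helper that writes a sample to a temporary file with gob, reads it back, and compares the result.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"io"
	"os"
	"reflect"
)

// checkRoundTrip writes sample to a temporary file in dir using gob, reads
// it back, and returns an error if any step fails or the value changed.
// An empty dir means the default temporary directory.
func checkRoundTrip(dir string, sample interface{}) error {
	f, err := os.CreateTemp(dir, "roundtrip-*.gob")
	if err != nil {
		return err // the directory is missing or not writable
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if err := gob.NewEncoder(f).Encode(&sample); err != nil {
		return err // e.g. a custom type that was not registered
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	var got interface{}
	if err := gob.NewDecoder(f).Decode(&got); err != nil {
		return err
	}
	if !reflect.DeepEqual(sample, got) {
		return fmt.Errorf("round trip changed value: %v != %v", sample, got)
	}
	return nil
}

func main() {
	if err := checkRoundTrip("", "your datatype"); err != nil {
		fmt.Println("sample cannot be queued:", err)
		return
	}
	fmt.Println("sample survives a disk round trip")
}
```

Running such a check at startup surfaces an unwritable directory or an unregistered type immediately, rather than when the first element overflows to disk.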

func OptionTimeout

func OptionTimeout(t time.Duration) Option

OptionTimeout is an option for New(). It specifies the time after which a queue file is closed; the default is DefaultTimeout. A smaller value means more files.

type Qr

type Qr struct {
	// contains filtered or unexported fields
}

Qr is a disk-based queue. Create one with New().

func New

func New(dir, prefix string, options ...Option) (*Qr, error)

New starts a queue which stores files in <dir>/<prefix>-.<timestamp>.qr. 'prefix' must be a simple ASCII string.

func (*Qr) Close

func (qr *Qr) Close()

Close shuts down all goroutines and closes the Dequeue() channel. It writes all in-flight entries to disk. Calling Enqueue() after Close() will panic.

func (*Qr) Dequeue

func (qr *Qr) Dequeue() <-chan interface{}

Dequeue is the channel where elements come out of the queue. It'll be closed on Close().

func (*Qr) Enqueue

func (qr *Qr) Enqueue(e interface{})

Enqueue adds an element to the queue. It never blocks and is safe to call from multiple goroutines.

func (*Qr) FileCount

func (qr *Qr) FileCount() int

FileCount gives the number of files on disk. It is useful to graph this to get an idea of disk usage.
