rocksq

Version: v0.0.0-...-db047f7
Published: Sep 8, 2015 License: MIT Imports: 12 Imported by: 0

README

rocksq

An embedded persistent queue based on RocksDB

Example

	store, err := rocksq.NewStore(rocksq.StoreOptions{
		Directory:  "/opt/rocksq",
		MemorySize: 5 * 1024 * 1024,
	})
	if err != nil {
		// ....
	}
	defer store.Close()

	q, err := store.NewQueue("queue_name")
	if err != nil {
		// ....
	}
	msg := "Hello World"
	if _, err := q.Enqueue([]byte(msg)); err != nil {
		// ....
	}

	id, data, err := q.Dequeue()
	// ....

	for {
		var anotherData []byte
		id, anotherData, err = q.Dequeue(id) // boosting the seek
		if err == rocksq.EmptyQueue {
			break
		}
		// ....
	}
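
The Dequeue calls above are usually run in a loop until the queue reports rocksq.EmptyQueue. A minimal sketch of such a drain loop follows. To stay runnable without RocksDB it targets a small interface that mirrors the documented Dequeue signature; memQueue and errEmptyQueue are hypothetical in-memory stand-ins for *rocksq.Queue and rocksq.EmptyQueue, and the stand-in simply ignores the start id hint.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyQueue is a hypothetical stand-in for rocksq.EmptyQueue.
var errEmptyQueue = errors.New("No new message in the queue")

// dequeuer mirrors the documented signature of (*rocksq.Queue).Dequeue.
type dequeuer interface {
	Dequeue(startId ...uint64) (uint64, []byte, error)
}

// memQueue is a hypothetical in-memory stand-in for *rocksq.Queue.
// Ids start at 1 and increase by one per dequeued message.
type memQueue struct {
	next uint64
	msgs [][]byte
}

// Dequeue pops the oldest message. The startId hint is accepted but
// ignored here; the README describes it as boosting the seek.
func (m *memQueue) Dequeue(startId ...uint64) (uint64, []byte, error) {
	if len(m.msgs) == 0 {
		return 0, nil, errEmptyQueue
	}
	data := m.msgs[0]
	m.msgs = m.msgs[1:]
	m.next++
	return m.next, data, nil
}

// drain dequeues until the queue is empty, feeding the last id back in
// as the hint, and returns every payload in order.
func drain(q dequeuer) ([][]byte, error) {
	var out [][]byte
	id, data, err := q.Dequeue()
	for err == nil {
		out = append(out, data)
		id, data, err = q.Dequeue(id)
	}
	if errors.Is(err, errEmptyQueue) {
		return out, nil // an empty queue is the normal exit
	}
	return out, err
}

func main() {
	q := &memQueue{msgs: [][]byte{[]byte("a"), []byte("b"), []byte("c")}}
	msgs, err := drain(q)
	fmt.Println(len(msgs), err)
}
```

Any other error is returned to the caller together with the payloads read so far, so a partial drain is not silently lost.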

Benchmark

BenchmarkEnqueue-4        500000              6798 ns/op
BenchmarkDequeue-4        200000             16434 ns/op

DisableWAL = true

BenchmarkEnqueue-4        500000              4983 ns/op
BenchmarkDequeue-4        200000             17159 ns/op

SetFillCache = false, DisableWAL = true

BenchmarkEnqueue-4        500000              4623 ns/op
BenchmarkDequeue-4        300000             10098 ns/op

DisableWAL = true, UseTailing = true

BenchmarkEnqueue-4        500000              4690 ns/op
BenchmarkDequeue-4        500000              6108 ns/op

PS: The tailing iterator greatly reduces the seek cost, but the test case may not be very realistic, since only very few seeks happen.
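
To put the ns/op figures in more familiar terms, the sketch below converts them to operations per second and computes the Dequeue speedup between the two DisableWAL = true runs, without and with UseTailing. The numbers are copied from the tables above; the helper names are illustrative only.

```go
package main

import "fmt"

// opsPerSecond converts a Go benchmark ns/op figure to operations per second.
func opsPerSecond(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

// speedup reports how many times faster the "after" figure is than "before".
func speedup(beforeNs, afterNs float64) float64 {
	return beforeNs / afterNs
}

func main() {
	// Dequeue with DisableWAL = true, without and with UseTailing = true.
	fmt.Printf("without tailing: %.0f ops/s\n", opsPerSecond(17159))
	fmt.Printf("with tailing:    %.0f ops/s\n", opsPerSecond(6108))
	fmt.Printf("speedup:         %.2fx\n", speedup(17159, 6108))
}
```

On these figures the tailing iterator makes Dequeue about 2.8 times faster, subject to the caveat above about how few seeks the benchmark performs.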

Notes

  1. Rocksq uses a tailing iterator inside Dequeue, which I don't think is thread safe, so set StoreOptions.DisableTailing = true if you need thread safety.

MIT License

Documentation

Variables

var (
	EmptyQueue = errors.New("No new message in the queue")
)

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func (*Queue) ApproximateSize

func (q *Queue) ApproximateSize() uint64

func (*Queue) Close

func (q *Queue) Close()

func (*Queue) Dequeue

func (q *Queue) Dequeue(startId ...uint64) (uint64, []byte, error)
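
The variadic startId parameter makes the start id optional, so callers can write either Dequeue() or Dequeue(id). The sketch below shows one common way a Go function reads such an optional argument. resolveStart and its fallback are hypothetical, and how rocksq itself interprets the value is not documented on this page.

```go
package main

import "fmt"

// resolveStart returns the first variadic argument if one was passed,
// otherwise the fallback. Extra arguments are ignored.
func resolveStart(fallback uint64, startId ...uint64) uint64 {
	if len(startId) > 0 {
		return startId[0]
	}
	return fallback
}

func main() {
	fmt.Println(resolveStart(1))     // no hint: the fallback is used
	fmt.Println(resolveStart(1, 42)) // hint given: it takes precedence
}
```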

func (*Queue) DequeueGob

func (q *Queue) DequeueGob(value interface{}, startId ...uint64) (uint64, error)

func (*Queue) DequeueJson

func (q *Queue) DequeueJson(value interface{}, startId ...uint64) (uint64, error)

func (*Queue) DequeueString

func (q *Queue) DequeueString(startId ...uint64) (uint64, string, error)

func (*Queue) Enqueue

func (q *Queue) Enqueue(data []byte) (uint64, error)

func (*Queue) EnqueueGob

func (q *Queue) EnqueueGob(value interface{}) (uint64, error)

func (*Queue) EnqueueJson

func (q *Queue) EnqueueJson(value interface{}) (uint64, error)

func (*Queue) EnqueueString

func (q *Queue) EnqueueString(value string) (uint64, error)
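
EnqueueJson and DequeueJson take an interface{} value, which suggests they serialize with encoding/json on top of the byte-oriented Enqueue and Dequeue. That is an assumption; this page does not confirm it. The sketch below shows the round trip under that assumption, with a hypothetical in-memory byteQueue standing in for *rocksq.Queue and a simplified Dequeue that takes no start id.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// byteQueue is a hypothetical in-memory stand-in for *rocksq.Queue.
type byteQueue struct {
	lastId uint64
	items  [][]byte
}

// Enqueue appends data and returns a monotonically increasing id.
func (q *byteQueue) Enqueue(data []byte) (uint64, error) {
	q.items = append(q.items, data)
	q.lastId++
	return q.lastId, nil
}

// Dequeue pops the oldest item or fails when the queue is empty.
func (q *byteQueue) Dequeue() ([]byte, error) {
	if len(q.items) == 0 {
		return nil, errors.New("empty")
	}
	data := q.items[0]
	q.items = q.items[1:]
	return data, nil
}

// enqueueJSON marshals value and stores the resulting bytes.
func enqueueJSON(q *byteQueue, value interface{}) (uint64, error) {
	data, err := json.Marshal(value)
	if err != nil {
		return 0, err
	}
	return q.Enqueue(data)
}

// dequeueJSON pops the oldest bytes and unmarshals them into value,
// which must be a pointer.
func dequeueJSON(q *byteQueue, value interface{}) error {
	data, err := q.Dequeue()
	if err != nil {
		return err
	}
	return json.Unmarshal(data, value)
}

// job is an example payload type.
type job struct {
	Name  string `json:"name"`
	Tries int    `json:"tries"`
}

func main() {
	q := &byteQueue{}
	if _, err := enqueueJSON(q, job{Name: "resize", Tries: 2}); err != nil {
		panic(err)
	}
	var got job
	if err := dequeueJSON(q, &got); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got)
}
```

As with json.Unmarshal in general, the value passed when dequeuing has to be a pointer, otherwise decoding fails.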

type Store

type Store struct {
	*rocks.DB
	sync.RWMutex
	// contains filtered or unexported fields
}

Store defines the basic rocksdb wrapper

func NewStore

func NewStore(options StoreOptions) (*Store, error)

NewStore returns a Store, a rocksdb wrapper

func (*Store) Close

func (s *Store) Close()

Close closes the rocksdb database

func (*Store) Destroy

func (s *Store) Destroy()

Destroy destroys the rocksdb instance and its data files

func (*Store) NewQueue

func (s *Store) NewQueue(name string) (*Queue, error)

NewQueue returns a named queue backed by a RocksDB Column Family

type StoreOptions

type StoreOptions struct {
	Directory             string
	WriteBufferSize       int
	WriteBufferNumber     int
	MemorySize            int
	FileSizeBase          uint64
	Compression           rocks.CompressionType
	Parallel              int
	DisableAutoCompaction bool
	DisableWAL            bool
	DisableTailing        bool
	Sync                  bool
	IsDebug               bool
}

StoreOptions defines the options for rocksdb storage

func (*StoreOptions) SetDefaults

func (so *StoreOptions) SetDefaults()
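
SetDefaults is undocumented here. A method with this name and receiver usually fills in fields the caller left at their zero value, and the sketch below illustrates that pattern only. The options type mirrors three StoreOptions fields, and the fallback values are borrowed from the README example; they are not claimed to be the defaults rocksq actually applies.

```go
package main

import "fmt"

// options mirrors three fields of rocksq.StoreOptions for illustration.
type options struct {
	Directory  string
	MemorySize int
	DisableWAL bool
}

// setDefaults fills only the fields left at their zero value.
// The values come from the README example and are placeholders,
// NOT rocksq's real defaults.
func (o *options) setDefaults() {
	if o.Directory == "" {
		o.Directory = "/opt/rocksq"
	}
	if o.MemorySize == 0 {
		o.MemorySize = 5 * 1024 * 1024
	}
	// Bool fields cannot distinguish "unset" from false, so they are
	// left exactly as the caller set them.
}

func main() {
	o := options{MemorySize: 1 << 20}
	o.setDefaults()
	fmt.Printf("%+v\n", o)
}
```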
