rbolt

v0.0.0-...-2912b85
Published: May 9, 2017 License: MIT Imports: 4 Imported by: 0

README

RBolt

RBolt is a package for replicating boltdb databases.

Journal

RBolt provides a transaction type that records a journal of the changes made during a transaction. Playing this journal in another transaction replicates the changes made in the original one. This is the basic building block for replication.

To use it, call rbolt.RTx() in a writable transaction:

  err := db.Update(func(tx *bolt.Tx) error {
      rtx := rbolt.RTx(tx)
      // ... use rtx
      return nil
  })
  ...

Transport and JournalBuffer

The Transport interface is the sender side: it forwards journals somewhere. The Update() function calls bolt.DB.Update(), creates an update journal record followed by either a commit or a rollback journal record, and sends both through the transport. This layer is optional; you can use any system that suits you to send the journal from rbolt.RTx elsewhere.

JournalBuffer is the receiver side. It buffers the journals it receives, and JournalBuffer.Flush() plays them and empties the buffer. When a monotonic LSN is used, the journals are played in the correct order rather than in the order received, since transports may reorder them. Use rbolt.MonotonicLSN for this.
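To make the ordering idea concrete, here is a minimal, self-contained sketch of a receiver-side buffer that sorts by LSN before playing. It is not rbolt's implementation: the journal and orderBuffer types and their methods are hypothetical stand-ins, and the only thing borrowed from the package is the notion of an integer LSN on each journal.

```go
package main

import (
	"fmt"
	"sort"
)

// journal is a stand-in for rbolt.Journal, reduced to the one field
// this sketch needs.
type journal struct {
	LSN int
}

// orderBuffer is a hypothetical receiver-side buffer. It only
// illustrates LSN ordering and is not rbolt's JournalBuffer.
type orderBuffer struct {
	pending []journal
}

// write stores a journal in arrival order.
func (b *orderBuffer) write(j journal) {
	b.pending = append(b.pending, j)
}

// flush sorts the pending journals by LSN, hands each one to play,
// and empties the buffer. It stops at the first error and leaves the
// buffer contents in place in that case.
func (b *orderBuffer) flush(play func(journal) error) error {
	sort.SliceStable(b.pending, func(i, k int) bool {
		return b.pending[i].LSN < b.pending[k].LSN
	})
	for _, j := range b.pending {
		if err := play(j); err != nil {
			return err
		}
	}
	b.pending = b.pending[:0]
	return nil
}

func main() {
	var buf orderBuffer
	// Journals arrive out of order, as a transport might deliver them.
	for _, lsn := range []int{3, 1, 2} {
		buf.write(journal{LSN: lsn})
	}
	_ = buf.flush(func(j journal) error {
		fmt.Println("playing journal", j.LSN)
		return nil
	})
}
```

The sketch only works because the LSNs increase with the order in which the journals were produced; with a non-monotonic generator, sorting would not recover the original order.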

Example with the LocalTransport, for replication within the same Go program:

  // db is the "master" db.
  // dbt is the target bolt.DB to synchronize
  jbuf := rbolt.NewJournalBuffer(dbt)
  transport := &rbolt.LocalTransport{JournalBuffer: jbuf}
  lsn := new(rbolt.MonotonicLSN)

  if err := rbolt.Update(db, transport, lsn, func(tx *rbolt.Tx) error {
      // ... use tx
      return nil
  }); err != nil {
      ...
  }

  if err := jbuf.Flush(); err != nil {
      ...
  }

Performance

The overhead comes from recording what happens during a writable transaction; read-only transactions are not affected. There is also a memory cost, since the keys and values written to the journal are copies (byte slices are valid only during the transaction):

  BenchmarkRTx-8   	    5000	   4024116 ns/op	 1256659 B/op	   19527 allocs/op
  BenchmarkTx-8    	    5000	   3907194 ns/op	  997511 B/op	   16506 allocs/op

and that cost will depend on the size of the keys/values of your database.
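The need for copies can be illustrated with a small, self-contained sketch. It does not use bolt or rbolt: the record type and the clone and newRecord helpers are hypothetical, and the page slice merely stands in for memory that the database owns and may reuse once the transaction ends.

```go
package main

import "fmt"

// record is a hypothetical journal entry that owns its key and value.
type record struct {
	Key, Value []byte
}

// clone returns a copy of b backed by newly allocated memory.
func clone(b []byte) []byte {
	if b == nil {
		return nil
	}
	c := make([]byte, len(b))
	copy(c, b)
	return c
}

// newRecord copies k and v so that the record stays valid after the
// memory behind them is reused.
func newRecord(k, v []byte) record {
	return record{Key: clone(k), Value: clone(v)}
}

func main() {
	// page stands in for memory owned by the database.
	page := []byte("name=alice")
	k, v := page[:4], page[5:]

	rec := newRecord(k, v)

	// Simulate the memory being reused after the transaction ends.
	copy(page, "XXXXXXXXXX")

	fmt.Printf("%s=%s\n", rec.Key, rec.Value) // prints "name=alice"
}
```

Each recorded write allocates one copy of the key and one of the value, which is consistent with the cost growing with the size of the keys and values.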

Status

The package works, and mainly needs more thorough use to confirm that it is solid. However, it is still very young, so the API may still change in minor ways. More Transport implementations would also be useful, to give more ready-to-use options.

People who want to test it and provide feedback are very welcome!

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Update

func Update(db *bolt.DB, transport Transport, lsn LSNGenerator, fn func(tx *Tx) error) error

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func (*Bucket) Bucket

func (b *Bucket) Bucket(name []byte) *Bucket

func (*Bucket) CreateBucket

func (b *Bucket) CreateBucket(key []byte) (*Bucket, error)

func (*Bucket) CreateBucketIfNotExists

func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)

func (*Bucket) Cursor

func (b *Bucket) Cursor() *Cursor

func (*Bucket) Delete

func (b *Bucket) Delete(key []byte) error

func (*Bucket) DeleteBucket

func (b *Bucket) DeleteBucket(key []byte) error

func (*Bucket) ForEach

func (b *Bucket) ForEach(fn func(k, v []byte) error) error

func (*Bucket) Get

func (b *Bucket) Get(key []byte) []byte

func (*Bucket) NextSequence

func (b *Bucket) NextSequence() (uint64, error)

func (*Bucket) Put

func (b *Bucket) Put(key []byte, value []byte) error

func (*Bucket) Root

func (b *Bucket) Root() uint64

func (*Bucket) Sequence

func (b *Bucket) Sequence() uint64

func (*Bucket) SetSequence

func (b *Bucket) SetSequence(v uint64) error

func (*Bucket) Stats

func (b *Bucket) Stats() bolt.BucketStats

func (*Bucket) Tx

func (b *Bucket) Tx() *Tx

func (*Bucket) Writable

func (b *Bucket) Writable() bool

type Cursor

type Cursor struct {
	// contains filtered or unexported fields
}

func (*Cursor) Bucket

func (c *Cursor) Bucket() *Bucket

func (*Cursor) Delete

func (c *Cursor) Delete() error

func (*Cursor) First

func (c *Cursor) First() (key []byte, value []byte)

func (*Cursor) Last

func (c *Cursor) Last() (key []byte, value []byte)

func (*Cursor) Next

func (c *Cursor) Next() (key []byte, value []byte)

func (*Cursor) Prev

func (c *Cursor) Prev() (key []byte, value []byte)

func (*Cursor) Seek

func (c *Cursor) Seek(seek []byte) (key []byte, value []byte)

type Journal

type Journal struct {
	LSN  int
	TxID int
	Ws   []W
	Type JournalType
	// contains filtered or unexported fields
}

func (*Journal) Play

func (j *Journal) Play(tx *bolt.Tx) error

type JournalBuffer

type JournalBuffer struct {
	// contains filtered or unexported fields
}

func NewJournalBuffer

func NewJournalBuffer(db *bolt.DB) *JournalBuffer

func (*JournalBuffer) Flush

func (jb *JournalBuffer) Flush() error

func (*JournalBuffer) WriteJournal

func (jb *JournalBuffer) WriteJournal(j *Journal)

type JournalType

type JournalType int
const (
	JournalTypeUpdate JournalType = iota + 1
	JournalTypeCommit
	JournalTypeRollback
)

func (JournalType) String

func (t JournalType) String() string

type LSNGenerator

type LSNGenerator interface {
	NextLSN() int
}
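Any type with a NextLSN() int method satisfies this interface. The following is a minimal sketch of a custom generator, assuming a simple in-process counter is acceptable. The counterLSN type is hypothetical and is not how rbolt.MonotonicLSN is implemented; the interface is redeclared locally so that the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// LSNGenerator mirrors the interface documented above.
type LSNGenerator interface {
	NextLSN() int
}

// counterLSN is a hypothetical generator backed by an atomic counter.
type counterLSN struct {
	n int64
}

// NextLSN returns 1, 2, 3, ... and is safe for concurrent use.
func (c *counterLSN) NextLSN() int {
	return int(atomic.AddInt64(&c.n, 1))
}

func main() {
	var gen LSNGenerator = new(counterLSN)
	fmt.Println(gen.NextLSN(), gen.NextLSN(), gen.NextLSN()) // prints "1 2 3"
}
```

A counter like this restarts at 1 with each process, so it is only suitable when sender and receiver do not need LSNs to survive a restart.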

type LocalTransport

type LocalTransport struct {
	JournalBuffer *JournalBuffer
}

func (*LocalTransport) Send

func (t *LocalTransport) Send(j *Journal)

type MonotonicLSN

type MonotonicLSN struct {
	// contains filtered or unexported fields
}

func (*MonotonicLSN) NextLSN

func (l *MonotonicLSN) NextLSN() int

type MultiTransport

type MultiTransport struct {
	// contains filtered or unexported fields
}

func (*MultiTransport) Send

func (mt *MultiTransport) Send(j *Journal)

type NullTransport

type NullTransport struct{}

func (NullTransport) Send

func (t NullTransport) Send(_ *Journal)

type Op

type Op int
const (
	OpCreateBucket Op = iota + 1
	OpCreateBucketIfNotExists
	OpDelete
	OpDeleteBucket
	OpPut
	OpBucketCursor
	OpCursorDelete
	OpCursorFirst
	OpCursorLast
	OpCursorNext
	OpCursorPrev
	OpCursorSeek
)

func (Op) String

func (op Op) String() string

type Transport

type Transport interface {
	Send(*Journal)
}

type Tx

type Tx struct {
	*bolt.Tx
	// contains filtered or unexported fields
}

func RTx

func RTx(tx *bolt.Tx) *Tx

func (*Tx) Bucket

func (tx *Tx) Bucket(name []byte) *Bucket

func (*Tx) CreateBucket

func (tx *Tx) CreateBucket(name []byte) (*Bucket, error)

func (*Tx) CreateBucketIfNotExists

func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error)

func (*Tx) DeleteBucket

func (tx *Tx) DeleteBucket(name []byte) error

func (*Tx) ForEach

func (tx *Tx) ForEach(fn func([]byte, *Bucket) error) error

func (*Tx) Journal

func (tx *Tx) Journal() *Journal

type W

type W struct {
	Op       Op
	Path     [][]byte
	CursorID int
	Value    []byte
}

Directories

Path Synopsis
Command bolt-diff compares two bolt databases and reports if they are equal or not.
