clouddb

package module
v0.2.0
Published: Nov 23, 2023 License: MIT Imports: 28 Imported by: 0

README

CloudDB

Another attempt at building a fault-tolerant storage system.

It provides no guarantee of ACID compliance, no transaction isolation, and the like.

It can store JSON objects only.

  • Primary keys need to be globally unique, generated for example with uuid.NewRandom() (see the sketch after this list)
  • JSON objects contain a number of values and must always start with '{' (the root cannot be anything other than an object)
  • Search index keys can be specified and will be replicated globally
  • Any update will be transformed into a delta JSON and replicated as a delta only (TODO)
  • All updates will have a timestamp and be applied based on timestamp value
  • A global log of all updates is stored and kept for a limited amount of time (TODO cleanup of old records)
  • All checkpoints also have a log count value that represents the size of work so far and allows detecting missing logs
  • Local data is stored in a single LevelDB database in order to allow consistent snapshots
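
As a quick sketch of storing and reading back a record keyed by a random UUID: the module import path, the rpc argument and the Record type below are assumptions for illustration, not part of the package.

package example

import (
	"log"

	"github.com/KarpelesLab/clouddb" // assumed module path
	"github.com/google/uuid"
)

// Record is a hypothetical JSON object stored for this sketch.
type Record struct {
	Name string `json:"name"`
}

// example opens a database, stores a record under a random UUID key and reads it back.
// The rpc argument must be an implementation of the RPC interface documented below.
func example(rpc clouddb.RPC) error {
	db, err := clouddb.New("example", rpc)
	if err != nil {
		return err
	}
	defer db.Close()
	db.WaitReady() // block until this instance has finished syncing

	id, err := uuid.NewRandom() // primary keys must be globally unique
	if err != nil {
		return err
	}
	if err := db.Set(id[:], &Record{Name: "alice"}); err != nil {
		return err
	}

	var out Record
	if err := db.Get(id[:], &out); err != nil {
		return err
	}
	log.Printf("loaded record: %+v", out)
	return nil
}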

Database consistency:

  • Log entries sorted by nanotime timestamp
  • Checkpoints will include number of known log entries
  • We produce one checkpoint per 24 hours by default so that even if a server goes down for a while, it can come back up and resynchronize

LevelDB prefixes

LevelDB prefixes are fixed at 3 bytes for ease of storage.

  • idx+key = id (indices)
  • log+logid = log (journal, indexed by timestamp + hash)
  • dat+id = data (data)
  • nfo+id = version (16-byte current record version)
  • kdt+id = keys (key data: a 32-bit length of the key followed by the key, repeated for each key)
  • typ+type_name = type data
  • chk+id = checkpoint

Journal

Key: log + timestamp (16 bytes) + hash (sha256)
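
For illustration, a sketch of how keys following this layout could be composed. This is not the package's internal code, and the timestamp encoding is an assumption beyond its stated 16-byte size.

package example

import "crypto/sha256"

// dataKey composes a data record key: the fixed 3-byte prefix "dat" followed by the record id.
func dataKey(id []byte) []byte {
	return append([]byte("dat"), id...)
}

// logKey composes a journal key: "log" + a 16-byte timestamp + the sha256 hash of the entry.
// The exact timestamp encoding is an assumption; the README only states it is 16 bytes.
func logKey(ts [16]byte, entry []byte) []byte {
	h := sha256.Sum256(entry)
	k := make([]byte, 0, 3+16+len(h))
	k = append(k, "log"...)
	k = append(k, ts[:]...)
	k = append(k, h[:]...)
	return k
}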

Checkpoints

  • We perform one checkpoint every 24 hours
  • A checkpoint is a rounded timestamp + the number of log entries in the past 24 hours + the XOR of the log keys
  • We only keep the 100 latest checkpoints (3+ months)
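
A hedged sketch of what such a checkpoint could look like; the field types and the choice of XOR-ing fixed-size key hashes are assumptions, not the package's actual encoding.

package example

// checkpoint summarizes one 24-hour window of the journal as described above.
type checkpoint struct {
	roundedTS int64    // timestamp rounded to the 24-hour boundary
	logCount  uint64   // number of log entries seen in the window
	xorKeys   [32]byte // running XOR of the log keys' hashes, used to detect missing logs (assumption)
}

// add folds one log key hash into the checkpoint.
func (c *checkpoint) add(keyHash [32]byte) {
	c.logCount++
	for i := range c.xorKeys {
		c.xorKeys[i] ^= keyHash[i]
	}
}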

Snapshots

  • We perform a snapshot within a few minutes of a checkpoint being reached
  • The snapshot will include all the changes up to the checkpoint, and some more
  • Upon starting from zero, a node will download the latest snapshot and load it, then apply the subsequent log, skipping log entries already included in the snapshot since they are considered already applied
  • Log entries prior to the snapshot date will not be included in the snapshot

Documentation

Constants

const (
	RecordSet    dblogType = iota // set the full value of a record
	RecordDelete                  // delete a record
)

const (
	PktGetInfo     = 0x00 // GetInfo()
	PktGetLogIds   = 0x01 // GetLogs(<peer>, <epoch>)
	PktFetchLog    = 0x02 // FetchLog(<key>) uses no response packet id, response is always inline
	PktGetInfoResp = 0x80 // GetInfo()|Response
	PktLogPush     = 0x40 // pushed log entry
	PktLogIdsPush  = 0x41 // LogIdPush(<peer>, ...<logid>)

	ResponseFlag = 0x80 // response flag
)

Variables

var (
	ErrKeyConflict = errors.New("record's key conflicts with an existing record")
)

Functions

func RegisterType

func RegisterType(t *Type)

func ValidateBackup added in v0.1.10

func ValidateBackup(r io.Reader) error

ValidateBackup will read a backup and check for some possible errors, such as corrupted checkpoints

It is a good practice to call ValidateBackup after creating a backup to ensure it will be possible to restore it in the future should the need arise.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

func New

func New(name string, rpc RPC) (*DB, error)

func (*DB) BackupTo added in v0.1.10

func (d *DB) BackupTo(w io.Writer) error

BackupTo will generate a snapshot and write it to the given writer. The format is fairly simple and can be fed to RestoreFrom to restore the database to a given point in time.

Backup will include part of the database record modification history, so the backup data will usually be larger than the original database, but because the data has a lot of repetition, compression should be effective.
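
A sketch of creating a backup file and validating it right away, as recommended above for ValidateBackup; the file handling and the module import path are assumptions.

package example

import (
	"os"

	"github.com/KarpelesLab/clouddb" // assumed module path
)

// backupAndVerify writes a snapshot of db to path, then re-reads the file and
// validates it while the live database still exists.
func backupAndVerify(db *clouddb.DB, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	if err := db.BackupTo(f); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	r, err := os.Open(path)
	if err != nil {
		return err
	}
	defer r.Close()
	return clouddb.ValidateBackup(r)
}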

func (*DB) Close

func (d *DB) Close() error

func (*DB) DebugDump

func (d *DB) DebugDump(w io.Writer)

DebugDump will dump the whole database to log.DefaultLogger

func (*DB) Delete

func (d *DB) Delete(id []byte) error

Delete globally removes an entry from the database

func (*DB) Get

func (d *DB) Get(id []byte, target any) error

Get will load and apply the object for the given id to target

func (*DB) GetRaw

func (d *DB) GetRaw(id []byte) (json.RawMessage, error)

GetRaw returns the json value for a given id

func (*DB) GetStatus

func (d *DB) GetStatus() Status

GetStatus returns the current status of this database instance

func (*DB) Has added in v0.1.2

func (d *DB) Has(id []byte) (bool, error)

Has will check if a given key exists

func (*DB) NewIterator added in v0.1.11

func (d *DB) NewIterator(start, limit []byte) Iterator

func (*DB) NewIteratorPrefix added in v0.1.11

func (d *DB) NewIteratorPrefix(pfx []byte) Iterator

func (*DB) Reindex added in v0.1.10

func (d *DB) Reindex() error

Reindex will re-generate all the indices for the records in the database

func (*DB) RestoreFrom added in v0.1.10

func (d *DB) RestoreFrom(r io.Reader) error

RestoreFrom will read a backup and restore the database from it. The current database must be pristine, or this will fail.

After a restore, in case of a cluster configuration, other peers will start replicating the data. This method will skip some basic checks on the input, and ValidateBackup should be called first to ensure backup usability (actually, ValidateBackup should be called after saving the backup as detecting invalid backups is more useful while the database still exists).
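
A sketch of restoring a backup into a freshly created (pristine) database; the database name, file path handling and module import path are placeholders.

package example

import (
	"os"

	"github.com/KarpelesLab/clouddb" // assumed module path
)

// restoreFromFile opens a new, pristine database and loads a previously written backup into it.
func restoreFromFile(rpc clouddb.RPC, path string) (*clouddb.DB, error) {
	db, err := clouddb.New("restored", rpc) // "restored" is a placeholder name
	if err != nil {
		return nil, err
	}
	r, err := os.Open(path)
	if err != nil {
		db.Close()
		return nil, err
	}
	defer r.Close()
	if err := db.RestoreFrom(r); err != nil {
		db.Close()
		return nil, err
	}
	return db, nil
}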

func (*DB) Search added in v0.1.12

func (d *DB) Search(typ string, search map[string]any) (Iterator, error)

func (*DB) SearchFirst

func (d *DB) SearchFirst(typ string, search map[string]any, target any) ([]byte, error)

SearchFirst will find the first record matching the search params and set target. The search must match an existing key in the provided type.
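
For example, assuming a registered "user" type with a key on its email field (see the Type and TypeKey sections below), a lookup could look like this sketch.

package example

import "github.com/KarpelesLab/clouddb" // assumed module path

// user is a hypothetical record shape; a "user" type with an indexed "email" key
// is assumed to have been registered via RegisterType.
type user struct {
	Email string `json:"email"`
	Name  string `json:"name"`
}

// findUserByEmail returns the id and decoded record of the first "user" whose
// email key matches the given address.
func findUserByEmail(db *clouddb.DB, email string) ([]byte, *user, error) {
	var u user
	id, err := db.SearchFirst("user", map[string]any{"email": email}, &u)
	if err != nil {
		return nil, nil, err
	}
	return id, &u, nil
}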

func (*DB) SearchFirstRaw

func (d *DB) SearchFirstRaw(typ string, search map[string]any) ([]byte, json.RawMessage, error)

SearchFirstRaw will find the first record matching the search params and return its json value. The search must match an existing key in the provided type.

func (*DB) Set

func (d *DB) Set(id []byte, val any) error

Set will update a given record in the database. id must be unique in the whole db, even across types.

func (*DB) String

func (d *DB) String() string

func (*DB) WaitReady

func (d *DB) WaitReady()

WaitReady waits for the database instance to be ready, or does nothing if it is already ready
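
A small usage sketch combining WaitReady with GetStatus (module path assumed).

package example

import "github.com/KarpelesLab/clouddb" // assumed module path

// waitUntilReady blocks until the local instance has finished syncing, then
// reports whether it reached the Ready state.
func waitUntilReady(db *clouddb.DB) bool {
	db.WaitReady()
	return db.GetStatus() == clouddb.Ready
}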

type Iterator added in v0.1.11

type Iterator interface {
	// Key returns the key of the item the iterator currently points to
	Key() []byte

	// Value returns the raw value of the item the iterator currently points to. Use Apply() to
	// obtain the value as an object.
	Value() []byte

	// Release releases resources and must be called after using the iterator
	Release()

	// Next seeks to the next record
	Next() bool

	// Prev seeks to the previous record
	Prev() bool

	// Seek seeks to the specified index and returns whether it exists
	Seek([]byte) bool

	// Apply will decode the contents of the value currently pointed by Iterator
	// into the object passed by reference
	Apply(v any) error
}
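
A sketch of combining Search with this interface, reusing the assumed "user" type and struct from the SearchFirst sketch above.

package example

import "github.com/KarpelesLab/clouddb" // assumed module path

// listUsersByName walks all records of the assumed "user" type whose indexed
// "name" key matches the given value, decoding each one via Apply.
func listUsersByName(db *clouddb.DB, name string) ([]user, error) {
	it, err := db.Search("user", map[string]any{"name": name})
	if err != nil {
		return nil, err
	}
	defer it.Release() // always release the iterator when done

	var res []user
	for it.Next() {
		var u user
		if err := it.Apply(&u); err != nil {
			return nil, err
		}
		res = append(res, u)
	}
	return res, nil
}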

type RPC

type RPC interface {
	// All will send a given data object to all other RPC instances on the fleet
	// and will collect responses
	All(ctx context.Context, data []byte) ([]any, error)

	// Broadcast will do the same as All but will not wait for responses
	Broadcast(ctx context.Context, data []byte) error

	// Request will send a given object to a specific peer and return the response
	Request(ctx context.Context, id string, data []byte) ([]byte, error)

	// Send will send a given buffer to a specific peer and ignore the response
	Send(ctx context.Context, id string, data []byte) error

	// Self will return the id of the local peer, which other instances can use
	// to contact it with Send().
	Self() string

	// ListOnlinePeers returns a list of connected peers
	ListOnlinePeers() []string

	// CountAllPeers returns the number of known peers, connected or offline
	CountAllPeers() int

	// Connect connects this RPC instance's incoming events to a given function
	// that will be called each time an event is received.
	Connect(cb func(context.Context, []byte) ([]byte, error))
}

The RPC interface is standardized; any transport implementing it can be used to connect database instances.
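
As an illustration of the interface shape only, a minimal single-node implementation with no peers; this is a sketch, not part of the package, and a real deployment would back it with an actual transport.

package example

import "context"

// loopbackRPC is a sketch of an RPC implementation for a single node with no
// peers; it only satisfies the interface shape above.
type loopbackRPC struct {
	cb func(context.Context, []byte) ([]byte, error)
}

func (r *loopbackRPC) All(ctx context.Context, data []byte) ([]any, error) { return nil, nil }
func (r *loopbackRPC) Broadcast(ctx context.Context, data []byte) error    { return nil }
func (r *loopbackRPC) Request(ctx context.Context, id string, data []byte) ([]byte, error) {
	return nil, nil
}
func (r *loopbackRPC) Send(ctx context.Context, id string, data []byte) error { return nil }
func (r *loopbackRPC) Self() string                                           { return "local" }
func (r *loopbackRPC) ListOnlinePeers() []string                              { return nil }
func (r *loopbackRPC) CountAllPeers() int                                     { return 1 } // counting only this node (assumption)
func (r *loopbackRPC) Connect(cb func(context.Context, []byte) ([]byte, error)) { r.cb = cb }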

type Status

type Status int

const (
	Initializing Status = iota
	Syncing
	Ready
	Error
)

func (Status) String added in v0.1.4

func (s Status) String() string

type Type

type Type struct {
	Name    string     `json:"name"`
	Keys    []*TypeKey `json:"keys"`
	Version int        `json:"version"` // version for the definition, can be set to zero
}

Type is a registered index type used in records. A record of a given type will inherit the passed keys and be indexed based on them. Some keys will always exist, such as to ensure objects can be referenced at all times.

Types will be stored in the database to ensure records can be indexed properly on first try.

func (*Type) Hash

func (t *Type) Hash() []byte

type TypeKey

type TypeKey struct {
	Name   string   `json:"name"`
	Fields []string `json:"fields"`
	Method string   `json:"method"` // one of: utf8, int, binary
	Unique bool     `json:"unique,omitempty"`
}
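
For example, a hypothetical "user" type with a unique utf8 key on the email field and a non-unique utf8 key on the name field could be registered like this sketch (names and module path are assumptions).

package example

import "github.com/KarpelesLab/clouddb" // assumed module path

func init() {
	// Register a hypothetical "user" type; records of this type will be indexed
	// on the email field (unique) and the name field.
	clouddb.RegisterType(&clouddb.Type{
		Name: "user",
		Keys: []*clouddb.TypeKey{
			{Name: "email", Fields: []string{"email"}, Method: "utf8", Unique: true},
			{Name: "name", Fields: []string{"name"}, Method: "utf8"},
		},
		Version: 1,
	})
}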
