chotki

package module
v0.0.0-...-73d9541 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2024 License: MIT Imports: 23 Imported by: 0

README

◌ Chotki: fast syncable store

Chotki is a syncable store with really fast counters. Internally, it is pebble db running CRDTs natively, using the Replicated Data Interchange format (RDX). Chotki is sync-centric and causally consistent: replicas can sync master-master, either incrementally in real time or by staying offline and diff-syncing periodically. The Chotki API is REST/object based; there is no SQL yet.

Use

Chotki's data model has three key entities:

  1. a class is a list of named fields,
  2. an object is an instance of a class,
  3. a field is a value of a Replicated Data Type (RDT).

Available RDTs are basic FIRST types:

  1. Float,
  2. Integer,
  3. Reference,
  4. String and
  5. Term.

There are also collection ELM types:

  1. Eulerian (set) is an unordered collection of FIRST values,
  2. Linear (array) is an ordered collection of FIRST values, and
  3. Mapping is a collection of key-value pairs (both FIRST).

There are also two counter types:

  1. N are "natural" increment-only uint64 counters and
  2. Z are signed int64 increment/decrement counters.
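
To illustrate why increment-only counters merge cleanly, here is a minimal sketch of a textbook grow-only counter. It is an assumption made for illustration, not Chotki's actual N encoding: the `NCounter` type and its methods are hypothetical names. Each replica keeps its own running total, merging takes the per-replica maximum, and the value is the sum.

```go
package main

import "fmt"

// NCounter is a hypothetical increment-only counter:
// one running total per replica (keyed by replica number).
type NCounter map[uint64]uint64

// Inc adds n to the contribution of replica src.
func (c NCounter) Inc(src, n uint64) { c[src] += n }

// Merge keeps the per-replica maximum, so applying the same
// remote state twice, or in any order, gives the same result.
func (c NCounter) Merge(other NCounter) {
	for src, n := range other {
		if n > c[src] {
			c[src] = n
		}
	}
}

// Value is the sum of all per-replica contributions.
func (c NCounter) Value() (sum uint64) {
	for _, n := range c {
		sum += n
	}
	return sum
}

func main() {
	a, b := NCounter{}, NCounter{}
	a.Inc(0xa1ece, 3)
	b.Inc(0xb0b, 4)
	a.Merge(b)
	a.Merge(b) // merging the same state again changes nothing
	fmt.Println(a.Value())
}
```

A signed Z counter could be sketched the same way with a separate total for decrements, again as an assumption rather than a description of the real format.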

Objects can Reference each other, thus forming object graphs. As an example, an object of class Team has a field Players, which is an E-set of references to objects of class Player. Each object of class Player has an Integer field Number, a String field Name, and a counter NetWorth.

Chotki does not support any filter queries yet (of the select ... where kind). It can fetch an object by its ID or recursively by references (a reference is an ID).

REPL

Chotki REPL supports a number of commands to manage databases, network activities and the data itself. Each command takes an argument in the text version of RDX, which is very close to JSON, e.g.

    ◌ connect "replica:1234"
    ◌ cat b0b-28f 
    ◌ new {_ref:Player, Name: "Lionel Messi", Number: 10}

Each command returns an ID and/or an error.

Text RDX is different from JSON in several aspects: it has the ID type, arbitrary literals apart from true, false and null, also set collections and some other minor differences. On the human readability side, it is pretty much the same thing.

HTTP

Chotki HTTP interface is your typical REST API that accepts and delivers RDX or JSON.

Native API

Chotki aims at implementing a protoc-like compilation step producing code for objects, including database store/load, RDX serialization/parsing, etc.

Replicas

The main superpower of Chotki is syncing. Replicas may work offline, reconnect and resync, or they may sync continuously in real time. Chotki so far only supports a spanning-tree overlay network topology. Each replica has a 20-bit number (aka source); to prevent loops, a replica can only connect to replicas with a lesser source number. E.g. a1ece can connect to b0b, but not the other way around (replica numbers are given in hex).
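
The connection rule can be sketched in a few lines. This is an illustration only: `canConnect` and `parseSrc` are hypothetical helpers, not part of the Chotki API, and the only facts taken from the text are the 20-bit range and the "lesser number" rule.

```go
package main

import (
	"fmt"
	"strconv"
)

// canConnect reports whether replica `from` may dial replica `to`
// under the rule quoted in the text: only towards a lesser source number.
func canConnect(from, to uint32) bool { return to < from }

// parseSrc parses a hex replica number and checks that it fits in 20 bits.
func parseSrc(hex string) (uint32, error) {
	n, err := strconv.ParseUint(hex, 16, 32)
	if err != nil {
		return 0, err
	}
	if n >= 1<<20 {
		return 0, fmt.Errorf("replica number %s does not fit in 20 bits", hex)
	}
	return uint32(n), nil
}

func main() {
	a, err := parseSrc("a1ece")
	if err != nil {
		panic(err)
	}
	b, err := parseSrc("b0b")
	if err != nil {
		panic(err)
	}
	fmt.Println(canConnect(a, b)) // a1ece dialing b0b
	fmt.Println(canConnect(b, a)) // b0b dialing a1ece
}
```

Because every link points from a greater number to a lesser one, following links can never lead back to the starting replica, which is what rules out loops.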

Implementations of client replicas working on mobile devices or in a browser are planned.

Microbenchmarks

Chotki is based on pebble db, which is an LSM database. A superpower of LSM is "blind writes", i.e. writes with no preceding read necessary. On a Lenovo Yoga laptop, a Chotki replica can do about 1 million blind increments of a counter in about 3 seconds single-threaded, whether or not it is connected to other replicas:

    ◌ sinc {fid:b0b-6-2,count:10000000,ms:0}
    b0b-6-2
    inc storm: 10000000 incs complete for b0b-6-2, elapsed 1m4.506868865s, a1ece-5debd1..a1ece-f68251
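
For readers who want to turn the log line above into a rate, here is a small sketch of the arithmetic. The `rate` helper is a hypothetical name; the operation count and the elapsed time are copied from the output shown.

```go
package main

import (
	"fmt"
	"time"
)

// rate returns operations per second for a run of n operations.
func rate(n int, elapsed time.Duration) float64 {
	return float64(n) / elapsed.Seconds()
}

func main() {
	// Elapsed time as printed by the "inc storm" line.
	elapsed, err := time.ParseDuration("1m4.506868865s")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%.0f incs/s\n", rate(10000000, elapsed))
}
```

For that particular run the result is roughly 155 thousand increments per second.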

Installation

    go get -u github.com/drpcorg/chotki

Inner workings

Internally, Chotki is pebble db using RDX merge operators. See the RDX doc for further details on its serialization format (type-length-value) and a very predictable choice of CRDTs.

Comparison to other projects

Overall, RDX is the final version of RON (Replicated Object Notation), a research project that lasted from 2017 till 2022. One may check the first 2017 RON/RDX talk manifesting the project's goals. In that regard, we may compare RON/RDX to Automerge, which is a project of exactly the same age. Both projects started with a columnar-like coding of operations, which Automerge is using to this day, while RDX followed Einstein's maxim: "Everything should be made as simple as possible, but not simpler". After we spent quite some time trying to cram columnar-encoded CRDTs into existing databases, RDX was greatly simplified, and now all the RDX CRDT logic fits into a merge operator. That greatly improved performance. Effectively, that simplicity allows us to use a commodity LSM storage engine to natively store arbitrary CRDTs.

We can also compare Chotki to a number of JavaScript-centric CRDT databases, such as RxDB, TinyBase or SyncedStore. Historically, RON/RDX also has its roots in the JavaScript world. Swarm.js was likely the first CRDT sync lib in history (2013-2018), although it was distilled from the earlier Citrea project (2011-2012). Chotki/RDX has the objective of creating a production-ready scalable CRDT store, which JavaScript does not really allow. Still, we will be extremely happy if some of the JavaScript libs consider supporting RDX as a unifying format. (Ping us any time!)

The original Chotki project summary

The mission of the system is to keep and update real-time statistics, such as quotas, counters, billing and suchlike. Update propagation time is expected to be close to the theoretical minimum: the one-way delay. We expect to be able to process data at bulk transfer speeds (i.e. as much as we can move over the network, we can process in real time). It is preferred if the database is embedded into the application to minimize response times. The data should (preferably) fit in memory. Consistency guarantees are: causally consistent (strong EC). The data model is hierarchical, JSON-like, with an emphasis on numeric values. There are no aggregations or queries on the data, only point lookups and subtree scans. The inner workings of the database are a combination of a self-organizing overlay network and an LSM-like storage engine. A node is not connected to every other node: the topology is closer to a spanning tree. That is to minimize network traffic. The API is object-handle based; the entire object is not raised into memory; instead, once the user reads a field, we do a lookup. That way we minimize reads, deserializations and GC busywork. Updates are Lamport-timestamped; there is a re-sync protocol for newly joining and re-joining replicas. Overall, we take every shortcut to make the store lightweight and fast while focusing on our specific use case (distributed counters, mainly).
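
As a reminder of what Lamport timestamping involves, here is a minimal sketch of a logical clock. The `LamportClock` type is a hypothetical illustration of the general technique; how Chotki lays out its own timestamps is not described in this summary.

```go
package main

import "fmt"

// LamportClock is a hypothetical logical clock for one replica.
type LamportClock struct{ time uint64 }

// Tick stamps a new local update: the clock only ever moves forward.
func (c *LamportClock) Tick() uint64 {
	c.time++
	return c.time
}

// See advances the clock past a timestamp observed on a remote update,
// so that later local updates are ordered after everything already seen.
func (c *LamportClock) See(remote uint64) {
	if remote > c.time {
		c.time = remote
	}
}

func main() {
	var a, b LamportClock
	a.Tick()
	stamp := a.Tick() // a's second update
	b.See(stamp)      // b receives it during sync
	fmt.Println(b.Tick() > stamp)
}
```

The point of the scheme is that an update made after seeing another update always carries a greater timestamp, which is the ordering a causally consistent store relies on.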

Documentation

Constants

const (
	IdNames    = id2 + 1
	IdNodes    = id2 + 2
	IdNodeInfo = id2 + 3

	// ID from which we count user static objects
	IdLog1 = id2 + 4
)
const (
	SyncRead   = 1
	SyncWrite  = 2
	SyncLive   = 4
	SyncRW     = SyncRead | SyncWrite
	SyncRL     = SyncRead | SyncLive
	SyncRWLive = SyncRead | SyncWrite | SyncLive
)
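
The Sync* values are bit flags that combine with bitwise OR. The sketch below copies the values from the listing and adds a hypothetical `has` helper (not part of the package) to show how a mode could be tested for a capability.

```go
package main

import "fmt"

// Values copied from the constant listing.
const (
	SyncRead   = 1
	SyncWrite  = 2
	SyncLive   = 4
	SyncRW     = SyncRead | SyncWrite
	SyncRL     = SyncRead | SyncLive
	SyncRWLive = SyncRead | SyncWrite | SyncLive
)

// has reports whether every bit of flag is set in mode.
// It is a hypothetical helper for illustration.
func has(mode, flag uint64) bool { return mode&flag == flag }

func main() {
	fmt.Println(has(SyncRL, SyncWrite))  // read+live does not include write
	fmt.Println(has(SyncRWLive, SyncRL)) // the full mode includes read+live
}
```
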
const (
	SendHandshake = iota
	SendDiff
	SendLive
	SendEOF
	SendNone
)
const LidLKeyLen = 1 + 8 + 1
const SyncBlockBits = 28
const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1
const SyncOutQueueLimit = 1 << 20
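
The mask arithmetic behind SyncBlockBits and SyncBlockMask can be checked with plain integers. In this sketch `uint64` stands in for `rdx.ID`, and `split` is a hypothetical helper; what the upper and lower parts mean inside Chotki's sync protocol is not stated in this listing, so the names `block` and `offset` are assumptions.

```go
package main

import "fmt"

const SyncBlockBits = 28

// Same expression as in the listing, with uint64 standing in for rdx.ID.
const SyncBlockMask = (uint64(1) << SyncBlockBits) - 1

// split separates a value into the part above the low 28 bits
// and the low 28 bits themselves.
func split(v uint64) (block, offset uint64) {
	return v &^ SyncBlockMask, v & SyncBlockMask
}

func main() {
	fmt.Printf("mask = %#x\n", SyncBlockMask)
	block, offset := split(0x123456789)
	fmt.Printf("block = %#x, offset = %#x\n", block, offset)
}
```
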

Variables

var (
	ErrDbClosed       = errors.New("chotki: db is closed")
	ErrDirnameIsFile  = errors.New("chotki: the dirname is file")
	ErrNotImplemented = errors.New("chotki: not implemented yet")
	ErrHookNotFound   = errors.New("chotki: hook not found")
	ErrBadIRecord     = errors.New("chotki: bad id-ref record")
	ErrBadORecord     = errors.New("chotki: bad id-ref record")
	ErrBadHPacket     = errors.New("chotki: bad handshake packet")
	ErrBadEPacket     = errors.New("chotki: bad E packet")
	ErrBadVPacket     = errors.New("chotki: bad V packet")
	ErrBadYPacket     = errors.New("chotki: bad Y packet")
	ErrBadLPacket     = errors.New("chotki: bad L packet")
	ErrBadTPacket     = errors.New("chotki: bad T packet")
	ErrBadOPacket     = errors.New("chotki: bad O packet")
	ErrSrcUnknown     = errors.New("chotki: source unknown")
	ErrSyncUnknown    = errors.New("chotki: sync session unknown")
	ErrBadRRecord     = errors.New("chotki: bad ref record")
	ErrClosed         = errors.New("chotki: no replica open")

	ErrBadTypeDescription  = errors.New("chotki: bad type description")
	ErrObjectUnknown       = errors.New("chotki: unknown object")
	ErrTypeUnknown         = errors.New("chotki: unknown object type")
	ErrUnknownFieldInAType = errors.New("chotki: unknown field for the type")
	ErrBadValueForAType    = errors.New("chotki: bad value for the type")
	ErrBadClass            = errors.New("chotki: bad class description")

	ErrOutOfOrder      = errors.New("chotki: order fail: sequence gap")
	ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op")
)
var ClassTemplate = `` /* 1096-byte string literal not displayed */

todo RDX formula

var ETemplate = `
func (o *{{Name}}) Get{{- Name}}() {
}

func (o *{{Name}}) Put{{- Name}}() {
}
`

todo collection description

var ErrOffsetOpId = errors.New("op id is offset")
var ErrWrongFieldType = errors.New("wrong field type")
var FIRSTnatives = map[byte]string{
	'F': "float64",
	'I': "int64",
	'R': "rdx.ID",
	'S': "string",
	'T': "string",
	'N': "uint64",
	'Z': "int64",
}
var Log0 = protocol.Records{
	protocol.Record('Y',
		protocol.Record('I', rdx.ID0.ZipBytes()),
		protocol.Record('R', rdx.ID0.ZipBytes()),
		protocol.Record('T', rdx.Ttlv("Name")),
		protocol.Record('T', rdx.Ttlv("S")),
	),
	protocol.Record('C',
		protocol.Record('I', id1.ZipBytes()),
		protocol.Record('R', rdx.ID0.ZipBytes()),
		protocol.Record('T', rdx.Ttlv("Names")),
		protocol.Record('T', rdx.Ttlv("M")),
		protocol.Record('T', rdx.Ttlv("Nodes")),
		protocol.Record('T', rdx.Ttlv("M")),
		protocol.Record('T', rdx.Ttlv("NodeInfo")),
		protocol.Record('T', rdx.Ttlv("M")),
	),
	protocol.Record('O',
		protocol.Record('I', id2.ZipBytes()),
		protocol.Record('R', id1.ZipBytes()),
		protocol.Record('M',
			protocol.Record('T', rdx.Ttlv("0")),
			protocol.Record('R', rdx.Rtlv(rdx.ID0)),
			protocol.Record('T', rdx.Ttlv("Global")),
			protocol.Record('R', rdx.Rtlv(id2)),
			protocol.Record('T', rdx.Ttlv("Names")),
			protocol.Record('R', rdx.Rtlv(IdNames)),
			protocol.Record('T', rdx.Ttlv("Nodes")),
			protocol.Record('R', rdx.Rtlv(IdNodes)),
		),
		protocol.Record('M'),
		protocol.Record('M'),
	),
}

FORMAT: replica creation packet

var SendStates = []string{
	"SendHandshake",
	"SendDiff",
	"SendLive",
	"SendEOF",
	"SendNone",
}
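
The SendStates slice lines up index-for-index with the Send* constants, so a state can be printed by name. The sketch below copies both from the listing; `stateName` is a hypothetical helper that adds a bounds check.

```go
package main

import "fmt"

// Copied from the constant listing.
const (
	SendHandshake = iota
	SendDiff
	SendLive
	SendEOF
	SendNone
)

// Copied from the variable listing.
var SendStates = []string{
	"SendHandshake",
	"SendDiff",
	"SendLive",
	"SendEOF",
	"SendNone",
}

// stateName returns the name of a send state, or "unknown"
// for a value outside the table. Hypothetical helper.
func stateName(state int) string {
	if state < 0 || state >= len(SendStates) {
		return "unknown"
	}
	return SendStates[state]
}

func main() {
	fmt.Println(stateName(SendLive))
	fmt.Println(stateName(42))
}
```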

Functions

func CMerge

func CMerge(inputs [][]byte) (merged []byte)

I id C int

func CState

func CState(sum int64) []byte

func ChotkiKVString

func ChotkiKVString(key, value []byte) string

func EditTLV

func EditTLV(off uint64, rdt byte, tlv []byte) (edit []byte)

func Exists

func Exists(dirname string) (bool, error)

func FieldOffset

func FieldOffset(fields []string, name string) rdx.ID

func LastLit

func LastLit(recs protocol.Records) byte

func OKey

func OKey(id rdx.ID, rdt byte) (key []byte)

func OKeyIdRdt

func OKeyIdRdt(key []byte) (id rdx.ID, rdt byte)

func ObjectKeyRange

func ObjectKeyRange(oid rdx.ID) (fro, til []byte)

func PacketID

func PacketID(pack []byte) rdx.ID

func PacketSrcSeq

func PacketSrcSeq(pack []byte) (src, seq uint32)

PacketSrcSeq picks the I field from the packet. Returns 0,0 if nothing found.

func ParseHandshake

func ParseHandshake(body []byte) (mode uint64, vv rdx.VV, err error)

func ParsePacket

func ParsePacket(pack []byte) (lit byte, id, ref rdx.ID, body []byte, err error)

func ParseVPack

func ParseVPack(vpack []byte) (vvs map[rdx.ID]rdx.VV, err error)

func ProbeI

func ProbeI(input []byte) rdx.ID

func ProbeID

func ProbeID(lit byte, input []byte) rdx.ID

func VKey

func VKey(id rdx.ID) (key []byte)

func VKeyId

func VKeyId(key []byte) rdx.ID

Types

type CallHook

type CallHook struct {
	// contains filtered or unexported fields
}

type Chotki

type Chotki struct {
	// contains filtered or unexported fields
}

TLV all the way down

func Open

func Open(dirname string, opts Options) (*Chotki, error)

func (*Chotki) AddHook

func (cho *Chotki) AddHook(fid rdx.ID, hook Hook)

func (*Chotki) AddPacketHose

func (cho *Chotki) AddPacketHose(name string) (feed protocol.FeedCloser)

func (*Chotki) AddToNField

func (cho *Chotki) AddToNField(fid rdx.ID, count uint64) (id rdx.ID, err error)

func (*Chotki) ApplyC

func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyD

func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyE

func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)

func (*Chotki) ApplyH

func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyOY

func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyV

func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) BroadcastPacket

func (cho *Chotki) BroadcastPacket(records protocol.Records, except string)

func (*Chotki) ClassFields

func (cho *Chotki) ClassFields(cid rdx.ID) (fields Fields, err error)

todo []Field -> map[uint64]Field

func (*Chotki) Clock

func (cho *Chotki) Clock() rdx.Clock

func (*Chotki) Close

func (cho *Chotki) Close() error

func (*Chotki) CommitPacket

func (cho *Chotki) CommitPacket(lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)

Here new packets are timestamped and queued for save

func (*Chotki) Connect

func (cho *Chotki) Connect(ctx context.Context, addr string) error

func (*Chotki) Database

func (cho *Chotki) Database() *pebble.DB

func (*Chotki) Directory

func (cho *Chotki) Directory() string

func (*Chotki) Disconnect

func (cho *Chotki) Disconnect(addr string) error

func (*Chotki) Drain

func (cho *Chotki) Drain(recs protocol.Records) (err error)

func (*Chotki) DumpAll

func (cho *Chotki) DumpAll(writer io.Writer)

func (*Chotki) DumpObjects

func (cho *Chotki) DumpObjects(writer io.Writer)

func (*Chotki) DumpVV

func (cho *Chotki) DumpVV(writer io.Writer)

func (*Chotki) EditFieldTLV

func (cho *Chotki) EditFieldTLV(fid rdx.ID, delta []byte) (id rdx.ID, err error)

func (*Chotki) EditObject

func (cho *Chotki) EditObject(oid rdx.ID, fields ...string) (id rdx.ID, err error)

func (*Chotki) EditObjectRDX

func (cho *Chotki) EditObjectRDX(oid rdx.ID, pairs []rdx.RDX) (id rdx.ID, err error)

func (*Chotki) GetFieldTLV

func (cho *Chotki) GetFieldTLV(id rdx.ID) (rdt byte, tlv []byte)

func (*Chotki) IncNField

func (cho *Chotki) IncNField(fid rdx.ID) (id rdx.ID, err error)

func (*Chotki) Last

func (cho *Chotki) Last() rdx.ID

func (*Chotki) Listen

func (cho *Chotki) Listen(ctx context.Context, addr string) error

func (*Chotki) NewClass

func (cho *Chotki) NewClass(parent rdx.ID, fields ...Field) (id rdx.ID, err error)

func (*Chotki) NewObject

func (cho *Chotki) NewObject(tid rdx.ID, fields ...string) (id rdx.ID, err error)

func (*Chotki) ObjectFieldMapTermId

func (cho *Chotki) ObjectFieldMapTermId(fid rdx.ID) (themap rdx.MapTR, err error)

func (*Chotki) ObjectFieldTLV

func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)

func (*Chotki) ObjectFields

func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact protocol.Records, err error)

func (*Chotki) ObjectFieldsByClass

func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, tlvs protocol.Records, err error)

func (*Chotki) ObjectFieldsTLV

func (cho *Chotki) ObjectFieldsTLV(oid rdx.ID) (tid rdx.ID, tlv protocol.Records, err error)

func (*Chotki) ObjectIterator

func (cho *Chotki) ObjectIterator(oid rdx.ID, snap *pebble.Snapshot) *pebble.Iterator

returns nil for "not found"

func (*Chotki) ObjectMapper

func (cho *Chotki) ObjectMapper() *ORM

func (*Chotki) ObjectString

func (cho *Chotki) ObjectString(oid rdx.ID) (txt string, err error)

func (*Chotki) RemoveAllHooks

func (cho *Chotki) RemoveAllHooks(fid rdx.ID)

func (*Chotki) RemoveHook

func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error)

func (*Chotki) RemovePacketHose

func (cho *Chotki) RemovePacketHose(name string) error

func (*Chotki) RestoreNet

func (cho *Chotki) RestoreNet(ctx context.Context) error

func (*Chotki) SetFieldTLV

func (cho *Chotki) SetFieldTLV(fid rdx.ID, tlve []byte) (id rdx.ID, err error)

func (*Chotki) Snapshot

func (cho *Chotki) Snapshot() pebble.Reader

func (*Chotki) Source

func (cho *Chotki) Source() uint64

ToyKV convention key, lit O, then O00000-00000000-000 id

func (*Chotki) Unlisten

func (cho *Chotki) Unlisten(addr string) error

func (*Chotki) UpdateVTree

func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error)

func (*Chotki) VersionVector

func (cho *Chotki) VersionVector() (vv rdx.VV, err error)

type Counter64

type Counter64 int64

func (*Counter64) Apply

func (c *Counter64) Apply(state []byte)

func (Counter64) Diff

func (c Counter64) Diff(id rdx.ID, state []byte) (changes []byte)

type Field

type Field struct {
	Name    string
	RdxType byte
}

func (Field) Valid

func (f Field) Valid() bool

type Fields

type Fields []Field

func (Fields) Find

func (f Fields) Find(name string) (ndx int)

type Hook

type Hook func(cho *Chotki, id rdx.ID) error

type Merger

type Merger interface {
	// merges values, sorted old to new
	Merge(inputs [][]byte) []byte
}
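
To show the shape of a Merger, here is a toy implementation. It is an assumption made for illustration and not one of Chotki's real merge operators: `maxMerger`, `enc` and `dec` are hypothetical names, and values are taken to be 8-byte big-endian unsigned integers, of which the largest wins.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Merger is copied from the listing.
type Merger interface {
	// merges values, sorted old to new
	Merge(inputs [][]byte) []byte
}

// maxMerger is a hypothetical Merger that keeps the largest value.
type maxMerger struct{}

func (maxMerger) Merge(inputs [][]byte) []byte {
	var best uint64
	for _, in := range inputs {
		if len(in) != 8 {
			continue // skip malformed inputs in this sketch
		}
		if v := dec(in); v > best {
			best = v
		}
	}
	return enc(best)
}

// enc encodes v as 8 big-endian bytes.
func enc(v uint64) []byte {
	out := make([]byte, 8)
	binary.BigEndian.PutUint64(out, v)
	return out
}

// dec decodes 8 big-endian bytes.
func dec(b []byte) uint64 { return binary.BigEndian.Uint64(b) }

func main() {
	var m Merger = maxMerger{}
	merged := m.Merge([][]byte{enc(3), enc(9), enc(5)})
	fmt.Println(dec(merged))
}
```

Taking the maximum is commutative, associative and idempotent, which is why a function of this shape can be applied to stored values in any grouping and still converge.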

type NativeObject

type NativeObject interface {
	// Read data from an iterator
	Load(field uint64, rdt byte, tlv []byte) error
	// Compare to the stored state, serialize the changes
	Store(field uint64, rdt byte, old []byte, clock rdx.Clock) (changes []byte, err error)
}

type ORM

type ORM struct {
	Host *Chotki
	Snap *pebble.Snapshot
	// contains filtered or unexported fields
}

func NewORM

func NewORM(host *Chotki, snap *pebble.Snapshot) *ORM

func (*ORM) Clear

func (orm *ORM) Clear() error

Clear forgets all the objects loaded; all the unsaved changes are discarded.

func (*ORM) Close

func (orm *ORM) Close() error

func (*ORM) Compile

func (orm *ORM) Compile(name string, cid rdx.ID) (code string, err error)

func (*ORM) FindID

func (orm *ORM) FindID(obj NativeObject) rdx.ID

func (*ORM) Load

func (orm *ORM) Load(id rdx.ID, blanc NativeObject) (obj NativeObject, err error)

func (*ORM) New

func (orm *ORM) New(cid rdx.ID, objs ...NativeObject) (err error)

func (*ORM) Object

func (orm *ORM) Object(id rdx.ID) (obj NativeObject)

func (*ORM) Save

func (orm *ORM) Save(objs ...NativeObject) (err error)

Save the object changes. Recommended, especially if you loaded many, modified few.

func (*ORM) SaveAll

func (orm *ORM) SaveAll() (err error)

SaveAll the changed fields; this will re-scan the objects in the database.

func (*ORM) SyncAll

func (orm *ORM) SyncAll() (err error)

Saves all the changes, takes a new snapshot, updates

func (*ORM) UpdateAll

func (orm *ORM) UpdateAll() (err error)

func (*ORM) UpdateObject

func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error

type Options

type Options struct {
	pebble.Options

	Src          uint64
	Name         string
	Log1         protocol.Records
	MaxLogLen    int64
	RelaxedOrder bool
	Logger       utils.Logger

	TlsConfig *tls.Config
}

func (*Options) SetDefaults

func (o *Options) SetDefaults()

type PebbleMergeAdaptor

type PebbleMergeAdaptor struct {
	// contains filtered or unexported fields
}

func (*PebbleMergeAdaptor) Finish

func (a *PebbleMergeAdaptor) Finish(includesBase bool) (res []byte, cl io.Closer, err error)

func (*PebbleMergeAdaptor) MergeNewer

func (a *PebbleMergeAdaptor) MergeNewer(value []byte) error

func (*PebbleMergeAdaptor) MergeOlder

func (a *PebbleMergeAdaptor) MergeOlder(value []byte) error

type Syncer

type Syncer struct {
	Host *Chotki
	Mode uint64
	Name string
	Log  utils.Logger
	// contains filtered or unexported fields
}

func (*Syncer) Close

func (sync *Syncer) Close() error

func (*Syncer) Drain

func (sync *Syncer) Drain(recs protocol.Records) (err error)

func (*Syncer) DrainHandshake

func (sync *Syncer) DrainHandshake(recs protocol.Records) (err error)

func (*Syncer) EndGracefully

func (sync *Syncer) EndGracefully(reason error) (err error)

func (*Syncer) Feed

func (sync *Syncer) Feed() (recs protocol.Records, err error)

func (*Syncer) FeedBlockDiff

func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error)

func (*Syncer) FeedDiffVV

func (sync *Syncer) FeedDiffVV() (vv protocol.Records, err error)

func (*Syncer) FeedHandshake

func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error)

func (*Syncer) SetDrainState

func (sync *Syncer) SetDrainState(state int)

func (*Syncer) SetFeedState

func (sync *Syncer) SetFeedState(state int)

func (*Syncer) WaitDrainState

func (sync *Syncer) WaitDrainState(state int) (ds int)
