crdt

package module
v0.5.2

Published: Oct 3, 2023 License: Apache-2.0, MIT, + 1 more Imports: 26 Imported by: 16

README

go-ds-crdt

A distributed go-datastore implementation using Merkle-CRDTs.

go-ds-crdt is a key-value store implementation using Merkle CRDTs, as described in the paper by Héctor Sanjuán, Samuli Pöyhtäri and Pedro Teixeira. It satisfies the Datastore and Batching interfaces from go-datastore.

This means that you can create a network of nodes that use this datastore, and that each key-value pair written to it will automatically replicate to every other node. Updates can be published by any node. Network messages can be dropped, reordered, corrupted or duplicated. It is not necessary to know beforehand the number of replicas participating in the system. Replicas can join and leave at will, without informing any other replica. There can be network partitions but they are resolved as soon as connectivity is re-established between replicas.

Internally it uses a delta-CRDT Add-Wins Observed-Removed set. The current value for a key is the one with the highest priority. Priorities are defined as the height of the Merkle-CRDT node in which the key was introduced.

The implementation is independent of the Broadcaster and DAG syncer layers, although the easiest approach is to use the out-of-the-box components from the IPFS stack (see below).

Performance

Using batching, any go-ds-crdt replica can process and sync at least 400 keys/s. The largest known deployment has 100M keys.

go-ds-crdt is used in production as the state-synchronization layer for IPFS Cluster.

Usage

go-ds-crdt needs:

  • A user-provided, thread-safe, go-datastore implementation to be used as permanent storage. We recommend using the Badger implementation.
  • A user-defined Broadcaster component to broadcast and receive updates from a set of replicas. If your application uses libp2p, you can use libp2p PubSub and the provided PubsubBroadcaster.
  • A user-defined "DAG syncer" component (ipld.DAGService) to publish and retrieve Merkle DAGs to the network. For example, you can use IPFS-Lite which casually satisfies this interface.

The permanent storage layout is optimized for KV stores with fast indexes and key-prefix support.

See https://pkg.go.dev/github.com/ipfs/go-ds-crdt for more information.
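For orientation, here is a hedged wiring sketch of the three requirements above. It assumes you have already created a libp2p PubSub instance (psub, e.g. GossipSub) and an ipld.DAGService (dagService, e.g. IPFS-Lite); the Badger path, the /crdt namespace and the topic name are placeholders, and setupCRDT is a hypothetical helper, not part of this package.

package example

import (
	"context"

	ds "github.com/ipfs/go-datastore"
	badger "github.com/ipfs/go-ds-badger"
	crdt "github.com/ipfs/go-ds-crdt"
	ipld "github.com/ipfs/go-ipld-format"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// setupCRDT wires the three required components together. psub and
// dagService must be created by the caller (for example with libp2p
// GossipSub and IPFS-Lite, as described above).
func setupCRDT(ctx context.Context, psub *pubsub.PubSub, dagService ipld.DAGService) (*crdt.Datastore, error) {
	// Thread-safe permanent storage backed by Badger. The path is a
	// placeholder.
	store, err := badger.NewDatastore("/tmp/crdt-example", &badger.DefaultOptions)
	if err != nil {
		return nil, err
	}

	// Broadcaster on top of libp2p PubSub; "crdt-example" is a
	// placeholder topic name.
	bcast, err := crdt.NewPubSubBroadcaster(ctx, psub, "crdt-example")
	if err != nil {
		return nil, err
	}

	// All CRDT data is kept under the /crdt namespace in the
	// backing store.
	return crdt.New(store, ds.NewKey("/crdt"), dagService, bcast, crdt.DefaultOptions())
}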

Captain

This project is captained by @hsanjuan.

License

This library is dual-licensed under Apache 2.0 and MIT terms.

Copyright 2019. Protocol Labs, Inc.

Documentation

Overview

Package crdt provides a replicated go-datastore (key-value store) implementation using Merkle-CRDTs built with IPLD nodes.

This Datastore is agnostic to how new MerkleDAG roots are broadcasted to the rest of replicas (`Broadcaster` component) and to how the IPLD nodes are made discoverable and retrievable by other replicas (`DAGSyncer` component).

The implementation is based on the "Merkle-CRDTs: Merkle-DAGs meet CRDTs" paper by Héctor Sanjuán, Samuli Pöyhtäri and Pedro Teixeira.

Note that, in the absence of compaction (which must be performed manually), a crdt.Datastore will only grow in size even when keys are deleted.

The time to be fully synced for new Datastore replicas will depend on how fast they can retrieve the DAGs announced by the other replicas, but newer values will be available before older ones.

Index

Constants

This section is empty.

Variables

View Source
var (
	TombstonesBloomFilterSize   float64 = 30 * 1024 * 1024 * 8 // 30 MiB
	TombstonesBloomFilterHashes float64 = 2
)

Tombstones bloom filter options. We have optimized the defaults for speed:

  • commit 30 MiB of memory to the filter
  • only hash twice
  • the false-positive probability is 1 in 10000 when working with 1M items in the filter

See https://hur.st/bloomfilter/?n=&p=0.0001&m=30MiB&k=2
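For reference, these figures follow from the standard bloom-filter false-positive approximation p ≈ (1 - e^(-kn/m))^k. The sketch below (not part of this package) just evaluates it for the defaults; at n = 1M items it yields roughly 6.3e-5, comfortably below the 1-in-10000 figure quoted above.

package main

import (
	"fmt"
	"math"
)

func main() {
	m := 30.0 * 1024 * 1024 * 8 // filter size in bits (30 MiB)
	k := 2.0                    // number of hash functions
	n := 1000000.0              // expected items in the filter

	// Standard approximation of the false-positive probability.
	p := math.Pow(1-math.Exp(-k*n/m), k)
	fmt.Printf("false-positive probability: %.6f (about 1 in %.0f)\n", p, 1/p)
}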

View Source
var (
	ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
)

Common errors.

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster interface {
	// Send payload to other replicas.
	Broadcast([]byte) error
	// Obtain the next payload received from the network.
	Next() ([]byte, error)
}

A Broadcaster provides a way to send (notify) an opaque payload to all replicas and to retrieve payloads broadcasted by others.
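Because the interface is only two methods, a minimal implementation is easy to sketch. The following in-memory broadcaster is entirely hypothetical (not part of this package) and only useful for single-process tests, where a replica simply receives back what it broadcasts:

package crdttest

import "errors"

// chanBroadcaster is a hypothetical single-process Broadcaster backed
// by a buffered channel. Only useful for tests: every payload sent is
// received back by the same replica.
type chanBroadcaster struct {
	ch chan []byte
}

func newChanBroadcaster() *chanBroadcaster {
	return &chanBroadcaster{ch: make(chan []byte, 64)}
}

// Broadcast queues the payload for delivery.
func (b *chanBroadcaster) Broadcast(data []byte) error {
	b.ch <- data
	return nil
}

// Next blocks until a payload arrives or the channel is closed.
func (b *chanBroadcaster) Next() ([]byte, error) {
	data, ok := <-b.ch
	if !ok {
		return nil, errors.New("broadcaster closed")
	}
	return data, nil
}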

type Datastore

type Datastore struct {
	// contains filtered or unexported fields
}

Datastore makes a go-datastore a distributed Key-Value store using Merkle-CRDTs and IPLD.

func New

func New(
	store ds.Datastore,
	namespace ds.Key,
	dagSyncer ipld.DAGService,
	bcast Broadcaster,
	opts *Options,
) (*Datastore, error)

New returns a Merkle-CRDT-based Datastore using the given store to persist all the necessary data under the given namespace. It needs a DAG-Service component for IPLD nodes and a Broadcaster component to distribute and receive information to and from the rest of replicas. Actual implementations of these must be provided by the user, but this normally means using ipfs-lite (https://github.com/hsanjuan/ipfs-lite) as a DAG Service and the included libp2p PubSubBroadcaster as a Broadcaster.

The given Datastore is used to back all CRDT-datastore contents and accounting information. When using an asynchronous datastore, the user is in charge of calling Sync() regularly. Sync() will persist paths related to the given prefix, but note that if other replicas are modifying the datastore, the prefixes that need syncing are not limited to those modified by the local replica. In that case the user should consider calling Sync("/") with the root prefix, or use a synchronous underlying datastore that persists things directly on write.

The CRDT-Datastore should call Close() before the given store is closed.

func (*Datastore) Batch

func (store *Datastore) Batch(ctx context.Context) (ds.Batch, error)

Batch implements batching for writes by accumulating Put and Delete in the same CRDT-delta and only applying it and broadcasting it on Commit().
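Assuming an existing *crdt.Datastore (store) and a context (ctx), a sketch of the flow (keys and values are placeholders):

batch, err := store.Batch(ctx)
if err != nil {
	return err
}
// Both operations accumulate in the same CRDT-delta.
if err := batch.Put(ctx, ds.NewKey("/users/alice"), []byte("v1")); err != nil {
	return err
}
if err := batch.Delete(ctx, ds.NewKey("/users/bob")); err != nil {
	return err
}
// Apply and broadcast the accumulated delta.
return batch.Commit(ctx)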

func (*Datastore) Close

func (store *Datastore) Close() error

Close shuts down the CRDT datastore. It should not be used afterwards.

func (*Datastore) Delete

func (store *Datastore) Delete(ctx context.Context, key ds.Key) error

Delete removes the value for given `key`.

func (*Datastore) DotDAG added in v0.2.4

func (store *Datastore) DotDAG(w io.Writer) error

DotDAG writes a dot-format representation of the CRDT DAG to the given writer. It can be converted to image format and visualized with graphviz tooling.
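For example (a sketch assuming an existing *crdt.Datastore named store), writing the DAG to a file and rendering it with graphviz:

f, err := os.Create("dag.dot")
if err != nil {
	return err
}
defer f.Close()
if err := store.DotDAG(f); err != nil {
	return err
}
// Render externally, e.g.: dot -Tpng dag.dot -o dag.png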

func (*Datastore) Get

func (store *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error)

Get retrieves the object `value` named by `key`. Get will return ErrNotFound if the key is not mapped to a value.

func (*Datastore) GetSize

func (store *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error)

GetSize returns the size of the `value` named by `key`. In some contexts, it may be much cheaper to only get the size of the value rather than retrieving the value itself.

func (*Datastore) Has

func (store *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error)

Has returns whether the `key` is mapped to a `value`. In some contexts, it may be much cheaper only to check for existence of a value, rather than retrieving the value itself. (e.g. HTTP HEAD). The default implementation is found in `GetBackedHas`.

func (*Datastore) InternalStats added in v0.3.9

func (store *Datastore) InternalStats() Stats

InternalStats returns internal datastore information like the current heads and max height.

func (*Datastore) IsDirty added in v0.3.9

func (store *Datastore) IsDirty() bool

IsDirty returns whether the datastore is marked dirty.

func (*Datastore) MarkClean added in v0.3.9

func (store *Datastore) MarkClean()

MarkClean removes the dirty mark from the datastore.

func (*Datastore) MarkDirty added in v0.3.9

func (store *Datastore) MarkDirty()

MarkDirty marks the Datastore as dirty.

func (*Datastore) PrintDAG

func (store *Datastore) PrintDAG() error

PrintDAG pretty-prints the current Merkle-DAG to stdout. Only use it for small DAGs; DotDAG is an alternative for larger DAGs.

func (*Datastore) Put

func (store *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error

Put stores the object `value` named by `key`.

func (*Datastore) Query

func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error)

Query searches the datastore and returns a query result. This function may return before the query actually runs. To wait for the query:

result, _ := ds.Query(ctx, q)

// use the channel interface; results may come in at different times
for entry := range result.Next() { ... }

// or wait for the query to be completely done
entries, _ := result.Rest()
for _, entry := range entries { ... }

func (*Datastore) Repair added in v0.3.0

func (store *Datastore) Repair() error

Repair triggers a DAG-repair, which tries to re-walk the CRDT-DAG from the current heads until the roots, processing currently unprocessed branches.

Calling Repair will walk the full DAG even if the dirty bit is unset, but will mark the store as clean upon successful completion.

func (*Datastore) Sync added in v0.1.8

func (store *Datastore) Sync(ctx context.Context, prefix ds.Key) error

Sync ensures that all the data under the given prefix is flushed to disk in the underlying datastore.
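Following the advice in New's documentation, a replica on an asynchronous underlying datastore would usually flush from the root key, since remote replicas may have touched prefixes the local one did not (a sketch, assuming store and ctx exist):

// Remote updates can touch any prefix, so sync everything.
if err := store.Sync(ctx, ds.NewKey("/")); err != nil {
	return err
}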

type Options

type Options struct {
	Logger              logging.StandardLogger
	RebroadcastInterval time.Duration
	// The PutHook function is triggered whenever an element
	// is successfully added to the datastore (either by a local
	// or remote update), and only when that addition is considered the
	// prevalent value.
	PutHook func(k ds.Key, v []byte)
	// The DeleteHook function is triggered whenever a version of an
	// element is successfully removed from the datastore (either by a
	// local or remote update). Unordered and concurrent updates may
	// result in the DeleteHook being triggered even though the element is
	// still present in the datastore because it was re-added. If that is
	// relevant, use Has() to check if the removed element is still part
	// of the datastore.
	DeleteHook func(k ds.Key)
	// NumWorkers specifies the number of workers ready to walk DAGs.
	NumWorkers int
	// DAGSyncerTimeout specifies how long to wait for a DAGSyncer.
	// Set to 0 to disable.
	DAGSyncerTimeout time.Duration
	// MaxBatchDeltaSize will automatically commit any batches whose
	// delta size gets too big. This helps keep DAG nodes small
	// enough that they will be transferred by the network.
	MaxBatchDeltaSize int
	// RepairInterval specifies how often to walk the full DAG until
	// the root(s) if it has been marked dirty. 0 to disable.
	RepairInterval time.Duration
	// MultiHeadProcessing lets several new heads be processed in
	// parallel. This results in more branching in general. More
	// branching is not necessarily a bad thing and may improve
	// throughput, but everything depends on usage.
	MultiHeadProcessing bool
}

Options holds configurable values for Datastore.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions initializes an Options object with sensible defaults.
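A hedged sketch of adjusting the defaults with the hooks described above (assumes the crdt, ds, log and time packages are imported; the hook bodies are placeholders):

opts := crdt.DefaultOptions()
opts.RebroadcastInterval = time.Minute
// PutHook fires when an addition (local or remote) becomes the
// prevalent value for the key.
opts.PutHook = func(k ds.Key, v []byte) {
	log.Printf("added: %s (%d bytes)", k, len(v))
}
// DeleteHook may fire even though the key was concurrently re-added;
// use Has() if that distinction matters.
opts.DeleteHook = func(k ds.Key) {
	log.Printf("removed: %s", k)
}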

type PubSubBroadcaster added in v0.0.3

type PubSubBroadcaster struct {
	// contains filtered or unexported fields
}

PubSubBroadcaster implements a Broadcaster using libp2p PubSub.

func NewPubSubBroadcaster added in v0.0.3

func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string) (*PubSubBroadcaster, error)

NewPubSubBroadcaster returns a new broadcaster using the given PubSub and a topic to subscribe/broadcast to. The given context can be used to cancel the broadcaster. Please register any topic validators before creating the Broadcaster.

The broadcaster can be shut down by cancelling the given context. This must be done before Closing the crdt.Datastore, otherwise things may hang.
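A sketch of both the creation and the shutdown order just described; host is assumed to be an existing libp2p host.Host, and the topic name is a placeholder:

ctx, cancel := context.WithCancel(context.Background())
psub, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
	return err
}
bcast, err := crdt.NewPubSubBroadcaster(ctx, psub, "my-crdt-topic")
if err != nil {
	return err
}
// ... pass bcast to crdt.New() and use the resulting store ...

// Shutdown: cancel the broadcaster's context first, then close the
// datastore, to avoid hanging.
cancel()
store.Close()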

func (*PubSubBroadcaster) Broadcast added in v0.0.3

func (pbc *PubSubBroadcaster) Broadcast(data []byte) error

Broadcast publishes some data.

func (*PubSubBroadcaster) Next added in v0.0.3

func (pbc *PubSubBroadcaster) Next() ([]byte, error)

Next returns published data.

type SessionDAGService added in v0.3.0

type SessionDAGService interface {
	ipld.DAGService
	Session(context.Context) ipld.NodeGetter
}

A SessionDAGService is a Sessions-enabled DAGService. This type of DAG-Service provides an optimized NodeGetter to make multiple related requests. The same session-enabled NodeGetter is used to download DAG branches when the DAGSyncer supports it.
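As an illustrative sketch (names are hypothetical), a caller holding a plain ipld.DAGService might opt into session-scoped fetching like this:

// Prefer a session-scoped NodeGetter when the DAG service supports it.
var getter ipld.NodeGetter = dagService
if sds, ok := dagService.(crdt.SessionDAGService); ok {
	getter = sds.Session(ctx)
}
// Related fetches through getter can now share a single session.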

type Stats added in v0.3.9

type Stats struct {
	Heads      []cid.Cid
	MaxHeight  uint64
	QueuedJobs int
}

Stats wraps internal information about the datastore. Might be expanded in the future.

Directories

Path Synopsis
examples/globaldb Module
pb Package pb contains generated protobuf types
