dot: github.com/dotchain/dot Index | Examples | Files | Directories

package dot

import "github.com/dotchain/dot"

Package dot implements data synchronization of user defined types using operational transformation/OT.

Please see https://github.com/dotchain/dot for a tutorial on how to use DOT.

The core functionality is spread out between dot/changes, dot/streams, dot/refs and dot/ops but this package exposes simple client and server implementations for common use cases:

Server example

import "encoding/gob"
import "net/http"
import "github.com/dotchain/dot"
...
gob.Register(..) // register any user-standard OT types used
http.Handle("/api/", dot.BoltServer("file.bolt"))
http.ListenAndServe(":8080", nil)

Client example

import "encoding/gob"
import "net/http"
import "github.com/dotchain/dot"
...
gob.Register(..) // register any user-standard OT types used
session, stream := dot.Connect("http://localhost:8080/api/")

Immutable values

DOT uses immutable values. Every Value must implement the change.Value interface which is a single Apply method that returns the result of applying a mutation (while leaving the original value effectively unchanged).

If the underlying type behaves like a collection (such as with Slices), the type must also implement some collection specific methods specified in the changes.Collection interface.

Most actual types are likely to be structs or slices with boilerplate implementaations of the interfaces. The x/dotc package has a code generator which can emit such boilerplate implementations simplifying this task.

Changes

The changes package implements a set of simple changes (Replace, Splice and Move). Richer changes are expected to be built up by composition via changes.ChangeSet (which is a sequence of changes) and changes.PathChange (which modifies a value at a path).

Changes are immutable too and generally are meant to not maintain any reference to the value they apply on. While custom changes are possible (they have to implement the changes.Custom interface), they are expected to be rare as the default set of chnange types cover a vast variety of scenarios.

The core logic of DOT is in the Merge methods of changes: they guaranteee that if two independent changes are done to a value, the deviation in the values can be converged. The basic property of any two changes (on the same value) is that:

leftx, rightx := left.Merge(right)
initial.Apply(nil, left).Apply(nil, leftx) ==
initial.Apply(nil, right).Apply(nil, rightx)

Care must be taken with custom changes to ensure that this property is preserved.

Streams

Streams represent the sequence of changes associated with a single value. Stream instances behave like they are immutable: when a change happens, a new stream instance captures the change. Streams also support multiple-writers: it is possible for two independent changes to the same stream instance. In this case, the newly-created stream instances only capture the respective changes but these both have a "Next" value that converges to the same value. That is, the two separate streams implicitly have the changes from each other (but after transforming through the Merge) method.

This allows streams to perform quite nicely as convergent data structures without much syntax overhead:

initial := streams.S8{Stream:  streams.New(), Value: "hello"}

// two changes: append " world" and delete "lo"
s1 := initial.Splice(5, 0, " world")
s2 := initial.Splice(3, len("lo"), "")

// streams automatically merge because they are both
// based on initial
s1 = s1.Latest()
s2 = s2.Latest()

fmt.Println(s1.Value, s1.Value == s2.Value)
// Output: hel world true

Strongly typed streams

The streams package provides a generic Stream implementation (via the New function) which implements the idea of a sequence of convergent changes. But much of the power of streams is in having strongly type streams where the stream is associated with a strongly typed value. The streams package provides simple text streamss (S8 and S16) as well as Bool and Counter types. Richer types like structs and slices can be converted to their stream equivalent rather mechanically and this is done by the x/dotc package -- using code generation.

Some day, Golang would support generics and then the code
generation ugliness of x/dotc will no longer be needed.

Substreams

Substreams are streams that refer into a particular field of a parent stream. For example, if the parent value is a struct with a "Done" field, it is possible to treat the "Done stream" as the changes scoped to this field. This allows code to be written much more cleanly. See the https://github.com/dotchain/dot#toggling-complete section of the documentation for an example.

Other features

Streams support branching (a la Git) and folding. See the examples!

Streams also support references. A typical use case is maintaining the user cursor within a region of text. When remote changes happen to the text, the cursor needs to be updated. In fact, when one takes a substream of an element of an array, the array index needs to be automatically managed (i.e. insertions into the array before the index should automatically update the index etc). This is managed within streams using references.

Server implementations

A particular value can be reconstituted from the sequence of changes to that value. In DOT, only these changes are stored and that too in an append-only log. This make the backend rather simple and generally agnostic of application types to a large extent.

See https://github.com/dotchain/dot#server for example code.

Code:

// import fmt
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types

// S8 is DOT-compatible string type with UTF8 string indices
initial := types.S8("hello")

append := changes.Splice{
    Offset: len("hello"),       // end of "hello"
    Before: types.S8(""),       // nothing to remove
    After:  types.S8(" world"), // insert " world"
}

// apply the change
updated := initial.Apply(nil, append)

fmt.Println(updated)

Output:

hello world

Code:

// import fmt
// import github.com/dotchain/dot/streams

initial := &streams.S8{Stream: streams.New(), Value: "hello"}
updated := initial.Splice(5, 0, " world")

fmt.Println(updated.Value)

Output:

hello world

Code:

// import fmt
// import github.com/dotchain/dot/streams
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types

// local is a branch of master
master := &streams.S16{Stream: streams.New(), Value: "hello"}
local := &streams.S16{Stream: streams.Branch(master.Stream), Value: master.Value}

// edit locally: hello => hallo
local.Splice(len("h"), len("e"), "a")

// changes will not be reflected on master yet
fmt.Println(master.Latest().Value)

// push local changes up to master now
local.Stream.Push()

// now master = hallo
fmt.Println(master.Latest().Value)

Output:

hello
hallo

Code:

// import fmt
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types

initial := types.S8("hello")

// append " world" => "hello world"
append1 := changes.Splice{
    Offset: len("hello"),
    Before: types.S8(""),
    After:  types.S8(" world"),
}

// append "." => "hello world."
append2 := changes.Splice{
    Offset: len("hello world"),
    Before: types.S8(""),
    After:  types.S8("."),
}

// now combine the two appends and apply
both := changes.ChangeSet{append1, append2}
updated := initial.Apply(nil, both)
fmt.Println(updated)

Output:

hello world.

Code:

defer remove("file.bolt")()

logger := log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
srv := dot.WithLogger(dot.BoltServer("file.bolt"), logger)
defer dot.CloseServer(srv)
httpSrv := httptest.NewServer(srv)
defer httpSrv.Close()

stream1, store1 := dot.NewSession().NonBlockingStream(httpSrv.URL, nil)
stream2, store2 := dot.NewSession().Stream(httpSrv.URL, nil)

defer store1.Close()
defer store2.Close()

stream1.Append(changes.Replace{Before: changes.Nil, After: types.S8("hello")})
fmt.Println("push", stream1.Push())
fmt.Println("pull", stream2.Pull())

Output:

push <nil>
pull <nil>

Code:

sourceName := "user=postgres dbname=dot_test sslmode=disable"
maxPoll := pg.MaxPoll
defer func() {
    pg.MaxPoll = maxPoll
    db, err := sql.Open("postgres", sourceName)
    must(err)
    _, err = db.Exec("DROP TABLE operations")
    must(err)
    must(db.Close())
}()

pg.MaxPoll = time.Second
logger := log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
srv := dot.WithLogger(dot.PostgresServer(sourceName), logger)
defer dot.CloseServer(srv)
httpSrv := httptest.NewServer(srv)
defer httpSrv.Close()

stream1, store1 := dot.NewSession().Stream(httpSrv.URL, logger)
stream2, store2 := dot.NewSession().Stream(httpSrv.URL, logger)

defer store1.Close()
defer store2.Close()

stream1.Append(changes.Replace{Before: changes.Nil, After: types.S8("hello")})
fmt.Println("push", stream1.Push())
fmt.Println("pull", stream2.Pull())

Output:

push <nil>
pull <nil>

Code:

// import fmt
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types

initial := types.S8("hello")

// two changes: append " world" and delete "lo"
insert := changes.Splice{Offset: 5, Before: types.S8(""), After: types.S8(" world")}
remove := changes.Splice{Offset: 3, Before: types.S8("lo"), After: types.S8("")}

// two versions derived from initial
inserted := initial.Apply(nil, insert)
removed := initial.Apply(nil, remove)

// merge the changes
removex, insertx := insert.Merge(remove)

// converge by applying the above
final1 := inserted.Apply(nil, removex)
final2 := removed.Apply(nil, insertx)

fmt.Println(final1, final1 == final2)

Output:

hel world true

Code:

// import fmt
// import github.com/dotchain/dot/streams

initial := streams.S8{Stream: streams.New(), Value: "hello"}

// two changes: append " world" and delete "lo"
s1 := initial.Splice(5, 0, " world")
s2 := initial.Splice(3, len("lo"), "")

// streams automatically merge because they are both
// based on initial
s1 = s1.Latest()
s2 = s2.Latest()

fmt.Println(s1.Value, s1.Value == s2.Value)

Output:

hel world true

Code:

// import fmt
// import github.com/dotchain/dot/streams
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types
// import github.com/dotchain/dot/x/fold

// create master, folded child and the folding itself
master := &streams.S16{Stream: streams.New(), Value: "hello world!"}
foldChange := changes.Splice{
    Offset: len("hello"),
    Before: types.S16(" world"),
    After:  types.S16("..."),
}
foldedStream := fold.New(foldChange, master.Stream)
folded := &streams.S16{Stream: foldedStream, Value: "hello...!"}

// folded:  hello...! => Hello...!!!
folded = folded.Splice(0, len("h"), "H")
folded = folded.Splice(len("Hello...!"), 0, "!!")
fmt.Println(folded.Value)

// master: hello world => hullo world
master = master.Splice(len("h"), len("e"), "u")
fmt.Println(master.Value)

// now folded = Hullo...!!!
fmt.Println(folded.Latest().Value)

// master = Hullo world!!!
fmt.Println(master.Latest().Value)

Output:

Hello...!!!
hullo world!
Hullo...!!!
Hullo world!!!

Code:

// import fmt
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types

// types.A is a generic array type and types.M is a map type
initial := types.A{types.M{"hello": types.S8("world")}}

// replace "world" with "world!"
replace := changes.Replace{Before: types.S8("world"), After: types.S8("world!")}

// replace "world" with "world!" of initial[0]["hello"]
path := []interface{}{0, "hello"}
c := changes.PathChange{Path: path, Change: replace}
updated := initial.Apply(nil, c)
fmt.Println(updated)

Output:

[map[hello:world!]]

Code:

// import fmt
// import github.com/dotchain/dot/streams
// import github.com/dotchain/dot/changes
// import github.com/dotchain/dot/changes/types
// import github.com/dotchain/dot/streams/undo

// create master, undoable child and the undo stack itself
master := &streams.S16{Stream: streams.New(), Value: "hello"}
s := undo.New(master.Stream)
undoableChild := &streams.S16{Stream: s, Value: master.Value}

// change hello => Hello
undoableChild = undoableChild.Splice(0, len("h"), "H")
fmt.Println(undoableChild.Value)

// for kicks, update master hello => hello$ as if it came
// from the server
master.Splice(len("hello"), 0, "$")

// now undo this via the stack
s.Undo()

// now undoableChild should be hello$
undoableChild = undoableChild.Latest()
fmt.Println(undoableChild.Value)

// now redo the last operation to get Hello$
s.Redo()
undoableChild = undoableChild.Latest()
fmt.Println(undoableChild.Value)

Output:

Hello
hello$
Hello$

Index

Examples

Package Files

client.go dot.go server.go

func BoltServer Uses

func BoltServer(fileName string) http.Handler

BoltServer returns a http.Handler serving DOT requests backed by the db

func CloseServer Uses

func CloseServer(h http.Handler)

CloseServer closes the http.Handler returned by this package

func PostgresServer Uses

func PostgresServer(sourceName string) http.Handler

PostgresServer returns a http.Handler serving DOT requests backed by the db

func WithLogger Uses

func WithLogger(h http.Handler, l log.Log) http.Handler

WithLogger updates the logger for server

type Session Uses

type Session struct {
    Version        int
    Pending, Merge []ops.Op

    OpCache    map[int]ops.Op
    MergeCache map[int][]ops.Op
}

Session represents a client session

func NewSession Uses

func NewSession() *Session

NewSession creates an empty session

func (*Session) Load Uses

func (s *Session) Load(ver int) (ops.Op, []ops.Op)

Load implements the ops.Cache load interface

func (*Session) NonBlockingStream Uses

func (s *Session) NonBlockingStream(url string, logger dotlog.Log) (streams.Stream, ops.Store)

NonBlockingStream returns the stream of changes for this session

The returned store can be used to *close* the stream when needed

Actual syncing of messages happens when Push and Pull are called on the stream. Pull() does the server-fetch asynchronously, returning immediately if there is no server data available.

func (*Session) Store Uses

func (s *Session) Store(ver int, op ops.Op, merge []ops.Op)

Store implements the ops.Cache store interface

func (*Session) Stream Uses

func (s *Session) Stream(url string, logger dotlog.Log) (streams.Stream, ops.Store)

Stream returns the stream of changes for this session

The returned store can be used to *close* the stream when needed

Actual syncing of messages happens when Push and Pull are called on the stream

func (*Session) UpdateVersion Uses

func (s *Session) UpdateVersion(version int, pending, merge []ops.Op)

UpdateVersion updates the version/pending info

Directories

PathSynopsis
changesPackage changes implements the core mutation types for OT.
changes/crdtPackage crdt implements CRDT types and associated changes
changes/diffPackage diff compares two values and returns the changes
changes/runPackage run implements a custom change that applies to a sequence of array elements.
changes/tablePackage table implements a loose 2d collection of values
changes/typesPackage types implements OT-compatible immutable values.
exampleGenerated.
logPackage log defines the interface for loging within the DOT project.
opsPackage ops implements network and storage for DOT
ops/boltPackage bolt implements the dot storage for files using boltdb
ops/nw
ops/pgPackage pg implements the dot storage for postgres 9.5+
ops/sjsonPackage sjson implements a portable strongly-typed json-like codec.
ops/sync
refsPackage refs implements reference paths, carets and selections.
streamsPackage streams defines convergent streams of changes
streams/undo
test/seqtestPackage seqtest implements a standard suite of validations.
test/testops
x/dotcPackage dotc implements code-generation tools for dot.changes
x/foldPackage fold implements a simple scheme for folding.
x/heapPackage heap implements a heap value type
x/richPackage rich implements rich text data types
x/rich/dataPackage data impleements data structures for use with rich text
x/rich/evalPackage eval implements evaluated objects
x/rich/htmlPackage html implements rich text to HTML conversion
x/rich/riched
x/snapshotPackage snapshot manages session storage

Package dot imports 12 packages (graph) and is imported by 3 packages. Updated 2019-11-19. Refresh now. Tools for package owners.