crdt

Version: v0.1.5
Published: May 24, 2021 | License: Apache-2.0 | Imports: 22 | Imported by: 1

README

n3-crdt

CRDT framework for n3 infrastructure

Send and receive any JSON files. Each object is wrapped in a version-vector CRDT, sent to the messaging server, and merged on receive with any other changes to the same objects received from other users.

For receive to work, an instance of nats-streaming-server needs to be running on your machine.

Documentation

Constants

This section is empty.

Variables

var LexicographicConflictResolver = func(key string, left, right vvmap.Record) bool {
	leftVal := fmt.Sprintf("%v", left.Value)
	rightVal := fmt.Sprintf("%v", right.Value)
	return strings.Compare(leftVal, rightVal) > 0
}

LexicographicConflictResolver is a conflict resolver that chooses between two versions of a value when their versions are the same.

It needs Go 1.12 or later, where maps are printed with all elements in key order.
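The comparison above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `record` type that stands in for `vvmap.Record` and models only its `Value` field; `leftSortsAfter` is an illustrative name, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for vvmap.Record; only the Value
// field read by the resolver is modelled here.
type record struct {
	Value interface{}
}

// leftSortsAfter repeats the resolver's comparison: format both values
// with %v and report whether the left string sorts after the right one.
func leftSortsAfter(left, right record) bool {
	leftVal := fmt.Sprintf("%v", left.Value)
	rightVal := fmt.Sprintf("%v", right.Value)
	return strings.Compare(leftVal, rightVal) > 0
}

func main() {
	// Strings compare byte-wise: "pear" sorts after "apple".
	fmt.Println(leftSortsAfter(record{Value: "pear"}, record{Value: "apple"})) // true

	// Numbers are compared as text, so "9" sorts after "10".
	fmt.Println(leftSortsAfter(record{Value: 9}, record{Value: 10})) // true

	// Since Go 1.12, fmt prints maps in key order, so two maps with the
	// same contents format identically whatever the insertion order.
	a := map[string]int{"x": 1, "y": 2}
	b := map[string]int{"y": 2, "x": 1}
	fmt.Println(fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)) // true
}
```

Because values are compared as formatted text, numeric values do not follow numeric order; the result is deterministic rather than semantically meaningful.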

Functions

func AddTempClassifierConfig added in v0.1.2

func AddTempClassifierConfig(dataModel, n3id string, requiredPaths, links, unique []string) string

AddTempClassifierConfig appends a "[[classifier]]" entry (data_model, required_paths, n3id, links, unique) to the default hard-coded classifierConfigText and returns the resulting temporary config text.
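To make the shape of such an entry concrete, here is a rough sketch that builds a TOML-style "[[classifier]]" block from the same arguments. The key names come from the description above; the exact layout, the helper names `quoteList` and `classifierBlock`, and the sample values are assumptions, not the package's actual output.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteList renders a string slice as a TOML-style array of quoted strings.
func quoteList(items []string) string {
	quoted := make([]string, len(items))
	for i, s := range items {
		quoted[i] = fmt.Sprintf("%q", s)
	}
	return "[" + strings.Join(quoted, ", ") + "]"
}

// classifierBlock builds one [[classifier]] entry. The layout is an
// assumption made for illustration only.
func classifierBlock(dataModel, n3id string, requiredPaths, links, unique []string) string {
	var b strings.Builder
	b.WriteString("[[classifier]]\n")
	fmt.Fprintf(&b, "data_model = %q\n", dataModel)
	fmt.Fprintf(&b, "required_paths = %s\n", quoteList(requiredPaths))
	fmt.Fprintf(&b, "n3id = %q\n", n3id)
	fmt.Fprintf(&b, "links = %s\n", quoteList(links))
	fmt.Fprintf(&b, "unique = %s\n", quoteList(unique))
	return b.String()
}

func main() {
	// Hypothetical values for a made-up data model.
	fmt.Print(classifierBlock("Example", "id", []string{"id", "name"}, []string{"id"}, []string{"id"}))
}
```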

func ClearTempClassifierConfig added in v0.1.4

func ClearTempClassifierConfig()

ClearTempClassifierConfig :

func DecodeCRDT

func DecodeCRDT(encoded []byte) (*vvmap.Map, error)

binary decoding for messages coming from datastore.

func EncodeCRDT

func EncodeCRDT(vvm *vvmap.Map) ([]byte, error)

binary encoding for messages going to internal datastore.

func Flatten

func Flatten(m map[string]interface{}) map[string]interface{}

Flatten takes a map of a json file and returns a new one where nested maps are replaced by dot-delimited keys.

Also returns a sorted list of keys if updating in same order is important as when setting versions in the crdt.
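The flattening described above can be sketched as follows. This is an illustrative re-implementation, not the package's code: `flatten`, `flattenInto` and `sortedKeys` are hypothetical names, and the treatment of non-map values (copied unchanged) is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// flatten copies m into a new map, replacing nested maps with
// dot-delimited keys. Non-map values are copied unchanged.
func flatten(m map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{})
	flattenInto(out, "", m)
	return out
}

// flattenInto walks m recursively, prefixing nested keys with their path.
func flattenInto(out map[string]interface{}, prefix string, m map[string]interface{}) {
	for k, v := range m {
		key := k
		if prefix != "" {
			key = prefix + "." + k
		}
		if nested, ok := v.(map[string]interface{}); ok {
			flattenInto(out, key, nested)
			continue
		}
		out[key] = v
	}
}

// sortedKeys returns the keys of m in ascending order, which gives a
// stable order for applying updates.
func sortedKeys(m map[string]interface{}) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	doc := map[string]interface{}{
		"id": "abc",
		"name": map[string]interface{}{
			"first": "Ada",
			"last":  "Lovelace",
		},
	}
	flat := flatten(doc)
	for _, k := range sortedKeys(flat) {
		fmt.Printf("%s = %v\n", k, flat[k])
	}
	// Prints:
	// id = abc
	// name.first = Ada
	// name.last = Lovelace
}
```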

func GetCurClassifierConfig added in v0.1.3

func GetCurClassifierConfig() string

GetCurClassifierConfig :

func MergeErrors

func MergeErrors(cs ...<-chan error) <-chan error

MergeErrors merges multiple channels of errors. Based on https://blog.golang.org/pipelines.

func WaitForPipeline

func WaitForPipeline(errs ...<-chan error) error

WaitForPipeline waits for results from all error channels. It returns early on the first error.
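The fan-in and wait behaviour described for MergeErrors and WaitForPipeline follows the pattern from the Go blog's pipelines article. The sketch below is a standalone approximation of that pattern, not the package's source; `mergeErrors`, `waitForPipeline`, `stage` and `errReadFailed` are illustrative names, and it assumes each stage reports at most one error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errReadFailed is a sample error used only in this example.
var errReadFailed = errors.New("read failed")

// mergeErrors fans several error channels into one. The output channel
// is closed once every input channel has been closed.
func mergeErrors(cs ...<-chan error) <-chan error {
	var wg sync.WaitGroup
	// One buffer slot per input keeps the forwarding goroutines from
	// blocking if the reader returns early (assumes at most one error
	// per input channel).
	out := make(chan error, len(cs))
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan error) {
			defer wg.Done()
			for err := range c {
				out <- err
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// waitForPipeline blocks until all channels are closed, or returns the
// first non-nil error it receives.
func waitForPipeline(errs ...<-chan error) error {
	for err := range mergeErrors(errs...) {
		if err != nil {
			return err
		}
	}
	return nil
}

// stage is a hypothetical pipeline stage that reports at most one error
// and then closes its channel.
func stage(err error) <-chan error {
	c := make(chan error, 1)
	go func() {
		defer close(c)
		if err != nil {
			c <- err
		}
	}()
	return c
}

func main() {
	fmt.Println(waitForPipeline(stage(nil), stage(nil)))           // <nil>
	fmt.Println(waitForPipeline(stage(nil), stage(errReadFailed))) // read failed
}
```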

Types

type CRDTData

type CRDTData struct {
	//
	// unique id determined for this object
	// will be taken from the object if it has a
	// declared unique id (in config)
	// If no unique id present in the object one will be
	// assigned.
	//
	N3id string
	//
	// The data model this object is associated with through
	// classification
	//
	DataModel string
	//
	// Type of the object, derived from classifier
	//
	Type string
	//
	// map containing the original json
	//
	RawData map[string]interface{}
	//
	// the crdt to hold the data
	//
	CRDT *vvmap.Map
	//
	// encoded binary of the crdt
	//
	EncodedCRDT []byte
	//
	// user id to identify the owner of the
	// changes to the data
	//
	UserId string
	//
	// streaming topic to publish to
	//
	TopicName string
	//
	// Flag whether any new data was added
	//
	Updated bool
}

data type passed through all stages of the send pipeline

type CRDTManager

type CRDTManager struct {

	//
	// set level of audit output, one of: none, basic, high
	//
	AuditLevel string
	//
	// user id to identify who is making changes
	//
	UserId string
	//
	// topic/context stream name used to exchange data
	//
	TopicName string
	//
	// context cancelFunc used to close the
	// stream-receiver cleanly
	//
	ReceiverCancelFunc func()
	// contains filtered or unexported fields
}

func NewCRDTManager

func NewCRDTManager(userid string, topic string) (*CRDTManager, error)

Open a crdt manager with supporting datastores. By default it will use the local paths ./contexts/[userid]/[topic]/crdt/send and ./contexts/[userid]/[topic]/crdt/recv.
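As a small illustration of the default layout, the sketch below derives the two directories from a user id and topic. `storePaths` is a hypothetical helper and the sample ids are made up; how the package builds these paths internally is not shown in this documentation.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// storePaths returns the default send and receive datastore directories
// described for NewCRDTManager, relative to the working directory.
func storePaths(userid, topic string) (send, recv string) {
	base := filepath.Join("contexts", userid, topic, "crdt")
	return filepath.Join(base, "send"), filepath.Join(base, "recv")
}

func main() {
	// Hypothetical user id and topic name.
	send, recv := storePaths("user01", "demo-topic")
	fmt.Println(send)
	fmt.Println(recv)
}
```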

func (*CRDTManager) Close

func (crdtm *CRDTManager) Close()

safely shut down all databases & connections

func (*CRDTManager) SendFromFile

func (crdtm *CRDTManager) SendFromFile(fname string) error

Sends a file of json objects through the crdt manager

func (*CRDTManager) SendFromHTTPRequest

func (crdtm *CRDTManager) SendFromHTTPRequest(r *http.Request) error

sends an http request containing json objects through the crdt manager

func (*CRDTManager) SendFromReader

func (crdtm *CRDTManager) SendFromReader(r io.Reader) error

sends the content of the given reader (assumed to be stream of json objects) through the crdt manager.

func (*CRDTManager) StartReceiver

func (crdtm *CRDTManager) StartReceiver() (<-chan []byte, error)

starts a stream listener/processor for the topic associated with this manager

func (*CRDTManager) StopReceiver

func (crdtm *CRDTManager) StopReceiver()

shuts down the receiver gracefully

Directories

Path Synopsis
example
