resources

package
v0.0.0-...-2e7796a
Published: Apr 6, 2023 License: Apache-2.0 Imports: 26 Imported by: 5

Documentation

Constants

const (
	LT = -1 // less than
	EQ = 0  // equal
	GT = 1  // greater than
	CC = 2  // concurrent
)
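
These constants read like the possible outcomes of comparing two vector clocks (the package also exposes MakeVClock). As a minimal sketch of that idea, and not the package's actual implementation, the following compares two clocks represented as plain maps. The `clock` type and the `compare` function are hypothetical names introduced here for illustration only.

```go
package main

// Comparison outcomes, mirroring the package constants.
const (
	LT = -1 // less than
	EQ = 0  // equal
	GT = 1  // greater than
	CC = 2  // concurrent
)

// clock is a hypothetical stand-in for a vector clock: node name -> counter.
type clock map[string]int

// compare reports how a relates to b. A missing entry counts as zero.
func compare(a, b clock) int {
	aLess, bLess := false, false
	for node, av := range a {
		if av < b[node] {
			aLess = true
		} else if av > b[node] {
			bLess = true
		}
	}
	for node, bv := range b {
		if a[node] < bv {
			aLess = true
		}
	}
	switch {
	case aLess && bLess:
		return CC
	case aLess:
		return LT
	case bLess:
		return GT
	}
	return EQ
}
```

Two clocks are concurrent (CC) when each one is ahead of the other on at least one node, which is the case an add-wins set has to resolve explicitly.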

Variables

var ErrNestedArchetypeProtocol = errors.New("error during interaction with nested archetype")

ErrNestedArchetypeProtocol signifies that a nested archetype misbehaved while we were trying to use it as an archetype implementation

var ErrNestedArchetypeSanity = errors.New("internal sanity check failed")

ErrNestedArchetypeSanity signifies that some internal sanity check failed regarding buffer / channel state. It is intended for use during panic.

var ErrNestedArchetypeStopped = errors.New("a nested archetype has stopped, preventing this resource API request from being serviced")

ErrNestedArchetypeStopped signifies that an archetype contained within a nestedArchetype has stopped, so the resource operation can no longer safely proceed

var ErrPlaceHolderAccess = errors.New("no access is allowed to PlaceHolder")
var NestedArchetypeConstantDefs = distsys.EnsureMPCalContextConfigs(

	distsys.DefineConstantValue("READ_REQ", nestedArchetypeReadReq),
	distsys.DefineConstantValue("WRITE_REQ", nestedArchetypeWriteReq),
	distsys.DefineConstantValue("ABORT_REQ", nestedArchetypeAbortReq),
	distsys.DefineConstantValue("PRECOMMIT_REQ", nestedArchetypePreCommitReq),
	distsys.DefineConstantValue("COMMIT_REQ", nestedArchetypeCommitReq),

	distsys.DefineConstantValue("READ_ACK", nestedArchetypeReadAck),
	distsys.DefineConstantValue("WRITE_ACK", nestedArchetypeWriteAck),
	distsys.DefineConstantValue("ABORT_ACK", nestedArchetypeAbortAck),
	distsys.DefineConstantValue("PRECOMMIT_ACK", nestedArchetypePreCommitAck),
	distsys.DefineConstantValue("COMMIT_ACK", nestedArchetypeCommitAck),

	distsys.DefineConstantValue("ABORTED", nestedArchetypeAborted),
)

NestedArchetypeConstantDefs provides a quick way to include correct definitions for all the boilerplate constants a resource implementation will always require.

These definitions would satisfy roughly the following TLA+, binding each constant to its own name:

CONSTANTS READ_REQ, WRITE_REQ, ABORT_REQ, PRECOMMIT_REQ, COMMIT_REQ
CONSTANTS READ_ACK, WRITE_ACK, ABORT_ACK, PRECOMMIT_ACK, COMMIT_ACK

Functions

func CloseTwoPCReceiver

func CloseTwoPCReceiver(res *TwoPCReceiver) error

NOTE: This cannot be made a method of TwoPCReceiver because it is not an RPC call.

func GetTwoPCVersion

func GetTwoPCVersion(rpcRcvr *TwoPCReceiver) int

func GetVersion

func GetVersion(res *TwoPCReceiver) int

func MakePersistent

func MakePersistent(name string, db *badger.DB, persistable Persistable) distsys.ArchetypeResource

func MakeVClock

func MakeVClock() vclock

func NewCRDT

func NewCRDT(id tla.Value, peerIds []tla.Value, addressMappingFn CRDTAddressMappingFn,
	crdtValue CRDTValue, opts ...CRDTOption) distsys.ArchetypeResource

NewCRDT returns an archetype resource implementing the behaviour of a CRDT. Given the list of peer ids, it starts broadcasting local CRDT state to all its peers every broadcastInterval. It also starts accepting incoming RPC calls from peers to receive and merge CRDT states. Note that local state is currently not persisted.

func NewMailboxesLength

func NewMailboxesLength(mailboxes *Mailboxes) distsys.ArchetypeResource

NewMailboxesLength returns a resource that yields the number of buffered messages in a local mailbox. The local mailbox is expected to be an element of a mailboxes collection. A mailboxes length resource matches the following mapping macro in MPCal:

\* assuming initially that:
\* $variable := [buffer |-> <<>> (* empty buffer *)]

mapping macro NetworkBufferLength {
    read {
        yield Len($variable.buffer);
    }

    write {
        assert FALSE;
        yield $value;
    }
}
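
As a rough Go analogue of the macro above, the sketch below exposes the length of a buffered channel as a read-only view. It assumes the buffer can be modelled as a Go channel; `lengthView` and `errReadOnly` are hypothetical names, and the real resource's behaviour on write is not specified here beyond the macro's `assert FALSE`.

```go
package main

import "errors"

// errReadOnly is a hypothetical error used only by this sketch.
var errReadOnly = errors.New("length resource is read-only")

// lengthView is a stand-in for a mailbox-length resource. The buffered
// channel plays the role of $variable.buffer in the mapping macro.
type lengthView struct {
	buffer chan string
}

// ReadValue mirrors the macro's read block: yield Len($variable.buffer).
func (v lengthView) ReadValue() int {
	return len(v.buffer)
}

// WriteValue mirrors the macro's write block, which always fails.
func (v lengthView) WriteValue(int) error {
	return errReadOnly
}
```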

func NewNested

func NewNested(fn func(sendCh chan<- tla.Value, receiveCh <-chan tla.Value) []*distsys.MPCalContext) distsys.ArchetypeResource

NewNested adapts a specific form of MPCal archetype to the resource interface, allowing resources to be implemented and model checked in MPCal themselves.

The argument fn should map a pair of input and output channels to the inputs and outputs of one or more nested archetype instances, which, aside from these channels, should be configured just as free-standing archetypes would be. This resource will then take over those contexts' lifecycles, calling Run on them, forwarding errors to the containing context's execution, and ensuring that all nested resources are cleaned up and/or stopped on exit.

Design note: it is important to allow multiple concurrent archetypes here, because, like in Go, many natural MPCal implementations involve multiple communicating processes. The builder fn gives the user the opportunity to freely set up a complete, functioning subsystem, just like a free-standing configuration would allow.

func NewPlaceHolder

func NewPlaceHolder() distsys.ArchetypeResource

NewPlaceHolder produces a distsys.ArchetypeResource that does nothing. It is intended only to be passed as a placeholder for an archetype's argument; calling any of its methods causes a panic.

func NewTwoPC

func NewTwoPC(
	value tla.Value,
	address string,
	replicas []ReplicaHandle,
	archetypeID tla.Value,
	onCreate func(*TwoPCReceiver),
) distsys.ArchetypeResource

NewTwoPC is the function that enables creation of 2PC resources.

Types

type AWORSet

type AWORSet struct {
	// contains filtered or unexported fields
}

func (*AWORSet) GobDecode

func (s *AWORSet) GobDecode(input []byte) error

func (AWORSet) GobEncode

func (s AWORSet) GobEncode() ([]byte, error)

func (AWORSet) Init

func (s AWORSet) Init() CRDTValue

func (AWORSet) Merge

func (s AWORSet) Merge(other CRDTValue) CRDTValue

Merge merges this set and that set:

1. Merge the two add maps, merging the vector clocks if an element is present in both --> addK.
2. Merge the two rem maps, merging the vector clocks if an element is present in both --> remK.
3. From each element in merged addK, keep the element if remK does not have the same element with a greater vector timestamp.
4. From each element in merged remK, keep the element if addK does not have the same element with a larger, equal, or concurrent vector timestamp.
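
The merge steps can be sketched with plain Go maps. This is an illustration under stated assumptions rather than the package's code: elements are strings, a vector clock is a `map[string]int`, and `vc`, `set`, `compare`, `mergeVC`, `mergeMaps`, and `merge` are all hypothetical names.

```go
package main

const (
	lt = -1 // less than
	eq = 0  // equal
	gt = 1  // greater than
	cc = 2  // concurrent
)

// vc is a hypothetical vector clock: node name -> counter.
type vc map[string]int

// compare reports how a relates to b; missing entries count as zero.
func compare(a, b vc) int {
	aLess, bLess := false, false
	for k, av := range a {
		if av < b[k] {
			aLess = true
		} else if av > b[k] {
			bLess = true
		}
	}
	for k, bv := range b {
		if a[k] < bv {
			aLess = true
		}
	}
	switch {
	case aLess && bLess:
		return cc
	case aLess:
		return lt
	case bLess:
		return gt
	}
	return eq
}

// mergeVC returns the pointwise maximum of two clocks.
func mergeVC(a, b vc) vc {
	out := vc{}
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		if v > out[k] {
			out[k] = v
		}
	}
	return out
}

// mergeMaps unions two element maps, merging clocks of shared elements.
func mergeMaps(a, b map[string]vc) map[string]vc {
	out := map[string]vc{}
	for k, v := range a {
		out[k] = mergeVC(v, nil)
	}
	for k, v := range b {
		out[k] = mergeVC(out[k], v)
	}
	return out
}

// set holds the add and remove maps of the sketch.
type set struct{ add, rem map[string]vc }

// merge follows steps 1 to 4 of the description above.
func merge(x, y set) set {
	addK := mergeMaps(x.add, y.add) // step 1
	remK := mergeMaps(x.rem, y.rem) // step 2
	out := set{add: map[string]vc{}, rem: map[string]vc{}}
	for k, a := range addK { // step 3: drop adds dominated by a removal
		if r, ok := remK[k]; !ok || compare(r, a) != gt {
			out.add[k] = a
		}
	}
	for k, r := range remK { // step 4: keep removals only if strictly newer
		if a, ok := addK[k]; !ok || compare(r, a) == gt {
			out.rem[k] = r
		}
	}
	return out
}
```

With these rules, an add that is concurrent with a removal survives the merge, which is what makes the set add-wins.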

func (AWORSet) Read

func (s AWORSet) Read() tla.Value

Read returns the current value of the set. An element is in the set if it is in the add map and its clock is not less than its clock in the remove map (if one exists).

func (AWORSet) String

func (s AWORSet) String() string

func (AWORSet) Write

func (s AWORSet) Write(id tla.Value, value tla.Value) CRDTValue

Write performs the command given by value. If add: add the element to the add map, incrementing the clock for the node. If remove: add the element to the remove map, incrementing the clock for the node.

type AWORSetKeyVal

type AWORSetKeyVal struct {
	K tla.Value
	V vclock
}

type AddRemMaps

type AddRemMaps struct {
	AddMap []AWORSetKeyVal
	RemMap []AWORSetKeyVal
}

type ArchetypeState

type ArchetypeState int

ArchetypeState is an enum that denotes an archetype running state.

func (ArchetypeState) String

func (a ArchetypeState) String() string

type CRDTAddressMappingFn

type CRDTAddressMappingFn func(id tla.Value) string

CRDTAddressMappingFn maps the id of each node sharing the CRDT state to that node's address.

type CRDTOption

type CRDTOption func(c *crdt)

func WithCRDTBroadcastInterval

func WithCRDTBroadcastInterval(d time.Duration) CRDTOption

func WithCRDTDialTimeout

func WithCRDTDialTimeout(d time.Duration) CRDTOption

func WithCRDTSendTimeout

func WithCRDTSendTimeout(d time.Duration) CRDTOption

type CRDTRPCReceiver

type CRDTRPCReceiver struct {
	// contains filtered or unexported fields
}

func (*CRDTRPCReceiver) ReceiveValue

func (rcvr *CRDTRPCReceiver) ReceiveValue(args ReceiveValueArgs, reply *ReceiveValueResp) error

ReceiveValue receives state from another peer node and calls the merge function. If the resource is currently in a critical section, its local value cannot change, so the updates are queued to be merged after the critical section exits.
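
The queue-while-in-critical-section behaviour can be sketched as follows. This is a simplified model, not the package's implementation: the CRDT is a toy integer register whose merge keeps the larger value, and `node`, `mergeMax`, `enter`, `exit`, `receive`, and `read` are hypothetical names.

```go
package main

import "sync"

// node models a resource that may be inside a critical section.
type node struct {
	mu         sync.Mutex
	inCritical bool
	value      int
	pending    []int // remote states received during a critical section
}

// mergeMax is a toy merge function: it keeps the larger value.
func mergeMax(a, b int) int {
	if b > a {
		return b
	}
	return a
}

// receive merges remote state immediately, or queues it while the
// local value must stay stable.
func (n *node) receive(remote int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.inCritical {
		n.pending = append(n.pending, remote)
		return
	}
	n.value = mergeMax(n.value, remote)
}

// enter marks the start of a critical section.
func (n *node) enter() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.inCritical = true
}

// exit ends the critical section and merges everything that was queued.
func (n *node) exit() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.inCritical = false
	for _, r := range n.pending {
		n.value = mergeMax(n.value, r)
	}
	n.pending = nil
}

// read returns the current local value.
func (n *node) read() int {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.value
}
```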

type CRDTValue

type CRDTValue interface {
	Init() CRDTValue
	Read() tla.Value
	Write(id tla.Value, value tla.Value) CRDTValue
	Merge(other CRDTValue) CRDTValue
}

type CriticalSectionState

type CriticalSectionState int

CriticalSectionState defines the state of this resource with respect to the local critical section.

func (CriticalSectionState) String

func (state CriticalSectionState) String() string

type Dummy

type Dummy struct {
	// contains filtered or unexported fields
}

func NewDummy

func NewDummy(opts ...DummyOption) *Dummy

func (*Dummy) Abort

func (res *Dummy) Abort() chan struct{}

func (*Dummy) Close

func (res *Dummy) Close() error

func (*Dummy) Commit

func (res *Dummy) Commit() chan struct{}

func (*Dummy) Index

func (res *Dummy) Index(index tla.Value) (distsys.ArchetypeResource, error)

func (*Dummy) PreCommit

func (res *Dummy) PreCommit() chan error

func (*Dummy) ReadValue

func (res *Dummy) ReadValue() (tla.Value, error)

func (*Dummy) VClockHint

func (res *Dummy) VClockHint(archClock trace.VClock) trace.VClock

func (*Dummy) WriteValue

func (res *Dummy) WriteValue(value tla.Value) error

type DummyOption

type DummyOption func(d *Dummy)

func WithDummyValue

func WithDummyValue(v tla.Value) DummyOption

type FailureDetector

type FailureDetector struct {
	*IncMap
}

func NewFailureDetector

func NewFailureDetector(addressMappingFn FailureDetectorAddressMappingFn, opts ...FailureDetectorOption) *FailureDetector

NewFailureDetector produces a distsys.ArchetypeResource for a collection of single failure detectors. Each single failure detector is responsible for detecting whether a particular archetype is alive. The single failure detector with index i is equivalent to `fd[i]` in the MPCal spec. A single failure detector periodically calls the IsAlive RPC on the archetype's monitor. If the response is false or the call times out, it marks the archetype as failed. Options can be passed to configure parameters such as timeouts. A read from a single failure detector returns true if it has detected the archetype as failed; otherwise, it returns false. FailureDetector refines the guarantees of the following mapping macro:

mapping macro PracticalFD {
   read {
       if ($variable = FALSE) { \* process is alive
           either { yield TRUE; } or { yield FALSE; }; \* no accuracy guarantee
       } else {
           yield $variable; \* (eventual) completeness
       }
   }
   write { assert(FALSE); yield $value; }
}

It provides strong completeness but no accuracy guarantee. This failure detector can have both false positive (due to no accuracy) and false negative (due to [eventual] completeness) outputs.

type FailureDetectorAddressMappingFn

type FailureDetectorAddressMappingFn func(tla.Value) string

FailureDetectorAddressMappingFn returns the address of the monitor that is running the archetype with the given index.

type FailureDetectorOption

type FailureDetectorOption func(fd *SingleFailureDetector)

func WithFailureDetectorPullInterval

func WithFailureDetectorPullInterval(t time.Duration) FailureDetectorOption

func WithFailureDetectorTimeout

func WithFailureDetectorTimeout(t time.Duration) FailureDetectorOption

type FileSystem

type FileSystem struct {
	*IncMap
}

func NewFileSystem

func NewFileSystem(workingDirectory string) *FileSystem

NewFileSystem produces a distsys.ArchetypeResource for a filesystem-backed map-like resource. Each element of the map will refer to a file, with keys and values being required to be string-typed, and keys being required to refer to valid paths (or create-able paths, if a key is written to before it is read).

type FillFn

type FillFn func(index tla.Value) distsys.ArchetypeResource

FillFn maps from an index of a given map resource into a distsys.ArchetypeResource for the resource intended at that location.

type GCounter

type GCounter struct {
	*immutable.Map[tla.Value, int32]
}

func (*GCounter) GobDecode

func (c *GCounter) GobDecode(input []byte) error

func (GCounter) GobEncode

func (c GCounter) GobEncode() ([]byte, error)

func (GCounter) Init

func (c GCounter) Init() CRDTValue

func (GCounter) Merge

func (c GCounter) Merge(other CRDTValue) CRDTValue

Merge current state value with other by taking the greater of each node's partial counts.
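
As a small worked sketch of this merge rule, the code below models a grow-only counter as a plain map from node name to partial count. It is not the package's GCounter (which wraps an immutable map keyed by tla.Value); `gcounter`, `merge`, and `read` are hypothetical names.

```go
package main

// gcounter maps each node to the count that node has contributed.
type gcounter map[string]int32

// merge takes the greater of each node's partial counts.
func (c gcounter) merge(other gcounter) gcounter {
	out := gcounter{}
	for node, n := range c {
		out[node] = n
	}
	for node, n := range other {
		if n > out[node] {
			out[node] = n
		}
	}
	return out
}

// read returns the counter's value: the sum of all partial counts.
func (c gcounter) read() int32 {
	var total int32
	for _, n := range c {
		total += n
	}
	return total
}
```

Because the per-node maximum is commutative, associative, and idempotent, replicas can merge in any order and any number of times and still converge.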

func (GCounter) Read

func (c GCounter) Read() tla.Value

func (GCounter) String

func (c GCounter) String() string

func (GCounter) Write

func (c GCounter) Write(id tla.Value, value tla.Value) CRDTValue

type GCounterKeyVal

type GCounterKeyVal struct {
	K tla.Value
	V int32
}

type HashMap

type HashMap struct {
	distsys.ArchetypeResourceMapMixin
	// contains filtered or unexported fields
}

func NewHashMap

func NewHashMap(resourceMap *hashmap.HashMap[distsys.ArchetypeResource]) *HashMap

func (*HashMap) Abort

func (res *HashMap) Abort() chan struct{}

func (*HashMap) Close

func (res *HashMap) Close() error

func (*HashMap) Commit

func (res *HashMap) Commit() chan struct{}

func (*HashMap) Index

func (res *HashMap) Index(index tla.Value) (distsys.ArchetypeResource, error)

func (*HashMap) PreCommit

func (res *HashMap) PreCommit() chan error

type IncMap

type IncMap struct {
	distsys.ArchetypeResourceMapMixin
	// contains filtered or unexported fields
}

IncMap is a generic incremental map resource, with hooks to programmatically realize child resources during execution.

func NewIncMap

func NewIncMap(fillFunction FillFn) *IncMap
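
The idea of realizing child resources on demand can be sketched as a lazily filled map. This is an illustration only: `resource`, `fillFn`, `incMap`, and `index` are hypothetical stand-ins, integer indices replace tla.Value, and the sketch ignores concurrency and the commit/abort lifecycle.

```go
package main

// resource is a hypothetical stand-in for distsys.ArchetypeResource.
type resource struct {
	index int
}

// fillFn has the same role as FillFn: it builds the resource for an index.
type fillFn func(index int) *resource

// incMap creates children the first time they are indexed.
type incMap struct {
	fill     fillFn
	realized map[int]*resource
}

func newIncMap(fill fillFn) *incMap {
	return &incMap{fill: fill, realized: map[int]*resource{}}
}

// index returns the child at i, calling fill only on first access.
func (m *incMap) index(i int) *resource {
	if r, ok := m.realized[i]; ok {
		return r
	}
	r := m.fill(i)
	m.realized[i] = r
	return r
}
```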

func (*IncMap) Abort

func (res *IncMap) Abort() chan struct{}

func (*IncMap) Close

func (res *IncMap) Close() error

func (*IncMap) Commit

func (res *IncMap) Commit() chan struct{}

func (*IncMap) Index

func (res *IncMap) Index(index tla.Value) (distsys.ArchetypeResource, error)

func (*IncMap) PreCommit

func (res *IncMap) PreCommit() chan error

func (*IncMap) VClockHint

func (res *IncMap) VClockHint(vclock trace.VClock) trace.VClock

type InputChan

type InputChan struct {
	distsys.ArchetypeResourceLeafMixin
	// contains filtered or unexported fields
}

InputChan wraps a native Go channel, such that an MPCal model might read what is written to the channel.

func NewInputChan

func NewInputChan(ch <-chan tla.Value, opts ...InputChanOption) *InputChan

func (*InputChan) Abort

func (res *InputChan) Abort() chan struct{}

func (*InputChan) Close

func (res *InputChan) Close() error

func (*InputChan) Commit

func (res *InputChan) Commit() chan struct{}

func (*InputChan) PreCommit

func (res *InputChan) PreCommit() chan error

func (*InputChan) ReadValue

func (res *InputChan) ReadValue() (tla.Value, error)

func (*InputChan) WriteValue

func (res *InputChan) WriteValue(value tla.Value) error

type InputChanOption

type InputChanOption func(*InputChan)

func WithInputChanReadTimeout

func WithInputChanReadTimeout(t time.Duration) InputChanOption

type LWWSet

type LWWSet struct {
	// contains filtered or unexported fields
}

func (*LWWSet) GobDecode

func (s *LWWSet) GobDecode(input []byte) error

func (LWWSet) GobEncode

func (s LWWSet) GobEncode() ([]byte, error)

func (LWWSet) Init

func (s LWWSet) Init() CRDTValue

func (LWWSet) Merge

func (s LWWSet) Merge(other CRDTValue) CRDTValue

func (LWWSet) Read

func (s LWWSet) Read() tla.Value

func (LWWSet) String

func (s LWWSet) String() string

func (LWWSet) Write

func (s LWWSet) Write(id tla.Value, value tla.Value) CRDTValue

type LocalReplicaHandle

type LocalReplicaHandle struct {
	// contains filtered or unexported fields
}

LocalReplicaHandle is a structure for interacting with a replica operating in the same process (although likely running in a separate goroutine). This is probably only useful for testing.

func (LocalReplicaHandle) Close

func (handle LocalReplicaHandle) Close() error

func (LocalReplicaHandle) Send

func (handle LocalReplicaHandle) Send(request TwoPCRequest, reply *TwoPCResponse) chan error

Send instructs the local replica to process a 2PC message.

type LocalSharedManager

type LocalSharedManager struct {
	// contains filtered or unexported fields
}

LocalSharedManager contains the shared resource and its lock.

func NewLocalSharedManager

func NewLocalSharedManager(value tla.Value, opts ...LocalSharedManagerOption) *LocalSharedManager

func (*LocalSharedManager) MakeLocalShared

func (sv *LocalSharedManager) MakeLocalShared() Persistable

MakeLocalShared is a method that creates a localShared resource. To share a resource between different archetypes, use this method to build one ArchetypeResource per archetype with which you want to share the underlying resource.

type LocalSharedManagerOption

type LocalSharedManagerOption func(*LocalSharedManager)

func WithLocalSharedResourceTimeout

func WithLocalSharedResourceTimeout(t time.Duration) LocalSharedManagerOption

type MailboxKind

type MailboxKind int
const (
	MailboxesLocal MailboxKind = iota
	MailboxesRemote
)

func (MailboxKind) String

func (mk MailboxKind) String() string

type Mailboxes

type Mailboxes struct {
	*IncMap
}

func NewRelaxedMailboxes

func NewRelaxedMailboxes(addressMappingFn MailboxesAddressMappingFn, opts ...MailboxesOption) *Mailboxes

NewRelaxedMailboxes produces a distsys.ArchetypeResource for a collection of TCP mailboxes. It has the same guarantees as TCP mailboxes; however, relaxed mailboxes do not follow 2PC semantics as strictly as TCP mailboxes do. The main difference is that once a critical section successfully sends a message using a relaxed remote mailbox (res.Write returns with no error), it is no longer possible to abort that critical section. Therefore, it is not always safe to use relaxed mailboxes instead of TCP mailboxes. It is only safe to use them in a critical section when there is at most one network send operation in it and all succeeding operations in the critical section are guaranteed to commit successfully. Also, with relaxed mailboxes, it is not safe to have an await statement after a network send in a critical section. Note that relaxed mailboxes remove only rollback support, not timeout support: reading from a relaxed local mailbox might time out, and so might writing to a relaxed remote mailbox; both are fine. With these restrictions, it is still possible to use a limited form of the either statement, as long as await comes before the network write, and timing out on a network write is sequentially the last reason the either branch might fail.

func NewTCPMailboxes

func NewTCPMailboxes(addressMappingFn MailboxesAddressMappingFn, opts ...MailboxesOption) *Mailboxes

NewTCPMailboxes produces a distsys.ArchetypeResource for a collection of TCP mailboxes. Each individual mailbox will match the following mapping macro, assuming exactly one process "reads" from it:

\* assuming initially that:
\* $variable := [queue |-> <<>> (* empty buffer *), enabled |-> TRUE (* process running *)]

mapping macro LimitedBufferReliableFIFOLink {
    read {
        assert $variable.enabled;
        await Len($variable.queue) > 0;
        with (msg = Head($variable.queue)) {
            $variable.queue := Tail($variable.queue);
            yield msg;
        };
    }

    write {
        await Len($variable.queue) < BUFFER_SIZE /\ $variable.enabled;
        yield [queue |-> Append($variable.queue, $value), enabled |-> $variable.enabled];
    }
}

As is shown above, each mailbox should be a fully reliable FIFO channel, which these resources approximate via a lightweight TCP-based protocol optimised for optimistic data transmission. While the protocol should be extended to support reliability under crash recovery in the future, this behaviour is currently a stub.

Note that BUFFER_SIZE is currently fixed to internal constant tcpMailboxesReceiveChannelSize, although precise numbers of in-flight messages may slightly exceed this number, as "reception" speculatively accepts one commit of messages before rate-limiting.

Note also that this protocol is not live, with respect to Commit. All other ops will recover from timeouts via aborts, which will not be visible and will not take infinitely long. Commit is the exception, as it _must complete_ for semantics to be preserved, or it would be possible to observe partial effects of critical sections.
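
The LimitedBufferReliableFIFOLink macro can be approximated in-process by a bounded Go channel. The sketch below is a local model only, with no TCP and no commit/abort handling. `link`, `newLink`, `tryRead`, and `tryWrite` are hypothetical names, and a false return stands in for an `await` that is not yet enabled.

```go
package main

// link models one mailbox: a bounded FIFO queue plus an enabled flag.
type link struct {
	queue   chan string
	enabled bool
}

// newLink creates an enabled link whose capacity plays the role of BUFFER_SIZE.
func newLink(bufferSize int) *link {
	return &link{queue: make(chan string, bufferSize), enabled: true}
}

// tryRead corresponds to the macro's read block: it succeeds only when
// the queue is non-empty, and it removes the head of the queue.
func (l *link) tryRead() (string, bool) {
	select {
	case msg := <-l.queue:
		return msg, true
	default:
		return "", false
	}
}

// tryWrite corresponds to the macro's write block: it succeeds only when
// the link is enabled and the queue holds fewer than BUFFER_SIZE messages.
func (l *link) tryWrite(msg string) bool {
	if !l.enabled {
		return false
	}
	select {
	case l.queue <- msg:
		return true
	default:
		return false
	}
}
```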

type MailboxesAddressMappingFn

type MailboxesAddressMappingFn func(tla.Value) (MailboxKind, string)

MailboxesAddressMappingFn is responsible for translating the index, as in network[index] from distsys.Value to a pair of MailboxKind and address string, where the address string would be appropriate to pass to net.Listen or net.Dial. It should return MailboxesLocal if this node is to be the only listener, and it should return MailboxesRemote if the mailbox is remote and should be dialed. This could potentially allow unusual setups where a single process "owns" more than one mailbox.
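
A mapping function of this kind might look like the sketch below. It assumes a hypothetical deployment in which node i listens on localhost at basePort+i, uses integer indices instead of tla.Value, and defines its own local `mailboxKind` type so the example stays self-contained. `addressMapper` is a name introduced here for illustration.

```go
package main

import "fmt"

// mailboxKind is a local stand-in for MailboxKind.
type mailboxKind int

const (
	mailboxesLocal mailboxKind = iota
	mailboxesRemote
)

// addressMapper builds a mapping function for the node with index self.
// The node's own index maps to a local mailbox (to be listened on);
// every other index maps to a remote mailbox (to be dialed).
func addressMapper(self, basePort int) func(index int) (mailboxKind, string) {
	return func(index int) (mailboxKind, string) {
		addr := fmt.Sprintf("localhost:%d", basePort+index)
		if index == self {
			return mailboxesLocal, addr
		}
		return mailboxesRemote, addr
	}
}
```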

type MailboxesOption

type MailboxesOption func(mailboxesConfig)

func WithMailboxesDialTimeout

func WithMailboxesDialTimeout(t time.Duration) MailboxesOption

func WithMailboxesReadTimeout

func WithMailboxesReadTimeout(t time.Duration) MailboxesOption

func WithMailboxesReceiveChanSize

func WithMailboxesReceiveChanSize(s int) MailboxesOption

func WithMailboxesWriteTimeout

func WithMailboxesWriteTimeout(t time.Duration) MailboxesOption

type Monitor

type Monitor struct {
	ListenAddr string
	// contains filtered or unexported fields
}

Monitor monitors the registered archetypes by wrapping them. Monitor provides the IsAlive API, which can be queried to find out whether a specific archetype is alive. At most one monitor should be defined in each OS process, and it catches the errors and panics of all archetypes' goroutines. If an archetype's goroutine returns an error or panics, the Monitor returns false to IsAlive calls for that archetype. Monitor exposes the IsAlive API as an RPC. If the whole OS process fails, subsequent calls to IsAlive will time out, and this timeout denotes failure of the queried archetype.
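
The wrapping behaviour can be sketched without any RPC layer: run a function, convert a panic into an error, and record liveness per archetype. This is a simplified in-process model, not the package's Monitor. `monitor`, `run`, `isAlive`, and the string ids are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// monitor records, per archetype id, whether that archetype is alive.
type monitor struct {
	mu    sync.Mutex
	alive map[string]bool
}

func newMonitor() *monitor {
	return &monitor{alive: map[string]bool{}}
}

func (m *monitor) set(id string, alive bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.alive[id] = alive
}

// isAlive reports the recorded state; unknown ids are reported as not alive.
func (m *monitor) isAlive(id string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.alive[id]
}

// run executes fn under the monitor. A panic is recovered and turned
// into an error; any error marks the archetype as not alive.
func (m *monitor) run(id string, fn func() error) (err error) {
	m.set(id, true)
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("archetype %s panicked: %v", id, r)
		}
		if err != nil {
			m.set(id, false)
		}
	}()
	return fn()
}
```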

func NewMonitor

func NewMonitor(listenAddr string) *Monitor

NewMonitor creates a new Monitor and returns a pointer to it.

func (*Monitor) Close

func (m *Monitor) Close() error

Close stops the monitor's RPC servers. It doesn't do anything with the archetypes that the monitor is running.

func (*Monitor) ListenAndServe

func (m *Monitor) ListenAndServe() error

ListenAndServe starts the monitor's RPC server and serves the incoming connections. It blocks until an error occurs or the monitor closes.

func (*Monitor) RunArchetype

func (m *Monitor) RunArchetype(ctx *distsys.MPCalContext) (err error)

RunArchetype runs the given archetype inside the monitor. It wraps a call to ctx.RunDiscardingExits.

type MonitorRPCReceiver

type MonitorRPCReceiver struct {
	// contains filtered or unexported fields
}

func (*MonitorRPCReceiver) IsAlive

func (rcvr *MonitorRPCReceiver) IsAlive(arg tla.Value, reply *ArchetypeState) error

type OutputChan

type OutputChan struct {
	distsys.ArchetypeResourceLeafMixin
	// contains filtered or unexported fields
}

OutputChan wraps a native Go channel, such that an MPCal model may write to that channel.

func NewOutputChan

func NewOutputChan(ch chan<- tla.Value) *OutputChan

func (*OutputChan) Abort

func (res *OutputChan) Abort() chan struct{}

func (*OutputChan) Close

func (res *OutputChan) Close() error

func (*OutputChan) Commit

func (res *OutputChan) Commit() chan struct{}

func (*OutputChan) PreCommit

func (res *OutputChan) PreCommit() chan error

func (*OutputChan) ReadValue

func (res *OutputChan) ReadValue() (tla.Value, error)

func (*OutputChan) WriteValue

func (res *OutputChan) WriteValue(value tla.Value) error

type Persistable

type Persistable interface {
	distsys.ArchetypeResource
	GetState() ([]byte, error)
}

type Persistent

type Persistent struct {
	// contains filtered or unexported fields
}

func (*Persistent) Abort

func (res *Persistent) Abort() chan struct{}

func (*Persistent) Close

func (res *Persistent) Close() error

func (*Persistent) Commit

func (res *Persistent) Commit() chan struct{}

func (*Persistent) Index

func (res *Persistent) Index(index tla.Value) (distsys.ArchetypeResource, error)

func (*Persistent) PreCommit

func (res *Persistent) PreCommit() chan error

func (*Persistent) ReadValue

func (res *Persistent) ReadValue() (tla.Value, error)

func (*Persistent) VClockHint

func (res *Persistent) VClockHint(archClock trace.VClock) trace.VClock

func (*Persistent) WriteValue

func (res *Persistent) WriteValue(value tla.Value) error

type PlaceHolder

type PlaceHolder struct{}

func (*PlaceHolder) Abort

func (res *PlaceHolder) Abort() chan struct{}

func (*PlaceHolder) Close

func (res *PlaceHolder) Close() error

func (*PlaceHolder) Commit

func (res *PlaceHolder) Commit() chan struct{}

func (*PlaceHolder) Index

func (res *PlaceHolder) Index(index tla.Value) (distsys.ArchetypeResource, error)

func (*PlaceHolder) PreCommit

func (res *PlaceHolder) PreCommit() chan error

func (*PlaceHolder) ReadValue

func (res *PlaceHolder) ReadValue() (tla.Value, error)

func (*PlaceHolder) VClockHint

func (res *PlaceHolder) VClockHint(archClock trace.VClock) trace.VClock

func (*PlaceHolder) WriteValue

func (res *PlaceHolder) WriteValue(value tla.Value) error

type RPCReplicaHandle

type RPCReplicaHandle struct {
	// contains filtered or unexported fields
}

RPCReplicaHandle is a structure for a reference to a remote 2PC replica.

func MakeRPCReplicaHandle

func MakeRPCReplicaHandle(address string, archetypeID tla.Value) RPCReplicaHandle

MakeRPCReplicaHandle creates a replica handle for the 2PC node available at the given address.

func (*RPCReplicaHandle) Close

func (handle *RPCReplicaHandle) Close() error

func (*RPCReplicaHandle) Send

func (handle *RPCReplicaHandle) Send(request TwoPCRequest, reply *TwoPCResponse) chan error

Send sends a 2PC request to a remote replica. This function will initiate the RPC client for the handle if it has not been initiated yet.

func (*RPCReplicaHandle) String

func (handle *RPCReplicaHandle) String() string

type ReceiveValueArgs

type ReceiveValueArgs struct {
	Value CRDTValue
}

type ReceiveValueResp

type ReceiveValueResp struct {
	Value CRDTValue
}

type ReplicaHandle

type ReplicaHandle interface {
	Send(request TwoPCRequest, reply *TwoPCResponse) chan error
	Close() error
}

ReplicaHandle defines the interface for connecting with 2PC replicas. It is functionally the same as the RPC interface.

type SingleFailureDetector

type SingleFailureDetector struct {
	distsys.ArchetypeResourceLeafMixin
	// contains filtered or unexported fields
}

func NewSingleFailureDetector

func NewSingleFailureDetector(archetypeID tla.Value, monitorAddr string, opts ...FailureDetectorOption) *SingleFailureDetector

func (*SingleFailureDetector) Abort

func (res *SingleFailureDetector) Abort() chan struct{}

func (*SingleFailureDetector) Close

func (res *SingleFailureDetector) Close() error

func (*SingleFailureDetector) Commit

func (res *SingleFailureDetector) Commit() chan struct{}

func (*SingleFailureDetector) PreCommit

func (res *SingleFailureDetector) PreCommit() chan error

func (*SingleFailureDetector) ReadValue

func (res *SingleFailureDetector) ReadValue() (tla.Value, error)

func (*SingleFailureDetector) WriteValue

func (res *SingleFailureDetector) WriteValue(value tla.Value) error

type SingleOutputChan

type SingleOutputChan struct {
	distsys.ArchetypeResourceLeafMixin
	// contains filtered or unexported fields
}

func NewSingleOutputChan

func NewSingleOutputChan(ch chan<- tla.Value) *SingleOutputChan

func (*SingleOutputChan) Abort

func (res *SingleOutputChan) Abort() chan struct{}

func (*SingleOutputChan) Close

func (res *SingleOutputChan) Close() error

func (*SingleOutputChan) Commit

func (res *SingleOutputChan) Commit() chan struct{}

func (*SingleOutputChan) PreCommit

func (res *SingleOutputChan) PreCommit() chan error

func (*SingleOutputChan) ReadValue

func (res *SingleOutputChan) ReadValue() (tla.Value, error)

func (*SingleOutputChan) WriteValue

func (res *SingleOutputChan) WriteValue(value tla.Value) error

type TwoPCArchetypeResource

type TwoPCArchetypeResource struct {
	distsys.ArchetypeResourceLeafMixin
	// contains filtered or unexported fields
}

TwoPCArchetypeResource is a struct that contains all the necessary data structures for the 2PC resource to operate as a local resource and communicate with remote 2PC nodes.

func (*TwoPCArchetypeResource) Abort

func (res *TwoPCArchetypeResource) Abort() chan struct{}

Abort aborts the current critical section state. If this node has already completed a 2PC precommit, then it should roll back the PreCommit.

func (*TwoPCArchetypeResource) Close

func (res *TwoPCArchetypeResource) Close() error

Close cleanly shuts down this 2PC instance. NOTE: For now, we keep the listener running

func (*TwoPCArchetypeResource) Commit

func (res *TwoPCArchetypeResource) Commit() chan struct{}

Commit unconditionally commits the local critical section, also performing the 2PC commit at this time. This is safe because PreCommit has already succeeded, thus all Replicas have already accepted the PreCommit and are able to accept the Commit() message.

func (*TwoPCArchetypeResource) PreCommit

func (res *TwoPCArchetypeResource) PreCommit() chan error

PreCommit attempts to perform a PreCommit on the local critical section. This triggers the 2PC PreCommit on all replicas. If a majority of replicas accept the PreCommit, then this operation succeeds.

This operation also performs exponential backoff: if the previous precommit failed, then this will wait a while before performing the PreCommit operation.

func (*TwoPCArchetypeResource) ReadValue

func (res *TwoPCArchetypeResource) ReadValue() (tla.Value, error)

ReadValue reads the current value, potentially aborting the local critical section.

func (*TwoPCArchetypeResource) SetReplicas

func (res *TwoPCArchetypeResource) SetReplicas(replicas []ReplicaHandle)

SetReplicas updates the replicas used for 2PC replication. This function is only for testing; things will likely break if this is called during 2PC operation.

func (*TwoPCArchetypeResource) WriteValue

func (res *TwoPCArchetypeResource) WriteValue(value tla.Value) error

WriteValue writes the given value, potentially aborting the local critical section.

type TwoPCReceiver

type TwoPCReceiver struct {
	ListenAddr string
	// contains filtered or unexported fields
}

TwoPCReceiver defines the RPC receiver for 2PC communication. The associated functions for this struct are exposed via the RPC interface.

func (*TwoPCReceiver) Receive

func (rcvr *TwoPCReceiver) Receive(arg TwoPCRequest, reply *TwoPCResponse) error

Receive is the generic handler for receiving a 2PC request from another node.

type TwoPCRequest

type TwoPCRequest struct {
	RequestType TwoPCRequestType
	Value       tla.Value
	Sender      tla.Value
	Version     int
	SenderTime  int64
}

TwoPCRequest is the message for a 2PC request, typically sent over an RPC interface.

type TwoPCRequestType

type TwoPCRequestType int

TwoPCRequestType is the type of 2PC message to a remote node.

const (
	PreCommit TwoPCRequestType = iota
	Commit
	Abort
	// Fetch the most recent state from the majority of replicas
	GetState
)

func (TwoPCRequestType) String

func (requestType TwoPCRequestType) String() string

type TwoPCResponse

type TwoPCResponse struct {
	Accept  bool
	Version int
	Value   tla.Value
}

TwoPCResponse is the corresponding response to a 2PC request, also sent via the RPC interface.

type TwoPCState

type TwoPCState int

TwoPCState defines the state of this resource with respect to the 2PC synchronization with remote nodes.

func (TwoPCState) String

func (state TwoPCState) String() string
