txn

package
v0.2.0

Warning: This package is not in the latest version of its module.
Published: Jan 22, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Connector added in v0.2.0

type Connector interface {
	Connect() error
	GetItem(key string) (DataItem, error)
	PutItem(key string, value DataItem) error
	ConditionalUpdate(key string, value DataItem, doCreate bool) error
	Get(name string) (string, error)
	Put(name string, value any) error
	Delete(name string) error
}
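A backend plugs into the package by satisfying Connector. The following is a minimal in-memory sketch, not the package's own implementation. It uses a trimmed local stand-in for DataItem, and the names memConnector, errKeyNotFound and errVersionMismatch are hypothetical. The documentation does not describe ConditionalUpdate, so the version check shown here is an assumption: the update succeeds only when the caller's Version matches the stored one, and doCreate permits creating a missing key.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// DataItem is a trimmed local stand-in for the package's DataItem.
type DataItem struct {
	Key     string
	Value   string
	Version int
}

// Connector mirrors the documented method set.
type Connector interface {
	Connect() error
	GetItem(key string) (DataItem, error)
	PutItem(key string, value DataItem) error
	ConditionalUpdate(key string, value DataItem, doCreate bool) error
	Get(name string) (string, error)
	Put(name string, value any) error
	Delete(name string) error
}

// Hypothetical sentinel errors used only by this sketch.
var (
	errKeyNotFound     = errors.New("key not found")
	errVersionMismatch = errors.New("version mismatch")
)

// memConnector is a hypothetical map-backed Connector.
type memConnector struct {
	mu    sync.Mutex
	items map[string]DataItem
	kv    map[string]string
}

func newMemConnector() *memConnector {
	return &memConnector{items: map[string]DataItem{}, kv: map[string]string{}}
}

func (c *memConnector) Connect() error { return nil }

func (c *memConnector) GetItem(key string) (DataItem, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	item, ok := c.items[key]
	if !ok {
		return DataItem{}, errKeyNotFound
	}
	return item, nil
}

func (c *memConnector) PutItem(key string, value DataItem) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = value
	return nil
}

// ConditionalUpdate applies ASSUMED compare-and-set semantics on Version.
func (c *memConnector) ConditionalUpdate(key string, value DataItem, doCreate bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	cur, ok := c.items[key]
	if !ok && !doCreate {
		return errKeyNotFound
	}
	if ok && cur.Version != value.Version {
		return errVersionMismatch
	}
	value.Version++ // every successful write bumps the stored version
	c.items[key] = value
	return nil
}

func (c *memConnector) Get(name string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.kv[name]
	if !ok {
		return "", errKeyNotFound
	}
	return v, nil
}

func (c *memConnector) Put(name string, value any) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.kv[name] = fmt.Sprint(value)
	return nil
}

func (c *memConnector) Delete(name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.kv, name)
	return nil
}

// Compile-time check that memConnector satisfies Connector.
var _ Connector = (*memConnector)(nil)

func main() {
	c := newMemConnector()
	fmt.Println(c.ConditionalUpdate("k", DataItem{Key: "k", Value: "v1"}, true))
	fmt.Println(c.ConditionalUpdate("k", DataItem{Key: "k", Value: "v2"}, false))
}
```

In this sketch the second call carries a stale Version and is rejected, which is the kind of conflict the VersionMismatch error value suggests the real connectors report.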

type DataItem added in v0.2.0

type DataItem struct {
	Key       string       `redis:"Key" bson:"_id"`
	Value     string       `redis:"Value" bson:"Value"`
	TxnId     string       `redis:"TxnId" bson:"TxnId"`
	TxnState  config.State `redis:"TxnState" bson:"TxnState"`
	TValid    time.Time    `redis:"TValid" bson:"TValid"`
	TLease    time.Time    `redis:"TLease" bson:"TLease"`
	Prev      string       `redis:"Prev" bson:"Prev"`
	LinkedLen int          `redis:"LinkedLen" bson:"LinkedLen"`
	IsDeleted bool         `redis:"IsDeleted" bson:"IsDeleted"`
	Version   int          `redis:"Version" bson:"Version"`
}

func (*DataItem) Equal added in v0.2.0

func (r *DataItem) Equal(other DataItem) bool

func (DataItem) GetKey added in v0.2.0

func (m DataItem) GetKey() string

func (DataItem) MarshalBSONValue added in v0.2.0

func (mi DataItem) MarshalBSONValue() (bsontype.Type, []byte, error)

func (DataItem) MarshalBinary added in v0.2.0

func (r DataItem) MarshalBinary() (data []byte, err error)

func (DataItem) String added in v0.2.0

func (r DataItem) String() string

func (*DataItem) UnmarshalBSONValue added in v0.2.0

func (mi *DataItem) UnmarshalBSONValue(t bsontype.Type, raw []byte) error

type Datastore

type Datastore struct {

	// Name is the name of the datastore.
	Name string

	// Txn is the current transaction.
	Txn *Transaction
	// contains filtered or unexported fields
}

Datastore is a Datastorer implementation backed by the underlying Connector.

func NewDatastore added in v0.2.0

func NewDatastore(name string, conn Connector) *Datastore

NewDatastore creates a new instance of Datastore with the given name and connection. It initializes the read and write caches, as well as the serializer.

func (*Datastore) Abort

func (r *Datastore) Abort(hasCommitted bool) error

Abort discards the changes made in the current transaction. If hasCommitted is false, it clears the write cache. If hasCommitted is true, it rolls back the changes made by the current transaction. It returns an error if there is any issue during the rollback process.

func (*Datastore) Commit

func (r *Datastore) Commit() error

Commit updates the state of records in the data store to COMMITTED. It iterates over the write cache and updates each record's state to COMMITTED. After updating the records, it clears the write cache. Returns an error if there is any issue updating the records.

func (*Datastore) Copy

func (r *Datastore) Copy() Datastorer

Copy returns a new Datastore with the same name and connection.

func (*Datastore) Delete

func (r *Datastore) Delete(key string) error

Delete deletes a record from the Datastore. It will return an error if the record is not found.

func (*Datastore) DeleteTSR added in v0.2.0

func (r *Datastore) DeleteTSR(txnId string) error

DeleteTSR deletes the transaction state record (TSR) for the given transaction ID from the datastore. It returns an error if the deletion fails.

func (*Datastore) GetName

func (r *Datastore) GetName() string

GetName returns the name of the Datastore.

func (*Datastore) Prepare

func (r *Datastore) Prepare() error

Prepare prepares the Datastore for commit.

func (*Datastore) Read

func (r *Datastore) Read(key string, value any) error

Read reads a record from the Datastore.

func (*Datastore) ReadTSR added in v0.2.0

func (r *Datastore) ReadTSR(txnId string) (config.State, error)

ReadTSR reads the transaction state from the datastore. It takes a transaction ID as input and returns the corresponding state and an error, if any.

func (*Datastore) SetSerializer added in v0.2.0

func (r *Datastore) SetSerializer(se serializer.Serializer)

SetSerializer sets the serializer for the Datastore. The serializer is used to serialize and deserialize data when storing it in and retrieving it from the datastore.

func (*Datastore) SetTxn

func (r *Datastore) SetTxn(txn *Transaction)

SetTxn sets the transaction for the Datastore. It takes a pointer to a Transaction as input and assigns it to the Txn field of the Datastore.

func (*Datastore) Start

func (r *Datastore) Start() error

Start starts the Datastore by establishing a connection to the underlying server. It returns an error if the connection fails.

func (*Datastore) Write

func (r *Datastore) Write(key string, value any) error

Write writes a record to the cache. It will serialize the value using the Datastore's serializer.

func (*Datastore) WriteTSR added in v0.2.0

func (r *Datastore) WriteTSR(txnId string, txnState config.State) error

WriteTSR writes the transaction state (txnState) associated with the given transaction ID (txnId) to the datastore. It returns an error if the write operation fails.

type Datastorer added in v0.2.0

type Datastorer interface {
	// Start starts a transaction, including initializing the connection.
	Start() error

	// Read reads a record from the data store. If the record is not in the cache (readCache/writeCache),
	// it reads the record from the connection and puts it into the cache.
	Read(key string, value any) error

	// Write writes records into the writeCache.
	Write(key string, value any) error

	// Delete marks a record as deleted.
	Delete(key string) error

	// Prepare executes the prepare phase of transaction commit.
	// It first marks the records in the writeCache with T_commit, TxnId, and TxnState,
	// then it performs `conditionalUpdate` in a global order.
	Prepare() error

	// Commit executes the commit phase of transaction commit.
	// It updates the records in the writeCache to the COMMITTED state
	// in the data store.
	Commit() error

	// Abort aborts the transaction.
	// It rolls back the records in the writeCache to the state before the transaction.
	Abort(hasCommitted bool) error

	// GetName returns the name of the data store.
	GetName() string

	// SetTxn sets the current transaction for the data store.
	SetTxn(txn *Transaction)

	Copy() Datastorer
}

Datastorer is an interface that defines the operations for interacting with a data store.

type SourceType

type SourceType string
const (
	// EMPTY  SourceType = "EMPTY"
	LOCAL  SourceType = "LOCAL"
	GLOBAL SourceType = "GLOBAL"
)

type StateMachine

type StateMachine struct {
	// contains filtered or unexported fields
}

func NewStateMachine

func NewStateMachine() *StateMachine

func (*StateMachine) CheckState

func (st *StateMachine) CheckState(state config.State) error

func (*StateMachine) GetState

func (st *StateMachine) GetState() config.State

func (*StateMachine) SetState

func (st *StateMachine) SetState(state config.State) error

type TSRMaintainer

type TSRMaintainer interface {
	// ReadTSR reads the transaction state record (TSR) for a transaction.
	ReadTSR(txnId string) (config.State, error)

	// WriteTSR writes the transaction state record (TSR) for a transaction.
	WriteTSR(txnId string, txnState config.State) error

	// DeleteTSR deletes the transaction state record (TSR) for a transaction.
	DeleteTSR(txnId string) error
}

type Transaction

type Transaction struct {
	// TxnId is the unique identifier for the transaction.
	TxnId string
	// TxnStartTime is the timestamp when the transaction started.
	TxnStartTime time.Time
	// TxnCommitTime is the timestamp when the transaction was committed.
	TxnCommitTime time.Time

	*StateMachine
	// contains filtered or unexported fields
}

Transaction represents a transaction in the system. It contains information such as the transaction ID, state, timestamps, datastores, time source, oracle URL, and locker.

func NewTransaction

func NewTransaction() *Transaction

NewTransaction creates a new Transaction object. It initializes the Transaction with default values and returns a pointer to the newly created object.

func (*Transaction) Abort

func (t *Transaction) Abort() error

Abort aborts the transaction. It checks the current state of the transaction and returns an error if the transaction is already committed, aborted, or not started. If the transaction is in a valid state, it sets the transaction state to ABORTED and calls the Abort method on each data store associated with the transaction. It returns an error if any data store's Abort method fails, otherwise nil.

func (*Transaction) AddDatastore

func (t *Transaction) AddDatastore(ds Datastorer) error

AddDatastore adds a datastore to the transaction. It checks if the datastore name is duplicated and returns an error if it is. Otherwise, it sets the transaction for the datastore and adds it to the transaction's datastore map.

func (*Transaction) Commit

func (t *Transaction) Commit() error

Commit commits the transaction. It checks the transaction state and performs the prepare phase. If the prepare phase fails, it aborts the transaction and returns an error. Otherwise, it proceeds to the commit phase and commits the transaction in all data stores. Finally, it deletes the transaction state record. Returns an error if any operation fails.
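The flow described above is a two-phase commit across data stores. The following is a simplified sketch under stated assumptions, not the package's implementation: participant is a local stand-in for the Prepare, Commit and Abort subset of Datastorer, fakeStore, commitAll and run are hypothetical names, the state checks and TSR handling are omitted, and passing false to Abort after a failed prepare is an assumption.

```go
package main

import "fmt"

// participant is a local stand-in for the subset of Datastorer used here.
type participant interface {
	Prepare() error
	Commit() error
	Abort(hasCommitted bool) error
}

// fakeStore is a hypothetical participant that records each call it receives.
type fakeStore struct {
	name        string
	failPrepare bool
	log         *[]string
}

func (f *fakeStore) Prepare() error {
	*f.log = append(*f.log, f.name+".prepare")
	if f.failPrepare {
		return fmt.Errorf("%s: prepare rejected", f.name)
	}
	return nil
}

func (f *fakeStore) Commit() error {
	*f.log = append(*f.log, f.name+".commit")
	return nil
}

func (f *fakeStore) Abort(hasCommitted bool) error {
	*f.log = append(*f.log, f.name+".abort")
	return nil
}

// commitAll runs the prepare phase on every store, aborts all of them
// if any prepare fails, and otherwise runs the commit phase.
func commitAll(stores []participant) error {
	for _, s := range stores {
		if err := s.Prepare(); err != nil {
			for _, t := range stores {
				_ = t.Abort(false) // assumption: nothing has committed yet
			}
			return fmt.Errorf("prepare phase failed: %w", err)
		}
	}
	for _, s := range stores {
		if err := s.Commit(); err != nil {
			return fmt.Errorf("commit phase failed: %w", err)
		}
	}
	return nil
}

// run drives commitAll over two fake stores and returns the call log.
func run(failSecond bool) ([]string, error) {
	var log []string
	stores := []participant{
		&fakeStore{name: "a", log: &log},
		&fakeStore{name: "b", failPrepare: failSecond, log: &log},
	}
	err := commitAll(stores)
	return log, err
}

func main() {
	log, err := run(true)
	fmt.Println(log, err)
	log, err = run(false)
	fmt.Println(log, err)
}
```

When the second store rejects the prepare, no store receives a Commit call and every store receives an Abort call, which matches the behaviour the Commit description outlines.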

func (*Transaction) Delete

func (t *Transaction) Delete(dsName string, key string) error

Delete deletes a key from the specified datastore in the transaction. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.

func (*Transaction) DeleteTSR

func (t *Transaction) DeleteTSR() error

DeleteTSR deletes the Transaction State Record (TSR) associated with the Transaction. It calls the DeleteTSR method of the globalDataStore to perform the deletion. It returns an error if the deletion operation fails.

func (*Transaction) GetTSRState

func (t *Transaction) GetTSRState(txnId string) (config.State, error)

func (*Transaction) Lock

func (t *Transaction) Lock(key string, id string, duration time.Duration) error

Lock locks the specified key with the given ID for the specified duration. If the locker is not set, it returns an error.

func (*Transaction) Read

func (t *Transaction) Read(dsName string, key string, value any) error

Read reads the value associated with the given key from the specified datastore. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.

func (*Transaction) SetGlobalDatastore

func (t *Transaction) SetGlobalDatastore(ds Datastorer)

SetGlobalDatastore sets the global datastore for the transaction. It takes a Datastorer parameter and assigns it to the globalDataStore field of the Transaction struct.

func (*Transaction) SetGlobalTimeSource

func (t *Transaction) SetGlobalTimeSource(url string)

SetGlobalTimeSource sets the global time source for the transaction. It takes a URL as a parameter and assigns it to the transaction's oracleURL field. The timeSource field is set to GLOBAL.

func (*Transaction) SetLocker

func (t *Transaction) SetLocker(locker locker.Locker)

SetLocker sets the locker for the transaction. The locker is responsible for managing the concurrency of the transaction. It ensures that only one goroutine can access the transaction at a time. The locker must implement the locker.Locker interface.

func (*Transaction) Start

func (t *Transaction) Start() error

Start begins the transaction. It checks if the transaction is already started and returns an error if so. It also checks if the necessary datastores are added and returns an error if not. It sets the transaction state to STARTED and generates a unique transaction ID. It starts each datastore associated with the transaction. Returns an error if any of the above steps fail, otherwise returns nil.

func (*Transaction) Unlock

func (t *Transaction) Unlock(key string, id string) error

Unlock unlocks the specified key with the given ID. It returns an error if the locker is not set or if unlocking fails.

func (*Transaction) Write

func (t *Transaction) Write(dsName string, key string, value any) error

Write writes the given key-value pair to the specified datastore in the transaction. It returns an error if the transaction is not in the STARTED state or if the datastore is not found.

func (*Transaction) WriteTSR

func (t *Transaction) WriteTSR(txnId string, txnState config.State) error

WriteTSR writes the Transaction State Record (TSR) for the given transaction ID and state. It uses the global data store to persist the TSR. The txnId parameter specifies the ID of the transaction. The txnState parameter specifies the state of the transaction. Returns an error if there was a problem writing the TSR.

type TxnError

type TxnError string
const (
	KeyNotFound      TxnError = "key not found"
	DirtyRead        TxnError = "dirty read"
	DeserializeError TxnError = "deserialize error"
	VersionMismatch  TxnError = "version mismatch"
)

func (TxnError) Error

func (e TxnError) Error() string
