trans

package
v0.0.0-...-e9451bc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrAlreadyFinalized = errors.New("transaction was already finalized")
View Source
var ErrRetry = errors.New("retry transaction")

Functions

func CtxWithTxID

func CtxWithTxID(ctx context.Context, id data.TxID) context.Context

func TxIDFromCtx

func TxIDFromCtx(ctx context.Context) data.TxID

Types

type Algo

type Algo struct {
	// contains filtered or unexported fields
}

func NewAlgo

func NewAlgo(
	c clockwork.Clock,
	g storage.Global,
	local storage.Local,
	locker *Locker,
	mon *Monitor,
	bg *concurr.Background,
	log *slog.Logger,
) Algo

func (Algo) Begin

func (t Algo) Begin(ctx context.Context, d Data) *Handle

func (Algo) Commit

func (t Algo) Commit(ctx context.Context, tx *Handle) error

func (Algo) End

func (t Algo) End(ctx context.Context, tx *Handle) error

func (Algo) Reset

func (t Algo) Reset(tx *Handle, data Data)

func (Algo) ValidateReads

func (t Algo) ValidateReads(ctx context.Context, tx *Handle) error

type Data

type Data struct {
	Reads  []ReadAccess
	Writes []WriteAccess
}

type Handle

type Handle struct {
	// contains filtered or unexported fields
}

type KeyCommitStatus

type KeyCommitStatus struct {
	Status storage.TxCommitStatus
	Value  storage.TValue
}

type LockStats

type LockStats struct {
	Calls   int
	Hits    int
	Retries int
}

type Locker

type Locker struct {
	// contains filtered or unexported fields
}

Locker implements locking on top of the global storage.

Waits and retries are handled transparently.

func NewLocker

func NewLocker(
	l storage.Local,
	g storage.Global,
	tl storage.TLogger,
	clock clockwork.Clock,
	tmon *Monitor,
) *Locker

func (*Locker) LockCreate

func (v *Locker) LockCreate(ctx context.Context, key string, tid data.TxID) error

func (*Locker) LockRead

func (v *Locker) LockRead(ctx context.Context, key string, tid data.TxID) error

func (*Locker) LockType

func (v *Locker) LockType(key string, tid data.TxID) storage.LockType

func (*Locker) LockWrite

func (v *Locker) LockWrite(ctx context.Context, key string, tid data.TxID) error

func (*Locker) LockedPaths

func (v *Locker) LockedPaths(tid data.TxID) []storage.PathLock

func (*Locker) StatsAndReset

func (v *Locker) StatsAndReset() LockStats

func (*Locker) Unlock

func (v *Locker) Unlock(ctx context.Context, key string, tid data.TxID) error

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

func NewMonitor

func NewMonitor(
	c clockwork.Clock,
	l storage.Local,
	tl storage.TLogger,
	b *concurr.Background,
) *Monitor

func (*Monitor) AbortTx

func (m *Monitor) AbortTx(ctx context.Context, tid data.TxID) error

func (*Monitor) BeginTx

func (m *Monitor) BeginTx(_ context.Context, tid data.TxID)

func (*Monitor) CommitTx

func (m *Monitor) CommitTx(ctx context.Context, tl storage.TxLog) error

func (*Monitor) CommittedValue

func (m *Monitor) CommittedValue(ctx context.Context, key string, tid data.TxID) (KeyCommitStatus, error)

func (*Monitor) StartRefreshTx

func (m *Monitor) StartRefreshTx(ctx context.Context, tid data.TxID)

func (*Monitor) TxStatus

func (m *Monitor) TxStatus(ctx context.Context, tid data.TxID) (storage.TxCommitStatus, error)

func (*Monitor) WaitForTx

func (m *Monitor) WaitForTx(ctx context.Context, tid data.TxID) <-chan WaitTxResult

WaitForTx waits asynchronously for the given transaction to complete.

The given context may not be used if other callers are are waiting for the same transaction.

type ReadAccess

type ReadAccess struct {
	Path    string
	Version ReadVersion
	Found   bool
}

type ReadValue

type ReadValue struct {
	Value   []byte
	Version storage.Version
}

type ReadVersion

type ReadVersion struct {
	Version    int64
	LastWriter data.TxID
}

func (ReadVersion) IsLocal

func (r ReadVersion) IsLocal() bool

func (ReadVersion) ToStorageVersion

func (r ReadVersion) ToStorageVersion() storage.Version

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(l storage.Local, g storage.Global, m *Monitor) Reader

func (Reader) GetMetadata

func (r Reader) GetMetadata(
	ctx context.Context,
	key string,
	maxStale time.Duration,
) (backend.Metadata, error)

func (Reader) Read

func (r Reader) Read(
	ctx context.Context,
	key string,
	maxStale time.Duration,
) (ReadValue, error)

type WaitTxResult

type WaitTxResult struct {
	Status storage.TxCommitStatus
	Err    error
}

type WriteAccess

type WriteAccess struct {
	Path   string
	Val    []byte
	Delete bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL