txnbase

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	IOET_WALTxnEntry            uint16 = 3000
	IOET_WALTxnCommand_Composed uint16 = 3001
	IOET_WALTxnCommand_TxnState uint16 = 3002

	IOET_WALTxnEntry_V1            uint16 = 1
	IOET_WALTxnEntry_V2            uint16 = 2
	IOET_WALTxnCommand_Composed_V1 uint16 = 1
	IOET_WALTxnCommand_TxnState_V1 uint16 = 1

	IOET_WALTxnEntry_CurrVer            = IOET_WALTxnEntry_V2
	IOET_WALTxnCommand_Composed_CurrVer = IOET_WALTxnCommand_Composed_V1
	IOET_WALTxnCommand_TxnState_CurrVer = IOET_WALTxnCommand_TxnState_V1

	// CmdBufReserved is reserved size of cmd buffer, mainly the size of TxnCtx.Memo.
	// ComposedCmd.CmdBufLimit is the max buffer size that could be sent out to log-service.
	// This value is normally the max RPC message size which is configured in config of DN.
	// The message contains mainly commands, but also other information whose size is CmdBufReserved.
	// TODO(volgariver6): this buf size is about the max size of TxnCt.Memo, we need to calculate
	// the exact size of it.
	CmdBufReserved = 1024 * 1024 * 10
)
View Source
const (
	TSUncommitted int32 = iota
	TSCommitting
	TSCommitted
	TSRollbacking
	TSRollbacked
)
View Source
const (
	OpCommit = iota
	OpRollback
	OpPrepare
	OpCommitting
	OpInvalid
)
View Source
const (
	EventRollback = iota + 1
	EventCommitting
	EventCommit
)
View Source
const (
	IDSize = 8 + types.UuidSize + types.BlockidSize + 4 + 2 + 1
)

Variables

View Source
var (
	SnapshotAttr_StartTS       = "start_ts"
	SnapshotAttr_PrepareTS     = "prepare_ts"
	SnapshotAttr_CommitTS      = "commit_ts"
	SnapshotAttr_LogIndex_LSN  = "log_index_lsn"
	SnapshotAttr_LogIndex_CSN  = "log_index_csn"
	SnapshotAttr_LogIndex_Size = "log_index_size"
)
View Source
var DefaultTxnFactory = func(
	mgr *TxnManager,
	store txnif.TxnStore,
	id []byte,
	startTS types.TS,
	snapshotTS types.TS) txnif.AsyncTxn {
	return NewTxn(mgr, store, id, startTS, snapshotTS)
}
View Source
var (
	ErrTransferTransactionState = moerr.NewInternalErrorNoCtx("tae: transfer transaction state error")
)
View Source
var NoopStoreFactory = func() txnif.TxnStore { return new(NoopTxnStore) }

Functions

func BuildCommandFrom

func BuildCommandFrom(buf []byte) (cmd any, err error)

func CompareTxnMVCCNode added in v0.6.0

func CompareTxnMVCCNode(e, o *TxnMVCCNode) int

func IDCtxToID

func IDCtxToID(buf []byte) string

func IDToIDCtx

func IDToIDCtx(id uint64) []byte

func MarshalID

func MarshalID(id *common.ID) []byte

func TxnField

func TxnField(txn txnif.AsyncTxn) zap.Field

func TxnMgrField

func TxnMgrField(mgr *TxnManager) zap.Field

func UnmarshalID

func UnmarshalID(buf []byte) *common.ID

Types

type BaseCmd

type BaseCmd struct{}

func (*BaseCmd) Close

func (base *BaseCmd) Close()

type BaseCustomizedCmd

type BaseCustomizedCmd struct {
	BaseCmd
	ID   uint32
	Impl txnif.TxnCmd
}

func NewBaseCustomizedCmd

func NewBaseCustomizedCmd(id uint32, impl txnif.TxnCmd) *BaseCustomizedCmd

func (*BaseCustomizedCmd) GetID

func (c *BaseCustomizedCmd) GetID() uint32

type ComposedCmd

type ComposedCmd struct {
	Cmds []txnif.TxnCmd

	// CmdBufLimit indicates max cmd buffer size. We can only send out
	// the cmd buffer whose size is less than it.
	CmdBufLimit int64

	// lastPos is the position in the Cmds list, before which the cmds have
	// been marshalled into buffer.
	LastPos int
}

func NewComposedCmd

func NewComposedCmd(maxSize uint64) *ComposedCmd

func (*ComposedCmd) AddCmd

func (cc *ComposedCmd) AddCmd(cmd txnif.TxnCmd)

func (*ComposedCmd) ApplyCommit added in v0.6.0

func (cc *ComposedCmd) ApplyCommit()

func (*ComposedCmd) ApplyRollback added in v0.6.0

func (cc *ComposedCmd) ApplyRollback()

func (*ComposedCmd) Close

func (cc *ComposedCmd) Close()

func (*ComposedCmd) Desc

func (cc *ComposedCmd) Desc() string

func (*ComposedCmd) GetType

func (cc *ComposedCmd) GetType() uint16

func (*ComposedCmd) MarshalBinary added in v0.8.0

func (cc *ComposedCmd) MarshalBinary() (buf []byte, err error)

func (*ComposedCmd) MoreCmds added in v0.8.0

func (cc *ComposedCmd) MoreCmds() bool

func (*ComposedCmd) SetReplayTxn added in v0.6.0

func (cc *ComposedCmd) SetReplayTxn(txn txnif.AsyncTxn)

func (*ComposedCmd) String

func (cc *ComposedCmd) String() string

func (*ComposedCmd) ToDesc

func (cc *ComposedCmd) ToDesc(prefix string) string

func (*ComposedCmd) ToString

func (cc *ComposedCmd) ToString(prefix string) string

func (*ComposedCmd) ToVerboseString

func (cc *ComposedCmd) ToVerboseString(prefix string) string

func (*ComposedCmd) UnmarshalBinary added in v0.8.0

func (cc *ComposedCmd) UnmarshalBinary(buf []byte) (err error)

func (*ComposedCmd) VerboseString

func (cc *ComposedCmd) VerboseString() string

func (*ComposedCmd) WriteTo

func (cc *ComposedCmd) WriteTo(w io.Writer) (n int64, err error)

type MVCCChain added in v0.6.0

type MVCCChain[T txnif.MVCCNode[T]] struct {
	*sync.RWMutex
	MVCC *common.GenericSortedDList[T]
	// contains filtered or unexported fields
}

func NewMVCCChain added in v0.6.0

func NewMVCCChain[T txnif.MVCCNode[T]](comparefn func(T, T) int, newnodefn func() T) *MVCCChain[T]

func (*MVCCChain[T]) Apply1PCCommit added in v0.6.0

func (be *MVCCChain[T]) Apply1PCCommit() error

func (*MVCCChain[T]) ApplyCommit added in v0.6.0

func (be *MVCCChain[T]) ApplyCommit() error

func (*MVCCChain[T]) ApplyRollback added in v0.6.0

func (be *MVCCChain[T]) ApplyRollback() error

func (*MVCCChain[T]) CheckConflict added in v0.6.0

func (be *MVCCChain[T]) CheckConflict(txn txnif.TxnReader) (err error)

func (*MVCCChain[T]) CloneCommittedInRange added in v0.6.0

func (be *MVCCChain[T]) CloneCommittedInRange(start, end types.TS) (ret *MVCCChain[T])

func (*MVCCChain[T]) CloneLatestNode added in v0.6.0

func (be *MVCCChain[T]) CloneLatestNode() (*MVCCChain[T], T)

func (*MVCCChain[T]) ClonePreparedInRange added in v0.6.0

func (be *MVCCChain[T]) ClonePreparedInRange(start, end types.TS) (ret []T)

ClonePreparedInRange will collect all txn node prepared in the time window. Wait txn to complete committing if it didn't.

func (*MVCCChain[T]) Depth added in v0.6.0

func (be *MVCCChain[T]) Depth() int

func (*MVCCChain[T]) GetLatestCommittedNode added in v0.6.0

func (be *MVCCChain[T]) GetLatestCommittedNode() (node T)

GetLatestCommittedNode gets the latest committed mvcc node. It's useful when check whether the catalog/metadata entry is deleted.

func (*MVCCChain[T]) GetLatestNodeLocked added in v0.6.0

func (be *MVCCChain[T]) GetLatestNodeLocked() T

GetLatestNodeLocked gets the latest mvcc node. It is useful in making command, apply state(e.g. ApplyCommit), check confilct.

func (*MVCCChain[T]) GetPrepareTs added in v0.6.0

func (be *MVCCChain[T]) GetPrepareTs() types.TS

for replay

func (*MVCCChain[T]) GetTxn added in v0.6.0

func (be *MVCCChain[T]) GetTxn() txnif.TxnReader

func (*MVCCChain[T]) GetVisibleNode added in v0.6.0

func (be *MVCCChain[T]) GetVisibleNode(txn txnif.TxnReader) (node T)

GetVisibleNode gets mvcc node according to the txnReader. It returns the mvcc node in the same txn as the read txn or returns the latest mvcc node with commitTS less than the timestamp.

func (*MVCCChain[T]) GetVisibleNodeByTS added in v0.8.0

func (be *MVCCChain[T]) GetVisibleNodeByTS(ts types.TS) (node T)

GetVisibleNode gets mvcc node according to the timestamp. It returns the latest mvcc node with commitTS less than the timestamp.

func (*MVCCChain[T]) HasCommittedNode added in v0.6.0

func (be *MVCCChain[T]) HasCommittedNode() bool

func (*MVCCChain[T]) HasCommittedNodeInRange added in v0.6.0

func (be *MVCCChain[T]) HasCommittedNodeInRange(start, end types.TS) (ok bool)

[start, end] Check whether there is any committed node in between [start, end] -----+------+-------+--------+----------+--------->

    |      |       |        |          |      Time
    |     start    |       end         |
commitTs <----- commitTs <--------- commitTs|uncommitted  <=  MVCCChain Header
   (1)            (2)                 (3)

func (*MVCCChain[T]) HasUncommittedNode added in v0.6.0

func (be *MVCCChain[T]) HasUncommittedNode() bool

func (*MVCCChain[T]) Insert added in v0.6.0

func (be *MVCCChain[T]) Insert(vun T) (node *common.GenericDLNode[T])

func (*MVCCChain[T]) IsCommitted added in v0.6.0

func (be *MVCCChain[T]) IsCommitted() bool

func (*MVCCChain[T]) IsCommitting added in v0.6.0

func (be *MVCCChain[T]) IsCommitting() bool

In /Catalog, there're three states: Active, Committing and Committed. A txn is Active before its CommitTs is allocated. It's Committed when its state will never change, i.e. TxnStateCommitted and TxnStateRollbacked. It's Committing when it's in any other state, including TxnStateCommitting, TxnStateRollbacking, TxnStatePrepared and so on. When read or write an entry, if the last txn of the entry is Committing, we wait for it. When write on an Entry, if there's an Active txn, we report w-w conflict.

func (*MVCCChain[T]) IsCreatingOrAborted added in v0.6.0

func (be *MVCCChain[T]) IsCreatingOrAborted() bool

func (*MVCCChain[T]) IsEmpty added in v0.6.0

func (be *MVCCChain[T]) IsEmpty() bool

func (*MVCCChain[T]) LoopChain added in v0.6.0

func (be *MVCCChain[T]) LoopChain(fn func(T) bool)

func (*MVCCChain[T]) MustOneNodeLocked added in v0.6.0

func (be *MVCCChain[T]) MustOneNodeLocked() (T, bool)

func (*MVCCChain[T]) NeedWaitCommitting added in v0.6.0

func (be *MVCCChain[T]) NeedWaitCommitting(ts types.TS) (bool, txnif.TxnReader)

func (*MVCCChain[T]) PrepareCommit added in v0.6.0

func (be *MVCCChain[T]) PrepareCommit() error

func (*MVCCChain[T]) PrepareRollback added in v0.6.0

func (be *MVCCChain[T]) PrepareRollback() (bool, error)

func (*MVCCChain[T]) SearchNode added in v0.6.0

func (be *MVCCChain[T]) SearchNode(o T) (node T)

It's only used in replay

func (*MVCCChain[T]) StringLocked added in v0.6.0

func (be *MVCCChain[T]) StringLocked() string

func (*MVCCChain[T]) WriteAllTo added in v0.6.0

func (be *MVCCChain[T]) WriteAllTo(w io.Writer) (n int64, err error)

func (*MVCCChain[T]) WriteOneNodeTo added in v0.6.0

func (be *MVCCChain[T]) WriteOneNodeTo(w io.Writer) (n int64, err error)

type MVCCSlice added in v0.6.0

type MVCCSlice[T txnif.MVCCNode[T]] struct {
	MVCC []T
	// contains filtered or unexported fields
}

func NewMVCCSlice added in v0.6.0

func NewMVCCSlice[T txnif.MVCCNode[T]](newnodefn func() T,
	comparefn func(T, T) int) *MVCCSlice[T]

func (*MVCCSlice[T]) DeleteNode added in v0.6.0

func (be *MVCCSlice[T]) DeleteNode(node T)

func (*MVCCSlice[T]) ForEach added in v0.6.0

func (be *MVCCSlice[T]) ForEach(fn func(un T) bool, reverse bool)

func (*MVCCSlice[T]) GetCommittedNode added in v0.6.0

func (be *MVCCSlice[T]) GetCommittedNode() (node T)

GetCommittedNode gets the latest committed UpdateNode. It's useful when check whether the catalog/metadata entry is deleted.

func (*MVCCSlice[T]) GetLastNonAbortedNode added in v0.6.0

func (be *MVCCSlice[T]) GetLastNonAbortedNode() (node T)

func (*MVCCSlice[T]) GetNodeByOffset added in v0.6.0

func (be *MVCCSlice[T]) GetNodeByOffset(offset int) T

func (*MVCCSlice[T]) GetNodeToReadByPrepareTS added in v0.6.0

func (be *MVCCSlice[T]) GetNodeToReadByPrepareTS(ts types.TS) (offset int, node T)

GetNodeToRead gets UpdateNode according to the timestamp. It returns the UpdateNode in the same txn as the read txn or returns the latest UpdateNode with commitTS less than the timestamp. todo getend or getcommitts

func (*MVCCSlice[T]) GetTs added in v0.6.0

func (be *MVCCSlice[T]) GetTs() types.TS

for replay

func (*MVCCSlice[T]) GetUpdateNodeLocked added in v0.6.0

func (be *MVCCSlice[T]) GetUpdateNodeLocked() T

GetUpdateNode gets the latest UpdateNode. It is useful in making command, apply state(e.g. ApplyCommit), check confilct.

func (*MVCCSlice[T]) InsertNode added in v0.6.0

func (be *MVCCSlice[T]) InsertNode(un T)

func (*MVCCSlice[T]) IsCommitted added in v0.6.0

func (be *MVCCSlice[T]) IsCommitted() bool

func (*MVCCSlice[T]) IsCommitting added in v0.6.0

func (be *MVCCSlice[T]) IsCommitting() bool

func (*MVCCSlice[T]) IsEmpty added in v0.6.0

func (be *MVCCSlice[T]) IsEmpty() bool

func (*MVCCSlice[T]) LoopInRange added in v0.6.0

func (be *MVCCSlice[T]) LoopInRange(start, end types.TS, fn func(T) bool)

func (*MVCCSlice[T]) LoopOffsetRange added in v0.6.0

func (be *MVCCSlice[T]) LoopOffsetRange(start, end int, fn func(T) bool)

func (*MVCCSlice[T]) SearchNode added in v0.6.0

func (be *MVCCSlice[T]) SearchNode(o T) (node T)

func (*MVCCSlice[T]) SearchNodeByCompareFn added in v0.6.0

func (be *MVCCSlice[T]) SearchNodeByCompareFn(fn func(a T) int) (offset int, node T)

func (*MVCCSlice[T]) SearchNodeByTS added in v0.6.0

func (be *MVCCSlice[T]) SearchNodeByTS(ts types.TS) (node T)

func (*MVCCSlice[T]) StringLocked added in v0.6.0

func (be *MVCCSlice[T]) StringLocked() string

type NoopCommitListener added in v0.6.0

type NoopCommitListener struct{}

func (*NoopCommitListener) OnBeginPrePrepare added in v0.6.0

func (bl *NoopCommitListener) OnBeginPrePrepare(txn txnif.AsyncTxn)

func (*NoopCommitListener) OnEndPrePrepare added in v0.6.0

func (bl *NoopCommitListener) OnEndPrePrepare(txn txnif.AsyncTxn)

type NoopTxnStore

type NoopTxnStore struct{}

func (*NoopTxnStore) AddBlksWithMetaLoc added in v0.7.0

func (store *NoopTxnStore) AddBlksWithMetaLoc(
	ctx context.Context,
	dbId, tid uint64,
	stats containers.Vector,
) error

func (*NoopTxnStore) AddTxnEntry

func (store *NoopTxnStore) AddTxnEntry(t txnif.TxnEntryType, entry txnif.TxnEntry)

func (*NoopTxnStore) AddWaitEvent added in v0.8.0

func (store *NoopTxnStore) AddWaitEvent(cnt int)

func (*NoopTxnStore) Append

func (store *NoopTxnStore) Append(ctx context.Context, dbId, id uint64, data *containers.Batch) error

func (*NoopTxnStore) Apply2PCPrepare added in v0.6.0

func (store *NoopTxnStore) Apply2PCPrepare() error

func (*NoopTxnStore) ApplyCommit

func (store *NoopTxnStore) ApplyCommit() error

func (*NoopTxnStore) ApplyRollback

func (store *NoopTxnStore) ApplyRollback() error

func (*NoopTxnStore) BatchDedup

func (store *NoopTxnStore) BatchDedup(
	uint64, uint64, containers.Vector,
) (err error)

func (*NoopTxnStore) BindTxn

func (store *NoopTxnStore) BindTxn(txn txnif.AsyncTxn)

func (*NoopTxnStore) Close

func (store *NoopTxnStore) Close() error

func (*NoopTxnStore) CreateBlock

func (store *NoopTxnStore) CreateBlock(*common.ID, bool) (blk handle.Block, err error)

func (*NoopTxnStore) CreateDatabase

func (store *NoopTxnStore) CreateDatabase(name, creatSql, datTyp string) (db handle.Database, err error)

func (*NoopTxnStore) CreateDatabaseWithID added in v0.6.0

func (store *NoopTxnStore) CreateDatabaseWithID(name, createSql, datTyp string, id uint64) (db handle.Database, err error)

func (*NoopTxnStore) CreateNonAppendableBlock

func (store *NoopTxnStore) CreateNonAppendableBlock(*common.ID, *objectio.CreateBlockOpt) (blk handle.Block, err error)

func (*NoopTxnStore) CreateNonAppendableObject added in v1.1.0

func (store *NoopTxnStore) CreateNonAppendableObject(dbId, tid uint64, _ bool) (obj handle.Object, err error)

func (*NoopTxnStore) CreateObject added in v1.1.0

func (store *NoopTxnStore) CreateObject(dbId, tid uint64, is1PC bool) (obj handle.Object, err error)

func (*NoopTxnStore) CreateRelation

func (store *NoopTxnStore) CreateRelation(dbId uint64, def any) (rel handle.Relation, err error)

func (*NoopTxnStore) CreateRelationWithTableId added in v0.6.0

func (store *NoopTxnStore) CreateRelationWithTableId(dbId uint64, tableId uint64, def any) (rel handle.Relation, err error)

func (*NoopTxnStore) DatabaseNames

func (store *NoopTxnStore) DatabaseNames() (names []string)

func (*NoopTxnStore) DoneWaitEvent added in v0.8.0

func (store *NoopTxnStore) DoneWaitEvent(cnt int)

func (*NoopTxnStore) DropDatabase

func (store *NoopTxnStore) DropDatabase(name string) (db handle.Database, err error)

func (*NoopTxnStore) DropDatabaseByID added in v0.6.0

func (store *NoopTxnStore) DropDatabaseByID(id uint64) (db handle.Database, err error)

func (*NoopTxnStore) DropRelationByID added in v0.6.0

func (store *NoopTxnStore) DropRelationByID(dbId uint64, id uint64) (rel handle.Relation, err error)

func (*NoopTxnStore) DropRelationByName

func (store *NoopTxnStore) DropRelationByName(dbId uint64, name string) (rel handle.Relation, err error)

func (*NoopTxnStore) EndTrace added in v1.1.2

func (store *NoopTxnStore) EndTrace()

func (*NoopTxnStore) Freeze added in v0.8.0

func (store *NoopTxnStore) Freeze() error

func (*NoopTxnStore) GetBlock

func (store *NoopTxnStore) GetBlock(id *common.ID) (blk handle.Block, err error)

func (*NoopTxnStore) GetByFilter

func (store *NoopTxnStore) GetByFilter(
	context.Context, uint64, uint64, *handle.Filter,
) (id *common.ID, offset uint32, err error)

func (*NoopTxnStore) GetContext added in v0.8.0

func (store *NoopTxnStore) GetContext() context.Context

func (*NoopTxnStore) GetDatabase

func (store *NoopTxnStore) GetDatabase(name string) (db handle.Database, err error)

func (*NoopTxnStore) GetDatabaseByID added in v0.6.0

func (store *NoopTxnStore) GetDatabaseByID(id uint64) (db handle.Database, err error)

func (*NoopTxnStore) GetDirty added in v0.6.0

func (store *NoopTxnStore) GetDirty() *model.Tree

func (*NoopTxnStore) GetDirtyTableByID added in v0.6.0

func (store *NoopTxnStore) GetDirtyTableByID(id uint64) *model.TableTree

func (*NoopTxnStore) GetLSN

func (store *NoopTxnStore) GetLSN() uint64

func (*NoopTxnStore) GetObject added in v1.1.0

func (store *NoopTxnStore) GetObject(id *common.ID) (obj handle.Object, err error)

func (*NoopTxnStore) GetRelationByID added in v0.6.0

func (store *NoopTxnStore) GetRelationByID(dbId uint64, id uint64) (rel handle.Relation, err error)

func (*NoopTxnStore) GetRelationByName

func (store *NoopTxnStore) GetRelationByName(dbId uint64, name string) (rel handle.Relation, err error)

func (*NoopTxnStore) GetTransactionType added in v0.8.0

func (store *NoopTxnStore) GetTransactionType() txnif.TxnType

func (*NoopTxnStore) GetValue

func (store *NoopTxnStore) GetValue(
	*common.ID, uint32, uint16,
) (v any, isNull bool, err error)

func (*NoopTxnStore) HasAnyTableDataChanges added in v0.6.0

func (store *NoopTxnStore) HasAnyTableDataChanges() bool

func (*NoopTxnStore) HasCatalogChanges added in v0.6.0

func (store *NoopTxnStore) HasCatalogChanges() bool

func (*NoopTxnStore) HasTableDataChanges added in v0.6.0

func (store *NoopTxnStore) HasTableDataChanges(id uint64) bool

func (*NoopTxnStore) IncreateWriteCnt

func (store *NoopTxnStore) IncreateWriteCnt() int

func (*NoopTxnStore) IsReadonly

func (store *NoopTxnStore) IsReadonly() bool

func (*NoopTxnStore) LogTxnEntry

func (store *NoopTxnStore) LogTxnEntry(dbId, tableId uint64, entry txnif.TxnEntry, readed []*common.ID) (err error)

func (*NoopTxnStore) LogTxnState added in v0.6.0

func (store *NoopTxnStore) LogTxnState(sync bool) (logEntry entry.Entry, err error)

func (*NoopTxnStore) ObserveTxn added in v0.8.0

func (store *NoopTxnStore) ObserveTxn(
	visitDatabase func(db any),
	visitTable func(tbl any),
	rotateTable func(dbName, tblName string, dbid, tid uint64),
	visitMetadata func(block any),
	visitObject func(any),
	visitAppend func(bat any),
	visitDelete func(ctx context.Context, deletes txnif.DeleteNode))

func (*NoopTxnStore) PreApplyCommit

func (store *NoopTxnStore) PreApplyCommit() error

func (*NoopTxnStore) PrePrepare added in v0.6.0

func (store *NoopTxnStore) PrePrepare(ctx context.Context) error

func (*NoopTxnStore) PrepareCommit

func (store *NoopTxnStore) PrepareCommit() error

func (*NoopTxnStore) PrepareRollback

func (store *NoopTxnStore) PrepareRollback() error

func (*NoopTxnStore) PrepareWAL added in v0.8.0

func (store *NoopTxnStore) PrepareWAL() error

func (*NoopTxnStore) RangeDelete

func (store *NoopTxnStore) RangeDelete(
	*common.ID, uint32, uint32, containers.Vector, handle.DeleteType,
) (err error)

func (*NoopTxnStore) SetContext added in v1.0.0

func (store *NoopTxnStore) SetContext(context.Context)

func (*NoopTxnStore) SoftDeleteBlock

func (store *NoopTxnStore) SoftDeleteBlock(id *common.ID) (err error)

func (*NoopTxnStore) SoftDeleteObject added in v1.1.0

func (store *NoopTxnStore) SoftDeleteObject(id *common.ID) (err error)

func (*NoopTxnStore) StartTrace added in v1.1.2

func (store *NoopTxnStore) StartTrace()

func (*NoopTxnStore) TriggerTrace added in v1.1.2

func (store *NoopTxnStore) TriggerTrace(uint8)

func (*NoopTxnStore) TryDeleteByDeltaloc added in v1.0.0

func (store *NoopTxnStore) TryDeleteByDeltaloc(id *common.ID, deltaloc objectio.Location) (ok bool, err error)

func (*NoopTxnStore) UnsafeGetDatabase added in v0.6.0

func (store *NoopTxnStore) UnsafeGetDatabase(id uint64) (db handle.Database, err error)

func (*NoopTxnStore) UnsafeGetRelation added in v0.6.0

func (store *NoopTxnStore) UnsafeGetRelation(dbId, id uint64) (rel handle.Relation, err error)

func (*NoopTxnStore) Update

func (store *NoopTxnStore) Update(uint64, *common.ID, uint32, uint16, any) (err error)

func (*NoopTxnStore) UpdateDeltaLoc added in v0.6.0

func (store *NoopTxnStore) UpdateDeltaLoc(id *common.ID, un objectio.Location) (err error)

func (*NoopTxnStore) UpdateMetaLoc added in v0.6.0

func (store *NoopTxnStore) UpdateMetaLoc(id *common.ID, un objectio.Location) (err error)

func (*NoopTxnStore) UpdateObjectStats added in v1.1.0

func (store *NoopTxnStore) UpdateObjectStats(*common.ID, *objectio.ObjectStats) error

func (*NoopTxnStore) WaitPrepared added in v0.6.0

func (store *NoopTxnStore) WaitPrepared(ctx context.Context) (err error)

type OpTxn

type OpTxn struct {
	Txn txnif.AsyncTxn
	Op  OpType
	// contains filtered or unexported fields
}

func (*OpTxn) Is2PC added in v0.6.0

func (txn *OpTxn) Is2PC() bool

func (*OpTxn) IsReplay added in v0.6.0

func (txn *OpTxn) IsReplay() bool

func (*OpTxn) IsTryCommitting added in v0.6.0

func (txn *OpTxn) IsTryCommitting() bool

func (*OpTxn) Repr

func (txn *OpTxn) Repr() string

type OpType

type OpType int8

type Txn

type Txn struct {
	*TxnCtx
	Mgr                      *TxnManager
	Store                    txnif.TxnStore
	Err                      error
	LSN                      uint64
	TenantID, UserID, RoleID atomic.Uint32

	DedupType txnif.DedupType

	PrepareCommitFn   func(txnif.AsyncTxn) error
	PrepareRollbackFn func(txnif.AsyncTxn) error
	ApplyCommitFn     func(txnif.AsyncTxn) error
	ApplyRollbackFn   func(txnif.AsyncTxn) error
	// contains filtered or unexported fields
}

func MockTxnReaderWithStartTS added in v0.8.0

func MockTxnReaderWithStartTS(startTS types.TS) *Txn

func NewPersistedTxn added in v0.6.0

func NewPersistedTxn(
	mgr *TxnManager,
	ctx *TxnCtx,
	store txnif.TxnStore,
	lsn uint64,
	prepareCommitFn func(txnif.AsyncTxn) error,
	prepareRollbackFn func(txnif.AsyncTxn) error,
	applyCommitFn func(txnif.AsyncTxn) error,
	applyRollbackFn func(txnif.AsyncTxn) error) *Txn

func NewTxn

func NewTxn(mgr *TxnManager, store txnif.TxnStore, txnId []byte, start, snapshot types.TS) *Txn

func (*Txn) ApplyCommit

func (txn *Txn) ApplyCommit() (err error)

func (*Txn) ApplyRollback

func (txn *Txn) ApplyRollback() (err error)

func (*Txn) BindAccessInfo added in v0.6.0

func (txn *Txn) BindAccessInfo(tenantID, userID, roleID uint32)

func (*Txn) Commit

func (txn *Txn) Commit(ctx context.Context) (err error)

Commit is used to commit a 1PC or 2PC transaction running on Coordinator or running on Participant. Notice that the Commit of a 2PC transaction must be success once the Commit message arrives, since Preparing had already succeeded.

func (*Txn) CommitInRecovery added in v0.6.0

func (txn *Txn) CommitInRecovery(ctx context.Context) (err error)

CommitInRecovery is called during recovery

func (*Txn) Committing added in v0.6.0

func (txn *Txn) Committing() (err error)

Committing is used to record a "committing" status for coordinator. Notice that txn must commit successfully once committing message arrives, since Preparing had already succeeded.

func (*Txn) CommittingInRecovery added in v0.6.0

func (txn *Txn) CommittingInRecovery() (err error)

func (*Txn) CreateDatabase

func (txn *Txn) CreateDatabase(name, createSql, datTyp string) (db handle.Database, err error)

func (*Txn) CreateDatabaseWithCtx added in v0.8.0

func (txn *Txn) CreateDatabaseWithCtx(ctx context.Context,
	name, createSql, datTyp string, id uint64) (db handle.Database, err error)

func (*Txn) CurrentDatabase

func (txn *Txn) CurrentDatabase() (db handle.Database)

func (*Txn) DatabaseNames

func (txn *Txn) DatabaseNames() (names []string)

func (*Txn) DoneWithErr

func (txn *Txn) DoneWithErr(err error, isAbort bool)

func (*Txn) DropDatabase

func (txn *Txn) DropDatabase(name string) (db handle.Database, err error)

func (*Txn) DropDatabaseByID added in v0.6.0

func (txn *Txn) DropDatabaseByID(id uint64) (db handle.Database, err error)

func (*Txn) Freeze added in v0.8.0

func (txn *Txn) Freeze() error

func (*Txn) GetBase added in v1.0.0

func (txn *Txn) GetBase() txnif.BaseTxn

func (*Txn) GetContext added in v0.8.0

func (txn *Txn) GetContext() context.Context

func (*Txn) GetDatabase

func (txn *Txn) GetDatabase(name string) (db handle.Database, err error)

func (*Txn) GetDatabaseByID added in v0.6.0

func (txn *Txn) GetDatabaseByID(id uint64) (db handle.Database, err error)

func (*Txn) GetDatabaseWithCtx added in v0.8.0

func (txn *Txn) GetDatabaseWithCtx(_ context.Context, _ string) (db handle.Database, err error)

func (*Txn) GetDedupType added in v0.8.0

func (txn *Txn) GetDedupType() txnif.DedupType

func (*Txn) GetError

func (txn *Txn) GetError() error

func (*Txn) GetLSN

func (txn *Txn) GetLSN() uint64

func (*Txn) GetLsn added in v0.6.0

func (txn *Txn) GetLsn() uint64

func (*Txn) GetStore

func (txn *Txn) GetStore() txnif.TxnStore

func (*Txn) GetTenantID added in v0.6.0

func (txn *Txn) GetTenantID() uint32

func (*Txn) GetUserAndRoleID added in v0.6.0

func (txn *Txn) GetUserAndRoleID() (uint32, uint32)

func (*Txn) IsReplay added in v0.6.0

func (txn *Txn) IsReplay() bool

func (*Txn) LogTxnEntry

func (txn *Txn) LogTxnEntry(dbId, tableId uint64, entry txnif.TxnEntry, readed []*common.ID) (err error)

func (*Txn) LogTxnState added in v0.6.0

func (txn *Txn) LogTxnState(sync bool) (logEntry entry.Entry, err error)

func (*Txn) MarshalLogObject

func (txn *Txn) MarshalLogObject(enc zapcore.ObjectEncoder) (err error)

func (*Txn) MockIncWriteCnt

func (txn *Txn) MockIncWriteCnt() int

func (*Txn) PreApplyCommit

func (txn *Txn) PreApplyCommit() (err error)

func (*Txn) PrePrepare added in v0.6.0

func (txn *Txn) PrePrepare(ctx context.Context) error

func (*Txn) Prepare added in v0.6.0

func (txn *Txn) Prepare(ctx context.Context) (pts types.TS, err error)

Prepare is used to pre-commit a 2PC distributed transaction. Notice that once any error happened, we should rollback the txn. TODO:

  1. How to handle the case in which log service timed out?
  2. For a 2pc transaction, Rollback message may arrive before Prepare message, should handle this case by TxnStorage?

func (*Txn) PrepareCommit

func (txn *Txn) PrepareCommit() (err error)

func (*Txn) PrepareRollback

func (txn *Txn) PrepareRollback() (err error)

func (*Txn) PrepareWAL added in v0.8.0

func (txn *Txn) PrepareWAL() (err error)

func (*Txn) Rollback

func (txn *Txn) Rollback(ctx context.Context) (err error)

Rollback is used to roll back a 1PC or 2PC transaction. Notice that there may be a such scenario in which a 2PC distributed transaction in ACTIVE will be rollbacked, since Rollback message may arrive before the Prepare message.

func (*Txn) SetApplyCommitFn

func (txn *Txn) SetApplyCommitFn(fn func(txnif.AsyncTxn) error)

func (*Txn) SetApplyRollbackFn

func (txn *Txn) SetApplyRollbackFn(fn func(txnif.AsyncTxn) error)

func (*Txn) SetDedupType added in v0.8.0

func (txn *Txn) SetDedupType(dedupType txnif.DedupType)

func (*Txn) SetError

func (txn *Txn) SetError(err error)

func (*Txn) SetPrepareCommitFn

func (txn *Txn) SetPrepareCommitFn(fn func(txnif.AsyncTxn) error)

func (*Txn) SetPrepareRollbackFn

func (txn *Txn) SetPrepareRollbackFn(fn func(txnif.AsyncTxn) error)

func (*Txn) String

func (txn *Txn) String() string

func (*Txn) UnsafeGetDatabase added in v0.6.0

func (txn *Txn) UnsafeGetDatabase(id uint64) (db handle.Database, err error)

func (*Txn) UnsafeGetRelation added in v0.6.0

func (txn *Txn) UnsafeGetRelation(dbId, id uint64) (db handle.Relation, err error)

func (*Txn) UseDatabase

func (txn *Txn) UseDatabase(name string) (err error)

func (*Txn) WaitDone

func (txn *Txn) WaitDone(err error, isAbort bool) error

func (*Txn) WaitPrepared added in v0.6.0

func (txn *Txn) WaitPrepared(ctx context.Context) error

type TxnBlock

type TxnBlock struct {
	Txn txnif.AsyncTxn
	Seg handle.Object
}

func (*TxnBlock) Append

func (blk *TxnBlock) Append(*containers.Batch, uint32) (n uint32, err error)

func (*TxnBlock) Close

func (blk *TxnBlock) Close() error

func (*TxnBlock) Fingerprint

func (blk *TxnBlock) Fingerprint() *common.ID

func (*TxnBlock) GetByFilter

func (blk *TxnBlock) GetByFilter(*handle.Filter) (offset uint32, err error)

func (*TxnBlock) GetMeta

func (blk *TxnBlock) GetMeta() any

func (*TxnBlock) GetObject added in v1.1.0

func (blk *TxnBlock) GetObject() (obj handle.Object)

func (*TxnBlock) GetTotalChanges

func (blk *TxnBlock) GetTotalChanges() int

func (*TxnBlock) ID

func (blk *TxnBlock) ID() uint64

func (*TxnBlock) IsAppendableBlock

func (blk *TxnBlock) IsAppendableBlock() bool

func (*TxnBlock) PushDeleteOp

func (blk *TxnBlock) PushDeleteOp(handle.Filter) (err error)

func (*TxnBlock) PushUpdateOp

func (blk *TxnBlock) PushUpdateOp(handle.Filter, string, any) (err error)

func (*TxnBlock) RangeDelete

func (blk *TxnBlock) RangeDelete(uint32, uint32, handle.DeleteType) (err error)

func (*TxnBlock) Reset added in v0.8.0

func (blk *TxnBlock) Reset()

func (*TxnBlock) Rows

func (blk *TxnBlock) Rows() int

func (*TxnBlock) String

func (blk *TxnBlock) String() string

func (*TxnBlock) Update

func (blk *TxnBlock) Update(uint32, uint16, any) (err error)

type TxnCmd added in v0.6.0

type TxnCmd struct {
	*ComposedCmd
	*TxnCtx
	Txn txnif.AsyncTxn

	Lsn uint64
	// contains filtered or unexported fields
}

func NewEmptyTxnCmd added in v0.6.0

func NewEmptyTxnCmd() *TxnCmd

func NewLastTxnCmd added in v0.8.0

func NewLastTxnCmd() *TxnCmd

func NewTxnCmd added in v0.6.0

func NewTxnCmd(maxMessageSize uint64) *TxnCmd

func (*TxnCmd) ApplyCommit added in v0.6.0

func (c *TxnCmd) ApplyCommit()

func (*TxnCmd) ApplyRollback added in v0.6.0

func (c *TxnCmd) ApplyRollback()

func (*TxnCmd) Close added in v0.6.0

func (c *TxnCmd) Close()

func (*TxnCmd) Desc added in v0.6.0

func (c *TxnCmd) Desc() string

func (*TxnCmd) GetType added in v0.6.0

func (c *TxnCmd) GetType() uint16

func (*TxnCmd) IsLastCmd added in v0.8.0

func (c *TxnCmd) IsLastCmd() bool

func (*TxnCmd) MarshalBinary added in v0.8.0

func (c *TxnCmd) MarshalBinary() (buf []byte, err error)

func (*TxnCmd) ReadFromWithVersion added in v1.1.0

func (c *TxnCmd) ReadFromWithVersion(r io.Reader, ver uint16) (n int64, err error)

func (*TxnCmd) SetReplayTxn added in v0.6.0

func (c *TxnCmd) SetReplayTxn(txn txnif.AsyncTxn)

func (*TxnCmd) SetTxn added in v0.6.0

func (c *TxnCmd) SetTxn(txn txnif.AsyncTxn)

func (*TxnCmd) String added in v0.6.0

func (c *TxnCmd) String() string

func (*TxnCmd) UnmarshalBinaryWithVersion added in v1.1.0

func (c *TxnCmd) UnmarshalBinaryWithVersion(buf []byte, ver uint16) (err error)

func (*TxnCmd) VerboseString added in v0.6.0

func (c *TxnCmd) VerboseString() string

func (*TxnCmd) WriteTo added in v0.6.0

func (c *TxnCmd) WriteTo(w io.Writer) (n int64, err error)

type TxnCommitListener added in v0.6.0

type TxnCommitListener interface {
	OnBeginPrePrepare(txnif.AsyncTxn)
	OnEndPrePrepare(txnif.AsyncTxn)
	OnEndPrepareWAL(txnif.AsyncTxn)
}

type TxnCtx

type TxnCtx struct {
	sync.RWMutex
	sync.WaitGroup
	DoneCond                     sync.Cond
	ID                           string
	IDCtx                        []byte
	StartTS, CommitTS, PrepareTS types.TS

	// SnapshotTS is the specified snapshot timestamp used by this txn
	SnapshotTS types.TS

	State        txnif.TxnState
	Participants []uint64

	// Memo is not thread-safe
	// It will be readonly when txn state is not txnif.TxnStateActive
	Memo *txnif.TxnMemo
}

func NewEmptyTxnCtx added in v0.6.0

func NewEmptyTxnCtx() *TxnCtx

func NewTxnCtx

func NewTxnCtx(id []byte, start, snapshot types.TS) *TxnCtx

func (*TxnCtx) CommitAfter added in v0.5.1

func (ctx *TxnCtx) CommitAfter(startTs types.TS) bool

func (*TxnCtx) CommitBefore added in v0.5.1

func (ctx *TxnCtx) CommitBefore(startTs types.TS) bool

func (*TxnCtx) GetCommitTS

func (ctx *TxnCtx) GetCommitTS() types.TS

func (*TxnCtx) GetCtx

func (ctx *TxnCtx) GetCtx() []byte

func (*TxnCtx) GetID

func (ctx *TxnCtx) GetID() string

func (*TxnCtx) GetMemo added in v0.6.0

func (ctx *TxnCtx) GetMemo() *txnif.TxnMemo

func (*TxnCtx) GetParticipants added in v0.6.0

func (ctx *TxnCtx) GetParticipants() []uint64

func (*TxnCtx) GetPrepareTS added in v0.6.0

func (ctx *TxnCtx) GetPrepareTS() types.TS

func (*TxnCtx) GetSnapshotTS added in v0.8.0

func (ctx *TxnCtx) GetSnapshotTS() types.TS

func (*TxnCtx) GetStartTS

func (ctx *TxnCtx) GetStartTS() types.TS

func (*TxnCtx) GetTxnState added in v0.6.0

func (ctx *TxnCtx) GetTxnState(waitIfCommitting bool) (state txnif.TxnState)

False when atomically get the current txn state

True when the txn state is committing, wait it to be committed or rollbacked. It is used during snapshot reads. If TxnStateActive is currently returned, this value will definitely not be used, because even if it becomes TxnStatePreparing later, the timestamp would be larger than the current read timestamp.

func (*TxnCtx) HasSnapshotLag added in v0.8.0

func (ctx *TxnCtx) HasSnapshotLag() bool

func (*TxnCtx) Is2PC added in v0.6.0

func (ctx *TxnCtx) Is2PC() bool

func (*TxnCtx) IsActiveLocked

func (ctx *TxnCtx) IsActiveLocked() bool

func (*TxnCtx) IsReplay added in v0.6.0

func (ctx *TxnCtx) IsReplay() bool

func (*TxnCtx) IsVisible

func (ctx *TxnCtx) IsVisible(o txnif.TxnReader) bool

func (*TxnCtx) MockStartTS added in v0.8.0

func (ctx *TxnCtx) MockStartTS(ts types.TS)

test only Note: unsafe

func (*TxnCtx) Repr

func (ctx *TxnCtx) Repr() string

func (*TxnCtx) SameTxn added in v0.5.1

func (ctx *TxnCtx) SameTxn(txn txnif.TxnReader) bool

func (*TxnCtx) SetCommitTS added in v0.6.0

func (ctx *TxnCtx) SetCommitTS(cts types.TS) (err error)

func (*TxnCtx) SetParticipants added in v0.6.0

func (ctx *TxnCtx) SetParticipants(ids []uint64) (err error)

func (*TxnCtx) SetSnapshotTS added in v0.8.0

func (ctx *TxnCtx) SetSnapshotTS(ts types.TS)

func (*TxnCtx) String

func (ctx *TxnCtx) String() string

func (*TxnCtx) ToCommittedLocked

func (ctx *TxnCtx) ToCommittedLocked() error

func (*TxnCtx) ToCommittingFinished added in v0.6.0

func (ctx *TxnCtx) ToCommittingFinished() (err error)

func (*TxnCtx) ToCommittingFinishedLocked added in v0.6.0

func (ctx *TxnCtx) ToCommittingFinishedLocked() (err error)

func (*TxnCtx) ToPrepared added in v0.6.0

func (ctx *TxnCtx) ToPrepared() (err error)

func (*TxnCtx) ToPreparedLocked added in v0.6.0

func (ctx *TxnCtx) ToPreparedLocked() (err error)

func (*TxnCtx) ToPreparingLocked added in v0.6.0

func (ctx *TxnCtx) ToPreparingLocked(ts types.TS) error

func (*TxnCtx) ToRollbackedLocked

func (ctx *TxnCtx) ToRollbackedLocked() error

func (*TxnCtx) ToRollbacking added in v0.6.0

func (ctx *TxnCtx) ToRollbacking(ts types.TS) error

func (*TxnCtx) ToRollbackingLocked

func (ctx *TxnCtx) ToRollbackingLocked(ts types.TS) error

func (*TxnCtx) ToUnknownLocked

func (ctx *TxnCtx) ToUnknownLocked()

type TxnDatabase

type TxnDatabase struct {
	Txn txnif.AsyncTxn
}

func (*TxnDatabase) Close

func (db *TxnDatabase) Close() error

func (*TxnDatabase) CreateRelation

func (db *TxnDatabase) CreateRelation(def any) (rel handle.Relation, err error)

func (*TxnDatabase) DropRelationByName

func (db *TxnDatabase) DropRelationByName(name string) (rel handle.Relation, err error)

func (*TxnDatabase) GetID

func (db *TxnDatabase) GetID() uint64

func (*TxnDatabase) GetMeta

func (db *TxnDatabase) GetMeta() any

func (*TxnDatabase) GetName

func (db *TxnDatabase) GetName() string

func (*TxnDatabase) GetRelationByName

func (db *TxnDatabase) GetRelationByName(name string) (rel handle.Relation, err error)

func (*TxnDatabase) MakeRelationIt

func (db *TxnDatabase) MakeRelationIt() (it handle.RelationIt)

func (*TxnDatabase) RelationCnt

func (db *TxnDatabase) RelationCnt() int64

func (*TxnDatabase) Relations

func (db *TxnDatabase) Relations() (rels []handle.Relation)

func (*TxnDatabase) String

func (db *TxnDatabase) String() string

func (*TxnDatabase) UnsafeGetRelation added in v0.6.0

func (db *TxnDatabase) UnsafeGetRelation(id uint64) (rel handle.Relation, err error)

type TxnFactory

type TxnFactory = func(*TxnManager, txnif.TxnStore, []byte, types.TS, types.TS) txnif.AsyncTxn

type TxnMVCCNode added in v0.6.0

type TxnMVCCNode struct {
	Start, Prepare, End types.TS
	Txn                 txnif.TxnReader
	Aborted             bool
	// contains filtered or unexported fields
}

func NewTxnMVCCNodeWithStartEnd added in v1.1.0

func NewTxnMVCCNodeWithStartEnd(start, end types.TS) *TxnMVCCNode

func NewTxnMVCCNodeWithTS added in v0.6.0

func NewTxnMVCCNodeWithTS(ts types.TS) *TxnMVCCNode

func NewTxnMVCCNodeWithTxn added in v0.6.0

func NewTxnMVCCNodeWithTxn(txn txnif.TxnReader) *TxnMVCCNode

func ReadTuple added in v0.6.0

func ReadTuple(bat *containers.Batch, row int) (un *TxnMVCCNode)

func (*TxnMVCCNode) AppendTuple added in v0.6.0

func (un *TxnMVCCNode) AppendTuple(bat *containers.Batch)

func (*TxnMVCCNode) AppendTupleWithCommitTS added in v1.1.0

func (un *TxnMVCCNode) AppendTupleWithCommitTS(bat *containers.Batch, commitTS types.TS)

In push model, logtail is prepared before committing txn, un.End is txnif.Uncommit

func (*TxnMVCCNode) ApplyCommit added in v0.6.0

func (un *TxnMVCCNode) ApplyCommit() (ts types.TS, err error)

func (*TxnMVCCNode) ApplyRollback added in v0.6.0

func (un *TxnMVCCNode) ApplyRollback() (ts types.TS, err error)

func (*TxnMVCCNode) CheckConflict added in v0.6.0

func (un *TxnMVCCNode) CheckConflict(txn txnif.TxnReader) error

Check w-w confilct

func (*TxnMVCCNode) CloneAll added in v0.6.0

func (un *TxnMVCCNode) CloneAll() *TxnMVCCNode

func (*TxnMVCCNode) CommittedIn added in v0.6.0

func (un *TxnMVCCNode) CommittedIn(minTS, maxTS types.TS) (in, before bool)

in indicates whether this node is committed in between [minTs, maxTs] before indicates whether this node is committed before minTs NeedWaitCommitting should be called before to make sure all prepared active txns in between [minTs, maxTs] be committed or rollbacked

func (*TxnMVCCNode) Compare added in v0.6.0

func (un *TxnMVCCNode) Compare(o *TxnMVCCNode) int

func (*TxnMVCCNode) Compare2 added in v0.6.0

func (un *TxnMVCCNode) Compare2(o *TxnMVCCNode) int

func (*TxnMVCCNode) GetEnd added in v0.6.0

func (un *TxnMVCCNode) GetEnd() types.TS

func (*TxnMVCCNode) GetPrepare added in v0.6.0

func (un *TxnMVCCNode) GetPrepare() types.TS

func (*TxnMVCCNode) GetStart added in v0.6.0

func (un *TxnMVCCNode) GetStart() types.TS

func (*TxnMVCCNode) GetTxn added in v0.6.0

func (un *TxnMVCCNode) GetTxn() txnif.TxnReader

func (*TxnMVCCNode) Is1PC added in v0.6.0

func (un *TxnMVCCNode) Is1PC() bool

func (*TxnMVCCNode) IsAborted added in v0.6.0

func (un *TxnMVCCNode) IsAborted() bool

func (*TxnMVCCNode) IsActive added in v0.6.0

func (un *TxnMVCCNode) IsActive() bool

func (*TxnMVCCNode) IsCommitted added in v0.6.0

func (un *TxnMVCCNode) IsCommitted() bool

func (*TxnMVCCNode) IsCommitting added in v0.6.0

func (un *TxnMVCCNode) IsCommitting() bool

func (*TxnMVCCNode) IsSameTxn added in v0.6.0

func (un *TxnMVCCNode) IsSameTxn(txn txnif.TxnReader) bool

func (*TxnMVCCNode) IsVisible added in v0.6.0

func (un *TxnMVCCNode) IsVisible(txn txnif.TxnReader) (visible bool)

Check whether is mvcc node is visible to ts Make sure all the relevant prepared txns should be committed|rollbacked

func (*TxnMVCCNode) IsVisibleByTS added in v0.8.0

func (un *TxnMVCCNode) IsVisibleByTS(ts types.TS) (visible bool)

Check whether is mvcc node is visible to ts Make sure all the relevant prepared txns should be committed|rollbacked

func (*TxnMVCCNode) NeedWaitCommitting added in v0.6.0

func (un *TxnMVCCNode) NeedWaitCommitting(ts types.TS) (bool, txnif.TxnReader)

Check whether need to wait this mvcc node

func (*TxnMVCCNode) PrepareCommit added in v0.6.0

func (un *TxnMVCCNode) PrepareCommit() (ts types.TS, err error)

func (*TxnMVCCNode) PrepareRollback added in v0.6.0

func (un *TxnMVCCNode) PrepareRollback() (err error)

func (*TxnMVCCNode) PreparedIn added in v0.6.0

func (un *TxnMVCCNode) PreparedIn(minTS, maxTS types.TS) (in, before bool)

func (*TxnMVCCNode) ReadFrom added in v0.6.0

func (un *TxnMVCCNode) ReadFrom(r io.Reader) (n int64, err error)

func (*TxnMVCCNode) ReadTuple added in v0.6.0

func (un *TxnMVCCNode) ReadTuple(bat *containers.Batch, offset int)

func (*TxnMVCCNode) Set1PC added in v0.6.0

func (un *TxnMVCCNode) Set1PC()

func (*TxnMVCCNode) String added in v0.6.0

func (un *TxnMVCCNode) String() string

func (*TxnMVCCNode) Update added in v0.6.0

func (un *TxnMVCCNode) Update(o *TxnMVCCNode)

func (*TxnMVCCNode) WriteTo added in v0.6.0

func (un *TxnMVCCNode) WriteTo(w io.Writer) (n int64, err error)

type TxnManager

type TxnManager struct {
	sync.RWMutex
	sm.ClosedState
	PreparingSM     sm.StateMachine
	FlushQueue      sm.Queue
	IDMap           map[string]txnif.AsyncTxn
	IdAlloc         *common.TxnIDAllocator
	TsAlloc         *types.TsAlloctor
	MaxCommittedTS  atomic.Pointer[types.TS]
	TxnStoreFactory TxnStoreFactory
	TxnFactory      TxnFactory
	Exception       *atomic.Value
	CommitListener  *batchTxnCommitListener
	// contains filtered or unexported fields
}

func NewTxnManager

func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock clock.Clock) *TxnManager

func (*TxnManager) DeleteTxn

func (mgr *TxnManager) DeleteTxn(id string) (err error)

func (*TxnManager) EnqueueFlushing added in v0.6.0

func (mgr *TxnManager) EnqueueFlushing(op any) (err error)

func (*TxnManager) GetOrCreateTxnWithMeta added in v0.6.0

func (mgr *TxnManager) GetOrCreateTxnWithMeta(
	info []byte,
	id []byte,
	ts types.TS) (txn txnif.AsyncTxn, err error)

GetOrCreateTxnWithMeta Get or create a txn initiated by CN

func (*TxnManager) GetTxn

func (mgr *TxnManager) GetTxn(id string) txnif.AsyncTxn

func (*TxnManager) GetTxnByCtx

func (mgr *TxnManager) GetTxnByCtx(ctx []byte) txnif.AsyncTxn

func (*TxnManager) Init

func (mgr *TxnManager) Init(prevTs types.TS) error

func (*TxnManager) MarshalLogObject

func (mgr *TxnManager) MarshalLogObject(enc zapcore.ObjectEncoder) (err error)

func (*TxnManager) MinTSForTest added in v0.7.0

func (mgr *TxnManager) MinTSForTest() types.TS

MinTSForTest is only be used in ut to ensure that files that have been gc will not be used.

func (*TxnManager) Now added in v0.8.0

func (mgr *TxnManager) Now() types.TS

Now gets a timestamp under the protect from a inner lock. The lock makes all timestamps allocated before have been assigned to txn, which means those txn are visible for the returned timestamp.

func (*TxnManager) OnCommitTxn added in v0.8.0

func (mgr *TxnManager) OnCommitTxn(txn txnif.AsyncTxn)

func (*TxnManager) OnException

func (mgr *TxnManager) OnException(new error)

func (*TxnManager) OnOpTxn

func (mgr *TxnManager) OnOpTxn(op *OpTxn) (err error)

func (*TxnManager) OnReplayTxn added in v0.6.0

func (mgr *TxnManager) OnReplayTxn(txn txnif.AsyncTxn) (err error)

Note: Replay should always runs in a single thread

func (*TxnManager) Start added in v0.6.0

func (mgr *TxnManager) Start(ctx context.Context)

func (*TxnManager) StartTxn

func (mgr *TxnManager) StartTxn(info []byte) (txn txnif.AsyncTxn, err error)

StartTxn starts a local transaction initiated by DN

func (*TxnManager) StartTxnWithLatestTS added in v0.8.0

func (mgr *TxnManager) StartTxnWithLatestTS(info []byte) (txn txnif.AsyncTxn, err error)

StartTxn starts a local transaction initiated by DN

func (*TxnManager) StartTxnWithStartTSAndSnapshotTS added in v1.0.0

func (mgr *TxnManager) StartTxnWithStartTSAndSnapshotTS(
	info []byte,
	startTS, snapshotTS types.TS,
) (txn txnif.AsyncTxn, err error)

func (*TxnManager) StatMaxCommitTS added in v0.6.0

func (mgr *TxnManager) StatMaxCommitTS() (ts types.TS)

func (*TxnManager) Stop

func (mgr *TxnManager) Stop()

type TxnObject added in v1.1.0

type TxnObject struct {
	Txn txnif.AsyncTxn
	Rel handle.Relation
}

func (*TxnObject) BatchDedup added in v1.1.0

func (obj *TxnObject) BatchDedup(containers.Vector) (err error)

func (*TxnObject) Close added in v1.1.0

func (obj *TxnObject) Close() error

func (*TxnObject) CreateBlock added in v1.1.0

func (obj *TxnObject) CreateBlock() (blk handle.Block, err error)

func (*TxnObject) CreateNonAppendableBlock added in v1.1.0

func (obj *TxnObject) CreateNonAppendableBlock() (blk handle.Block, err error)

func (*TxnObject) GetBlock added in v1.1.0

func (obj *TxnObject) GetBlock(id uint64) (blk handle.Block, err error)

func (*TxnObject) GetID added in v1.1.0

func (obj *TxnObject) GetID() uint64

func (*TxnObject) GetMeta added in v1.1.0

func (obj *TxnObject) GetMeta() any

func (*TxnObject) GetRelation added in v1.1.0

func (obj *TxnObject) GetRelation() (rel handle.Relation)

func (*TxnObject) MakeBlockIt added in v1.1.0

func (obj *TxnObject) MakeBlockIt() (it handle.BlockIt)

func (*TxnObject) PushDeleteOp added in v1.1.0

func (obj *TxnObject) PushDeleteOp(handle.Filter) (err error)

func (*TxnObject) PushUpdateOp added in v1.1.0

func (obj *TxnObject) PushUpdateOp(handle.Filter, string, any) (err error)

func (*TxnObject) RangeDelete added in v1.1.0

func (obj *TxnObject) RangeDelete(uint64, uint32, uint32, handle.DeleteType) (err error)

func (*TxnObject) Reset added in v1.1.0

func (obj *TxnObject) Reset()

func (*TxnObject) SoftDeleteBlock added in v1.1.0

func (obj *TxnObject) SoftDeleteBlock(id types.Blockid) (err error)

func (*TxnObject) String added in v1.1.0

func (obj *TxnObject) String() string

func (*TxnObject) Update added in v1.1.0

func (obj *TxnObject) Update(uint64, uint32, uint16, any) (err error)

type TxnRelation

type TxnRelation struct {
	Txn txnif.AsyncTxn
	DB  handle.Database
}

func (*TxnRelation) AddBlksWithMetaLoc added in v0.7.0

func (rel *TxnRelation) AddBlksWithMetaLoc(context.Context, containers.Vector) error

func (*TxnRelation) AlterTable added in v0.8.0

func (rel *TxnRelation) AlterTable(context.Context, *apipb.AlterTableReq) (err error)

func (*TxnRelation) Append

func (rel *TxnRelation) Append(ctx context.Context, data *containers.Batch) error

func (*TxnRelation) BatchDedup

func (rel *TxnRelation) BatchDedup(containers.Vector) error

func (*TxnRelation) Close

func (rel *TxnRelation) Close() error

func (*TxnRelation) CreateNonAppendableObject added in v1.1.0

func (rel *TxnRelation) CreateNonAppendableObject(bool) (obj handle.Object, err error)

func (*TxnRelation) CreateObject added in v1.1.0

func (rel *TxnRelation) CreateObject(bool) (obj handle.Object, err error)

func (*TxnRelation) DeleteByFilter

func (rel *TxnRelation) DeleteByFilter(ctx context.Context, filter *handle.Filter) (err error)

func (*TxnRelation) DeleteByPhyAddrKey added in v0.6.0

func (rel *TxnRelation) DeleteByPhyAddrKey(any) (err error)

func (*TxnRelation) DeleteByPhyAddrKeys added in v0.6.0

func (rel *TxnRelation) DeleteByPhyAddrKeys(containers.Vector, containers.Vector) (err error)

func (*TxnRelation) GetByFilter

func (rel *TxnRelation) GetByFilter(context.Context, *handle.Filter) (id *common.ID, offset uint32, err error)

func (*TxnRelation) GetCardinality

func (rel *TxnRelation) GetCardinality(attr string) int64

func (*TxnRelation) GetDB added in v0.8.0

func (rel *TxnRelation) GetDB() (handle.Database, error)

func (*TxnRelation) GetMeta

func (rel *TxnRelation) GetMeta() any

func (*TxnRelation) GetObject added in v1.1.0

func (rel *TxnRelation) GetObject(id *types.Objectid) (obj handle.Object, err error)

func (*TxnRelation) GetValue

func (rel *TxnRelation) GetValue(*common.ID, uint32, uint16) (v any, isNull bool, err error)

func (*TxnRelation) GetValueByFilter

func (rel *TxnRelation) GetValueByFilter(context.Context, *handle.Filter, int) (v any, isNull bool, err error)

func (*TxnRelation) GetValueByPhyAddrKey added in v0.6.0

func (rel *TxnRelation) GetValueByPhyAddrKey(any, int) (v any, isNull bool, err error)

func (*TxnRelation) ID

func (rel *TxnRelation) ID() uint64

func (*TxnRelation) LogTxnEntry

func (rel *TxnRelation) LogTxnEntry(entry txnif.TxnEntry, readed []*common.ID) (err error)

func (*TxnRelation) MakeBlockIt

func (rel *TxnRelation) MakeBlockIt() handle.BlockIt

func (*TxnRelation) MakeObjectIt added in v1.1.0

func (rel *TxnRelation) MakeObjectIt() handle.ObjectIt

func (*TxnRelation) MakeObjectItOnSnap added in v1.1.0

func (rel *TxnRelation) MakeObjectItOnSnap() handle.ObjectIt

func (*TxnRelation) RangeDelete

func (rel *TxnRelation) RangeDelete(*common.ID, uint32, uint32, handle.DeleteType) (err error)

func (*TxnRelation) Rows

func (rel *TxnRelation) Rows() int64

func (*TxnRelation) Schema

func (rel *TxnRelation) Schema() any

func (*TxnRelation) SimplePPString

func (rel *TxnRelation) SimplePPString(_ common.PPLevel) string

func (*TxnRelation) Size

func (rel *TxnRelation) Size(attr string) int64

func (*TxnRelation) SoftDeleteObject added in v1.1.0

func (rel *TxnRelation) SoftDeleteObject(id *types.Objectid) (err error)

func (*TxnRelation) String

func (rel *TxnRelation) String() string

func (*TxnRelation) TryDeleteByDeltaloc added in v1.0.0

func (rel *TxnRelation) TryDeleteByDeltaloc(id *common.ID, deltaloc objectio.Location) (ok bool, err error)

func (*TxnRelation) Update

func (rel *TxnRelation) Update(*common.ID, uint32, uint16, any, bool) (err error)

func (*TxnRelation) UpdateByFilter

func (rel *TxnRelation) UpdateByFilter(ctx context.Context, filter *handle.Filter, col uint16, v any, isNull bool) (err error)

type TxnState

type TxnState struct {
	// contains filtered or unexported fields
}

func (*TxnState) IsCommitted

func (ts *TxnState) IsCommitted() bool

func (*TxnState) IsRollbacked

func (ts *TxnState) IsRollbacked() bool

func (*TxnState) IsUncommitted

func (ts *TxnState) IsUncommitted() bool

func (*TxnState) ToCommitted

func (ts *TxnState) ToCommitted() error

func (*TxnState) ToCommitting

func (ts *TxnState) ToCommitting() error

func (*TxnState) ToRollbacked

func (ts *TxnState) ToRollbacked() error

func (*TxnState) ToRollbacking

func (ts *TxnState) ToRollbacking() error

type TxnStateCmd added in v0.6.0

type TxnStateCmd struct {
	ID       string
	State    txnif.TxnState
	CommitTs types.TS
}

func NewEmptyTxnStateCmd added in v0.6.0

func NewEmptyTxnStateCmd() *TxnStateCmd

func NewTxnStateCmd added in v0.6.0

func NewTxnStateCmd(id string, state txnif.TxnState, cts types.TS) *TxnStateCmd

func (*TxnStateCmd) ApplyCommit added in v0.6.0

func (c *TxnStateCmd) ApplyCommit()

func (*TxnStateCmd) ApplyRollback added in v0.6.0

func (c *TxnStateCmd) ApplyRollback()

func (*TxnStateCmd) Close added in v0.6.0

func (c *TxnStateCmd) Close()

func (*TxnStateCmd) Desc added in v0.6.0

func (c *TxnStateCmd) Desc() string

func (*TxnStateCmd) GetType added in v0.6.0

func (c *TxnStateCmd) GetType() uint16

func (*TxnStateCmd) MarshalBinary added in v0.8.0

func (c *TxnStateCmd) MarshalBinary() (buf []byte, err error)

func (*TxnStateCmd) ReadFrom added in v0.6.0

func (c *TxnStateCmd) ReadFrom(r io.Reader) (n int64, err error)

func (*TxnStateCmd) SetReplayTxn added in v0.6.0

func (c *TxnStateCmd) SetReplayTxn(_ txnif.AsyncTxn)

func (*TxnStateCmd) String added in v0.6.0

func (c *TxnStateCmd) String() string

func (*TxnStateCmd) UnmarshalBinary added in v0.8.0

func (c *TxnStateCmd) UnmarshalBinary(buf []byte) (err error)

func (*TxnStateCmd) VerboseString added in v0.6.0

func (c *TxnStateCmd) VerboseString() string

func (*TxnStateCmd) WriteTo added in v0.6.0

func (c *TxnStateCmd) WriteTo(w io.Writer) (n int64, err error)

type TxnStoreFactory

type TxnStoreFactory = func() txnif.TxnStore

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL