txnif

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FreezePhase         = "Phase_Freeze"
	RollbackPhase       = "Phase_Rollback"
	PrePreparePhase     = "Phase_PrePrepare"
	PrepareCommitPhase  = "Phase_PrepareCommit"
	PreApplyCommitPhase = "Phase_PreApplyCommit"
	ApplyCommitPhase    = "Phase_ApplyCommit"
)
View Source
const (
	TraceStart = iota
	TracePreparing
	TracePrepareWalWait
	TracePrepareWal
	TracePreapredWait
	TracePrepared
)
View Source
const (
	TxnType_Normal = iota
	TxnType_Heartbeat
)

Variables

View Source
var (
	ErrTxnWWConflict = moerr.NewTxnWWConflictNoCtx()
	ErrTxnNeedRetry  = moerr.NewTAENeedRetryNoCtx()
)
View Source
var UncommitTS types.TS

Functions

func RegisterCmdFactory

func RegisterCmdFactory(cmdType int16, factory CmdFactory)

func TxnStrState

func TxnStrState(state TxnState) string

Types

type AppendNode

type AppendNode interface {
	BaseMVCCNode
	TxnEntry
	GetStartRow() uint32
	GetMaxRow() uint32
}

type BaseMVCCNode added in v0.8.0

type BaseMVCCNode interface {
	String() string
	IsNil() bool

	IsVisibleByTS(ts types.TS) (visible bool)
	IsVisible(txn TxnReader) (visible bool)
	CheckConflict(txn TxnReader) error

	PreparedIn(minTS, maxTS types.TS) (in, before bool)
	CommittedIn(minTS, maxTS types.TS) (in, before bool)
	NeedWaitCommitting(ts types.TS) (bool, TxnReader)
	IsSameTxn(txn TxnReader) bool
	IsActive() bool
	IsCommitting() bool
	IsCommitted() bool
	IsAborted() bool
	Set1PC()
	Is1PC() bool

	GetEnd() types.TS
	GetStart() types.TS
	GetPrepare() types.TS
	GetTxn() TxnReader

	ApplyCommit() (err error)
	ApplyRollback() (err error)
	PrepareCommit() (err error)
	PrepareRollback() (err error)

	WriteTo(w io.Writer) (n int64, err error)
}

type BaseNode added in v0.8.0

type BaseNode[T any] interface {
	Update(o T)
	CloneData() T
	CloneAll() T
}

type BaseTxn added in v1.0.0

type BaseTxn interface {
	GetMemo() *TxnMemo
	GetStartTS() types.TS
	GetPrepareTS() types.TS
	GetCommitTS() types.TS
	GetLSN() uint64
	GetTxnState(waitIfcommitting bool) TxnState
}

type CmdFactory

type CmdFactory func(int16) TxnCmd

func GetCmdFactory

func GetCmdFactory(cmdType int16) (factory CmdFactory)

type CmdType added in v0.6.0

type CmdType uint16
const (
	CmdInvalid CmdType = iota
	CmdPrepare
	CmdCommit
	CmdRollback
	Cmd1PC
	CmdSnapshot
)

type DedupType added in v0.8.0

type DedupType uint8
const (
	FullDedup DedupType = iota
	FullSkipWorkSpaceDedup
	IncrementalDedup
)

type DeleteChain

type DeleteChain interface {
	sync.Locker
	RLock()
	RUnlock()
	RemoveNodeLocked(DeleteNode)

	AddNodeLocked(txn AsyncTxn, deleteType handle.DeleteType) DeleteNode
	AddMergeNode() DeleteNode

	PrepareRangeDelete(start, end uint32, ts types.TS) error
	DepthLocked() int
	CollectDeletesLocked(txn TxnReader, rwlocker *sync.RWMutex) (*nulls.Bitmap, error)
}

type DeleteNode

type DeleteNode interface {
	BaseMVCCNode
	TxnEntry
	IsPersistedDeletedNode() bool
	StringLocked() string
	GetChain() DeleteChain
	DeletedRows() []uint32
	DeletedPK() map[uint32]containers.Vector
	RangeDeleteLocked(start, end uint32, pk containers.Vector, mp *mpool.MPool)
	GetCardinalityLocked() uint32
	IsDeletedLocked(row uint32) bool
	GetRowMaskRefLocked() *roaring.Bitmap
	OnApply() error
}

type MVCCNode added in v0.6.0

type MVCCNode[T any] interface {
	BaseMVCCNode
	BaseNode[T]
}

type Tracer added in v1.1.2

type Tracer interface {
	StartTrace()
	TriggerTrace(state uint8)
	EndTrace()
}

type Txn2PC

type Txn2PC interface {
	Freeze() error
	PrepareRollback() error
	ApplyRollback() error
	PrePrepare(ctx context.Context) error
	PrepareCommit() error
	PreApplyCommit() error
	PrepareWAL() error
	ApplyCommit() error
}

type TxnAsyncer

type TxnAsyncer interface {
	WaitDone(error, bool) error
	WaitPrepared(ctx context.Context) error
}

type TxnChanger

type TxnChanger interface {
	sync.Locker
	RLock()
	RUnlock()
	ToCommittedLocked() error
	ToPreparingLocked(ts types.TS) error
	ToPrepared() error
	ToPreparedLocked() error
	ToRollbackedLocked() error

	ToRollbacking(ts types.TS) error
	ToRollbackingLocked(ts types.TS) error
	ToUnknownLocked()
	Prepare(ctx context.Context) (types.TS, error)
	Committing() error
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
	SetCommitTS(cts types.TS) error
	SetDedupType(skip DedupType)
	SetParticipants(ids []uint64) error
	SetError(error)

	CommittingInRecovery() error
	CommitInRecovery(ctx context.Context) error
}

type TxnCmd

type TxnCmd interface {
	MarshalBinary() ([]byte, error)
	UnmarshalBinary([]byte) error
	GetType() uint16
	Desc() string
	String() string
	ApplyRollback()
	ApplyCommit()
	SetReplayTxn(AsyncTxn)
	VerboseString() string
	Close()
}

type TxnEntry

type TxnEntry interface {
	PrepareCommit() error
	PrepareRollback() error
	ApplyCommit() error
	ApplyRollback() error
	MakeCommand(uint32) (TxnCmd, error)
	Is1PC() bool
	Set1PC()
}

type TxnEntryType

type TxnEntryType int16

type TxnHandle

type TxnHandle interface {
	BindAccessInfo(tenantID, userID, roleID uint32)
	GetTenantID() uint32
	GetUserAndRoleID() (uint32, uint32)
	CreateDatabase(name, createSql, datTyp string) (handle.Database, error)
	CreateDatabaseWithCtx(ctx context.Context,
		name, createSql, datTyp string, id uint64) (handle.Database, error)
	DropDatabase(name string) (handle.Database, error)
	DropDatabaseByID(id uint64) (handle.Database, error)
	GetDatabase(name string) (handle.Database, error)
	GetDatabaseWithCtx(ctx context.Context, name string) (handle.Database, error)
	GetDatabaseByID(id uint64) (handle.Database, error)
	DatabaseNames() []string
}

type TxnMemo added in v0.6.0

type TxnMemo struct {
	*model.Tree
	// contains filtered or unexported fields
}

func NewTxnMemo added in v0.6.0

func NewTxnMemo() *TxnMemo

func (*TxnMemo) AddCatalogChange added in v0.6.0

func (memo *TxnMemo) AddCatalogChange()

func (*TxnMemo) GetDirty added in v0.6.0

func (memo *TxnMemo) GetDirty() *model.Tree

func (*TxnMemo) GetDirtyTableByID added in v0.6.0

func (memo *TxnMemo) GetDirtyTableByID(id uint64) *model.TableTree

func (*TxnMemo) HasAnyTableDataChanges added in v0.6.0

func (memo *TxnMemo) HasAnyTableDataChanges() bool

func (*TxnMemo) HasCatalogChanges added in v0.6.0

func (memo *TxnMemo) HasCatalogChanges() bool

func (*TxnMemo) HasTableDataChanges added in v0.6.0

func (memo *TxnMemo) HasTableDataChanges(id uint64) bool

func (*TxnMemo) ReadFromWithVersion added in v1.1.0

func (memo *TxnMemo) ReadFromWithVersion(r io.Reader, ver uint16) (n int64, err error)

func (*TxnMemo) WriteTo added in v0.6.0

func (memo *TxnMemo) WriteTo(w io.Writer) (n int64, err error)

type TxnReader

type TxnReader interface {
	GetBase() BaseTxn
	RLock()
	RUnlock()
	IsReplay() bool
	Is2PC() bool
	GetDedupType() DedupType
	GetID() string
	GetCtx() []byte
	GetStartTS() types.TS
	GetCommitTS() types.TS
	GetContext() context.Context

	GetPrepareTS() types.TS
	GetParticipants() []uint64
	GetSnapshotTS() types.TS
	SetSnapshotTS(types.TS)
	HasSnapshotLag() bool
	IsVisible(o TxnReader) bool
	GetTxnState(waitIfcommitting bool) TxnState
	GetError() error
	GetStore() TxnStore
	String() string
	Repr() string
	GetLSN() uint64
	GetMemo() *TxnMemo

	SameTxn(txn TxnReader) bool
	CommitBefore(startTs types.TS) bool
	CommitAfter(startTs types.TS) bool
}

type TxnState

type TxnState int32
const (
	TxnStateActive TxnState = iota
	TxnStatePreparing
	//TxnStatePrepared only for 2PC
	TxnStatePrepared
	//TxnStateCommittingFinished only for 2PC txn runs on coordinator
	TxnStateCommittingFinished
	TxnStateRollbacking
	//TxnStateCommitted , TxnStateRollbacked, and TxnStateUnknown are final states.
	TxnStateCommitted
	TxnStateRollbacked
	TxnStateUnknown
)

type TxnStatus added in v0.6.0

type TxnStatus int32

type TxnStore

type TxnStore interface {
	io.Closer
	Tracer
	Txn2PC
	TxnUnsafe
	WaitPrepared(ctx context.Context) error
	BindTxn(AsyncTxn)
	GetLSN() uint64
	GetContext() context.Context
	SetContext(context.Context)

	BatchDedup(dbId, id uint64, pk containers.Vector) error

	Append(ctx context.Context, dbId, id uint64, data *containers.Batch) error
	AddBlksWithMetaLoc(ctx context.Context, dbId, id uint64, stats containers.Vector) error

	RangeDelete(
		id *common.ID, start, end uint32, pkVec containers.Vector, dt handle.DeleteType,
	) error
	TryDeleteByDeltaloc(id *common.ID, deltaloc objectio.Location) (ok bool, err error)
	GetByFilter(
		ctx context.Context, dbId uint64, id uint64, filter *handle.Filter,
	) (*common.ID, uint32, error)
	GetValue(id *common.ID, row uint32, col uint16) (any, bool, error)

	CreateRelation(dbId uint64, def any) (handle.Relation, error)
	CreateRelationWithTableId(dbId uint64, tableId uint64, def any) (handle.Relation, error)
	DropRelationByName(dbId uint64, name string) (handle.Relation, error)
	DropRelationByID(dbId uint64, tid uint64) (handle.Relation, error)
	GetRelationByName(dbId uint64, name string) (handle.Relation, error)
	GetRelationByID(dbId uint64, tid uint64) (handle.Relation, error)

	CreateDatabase(name, createSql, datTyp string) (handle.Database, error)
	CreateDatabaseWithID(name, createSql, datTyp string, id uint64) (handle.Database, error)
	GetDatabase(name string) (handle.Database, error)
	GetDatabaseByID(id uint64) (handle.Database, error)
	DropDatabase(name string) (handle.Database, error)
	DropDatabaseByID(id uint64) (handle.Database, error)
	DatabaseNames() []string

	GetObject(id *common.ID) (handle.Object, error)
	CreateObject(dbId, tid uint64, is1PC bool) (handle.Object, error)
	CreateNonAppendableObject(dbId, tid uint64, is1PC bool) (handle.Object, error)
	CreateBlock(id *common.ID, is1PC bool) (handle.Block, error)
	GetBlock(id *common.ID) (handle.Block, error)
	CreateNonAppendableBlock(id *common.ID, opts *objectio.CreateBlockOpt) (handle.Block, error)
	SoftDeleteObject(id *common.ID) error
	SoftDeleteBlock(id *common.ID) error
	UpdateMetaLoc(id *common.ID, metaLoc objectio.Location) (err error)
	UpdateDeltaLoc(id *common.ID, deltaLoc objectio.Location) (err error)

	AddTxnEntry(TxnEntryType, TxnEntry)

	LogTxnEntry(dbId, tableId uint64, entry TxnEntry, readed []*common.ID) error
	LogTxnState(sync bool) (entry.Entry, error)
	DoneWaitEvent(cnt int)
	AddWaitEvent(cnt int)

	IsReadonly() bool
	IncreateWriteCnt() int
	ObserveTxn(
		visitDatabase func(db any),
		visitTable func(tbl any),
		rotateTable func(dbName, tblName string, dbid, tid uint64),
		visitMetadata func(block any),
		visitObject func(obj any),
		visitAppend func(bat any),
		visitDelete func(ctx context.Context, deletes DeleteNode))
	GetTransactionType() TxnType
	UpdateObjectStats(*common.ID, *objectio.ObjectStats) error
}

type TxnTest

type TxnTest interface {
	MockIncWriteCnt() int
	MockStartTS(types.TS)
	SetPrepareCommitFn(func(AsyncTxn) error)
	SetPrepareRollbackFn(func(AsyncTxn) error)
	SetApplyCommitFn(func(AsyncTxn) error)
	SetApplyRollbackFn(func(AsyncTxn) error)
}

type TxnType added in v0.8.0

type TxnType int8

type TxnUnsafe added in v0.6.0

type TxnUnsafe interface {
	UnsafeGetDatabase(id uint64) (h handle.Database, err error)
	UnsafeGetRelation(dbId, tableId uint64) (h handle.Relation, err error)
}

type TxnWriter

type TxnWriter interface {
	LogTxnEntry(dbId, tableId uint64, entry TxnEntry, readed []*common.ID) error
	LogTxnState(sync bool) (entry.Entry, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL