oplog

package
v2.0.0-...-acbaf60 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: GPL-3.0 Imports: 11 Imported by: 1

Documentation

Overview

Package oplog implements functions for examining and processing transaction oplog entries.

Index

Constants

View Source
const (
	// field in oplog
	OplogTsName          = "ts"
	OplogOperationName   = "op"
	OplogGidName         = "g" // useless in change stream
	OplogNamespaceName   = "ns"
	OplogObjectName      = "o"
	OplogQueryName       = "o2"
	OplogUniqueKeyName   = "uk" // useless in change stream
	OplogLsidName        = "lsid"
	OplogFromMigrateName = "fromMigrate"
)
View Source
const (
	ShardByID        = "id"
	ShardByNamespace = "collection"
	ShardAutomatic   = "auto"
)
View Source
const (
	DefaultHashValue = 0
)
View Source
const (
	PrimaryKey = "_id"
)

Variables

View Source
var ErrBufferClosed = errors.New("transaction buffer already closed")
View Source
var ErrNotTransaction = errors.New("oplog entry is not a transaction")
View Source
var ErrTxnAborted = errors.New("transaction aborted")

Functions

func BuildUpdateDelteOplog

func BuildUpdateDelteOplog(prefixField string, obj bson.D) (interface{}, error)

func ConvertBsonD2M

func ConvertBsonD2M(input bson.D) (bson.M, map[string]struct{})

convert bson.D to bson.M

func ConvertBsonD2MExcept

func ConvertBsonD2MExcept(input bson.D, except map[string]struct{}) (bson.M, map[string]struct{})

func ConvertBsonM2D

func ConvertBsonM2D(input bson.M) bson.D

func DiffUpdateOplogToNormal

func DiffUpdateOplogToNormal(updateObj bson.D) (interface{}, error)

"o" : { "$v" : 2, "diff" : { "d" : { "count" : false }, "u" : { "name" : "orange" }, "i" : { "c" : 11 } } }

func ExtraCommandName

func ExtraCommandName(o bson.D) (string, bool)

func FindFiledPrefix

func FindFiledPrefix(input bson.D, prefix string) bool

func GetIdOrNSFromOplog

func GetIdOrNSFromOplog(log *PartialLog) interface{}

func GetKey

func GetKey(log bson.D, wanted string) interface{}

func GetKeyWithIndex

func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)

func Hash

func Hash(hashObject interface{}) uint32

func IsNeedFilterCommand

func IsNeedFilterCommand(operation string) bool

func IsRunOnAdminCommand

func IsRunOnAdminCommand(operation string) bool

func IsSyncDataCommand

func IsSyncDataCommand(operation string) bool

func LogEntryEncode

func LogEntryEncode(logs []*GenericOplog) [][]byte

func RemoveFiled

func RemoveFiled(input bson.D, key string) bson.D

Note: the input bson.D will be modified.

func SetFiled

func SetFiled(input bson.D, key string, value interface{})

func TimestampGreaterThan

func TimestampGreaterThan(lhs, rhs primitive.Timestamp) bool

TimestampGreaterThan returns true if lhs comes after rhs, false otherwise.

func TimestampLessThan

func TimestampLessThan(lhs, rhs primitive.Timestamp) bool

TimestampLessThan returns true if lhs comes before rhs, false otherwise.

func TxnOpTimeEquals

func TxnOpTimeEquals(lhs TxnOpTime, rhs TxnOpTime) bool

TxnOpTimeEquals returns true if lhs equals rhs, false otherwise. We first check for nil / not nil mismatches between the terms and between the hashes. Then we check for equality between the terms and between the hashes (if they exist) before checking the timestamps.

func TxnOpTimeGreaterThan

func TxnOpTimeGreaterThan(lhs TxnOpTime, rhs TxnOpTime) bool

TxnOpTimeGreaterThan returns true if lhs comes after rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.

func TxnOpTimeIsEmpty

func TxnOpTimeIsEmpty(opTime TxnOpTime) bool

TxnOpTimeIsEmpty returns true if opTime is uninitialized, false otherwise.

func TxnOpTimeLessThan

func TxnOpTimeLessThan(lhs TxnOpTime, rhs TxnOpTime) bool

TxnOpTimeLessThan returns true if lhs comes before rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.

Types

type CommandOperation

type CommandOperation struct {
	// contains filtered or unexported fields
}

type Event

type Event struct {
	Id                bson.M              `bson:"_id" json:"_id"`
	OperationType     string              `bson:"operationType" json:"operationType"`
	FullDocument      bson.D              `bson:"fullDocument,omitempty" json:"fullDocument,omitempty"` // exists on "insert", "replace", "delete", "update"
	Ns                bson.M              `bson:"ns" json:"ns"`
	To                bson.M              `bson:"to,omitempty" json:"to,omitempty"`
	DocumentKey       bson.D              `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists on "insert", "replace", "delete", "update"
	UpdateDescription bson.M              `bson:"updateDescription,omitempty" json:"updateDescription,omitempty"`
	ClusterTime       primitive.Timestamp `bson:"clusterTime,omitempty" json:"clusterTime,omitempty"`
	TxnNumber         *int64              `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"`
	LSID              bson.Raw            `bson:"lsid,omitempty" json:"lsid,omitempty"`
}
  • example:
    {
      _id : { // stores metadata
        "_data" : <BinData|hex string> // resumeToken
      },
      "operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate
      "fullDocument" : { <document> }, // the data after modification; appears on insert, replace, delete, update. Equivalent to the original "o" field
      "ns" : { // same as "ns"
        "db" : "<database>",
        "coll" : "<collection>"
      },
      "to" : { // only valid when operationType == rename; the ns after the rename
        "db" : "<database>",
        "coll" : "<collection>"
      },
      "documentKey" : { "_id" : <value> }, // equivalent to the "o2" field. Appears on insert, replace, delete, update. Normally contains only _id; for a sharded collection it also includes the shard key
      "updateDescription" : { // only present when operationType == update; describes an incremental change, whereas replace is a full replacement
        "updatedFields" : { <document> }, // values of the updated fields
        "removedFields" : [ "<field>", ... ] // list of removed fields
      },
      "FullDocument" : { // never nil
        "fullDocument" : { <document> }, // updateLookup when full_document is enabled, otherwise default
      }
      "clusterTime" : <Timestamp>, // equivalent to the "ts" field
      "txnNumber" : <NumberLong>, // equivalent to txnNumber in the oplog; only appears inside a transaction. The transaction number increases monotonically within a transaction
      "lsid" : { // equivalent to the "lsid" field; only appears inside a transaction. Logical session id: the id of the session the request belongs to
        "id" : <UUID>,
        "uid" : <BinData>
      }
    }

func (*Event) String

func (e *Event) String() string

type GenericOplog

type GenericOplog struct {
	Raw    []byte
	Parsed *PartialLog
}

func GatherApplyOps

func GatherApplyOps(input []*PartialLog) (*GenericOplog, error)

type Hasher

type Hasher interface {
	DistributeOplogByMod(log *PartialLog, mod int) uint32
}

type ParsedLog

type ParsedLog struct {
	Timestamp     primitive.Timestamp `bson:"ts" json:"ts"`
	Term          *int64              `bson:"t" json:"t"`
	Hash          *int64              `bson:"h" json:"h"`
	Version       int                 `bson:"v" json:"v"`
	Operation     string              `bson:"op" json:"op"`
	Gid           string              `bson:"g,omitempty" json:"g,omitempty"`
	Namespace     string              `bson:"ns" json:"ns"`
	Object        bson.D              `bson:"o" json:"o"`
	Query         bson.D              `bson:"o2" json:"o2"`                                       // update condition
	UniqueIndexes bson.M              `bson:"uk,omitempty" json:"uk,omitempty"`                   //
	LSID          bson.Raw            `bson:"lsid,omitempty" json:"lsid,omitempty"`               // mark the session id, used in transaction
	FromMigrate   bool                `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk
	TxnNumber     *int64              `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"`     // transaction number in session
	DocumentKey   bson.D              `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id
	PrevOpTime    bson.Raw            `bson:"prevOpTime,omitempty"`
	UI            *primitive.Binary   `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently
}

func ExtractInnerOps

func ExtractInnerOps(tranOp *ParsedLog) ([]ParsedLog, error)

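ExtractInnerOps extracts the operations nested in doc.applyOps and sets doc.applyOps[i].ts on each of them, so that the checkpoint can use the last ts to judge whether the transaction is complete:

applyOps[0 .. n-2].ts = doc.ts - 1
applyOps[n-1].ts = doc.ts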

type PartialLog

type PartialLog struct {
	ParsedLog

	/*
	 * Every field subsequent declared is NEVER persistent or
	 * transfer on network connection. They only be parsed from
	 * respective logic
	 */
	UniqueIndexesUpdates bson.M // generate by CollisionMatrix
	RawSize              int    // generate by Decorator
	SourceId             int    // generate by Validator
}

func ConvertEvent2Oplog

func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error)

func LogParsed

func LogParsed(logs []*GenericOplog) []*PartialLog

func NewPartialLog

func NewPartialLog(data bson.M) *PartialLog

func (*PartialLog) Dump

func (partialLog *PartialLog) Dump(keys map[string]struct{}, all bool) bson.D

Dump dumps the log according to the given keys; all == true means the keys are ignored.

func (*PartialLog) String

func (partialLog *PartialLog) String() string

type PrimaryKeyHasher

type PrimaryKeyHasher struct {
	Hasher
}

******************************************* PrimaryKeyHasher

func (*PrimaryKeyHasher) DistributeOplogByMod

func (objectIdHasher *PrimaryKeyHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32

We need to ensure that oplog entries with the same ObjectID are sent to the same job[$hash], so that they can be consumed sequentially.

type TableHasher

type TableHasher struct {
	Hasher
}

******************************************* TableHasher

func (*TableHasher) DistributeOplogByMod

func (collectionHasher *TableHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32

type TxnBuffer

type TxnBuffer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TxnBuffer stores transaction oplog entries until they are needed to commit them to a destination. It includes a WaitGroup for tracking all goroutines across all transactions for use in global shutdown.

func NewBuffer

func NewBuffer() *TxnBuffer

NewBuffer initializes a transaction oplog buffer.

func (*TxnBuffer) AddOp

func (b *TxnBuffer) AddOp(m TxnMeta, op ParsedLog) error

AddOp sends a transaction oplog entry to a background goroutine (starting one for a new transaction TxnID) for asynchronous pre-processing and storage. If the oplog entry is not a transaction, an error will be returned. Any errors during processing can be discovered later via the error channel from `GetTxnStream`.

Must not be called concurrently with other transaction-related operations. Must not be called for a given transaction after starting to stream that transaction.

func (*TxnBuffer) GetTxnStream

func (b *TxnBuffer) GetTxnStream(m TxnMeta) (<-chan ParsedLog, <-chan error)

GetTxnStream returns a channel of Oplog entries in a transaction and a channel for errors. If the buffer has been stopped, the returned op channel will be closed and the error channel will have an error on it.

Must not be called concurrently with other transaction-related operations. For a given transaction, it must not be called until after a final oplog entry has been passed to AddOp and it must not be called more than once.

func (*TxnBuffer) OldestOpTime

func (b *TxnBuffer) OldestOpTime() TxnOpTime

OldestOpTime returns the optime of the oldest buffered transaction, or an empty optime if no transactions are buffered. This will include committed transactions until they are purged.

func (*TxnBuffer) PurgeTxn

func (b *TxnBuffer) PurgeTxn(m TxnMeta) error

PurgeTxn closes any transaction streams in progress and deletes all oplog entries associated with a transaction.

Must not be called concurrently with other transaction-related operations. For a given transaction, it must not be called until after a final oplog entry has been passed to AddOp and it must not be called more than once.

func (*TxnBuffer) Size

func (b *TxnBuffer) Size() int

func (*TxnBuffer) Stop

func (b *TxnBuffer) Stop() error

Stop shuts down processing and cleans up. Subsequent calls to Stop() will return nil. All other methods error after this is called.

type TxnID

type TxnID struct {
	// contains filtered or unexported fields
}

TxnID wraps fields needed to uniquely identify a transaction for use as a map key. The 'lsid' is a string rather than bson.Raw or []byte so that this type is a valid map key.

func (TxnID) String

func (id TxnID) String() string

type TxnMeta

type TxnMeta struct {
	// contains filtered or unexported fields
}

TxnMeta holds information extracted from an oplog entry for later routing logic. Zero value means 'not a transaction'. We store 'prevOpTime' as string so the struct is comparable.

func NewTxnMeta

func NewTxnMeta(op ParsedLog) (TxnMeta, error)

NewTxnMeta extracts transaction metadata from an oplog entry. A non-transaction will return a zero-value TxnMeta struct, not an error.

Currently there is no way for this to error, but that may change in the future if we change the db.Oplog.Object to bson.Raw, so the API is designed with failure as a possibility.

func (TxnMeta) IsAbort

func (m TxnMeta) IsAbort() bool

IsAbort is true if the oplog entry had the abort command.

func (TxnMeta) IsCommit

func (m TxnMeta) IsCommit() bool

IsCommit is true if the oplog entry was a commit command or was the final entry of an unprepared transaction.

func (TxnMeta) IsCommitOp

func (m TxnMeta) IsCommitOp() bool

IsCommitOp is true if the oplog entry is a commitTransaction oplog entry.

func (TxnMeta) IsData

func (m TxnMeta) IsData() bool

IsData is true if the oplog entry contains transaction data

func (TxnMeta) IsFinal

func (m TxnMeta) IsFinal() bool

IsFinal is true if the oplog entry is the closing entry of a transaction, i.e. if IsAbort or IsCommit is true.

func (TxnMeta) IsMultiOp

func (m TxnMeta) IsMultiOp() bool

IsMultiOp is true if the oplog entry is part of a prepared and/or large transaction.

func (TxnMeta) IsTxn

func (m TxnMeta) IsTxn() bool

IsTxn is true if the oplog entry is part of any transaction, i.e. the lsid field exists.

func (TxnMeta) String

func (m TxnMeta) String() string

type TxnOpTime

type TxnOpTime struct {
	Timestamp primitive.Timestamp `json:"timestamp"`
	Term      *int64              `json:"term"`
	Hash      *int64              `json:"hash"`
}

TxnOpTime represents the values to uniquely identify an oplog entry. A TxnOpTime must always have a timestamp, but may or may not have a term. The hash is set uniquely up until (and including) version 4.0, but is set to zero in version 4.2+ with plans to remove it soon (see SERVER-36334).

func GetTxnOpTimeFromOplogEntry

func GetTxnOpTimeFromOplogEntry(oplogEntry *ParsedLog) TxnOpTime

GetTxnOpTimeFromOplogEntry returns a TxnOpTime struct from the relevant fields in a ParsedLog struct.

func (TxnOpTime) String

func (ot TxnOpTime) String() string

type WhiteListObjectIdHasher

type WhiteListObjectIdHasher struct {
	Hasher

	TableHasher
	PrimaryKeyHasher
	// contains filtered or unexported fields
}

******************************************* WhiteListObjectIdHasher: hash by collection in general, when hit white list, hash by _id

func NewWhiteListObjectIdHasher

func NewWhiteListObjectIdHasher(whiteList []string) *WhiteListObjectIdHasher

func (*WhiteListObjectIdHasher) DistributeOplogByMod

func (wloi *WhiteListObjectIdHasher) DistributeOplogByMod(log *PartialLog, mod int) uint32

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL