txn

package module
v0.0.0-...-251cea9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2021 License: LGPL-3.0 Imports: 15 Imported by: 77

README

juju/txn

This package wraps github.com/juju/mgo/v2/txn, providing convenience functions and interfaces for common transaction execution patterns, and also providing test hook points.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrExcessiveContention is used to signal that even after retrying, the transaction operations
	// could not be successfully applied due to database contention.
	ErrExcessiveContention = stderrors.New("state changing too quickly; try again soon")

	// ErrNoOperations is returned by TransactionSource implementations to signal that
	// no transaction operations are available to run.
	ErrNoOperations = stderrors.New("no transaction operations are available")

	// ErrNoOperations is returned by TransactionSource implementations to signal that
	// the transaction list could not be built but the caller should retry.
	ErrTransientFailure = stderrors.New("transient failure")
)
View Source
var EOF = fmt.Errorf("end of transaction ids")

Functions

func NewCollectionCleaner

func NewCollectionCleaner(config CollectionConfig) *collectionCleaner

NewCollectionCleaner creates an object that can remove transaction tokens from document queues when the transactions have been marked as completed.

func NewStashCleaner

func NewStashCleaner(config CollectionConfig) *collectionCleaner

NewStashCleaner returns an object suitable for cleaning up the txns.stash collection. It is different because when we find all references from a document have been removed, we can remove the document.

func SupportsServerSideTransactions

func SupportsServerSideTransactions(db *mgo.Database) bool

SupportsServerSideTransactions lets you know if the given database can support server-side transactions.

func TestHooks

func TestHooks(runner Runner) chan ([]TestHook)

TestHooks returns the test hooks for a transaction runner. Exported only for testing.

Types

type CleanAndPruneArgs

type CleanAndPruneArgs struct {

	// Txns is the collection that holds all of the transactions that we
	// might want to prune. We will also make use of Txns.Database to find
	// all of the collections that might make use of transactions from that
	// collection.
	Txns *mgo.Collection

	// TxnsCount is a hint from Txns.Count() to avoid having to call it again
	// to determine whether it is ok to hold the set of transactions in memory.
	// It is optional, as we will call Txns.Count() if it is not supplied.
	TxnsCount int

	// MaxTime is a timestamp that provides a threshold of transactions
	// that we will actually prune. Only transactions that were created
	// before this threshold will be pruned.
	MaxTime time.Time

	// MaxTransactionsToProcess defines how many completed transactions that we will evaluate in this batch.
	// A value of 0 indicates we should evaluate all completed transactions.
	MaxTransactionsToProcess int

	// Multithreaded will start multiple pruning passes concurrently
	Multithreaded bool

	// TxnBatchSize is how many transaction to process at once.
	TxnBatchSize int

	// TxnBatchSleepTime is how long we should sleep between processing transaction
	// batches, to allow other parts of the system to operate (avoid consuming
	// all resources)
	// The default is to not sleep at all, but this can be configured to reduce
	// load while pruning.
	TxnBatchSleepTime time.Duration
}

CleanAndPruneArgs specifies the parameters required by CleanAndPrune.

type CleanupStats

type CleanupStats struct {

	// CollectionsInspected is the total number of collections we looked at for documents
	CollectionsInspected int

	// DocsInspected is how many documents we loaded to evaluate their txn queues
	DocsInspected int

	// DocsCleaned is how many documents we Updated to remove entries from their txn queue.
	DocsCleaned int

	// StashDocumentsRemoved is how many total documents we remove from txns.stash
	StashDocumentsRemoved int

	// StashDocumentsRemoved is how many documents we remove from txns
	TransactionsRemoved int

	// ShouldRetry indicates that we think this cleanup was not complete due to too many txns to process. We recommend running it again.
	ShouldRetry bool
}

CleanupStats gives some numbers as to what work was done as part of CleanupAndPrune.

func CleanAndPrune

func CleanAndPrune(args CleanAndPruneArgs) (CleanupStats, error)

CleanAndPrune runs the cleanup steps, and then follows up with pruning all of the transactions that are no longer referenced.

type Clock

type Clock interface {
	// Now returns the current clock time.
	Now() time.Time
}

Clock is a simplified form of juju/clock.Clock, since we don't need all the methods and this allows us to be compatible with both juju/clock.Clock and juju/utils/clock.Clock

type CollectionConfig

type CollectionConfig struct {
	// Oracle is an Oracle that we can use to determine if a given
	// transaction token should be considered a 'completed' transaction.
	Oracle Oracle

	// Source is the mongo collection holding documents created and managed
	// by transactions.
	Source *mgo.Collection

	// NumBatchTokens is the number of tokens that we will cache before
	// doing a query to find out whether their referenced transactions are
	// completed. It is useful to have a number in the hundreds so that we
	// efficiently query the mongo transaction database.
	NumBatchTokens int

	// MaxRemoveQueue is the maximum number of document ids that we will
	// hold on to in memory before we go back to the database to purge those
	// documents. This only affects StashCollectionCleaner, as the generic
	// cleaner never removes documents.
	MaxRemoveQueue int

	// LogInterval defines how often we will show progress
	LogInterval time.Duration
}

CollectionConfig is the definition of what we will be cleaning up.

type CollectionStats

type CollectionStats struct {

	// DocCount is the total number of documents evaluated.
	DocCount int

	// TokenCount is the total number of transaction tokens that were
	// referenced by the documents.
	TokenCount int

	// CompletedTokenCount is the number of unique tokens that referenced
	// completed transactions.
	CompletedTokenCount int

	// CompletedTxnCount is the number of completed transactions that we
	// looked up.
	CompletedTxnCount int

	// UpdatedDocCount is the number of documents we modified without
	// removing them
	UpdatedDocCount int

	// PulledCount is the number of tokens that were removed from documents.
	PulledTokenCount int

	// RemovedCount represents the number of txns.stash documents that we
	// decided to remove entirely.
	RemovedCount int
}

CollectionStats tracks various counters that signal how the collector operated.

func (CollectionStats) Details

func (stats CollectionStats) Details() string

func (CollectionStats) HasChanges

func (stats CollectionStats) HasChanges() bool

type DBOracle

type DBOracle struct {
	// contains filtered or unexported fields
}

DBOracle uses a temporary table on disk to track what transactions are considered completed and purgeable.

func NewDBOracle

func NewDBOracle(txns *mgo.Collection, thresholdTime time.Time, maxTxns int) (*DBOracle, func(), error)

NewDBOracle uses a database collection to manage the queue of remaining transactions. The caller is responsible to call the returned cleanup() function, to ensure that any resources are freed. thresholdTime is used to omit transactions that are newer than this time (eg, don't consider transactions that are less than 1 hr old to be considered completed yet.)

func (*DBOracle) CompletedTokens

func (o *DBOracle) CompletedTokens(tokens []string) (map[string]bool, error)

CompletedTokens looks at the list of tokens and finds what referenced txns are completed, and then returns the set of tokens that are completed.

func (*DBOracle) Count

func (o *DBOracle) Count() int

func (*DBOracle) IterTxns

func (o *DBOracle) IterTxns() (OracleIterator, error)

IterTxns lets you iterate over all of the transactions that have not been removed.

func (*DBOracle) RemoveTxns

func (o *DBOracle) RemoveTxns(txnIds []bson.ObjectId) (int, error)

RemoveTxns can be used to flag that a given transaction should not be considered part of the valid set.

type IncrementalPruneArgs

type IncrementalPruneArgs struct {
	// MaxTime is a timestamp that provides a threshold of transactions
	// that we will actually prune. Only transactions that were created
	// before this threshold will be pruned.
	// MaxTime can be set to the Zero value to indicate all transactions.
	MaxTime time.Time

	// If ProgressChannel is not nil, this will send updates when documents are
	// processed and transactions are pruned.
	ProgressChannel chan ProgressMessage

	// ReverseOrder indicates we should process transactions from newest to
	// oldest instead of form oldest to newest.
	ReverseOrder bool

	// TxnBatchSize is how many transactions to process at once.
	TxnBatchSize int

	// TxnBatchSleepTime is how long we should sleep between processing transaction
	// batches, to allow other parts of the system to operate (avoid consuming
	// all resources)
	// The default is to not sleep at all, but this can be configured to reduce
	// load while pruning.
	TxnBatchSleepTime time.Duration
}

IncrementalPruneArgs specifies the parameters for running incremental cleanup steps.

type IncrementalPruner

type IncrementalPruner struct {
	ProgressChan chan ProgressMessage
	// contains filtered or unexported fields
}

IncrementalPruner reads the transaction table incrementally, seeing if it can remove the current set of transactions, and then moves on to newer transactions. It only thinks about 1k txns at a time, because that is the batch size that can be deleted. Instead, it caches documents that it has seen.

func NewIncrementalPruner

func NewIncrementalPruner(args IncrementalPruneArgs) *IncrementalPruner

func (*IncrementalPruner) Prune

func (p *IncrementalPruner) Prune(txns *mgo.Collection) (PrunerStats, error)

type MemOracle

type MemOracle struct {
	// contains filtered or unexported fields
}

MemOracle uses an in-memory cache to track what transactions are considered completed and purgeable.

func NewMemOracle

func NewMemOracle(txns *mgo.Collection, thresholdTime time.Time, maxTxns int) (*MemOracle, func(), error)

NewMemOracle uses an in-memory map to manage the queue of remaining transactions.

func (*MemOracle) CompletedTokens

func (o *MemOracle) CompletedTokens(tokens []string) (map[string]bool, error)

CompletedTokens looks at the list of tokens and finds what referenced txns are completed, and then returns the set of tokens that are completed.

func (*MemOracle) Count

func (o *MemOracle) Count() int

func (*MemOracle) IterTxns

func (o *MemOracle) IterTxns() (OracleIterator, error)

IterTxns lets you iterate over all of the transactions that have not been removed.

func (*MemOracle) RemoveTxns

func (o *MemOracle) RemoveTxns(txnIds []bson.ObjectId) (int, error)

RemoveTxns can be used to flag that a given transaction should not be considered part of the valid set.

type Oracle

type Oracle interface {
	// Count returns the number of transactions that we are working with
	Count() int

	// CompletedTokens is called with a list of tokens to be checked. The
	// returned map will have a 'true' for any token that references a
	// completed transaction.
	CompletedTokens(tokens []string) (map[string]bool, error)

	// RemoveTxns can be used to flag that a given transaction should not
	// be considered part of the valid set.
	RemoveTxns(txnIds []bson.ObjectId) (int, error)

	// IterTxns lets you iterate over all of the transactions that have
	// not been removed.
	IterTxns() (OracleIterator, error)
}

Oracle is the general interface that is used to track what transactions are considered completed, and can be pruned.

type OracleIterator

type OracleIterator interface {
	// Grab the next transaction id. Will return nil if there are no
	// more transactions.
	Next() (bson.ObjectId, error)
}

OracleIterator is used to walk over the remaining transactions. See the mgo.Iter as a similar iteration mechanism. Standard use is to do: iter := oracle.IterTxns() return EOF when we get to the end of the iterator, or some other error if there is another failure. for txnId := iter.Next(); err != nil; txnId := iter.Next() { } if err != txn.EOF { }

type ProgressMessage

type ProgressMessage struct {
	TxnsRemoved int
	DocsCleaned int
}

type PruneOptions

type PruneOptions struct {

	// PruneFactor will trigger a prune when the current count of
	// transactions in the database is greater than old*PruneFactor
	PruneFactor float32

	// MinNewTransactions will skip a prune even if pruneFactor is true
	// if there are less than MinNewTransactions that might be cleaned up.
	MinNewTransactions int

	// MaxNewTransactions will force a prune if it sees more than
	// MaxNewTransactions since the last run.
	MaxNewTransactions int

	// MaxTime sets a threshold for 'completed' transactions. Transactions
	// will be considered completed only if they are both older than
	// MaxTime and have a status of Completed or Aborted. Passing the
	// zero Time will cause us to only filter on the Status field.
	MaxTime time.Time

	// MaxBatchTransactions is the most transactions that we will prune in a single pass.
	// It is possible to pass 0 to prune all transactions in a pass. Note
	// that MaybePruneTransactions will always process all transactions, it
	// is just whether we do so in multiple passes, or whether it is done
	// all at once.
	MaxBatchTransactions int

	// MaxBatches is the maximum number of passes we will attempt. 0 or
	// negative values are treated as do a single pass.
	MaxBatches int

	// SmallBatchTransactionCount is the number of transactions to read at a time.
	// A value of 1000 seems to be a good balance between how much time we spend
	// processing, and how many documents we evaluate at one time. (a value of
	// 100 empirically processes slower, and a value of 10,000 wasn't any faster)
	SmallBatchTransactionCount int

	// BatchTransactionSleepTime is an amount of time that we will sleep between
	// processing batches of transactions. This allows us to avoid excess load
	// on the system while pruning.
	BatchTransactionSleepTime time.Duration
}

PruneOptions controls when we will trigger a database prune.

type PrunerStats

type PrunerStats struct {
	CacheLookupTime    time.Duration
	DocReadTime        time.Duration
	DocLookupTime      time.Duration
	DocCleanupTime     time.Duration
	StashLookupTime    time.Duration
	StashRemoveTime    time.Duration
	TxnReadTime        time.Duration
	TxnRemoveTime      time.Duration
	DocCacheHits       int64
	DocCacheMisses     int64
	DocMissingCacheHit int64
	DocsMissing        int64
	CollectionQueries  int64
	DocReads           int64
	DocStillMissing    int64
	StashQueries       int64
	StashDocReads      int64
	StashDocsRemoved   int64
	DocQueuesCleaned   int64
	DocTokensCleaned   int64
	DocsAlreadyClean   int64
	TxnsRemoved        int64
	TxnsNotRemoved     int64
	StrCacheHits       int64
	StrCacheMisses     int64
}

PrunerStats collects statistics about how the prune progressed

func CombineStats

func CombineStats(a, b PrunerStats) PrunerStats

CombineStats aggregates two stats into a single value

func (PrunerStats) String

func (ps PrunerStats) String() string

type Remover

type Remover interface {
	Remove(id interface{}) error
	Flush() error
	Removed() int
}

type Runner

type Runner interface {
	// RunTransaction applies the specified transaction operations to a database.
	RunTransaction(*Transaction) error

	// Run calls the nominated function to get the transaction operations to apply to a database.
	// If there is a failure due to a txn.ErrAborted error, the attempt is retried up to nrRetries times.
	Run(transactions TransactionSource) error

	// ResumeTransactions resumes all pending transactions.
	ResumeTransactions() error

	// MaybePruneTransactions removes data for completed transactions
	// from mgo/txn's transaction collection. It is intended to be
	// called periodically.
	//
	// Pruning is an I/O heavy activity so it will only be undertaken
	// if:
	//
	//   txn_count >= pruneFactor * txn_count_at_last_prune
	//
	MaybePruneTransactions(pruneOpts PruneOptions) error
}

Runner instances applies operations to collections in a database.

func NewRunner

func NewRunner(params RunnerParams) Runner

NewRunner returns a Runner which runs transactions for the database specified in params. Collection names used to manage the transactions and change log may also be specified in params, but if not, default values will be used.

type RunnerParams

type RunnerParams struct {
	// Database is the mgo database for which the transaction runner will be used.
	Database *mgo.Database

	// TransactionCollectionName is the name of the collection
	// used to initialise the underlying mgo transaction runner,
	// defaults to "txns" if unspecified.
	TransactionCollectionName string

	// ChangeLogName is the mgo transaction runner change log,
	// defaults to "txns.log" if unspecified.
	ChangeLogName string

	// RunTransactionObserver, if non-nil, will be called when
	// a Run or RunTransaction call has completed. It will be
	// passed the txn.Ops and the error result.
	RunTransactionObserver func(Transaction)

	// Clock is an optional clock to use. If Clock is nil, clock.WallClock will
	// be used.
	Clock Clock

	// ServerSideTransactions indicates that if SSTXNs are available, use them.
	// Note that we will check if they are supported server side, and fall
	// back to client-side transactions if they are not supported.
	ServerSideTransactions bool
}

RunnerParams are used to construct a new transaction runner. Only the Database value is mandatory, defaults will be used for the other attributes if not specified.

type TestHook

type TestHook struct {
	Before func()
	After  func()
}

TestHook holds a pair of functions to be called before and after a mgo/txn transaction is run. Exported only for testing.

type Transaction

type Transaction struct {
	// Ops is the operations that were performed
	Ops []txn.Op
	// Error is the error returned from running the operation, might be nil
	Error error
	// Duration is length of time it took to run the operation
	Duration time.Duration
	// Attempt is the current attempt to apply the operation.
	Attempt int
}

Transaction is a struct that is passed to RunTransactionObserver whenever a transaction is run.

type TransactionSource

type TransactionSource func(attempt int) ([]txn.Op, error)

TransactionSource defines a function that can return transaction operations to run.

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL