txn: github.com/juju/txn Index | Files | Directories

package txn

import "github.com/juju/txn"

Index

Package Files

cleaner.go incrementalprune.go oracle.go prune.go txn.go

Variables

var (
    // ErrExcessiveContention is used to signal that even after retrying, the transaction operations
    // could not be successfully applied due to database contention.
    ErrExcessiveContention = stderrors.New("state changing too quickly; try again soon")

    // ErrNoOperations is returned by TransactionSource implementations to signal that
    // no transaction operations are available to run.
    ErrNoOperations = stderrors.New("no transaction operations are available")

    // ErrNoOperations is returned by TransactionSource implementations to signal that
    // the transaction list could not be built but the caller should retry.
    ErrTransientFailure = stderrors.New("transient failure")
)
var EOF = fmt.Errorf("end of transaction ids")

func NewCollectionCleaner Uses

func NewCollectionCleaner(config CollectionConfig) *collectionCleaner

NewCollectionCleaner creates an object that can remove transaction tokens from document queues when the transactions have been marked as completed.

func NewStashCleaner Uses

func NewStashCleaner(config CollectionConfig) *collectionCleaner

NewStashCleaner returns an object suitable for cleaning up the txns.stash collection. It is different because when we find all references from a document have been removed, we can remove the document.

func SupportsServerSideTransactions Uses

func SupportsServerSideTransactions(db *mgo.Database) bool

SupportsServerSideTransactions lets you know if the given database can support server-side transactions.

func TestHooks Uses

func TestHooks(runner Runner) chan ([]TestHook)

TestHooks returns the test hooks for a transaction runner. Exported only for testing.

type CleanAndPruneArgs Uses

type CleanAndPruneArgs struct {

    // Txns is the collection that holds all of the transactions that we
    // might want to prune. We will also make use of Txns.Database to find
    // all of the collections that might make use of transactions from that
    // collection.
    Txns *mgo.Collection

    // TxnsCount is a hint from Txns.Count() to avoid having to call it again
    // to determine whether it is ok to hold the set of transactions in memory.
    // It is optional, as we will call Txns.Count() if it is not supplied.
    TxnsCount int

    // MaxTime is a timestamp that provides a threshold of transactions
    // that we will actually prune. Only transactions that were created
    // before this threshold will be pruned.
    MaxTime time.Time

    // MaxTransactionsToProcess defines how many completed transactions that we will evaluate in this batch.
    // A value of 0 indicates we should evaluate all completed transactions.
    MaxTransactionsToProcess int

    // Multithreaded will start multiple pruning passes concurrently
    Multithreaded bool

    // TxnBatchSize is how many transaction to process at once.
    TxnBatchSize int

    // TxnBatchSleepTime is how long we should sleep between processing transaction
    // batches, to allow other parts of the system to operate (avoid consuming
    // all resources)
    // The default is to not sleep at all, but this can be configured to reduce
    // load while pruning.
    TxnBatchSleepTime time.Duration
}

CleanAndPruneArgs specifies the parameters required by CleanAndPrune.

type CleanupStats Uses

type CleanupStats struct {

    // CollectionsInspected is the total number of collections we looked at for documents
    CollectionsInspected int

    // DocsInspected is how many documents we loaded to evaluate their txn queues
    DocsInspected int

    // DocsCleaned is how many documents we Updated to remove entries from their txn queue.
    DocsCleaned int

    // StashDocumentsRemoved is how many total documents we remove from txns.stash
    StashDocumentsRemoved int

    // StashDocumentsRemoved is how many documents we remove from txns
    TransactionsRemoved int

    // ShouldRetry indicates that we think this cleanup was not complete due to too many txns to process. We recommend running it again.
    ShouldRetry bool
}

CleanupStats gives some numbers as to what work was done as part of CleanupAndPrune.

func CleanAndPrune Uses

func CleanAndPrune(args CleanAndPruneArgs) (CleanupStats, error)

CleanAndPrune runs the cleanup steps, and then follows up with pruning all of the transactions that are no longer referenced.

type Clock Uses

type Clock interface {
    // Now returns the current clock time.
    Now() time.Time
}

Clock is a simplified form of juju/clock.Clock, since we don't need all the methods and this allows us to be compatible with both juju/clock.Clock and juju/utils/clock.Clock

type CollectionConfig Uses

type CollectionConfig struct {
    // Oracle is an Oracle that we can use to determine if a given
    // transaction token should be considered a 'completed' transaction.
    Oracle Oracle

    // Source is the mongo collection holding documents created and managed
    // by transactions.
    Source *mgo.Collection

    // NumBatchTokens is the number of tokens that we will cache before
    // doing a query to find out whether their referenced transactions are
    // completed. It is useful to have a number in the hundreds so that we
    // efficiently query the mongo transaction database.
    NumBatchTokens int

    // MaxRemoveQueue is the maximum number of document ids that we will
    // hold on to in memory before we go back to the database to purge those
    // documents. This only affects StashCollectionCleaner, as the generic
    // cleaner never removes documents.
    MaxRemoveQueue int

    // LogInterval defines how often we will show progress
    LogInterval time.Duration
}

CollectionConfig is the definition of what we will be cleaning up.

type CollectionStats Uses

type CollectionStats struct {

    // DocCount is the total number of documents evaluated.
    DocCount int

    // TokenCount is the total number of transaction tokens that were
    // referenced by the documents.
    TokenCount int

    // CompletedTokenCount is the number of unique tokens that referenced
    // completed transactions.
    CompletedTokenCount int

    // CompletedTxnCount is the number of completed transactions that we
    // looked up.
    CompletedTxnCount int

    // UpdatedDocCount is the number of documents we modified without
    // removing them
    UpdatedDocCount int

    // PulledCount is the number of tokens that were removed from documents.
    PulledTokenCount int

    // RemovedCount represents the number of txns.stash documents that we
    // decided to remove entirely.
    RemovedCount int
}

CollectionStats tracks various counters that signal how the collector operated.

func (CollectionStats) Details Uses

func (stats CollectionStats) Details() string

func (CollectionStats) HasChanges Uses

func (stats CollectionStats) HasChanges() bool

type DBOracle Uses

type DBOracle struct {
    // contains filtered or unexported fields
}

DBOracle uses a temporary table on disk to track what transactions are considered completed and purgeable.

func NewDBOracle Uses

func NewDBOracle(txns *mgo.Collection, thresholdTime time.Time, maxTxns int) (*DBOracle, func(), error)

NewDBOracle uses a database collection to manage the queue of remaining transactions. The caller is responsible to call the returned cleanup() function, to ensure that any resources are freed. thresholdTime is used to omit transactions that are newer than this time (eg, don't consider transactions that are less than 1 hr old to be considered completed yet.)

func (*DBOracle) CompletedTokens Uses

func (o *DBOracle) CompletedTokens(tokens []string) (map[string]bool, error)

CompletedTokens looks at the list of tokens and finds what referenced txns are completed, and then returns the set of tokens that are completed.

func (*DBOracle) Count Uses

func (o *DBOracle) Count() int

func (*DBOracle) IterTxns Uses

func (o *DBOracle) IterTxns() (OracleIterator, error)

IterTxns lets you iterate over all of the transactions that have not been removed.

func (*DBOracle) RemoveTxns Uses

func (o *DBOracle) RemoveTxns(txnIds []bson.ObjectId) (int, error)

RemoveTxns can be used to flag that a given transaction should not be considered part of the valid set.

type IncrementalPruneArgs Uses

type IncrementalPruneArgs struct {
    // MaxTime is a timestamp that provides a threshold of transactions
    // that we will actually prune. Only transactions that were created
    // before this threshold will be pruned.
    // MaxTime can be set to the Zero value to indicate all transactions.
    MaxTime time.Time

    // If ProgressChannel is not nil, this will send updates when documents are
    // processed and transactions are pruned.
    ProgressChannel chan ProgressMessage

    // ReverseOrder indicates we should process transactions from newest to
    // oldest instead of form oldest to newest.
    ReverseOrder bool

    // TxnBatchSize is how many transactions to process at once.
    TxnBatchSize int

    // TxnBatchSleepTime is how long we should sleep between processing transaction
    // batches, to allow other parts of the system to operate (avoid consuming
    // all resources)
    // The default is to not sleep at all, but this can be configured to reduce
    // load while pruning.
    TxnBatchSleepTime time.Duration
}

IncrementalPruneArgs specifies the parameters for running incremental cleanup steps.

type IncrementalPruner Uses

type IncrementalPruner struct {
    ProgressChan chan ProgressMessage
    // contains filtered or unexported fields
}

IncrementalPruner reads the transaction table incrementally, seeing if it can remove the current set of transactions, and then moves on to newer transactions. It only thinks about 1k txns at a time, because that is the batch size that can be deleted. Instead, it caches documents that it has seen.

func NewIncrementalPruner Uses

func NewIncrementalPruner(args IncrementalPruneArgs) *IncrementalPruner

func (*IncrementalPruner) Prune Uses

func (p *IncrementalPruner) Prune(txns *mgo.Collection) (PrunerStats, error)

type MemOracle Uses

type MemOracle struct {
    // contains filtered or unexported fields
}

MemOracle uses an in-memory cache to track what transactions are considered completed and purgeable.

func NewMemOracle Uses

func NewMemOracle(txns *mgo.Collection, thresholdTime time.Time, maxTxns int) (*MemOracle, func(), error)

NewMemOracle uses an in-memory map to manage the queue of remaining transactions.

func (*MemOracle) CompletedTokens Uses

func (o *MemOracle) CompletedTokens(tokens []string) (map[string]bool, error)

CompletedTokens looks at the list of tokens and finds what referenced txns are completed, and then returns the set of tokens that are completed.

func (*MemOracle) Count Uses

func (o *MemOracle) Count() int

func (*MemOracle) IterTxns Uses

func (o *MemOracle) IterTxns() (OracleIterator, error)

IterTxns lets you iterate over all of the transactions that have not been removed.

func (*MemOracle) RemoveTxns Uses

func (o *MemOracle) RemoveTxns(txnIds []bson.ObjectId) (int, error)

RemoveTxns can be used to flag that a given transaction should not be considered part of the valid set.

type Oracle Uses

type Oracle interface {
    // Count returns the number of transactions that we are working with
    Count() int

    // CompletedTokens is called with a list of tokens to be checked. The
    // returned map will have a 'true' for any token that references a
    // completed transaction.
    CompletedTokens(tokens []string) (map[string]bool, error)

    // RemoveTxns can be used to flag that a given transaction should not
    // be considered part of the valid set.
    RemoveTxns(txnIds []bson.ObjectId) (int, error)

    // IterTxns lets you iterate over all of the transactions that have
    // not been removed.
    IterTxns() (OracleIterator, error)
}

Oracle is the general interface that is used to track what transactions are considered completed, and can be pruned.

type OracleIterator Uses

type OracleIterator interface {
    // Grab the next transaction id. Will return nil if there are no
    // more transactions.
    Next() (bson.ObjectId, error)
}

OracleIterator is used to walk over the remaining transactions. See the mgo.Iter as a similar iteration mechanism. Standard use is to do: iter := oracle.IterTxns() return EOF when we get to the end of the iterator, or some other error if there is another failure. for txnId := iter.Next(); err != nil; txnId := iter.Next() { } if err != txn.EOF { }

type ProgressMessage Uses

type ProgressMessage struct {
    TxnsRemoved int
    DocsCleaned int
}

type PruneOptions Uses

type PruneOptions struct {

    // PruneFactor will trigger a prune when the current count of
    // transactions in the database is greater than old*PruneFactor
    PruneFactor float32

    // MinNewTransactions will skip a prune even if pruneFactor is true
    // if there are less than MinNewTransactions that might be cleaned up.
    MinNewTransactions int

    // MaxNewTransactions will force a prune if it sees more than
    // MaxNewTransactions since the last run.
    MaxNewTransactions int

    // MaxTime sets a threshold for 'completed' transactions. Transactions
    // will be considered completed only if they are both older than
    // MaxTime and have a status of Completed or Aborted. Passing the
    // zero Time will cause us to only filter on the Status field.
    MaxTime time.Time

    // MaxBatchTransactions is the most transactions that we will prune in a single pass.
    // It is possible to pass 0 to prune all transactions in a pass. Note
    // that MaybePruneTransactions will always process all transactions, it
    // is just whether we do so in multiple passes, or whether it is done
    // all at once.
    MaxBatchTransactions int

    // MaxBatches is the maximum number of passes we will attempt. 0 or
    // negative values are treated as do a single pass.
    MaxBatches int

    // SmallBatchTransactionCount is the number of transactions to read at a time.
    // A value of 1000 seems to be a good balance between how much time we spend
    // processing, and how many documents we evaluate at one time. (a value of
    // 100 empirically processes slower, and a value of 10,000 wasn't any faster)
    SmallBatchTransactionCount int

    // BatchTransactionSleepTime is an amount of time that we will sleep between
    // processing batches of transactions. This allows us to avoid excess load
    // on the system while pruning.
    BatchTransactionSleepTime time.Duration
}

PruneOptions controls when we will trigger a database prune.

type PrunerStats Uses

type PrunerStats struct {
    CacheLookupTime    time.Duration
    DocReadTime        time.Duration
    DocLookupTime      time.Duration
    DocCleanupTime     time.Duration
    StashLookupTime    time.Duration
    StashRemoveTime    time.Duration
    TxnReadTime        time.Duration
    TxnRemoveTime      time.Duration
    DocCacheHits       int64
    DocCacheMisses     int64
    DocMissingCacheHit int64
    DocsMissing        int64
    CollectionQueries  int64
    DocReads           int64
    DocStillMissing    int64
    StashQueries       int64
    StashDocReads      int64
    StashDocsRemoved   int64
    DocQueuesCleaned   int64
    DocTokensCleaned   int64
    DocsAlreadyClean   int64
    TxnsRemoved        int64
    TxnsNotRemoved     int64
    StrCacheHits       int64
    StrCacheMisses     int64
}

PrunerStats collects statistics about how the prune progressed

func CombineStats Uses

func CombineStats(a, b PrunerStats) PrunerStats

CombineStats aggregates two stats into a single value

func (PrunerStats) String Uses

func (ps PrunerStats) String() string

type Remover Uses

type Remover interface {
    Remove(id interface{}) error
    Flush() error
    Removed() int
}

type Runner Uses

type Runner interface {
    // RunTransaction applies the specified transaction operations to a database.
    RunTransaction(*Transaction) error

    // Run calls the nominated function to get the transaction operations to apply to a database.
    // If there is a failure due to a txn.ErrAborted error, the attempt is retried up to nrRetries times.
    Run(transactions TransactionSource) error

    // ResumeTransactions resumes all pending transactions.
    ResumeTransactions() error

    // MaybePruneTransactions removes data for completed transactions
    // from mgo/txn's transaction collection. It is intended to be
    // called periodically.
    //
    // Pruning is an I/O heavy activity so it will only be undertaken
    // if:
    //
    //   txn_count >= pruneFactor * txn_count_at_last_prune
    //
    MaybePruneTransactions(pruneOpts PruneOptions) error
}

Runner instances applies operations to collections in a database.

func NewRunner Uses

func NewRunner(params RunnerParams) Runner

NewRunner returns a Runner which runs transactions for the database specified in params. Collection names used to manage the transactions and change log may also be specified in params, but if not, default values will be used.

type RunnerParams Uses

type RunnerParams struct {
    // Database is the mgo database for which the transaction runner will be used.
    Database *mgo.Database

    // TransactionCollectionName is the name of the collection
    // used to initialise the underlying mgo transaction runner,
    // defaults to "txns" if unspecified.
    TransactionCollectionName string

    // ChangeLogName is the mgo transaction runner change log,
    // defaults to "txns.log" if unspecified.
    ChangeLogName string

    // RunTransactionObserver, if non-nil, will be called when
    // a Run or RunTransaction call has completed. It will be
    // passed the txn.Ops and the error result.
    RunTransactionObserver func(Transaction)

    // Clock is an optional clock to use. If Clock is nil, clock.WallClock will
    // be used.
    Clock Clock

    // ServerSideTransactions indicates that if SSTXNs are available, use them.
    // Note that we will check if they are supported server side, and fall
    // back to client-side transactions if they are not supported.
    ServerSideTransactions bool
}

RunnerParams are used to construct a new transaction runner. Only the Database value is mandatory, defaults will be used for the other attributes if not specified.

type TestHook Uses

type TestHook struct {
    Before func()
    After  func()
}

TestHook holds a pair of functions to be called before and after a mgo/txn transaction is run. Exported only for testing.

type Transaction Uses

type Transaction struct {
    // Ops is the operations that were performed
    Ops []txn.Op
    // Error is the error returned from running the operation, might be nil
    Error error
    // Duration is length of time it took to run the operation
    Duration time.Duration
    // Attempt is the current attempt to apply the operation.
    Attempt int
}

Transaction is a struct that is passed to RunTransactionObserver whenever a transaction is run.

type TransactionSource Uses

type TransactionSource func(attempt int) ([]txn.Op, error)

TransactionSource defines a function that can return transaction operations to run.

Directories

PathSynopsis
testing

Package txn imports 15 packages (graph) and is imported by 313 packages. Updated 2019-06-26. Refresh now. Tools for package owners.