cockroach: github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency

package concurrency

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"

Package concurrency provides a concurrency manager structure that encapsulates the details of concurrency control and contention handling for serializable key-value transactions.

Index

Package Files

concurrency_control.go concurrency_manager.go latch_manager.go lock_table.go lock_table_waiter.go lockstate_interval_btree.go

Variables

var LockTableDeadlockDetectionPushDelay = settings.RegisterDurationSetting(
    "kv.lock_table.deadlock_detection_push_delay",
    "the delay before pushing in order to detect dependency cycles between transactions",
    100*time.Millisecond,
)

LockTableDeadlockDetectionPushDelay sets the delay before pushing in order to detect dependency cycles between transactions.

var LockTableLivenessPushDelay = settings.RegisterDurationSetting(
    "kv.lock_table.coordinator_liveness_push_delay",
    "the delay before pushing in order to detect coordinator failures of conflicting transactions",
    50*time.Millisecond,
)

LockTableLivenessPushDelay sets the delay before pushing in order to detect coordinator failures of conflicting transactions.

type Config

type Config struct {
    // Identification.
    NodeDesc  *roachpb.NodeDescriptor
    RangeDesc *roachpb.RangeDescriptor
    // Components.
    Settings       *cluster.Settings
    DB             *kv.DB
    Clock          *hlc.Clock
    Stopper        *stop.Stopper
    IntentResolver IntentResolver
    // Metrics.
    TxnWaitMetrics *txnwait.Metrics
    SlowLatchGauge *metric.Gauge
    // Configs + Knobs.
    MaxLockTableSize  int64
    DisableTxnPushing bool
    TxnWaitKnobs      txnwait.TestingKnobs
}

Config contains the dependencies to construct a Manager.

type ContentionHandler

type ContentionHandler interface {
    // HandleWriterIntentError consumes a WriteIntentError by informing the
    // concurrency manager about the replicated write intent that was missing
    // from its lock table which was found during request evaluation (while
    // holding latches) under the provided lease sequence. After doing so, it
    // enqueues the request that hit the error in the lock's wait-queue (but
    // does not wait) and releases the guard's latches. It returns an updated
    // guard reflecting this change. After the method returns, the original
    // guard should no longer be used. If an error is returned then the provided
    // guard will be released and no guard will be returned.
    //
    // Example usage: Txn A scans the lock table and does not see an intent on
    // key K from txn B because the intent is not being tracked in the lock
    // table. Txn A moves on to evaluation. While scanning, it notices the
    // intent on key K. It throws a WriteIntentError which is consumed by this
    // method before txn A retries its scan. During the retry, txn A scans the
    // lock table and observes the lock on key K, so it enters the lock's
    // wait-queue and waits for it to be resolved.
    HandleWriterIntentError(
        context.Context, *Guard, roachpb.LeaseSequence, *roachpb.WriteIntentError,
    ) (*Guard, *Error)

    // HandleTransactionPushError consumes a TransactionPushError thrown by a
    // PushTxnRequest by informing the concurrency manager about a transaction
    // record that could not be pushed during request evaluation (while holding
    // latches). After doing so, it releases the guard's latches. It returns an
    // updated guard reflecting this change. After the method returns, the
    // original guard should no longer be used.
    //
    // Example usage: Txn A sends a PushTxn request to push abort txn B. When
    // the request is originally sequenced through the concurrency manager, it
    // checks the txn wait-queue and finds that txn B is not being tracked, so
    // it does not queue up behind it. Txn A moves on to evaluation and tries to
    // push txn B's record. This push fails because txn B is not expired, which
    // results in a TransactionPushError. This error is consumed by this method
    // before txn A retries its push. During the retry, txn A finds that txn B
    // is being tracked in the txn wait-queue so it waits there for txn B to
    // finish.
    HandleTransactionPushError(context.Context, *Guard, *roachpb.TransactionPushError) *Guard
}

ContentionHandler is concerned with handling contention-related errors. This typically involves preparing the request to be queued upon a retry. It is one of the roles of Manager.

type Error

type Error = roachpb.Error

Error is an alias for a roachpb.Error.

type Guard

type Guard struct {
    Req Request
    // contains filtered or unexported fields
}

Guard is returned from Manager.SequenceReq. The guard is passed back in to Manager.FinishReq to release the request's resources when it has completed.
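
A minimal sketch of that lifecycle, assuming a Manager and a populated Request are already in hand (the package name and runRequest helper are hypothetical, and the evaluation step is elided):

    package kvexample // hypothetical example package

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
    )

    func runRequest(ctx context.Context, m concurrency.Manager, req concurrency.Request) error {
        // Sequence the request; this may block while conflicts are resolved.
        g, resp, pErr := m.SequenceReq(ctx, nil /* no existing guard */, req)
        if pErr != nil {
            return pErr.GoError()
        }
        if resp != nil {
            // The manager served the request directly; no guard was returned.
            return nil
        }
        // Latches are held for the duration of evaluation.
        defer m.FinishReq(g)
        g.AssertLatches()
        // ... evaluate the request here ...
        return nil
    }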

func (*Guard) AssertLatches

func (g *Guard) AssertLatches()

AssertLatches asserts that the guard is non-nil and holding latches.

func (*Guard) AssertNoLatches

func (g *Guard) AssertNoLatches()

AssertNoLatches asserts that the guard is non-nil and not holding latches.

func (*Guard) HoldingLatches

func (g *Guard) HoldingLatches() bool

HoldingLatches returns whether the guard is holding latches.

func (*Guard) LatchSpans

func (g *Guard) LatchSpans() *spanset.SpanSet

LatchSpans returns the maximal set of spans that the request will access.

type IntentResolver

type IntentResolver interface {
    // PushTransaction pushes the provided transaction. The method will push the
    // provided pushee transaction immediately, if possible. Otherwise, it will
    // block until the pushee transaction is finalized or eventually can be
    // pushed successfully.
    PushTransaction(
        context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType,
    ) (*roachpb.Transaction, *Error)

    // ResolveIntent synchronously resolves the provided intent.
    ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error

    // ResolveIntents synchronously resolves the provided batch of intents.
    ResolveIntents(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
}

IntentResolver is an interface used by lockTableWaiterImpl to push transactions and to resolve intents. It contains only the subset of the intentresolver.IntentResolver interface that lockTableWaiterImpl needs.
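
Because the interface is intentionally narrow, a test or sketch can substitute a stub. Below is a rough, illustrative no-op implementation; the type name is hypothetical and the import paths reflect this documentation snapshot:

    package kvexample // hypothetical example package

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
    )

    // noopIntentResolver pretends that every push and intent resolution
    // succeeds immediately. Useful only for illustration and tests.
    type noopIntentResolver struct{}

    var _ concurrency.IntentResolver = noopIntentResolver{}

    func (noopIntentResolver) PushTransaction(
        _ context.Context, pushee *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
    ) (*roachpb.Transaction, *concurrency.Error) {
        // Report the pushee as already committed so that waiters stop blocking.
        return &roachpb.Transaction{TxnMeta: *pushee, Status: roachpb.COMMITTED}, nil
    }

    func (noopIntentResolver) ResolveIntent(
        context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions,
    ) *concurrency.Error {
        return nil
    }

    func (noopIntentResolver) ResolveIntents(
        context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions,
    ) *concurrency.Error {
        return nil
    }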

type LockManager

type LockManager interface {
    // OnLockAcquired informs the concurrency manager that a transaction has
    // acquired a new lock or re-acquired an existing lock that it already held.
    OnLockAcquired(context.Context, *roachpb.LockAcquisition)

    // OnLockUpdated informs the concurrency manager that a transaction has
    // updated or released a lock or range of locks that it previously held.
    // The Durability field of the lock update struct is ignored.
    OnLockUpdated(context.Context, *roachpb.LockUpdate)
}

LockManager is concerned with tracking locks that are stored on the manager's range. It is one of the roles of Manager.

type Manager

type Manager interface {
    RequestSequencer
    ContentionHandler
    LockManager
    TransactionManager
    RangeStateListener
    MetricExporter
}

Manager is a structure that sequences incoming requests and provides isolation between requests that intend to perform conflicting operations. During sequencing, conflicts are discovered and any found are resolved through a combination of passive queuing and active pushing. Once a request has been sequenced, it is free to evaluate without concerns of conflicting with other in-flight requests due to the isolation provided by the manager. This isolation is guaranteed for the lifetime of the request but terminates once the request completes.

Transactions require isolation both within requests and across requests. The manager accommodates this by allowing transactional requests to acquire locks, which outlive the requests themselves. Locks extend the duration of the isolation provided over specific keys to the lifetime of the lock-holder transaction itself. They are (typically) only released when the transaction commits or aborts. Other requests that find these locks while being sequenced wait on them to be released in a queue before proceeding. Because locks are checked during sequencing, requests are guaranteed access to all declared keys after they have been sequenced. In other words, locks don't need to be checked again during evaluation.

However, at the time of writing, not all locks are stored directly under the manager's control, so not all locks are discoverable during sequencing. Specifically, write intents (replicated, exclusive locks) are stored inline in the MVCC keyspace, so they are not detectable until request evaluation time. To accommodate this form of lock storage, the manager exposes a HandleWriterIntentError method, which can be used in conjunction with a retry loop around evaluation to integrate external locks with the concurrency manager structure. In the future, we intend to pull all locks, including those associated with write intents, into the concurrency manager directly through a replicated lock table structure.
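
As a concrete sketch of that retry loop, written against only the interfaces documented on this page (the package name, the evalFunc callback, and the lease-sequence plumbing are hypothetical placeholders supplied by the caller):

    package kvexample // hypothetical example package

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // evalFunc is the caller-supplied evaluation step; it runs while the
    // request holds latches.
    type evalFunc func(context.Context, concurrency.Request, *concurrency.Guard) (concurrency.Response, *concurrency.Error)

    func sequenceAndEvaluate(
        ctx context.Context,
        m concurrency.Manager,
        req concurrency.Request,
        leaseSeq roachpb.LeaseSequence,
        evaluate evalFunc,
    ) (concurrency.Response, *concurrency.Error) {
        var g *concurrency.Guard
        defer func() {
            if g != nil {
                m.FinishReq(g)
            }
        }()
        for {
            // (Re-)sequence. Passing a non-nil guard back in retains the
            // request's position in any lock wait-queues it already entered.
            var resp concurrency.Response
            var pErr *concurrency.Error
            g, resp, pErr = m.SequenceReq(ctx, g, req)
            if pErr != nil || resp != nil {
                return resp, pErr
            }
            // Evaluate while holding latches.
            resp, pErr = evaluate(ctx, req, g)
            if pErr == nil {
                return resp, nil
            }
            // Evaluation found a replicated intent that the lock table did
            // not know about: hand it to the manager, which queues the
            // request on the lock and drops its latches, then re-sequence.
            if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
                if g, pErr = m.HandleWriterIntentError(ctx, g, leaseSeq, wiErr); pErr != nil {
                    // On error the provided guard has already been released.
                    return nil, pErr
                }
                continue
            }
            return nil, pErr
        }
    }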

Fairness is ensured between requests. In general, if any two requests conflict then the request that arrived first will be sequenced first. As such, sequencing guarantees FIFO semantics. The primary exception to this is that a request that is part of a transaction which has already acquired a lock does not need to wait on that lock during sequencing, and can therefore ignore any queue that has formed on the lock. For other exceptions, see the later comment for lockTable.

Internal Components

The concurrency manager is composed of a number of internal synchronization, bookkeeping, and queueing structures. Each of these is discussed in more detail on its interface definition. The following diagram details how the components are tied together:

+---------------------+---------------------------------------------+
| concurrency.Manager |                                             |
+---------------------+                                             |
|                                                                   |
+------------+  acquire  +--------------+        acquire            |
  Sequence() |--->--->---| latchManager |<---<---<---<---<---<---+  |
+------------+           +--------------+                        |  |
|                         / check locks + wait queues            |  |
|                        v  if conflict, enter q & drop latches  ^  |
|         +---------------------------------------------------+  |  |
|         | [ lockTable ]                                     |  |  |
|         | [    key1   ]    -------------+-----------------+ |  ^  |
|         | [    key2   ]  /  lockState:  | lockWaitQueue:  |----<---<---<----+
|         | [    key3   ]-{   - lock type | +-[a]<-[b]<-[c] | |  |  |         |
|         | [    key4   ]  \  - txn  meta | |  (no latches) |-->-^  |         |
|         | [    key5   ]    -------------+-|---------------+ |     |         |
|         | [    ...    ]                   v                 |     |         ^
|         +---------------------------------|-----------------+     |         | if lock found, HandleWriterIntentError()
|                 |                         |                       |         |  - enter lockWaitQueue
|                 |       +- may be remote -+--+                    |         |  - drop latches
|                 |       |                    |                    |         |  - wait for lock update / release
|                 v       v                    ^                    |         |
|                 |    +--------------------------+                 |         ^
|                 |    | txnWaitQueue:            |                 |         |
|                 |    | (located on txn record's |                 |         |
|                 v    |  leaseholder replica)    |                 |         |
|                 |    |--------------------------|                 |         ^
|                 |    | [txn1] [txn2] [txn3] ... |----<---<---<---<----+     |
|                 |    +--------------------------+                 |   | if txn push failed, HandleTransactionPushError()
|                 |                                                 |   |  - enter txnWaitQueue
|                 |                                                 |   ^  - drop latches
|                 |                                                 |   |  - wait for txn record update
|                 |                                                 |   |     |
|                 |                                                 |   |     |
|                 +--> retain latches --> remain at head of queues ---> evaluate ---> Finish()
|                                                                   |
+----------+                                                        |
  Finish() | ---> exit wait queues ---> drop latches -----------------> respond ...
+----------+                                                        |
|                                                                   |
+-------------------------------------------------------------------+

See the comments on individual components for a more detailed look at their interface and inner-workings.

At a high-level, a request enters the concurrency manager and immediately acquires latches from the latchManager to serialize access to the keys that it intends to touch. This latching takes into account the keys being accessed, the MVCC timestamp of accesses, and the access method being used (read vs. write) to allow for concurrency where possible. This has the effect of queuing on conflicting in-flight operations until their completion.

Once latched, the request consults the lockTable to check for any conflicting locks owned by other transactions. If any are found, the request enters the corresponding lockWaitQueue and its latches are dropped. Requests in the queue wait for the corresponding lock to be released by intent resolution. While waiting, the head of the lockWaitQueue pushes the owner of the lock through a remote RPC that ends up in the pushee's txnWaitQueue. This queue exists on the leaseholder replica of the range that contains the pushee's transaction record. Other entries in the queue wait for the head of the queue, eventually pushing it to detect coordinator failures and transaction deadlocks. Once the lock is released, the head of the queue reacquires latches and attempts to proceed while remaining at the head of that lockWaitQueue to ensure fairness.

Once a request is latched and observes no conflicting locks in the lockTable and no conflicting lockWaitQueues that it is not already the head of, the request can proceed to evaluate. During evaluation, the request may insert or remove locks from the lockTable for its own transaction.

When the request completes, it exits any lockWaitQueues that it was a part of and releases its latches. However, if the request was successful, any locks that it inserted into the lockTable remain.

func NewManager

func NewManager(cfg Config) Manager

NewManager creates a new concurrency Manager structure.
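
A rough sketch of wiring a Manager together. The constructor helpers used for the settings, clock, and metrics (cluster.MakeTestingClusterSettings, hlc.NewClock, txnwait.NewMetrics, metric.NewGauge) are assumptions about neighboring packages, and noopIntentResolver refers to the illustrative stub sketched under IntentResolver above; in the real system the owning replica supplies these dependencies:

    package kvexample // hypothetical example package

    import (
        "time"

        "github.com/cockroachdb/cockroach/pkg/kv"
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
        "github.com/cockroachdb/cockroach/pkg/util/hlc"
        "github.com/cockroachdb/cockroach/pkg/util/metric"
        "github.com/cockroachdb/cockroach/pkg/util/stop"
    )

    func newManagerForRange(
        nodeDesc *roachpb.NodeDescriptor,
        rangeDesc *roachpb.RangeDescriptor,
        db *kv.DB,
        stopper *stop.Stopper,
    ) concurrency.Manager {
        return concurrency.NewManager(concurrency.Config{
            // Identification.
            NodeDesc:  nodeDesc,
            RangeDesc: rangeDesc,
            // Components.
            Settings:       cluster.MakeTestingClusterSettings(),
            DB:             db,
            Clock:          hlc.NewClock(hlc.UnixNano, 500*time.Millisecond),
            Stopper:        stopper,
            IntentResolver: noopIntentResolver{},
            // Metrics.
            TxnWaitMetrics: txnwait.NewMetrics(time.Minute),
            SlowLatchGauge: metric.NewGauge(metric.Metadata{Name: "example.slow.latches"}),
            // Configs + Knobs.
            MaxLockTableSize: 10000,
        })
    }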

type MetricExporter

type MetricExporter interface {
    // LatchMetrics returns information about the state of the latchManager.
    LatchMetrics() (global, local kvserverpb.LatchManagerInfo)

    // LockTableDebug returns a debug string representing the state of the
    // lockTable.
    LockTableDebug() string

    // TxnWaitQueue returns the concurrency manager's txnWaitQueue.
    // TODO(nvanbenschoten): this doesn't really fit into this interface. It
    // would be nice if the txnWaitQueue was hidden behind the concurrency
    // manager abstraction entirely, but tests want to access it directly.
    TxnWaitQueue() *txnwait.Queue
}

MetricExporter is concerned with providing observability into the state of the concurrency manager. It is one of the roles of Manager.
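
For example, a debugging helper might dump the manager's state using only the methods above (a sketch; the helper and its output format are arbitrary):

    package kvexample // hypothetical example package

    import (
        "fmt"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
    )

    func dumpConcurrencyState(m concurrency.Manager) {
        // Latch manager state for the global and local keyspaces.
        global, local := m.LatchMetrics()
        fmt.Printf("latches: global=%+v local=%+v\n", global, local)
        // Human-readable dump of the lockTable contents.
        fmt.Printf("lock table:\n%s\n", m.LockTableDebug())
    }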

type RangeStateListener

type RangeStateListener interface {
    // OnRangeDescUpdated informs the manager that its range's descriptor has been
    // updated.
    OnRangeDescUpdated(*roachpb.RangeDescriptor)

    // OnRangeLeaseUpdated informs the concurrency manager that its range's
    // lease has been updated. The argument indicates whether this manager's
    // replica is the leaseholder going forward.
    OnRangeLeaseUpdated(_ roachpb.LeaseSequence, isLeaseholder bool)

    // OnRangeSplit informs the concurrency manager that its range has split off
    // a new range to its RHS.
    OnRangeSplit()

    // OnRangeMerge informs the concurrency manager that its range has merged
    // into its LHS neighbor. This is not called on the LHS range being merged
    // into.
    OnRangeMerge()

    // OnReplicaSnapshotApplied informs the concurrency manager that its replica
    // has received a snapshot from another replica in its range.
    OnReplicaSnapshotApplied()
}

RangeStateListener is concerned with observing updates to the concurrency manager's range. It is one of the roles of Manager.

type Request

type Request struct {
    // The (optional) transaction that sent the request.
    // Non-transactional requests do not acquire locks.
    Txn *roachpb.Transaction

    // The timestamp that the request should evaluate at.
    // Should be set to Txn.ReadTimestamp if Txn is non-nil.
    Timestamp hlc.Timestamp

    // The priority of the request. Only set if Txn is nil.
    Priority roachpb.UserPriority

    // The consistency level of the request. Only set if Txn is nil.
    ReadConsistency roachpb.ReadConsistencyType

    // The wait policy of the request. Signifies how the request should
    // behave if it encounters conflicting locks held by other active
    // transactions.
    WaitPolicy lock.WaitPolicy

    // The individual requests in the batch.
    Requests []roachpb.RequestUnion

    // The maximal set of spans that the request will access. Latches
    // will be acquired for these spans.
    // TODO(nvanbenschoten): don't allocate these SpanSet objects.
    LatchSpans *spanset.SpanSet

    // The maximal set of spans within which the request expects to have
    // isolation from conflicting transactions. Conflicting locks within
    // these spans will be queued on and conditionally pushed.
    //
    // Note that unlike LatchSpans, the timestamps that these spans are
    // declared at are NOT consulted. All read spans are considered to take
    // place at the transaction's read timestamp (Txn.ReadTimestamp) and all
    // write spans are considered to take place at the transaction's write
    // timestamp (Txn.WriteTimestamp). If the request is non-transactional
    // (Txn == nil), all reads and writes are considered to take place at
    // Timestamp.
    LockSpans *spanset.SpanSet
}

Request is the input to Manager.SequenceReq. The struct contains all of the information necessary to sequence a KV request and determine which locks and other in-flight requests it conflicts with.
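
A sketch of assembling a Request for a single transactional Get. The spanset helpers used to declare spans (AddMVCC, AddNonMVCC) and the batch construction are assumptions about neighboring packages rather than part of this package's API:

    package kvexample // hypothetical example package

    import (
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    func makeGetRequest(txn *roachpb.Transaction, key roachpb.Key) concurrency.Request {
        // Declare the single key for both latching and lock isolation.
        latchSpans, lockSpans := &spanset.SpanSet{}, &spanset.SpanSet{}
        latchSpans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{Key: key}, txn.ReadTimestamp)
        lockSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: key})

        var ba roachpb.BatchRequest
        ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: key}})

        return concurrency.Request{
            Txn:        txn,
            Timestamp:  txn.ReadTimestamp, // must equal Txn.ReadTimestamp for transactional requests
            Requests:   ba.Requests,
            LatchSpans: latchSpans,
            LockSpans:  lockSpans,
        }
    }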

type RequestSequencer

type RequestSequencer interface {
    // SequenceReq acquires latches, checks for locks, and queues behind and/or
    // pushes other transactions to resolve any conflicts. Once sequenced, the
    // request is guaranteed sufficient isolation for the duration of its
    // evaluation, until the returned request guard is released.
    // NOTE: this last part will not be true until replicated locks are pulled
    // into the concurrency manager.
    //
    // An optional existing request guard can be provided to SequenceReq. This
    // allows the request's position in lock wait-queues to be retained across
    // sequencing attempts. If provided, the guard should not be holding latches
    // already. The expected usage of this parameter is that it will only be
    // provided after acquiring a Guard from a ContentionHandler method.
    //
    // If the method returns a non-nil request guard then the caller must ensure
    // that the guard is eventually released by passing it to FinishReq.
    //
    // Alternatively, the concurrency manager may be able to serve the request
    // directly, in which case it will return a Response for the request. If it
    // does so, it will not return a request guard.
    SequenceReq(context.Context, *Guard, Request) (*Guard, Response, *Error)

    // FinishReq marks the request as complete, releasing any protection
    // the request had against conflicting requests and allowing conflicting
    // requests that are blocked on this one to proceed. The guard should not
    // be used after being released.
    FinishReq(*Guard)
}

RequestSequencer is concerned with the sequencing of concurrent requests. It is one of the roles of Manager.

type Response

type Response = []roachpb.ResponseUnion

Response is a slice of responses to requests in a batch. This type is used when the concurrency manager is able to respond to a request directly during sequencing.

type TransactionManager

type TransactionManager interface {
    // OnTransactionUpdated informs the concurrency manager that a transaction's
    // status was updated.
    OnTransactionUpdated(context.Context, *roachpb.Transaction)

    // GetDependents returns a set of transactions waiting on the specified
    // transaction either directly or indirectly. The method is used to perform
    // deadlock detection. See txnWaitQueue for more.
    GetDependents(uuid.UUID) []uuid.UUID
}

TransactionManager is concerned with tracking transactions that have their record stored on the manager's range. It is one of the roles of Manager.

Directories

Path    Synopsis
lock    Package lock provides type definitions for locking-related concepts used by concurrency control in the key-value layer.

Package concurrency imports 31 packages and is imported by 4 packages. Updated 2020-08-12.