cockroach: Index | Files

package txnwait

import ""


Package Files

metrics.go queue.go


var TxnLivenessHeartbeatMultiplier = envutil.EnvOrDefaultInt(

TxnLivenessHeartbeatMultiplier specifies what multiple the transaction liveness threshold should be of the transaction heartbeat internval.

var TxnLivenessThreshold = time.Duration(TxnLivenessHeartbeatMultiplier) * base.DefaultTxnHeartbeatInterval

TxnLivenessThreshold is the maximum duration between transaction heartbeats before the transaction is considered expired by Queue. It is exposed and mutable to allow tests to override it.

Use TestingOverrideTxnLivenessThreshold to override the value in tests.

func IsExpired Uses

func IsExpired(now hlc.Timestamp, txn *roachpb.Transaction) bool

IsExpired is true if the given transaction is expired.

func ShouldPushImmediately Uses

func ShouldPushImmediately(req *roachpb.PushTxnRequest) bool

ShouldPushImmediately returns whether the PushTxn request should proceed without queueing. This is true for pushes which are neither ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where the pushee has min priority or pusher has max priority.

func TestingOverrideTxnLivenessThreshold Uses

func TestingOverrideTxnLivenessThreshold(t time.Duration) func()

TestingOverrideTxnLivenessThreshold allows tests to override the transaction liveness threshold. The function returns a closure that should be called to reset the value.

func TxnExpiration Uses

func TxnExpiration(txn *roachpb.Transaction) hlc.Timestamp

TxnExpiration computes the timestamp after which the transaction will be considered expired.

type Config Uses

type Config struct {
    RangeDesc *roachpb.RangeDescriptor
    DB        *kv.DB
    Clock     *hlc.Clock
    Stopper   *stop.Stopper
    Metrics   *Metrics
    Knobs     TestingKnobs

Config contains the dependencies to construct a Queue.

type Metrics Uses

type Metrics struct {
    PusheeWaiting  *metric.Gauge
    PusherWaiting  *metric.Gauge
    QueryWaiting   *metric.Gauge
    PusherSlow     *metric.Gauge
    PusherWaitTime *metric.Histogram
    QueryWaitTime  *metric.Histogram
    DeadlocksTotal *metric.Counter

Metrics contains all the txnqueue related metrics.

func NewMetrics Uses

func NewMetrics(histogramWindowInterval time.Duration) *Metrics

NewMetrics creates a new Metrics instance with all related metric fields.

type Queue Uses

type Queue struct {
    // contains filtered or unexported fields

Queue enqueues PushTxn requests which are waiting on extant txns with conflicting intents to abort or commit.

Internally, it maintains a map from extant txn IDs to queues of pending PushTxn requests.

When a write intent is encountered, the command which encountered it (called the "pusher" here) initiates a PushTxn request to determine the disposition of the intent's transaction (called the "pushee" here). This queue is where a PushTxn request will wait if it discovers that the pushee's transaction is still pending, and cannot be otherwise aborted or pushed forward.

Queue is thread safe.

func NewQueue Uses

func NewQueue(cfg Config) *Queue

NewQueue instantiates a new Queue.

func (*Queue) Clear Uses

func (q *Queue) Clear(disable bool)

Clear empties the queue and returns all waiters. This method should be invoked when the replica loses or transfers its lease. If `disable` is true, future transactions may not be enqueued or waiting pushers added. Call Enable() once the lease is again acquired by the replica.

func (*Queue) Enable Uses

func (q *Queue) Enable(_ roachpb.LeaseSequence)

Enable allows transactions to be enqueued and waiting pushers added. This method must be idempotent as it can be invoked multiple times as range leases are updated for the same replica.

func (*Queue) EnqueueTxn Uses

func (q *Queue) EnqueueTxn(txn *roachpb.Transaction)

EnqueueTxn creates a new pendingTxn for the target txn of a failed PushTxn command. Subsequent PushTxn requests for the same txn will be enqueued behind the pendingTxn via MaybeWait().

func (*Queue) GetDependents Uses

func (q *Queue) GetDependents(txnID uuid.UUID) []uuid.UUID

GetDependents returns a slice of transactions waiting on the specified txn either directly or indirectly.

func (*Queue) IsEnabled Uses

func (q *Queue) IsEnabled() bool

IsEnabled is true if the queue is enabled.

func (*Queue) MaybeWaitForPush Uses

func (q *Queue) MaybeWaitForPush(
    ctx context.Context, req *roachpb.PushTxnRequest,
) (*roachpb.PushTxnResponse, *roachpb.Error)

MaybeWaitForPush checks whether there is a queue already established for pushing the transaction. If not, or if the PushTxn request isn't queueable, return immediately. If there is a queue, enqueue this request as a waiter and enter a select loop waiting for resolution.

If the transaction is successfully pushed while this method is waiting, the first return value is a non-nil PushTxnResponse object.

func (*Queue) MaybeWaitForQuery Uses

func (q *Queue) MaybeWaitForQuery(
    ctx context.Context, req *roachpb.QueryTxnRequest,
) *roachpb.Error

MaybeWaitForQuery checks whether there is a queue already established for pushing the transaction. If not, or if the QueryTxn request hasn't specified WaitForUpdate, return immediately. If there is a queue, enqueue this request as a waiter and enter a select loop waiting for any updates to the target transaction.

func (*Queue) OnRangeDescUpdated Uses

func (q *Queue) OnRangeDescUpdated(desc *roachpb.RangeDescriptor)

OnRangeDescUpdated informs the Queue that its Range has been updated.

func (*Queue) RangeContainsKeyLocked Uses

func (q *Queue) RangeContainsKeyLocked(key roachpb.Key) bool

RangeContainsKeyLocked returns whether the Queue's Range contains the specified key.

func (*Queue) TrackedTxns Uses

func (q *Queue) TrackedTxns() map[uuid.UUID]struct{}

TrackedTxns returns a (newly minted) set containing the transaction IDs which are being tracked (i.e. waited on).

For testing purposes only.

func (*Queue) UpdateTxn Uses

func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction)

UpdateTxn is invoked to update a transaction's status after a successful PushTxn or EndTxn command. It unblocks all pending waiters.

type TestingKnobs Uses

type TestingKnobs struct {
    // OnTxnWaitEnqueue is called when a would-be pusher joins a wait queue.
    OnPusherBlocked func(ctx context.Context, push *roachpb.PushTxnRequest)
    // OnTxnUpdate is called by Queue.UpdateTxn.
    OnTxnUpdate func(ctx context.Context, txn *roachpb.Transaction)

TestingKnobs represents testing knobs for a Queue.

Package txnwait imports 18 packages (graph) and is imported by 8 packages. Updated 2020-08-12. Refresh now. Tools for package owners.