package txnwait

import "github.com/cockroachdb/cockroach/pkg/storage/txnwait"

Package Files

metrics.go queue.go

Variables

var ErrDeadlock = roachpb.NewErrorf("deadlock detected")

ErrDeadlock is a sentinel error returned when a cyclic dependency between waiting transactions is detected.

var TxnLivenessHeartbeatMultiplier = envutil.EnvOrDefaultInt(
    "COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER", 5)

TxnLivenessHeartbeatMultiplier specifies the multiple of the transaction heartbeat interval that is used as the transaction liveness threshold.

var TxnLivenessThreshold = time.Duration(TxnLivenessHeartbeatMultiplier) * base.DefaultTxnHeartbeatInterval

TxnLivenessThreshold is the maximum duration between transaction heartbeats before the transaction is considered expired by Queue. It is exposed and mutable to allow tests to override it.

Use TestingOverrideTxnLivenessThreshold to override the value in tests.

func IsExpired

func IsExpired(now hlc.Timestamp, txn *roachpb.Transaction) bool

IsExpired is true if the given transaction is expired.

func ShouldPushImmediately

func ShouldPushImmediately(req *roachpb.PushTxnRequest) bool

ShouldPushImmediately returns whether the PushTxn request should proceed without queueing. This is true for pushes which are neither ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where the pushee has min priority or pusher has max priority.
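The rule described above can be sketched as a small predicate. This is only an illustration of the documented rule, not the package's implementation: the types pushType and pushRequest, the constants, and the priority bounds are all hypothetical stand-ins for the roachpb definitions, and the real function may check additional conditions.

```go
package main

import "fmt"

// pushType is a hypothetical stand-in for the push types in roachpb.
type pushType int

const (
	pushTimestamp pushType = iota
	pushAbort
	pushTouch // any push that is neither ABORT nor TIMESTAMP
)

// Assumed priority bounds, chosen only for this example.
const (
	minPriority = 0
	maxPriority = 1<<31 - 1
)

// pushRequest is a hypothetical, flattened stand-in for roachpb.PushTxnRequest.
type pushRequest struct {
	kind           pushType
	pusherPriority int32
	pusheePriority int32
}

// shouldPushImmediately follows the documented rule: pushes other than
// ABORT and TIMESTAMP never queue, and ABORT/TIMESTAMP pushes skip the
// queue when the pushee has min priority or the pusher has max priority.
func shouldPushImmediately(r pushRequest) bool {
	if r.kind != pushAbort && r.kind != pushTimestamp {
		return true
	}
	return r.pusheePriority == minPriority || r.pusherPriority == maxPriority
}

func main() {
	fmt.Println(shouldPushImmediately(pushRequest{kind: pushTouch, pusherPriority: 10, pusheePriority: 10}))
	fmt.Println(shouldPushImmediately(pushRequest{kind: pushAbort, pusherPriority: 10, pusheePriority: 10}))
	fmt.Println(shouldPushImmediately(pushRequest{kind: pushAbort, pusherPriority: maxPriority, pusheePriority: 10}))
}
```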

func TestingOverrideTxnLivenessThreshold

func TestingOverrideTxnLivenessThreshold(t time.Duration) func()

TestingOverrideTxnLivenessThreshold allows tests to override the transaction liveness threshold. The function returns a closure that should be called to reset the value.

func TxnExpiration

func TxnExpiration(txn *roachpb.Transaction) hlc.Timestamp

TxnExpiration computes the timestamp after which the transaction will be considered expired.

type Metrics

type Metrics struct {
    PusheeWaiting  *metric.Gauge
    PusherWaiting  *metric.Gauge
    QueryWaiting   *metric.Gauge
    PusherSlow     *metric.Gauge
    PusherWaitTime *metric.Histogram
    QueryWaitTime  *metric.Histogram
    DeadlocksTotal *metric.Counter
}

Metrics contains all the txnqueue-related metrics.

func NewMetrics

func NewMetrics(histogramWindowInterval time.Duration) *Metrics

NewMetrics creates a new Metrics instance with all related metric fields.

type Queue

type Queue struct {
    // contains filtered or unexported fields
}

Queue enqueues PushTxn requests which are waiting on extant txns with conflicting intents to abort or commit.

Internally, it maintains a map from extant txn IDs to queues of pending PushTxn requests.

When a write intent is encountered, the command which encountered it (called the "pusher" here) initiates a PushTxn request to determine the disposition of the intent's transaction (called the "pushee" here). This queue is where a PushTxn request will wait if it discovers that the pushee's transaction is still pending, and cannot be otherwise aborted or pushed forward.

Queue is thread safe.
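The map-of-queues structure described above can be modeled in a few lines. This is a simplified, hypothetical sketch: pushQueue, txnID, enqueue, and addWaiter are names invented for the example, and the real Queue keeps unexported state that this page does not show.

```go
package main

import (
	"fmt"
	"sync"
)

// txnID stands in for uuid.UUID.
type txnID string

// pushQueue is a hypothetical model of the structure described above:
// a mutex-guarded map from pushee txn ID to its waiting pushers.
type pushQueue struct {
	mu      sync.Mutex
	waiters map[txnID][]txnID
}

func newPushQueue() *pushQueue {
	return &pushQueue{waiters: make(map[txnID][]txnID)}
}

// enqueue starts tracking a pushee so that later pushers can wait on it.
func (q *pushQueue) enqueue(pushee txnID) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.waiters[pushee]; !ok {
		q.waiters[pushee] = nil
	}
}

// addWaiter records pusher as waiting on pushee. It reports false when no
// queue exists for the pushee, in which case the caller would not wait.
func (q *pushQueue) addWaiter(pushee, pusher txnID) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	ws, ok := q.waiters[pushee]
	if !ok {
		return false
	}
	q.waiters[pushee] = append(ws, pusher)
	return true
}

func main() {
	q := newPushQueue()
	fmt.Println(q.addWaiter("a", "b")) // no queue has been established for "a" yet
	q.enqueue("a")
	fmt.Println(q.addWaiter("a", "b")) // "b" now waits behind "a"
}
```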

func NewQueue

func NewQueue(store StoreInterface) *Queue

NewQueue instantiates a new Queue.

func (*Queue) Clear

func (q *Queue) Clear(disable bool)

Clear empties the queue and returns all waiters. This method should be invoked when the replica loses or transfers its lease. If `disable` is true, no further transactions may be enqueued and no waiting pushers may be added until Enable() is called. Call Enable() once the replica acquires the lease again.

func (*Queue) Enable

func (q *Queue) Enable()

Enable allows transactions to be enqueued and waiting pushers added. This method must be idempotent as it can be invoked multiple times as range leases are updated for the same replica.
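The interplay of Clear and Enable across lease changes can be sketched with a toy state machine. Everything here (leaseQueue and its methods) is hypothetical and single-goroutine; it only illustrates why Enable needs to be idempotent, namely that a repeated call must not discard transactions that are already tracked.

```go
package main

import "fmt"

// leaseQueue is a hypothetical model: a nil map means "disabled".
type leaseQueue struct {
	tracked map[string]struct{}
}

// enable is idempotent: it allocates state only if none exists, so a
// second call leaves already-tracked transactions untouched.
func (q *leaseQueue) enable() {
	if q.tracked == nil {
		q.tracked = make(map[string]struct{})
	}
}

// clear drops all tracked transactions; with disable set, the queue
// refuses new entries until enable is called again.
func (q *leaseQueue) clear(disable bool) {
	if disable {
		q.tracked = nil
		return
	}
	q.tracked = make(map[string]struct{})
}

// enqueue reports whether the transaction was accepted.
func (q *leaseQueue) enqueue(id string) bool {
	if q.tracked == nil {
		return false
	}
	q.tracked[id] = struct{}{}
	return true
}

func main() {
	q := &leaseQueue{}
	fmt.Println(q.enqueue("txn-1")) // rejected: queue not yet enabled
	q.enable()
	fmt.Println(q.enqueue("txn-1")) // accepted
	q.enable()                      // repeated lease update: state is preserved
	fmt.Println(len(q.tracked))
	q.clear(true) // lease lost
	fmt.Println(q.enqueue("txn-2"))
}
```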

func (*Queue) Enqueue

func (q *Queue) Enqueue(txn *roachpb.Transaction)

Enqueue creates a new pendingTxn for the target txn of a failed PushTxn command. Subsequent PushTxn requests for the same txn will be enqueued behind the pendingTxn via MaybeWait().

func (*Queue) GetDependents

func (q *Queue) GetDependents(txnID uuid.UUID) []uuid.UUID

GetDependents returns a slice of transactions waiting on the specified txn either directly or indirectly.
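To make "directly or indirectly" concrete, here is a sketch that computes transitive waiters over an in-memory waits-for graph with a breadth-first search. The dependents function and the graph representation are assumptions made for illustration; the real method works on the queue's internal state and may gather indirect dependents by other means.

```go
package main

import (
	"fmt"
	"sort"
)

// dependents returns every txn that waits on target, directly or through
// a chain of other waiters. waiters maps a pushee to its direct pushers.
// The result is sorted only to make the example deterministic.
func dependents(waiters map[string][]string, target string) []string {
	seen := map[string]bool{target: true}
	var out []string
	frontier := []string{target}
	for len(frontier) > 0 {
		cur := frontier[0]
		frontier = frontier[1:]
		for _, w := range waiters[cur] {
			if !seen[w] {
				seen[w] = true
				out = append(out, w)
				frontier = append(frontier, w)
			}
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	// "b" waits on "a", and "c" waits on "b".
	waiters := map[string][]string{"a": {"b"}, "b": {"c"}}
	fmt.Println(dependents(waiters, "a"))
}
```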

func (*Queue) IsEnabled

func (q *Queue) IsEnabled() bool

IsEnabled is true if the queue is enabled.

func (*Queue) MaybeWaitForPush

func (q *Queue) MaybeWaitForPush(
    ctx context.Context, repl ReplicaInterface, req *roachpb.PushTxnRequest,
) (*roachpb.PushTxnResponse, *roachpb.Error)

MaybeWaitForPush checks whether there is a queue already established for pushing the transaction. If not, or if the PushTxn request isn't queueable, it returns immediately. If there is a queue, it enqueues this request as a waiter and enters a select loop waiting for resolution.

If the transaction is successfully pushed while this method is waiting, the first return value is a non-nil PushTxnResponse object.

In the event of a dependency cycle of pushers leading to deadlock, this method will return an ErrDeadlock error.

func (*Queue) MaybeWaitForQuery

func (q *Queue) MaybeWaitForQuery(
    ctx context.Context, repl ReplicaInterface, req *roachpb.QueryTxnRequest,
) *roachpb.Error

MaybeWaitForQuery checks whether there is a queue already established for pushing the transaction. If not, or if the QueryTxn request hasn't specified WaitForUpdate, it returns immediately. If there is a queue, it enqueues this request as a waiter and enters a select loop waiting for any updates to the target transaction.

func (*Queue) TrackedTxns

func (q *Queue) TrackedTxns() map[uuid.UUID]struct{}

TrackedTxns returns a (newly minted) set containing the transaction IDs which are being tracked (i.e. waited on).

For testing purposes only.

func (*Queue) UpdateTxn

func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction)

UpdateTxn is invoked to update a transaction's status after a successful PushTxn or EndTransaction command. It unblocks all pending waiters.
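The "unblocks all pending waiters" behavior can be modeled with one buffered channel per waiter. The notifier type and its wait and update methods are hypothetical names for this sketch; the actual Queue's signalling mechanism is unexported and may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical model of waiters blocked on a txn's outcome.
type notifier struct {
	mu      sync.Mutex
	waiters map[string][]chan string
}

// wait registers a waiter for the given txn and returns the channel on
// which the updated status will be delivered.
func (n *notifier) wait(id string) <-chan string {
	ch := make(chan string, 1) // buffered so update never blocks on a waiter
	n.mu.Lock()
	defer n.mu.Unlock()
	n.waiters[id] = append(n.waiters[id], ch)
	return ch
}

// update removes the txn's entry and delivers the new status to every
// waiter that was registered for it.
func (n *notifier) update(id, status string) {
	n.mu.Lock()
	chans := n.waiters[id]
	delete(n.waiters, id)
	n.mu.Unlock()
	for _, ch := range chans {
		ch <- status
	}
}

func main() {
	n := &notifier{waiters: make(map[string][]chan string)}
	ch := n.wait("txn-1")
	done := make(chan struct{})
	go func() {
		fmt.Println("unblocked with status", <-ch)
		close(done)
	}()
	n.update("txn-1", "COMMITTED")
	<-done
}
```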

type ReplicaInterface

type ReplicaInterface interface {
    ContainsKey(roachpb.Key) bool
}

ReplicaInterface provides some parts of a Replica without incurring a dependency.

type StoreInterface

type StoreInterface interface {
    Clock() *hlc.Clock
    Stopper() *stop.Stopper
    DB() *client.DB
    GetTxnWaitKnobs() TestingKnobs
    GetTxnWaitMetrics() *Metrics
}

StoreInterface provides some parts of a Store without incurring a dependency.

type TestingKnobs

type TestingKnobs struct {
    // OnPusherBlocked is called when a would-be pusher joins a wait queue.
    OnPusherBlocked func(ctx context.Context, push *roachpb.PushTxnRequest)
    // OnTxnUpdate is called by Queue.UpdateTxn.
    OnTxnUpdate func(ctx context.Context, txn *roachpb.Transaction)
}

TestingKnobs represents testing knobs for a Queue.

Package txnwait imports 17 packages and is imported by 10 packages. Updated 2019-07-27.