cockroach: github.com/cockroachdb/cockroach/pkg/storage/closedts/minprop

package minprop

import "github.com/cockroachdb/cockroach/pkg/storage/closedts/minprop"

Package minprop exports a main data structure, Tracker, which checkpoints closed timestamps and the associated Raft Lease Applied Index positions for which (under additional conditions) it is legal to serve follower reads. It does so by maintaining a 'next' timestamp above which new command evaluations are forced, and by tracking when all in-flight evaluations below this timestamp have completed. At that point a call to the Close method succeeds: 'next' becomes closed, and a new 'next' is initialized with a future timestamp.

In-flight command evaluations are tracked via the Track method, which acquires a reference with the Tracker, returns a minimum timestamp to be used for the proposal's evaluation, and provides a closure that releases the reference, supplying the Lease Applied Index used for the proposal.
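To make the left/right bookkeeping concrete, here is a minimal single-process sketch under simplifying assumptions: it ignores liveness epochs, uses int64 values in place of hlc.Timestamp, roachpb.RangeID and ctpb.LAI, and the names toyTracker, track and close are hypothetical. It models the behavior described above; it is not minprop's code.

```go
package main

import (
	"fmt"
	"sync"
)

// toyTracker is a hypothetical, epoch-free model of the left/right scheme;
// it is not the minprop implementation. Timestamps are plain int64 values,
// and range IDs and Lease Applied Indexes are plain integers.
type toyTracker struct {
	mu        sync.Mutex
	gen       int             // incremented on every successful close
	closed    int64           // most recently closed timestamp
	next      int64           // candidate timestamp to be closed next
	leftRefs  int             // evaluations in flight at the last successful close
	rightRefs int             // evaluations tracked since the last successful close
	leftMLAI  map[int64]int64 // rangeID -> max LAI released on the left
	rightMLAI map[int64]int64 // rangeID -> max LAI released on the right
}

func newToyTracker() *toyTracker {
	return &toyTracker{next: 1, leftMLAI: map[int64]int64{}, rightMLAI: map[int64]int64{}}
}

// track registers an evaluation on the right side. It returns the minimum
// timestamp at which the evaluation may run (strictly above next) and a
// closure that releases the reference with the proposal's range and LAI.
func (t *toyTracker) track() (int64, func(rangeID, lai int64)) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.rightRefs++
	gen, minTS := t.gen, t.next+1
	return minTS, func(rangeID, lai int64) {
		t.mu.Lock()
		defer t.mu.Unlock()
		// A reference from the current generation is still on the right; one
		// from the previous generation was shifted left by a close. Older
		// generations cannot exist, because close fails while leftRefs > 0.
		refs, mlai := &t.rightRefs, t.rightMLAI
		if gen != t.gen {
			refs, mlai = &t.leftRefs, t.leftMLAI
		}
		*refs--
		if lai > mlai[rangeID] {
			mlai[rangeID] = lai
		}
	}
}

// close tries to close out t.next and adopt next as the new candidate. It
// succeeds only if nothing remains on the left; otherwise it returns the
// previous closed timestamp and a nil map.
func (t *toyTracker) close(next int64) (int64, map[int64]int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.leftRefs > 0 || next <= t.next {
		return t.closed, nil
	}
	mlai := t.leftMLAI
	t.closed, t.next = t.next, next
	// The right side becomes the left side for the following close.
	t.leftRefs, t.leftMLAI = t.rightRefs, t.rightMLAI
	t.rightRefs, t.rightMLAI = 0, map[int64]int64{}
	t.gen++
	return t.closed, mlai
}

func main() {
	tr := newToyTracker()
	_, slow := tr.track() // stays in flight across two closes
	_, fast := tr.track()
	fast(12, 77)
	fmt.Println(tr.close(1000)) // 1 map[]: nothing was on the left yet
	fmt.Println(tr.close(2000)) // 1 map[]: slow is now on the left and blocks
	slow(12, 79)
	fmt.Println(tr.close(3000)) // 1000 map[12:79]
}
```

The generation counter is just one way to decide which side a release lands on in this sketch; it works because a close cannot succeed while references remain on the left, so a reference is never more than one generation old.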

Code:

package main

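import (
    "context"
    "fmt"
    "sort"
    "strings"

    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
    "github.com/cockroachdb/cockroach/pkg/storage/closedts/minprop"
    "github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
    ctx := context.TODO()

    // Outside package minprop, the constructor must be package-qualified.
    tracker := minprop.NewTracker()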
    const ep1 ctpb.Epoch = 1
    fmt.Println("The newly initialized tracker has a zero closed timestamp:")
    fmt.Println(tracker)

    fmt.Println("A first command arrives on range 12 (though the range isn't known yet to the Tracker).")
    ts, done1 := tracker.Track(ctx)
    fmt.Println("All commands initially start out on the right. The command has its timestamp forwarded to", ts, ".")
    fmt.Println(tracker)

    fmt.Println("Two more commands arrive, on r1 and r12.")
    _, done2 := tracker.Track(ctx)
    _, done3 := tracker.Track(ctx)
    fmt.Println(tracker)

    fmt.Println("The command on r1 finishes evaluating at Lease Applied Index 10 and lets the Tracker know.")
    done2(ctx, ep1, 1, 10)
    fmt.Println(tracker)

    fmt.Println("The command on r12 also finishes quickly, at LAI 77.")
    done3(ctx, ep1, 12, 77)
    fmt.Println(tracker)

    fmt.Println("The system closes out a timestamp (registering 1000 as the next timestamp to close out).")
    closed1, mlai1, _ := tracker.Close(hlc.Timestamp{WallTime: 1E9}, ep1)
    fmt.Println("No problem: nothing is tracked on the left side; returns:", closed1, "and", mlaiString(mlai1))
    fmt.Println("Note how the items on the right have moved to the left, as they are relevant for the")
    fmt.Println("next call to Close.")
    fmt.Println(tracker)

    fmt.Println("Nothing happens for a while until the system tries to close out the next timestamp.")
    fmt.Println("However, the very first proposal is still tracked and blocks progress.")
    closed2, mlai2, _ := tracker.Close(hlc.Timestamp{WallTime: 2E9}, ep1)
    fmt.Println("The call returns a no-op in the form", closed2, mlaiString(mlai2), ".")
    fmt.Println(tracker)

    ts4, done4 := tracker.Track(ctx)
    fmt.Println("A new command gets tracked on r12 (and is forwarded to", ts4, "if necessary).")
    fmt.Println("It terminates quickly, leaving an MLAI entry of 78 behind.")
    done4(ctx, ep1, 12, 78)
    fmt.Println(tracker)

    fmt.Println("Finally! The slow evaluation finishes and the command gets proposed at index 79.")
    fmt.Println("Note that the right now tracks a smaller value of 78. Consumers have to keep the")
    fmt.Println("maximum they've seen.")
    done1(ctx, ep1, 12, 79)
    fmt.Println(tracker)

    closed3, mlai3, _ := tracker.Close(hlc.Timestamp{WallTime: 3E9}, ep1)
    fmt.Println("The next call to Close() is successful and returns:", closed3, "and", mlaiString(mlai3))
    fmt.Println(tracker)

}

// mlaiString converts an mlai map into a string. Avoids randomized ordering of
// map elements in string output.
func mlaiString(mlai map[roachpb.RangeID]ctpb.LAI) string {
    var rangeIDs []roachpb.RangeID
    for rangeID := range mlai {
        rangeIDs = append(rangeIDs, rangeID)
    }
    sort.Slice(rangeIDs, func(i, j int) bool {
        return rangeIDs[i] < rangeIDs[j]
    })

    var sb strings.Builder
    sb.WriteString("map[")
    for i, rangeID := range rangeIDs {
        if i > 0 {
            sb.WriteString(" ")
        }
        fmt.Fprintf(&sb, "%d:%d", rangeID, mlai[rangeID])
    }
    sb.WriteString("]")
    return sb.String()
}

Package Files

doc.go tracker.go

type Tracker

type Tracker struct {
    // contains filtered or unexported fields
}

Tracker implements closedts.TrackerI.

func NewTracker

func NewTracker() *Tracker

NewTracker returns a Tracker initialized to a closed timestamp of zero and a next closed timestamp of one logical tick past zero.
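The zero closed timestamp and the "one logical tick past zero" next timestamp can be illustrated with a hypothetical stand-in for hlc.Timestamp. The type below and its methods are assumptions written for this sketch; they only mimic the ordering and the printed form (seconds.nanoseconds,logical) seen in the example outputs on this page.

```go
package main

import "fmt"

// timestamp is a hypothetical stand-in for hlc.Timestamp: wall-clock
// nanoseconds plus a logical counter that orders events at the same wall time.
type timestamp struct {
	WallTime int64
	Logical  int32
}

// Next returns the timestamp one logical tick later. (This sketch ignores
// overflow of the logical counter.)
func (t timestamp) Next() timestamp {
	return timestamp{WallTime: t.WallTime, Logical: t.Logical + 1}
}

// Less orders by wall time first, then by the logical counter.
func (t timestamp) Less(o timestamp) bool {
	return t.WallTime < o.WallTime || (t.WallTime == o.WallTime && t.Logical < o.Logical)
}

// String mimics the "seconds.nanoseconds,logical" form printed by the
// examples on this page.
func (t timestamp) String() string {
	return fmt.Sprintf("%d.%09d,%d", t.WallTime/1e9, t.WallTime%1e9, t.Logical)
}

func main() {
	var closed timestamp  // a new tracker's closed timestamp: zero
	next := closed.Next() // its next: one logical tick past zero
	fmt.Println(closed, next, closed.Less(next)) // 0.000000000,0 0.000000000,1 true
	fmt.Println(timestamp{WallTime: 1e9})        // 1.000000000,0
}
```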

func (*Tracker) Close

func (t *Tracker) Close(
    next hlc.Timestamp, expCurEpoch ctpb.Epoch,
) (ts hlc.Timestamp, mlai map[roachpb.RangeID]ctpb.LAI, ok bool)

Close attempts to close out the current candidate timestamp, replacing it with the provided one. This is possible only if the tracked proposals that were evaluating when Close was previously called have since completed. On success, all subsequent proposals will be forced to evaluate strictly above the provided timestamp, and the timestamp previously passed to Close is returned as the closed timestamp, along with a map of minimum Lease Applied Indexes reflecting the updates for the past period. On failure, the previous closed timestamp is returned along with a nil map (callers can treat this like a successful call that happens to return no new information). Similarly, failing to provide a timestamp strictly larger than the one to be closed out next results in the same "idempotent" return values.

Callers additionally provide the current expected epoch value: the liveness epoch at which the caller intends to advertise this closed timestamp. The caller must know that it is live at a timestamp greater than or equal to the timestamp that the tracker will close. For correctness purposes, this is the case if the caller knows that it is live at next and calls to Close() pass monotonic values for next.

If the current expected epoch is older than the currently tracked data, the timestamp will fail to be closed. If the expected epoch is older than the epoch tracked on the left but corresponds to the epoch of the previous successful close, the previous closed timestamp is returned along with a nil map. This situation is just like an unsuccessful close caused by unreleased proposals, and it lets the caller successfully obtain the tracked data at the newer epoch in a later query, once its own epoch has been updated. If the caller's expected epoch is even older than the previously returned epoch, zero values are returned. If the caller's expected epoch is newer than that of the tracked data, the state of the tracker is progressed but zero values are returned.

Code:

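// ep1 is liveness epoch 1. The test package is assumed to declare it as a
// ctpb.Epoch constant, as the package-level example above does locally.
const ep1 ctpb.Epoch = 1
ctx := context.Background()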
tracker := NewTracker()
_, slow := tracker.Track(ctx)
_, _, _ = tracker.Close(hlc.Timestamp{WallTime: 1E9}, ep1)
_, fast := tracker.Track(ctx)

fmt.Println("Slow proposal finishes at LAI 2")
slow(ctx, ep1, 99, 2)
closed, m, ok := tracker.Close(hlc.Timestamp{WallTime: 2E9}, ep1)
fmt.Println("Closed:", closed, m, ok)

fmt.Println("Fast proposal finishes at LAI 1")
fast(ctx, ep1, 99, 1)
fmt.Println(tracker)
closed, m, ok = tracker.Close(hlc.Timestamp{WallTime: 3E9}, ep1)
fmt.Println("Closed:", closed, m, ok)
fmt.Println("Note how the MLAI has 'regressed' from 2 to 1. The consumer")
fmt.Println("needs to track the maximum over all deltas received.")

Output:

Slow proposal finishes at LAI 2
Closed: 1.000000000,0 map[99:2] true
Fast proposal finishes at LAI 1

  closed=1.000000000,0
      |            next=2.000000000,0
      |          left | right
      |             0 # 0
      |             1 e 1
      |             1 @        (r99)
      v               v
---------------------------------------------------------> time

Closed: 2.000000000,0 map[99:1] true
Note how the MLAI has 'regressed' from 2 to 1. The consumer
needs to track the maximum over all deltas received.
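On the consumer side, the "regression" above is harmless as long as each MLAI map returned by Close is merged by keeping the per-range maximum. A minimal sketch, with a hypothetical mergeMLAI helper and plain int64 keys and values standing in for roachpb.RangeID and ctpb.LAI:

```go
package main

import "fmt"

// mergeMLAI folds the MLAI map returned by a Close call into a consumer's
// accumulated state, keeping the maximum LAI seen per range. The function and
// its int64 keys and values (standing in for roachpb.RangeID and ctpb.LAI)
// are hypothetical.
func mergeMLAI(acc, delta map[int64]int64) {
	for rangeID, lai := range delta { // ranging over a nil map is a no-op
		if lai > acc[rangeID] {
			acc[rangeID] = lai
		}
	}
}

func main() {
	acc := map[int64]int64{}
	mergeMLAI(acc, map[int64]int64{99: 2}) // returned with closed timestamp 1.000000000,0
	mergeMLAI(acc, map[int64]int64{99: 1}) // returned with closed timestamp 2.000000000,0
	mergeMLAI(acc, nil)                    // a failed Close contributes nothing
	fmt.Println(acc)                       // map[99:2]
}
```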

ExampleTracker_EpochChanges tests the interactions between epoch values passed to Close and epoch values of proposals being tracked.

Code:

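// The liveness epochs used below. The test package is assumed to declare
// them as ctpb.Epoch constants.
const (
    ep1 ctpb.Epoch = 1
    ep2 ctpb.Epoch = 2
    ep3 ctpb.Epoch = 3
)
ts1 := hlc.Timestamp{WallTime: 1E9}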
ts2 := hlc.Timestamp{WallTime: 2E9}
ts3 := hlc.Timestamp{WallTime: 3E9}

ctx := context.Background()
tracker := NewTracker()
fmt.Println("The newly initialized tracker has a zero closed timestamp:")
fmt.Println(tracker)

fmt.Println("A first command arrives on range 1 (though the range isn't known yet to the Tracker).")
ts, r1e1lai1 := tracker.Track(ctx)
fmt.Println("All commands initially start out on the right. The command has its timestamp forwarded to", ts, ".")
fmt.Println("The command finished quickly and is released in epoch 1.")
r1e1lai1(ctx, ep1, 1, 1)
fmt.Println(tracker)

fmt.Println("Another proposal arrives on range 2 but does not complete before the next call to Close().")
_, r2e2lai1 := tracker.Track(ctx)
fmt.Println(tracker)

fmt.Println("The system closes out a timestamp expecting liveness epoch 2 (registering", ts1, "as the next",
    "timestamp to close out).")
closed, mlai, ok := tracker.Close(ts1, ep2)
fmt.Println("The Close() call fails due to the liveness epoch mismatch between",
    "the expected current epoch and the tracked data, returning", closed, mlai, ok)
fmt.Println("The Close() call evicts the tracked range 1 LAI.")
fmt.Println(tracker)

fmt.Println("The proposal on range 2 is released in epoch 2.")
r2e2lai1(ctx, ep2, 2, 1)
fmt.Println(tracker)

fmt.Println("Another proposal arrives on range 1 and quickly finishes with",
    "LAI 2 but is still in epoch 1 and is not tracked.")
_, r1e1lai2 := tracker.Track(ctx)
r1e1lai2(ctx, ep1, 1, 2)
fmt.Println("Meanwhile a proposal arrives on range 2 and quickly finishes with",
    "LAI 2 in epoch 2.")
_, r2e2lai2 := tracker.Track(ctx)
r2e2lai2(ctx, ep2, 2, 2)
fmt.Println(tracker)

fmt.Println("A new proposal arrives on range 1 and quickly finishes with LAI 2 in epoch 3.")
fmt.Println("This new epoch evicts the data on the right side corresponding to epoch 2.")
_, r1e3lai2 := tracker.Track(ctx)
r1e3lai2(ctx, ep3, 1, 2)
fmt.Println(tracker)

closed, mlai, ok = tracker.Close(ts2, ep2)
fmt.Println("The next call to Close() occurs in epoch 2 and successfully returns:", closed, mlai, ok)
closed, mlai, ok = tracker.Close(ts3, ep2)
fmt.Println("Subsequent calls to Close() at later times but still in epoch 2 do not move the tracker state.")
fmt.Println("They return the previous closed timestamp with an empty mlai map:", closed, mlai, ok, ".")
fmt.Println("Data corresponding to epoch 3 is retained.")
fmt.Println(tracker)
closed, mlai, ok = tracker.Close(ts3, ep3)
fmt.Println("The next call to Close() occurs in epoch 3 and successfully returns:", closed, mlai, ok, ".")

Output:

The newly initialized tracker has a zero closed timestamp:

  closed=0.000000000,0
      |            next=0.000000000,1
      |          left | right
      |             0 # 0
      |             1 e 1
      v               v
---------------------------------------------------------> time

A first command arrives on range 1 (though the range isn't known yet to the Tracker).
All commands initially start out on the right. The command has its timestamp forwarded to 0.000000000,2 .
The command finished quickly and is released in epoch 1.

  closed=0.000000000,0
      |            next=0.000000000,1
      |          left | right
      |             0 # 0
      |             1 e 1
      |               @ 1      (r1)
      v               v
---------------------------------------------------------> time

Another proposal arrives on range 2 but does not complete before the next call to Close().

  closed=0.000000000,0
      |            next=0.000000000,1
      |          left | right
      |             0 # 1
      |             1 e 1
      |               @ 1      (r1)
      v               v
---------------------------------------------------------> time

The system closes out a timestamp expecting liveness epoch 2 (registering 1.000000000,0 as the next timestamp to close out).
The Close() call fails due to the liveness epoch mismatch between the expected current epoch and the tracked data, returning 0.000000000,0 map[] false
The Close() call evicts the tracked range 1 LAI.

  closed=0.000000000,1
      |            next=1.000000000,0
      |          left | right
      |             1 # 0
      |             2 e 2
      v               v
---------------------------------------------------------> time

The proposal on range 2 is released in epoch 2.

  closed=0.000000000,1
      |            next=1.000000000,0
      |          left | right
      |             0 # 0
      |             2 e 2
      |             1 @        (r2)
      v               v
---------------------------------------------------------> time

Another proposal arrives on range 1 and quickly finishes with LAI 2 but is still in epoch 1 and is not tracked.
Meanwhile a proposal arrives on range 2 and quickly finishes with LAI 2 in epoch 2.

  closed=0.000000000,1
      |            next=1.000000000,0
      |          left | right
      |             0 # 0
      |             2 e 2
      |             1 @        (r2)
      |               @ 2      (r2)
      v               v
---------------------------------------------------------> time

A new proposal arrives on range 1 and quickly finishes with LAI 2 in epoch 3.
This new epoch evicts the data on the right side corresponding to epoch 2.

  closed=0.000000000,1
      |            next=1.000000000,0
      |          left | right
      |             0 # 0
      |             2 e 3
      |               @ 2      (r1)
      |             1 @        (r2)
      v               v
---------------------------------------------------------> time

The next call to Close() occurs in epoch 2 and successfully returns: 1.000000000,0 map[2:1] true
Subsequent calls to Close() at later times but still in epoch 2 do not move the tracker state.
They return the previous closed timestamp with an empty mlai map: 1.000000000,0 map[] true .
Data corresponding to epoch 3 is retained.

  closed=1.000000000,0
      |            next=2.000000000,0
      |          left | right
      |             0 # 0
      |             3 e 3
      |             2 @        (r1)
      v               v
---------------------------------------------------------> time

The next call to Close() occurs in epoch 3 and successfully returns: 2.000000000,0 map[1:2] true .
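The epoch rules in the Close documentation can be summarized as a decision function. The sketch below is hypothetical (closeOutcome and classifyEpoch are not minprop APIs) and only restates the documented cases; for a stale epoch that matches neither documented sub-case, the documentation says only that the close fails, so the sketch does not guess at return values. Its main function replays the four Close calls of ExampleTracker_EpochChanges.

```go
package main

import "fmt"

// closeOutcome restates, as a decision function, the epoch cases listed in the
// Close documentation. The type, its values, and classifyEpoch are
// hypothetical; they are not part of minprop.
type closeOutcome int

const (
	closeAsUsual       closeOutcome = iota // epochs match: the usual Close rules apply
	progressZeroValues                     // expected is newer: state progresses, zero values returned
	prevClosedNilMap                       // stale, but equals the epoch of the last successful close
	zeroValues                             // older than the epoch of the last successful close
	failUnspecified                        // stale otherwise: fails (return values not spelled out)
)

var outcomeNames = [...]string{
	"close as usual", "progress, zero values", "previous closed ts, nil map",
	"zero values", "fail",
}

func (o closeOutcome) String() string { return outcomeNames[o] }

// classifyEpoch compares the caller's expected epoch with the epoch tracked
// on the left and with the epoch of the last successful close.
func classifyEpoch(expected, left, lastClosed int64) closeOutcome {
	switch {
	case expected == left:
		return closeAsUsual
	case expected > left:
		return progressZeroValues
	case expected == lastClosed:
		return prevClosedNilMap
	case expected < lastClosed:
		return zeroValues
	default:
		return failUnspecified
	}
}

func main() {
	// The four Close calls in ExampleTracker_EpochChanges, as
	// (expected, left epoch, epoch of last successful close); 0 means none yet.
	fmt.Println(classifyEpoch(2, 1, 0)) // progress, zero values
	fmt.Println(classifyEpoch(2, 2, 0)) // close as usual
	fmt.Println(classifyEpoch(2, 3, 2)) // previous closed ts, nil map
	fmt.Println(classifyEpoch(3, 3, 2)) // close as usual
}
```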

func (*Tracker) String

func (t *Tracker) String() string

String prints a string representation of the Tracker's state.

func (*Tracker) Track

func (t *Tracker) Track(ctx context.Context) (hlc.Timestamp, closedts.ReleaseFunc)

Track is called before evaluating a proposal. It returns the minimum timestamp at which the proposal can be evaluated (i.e. the request timestamp needs to be forwarded if necessary), and acquires a reference with the Tracker. This reference is released by calling the returned closure either

a) before proposing the command, supplying the Lease Applied Index at which the proposal will be carried out, or
b) with zero values for the epoch, range ID, and Lease Applied Index if the command won't end up being proposed (e.g. because it hit an error during evaluation).

The ReleaseFunc is not thread safe. For convenience, it may be called with zero values once more after a regular call.
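The release contract (release once with the real LAI before proposing, or with zero values if the command is never proposed, plus at most one extra zero-value call) fits naturally with a deferred call. The sketch below assumes a simplified, hypothetical refTracker whose release closure takes only a range ID and an LAI; the real ReleaseFunc, as the examples show, also takes a context and an epoch.

```go
package main

import (
	"errors"
	"fmt"
)

// refTracker is hypothetical: it models only the reference count behind a
// release closure that, like ReleaseFunc, may be called once more with zero
// values after a regular call. Epochs and the context are omitted.
type refTracker struct{ refs int }

func (t *refTracker) track() func(rangeID, lai int64) {
	t.refs++
	released := false // unsynchronized, mirroring "not thread safe"
	return func(rangeID, lai int64) {
		if released {
			if rangeID != 0 || lai != 0 {
				panic("release called again with non-zero arguments")
			}
			return // the tolerated zero-value follow-up call is a no-op
		}
		released = true
		t.refs-- // a real tracker would also record a non-zero lai for rangeID
	}
}

// evaluate shows one calling pattern: a deferred zero-value release covers
// every path, and the real LAI is supplied just before proposing.
func evaluate(t *refTracker, fail bool) error {
	release := t.track()
	defer release(0, 0)
	if fail {
		return errors.New("evaluation failed") // case b): never proposed
	}
	release(7, 42) // case a): range 7 is proposed at LAI 42
	return nil
}

func main() {
	tr := &refTracker{}
	err := evaluate(tr, false)
	fmt.Println(err, tr.refs) // <nil> 0
	err = evaluate(tr, true)
	fmt.Println(err, tr.refs) // evaluation failed 0
}
```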

Package minprop imports 9 packages and is imported by 1 package. Updated 2019-07-09.