cockroach: github.com/cockroachdb/cockroach/pkg/storage/closedts/storage

package storage

import "github.com/cockroachdb/cockroach/pkg/storage/closedts/storage"

Package Files

storage.go storage_mem.go

type MultiStorage

type MultiStorage struct {
    // contains filtered or unexported fields
}

MultiStorage implements the closedts.Storage interface.

Code:

ms := NewMultiStorage(func() SingleStorage {
    return NewMemStorage(time.Millisecond, 2)
})

e1 := ctpb.Entry{
    Epoch:           10,
    ClosedTimestamp: hlc.Timestamp{WallTime: 1E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        9: 17,
    },
}
fmt.Println("First, the following entry is added:")
fmt.Println(e1)
ms.Add(1, e1)
fmt.Println(ms)

fmt.Println("The epoch changes. It can only increase, for we receive Entries in a fixed order.")
e2 := ctpb.Entry{
    Epoch:           11,
    ClosedTimestamp: hlc.Timestamp{WallTime: 2E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        9:  18,
        10: 99,
    },
}
ms.Add(1, e2)
fmt.Println(e2)
fmt.Println(ms)

fmt.Println("If it *did* decrease, a higher level component should trigger an assertion.")
fmt.Println("The storage itself will simply ignore such updates:")
e3 := ctpb.Entry{
    Epoch:           8,
    ClosedTimestamp: hlc.Timestamp{WallTime: 3E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        9:  19,
        10: 199,
    },
}
fmt.Println(e3)
ms.Add(1, e3)
fmt.Println(ms)

Output:

First, the following entry is added:
CT: 1.000000000,0 @ Epoch 10
Full: false
MLAI: r9: 17

***** n1 *****
+----+---------------------+----------------------+
          1.000000000,0         0.000000000,0
       age=0s (target ≤0s)   age=1s (target ≤1ms)
            epoch=10               epoch=0
+----+---------------------+----------------------+
  r9                    17
+----+---------------------+----------------------+

The epoch changes. It can only increase, for we receive Entries in a fixed order.
CT: 2.000000000,0 @ Epoch 11
Full: false
MLAI: r9: 18, r10: 99

***** n1 *****
+-----+---------------------+----------------------+
           2.000000000,0         1.000000000,0
        age=0s (target ≤0s)   age=1s (target ≤1ms)
             epoch=11               epoch=10
+-----+---------------------+----------------------+
  r9                     18                     17
  r10                    99
+-----+---------------------+----------------------+

If it *did* decrease, a higher level component should trigger an assertion.
The storage itself will simply ignore such updates:
CT: 3.000000000,0 @ Epoch 8
Full: false
MLAI: r9: 19, r10: 199

***** n1 *****
+-----+---------------------+----------------------+
           2.000000000,0         2.000000000,0
        age=0s (target ≤0s)   age=0s (target ≤1ms)
             epoch=11               epoch=11
+-----+---------------------+----------------------+
  r9                     18                     18
  r10                    99                     99
+-----+---------------------+----------------------+

func NewMultiStorage

func NewMultiStorage(constructor func() SingleStorage) *MultiStorage

NewMultiStorage sets up a MultiStorage that uses the given factory function to create the SingleStorage for each NodeID for which operations are received.
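The per-node routing described above can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: nodeID, entry, singleStore, sliceStore and multiStore are hypothetical stand-ins for roachpb.NodeID, ctpb.Entry, SingleStorage and MultiStorage, and creating a store on the first operation for a node is an assumption drawn from the description.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeID and entry are hypothetical stand-ins for roachpb.NodeID and ctpb.Entry.
type nodeID int32
type entry struct{ closedWallTime int64 }

// singleStore is a hypothetical stand-in for SingleStorage, reduced to Add.
type singleStore interface{ Add(entry) }

// sliceStore is a toy per-node store that only records what it receives.
type sliceStore struct{ entries []entry }

func (s *sliceStore) Add(e entry) { s.entries = append(s.entries, e) }

// multiStore routes each Add to a per-node store and creates that store
// through the factory the first time a node is seen (an assumption).
type multiStore struct {
	mu      sync.Mutex
	factory func() singleStore
	stores  map[nodeID]singleStore
}

func newMultiStore(factory func() singleStore) *multiStore {
	return &multiStore{factory: factory, stores: map[nodeID]singleStore{}}
}

func (m *multiStore) Add(id nodeID, e entry) {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.stores[id]
	if !ok {
		s = m.factory()
		m.stores[id] = s
	}
	s.Add(e)
}

func main() {
	created := 0
	ms := newMultiStore(func() singleStore {
		created++
		return &sliceStore{}
	})
	ms.Add(1, entry{closedWallTime: 1e9})
	ms.Add(1, entry{closedWallTime: 2e9})
	ms.Add(2, entry{closedWallTime: 3e9})
	fmt.Println("stores created:", created)                                  // stores created: 2
	fmt.Println("entries for n1:", len(ms.stores[1].(*sliceStore).entries)) // entries for n1: 2
}
```

The factory keeps the multiplexer independent of how a single node's history is stored, which is why the examples on this page can pass NewMemStorage with different scales and bucket counts.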

func (*MultiStorage) Add

func (ms *MultiStorage) Add(nodeID roachpb.NodeID, entry ctpb.Entry)

Add implements closedts.Storage.

func (*MultiStorage) Clear

func (ms *MultiStorage) Clear()

Clear implements closedts.Storage.

func (*MultiStorage) String

func (ms *MultiStorage) String() string

String prints a tabular rundown of the contents of the MultiStorage.

func (*MultiStorage) StringForNodes

func (ms *MultiStorage) StringForNodes(nodes ...roachpb.NodeID) string

StringForNodes is like String, but restricted to the supplied NodeIDs. If none are specified, it is equivalent to String().

func (*MultiStorage) VisitAscending

func (ms *MultiStorage) VisitAscending(nodeID roachpb.NodeID, f func(ctpb.Entry) (done bool))

VisitAscending implements closedts.Storage.

func (*MultiStorage) VisitDescending

func (ms *MultiStorage) VisitDescending(nodeID roachpb.NodeID, f func(ctpb.Entry) (done bool))

VisitDescending implements closedts.Storage.

type SingleStorage

type SingleStorage interface {
    fmt.Stringer
    // VisitAscending walks through the buckets of the storage in ascending
    // closed timestamp order, until the closure returns true (or all buckets
    // have been visited).
    VisitAscending(func(ctpb.Entry) (done bool))
    // VisitDescending walks through the buckets of the storage in descending
    // closed timestamp order, until the closure returns true (or all buckets
    // have been visited).
    VisitDescending(func(ctpb.Entry) (done bool))
    // Add adds a new Entry to this storage. The entry is added to the most
    // recent bucket and remaining buckets are rotated as indicated by their age
    // relative to the newly added Entry.
    Add(ctpb.Entry)
    // Clear removes all Entries from this storage.
    Clear()
}

SingleStorage stores and manages closed timestamp updates originating from a single source (i.e. node). A SingleStorage internally maintains multiple buckets for historical closed timestamp information. The reason for this is twofold:

1. The most recent closed timestamp update is also the hardest to prove a read for, since it comes with larger minimum lease applied indexes. When followers lag behind with their command application, this could lead to a runaway scenario in which a closed timestamp update can never be used before it is replaced by a new one, which in turn will also never be used, and so on. Instead, a SingleStorage keeps some amount of history, and upstream systems can try to prove a follower read using an older closed timestamp.

2. Follower reads can be used to implement recovery of a consistent cluster-wide snapshot after catastrophic loss of quorum. To do this, the mechanism must locate at least one replica of every range in the cluster, and for each range find the largest possible timestamp at which follower reads are possible among the surviving replicas. Of all these per-range timestamps, the smallest can be used to read from all ranges, resulting in a consistent snapshot. This makes it crucial that every replica can serve at least some follower reads, even when regularly outpaced by the closed timestamp frontier. Emitted MLAIs may never even be proposed to Raft in the event of an ill-timed crash, and so historic information is invaluable.

TODO(tschottdorf): revisit whether this shouldn't be a concrete impl instead, with only the buckets abstracted out.

Code:

s := NewMemStorage(10*time.Second, 4)
fmt.Println("The empty storage renders as below:")
fmt.Println(s)

fmt.Println("After adding the following entry:")
e1 := ctpb.Entry{
    Full:            true,
    ClosedTimestamp: hlc.Timestamp{WallTime: 123E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        1:  1000,
        9:  2000,
    },
}
fmt.Println(e1)
s.Add(e1)
fmt.Println("the result is:")
fmt.Println(s)
fmt.Println("Note how the most recent bucket picked up the update.")

fmt.Println("A new update comes in only two seconds later:")
e2 := ctpb.Entry{
    ClosedTimestamp: hlc.Timestamp{WallTime: 125E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        1:  1001,
        7:  12,
    },
}
fmt.Println(e2)
s.Add(e2)
fmt.Println("The first bucket now contains the union of both updates.")
fmt.Println("The second bucket holds on to the previous value of the first.")
fmt.Println("The remaining buckets are unchanged. The best we could do is")
fmt.Println("give them identical copies of the second, but that's nonsense.")
fmt.Println(s)

fmt.Println("Another update, another eight seconds later:")
e3 := ctpb.Entry{
    ClosedTimestamp: hlc.Timestamp{WallTime: 133E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        9:  2020,
        1:  999,
    },
}
fmt.Println(e3)
s.Add(e3)
fmt.Println("Note how the second bucket didn't rotate, for it is not yet")
fmt.Println("older than 10s. Note also how the first bucket ignores the")
fmt.Println("downgrade for r1; these can occur in practice.")
fmt.Println(s)

fmt.Println("Half a second later, with the next update, it will rotate:")
e4 := ctpb.Entry{
    ClosedTimestamp: hlc.Timestamp{WallTime: 133E9 + 1E9/2},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        7:  17,
        8:  711,
    },
}
fmt.Println(e4)
s.Add(e4)
fmt.Println("Consequently we now see the third bucket fill up.")
fmt.Println(s)

fmt.Println("Next update arrives a whopping 46.5s later (why not).")
e5 := ctpb.Entry{
    ClosedTimestamp: hlc.Timestamp{WallTime: 180E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        1:  1004,
        7:  19,
        2:  929922,
    },
}
fmt.Println(e5)
s.Add(e5)
fmt.Println("The second bucket rotated, but due to the sparseness of updates,")
fmt.Println("it's still above its target age and will rotate again next time.")
fmt.Println("The same is true for the remaining buckets.")
fmt.Println(s)

fmt.Println("Another five seconds later, another update:")
e6 := ctpb.Entry{
    ClosedTimestamp: hlc.Timestamp{WallTime: 185E9},
    MLAI: map[roachpb.RangeID]ctpb.LAI{
        3: 1771,
    },
}
fmt.Println(e6)
s.Add(e6)
fmt.Println("All buckets rotate, but the third and fourth remain over target age.")
fmt.Println("This would resolve itself if reasonably spaced updates kept coming in.")
fmt.Println(s)

fmt.Println("Finally, when the storage is cleared, all buckets are reset.")
s.Clear()
fmt.Println(s)

Output:

The empty storage renders as below:
+--+---------------------+----------------------+----------------------+----------------------+
        0.000000000,0         0.000000000,0          0.000000000,0          0.000000000,0
     age=0s (target ≤0s)   age=0s (target ≤10s)   age=0s (target ≤20s)   age=0s (target ≤40s)
           epoch=0               epoch=0                epoch=0                epoch=0
+--+---------------------+----------------------+----------------------+----------------------+
+--+---------------------+----------------------+----------------------+----------------------+

After adding the following entry:
CT: 123.000000000,0 @ Epoch 0
Full: true
MLAI: r1: 1000, r9: 2000

the result is:
+----+---------------------+------------------------+------------------------+------------------------+
         123.000000000,0     0.000000000,0 age=2m3s   0.000000000,0 age=2m3s   0.000000000,0 age=2m3s
       age=0s (target ≤0s)   (target ≤10s) epoch=0    (target ≤20s) epoch=0    (target ≤40s) epoch=0
             epoch=0
+----+---------------------+------------------------+------------------------+------------------------+
  r1                  1000
  r9                  2000
+----+---------------------+------------------------+------------------------+------------------------+

Note how the most recent bucket picked up the update.
A new update comes in only two seconds later:
CT: 125.000000000,0 @ Epoch 0
Full: false
MLAI: r1: 1001, r7: 12

The first bucket now contains the union of both updates.
The second bucket holds on to the previous value of the first.
The remaining buckets are unchanged. The best we could do is
give them identical copies of the second, but that's nonsense.
+----+---------------------+----------------------+------------------------+------------------------+
         125.000000000,0       123.000000000,0      0.000000000,0 age=2m5s   0.000000000,0 age=2m5s
       age=0s (target ≤0s)   age=2s (target ≤10s)   (target ≤20s) epoch=0    (target ≤40s) epoch=0
             epoch=0               epoch=0
+----+---------------------+----------------------+------------------------+------------------------+
  r1                  1001                   1000
  r7                    12
  r9                  2000                   2000
+----+---------------------+----------------------+------------------------+------------------------+

Another update, another eight seconds later:
CT: 133.000000000,0 @ Epoch 0
Full: false
MLAI: r1: 999, r9: 2020

Note how the second bucket didn't rotate, for it is not yet
older than 10s. Note also how the first bucket ignores the
downgrade for r1; these can occur in practice.
+----+---------------------+-----------------------+-------------------------+-------------------------+
         133.000000000,0        123.000000000,0      0.000000000,0 age=2m13s   0.000000000,0 age=2m13s
       age=0s (target ≤0s)   age=10s (target ≤10s)    (target ≤20s) epoch=0     (target ≤40s) epoch=0
             epoch=0                epoch=0
+----+---------------------+-----------------------+-------------------------+-------------------------+
  r1                  1001                    1000
  r7                    12
  r9                  2020                    2000
+----+---------------------+-----------------------+-------------------------+-------------------------+

Half a second later, with the next update, it will rotate:
CT: 133.500000000,0 @ Epoch 0
Full: false
MLAI: r7: 17, r8: 711

Consequently we now see the third bucket fill up.
+----+---------------------+-------------------------+-------------------------+---------------------------+
         133.500000000,0         133.000000000,0           123.000000000,0       0.000000000,0 age=2m13.5s
       age=0s (target ≤0s)   age=500ms (target ≤10s)   age=10.5s (target ≤20s)     (target ≤40s) epoch=0
             epoch=0                 epoch=0                   epoch=0
+----+---------------------+-------------------------+-------------------------+---------------------------+
  r1                  1001                      1001                      1000
  r7                    17                        12
  r8                   711
  r9                  2020                      2020                      2000
+----+---------------------+-------------------------+-------------------------+---------------------------+

Next update arrives a whopping 46.5s later (why not).
CT: 180.000000000,0 @ Epoch 0
Full: false
MLAI: r1: 1004, r2: 929922, r7: 19

The second bucket rotated, but due to the sparseness of updates,
it's still above its target age and will rotate again next time.
The same is true for the remaining buckets.
+----+---------------------+-------------------------+-----------------------+-----------------------+
         180.000000000,0         133.500000000,0          133.000000000,0         123.000000000,0
       age=0s (target ≤0s)   age=46.5s (target ≤10s)   age=47s (target ≤20s)   age=57s (target ≤40s)
             epoch=0                 epoch=0                  epoch=0                 epoch=0
+----+---------------------+-------------------------+-----------------------+-----------------------+
  r1                  1004                      1001                    1001                    1000
  r2                929922
  r7                    19                        17                      12
  r8                   711                       711
  r9                  2020                      2020                    2020                    2000
+----+---------------------+-------------------------+-----------------------+-----------------------+

Another five seconds later, another update:
CT: 185.000000000,0 @ Epoch 0
Full: false
MLAI: r3: 1771

All buckets rotate, but the third and fourth remain over target age.
This would resolve itself if reasonably spaced updates kept coming in.
+----+---------------------+----------------------+-------------------------+-----------------------+
         185.000000000,0       180.000000000,0          133.500000000,0          133.000000000,0
       age=0s (target ≤0s)   age=5s (target ≤10s)   age=51.5s (target ≤20s)   age=52s (target ≤40s)
             epoch=0               epoch=0                  epoch=0                  epoch=0
+----+---------------------+----------------------+-------------------------+-----------------------+
  r1                  1004                   1004                      1001                    1001
  r2                929922                 929922
  r3                  1771
  r7                    19                     19                        17                      12
  r8                   711                    711                       711
  r9                  2020                   2020                      2020                    2020
+----+---------------------+----------------------+-------------------------+-----------------------+

Finally, when the storage is cleared, all buckets are reset.
+--+---------------------+----------------------+----------------------+----------------------+
        0.000000000,0         0.000000000,0          0.000000000,0          0.000000000,0
     age=0s (target ≤0s)   age=0s (target ≤10s)   age=0s (target ≤20s)   age=0s (target ≤40s)
           epoch=0               epoch=0                epoch=0                epoch=0
+--+---------------------+----------------------+----------------------+----------------------+
+--+---------------------+----------------------+----------------------+----------------------+

func NewMemStorage

func NewMemStorage(scale time.Duration, buckets int) SingleStorage

NewMemStorage initializes a SingleStorage backed by an in-memory slice that represents the given number of buckets, where the i-th bucket holds a closed timestamp approximately 2^i*scale in the past.

Package storage imports 12 packages and is imported by 2 packages. Updated 2019-07-27.