
package apply

import "github.com/cockroachdb/cockroach/pkg/storage/apply"

Package apply provides abstractions and routines associated with the application of committed raft entries to a replicated state machine.

State Machine Replication

Raft entry application is the process of taking entries that have been committed to a raft group's "raft log" through raft consensus and using them to drive the state machines of each member of the raft group (i.e. each replica). Committed entries are decoded into commands in the same order that they are arranged in the raft log (i.e. in order of increasing log index). This ordering of decoded commands is then treated as the input to state transitions on each replica.

The key to this general approach, known as "state machine replication", is that all state transitions are fully deterministic given only the current state of the machine and the command to apply as input. This ensures that if each instance is driven from the same consistent shared log (same entries, same order), they will all stay in sync. In other words, if we ensure that all replicas start as identical copies of each other and we ensure that all replicas perform the same state transitions, in the same order, deterministically, then through induction we know that all replicas will remain identical copies of each other when compared at the same log index.

This poses a problem for replicas that fail, for any reason, to apply an entry. If the failure wasn't deterministic across all replicas then they can't carry on applying entries, as their state may have diverged from their peers. The only reasonable recourse is to signal that the replica has become corrupted. This demonstrates why it is necessary to separate deterministic command failures from non-deterministic state transition failures. The former, which we call "command rejection", is permissible as long as all replicas come to the same decision to reject the command and handle the rejection in the same way (e.g. decide not to make any state transition). The latter, on the other hand, is not permissible, and is typically handled by crashing the node.
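The induction argument and the notion of command rejection can be illustrated with a minimal sketch. Everything in it is hypothetical and not part of this package: the `command` and `replica` types, the counter-map state, and the rejection rule (a counter may not go negative) are assumptions chosen only to show that replicas applying the same log with deterministic transitions, including deterministic rejections, stay identical at the same log index.

```go
package main

import "fmt"

// command is a hypothetical state transition: add Delta to the counter at Key.
type command struct {
	Key   string
	Delta int
}

// replica is a toy state machine whose state is a map of counters.
type replica struct {
	state        map[string]int
	appliedIndex int
}

func newReplica() *replica { return &replica{state: map[string]int{}} }

// apply performs one deterministic transition. The outcome depends only on
// the current state and the command. A command that would make a counter
// negative is rejected (an assumed rule), which leaves the state unchanged
// but still advances the applied index.
func (r *replica) apply(c command) (rejected bool) {
	r.appliedIndex++
	if r.state[c.Key]+c.Delta < 0 {
		return true
	}
	r.state[c.Key] += c.Delta
	return false
}

// inSync reports whether two replicas hold identical state at the same index.
func inSync(a, b *replica) bool {
	if a.appliedIndex != b.appliedIndex || len(a.state) != len(b.state) {
		return false
	}
	for k, v := range a.state {
		if bv, ok := b.state[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

func main() {
	entries := []command{{"a", 1}, {"b", 5}, {"a", -3}}
	r1, r2 := newReplica(), newReplica()
	for _, c := range entries {
		r1.apply(c)
	}
	for _, c := range entries {
		r2.apply(c)
	}
	fmt.Println(inSync(r1, r2)) // true
}
```

The third command is rejected on both replicas, so neither makes a state transition for it and they remain in sync.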

Performance Concerns

The state machine replication approach also poses complications that affect performance.

A first challenge falls out from the requirement that all replicated commands be sequentially applied on each replica to enforce determinism. This requirement must hold even as the concurrency of the systems processing requests and driving replication grows. If this concurrency imbalance becomes so great that the sequential processing of updates to the replicated state machine can no longer keep up with the concurrent processing feeding inputs into the replicated state machine, replication itself becomes a throughput bottleneck for the system, manifesting as replication lag. This problem, sometimes referred to as the "parallelism gap", is fundamentally due to the loss of context on the interaction between commands after replication and a resulting inability to determine whether concurrent application of commands would be possible without compromising determinism. Put another way, above the level of state machine replication, it is easy to determine which commands conflict with one another, and those that do not conflict can be run concurrently. However, below the level of replication, it is unclear which commands conflict, so to ensure determinism during state machine transitions, no concurrency is possible.

Although it makes no attempt to explicitly introduce concurrency into command application, this package does attempt to improve replication throughput and reduce this parallelism gap through the use of batching. A notion of command triviality is exposed to clients of this package, and those commands that are trivial are considered able to have their application batched with other adjacent trivial commands. This batching, while still preserving a strict ordering of commands, allows multiple commands to achieve some concurrency in their interaction with the state machine. For instance, writes to a storage engine from different commands are able to be batched together using this interface. For more, see Batch.
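The grouping rule can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the `cmd` type and `groupIntoBatches` function are hypothetical, and the rule encoded is only what the text states, namely that adjacent trivial commands share a batch while a non-trivial command is applied in a batch of its own.

```go
package main

import "fmt"

// cmd is a hypothetical stand-in for a decoded command.
type cmd struct {
	index   uint64
	trivial bool
}

// groupIntoBatches splits commands, in log order, into batches that contain
// either a run of adjacent trivial commands or exactly one non-trivial
// command. The relative order of commands is never changed.
func groupIntoBatches(cmds []cmd) [][]uint64 {
	var batches [][]uint64
	var cur []uint64
	flush := func() {
		if len(cur) > 0 {
			batches = append(batches, cur)
			cur = nil
		}
	}
	for _, c := range cmds {
		if !c.trivial {
			// A non-trivial command ends the current run and is
			// placed in a batch by itself.
			flush()
			batches = append(batches, []uint64{c.index})
			continue
		}
		cur = append(cur, c.index)
	}
	flush()
	return batches
}

func main() {
	var cmds []cmd
	for i := uint64(1); i <= 7; i++ {
		cmds = append(cmds, cmd{index: i, trivial: i != 5})
	}
	fmt.Println(groupIntoBatches(cmds)) // [[1 2 3 4] [5] [6 7]]
}
```

With seven commands where only index 5 is non-trivial, the grouping matches the batches shown in the Task example on this page.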

A second challenge arising from the technique of state machine replication is its interaction with client responses and acknowledgement. We saw before that a command is guaranteed to eventually apply if its corresponding raft entry is committed in the raft log - individual replicas have no other choice but to apply it. However, depending on the replicated state, the fact that a command will apply may not be sufficient to return a response to a client. In some cases, the command may still be rejected (deterministically) and the client should be alerted to that. In more extreme cases, the result of the command may not even be known until it is applied to the state machine. In CockroachDB, this was the case until the major rework that took place in 2016 called "proposer evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the completion of that change, client responses are determined before replication begins. The only remaining work to be done after replication of a command succeeds is to determine whether it will be rejected and replaced by an empty command. To facilitate this acknowledgement as early as possible, this package provides the ability to acknowledge a series of commands before applying them to the state machine. Outcomes are determined before performing any durable work by stepping commands through an in-memory "ephemeral" copy of the state machine. For more, see Task.AckCommittedEntriesBeforeApplication.

A final challenge comes from the desire to properly prioritize the application of commands across multiple state machines in systems like CockroachDB where each machine hosts hundreds or thousands of replicas. This is a complicated concern that must take into consideration the need for each replica's state machine to stay up-to-date (is it a leaseholder? is it serving reads?), the need to acknowledge clients in a timely manner (are clients waiting for command application?), the desire to delay application to accumulate larger application batches (will batching improve system throughput?), and a number of other factors. This package has not begun to answer these questions, but it serves to provide the abstractions necessary to perform such prioritization in the future.

Usage

The package exports a set of interfaces that users must provide implementations for. Notably, users of the package must provide a StateMachine that encapsulates the logic behind performing individual state transitions and a Decoder that is capable of decoding raft entries and providing iteration over corresponding Command objects.

These two structures can be used to create an application Task, which is capable of applying raft entries to the StateMachine (see Task.ApplyCommittedEntries). To do so, the Commands that were decoded using the Decoder (see Task.Decode) are passed through a pipeline of stages. First, the Commands are checked for rejection while being staged in an application Batch, which produces a set of CheckedCommands. Next, the application Batch is committed to the StateMachine. Following this, the in-memory side-effects of the CheckedCommands are applied to the StateMachine, producing AppliedCommands. Finally, these AppliedCommands are finalized and their clients are acknowledged.
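The order of the four stages can be sketched for a single batch. The code is a simplified illustration: the `command` struct collapses Command, CheckedCommand, and AppliedCommand into one hypothetical type, `shouldReject` stands in for whatever deterministic check a real StateMachine performs, and `applyOneBatch` is an assumed name rather than a function exported by this package.

```go
package main

import "fmt"

// command is a hypothetical stand-in for the package's Command,
// CheckedCommand, and AppliedCommand, collapsed into one struct.
type command struct {
	index        uint64
	shouldReject bool // input to the (deterministic) rejection check
	rejected     bool // set when the command is staged
}

// applyOneBatch walks one group of commands through the four stages and
// returns a trace of the steps taken, in order.
func applyOneBatch(cmds []*command) []string {
	var trace []string
	// 1. Stage each command in the batch, checking it for rejection.
	for _, c := range cmds {
		c.rejected = c.shouldReject
		trace = append(trace, fmt.Sprintf("stage %d (rejected=%t)", c.index, c.rejected))
	}
	// 2. Commit the whole batch to the state machine at once.
	trace = append(trace, "commit batch")
	// 3. Apply the in-memory side-effects of each checked command.
	for _, c := range cmds {
		trace = append(trace, fmt.Sprintf("side-effects %d", c.index))
	}
	// 4. Finish each applied command and acknowledge its client.
	for _, c := range cmds {
		trace = append(trace, fmt.Sprintf("finish %d", c.index))
	}
	return trace
}

func main() {
	cmds := []*command{{index: 1}, {index: 2, shouldReject: true}}
	for _, step := range applyOneBatch(cmds) {
		fmt.Println(step)
	}
}
```

Each stage completes for every command in the batch before the next stage begins, which is the same shape as the ApplyCommittedEntries output in the Task example.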

Package Files

cmd.go doc.go task.go

type AppliedCommand

type AppliedCommand interface {
    CheckedCommand
    // FinishAndAckOutcome signals that the application of the command has
    // completed. It also acknowledges the outcome of the command to its
    // client if it was proposed locally. An error will immediately stall
    // entry application, so one must only be returned if the state machine
    // is no longer able to make progress. The method will be called exactly
    // once per Command.
    FinishAndAckOutcome() error
}

AppliedCommand is a command that has been applied to the replicated state machine. A command is considered "applied" if it has been staged in a Batch which has been committed and had its side-effects run on the state machine. If the command was rejected (see CheckedCommand), applying the command will likely be a no-op, but that is up to the implementation of the state machine.

type AppliedCommandIterator

type AppliedCommandIterator interface {
    CommandIteratorBase
    // CurApplied returns the applied command that the iterator is
    // currently pointing at. Must not be called if valid is false.
    CurApplied() AppliedCommand
}

AppliedCommandIterator is an iterator over applied replicated commands.

type AppliedCommandList

type AppliedCommandList interface {
    AppliedCommandIterator
    // AppendApplied adds the applied command to the end of the list.
    AppendApplied(AppliedCommand)
}

AppliedCommandList is a list of applied replicated commands.

type Batch

type Batch interface {
    // Stage inserts a Command into the Batch. In doing so, the Command is
    // checked for rejection and a CheckedCommand is returned.
    Stage(Command) (CheckedCommand, error)
    // ApplyToStateMachine applies the persistent state transitions staged
    // in the Batch to the StateMachine, atomically.
    ApplyToStateMachine(context.Context) error
    // Close closes the batch and releases any resources that it holds.
    Close()
}

Batch accumulates a series of updates from Commands and performs them all at once to its StateMachine when applied. Groups of Commands will be staged in the Batch such that one or more trivial Commands are staged or exactly one non-trivial Command is staged.

type CheckedCommand

type CheckedCommand interface {
    Command
    // Rejected returns whether the command was rejected.
    Rejected() bool
    // CanAckBeforeApplication returns whether the success of the command
    // can be acknowledged before the command has been applied to the state
    // machine.
    CanAckBeforeApplication() bool
    // AckSuccess acknowledges the success of the command to its client.
    // Must only be called if !Rejected.
    AckSuccess() error
}

CheckedCommand is a command that has been checked to see whether it can apply successfully or not. Committing an entry in a raft log and having the command in that entry succeed are similar but not equivalent concepts. A successfully committed entry may contain a command that the replicated state machine decides to reject (deterministically).

type CheckedCommandIterator

type CheckedCommandIterator interface {
    CommandIteratorBase
    // CurChecked returns the checked command that the iterator is
    // currently pointing at. Must not be called if valid is false.
    CurChecked() CheckedCommand
    // NewAppliedList returns a new empty applied command list. Usages
    // of the list will always advance the iterator before pushing into
    // the list, so implementors are free to share backing memory
    // between the two.
    NewAppliedList() AppliedCommandList
}

CheckedCommandIterator is an iterator over checked replicated commands.
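The remark about sharing backing memory can be made concrete with a small sketch. The `iter` and `list` types below are hypothetical and operate on plain integers rather than commands. The sketch assumes exactly the contract stated in the comment: the caller reads the current element and advances the iterator before appending to the list, so a write never lands on an element that has not been read yet.

```go
package main

import "fmt"

// iter is a hypothetical slice-backed iterator over command indexes.
type iter struct {
	buf []uint64
	pos int
}

func (it *iter) Valid() bool { return it.pos < len(it.buf) }
func (it *iter) Cur() uint64 { return it.buf[it.pos] }
func (it *iter) Next()       { it.pos++ }

// NewList returns an empty list that reuses the iterator's backing array
// instead of allocating a new one.
func (it *iter) NewList() *list { return &list{buf: it.buf[:0]} }

// list is a hypothetical append-only list.
type list struct{ buf []uint64 }

func (l *list) Append(v uint64) { l.buf = append(l.buf, v) }

// transform reads each element, advances the iterator, and only then
// appends a derived value to the list. Because the k-th append happens
// after at least k elements have been consumed, it overwrites only slots
// the iterator has already passed.
func transform(in []uint64) []uint64 {
	it := &iter{buf: in}
	out := it.NewList()
	for it.Valid() {
		v := it.Cur()
		it.Next() // advance before pushing, as the contract requires
		out.Append(v * 10)
	}
	return out.buf
}

func main() {
	in := []uint64{1, 2, 3}
	out := transform(in)
	fmt.Println(out, &in[0] == &out[0]) // [10 20 30] true
}
```

The output list occupies the same memory as the input, so converting a list of commands from one stage to the next needs no additional allocation in this sketch.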

type CheckedCommandList

type CheckedCommandList interface {
    CheckedCommandIterator
    // AppendChecked adds the checked command to the end of the list.
    AppendChecked(CheckedCommand)
}

CheckedCommandList is a list of checked replicated commands.

type Command

type Command interface {
    // Index is the log index of the corresponding raft entry.
    Index() uint64
    // IsTrivial returns whether the command can apply in a batch.
    IsTrivial() bool
    // IsLocal returns whether the command was locally proposed. Commands
    // that were locally proposed typically have a client waiting on a
    // response, so there is additional urgency to apply them quickly.
    IsLocal() bool
}

Command is a command that has been successfully replicated through raft by being durably committed to the raft log of a quorum of peers in a raft group.

type CommandIterator

type CommandIterator interface {
    CommandIteratorBase
    // Cur returns the command that the iterator is currently pointing at.
    // Must not be called if valid is false.
    Cur() Command
    // NewList returns a new empty command list. Usages of the list will
    // always advance the iterator before pushing into the list, so
    // implementors are free to share backing memory between the two.
    NewList() CommandList
    // NewCheckedList returns a new empty checked command list. Usages
    // of the list will always advance the iterator before pushing into
    // the list, so implementors are free to share backing memory
    // between the two.
    NewCheckedList() CheckedCommandList
}

CommandIterator is an iterator over replicated commands.

type CommandIteratorBase

type CommandIteratorBase interface {
    // Valid returns whether the iterator is pointing at a valid element.
    Valid() bool
    // Next advances the iterator. Must not be called if valid is false.
    Next()
    // Close closes the iterator. Once closed, it must not be used.
    Close()
}

CommandIteratorBase is a common interface extended by all iterator and list variants. It is exported so its methods are displayed in godoc when it is embedded in other interfaces.
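A minimal sketch of the Valid/Next/Close contract follows. The `iteratorBase` interface copies the three method signatures shown above; the `sliceIter` type and the `drain` helper are hypothetical and exist only to show the usual loop shape, in which Next is called only while Valid returns true and the iterator is closed once at the end.

```go
package main

import "fmt"

// iteratorBase mirrors the three methods of CommandIteratorBase.
type iteratorBase interface {
	Valid() bool
	Next()
	Close()
}

// sliceIter is a hypothetical implementation backed by a slice of indexes.
type sliceIter struct {
	indexes []uint64
	pos     int
	closed  bool
}

func (it *sliceIter) Valid() bool { return !it.closed && it.pos < len(it.indexes) }
func (it *sliceIter) Next()       { it.pos++ }
func (it *sliceIter) Close()      { it.closed = true }

// drain advances the iterator to its end, closes it, and returns the number
// of elements visited. Next is only ever called while Valid is true.
func drain(it iteratorBase) int {
	defer it.Close()
	n := 0
	for ; it.Valid(); it.Next() {
		n++
	}
	return n
}

func main() {
	it := &sliceIter{indexes: []uint64{1, 2, 3}}
	n := drain(it)
	fmt.Println(n, it.Valid()) // 3 false
}
```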

type CommandList

type CommandList interface {
    CommandIterator
    // Append adds the command to the end of the list.
    Append(Command)
}

CommandList is a list of replicated commands.

type Decoder

type Decoder interface {
    // DecodeAndBind decodes each of the provided raft entries into commands
    // and binds any that were proposed locally to their local proposals.
    // The method must only be called once per Decoder. It returns whether
    // any of the commands were bound to local proposals waiting for
    // acknowledgement.
    DecodeAndBind(context.Context, []raftpb.Entry) (anyLocal bool, _ error)
    // NewCommandIter creates an iterator over the replicated commands that
    // were passed to DecodeAndBind. The method must not be called until
    // after DecodeAndBind is called.
    NewCommandIter() CommandIterator
    // Reset resets the Decoder and releases any resources that it holds.
    Reset()
}

Decoder is capable of decoding a list of committed raft entries and binding any that were locally proposed to their local proposals.

type StateMachine

type StateMachine interface {
    // NewBatch creates a new batch that is suitable for accumulating the
    // effects that a group of Commands will have on the replicated state
    // machine. Commands are staged in the batch one-by-one and then the
    // entire batch is committed at once.
    //
    // Batch comes in two flavors - real batches and ephemeral batches.
    // Real batches are capable of accumulating updates from commands and
    // applying them to the state machine. Ephemeral batches are not able
    // to make changes to the durable state machine, but can still be used
    // for the purpose of checking commands to determine whether they will
    // be rejected or not when staged in a real batch. The principal user
    // of ephemeral batches is AckCommittedEntriesBeforeApplication.
    NewBatch(ephemeral bool) Batch
    // ApplySideEffects applies the in-memory side-effects of a Command to
    // the replicated state machine. The method will be called in the order
    // that the commands are committed to the state machine's log. Once the
    // in-memory side-effects of the Command are applied, an AppliedCommand
    // is returned so that it can be finished and acknowledged.
    //
    // The method will always be called with a Command that has been checked
    // and whose persistent state transition has been applied to the state
    // machine. Because this method is called after applying the persistent
    // state transition for a Command, it may not be called in the case of
    // an untimely crash. This means that applying these side-effects will
    // typically update the in-memory representation of the state machine
    // to the same state that it would be in if the process restarted.
    ApplySideEffects(CheckedCommand) (AppliedCommand, error)
}

StateMachine represents an instance of a replicated state machine being driven by a replication group. The state machine accepts Commands that have been committed to the replication group's log and applies them to advance to a new state.

All state transitions performed by the state machine are expected to be deterministic, which ensures that if each instance is driven from the same consistent shared log, they will all stay in sync.
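The difference between real and ephemeral batches can be sketched with a toy state machine. All names here are hypothetical, and the rejection rule (a key may be created only once) is an assumption made for illustration. Returning an error when an ephemeral batch is applied is likewise a choice made for this sketch; the text above only says that ephemeral batches cannot change the durable state machine.

```go
package main

import (
	"errors"
	"fmt"
)

// machine is a toy state machine: a set of keys that may each be created once.
type machine struct{ keys map[string]bool }

// batch stages key creations against a machine.
type batch struct {
	m         *machine
	ephemeral bool
	staged    []string
}

// NewBatch mirrors the shape of StateMachine.NewBatch.
func (m *machine) NewBatch(ephemeral bool) *batch {
	return &batch{m: m, ephemeral: ephemeral}
}

// Stage checks a command for rejection. A key is rejected if it already
// exists in the machine or was staged earlier in this batch. Both batch
// flavors run the same check, so they reach the same decisions.
func (b *batch) Stage(key string) (rejected bool) {
	if b.m.keys[key] {
		return true
	}
	for _, k := range b.staged {
		if k == key {
			return true
		}
	}
	b.staged = append(b.staged, key)
	return false
}

// ApplyToStateMachine writes the staged keys to the machine. An ephemeral
// batch refuses to do so and leaves the machine untouched.
func (b *batch) ApplyToStateMachine() error {
	if b.ephemeral {
		return errors.New("ephemeral batch cannot be applied")
	}
	for _, k := range b.staged {
		b.m.keys[k] = true
	}
	return nil
}

func main() {
	m := &machine{keys: map[string]bool{"a": true}}
	eph, durable := m.NewBatch(true), m.NewBatch(false)
	for _, k := range []string{"a", "b", "b", "c"} {
		fmt.Println(k, eph.Stage(k), durable.Stage(k))
	}
	err := eph.ApplyToStateMachine()
	fmt.Println(err != nil, len(m.keys))
	err = durable.ApplyToStateMachine()
	fmt.Println(err, len(m.keys))
}
```

Because the ephemeral batch reaches the same rejection decisions as the real one, its results can be used to acknowledge clients while the durable work is still pending.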

type Task

type Task struct {
    // contains filtered or unexported fields
}

Task is an object capable of coordinating the application of commands to a replicated state machine after they have been durably committed to a raft log.

Committed raft entries are provided to the task through its Decode method. The task will then apply these entries to the provided state machine when ApplyCommittedEntries is called.

Code:

defer setLogging(true)()
ctx := context.Background()
ents := makeEntries(7)

sm := getTestStateMachine()
dec := newTestDecoder()
dec.nonTrivial[5] = true
dec.nonLocal[2] = true
dec.nonLocal[6] = true
dec.shouldReject[3] = true
dec.shouldReject[6] = true
fmt.Print(`
Setting up a batch of seven log entries:
 - index 2 and 6 are non-local
 - index 3 and 6 will be rejected
 - index 5 is not trivial
`)

t := apply.MakeTask(sm, dec)
defer t.Close()

fmt.Println("\nDecode (note that index 2 and 6 are not local):")
if err := t.Decode(ctx, ents); err != nil {
    panic(err)
}

fmt.Println("\nAckCommittedEntriesBeforeApplication:")
if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
    panic(err)
}
fmt.Print(`
Above, only index 1 and 4 get acked early. The command at 5 is
non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
must be in the first batch to qualify for acking early. 2 is not local
(so there's nobody to ack), and 3 is rejected. We can't ack rejected
commands early because the state machine is free to handle them any way
it likes.
`)

fmt.Println("\nApplyCommittedEntries:")
if err := t.ApplyCommittedEntries(ctx); err != nil {
    panic(err)
}

Output:

Setting up a batch of seven log entries:
 - index 2 and 6 are non-local
 - index 3 and 6 will be rejected
 - index 5 is not trivial

Decode (note that index 2 and 6 are not local):
 decoding command 1; local=true
 decoding command 2; local=false
 decoding command 3; local=true
 decoding command 4; local=true
 decoding command 5; local=true
 decoding command 6; local=false
 decoding command 7; local=true

AckCommittedEntriesBeforeApplication:
 acknowledging command 1 before application
 acknowledging command 4 before application

Above, only index 1 and 4 get acked early. The command at 5 is
non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
must be in the first batch to qualify for acking early. 2 is not local
(so there's nobody to ack), and 3 is rejected. We can't ack rejected
commands early because the state machine is free to handle them any way
it likes.

ApplyCommittedEntries:
 applying batch with commands=[1 2 3 4]
 applying side-effects of command 1
 applying side-effects of command 2
 applying side-effects of command 3
 applying side-effects of command 4
 finishing command 1; rejected=false
 finishing and acknowledging command 2; rejected=false
 finishing and acknowledging command 3; rejected=true
 finishing command 4; rejected=false
 applying batch with commands=[5]
 applying side-effects of command 5
 finishing and acknowledging command 5; rejected=false
 applying batch with commands=[6 7]
 applying side-effects of command 6
 applying side-effects of command 7
 finishing and acknowledging command 6; rejected=true
 finishing and acknowledging command 7; rejected=false

func MakeTask

func MakeTask(sm StateMachine, dec Decoder) Task

MakeTask creates a new task with the provided state machine and decoder.

func (*Task) AckCommittedEntriesBeforeApplication

func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error

AckCommittedEntriesBeforeApplication attempts to acknowledge the success of raft entries that have been durably committed to the raft log but have not yet been applied to the proposer replica's replicated state machine.

This is safe because a proposal through raft can be known to have succeeded as soon as it is durably replicated to a quorum of replicas (i.e. has committed in the raft log). The proposal does not need to wait for the effects of the proposal to be applied in order to know whether its changes will succeed or fail. This is because the raft log is the provider of atomicity and durability for replicated writes, not (ignoring log truncation) the replicated state machine itself.

However, there are a few complications to acknowledging the success of a proposal at this stage:

1. Committing an entry in the raft log and having the command in that entry
   succeed are similar but not equivalent concepts. Even if the entry succeeds
   in achieving durability by replicating to a quorum of replicas, its command
   may still be rejected "beneath raft". This means that a (deterministic)
   check after replication decides that the command will not be applied to the
   replicated state machine. In that case, the client waiting on the result of
   the command should not be informed of its success. Luckily, this check is
   cheap to perform so we can do it here and when applying the command.

   Determining whether the command will succeed or be rejected before applying
   it for real is accomplished using an ephemeral batch. Commands are staged in
   the ephemeral batch to acquire CheckedCommands, which can then be acknowledged
   immediately even though the ephemeral batch itself cannot be used to update
   the durable state machine. Once the rejection status of each command is
   determined, any successful commands that permit acknowledgement before
   application (see CanAckBeforeApplication) are acknowledged. The ephemeral
   batch is then thrown away.

2. Some commands perform non-trivial work such as updating Replica configuration
   state or performing Range splits. In those cases, it's likely that the client
   is interested in not only knowing whether it has succeeded in sequencing the
   change in the raft log, but also in knowing when the change has gone into
   effect. There's currently no exposed hook to ask for an acknowledgement only
   after a command has been applied, so for simplicity the current implementation
   only ever acks transactional writes before they have gone into effect. All
   other commands wait until they have been applied to ack their client.

3. Even though we can determine whether a command has succeeded without applying
   it, the effect of the command will not be visible to conflicting commands until
   it is applied. Because of this, the client can be informed of the success of
   a write at this point, but we cannot release that write's latches until the
   write has applied. See ProposalData.signalProposalResult/finishApplication.

4. etcd/raft may provide a series of CommittedEntries in a Ready struct that
   haven't actually been appended to our own log. This is most common in single
   node replication groups, but it is possible when a follower in a multi-node
   replication group is catching up after falling behind. In the first case,
   the entries are not yet committed so acknowledging them would be a lie. In
   the second case, the entries are committed so we could acknowledge them at
   this point, but doing so seems risky. To avoid complications in either case,
   the method takes a maxIndex parameter that limits the indexes that it will
   acknowledge. Typically, callers will supply the highest index that they have
   durably written to their raft log for this upper bound.
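The conditions in this list can be combined into a small eligibility check. This is a sketch under assumptions, not the method's implementation: the `cmd` type and `ackableEarly` function are hypothetical, non-trivial commands are assumed never to be acknowledged early (standing in for CanAckBeforeApplication returning false), and only the first batch is considered, as in the Task example on this page.

```go
package main

import "fmt"

// cmd is a hypothetical stand-in for a checked command.
type cmd struct {
	index    uint64
	trivial  bool
	local    bool
	rejected bool
}

// ackableEarly returns the indexes of commands that could be acknowledged
// before application. It considers only the leading run of trivial commands
// whose index does not exceed maxIndex, and within that run it selects the
// commands that were proposed locally and were not rejected.
func ackableEarly(cmds []cmd, maxIndex uint64) []uint64 {
	var acked []uint64
	for _, c := range cmds {
		if !c.trivial || c.index > maxIndex {
			break // the first batch ends here
		}
		if c.local && !c.rejected {
			acked = append(acked, c.index)
		}
	}
	return acked
}

// exampleCmds mirrors the setup of the Task example: seven entries, where
// 2 and 6 are non-local, 3 and 6 are rejected, and 5 is non-trivial.
func exampleCmds() []cmd {
	var cmds []cmd
	for i := uint64(1); i <= 7; i++ {
		cmds = append(cmds, cmd{
			index:    i,
			trivial:  i != 5,
			local:    i != 2 && i != 6,
			rejected: i == 3 || i == 6,
		})
	}
	return cmds
}

func main() {
	fmt.Println(ackableEarly(exampleCmds(), 10)) // [1 4]
	fmt.Println(ackableEarly(exampleCmds(), 3))  // [1]
}
```

With a maxIndex of 10 the result matches the two early acknowledgements in the Task example. Lowering maxIndex to 3 excludes command 4, which models a caller that has durably written only the first three entries to its own log.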

func (*Task) ApplyCommittedEntries

func (t *Task) ApplyCommittedEntries(ctx context.Context) error

ApplyCommittedEntries applies raft entries that have been committed to the raft log but have not yet been applied to the replicated state machine.

func (*Task) Close

func (t *Task) Close()

Close ends the task, releasing any resources that it holds and resetting the Decoder. The Task cannot be used again after being closed.

func (*Task) Decode

func (t *Task) Decode(ctx context.Context, committedEntries []raftpb.Entry) error

Decode decodes the committed raft entries into commands and prepares for the commands to be applied to the replicated state machine.

func (*Task) SetMaxBatchSize

func (t *Task) SetMaxBatchSize(size int)

SetMaxBatchSize sets the maximum application batch size. If 0, no limit will be placed on the number of commands that can be applied in a batch.
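The effect of the limit on a run of adjacent trivial commands can be sketched as simple arithmetic. The `batchSizes` function is hypothetical, and the sketch assumes that the limit counts commands per batch and that a full batch is simply followed by a new one; the text above specifies only that 0 means no limit.

```go
package main

import "fmt"

// batchSizes returns the sizes of the batches that a run of n adjacent
// trivial commands would be split into under a maximum batch size.
// A maxSize of 0 places no limit on the batch size.
func batchSizes(n, maxSize int) []int {
	var sizes []int
	for n > 0 {
		size := n
		if maxSize > 0 && size > maxSize {
			size = maxSize
		}
		sizes = append(sizes, size)
		n -= size
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(7, 3)) // [3 3 1]
	fmt.Println(batchSizes(7, 0)) // [7]
}
```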

Package apply imports 2 packages and is imported by 1 package. Updated 2019-08-23.