Documentation ¶
Overview ¶
Package store provides a distributed SQLite instance.
Distributed consensus is provided via the Raft algorithm.
Index ¶
- Variables
- func HasData(dir string) (bool, error)
- func IsNewNode(raftDir string) bool
- func IsStaleRead(leaderlastContact time.Time, lastFSMUpdateTime time.Time, ...) bool
- func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable *rlog.Log, ...) error
- func ResetStats()
- type ClusterState
- type CommandProcessor
- type Config
- type DBConfig
- type ExecuteQueryResponses
- type FSM
- type FSMSnapshot
- type Layer
- type NodeTransport
- func (n *NodeTransport) Close() error
- func (n *NodeTransport) CommandCommitIndex() uint64
- func (n *NodeTransport) Consumer() <-chan raft.RPC
- func (n *NodeTransport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, ...) error
- func (n *NodeTransport) LeaderCommitIndex() uint64
- func (n *NodeTransport) Stats() map[string]interface{}
- type Provider
- type Server
- type Servers
- type SnapshotStore
- type Store
- func (s *Store) Addr() string
- func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error)
- func (s *Store) Bootstrap(servers ...*Server) error
- func (s *Store) Close(wait bool) (retErr error)
- func (s *Store) CommitIndex() (uint64, error)
- func (s *Store) Committed(timeout time.Duration) (uint64, error)
- func (s *Store) DBAppliedIndex() uint64
- func (s *Store) Database(leader bool) ([]byte, error)
- func (s *Store) DeregisterObserver(o *raft.Observer)
- func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)
- func (s *Store) HasLeader() bool
- func (s *Store) ID() string
- func (s *Store) IsLeader() bool
- func (s *Store) IsVoter() (bool, error)
- func (s *Store) Join(jr *proto.JoinRequest) error
- func (s *Store) LastVacuumTime() (time.Time, error)
- func (s *Store) LeaderAddr() (string, error)
- func (s *Store) LeaderCommitIndex() (uint64, error)
- func (s *Store) LeaderID() (string, error)
- func (s *Store) LeaderWithID() (string, string)
- func (s *Store) Load(lr *proto.LoadRequest) error
- func (s *Store) Nodes() ([]*Server, error)
- func (s *Store) Noop(id string) (raft.ApplyFuture, error)
- func (s *Store) Notify(nr *proto.NotifyRequest) error
- func (s *Store) Open() (retErr error)
- func (s *Store) Path() string
- func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error)
- func (s *Store) RORWCount(eqr *proto.ExecuteQueryRequest) (nRW, nRO int)
- func (s *Store) ReadFrom(r io.Reader) (int64, error)
- func (s *Store) Ready() bool
- func (s *Store) RegisterLeaderChange(c chan<- struct{})
- func (s *Store) RegisterObserver(o *raft.Observer)
- func (s *Store) RegisterReadyChannel(ch <-chan struct{})
- func (s *Store) Remove(rn *proto.RemoveNodeRequest) error
- func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error)
- func (s *Store) SetRequestCompression(batch, size int)
- func (s *Store) SetRestorePath(path string) error
- func (s *Store) Snapshot(n uint64) (retError error)
- func (s *Store) State() ClusterState
- func (s *Store) Stats() (map[string]interface{}, error)
- func (s *Store) Stepdown(wait bool) error
- func (s *Store) Vacuum() error
- func (s *Store) WaitForAllApplied(timeout time.Duration) error
- func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error)
- func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error
- func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error
- func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error)
- func (s *Store) WaitForLeader(timeout time.Duration) (string, error)
- func (s *Store) WaitForRemoval(id string, timeout time.Duration) error
- type Transport
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotOpen is returned when a Store is not open.
	ErrNotOpen = errors.New("store not open")

	// ErrOpen is returned when a Store is already open.
	ErrOpen = errors.New("store already open")

	// ErrNotReady is returned when a Store is not ready to accept requests.
	ErrNotReady = errors.New("store not ready")

	// ErrNotLeader is returned when a node attempts to execute a leader-only
	// operation.
	ErrNotLeader = errors.New("not leader")

	// ErrNotSingleNode is returned when a node attempts to execute a single-node
	// only operation.
	ErrNotSingleNode = errors.New("not single-node")

	// ErrStaleRead is returned if executing the query would violate the
	// requested freshness.
	ErrStaleRead = errors.New("stale read")

	// ErrOpenTimeout is returned when the Store does not apply its initial
	// logs within the specified time.
	ErrOpenTimeout = errors.New("timeout waiting for initial logs application")

	// ErrWaitForRemovalTimeout is returned when the Store does not confirm removal
	// of a node within the specified time.
	ErrWaitForRemovalTimeout = errors.New("timeout waiting for node removal confirmation")

	// ErrWaitForLeaderTimeout is returned when the Store cannot determine the leader
	// within the specified time.
	ErrWaitForLeaderTimeout = errors.New("timeout waiting for leader")

	// ErrInvalidBackupFormat is returned when the requested backup format
	// is not valid.
	ErrInvalidBackupFormat = errors.New("invalid backup format")

	// ErrInvalidVacuum is returned when the requested backup format is not
	// compatible with vacuum.
	ErrInvalidVacuum = errors.New("invalid vacuum")

	// ErrLoadInProgress is returned when a load is already in progress and the
	// requested operation cannot be performed.
	ErrLoadInProgress = errors.New("load in progress")
)
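Because these are sentinel errors, callers would typically match them with errors.Is rather than by comparing strings. The following is a minimal sketch, not the package's own code: it redeclares three of the errors locally so that it compiles on its own, and the classify helper and the actions it returns are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for three of the sentinel errors listed above, declared
// here only so the sketch compiles without importing the store package.
var (
	ErrNotOpen   = errors.New("store not open")
	ErrNotLeader = errors.New("not leader")
	ErrStaleRead = errors.New("stale read")
)

// classify is a hypothetical helper that maps an error to the action a
// caller might take. errors.Is also matches errors wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotLeader):
		return "redirect to leader"
	case errors.Is(err, ErrStaleRead):
		return "retry later or relax freshness"
	case errors.Is(err, ErrNotOpen):
		return "fail: store not open"
	default:
		return "fail: unexpected error"
	}
}

func main() {
	wrapped := fmt.Errorf("execute failed: %w", ErrNotLeader)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(nil))
}
```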
Functions ¶
func HasData ¶ added in v8.15.0
HasData returns true if the given dir indicates that at least one FSM entry has been committed to the log. This is true if there are any snapshots, or if there are any entries in the log of raft.LogCommand type. This function will block if the Bolt database is already open.
func IsNewNode ¶
IsNewNode returns whether a node using raftDir would be a brand-new node. It also means that the window for this node joining a different cluster has passed.
func IsStaleRead ¶ added in v8.20.0
func IsStaleRead(
	leaderlastContact time.Time,
	lastFSMUpdateTime time.Time,
	lastAppendedAtTime time.Time,
	fsmIndex uint64,
	commitIndex uint64,
	freshness int64,
	strict bool,
) bool
IsStaleRead returns whether a read is stale.
func RecoverNode ¶
func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable *rlog.Log, snaps raft.SnapshotStore, tn raft.Transport, conf raft.Configuration) error
RecoverNode is used to manually force a new configuration, in the event that quorum cannot be restored. This borrows heavily from RecoverCluster functionality of the Hashicorp Raft library, but has been customized for rqlite use.
func ResetStats ¶
func ResetStats()
ResetStats resets the expvar stats for this module. Mostly for test purposes.
Types ¶
type ClusterState ¶
type ClusterState int
ClusterState defines the possible Raft states the current node can be in.
const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)
Represents the Raft cluster states.
type CommandProcessor ¶ added in v8.14.0
type CommandProcessor struct {
// contains filtered or unexported fields
}
CommandProcessor processes commands by applying them to the underlying database.
func NewCommandProcessor ¶ added in v8.14.0
func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *CommandProcessor
NewCommandProcessor returns a new instance of CommandProcessor.
func (*CommandProcessor) Process ¶ added in v8.14.0
func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB) (*proto.Command, bool, interface{})
Process processes the given command against the given database.
type Config ¶
type Config struct {
	DBConf *DBConfig   // The DBConfig object for this Store.
	Dir    string      // The working directory for raft.
	Tn     Transport   // The underlying Transport for raft.
	ID     string      // Node ID.
	Logger *log.Logger // The logger to use to log stuff.
}
Config represents the configuration of the underlying Store.
type DBConfig ¶
type DBConfig struct {
	// SQLite on-disk path
	OnDiskPath string `json:"on_disk_path,omitempty"`

	// Enforce Foreign Key constraints
	FKConstraints bool `json:"fk_constraints"`
}
DBConfig represents the configuration of the underlying SQLite database.
type ExecuteQueryResponses ¶ added in v8.16.5
type ExecuteQueryResponses []*proto.ExecuteQueryResponse
ExecuteQueryResponses is a slice of ExecuteQueryResponse, which detects mutations.
func (ExecuteQueryResponses) Mutation ¶ added in v8.16.5
func (e ExecuteQueryResponses) Mutation() bool
Mutation returns true if any of the responses mutated the database.
type FSM ¶
type FSM struct {
// contains filtered or unexported fields
}
FSM is a wrapper around the Store which implements raft.FSM.
type FSMSnapshot ¶
type FSMSnapshot struct {
	raft.FSMSnapshot
	// contains filtered or unexported fields
}
FSMSnapshot is a wrapper around raft.FSMSnapshot which adds instrumentation and logging.
func (*FSMSnapshot) Persist ¶
func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) (retError error)
Persist writes the snapshot to the given sink.
type Layer ¶ added in v8.13.3
Layer is the interface expected by the Store for network communication between nodes, which is used for Raft distributed consensus.
type NodeTransport ¶
type NodeTransport struct {
	*raft.NetworkTransport
	// contains filtered or unexported fields
}
NodeTransport is a wrapper around the Raft NetworkTransport, which allows custom configuration of the InstallSnapshot method.
func NewNodeTransport ¶
func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport
NewNodeTransport returns an initialized NodeTransport.
func (*NodeTransport) CommandCommitIndex ¶ added in v8.20.1
func (n *NodeTransport) CommandCommitIndex() uint64
CommandCommitIndex returns the index of the latest committed log entry which is applied to the FSM.
func (*NodeTransport) Consumer ¶
func (n *NodeTransport) Consumer() <-chan raft.RPC
Consumer returns a channel of RPC requests to be consumed.
func (*NodeTransport) InstallSnapshot ¶
func (n *NodeTransport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error
InstallSnapshot is used to push a snapshot down to a follower. The data is read from the given io.Reader and streamed to the follower.
func (*NodeTransport) LeaderCommitIndex ¶ added in v8.20.2
func (n *NodeTransport) LeaderCommitIndex() uint64
LeaderCommitIndex returns the index of the latest committed log entry which is known to be replicated to the majority of the cluster.
func (*NodeTransport) Stats ¶ added in v8.20.2
func (n *NodeTransport) Stats() map[string]interface{}
Stats returns the current stats of the transport.
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
Provider implements the uploader Provider interface, allowing the Store to be used as a DataProvider for an uploader.
func NewProvider ¶
NewProvider returns a new instance of Provider. If v is true, the SQLite database will be VACUUMed before being provided. If c is true, the SQLite database will be compressed before being provided.
type Server ¶
type Server struct {
	ID       string `json:"id,omitempty"`
	Addr     string `json:"addr,omitempty"`
	Suffrage string `json:"suffrage,omitempty"`
}
Server represents another node in the cluster.
type Servers ¶
type Servers []*Server
Servers is a set of Servers.
func (Servers) Contains ¶
Contains returns whether the given node, as specified by its Raft ID, is a member of the set of servers.
func (Servers) IsReadOnly ¶
IsReadOnly returns whether the given node, as specified by its Raft ID, is a read-only (non-voting) node. If no node is found with the given ID then found will be false.
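The two lookups described above can be sketched as follows. This is an illustration under assumptions rather than the package's implementation: the method signatures are inferred from the descriptions, and the Suffrage value "Nonvoter" is assumed to mark a read-only node.

```go
package main

import "fmt"

// Server mirrors the fields of the Server struct documented above.
type Server struct {
	ID       string
	Addr     string
	Suffrage string
}

// Servers is a set of Servers.
type Servers []*Server

// Contains reports whether a server with the given Raft ID is in the set.
func (s Servers) Contains(id string) bool {
	for _, n := range s {
		if n != nil && n.ID == id {
			return true
		}
	}
	return false
}

// IsReadOnly reports whether the server with the given Raft ID is a
// non-voting node. found is false if no server has that ID.
// Assumption: a Suffrage of "Nonvoter" identifies a read-only node.
func (s Servers) IsReadOnly(id string) (readOnly bool, found bool) {
	for _, n := range s {
		if n != nil && n.ID == id {
			return n.Suffrage == "Nonvoter", true
		}
	}
	return false, false
}

func main() {
	nodes := Servers{
		{ID: "1", Addr: "localhost:4002", Suffrage: "Voter"},
		{ID: "2", Addr: "localhost:4004", Suffrage: "Nonvoter"},
	}
	fmt.Println(nodes.Contains("2"))
	readOnly, found := nodes.IsReadOnly("2")
	fmt.Println(readOnly, found)
}
```

Returning found separately lets a caller distinguish "this node is a voter" from "this node is not in the configuration at all".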
type SnapshotStore ¶
type SnapshotStore interface {
	raft.SnapshotStore

	// FullNeeded returns true if a full snapshot is needed.
	FullNeeded() (bool, error)

	// SetFullNeeded explicitly sets that a full snapshot is needed.
	SetFullNeeded() error

	// Stats returns stats about the Snapshot Store.
	Stats() (map[string]interface{}, error)
}
SnapshotStore is the interface Snapshot stores must implement.
type Store ¶
type Store struct {
	BootstrapExpect int

	ShutdownOnRemove         bool
	SnapshotThreshold        uint64
	SnapshotThresholdWALSize uint64
	SnapshotInterval         time.Duration
	LeaderLeaseTimeout       time.Duration
	HeartbeatTimeout         time.Duration
	ElectionTimeout          time.Duration
	ApplyTimeout             time.Duration
	RaftLogLevel             string
	NoFreeListSync           bool
	AutoVacInterval          time.Duration

	// Node-reaping configuration
	ReapTimeout         time.Duration
	ReapReadOnlyTimeout time.Duration
	// contains filtered or unexported fields
}
Store is a SQLite database, where all changes are made via Raft consensus.
func (*Store) Backup ¶
Backup writes a consistent snapshot of the underlying database to dst. This can be called while writes are being made to the system. The backup may fail if the system is actively snapshotting. The client can just retry in this case.
If vacuum is not true the copy is written directly to dst, optionally in compressed form, without any intermediate temporary files.
If vacuum is true, then a VACUUM is performed on the database before the backup is made. If compression is false, and dst is an os.File, then the vacuumed copy will be written directly to that file. Otherwise a temporary file will be created, and that temporary file copied to dst.
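The non-vacuum path, where the copy is streamed directly to dst and optionally compressed, might look roughly like the sketch below. It is not the Store's code: writeBackup and roundTrip are hypothetical helpers, and gzip is assumed as the compression format purely for illustration.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// writeBackup streams src to dst without an intermediate temporary file,
// optionally compressing it. Assumption: gzip is the compression format.
func writeBackup(src io.Reader, dst io.Writer, compress bool) (retErr error) {
	if !compress {
		_, err := io.Copy(dst, src)
		return err
	}
	gw := gzip.NewWriter(dst)
	defer func() {
		// Close flushes buffered data; report its error if the copy succeeded.
		if err := gw.Close(); err != nil && retErr == nil {
			retErr = err
		}
	}()
	_, err := io.Copy(gw, src)
	return err
}

// roundTrip writes data with writeBackup and reads it back, decompressing
// when needed, so the result can be compared with the input.
func roundTrip(data string, compress bool) (string, error) {
	var buf bytes.Buffer
	if err := writeBackup(strings.NewReader(data), &buf, compress); err != nil {
		return "", err
	}
	var r io.Reader = &buf
	if compress {
		gr, err := gzip.NewReader(&buf)
		if err != nil {
			return "", err
		}
		defer gr.Close()
		r = gr
	}
	out, err := io.ReadAll(r)
	return string(out), err
}

func main() {
	out, err := roundTrip("pretend these are SQLite bytes", true)
	fmt.Println(out, err)
}
```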
func (*Store) Bootstrap ¶
Bootstrap executes a cluster bootstrap on this node, using the given Servers as the configuration.
func (*Store) CommitIndex ¶ added in v8.20.2
CommitIndex returns the Raft commit index.
func (*Store) Committed ¶ added in v8.21.0
Committed blocks until the local commit index is greater than or equal to the Leader index, as checked when the function is called. It returns the committed index. If the Leader index is 0, then the system waits until the commit index is at least 1.
func (*Store) DBAppliedIndex ¶ added in v8.16.5
DBAppliedIndex returns the index of the last Raft log that changed the underlying database. If the index is unknown then 0 is returned.
func (*Store) Database ¶
Database returns a copy of the underlying database. The caller MUST ensure that no transaction is taking place during this call, or an error may be returned. If leader is true, this operation is performed with a read consistency level equivalent to "weak". Otherwise, no guarantees are made about the read consistency level.
http://sqlite.org/howtocorrupt.html states it is safe to do this as long as the database is not written to during the call.
func (*Store) DeregisterObserver ¶
DeregisterObserver deregisters an observer of Raft events.
func (*Store) Execute ¶
func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)
Execute executes queries that return no rows, but do modify the database.
func (*Store) HasLeader ¶ added in v8.13.5
HasLeader returns true if the cluster has a leader, false otherwise.
func (*Store) IsVoter ¶
IsVoter returns true if the current node is a voter in the cluster. If there is no reference to the current node in the current cluster configuration then false will also be returned.
func (*Store) Join ¶
func (s *Store) Join(jr *proto.JoinRequest) error
Join joins a node, identified by id and located at addr, to this store. The node must be ready to respond to Raft communications at that address.
func (*Store) LastVacuumTime ¶ added in v8.17.0
LastVacuumTime returns the time of the last automatic VACUUM.
func (*Store) LeaderAddr ¶
LeaderAddr returns the address of the current leader. Returns a blank string if there is no leader or if the Store is not open.
func (*Store) LeaderCommitIndex ¶ added in v8.20.2
LeaderCommitIndex returns the Raft leader commit index, as indicated by the latest AppendEntries RPC. If this node is the Leader then the commit index is returned directly from the Raft object.
func (*Store) LeaderID ¶
LeaderID returns the node ID of the Raft leader. Returns a blank string if there is no leader, or an error.
func (*Store) LeaderWithID ¶
LeaderWithID is used to return the current leader address and ID of the cluster. It may return empty strings if there is no current leader or the leader is unknown.
func (*Store) Load ¶
func (s *Store) Load(lr *proto.LoadRequest) error
Load loads an entire SQLite file into the database, sending the request through the Raft log.
func (*Store) Noop ¶
func (s *Store) Noop(id string) (raft.ApplyFuture, error)
Noop writes a noop command to the Raft log. A noop command simply consumes a slot in the Raft log, but has no other effect on the system.
func (*Store) Notify ¶
func (s *Store) Notify(nr *proto.NotifyRequest) error
Notify notifies this Store that a node is ready for bootstrapping at the given address. Once the number of known nodes reaches the expected level bootstrapping will be attempted using this Store. "Expected level" includes this node, so this node must self-notify to ensure the cluster bootstraps with the *advertised Raft address* which the Store doesn't know about.
Notifying is idempotent. A node may repeatedly notify the Store without issue.
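The bookkeeping behind this behavior can be sketched as a map keyed by node ID, which makes repeated notifications from the same node harmless. The notifier type and its methods below are hypothetical stand-ins, not part of the store package, and the real bootstrap step is reduced to a boolean.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for the Store's notify bookkeeping.
type notifier struct {
	mu           sync.Mutex
	expect       int               // expected number of nodes, including this one
	nodes        map[string]string // node ID -> advertised Raft address
	bootstrapped bool
}

func newNotifier(expect int) *notifier {
	return &notifier{expect: expect, nodes: make(map[string]string)}
}

// notify records a node and reports whether this call is the one that
// brought the known-node count to the expected level.
func (n *notifier) notify(id, addr string) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.bootstrapped {
		return false
	}
	n.nodes[id] = addr // keyed by ID, so repeated notifications do not add entries
	if len(n.nodes) < n.expect {
		return false
	}
	n.bootstrapped = true // a real store would attempt the Raft bootstrap here
	return true
}

func main() {
	n := newNotifier(3)
	fmt.Println(n.notify("1", "host1:4002")) // this node notifying itself
	fmt.Println(n.notify("1", "host1:4002")) // repeat, no effect on the count
	fmt.Println(n.notify("2", "host2:4002"))
	fmt.Println(n.notify("3", "host3:4002"))
}
```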
func (*Store) RORWCount ¶ added in v8.20.0
func (s *Store) RORWCount(eqr *proto.ExecuteQueryRequest) (nRW, nRO int)
RORWCount returns the number of read-write (nRW) and read-only (nRO) statements in the given ExecuteQueryRequest.
func (*Store) ReadFrom ¶
ReadFrom reads data from r, and loads it into the database, bypassing Raft consensus. Once the data is loaded, a snapshot is triggered, which then results in a system as if the data had been loaded through Raft consensus.
func (*Store) Ready ¶
Ready returns true if the store is ready to serve requests. Ready is defined as having no open channels registered via RegisterReadyChannel and having a Leader.
func (*Store) RegisterLeaderChange ¶
func (s *Store) RegisterLeaderChange(c chan<- struct{})
RegisterLeaderChange registers the given channel which will receive a signal when this node detects that the Leader changes.
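One common way to deliver such a signal without letting a slow receiver stall the sender is a non-blocking send to each registered channel. The sketch below shows that pattern only; whether the Store sends in a blocking or non-blocking way is not stated here, and the leaderWatch type is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// leaderWatch is a hypothetical holder of leader-change channels.
type leaderWatch struct {
	mu        sync.Mutex
	observers []chan<- struct{}
}

// register adds a channel that should be signaled on a leader change.
func (w *leaderWatch) register(c chan<- struct{}) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.observers = append(w.observers, c)
}

// signal attempts a non-blocking send on every registered channel and
// returns how many sends were delivered. Assumption: dropping a signal
// when a channel is full is acceptable to the receiver.
func (w *leaderWatch) signal() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	delivered := 0
	for _, c := range w.observers {
		select {
		case c <- struct{}{}:
			delivered++
		default:
			// Receiver is not ready and the buffer is full; skip it.
		}
	}
	return delivered
}

func main() {
	w := &leaderWatch{}
	c := make(chan struct{}, 1) // buffered so one pending signal is retained
	w.register(c)
	fmt.Println(w.signal())
	<-c
	fmt.Println("leader change observed")
}
```

With this pattern a buffered channel of size one is a reasonable choice for the receiver, since a pending signal already tells it that the leader changed.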
func (*Store) RegisterObserver ¶
RegisterObserver registers an observer of Raft events.
func (*Store) RegisterReadyChannel ¶
func (s *Store) RegisterReadyChannel(ch <-chan struct{})
RegisterReadyChannel registers a channel that must be closed before the store is considered "ready" to serve requests.
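The readiness rule, which requires every registered channel to be closed and a Leader to be present, can be sketched as below. The readiness type is a hypothetical stand-in; the sketch assumes registered channels are only ever closed, never sent on.

```go
package main

import (
	"fmt"
	"sync"
)

// readiness is a hypothetical stand-in for the Store's readiness tracking.
type readiness struct {
	mu        sync.Mutex
	chans     []<-chan struct{}
	hasLeader bool
}

// registerReadyChannel adds a channel that must be closed before ready
// can return true.
func (r *readiness) registerReadyChannel(ch <-chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.chans = append(r.chans, ch)
}

// setLeader records whether a Leader is currently known.
func (r *readiness) setLeader(ok bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.hasLeader = ok
}

// ready returns true only if every registered channel has been closed
// and a Leader is known. A receive on a closed channel never blocks, so
// the default case is taken only for channels that are still open.
func (r *readiness) ready() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, ch := range r.chans {
		select {
		case <-ch:
		default:
			return false
		}
	}
	return r.hasLeader
}

func main() {
	r := &readiness{}
	restoreDone := make(chan struct{})
	r.registerReadyChannel(restoreDone)
	r.setLeader(true)
	fmt.Println(r.ready()) // channel still open
	close(restoreDone)
	fmt.Println(r.ready()) // channel closed and a Leader is known
}
```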
func (*Store) Remove ¶
func (s *Store) Remove(rn *proto.RemoveNodeRequest) error
Remove removes a node from the store.
func (*Store) Request ¶
func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error)
Request processes a request that may contain both Executes and Queries.
func (*Store) SetRequestCompression ¶
SetRequestCompression allows low-level control over the compression threshold for the request marshaler.
func (*Store) SetRestorePath ¶
SetRestorePath sets the path to a file containing a copy of a SQLite database. This database will be loaded if and when the node becomes the Leader for the first time only. The Store will also delete the file when it's finished with it.
This function should only be called before the Store is opened and setting the restore path means the Store will not report itself as ready until a restore has been attempted.
func (*Store) Snapshot ¶
Snapshot performs a snapshot, leaving n trailing logs behind. If n is greater than zero, that many logs are left in the log after snapshotting. If n is zero, then the number set at Store creation is used. Finally, once this function returns, the trailing log configuration value is reset to the value set at Store creation.
func (*Store) State ¶
func (s *Store) State() ClusterState
State returns the current node's Raft state.
func (*Store) Stepdown ¶
Stepdown forces this node to relinquish leadership to another node in the cluster. If this node is not the leader, and 'wait' is true, an error will be returned.
func (*Store) Vacuum ¶ added in v8.17.0
Vacuum performs a VACUUM operation on the underlying database. It does this by performing a VACUUM INTO a temporary file, and then swapping the temporary file with the existing database file. The database is then re-opened.
func (*Store) WaitForAllApplied ¶ added in v8.16.5
WaitForAllApplied waits for all Raft log entries to be applied to the underlying database.
func (*Store) WaitForAppliedFSM ¶
WaitForAppliedFSM waits until the currently applied logs (at the time this function is called) are actually reflected by the FSM, or the timeout expires.
func (*Store) WaitForAppliedIndex ¶
WaitForAppliedIndex blocks until a given log index has been applied, or the timeout expires.
func (*Store) WaitForCommitIndex ¶ added in v8.21.0
WaitForCommitIndex blocks until the local Raft commit index is equal to or greater than the given index, or the timeout expires.
func (*Store) WaitForFSMIndex ¶
WaitForFSMIndex blocks until a given log index has been applied to our state machine or the timeout expires.
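The WaitFor functions in this group share a shape: check an index repeatedly until it reaches a target or a deadline passes. A minimal polling sketch of that shape follows. The waitForIndex helper, the errTimeout value, and the 5ms poll interval are all assumptions for illustration, not the Store's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errTimeout is a hypothetical error for this sketch.
var errTimeout = errors.New("timeout waiting for index")

// waitForIndex polls current until it returns a value greater than or
// equal to idx, or until timeout expires. It returns the observed index.
func waitForIndex(current func() uint64, idx uint64, timeout time.Duration) (uint64, error) {
	ticker := time.NewTicker(5 * time.Millisecond) // assumed poll interval
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		if got := current(); got >= idx {
			return got, nil
		}
		select {
		case <-ticker.C:
			// Poll again on the next loop iteration.
		case <-timer.C:
			return 0, errTimeout
		}
	}
}

func main() {
	var applied atomic.Uint64
	go func() {
		// Simulate log entries being applied in the background.
		for i := 0; i < 10; i++ {
			time.Sleep(2 * time.Millisecond)
			applied.Add(1)
		}
	}()
	got, err := waitForIndex(applied.Load, 5, time.Second)
	fmt.Println(got >= 5, err)
}
```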
func (*Store) WaitForLeader ¶
WaitForLeader blocks until a leader is detected, or the timeout expires.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is the network service provided to Raft, and wraps a Listener.
func NewTransport ¶
NewTransport returns an initialized Transport.