store

package
v8.24.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2024 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Package store provides a distributed SQLite instance.

Distributed consensus is provided via the Raft algorithm.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotOpen is returned when a Store is not open.
	ErrNotOpen = errors.New("store not open")

	// ErrOpen is returned when a Store is already open.
	ErrOpen = errors.New("store already open")

	// ErrNotReady is returned when a Store is not ready to accept requests.
	ErrNotReady = errors.New("store not ready")

	// ErrNotLeader is returned when a node attempts to execute a leader-only
	// operation.
	ErrNotLeader = errors.New("not leader")

	// ErrNotSingleNode is returned when a node attempts to execute a single-node
	// only operation.
	ErrNotSingleNode = errors.New("not single-node")

	// ErrStaleRead is returned if the executing the query would violate the
	// requested freshness.
	ErrStaleRead = errors.New("stale read")

	// ErrOpenTimeout is returned when the Store does not apply its initial
	// logs within the specified time.
	ErrOpenTimeout = errors.New("timeout waiting for initial logs application")

	// ErrWaitForRemovalTimeout is returned when the Store does not confirm removal
	// of a node within the specified time.
	ErrWaitForRemovalTimeout = errors.New("timeout waiting for node removal confirmation")

	// ErrWaitForLeaderTimeout is returned when the Store cannot determine the leader
	// within the specified time.
	ErrWaitForLeaderTimeout = errors.New("timeout waiting for leader")

	// ErrInvalidBackupFormat is returned when the requested backup format
	// is not valid.
	ErrInvalidBackupFormat = errors.New("invalid backup format")

	// ErrInvalidVacuumFormat is returned when the requested backup format is not
	// compatible with vacuum.
	ErrInvalidVacuum = errors.New("invalid vacuum")

	// ErrLoadInProgress is returned when a load is already in progress and the
	// requested operation cannot be performed.
	ErrLoadInProgress = errors.New("load in progress")
)

Functions

func HasData added in v8.15.0

func HasData(dir string) (bool, error)

HasData returns true if the given dir indicates that at least one FSM entry has been committed to the log. This is true if there are any snapshots, or if there are any entries in the log of raft.LogCommand type. This function will block if the Bolt database is already open.

func IsNewNode

func IsNewNode(raftDir string) bool

IsNewNode returns whether a node using raftDir would be a brand-new node. It also means that the window for this node joining a different cluster has passed.

func IsStaleRead added in v8.20.0

func IsStaleRead(
	leaderlastContact time.Time,
	lastFSMUpdateTime time.Time,
	lastAppendedAtTime time.Time,
	fsmIndex uint64,
	commitIndex uint64,
	freshness int64,
	strict bool,
) bool

IsStaleRead returns whether a read is stale.

func RecoverNode

func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable *rlog.Log,
	snaps raft.SnapshotStore, tn raft.Transport, conf raft.Configuration) error

RecoverNode is used to manually force a new configuration, in the event that quorum cannot be restored. This borrows heavily from RecoverCluster functionality of the Hashicorp Raft library, but has been customized for rqlite use.

func ResetStats

func ResetStats()

ResetStats resets the expvar stats for this module. Mostly for test purposes.

Types

type ClusterState

type ClusterState int

ClusterState defines the possible Raft states the current node can be in

const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)

Represents the Raft cluster states

type CommandProcessor added in v8.14.0

type CommandProcessor struct {
	// contains filtered or unexported fields
}

CommandProcessor processes commands by applying them to the underlying database.

func NewCommandProcessor added in v8.14.0

func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *CommandProcessor

NewCommandProcessor returns a new instance of CommandProcessor.

func (*CommandProcessor) Process added in v8.14.0

func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB) (*proto.Command, bool, interface{})

Process processes the given command against the given database.

type Config

type Config struct {
	DBConf *DBConfig   // The DBConfig object for this Store.
	Dir    string      // The working directory for raft.
	Tn     Transport   // The underlying Transport for raft.
	ID     string      // Node ID.
	Logger *log.Logger // The logger to use to log stuff.
}

Config represents the configuration of the underlying Store.

type DBConfig

type DBConfig struct {
	// SQLite on-disk path
	OnDiskPath string `json:"on_disk_path,omitempty"`

	// Enforce Foreign Key constraints
	FKConstraints bool `json:"fk_constraints"`
}

DBConfig represents the configuration of the underlying SQLite database.

func NewDBConfig

func NewDBConfig() *DBConfig

NewDBConfig returns a new DB config instance.

type ExecuteQueryResponses added in v8.16.5

type ExecuteQueryResponses []*proto.ExecuteQueryResponse

ExecuteQueryResponses is a slice of ExecuteQueryResponse, which detects mutations.

func (ExecuteQueryResponses) Mutation added in v8.16.5

func (e ExecuteQueryResponses) Mutation() bool

Mutation returns true if any of the responses mutated the database.

type FSM

type FSM struct {
	// contains filtered or unexported fields
}

FSM is a wrapper around the Store which implements raft.FSM.

func NewFSM

func NewFSM(s *Store) *FSM

NewFSM returns a new FSM.

func (*FSM) Apply

func (f *FSM) Apply(l *raft.Log) interface{}

Apply applies a Raft log entry to the Store.

func (*FSM) Restore

func (f *FSM) Restore(rc io.ReadCloser) error

Restore restores the Store from a snapshot.

func (*FSM) Snapshot

func (f *FSM) Snapshot() (raft.FSMSnapshot, error)

Snapshot returns a Snapshot of the Store

type FSMSnapshot

type FSMSnapshot struct {
	raft.FSMSnapshot
	// contains filtered or unexported fields
}

FSMSnapshot is a wrapper around raft.FSMSnapshot which adds instrumentation and logging.

func (*FSMSnapshot) Persist

func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) (retError error)

Persist writes the snapshot to the given sink.

func (*FSMSnapshot) Release

func (f *FSMSnapshot) Release()

Release is a no-op.

type Layer added in v8.13.3

type Layer interface {
	net.Listener
	Dial(address string, timeout time.Duration) (net.Conn, error)
}

Layer is the interface expected by the Store for network communication between nodes, which is used for Raft distributed consensus.

type NodeTransport

type NodeTransport struct {
	*raft.NetworkTransport
	// contains filtered or unexported fields
}

NodeTransport is a wrapper around the Raft NetworkTransport, which allows custom configuration of the InstallSnapshot method.

func NewNodeTransport

func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport

NewNodeTransport returns an initialized NodeTransport.

func (*NodeTransport) Close

func (n *NodeTransport) Close() error

Close closes the transport

func (*NodeTransport) CommandCommitIndex added in v8.20.1

func (n *NodeTransport) CommandCommitIndex() uint64

CommandCommitIndex returns the index of the latest committed log entry which is applied to the FSM.

func (*NodeTransport) Consumer

func (n *NodeTransport) Consumer() <-chan raft.RPC

Consumer returns a channel of RPC requests to be consumed.

func (*NodeTransport) InstallSnapshot

InstallSnapshot is used to push a snapshot down to a follower. The data is read from the ReadCloser and streamed to the client.

func (*NodeTransport) LeaderCommitIndex added in v8.20.2

func (n *NodeTransport) LeaderCommitIndex() uint64

LeaderCommitIndex returns the index of the latest committed log entry which is known to be replicated to the majority of the cluster.

func (*NodeTransport) Stats added in v8.20.2

func (n *NodeTransport) Stats() map[string]interface{}

Stats returns the current stats of the transport.

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

Provider implements the uploader Provider interface, allowing the Store to be used as a DataProvider for an uploader.

func NewProvider

func NewProvider(s *Store, v, c bool) *Provider

NewProvider returns a new instance of Provider. If v is true, the SQLite database will be VACUUMed before being provided. If c is true, the SQLite database will be compressed before being provided.

func (*Provider) LastIndex added in v8.16.5

func (p *Provider) LastIndex() (uint64, error)

LastIndex returns the cluster-wide index the data managed by the DataProvider was last modified by.

func (*Provider) Provide

func (p *Provider) Provide(w io.Writer) (retErr error)

Provider writes the SQLite database to the given path. If path exists, it will be overwritten.

type Server

type Server struct {
	ID       string `json:"id,omitempty"`
	Addr     string `json:"addr,omitempty"`
	Suffrage string `json:"suffrage,omitempty"`
}

Server represents another node in the cluster.

func NewServer

func NewServer(id, addr string, voter bool) *Server

NewServer returns an initialized Server.

type Servers

type Servers []*Server

Servers is a set of Servers.

func (Servers) Contains

func (s Servers) Contains(id string) bool

Contains returns whether the given node, as specified by its Raft ID, is a member of the set of servers.

func (Servers) IsReadOnly

func (s Servers) IsReadOnly(id string) (readOnly bool, found bool)

IsReadOnly returns whether the given node, as specified by its Raft ID, is a read-only (non-voting) node. If no node is found with the given ID then found will be false.

func (Servers) Len

func (s Servers) Len() int

func (Servers) Less

func (s Servers) Less(i, j int) bool

func (Servers) Swap

func (s Servers) Swap(i, j int)

type SnapshotStore

type SnapshotStore interface {
	raft.SnapshotStore

	// FullNeeded returns true if a full snapshot is needed.
	FullNeeded() (bool, error)

	// SetFullNeeded explicitly sets that a full snapshot is needed.
	SetFullNeeded() error

	// Stats returns stats about the Snapshot Store.
	Stats() (map[string]interface{}, error)
}

SnapshotStore is the interface Snapshot stores must implement.

type Store

type Store struct {
	BootstrapExpect int

	ShutdownOnRemove         bool
	SnapshotThreshold        uint64
	SnapshotThresholdWALSize uint64
	SnapshotInterval         time.Duration
	LeaderLeaseTimeout       time.Duration
	HeartbeatTimeout         time.Duration
	ElectionTimeout          time.Duration
	ApplyTimeout             time.Duration
	RaftLogLevel             string
	NoFreeListSync           bool
	AutoVacInterval          time.Duration

	// Node-reaping configuration
	ReapTimeout         time.Duration
	ReapReadOnlyTimeout time.Duration
	// contains filtered or unexported fields
}

Store is a SQLite database, where all changes are made via Raft consensus.

func New

func New(ly Layer, c *Config) *Store

New returns a new Store.

func (*Store) Addr

func (s *Store) Addr() string

Addr returns the address of the store.

func (*Store) Backup

func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error)

Backup writes a consistent snapshot of the underlying database to dst. This can be called while writes are being made to the system. The backup may fail if the system is actively snapshotting. The client can just retry in this case.

If vacuum is not true the copy is written directly to dst, optionally in compressed form, without any intermediate temporary files.

If vacuum is true, then a VACUUM is performed on the database before the backup is made. If compression false, and dst is an os.File, then the vacuumed copy will be written directly to that file. Otherwise a temporary file will be created, and that temporary file copied to dst.

func (*Store) Bootstrap

func (s *Store) Bootstrap(servers ...*Server) error

Bootstrap executes a cluster bootstrap on this node, using the given Servers as the configuration.

func (*Store) Close

func (s *Store) Close(wait bool) (retErr error)

Close closes the store. If wait is true, waits for a graceful shutdown.

func (*Store) CommitIndex added in v8.20.2

func (s *Store) CommitIndex() (uint64, error)

CommitIndex returns the Raft commit index.

func (*Store) Committed added in v8.21.0

func (s *Store) Committed(timeout time.Duration) (uint64, error)

Committed blocks until the local commit index is greater than or equal to the Leader index, as checked when the function is called. It returns the committed index. If the Leader index is 0, then the system waits until the commit index is at least 1.

func (*Store) DBAppliedIndex added in v8.16.5

func (s *Store) DBAppliedIndex() uint64

DBAppliedIndex returns the index of the last Raft log that changed the underlying database. If the index is unknown then 0 is returned.

func (*Store) Database

func (s *Store) Database(leader bool) ([]byte, error)

Database returns a copy of the underlying database. The caller MUST ensure that no transaction is taking place during this call, or an error may be returned. If leader is true, this operation is performed with a read consistency level equivalent to "weak". Otherwise, no guarantees are made about the read consistency level.

http://sqlite.org/howtocorrupt.html states it is safe to do this as long as the database is not written to during the call.

func (*Store) DeregisterObserver

func (s *Store) DeregisterObserver(o *raft.Observer)

DeregisterObserver deregisters an observer of Raft events

func (*Store) Execute

func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)

Execute executes queries that return no rows, but do modify the database.

func (*Store) HasLeader added in v8.13.5

func (s *Store) HasLeader() bool

HasLeader returns true if the cluster has a leader, false otherwise.

func (*Store) ID

func (s *Store) ID() string

ID returns the Raft ID of the store.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

IsLeader is used to determine if the current node is cluster leader

func (*Store) IsVoter

func (s *Store) IsVoter() (bool, error)

IsVoter returns true if the current node is a voter in the cluster. If there is no reference to the current node in the current cluster configuration then false will also be returned.

func (*Store) Join

func (s *Store) Join(jr *proto.JoinRequest) error

Join joins a node, identified by id and located at addr, to this store. The node must be ready to respond to Raft communications at that address.

func (*Store) LastVacuumTime added in v8.17.0

func (s *Store) LastVacuumTime() (time.Time, error)

LastVacuumTime returns the time of the last automatic VACUUM.

func (*Store) LeaderAddr

func (s *Store) LeaderAddr() (string, error)

LeaderAddr returns the address of the current leader. Returns a blank string if there is no leader or if the Store is not open.

func (*Store) LeaderCommitIndex added in v8.20.2

func (s *Store) LeaderCommitIndex() (uint64, error)

LeaderCommitIndex returns the Raft leader commit index, as indicated by the latest AppendEntries RPC. If this node is the Leader then the commit index is returned directly from the Raft object.

func (*Store) LeaderID

func (s *Store) LeaderID() (string, error)

LeaderID returns the node ID of the Raft leader. Returns a blank string if there is no leader, or an error.

func (*Store) LeaderWithID

func (s *Store) LeaderWithID() (string, string)

LeaderWithID is used to return the current leader address and ID of the cluster. It may return empty strings if there is no current leader or the leader is unknown.

func (*Store) Load

func (s *Store) Load(lr *proto.LoadRequest) error

Loads an entire SQLite file into the database, sending the request through the Raft log.

func (*Store) Nodes

func (s *Store) Nodes() ([]*Server, error)

Nodes returns the slice of nodes in the cluster, sorted by ID ascending.

func (*Store) Noop

func (s *Store) Noop(id string) (raft.ApplyFuture, error)

Noop writes a noop command to the Raft log. A noop command simply consumes a slot in the Raft log, but has no other effect on the system.

func (*Store) Notify

func (s *Store) Notify(nr *proto.NotifyRequest) error

Notify notifies this Store that a node is ready for bootstrapping at the given address. Once the number of known nodes reaches the expected level bootstrapping will be attempted using this Store. "Expected level" includes this node, so this node must self-notify to ensure the cluster bootstraps with the *advertised Raft address* which the Store doesn't know about.

Notifying is idempotent. A node may repeatedly notify the Store without issue.

func (*Store) Open

func (s *Store) Open() (retErr error)

Open opens the Store.

func (*Store) Path

func (s *Store) Path() string

Path returns the path to the store's storage directory.

func (*Store) Query

func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error)

Query executes queries that return rows, and do not modify the database.

func (*Store) RORWCount added in v8.20.0

func (s *Store) RORWCount(eqr *proto.ExecuteQueryRequest) (nRW, nRO int)

RORWCount returns the number of read-only and read-write statements in the given ExecuteQueryRequest.

func (*Store) ReadFrom

func (s *Store) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads data from r, and loads it into the database, bypassing Raft consensus. Once the data is loaded, a snapshot is triggered, which then results in a system as if the data had been loaded through Raft consensus.

func (*Store) Ready

func (s *Store) Ready() bool

Ready returns true if the store is ready to serve requests. Ready is defined as having no open channels registered via RegisterReadyChannel and having a Leader.

func (*Store) RegisterLeaderChange

func (s *Store) RegisterLeaderChange(c chan<- struct{})

RegisterLeaderChange registers the given channel which will receive a signal when this node detects that the Leader changes.

func (*Store) RegisterObserver

func (s *Store) RegisterObserver(o *raft.Observer)

RegisterObserver registers an observer of Raft events

func (*Store) RegisterReadyChannel

func (s *Store) RegisterReadyChannel(ch <-chan struct{})

RegisterReadyChannel registers a channel that must be closed before the store is considered "ready" to serve requests.

func (*Store) Remove

func (s *Store) Remove(rn *proto.RemoveNodeRequest) error

Remove removes a node from the store.

func (*Store) Request

Request processes a request that may contain both Executes and Queries.

func (*Store) SetRequestCompression

func (s *Store) SetRequestCompression(batch, size int)

SetRequestCompression allows low-level control over the compression threshold for the request marshaler.

func (*Store) SetRestorePath

func (s *Store) SetRestorePath(path string) error

SetRestorePath sets the path to a file containing a copy of a SQLite database. This database will be loaded if and when the node becomes the Leader for the first time only. The Store will also delete the file when it's finished with it.

This function should only be called before the Store is opened and setting the restore path means the Store will not report itself as ready until a restore has been attempted.

func (*Store) Snapshot

func (s *Store) Snapshot(n uint64) (retError error)

Snapshot performs a snapshot, leaving n trailing logs behind. If n is greater than zero, that many logs are left in the log after snapshotting. If n is zero, then the number set at Store creation is used. Finally, once this function returns, the trailing log configuration value is reset to the value set at Store creation.

func (*Store) State

func (s *Store) State() ClusterState

State returns the current node's Raft state

func (*Store) Stats

func (s *Store) Stats() (map[string]interface{}, error)

Stats returns stats for the store.

func (*Store) Stepdown

func (s *Store) Stepdown(wait bool) error

Stepdown forces this node to relinquish leadership to another node in the cluster. If this node is not the leader, and 'wait' is true, an error will be returned.

func (*Store) Vacuum added in v8.17.0

func (s *Store) Vacuum() error

Vacuum performs a VACUUM operation on the underlying database. It does this by performing a VACUUM INTO a temporary file, and then swapping the temporary file with the existing database file. The database is then re-opened.

func (*Store) WaitForAllApplied added in v8.16.5

func (s *Store) WaitForAllApplied(timeout time.Duration) error

WaitForAllApplied waits for all Raft log entries to be applied to the underlying database.

func (*Store) WaitForAppliedFSM

func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error)

WaitForAppliedFSM waits until the currently applied logs (at the time this function is called) are actually reflected by the FSM, or the timeout expires.

func (*Store) WaitForAppliedIndex

func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error

WaitForAppliedIndex blocks until a given log index has been applied, or the timeout expires.

func (*Store) WaitForCommitIndex added in v8.21.0

func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error

WaitForCommitIndex blocks until the local Raft commit index is equal to or greater the given index, or the timeout expires.

func (*Store) WaitForFSMIndex

func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error)

WaitForFSMIndex blocks until a given log index has been applied to our state machine or the timeout expires.

func (*Store) WaitForLeader

func (s *Store) WaitForLeader(timeout time.Duration) (string, error)

WaitForLeader blocks until a leader is detected, or the timeout expires.

func (*Store) WaitForRemoval

func (s *Store) WaitForRemoval(id string, timeout time.Duration) error

WaitForRemoval blocks until a node with the given ID is removed from the cluster or the timeout expires.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the network service provided to Raft, and wraps a Listener.

func NewTransport

func NewTransport(ly Layer) *Transport

NewTransport returns an initialized Transport.

func (*Transport) Accept

func (t *Transport) Accept() (net.Conn, error)

Accept waits for the next connection.

func (*Transport) Addr

func (t *Transport) Addr() net.Addr

Addr returns the binding address of the transport.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport

func (*Transport) Dial

func (t *Transport) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error)

Dial creates a new network connection.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL