store

package
v0.0.67 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2024 License: Apache-2.0, MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotOpen is returned when a Store is not open.
	ErrNotOpen = errors.New("store not open")

	// ErrOpen is returned when a Store is already open.
	ErrOpen = errors.New("store already open")

	// ErrNotReady is returned when a Store is not ready to accept requests.
	ErrNotReady = errors.New("store not ready")

	// ErrNotLeader is returned when a node attempts to execute a leader-only
	// operation.
	ErrNotLeader = errors.New("not leader")

	// ErrNotSingleNode is returned when a node attempts to execute a single-node
	// only operation.
	ErrNotSingleNode = errors.New("not single-node")

	// ErrStaleRead is returned if the executing the query would violate the
	// requested freshness.
	ErrStaleRead = errors.New("stale read")

	// ErrOpenTimeout is returned when the Store does not apply its initial
	// logs within the specified time.
	ErrOpenTimeout = errors.New("timeout waiting for initial logs application")

	// ErrWaitForRemovalTimeout is returned when the Store does not confirm removal
	// of a node within the specified time.
	ErrWaitForRemovalTimeout = errors.New("timeout waiting for node removal confirmation")

	// ErrWaitForLeaderTimeout is returned when the Store cannot determine the leader
	// within the specified time.
	ErrWaitForLeaderTimeout = errors.New("timeout waiting for leader")

	// ErrInvalidBackupFormat is returned when the requested backup format
	// is not valid.
	ErrInvalidBackupFormat = errors.New("invalid backup format")

	// ErrInvalidVacuumFormat is returned when the requested backup format is not
	// compatible with vacuum.
	ErrInvalidVacuum = errors.New("invalid vacuum")

	// ErrLoadInProgress is returned when a load is already in progress and the
	// requested operation cannot be performed.
	ErrLoadInProgress = errors.New("load in progress")
)
View Source
var (
	// ErrCASConflict is returned when a CAS operation fails.
	ErrCASConflict = errors.New("CAS conflict")
)

Functions

func HasData

func HasData(dir string) (bool, error)

HasData returns true if the given dir indicates that at least one FSM entry has been committed to the log. This is true if there are any snapshots, or if there are any entries in the log of raft.LogCommand type. This function will block if the Bolt database is already open.

func IsNewNode

func IsNewNode(raftDir string) bool

IsNewNode returns whether a node using raftDir would be a brand-new node. It also means that the window for this node joining a different cluster has passed.

func IsStaleRead

func IsStaleRead(
	leaderlastContact time.Time,
	lastFSMUpdateTime time.Time,
	lastAppendedAtTime time.Time,
	fsmIndex uint64,
	commitIndex uint64,
	freshness int64,
	strict bool,
) bool

IsStaleRead returns whether a read is stale.

Types

type Atomic

type Atomic[T any] struct {
	atomic.Value
}

func (*Atomic[T]) Load

func (a *Atomic[T]) Load() T

func (*Atomic[T]) Store

func (a *Atomic[T]) Store(t T)

type AtomicTime

type AtomicTime struct {
	Atomic[time.Time]
}

func (*AtomicTime) Add

func (a *AtomicTime) Add(t time.Duration)

func (*AtomicTime) Sub

func (a *AtomicTime) Sub(t *AtomicTime) time.Duration

type CheckAndSet

type CheckAndSet struct {
	// contains filtered or unexported fields
}

CheckAndSet is a simple concurrency control mechanism that allows only one goroutine to execute a critical section at a time.

func (*CheckAndSet) Begin

func (c *CheckAndSet) Begin() error

Begin attempts to enter the critical section. If another goroutine is already in the critical section, Begin returns an error.

func (*CheckAndSet) End

func (c *CheckAndSet) End()

End exits the critical section.

type ClusterState

type ClusterState int

ClusterState defines the possible Raft states the current node can be in

const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)

Represents the Raft cluster states

type Database

type Database interface {
	Data(ctx context.Context, req *v1.Data) error
	Realtime(ctx context.Context, req *v1.Realtime_Request) (*v1.Realtime_Response, error)
	Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Response, error)
	Timeseries(ctx context.Context, req *v1.Timeseries_Request) (*v1.Timeseries_Response, error)
	Breakdown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Response, error)
	Load(ctx context.Context, req *v1.Load_Request) error
}

type FSM

type FSM struct {
	// contains filtered or unexported fields
}

func NewFSM

func NewFSM(s *Store) *FSM

NewFSM returns a new FSM.

func (*FSM) Apply

func (f *FSM) Apply(l *raft.Log) interface{}

func (*FSM) Restore

func (f *FSM) Restore(w io.ReadCloser) error

func (*FSM) Snapshot

func (f *FSM) Snapshot() (raft.FSMSnapshot, error)

type SnapshotStore

type SnapshotStore interface {
	raft.SnapshotStore

	// FullNeeded returns true if a full snapshot is needed.
	FullNeeded() (bool, error)

	// SetFullNeeded explicitly sets that a full snapshot is needed.
	SetFullNeeded() error
}

type Storage

type Storage interface {
	Database

	Committed(ctx context.Context) (uint64, error)

	CommitIndex(ctx context.Context) (uint64, error)

	// Remove removes the node from the cluster.
	Remove(ctx context.Context, rn *v1.RemoveNode_Request) error

	// LeaderAddr returns the Raft address of the leader of the cluster.
	LeaderAddr(ctx context.Context) (string, error)

	// Ready returns whether the Store is ready to service requests.
	Ready(ctx context.Context) bool

	// Nodes returns the slice of store.Servers in the cluster
	Nodes(ctx context.Context) (*v1.Server_List, error)

	// Backup writes backup of the node state to dst
	Backup(ctx context.Context, br *v1.Backup_Request, dst io.Writer) error

	// ReadFrom reads and loads a SQLite database into the node, initially bypassing
	// the Raft system. It then triggers a Raft snapshot, which will then make
	// Raft aware of the new data.
	ReadFrom(ctx context.Context, r io.Reader) (int64, error)

	Join(ctx context.Context, jr *v1.Join_Request) error
	Notify(ctx context.Context, nr *v1.Notify_Request) error

	Status() (*v1.Status_Store, error)
}

Storage is the interface the Raft-based database must implement.

type Store

type Store struct {
	BootstrapExpect int

	ShutdownOnRemove         bool
	SnapshotThreshold        uint64
	SnapshotThresholdWALSize uint64
	SnapshotInterval         time.Duration
	LeaderLeaseTimeout       time.Duration
	HeartbeatTimeout         time.Duration
	ElectionTimeout          time.Duration
	ApplyTimeout             time.Duration
	RaftLogLevel             string
	NoFreeListSync           bool
	AutoVacInterval          time.Duration
	// contains filtered or unexported fields
}

func NewStore

func NewStore(base *v1.Config, transit raft.Transport, mgr *connections.Manager, tenants *tenant.Tenants) (*Store, error)

func (*Store) Addr

func (s *Store) Addr() string

Addr returns the address of the store.

func (*Store) Aggregate

func (s *Store) Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Response, error)

func (*Store) Backup

func (s *Store) Backup(ctx context.Context, br *v1.Backup_Request, dst io.Writer) error

func (*Store) Bootstrap

func (s *Store) Bootstrap(ctx context.Context, ls *v1.Server_List) error

Bootstrap executes a cluster bootstrap on this node, using the given Servers as the configuration.

func (*Store) Breakdown

func (s *Store) Breakdown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Response, error)

func (*Store) Close

func (s *Store) Close() (retErr error)

Close closes the store. If wait is true, waits for a graceful shutdown.

func (*Store) CommitIndex

func (s *Store) CommitIndex(ctx context.Context) (uint64, error)

CommitIndex returns the Raft commit index.

func (*Store) Committed

func (s *Store) Committed(ctx context.Context) (uint64, error)

Committed blocks until the local commit index is greater than or equal to the Leader index, as checked when the function is called. It returns the committed index. If the Leader index is 0, then the system waits until the commit index is at least 1.

func (*Store) Data

func (s *Store) Data(ctx context.Context, req *v1.Data) error

func (*Store) HasLeader

func (s *Store) HasLeader() bool

HasLeader returns true if the cluster has a leader, false otherwise.

func (*Store) ID

func (s *Store) ID() string

ID returns the Raft ID of the store.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

IsLeader is used to determine if the current node is cluster leader

func (*Store) IsVoter

func (s *Store) IsVoter() (bool, error)

IsVoter returns true if the current node is a voter in the cluster. If there is no reference to the current node in the current cluster configuration then false will also be returned.

func (*Store) Join

func (s *Store) Join(ctx context.Context, jr *v1.Join_Request) error

Join joins a node, identified by id and located at addr, to this store. The node must be ready to respond to Raft communications at that address.

func (*Store) LeaderAddr

func (s *Store) LeaderAddr(_ context.Context) (string, error)

LeaderAddr returns the address of the current leader. Returns a blank string if there is no leader or if the Store is not open.

func (*Store) LeaderCommitIndex

func (s *Store) LeaderCommitIndex() (uint64, error)

LeaderCommitIndex returns the Raft leader commit index, as indicated by the latest AppendEntries RPC. If this node is the Leader then the commit index is returned directly from the Raft object.

func (*Store) Load

func (s *Store) Load(ctx context.Context, req *v1.Load_Request) error

func (*Store) Nodes

func (s *Store) Nodes(ctx context.Context) (*v1.Server_List, error)

Nodes returns the slice of nodes in the cluster, sorted by ID ascending.

func (*Store) Notify

func (s *Store) Notify(ctx context.Context, nr *v1.Notify_Request) error

Notify notifies this Store that a node is ready for bootstrapping at the given address. Once the number of known nodes reaches the expected level bootstrapping will be attempted using this Store. "Expected level" includes this node, so this node must self-notify to ensure the cluster bootstraps with the *advertised Raft address* which the Store doesn't know about.

Notifying is idempotent. A node may repeatedly notify the Store without issue.

func (*Store) Open

func (s *Store) Open(ctx context.Context) error

func (*Store) Path

func (s *Store) Path() string

Path returns the path to the store's storage directory.

func (*Store) ReadFrom

func (s *Store) ReadFrom(ctx context.Context, r io.Reader) (int64, error)

ReadFrom reads data from r, and loads it into the database, bypassing Raft consensus. Once the data is loaded, a snapshot is triggered, which then results in a system as if the data had been loaded through Raft consensus.

func (*Store) Ready

func (s *Store) Ready(ctx context.Context) bool

Ready returns true if the store is ready to serve requests. Ready is defined as having no open channels registered via RegisterReadyChannel and having a Leader.

func (*Store) Realtime

func (s *Store) Realtime(ctx context.Context, req *v1.Realtime_Request) (*v1.Realtime_Response, error)

func (*Store) Remove

func (s *Store) Remove(ctx context.Context, rn *v1.RemoveNode_Request) error

Remove removes a node from the store.

func (*Store) Snapshot

func (s *Store) Snapshot(n uint64) (retError error)

Snapshot performs a snapshot, leaving n trailing logs behind. If n is greater than zero, that many logs are left in the log after snapshotting. If n is zero, then the number set at Store creation is used. Finally, once this function returns, the trailing log configuration value is reset to the value set at Store creation.

func (*Store) State

func (s *Store) State() ClusterState

State returns the current node's Raft state

func (*Store) Status

func (s *Store) Status() (*v1.Status_Store, error)

func (*Store) Timeseries

func (s *Store) Timeseries(ctx context.Context, req *v1.Timeseries_Request) (*v1.Timeseries_Response, error)

func (*Store) WaitForAllApplied

func (s *Store) WaitForAllApplied(timeout time.Duration) error

WaitForApplied waits for all Raft log entries to be applied to the underlying database.

func (*Store) WaitForAppliedFSM

func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error)

WaitForAppliedFSM waits until the currently applied logs (at the time this function is called) are actually reflected by the FSM, or the timeout expires.

func (*Store) WaitForAppliedIndex

func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error

WaitForAppliedIndex blocks until a given log index has been applied, or the timeout expires.

func (*Store) WaitForCommitIndex

func (s *Store) WaitForCommitIndex(ctx context.Context, idx uint64) error

WaitForCommitIndex blocks until the local Raft commit index is equal to or greater the given index, or the timeout expires.

func (*Store) WaitForFSMIndex

func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error)

WaitForFSMIndex blocks until a given log index has been applied to our state machine or the timeout expires.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL