litefs

package module
v0.5.11 Latest
Warning

This package is not in the latest version of its module.
Published: Jan 24, 2024 License: Apache-2.0 Imports: 29 Imported by: 1

README

LiteFS

LiteFS is a FUSE-based file system for replicating SQLite databases across a cluster of machines. It works as a passthrough file system that intercepts writes to SQLite databases in order to detect transaction boundaries and record changes on a per-transaction level in LTX files.

This project is actively maintained but is currently in a beta state. Please report any bugs as an issue on the GitHub repository.

You can find a Getting Started guide on LiteFS' documentation site. Please see the ARCHITECTURE.md design document for details about how LiteFS works.

SQLite TCL Test Suite

It's a goal of LiteFS to pass the SQLite TCL test suite; however, this is currently a work in progress. LiteFS doesn't have database deletion implemented yet, which causes many tests to fail during teardown.

To run a test from the suite against LiteFS, you can use the Dockerfile.test to run it in isolation. First build the Dockerfile:

docker build -t litefs-test -f Dockerfile.test .

Then run it with the filename of the test you want to run. In this case, we are running select1.test:

docker run --device /dev/fuse --cap-add SYS_ADMIN -it litefs-test select1.test

Contributing

LiteFS contributions work a little differently than most GitHub projects. If you have a small bug fix or typo fix, please PR directly to this repository.

If you would like to contribute a feature, please follow these steps:

  1. Discuss the feature in an issue on this GitHub repository.
  2. Create a pull request to your fork of the repository.
  3. Post a link to your pull request in the issue for consideration.

This project has a roadmap, and features are added and tested in a certain order. Additionally, it's likely that code style, implementation details, and test coverage will need to be tweaked, so it's easier for me to grab your implementation as a starting point when implementing a feature.

Documentation

Index

Constants

const (
	StreamFrameTypeLTX       = StreamFrameType(1)
	StreamFrameTypeReady     = StreamFrameType(2)
	StreamFrameTypeEnd       = StreamFrameType(3)
	StreamFrameTypeDropDB    = StreamFrameType(4)
	StreamFrameTypeHandoff   = StreamFrameType(5)
	StreamFrameTypeHWM       = StreamFrameType(6)
	StreamFrameTypeHeartbeat = StreamFrameType(7)
)
const (
	// ClusterIDLen is the length of a cluster ID.
	ClusterIDLen = 20

	// ClusterIDPrefix is the prefix for every cluster ID.
	ClusterIDPrefix = "LFSC"
)
const (
	WALHeaderSize      = 32
	WALFrameHeaderSize = 24
	WALIndexHeaderSize = 136
	WALIndexBlockSize  = 32768
)

SQLite constants

const (
	PENDING_BYTE  = 0x40000000
	RESERVED_BYTE = (PENDING_BYTE + 1)
	SHARED_FIRST  = (PENDING_BYTE + 2)
	SHARED_SIZE   = 510
)

SQLite rollback journal lock constants.

const (
	WAL_WRITE_LOCK   = 120
	WAL_CKPT_LOCK    = 121
	WAL_RECOVER_LOCK = 122
	WAL_READ_LOCK0   = 123
	WAL_READ_LOCK1   = 124
	WAL_READ_LOCK2   = 125
	WAL_READ_LOCK3   = 126
	WAL_READ_LOCK4   = 127
)

SQLite WAL lock constants.

const (
	JournalModeDelete   = "DELETE"
	JournalModeTruncate = "TRUNCATE"
	JournalModePersist  = "PERSIST"
	JournalModeWAL      = "WAL"
)
const (
	FileTypeNone = FileType(iota)
	FileTypeDatabase
	FileTypeJournal
	FileTypeWAL
	FileTypeSHM
	FileTypePos
	FileTypeLock
)

Database file types.

const (
	SQLITE_DATABASE_HEADER_STRING = "SQLite format 3\x00"

	// Location of the database size, in pages, in the main database file.
	SQLITE_DATABASE_SIZE_OFFSET = 28

	/// Magic header string that identifies a SQLite journal header.
	/// https://www.sqlite.org/fileformat.html#the_rollback_journal
	SQLITE_JOURNAL_HEADER_STRING = "\xd9\xd5\x05\xf9\x20\xa1\x63\xd7"

	// Size of the journal header, in bytes.
	SQLITE_JOURNAL_HEADER_SIZE = 28
)
const (
	// Database file locks
	LockTypeHalt     = LockType(72)         // LiteFS-specific lock byte
	LockTypePending  = LockType(0x40000000) // 1073741824
	LockTypeReserved = LockType(0x40000001) // 1073741825
	LockTypeShared   = LockType(0x40000002) // 1073741826

	// SHM file locks
	LockTypeWrite   = LockType(120)
	LockTypeCkpt    = LockType(121)
	LockTypeRecover = LockType(122)
	LockTypeRead0   = LockType(123)
	LockTypeRead1   = LockType(124)
	LockTypeRead2   = LockType(125)
	LockTypeRead3   = LockType(126)
	LockTypeRead4   = LockType(127)
	LockTypeDMS     = LockType(128)
)
const (
	DBModeRollback = DBMode(0)
	DBModeWAL      = DBMode(1)
)

Database journal modes.

const (
	RWMutexStateUnlocked = RWMutexState(iota)
	RWMutexStateShared
	RWMutexStateExclusive
)
const (
	DefaultReconnectDelay = 1 * time.Second
	DefaultDemoteDelay    = 10 * time.Second

	DefaultRetention                = 10 * time.Minute
	DefaultRetentionMonitorInterval = 1 * time.Minute

	DefaultHaltAcquireTimeout      = 10 * time.Second
	DefaultHaltLockTTL             = 30 * time.Second
	DefaultHaltLockMonitorInterval = 5 * time.Second

	DefaultBackupDelay            = 1 * time.Second
	DefaultBackupFullSyncInterval = 10 * time.Second
)

Default store settings.

const (
	// MaxBackupLTXFileN is the number of LTX files that can be compacted
	// together at a time when sending data to the backup service.
	MaxBackupLTXFileN = 256

	MetricsMonitorInterval = 1 * time.Second
)
const (
	EventTypeInit          = "init"
	EventTypeTx            = "tx"
	EventTypePrimaryChange = "primaryChange"
)
const ChecksumBlockSize = 256

ChecksumBlockSize is the number of pages that are grouped into a single checksum block.

const EventChannelBufferSize = 1024
const RWMutexInterval = 10 * time.Microsecond

RWMutexInterval is the time between reattempting lock acquisition.

const TraceLogFlags = log.LstdFlags | log.Lmicroseconds | log.LUTC

TraceLogFlags are the flags to be used with TraceLog.

const WaitInterval = 100 * time.Microsecond

WaitInterval is the time between checking if the DB has reached a position in DB.Wait().

Variables

var (
	ErrDatabaseNotFound = fmt.Errorf("database not found")
	ErrDatabaseExists   = fmt.Errorf("database already exists")

	ErrNoPrimary     = errors.New("no primary")
	ErrPrimaryExists = errors.New("primary exists")
	ErrNotEligible   = errors.New("not eligible to become primary")
	ErrLeaseExpired  = errors.New("lease expired")
	ErrNoHaltPrimary = errors.New("no remote halt needed on primary node")

	ErrReadOnlyReplica  = fmt.Errorf("read only replica")
	ErrDuplicateLTXFile = fmt.Errorf("duplicate ltx file")
)

LiteFS errors

var ErrInvalidClusterID = errors.New("invalid cluster id")

ErrInvalidClusterID is returned when a cluster ID is invalid.

var ErrInvalidNodeID = errors.New("invalid node id")
var ErrStoreClosed = fmt.Errorf("store closed")
var GlobalStore atomic.Value

GlobalStore represents a single store used for metrics collection.

var LogLevel struct {
	sync.Mutex
	slog.LevelVar
}

Global log level. Can be adjusted dynamically.

var NativeEndian = binary.LittleEndian

NativeEndian is always set to little endian as that is the only endianness used by supported platforms for LiteFS. This may be expanded in the future.

var TraceLog = log.New(io.Discard, "", TraceLogFlags)

TraceLog is a log for low-level tracing.

Functions

func ContainsLockType added in v0.3.0

func ContainsLockType(a []LockType, typ LockType) bool

ContainsLockType returns true if a contains typ.

func FormatNodeID added in v0.4.0

func FormatNodeID(id uint64) string

FormatNodeID formats a node identifier as a 16-character uppercase hex string.

func GenerateClusterID added in v0.5.0

func GenerateClusterID() string

GenerateClusterID returns a new, randomly-generated cluster ID.

func JournalChecksum added in v0.3.0

func JournalChecksum(data []byte, initial uint32) uint32

JournalChecksum returns the checksum used by the journal format.
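
For readers unfamiliar with the rollback journal format, the sketch below follows the checksum that SQLite's file format documentation describes for journal page records: start from the nonce in the journal header, then add every 200th byte of the page, beginning at offset pageSize-200 and walking toward the start. It is an independent illustration that assumes data holds exactly one page; the function name journalChecksum is hypothetical, and the code is not taken from LiteFS.

```go
package main

import "fmt"

// journalChecksum is a hypothetical stand-alone version of the SQLite
// rollback journal checksum. page must contain exactly one database page.
func journalChecksum(page []byte, nonce uint32) uint32 {
	sum := nonce
	// Sample one byte every 200 bytes, starting 200 bytes from the end.
	for i := len(page) - 200; i > 0; i -= 200 {
		sum += uint32(page[i])
	}
	return sum
}

func main() {
	page := make([]byte, 512)
	page[312] = 5 // sampled: 512-200
	page[112] = 7 // sampled: 312-200
	page[0] = 99  // never sampled
	fmt.Println(journalChecksum(page, 10))
}
```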

func ParseNodeID added in v0.4.0

func ParseNodeID(s string) (uint64, error)

ParseNodeID parses a 16-character hex string into a node identifier.
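
The 16-character uppercase hex encoding described for FormatNodeID and ParseNodeID can be reproduced with the standard library. The sketch below is only an illustration of that encoding; formatID and parseID are hypothetical names, and the library's own validation rules may differ.

```go
package main

import (
	"fmt"
	"strconv"
)

// formatID renders id as a zero-padded, 16-character uppercase hex string.
func formatID(id uint64) string {
	return fmt.Sprintf("%016X", id)
}

// parseID converts a 16-character hex string back into a uint64.
func parseID(s string) (uint64, error) {
	if len(s) != 16 {
		return 0, fmt.Errorf("invalid node id length: %d", len(s))
	}
	return strconv.ParseUint(s, 16, 64)
}

func main() {
	s := formatID(255)
	id, err := parseID(s)
	fmt.Println(s, id, err)
}
```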

func TrimName

func TrimName(name string) string

TrimName removes "-journal", "-shm" or "-wal" from the given name.
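
A minimal sketch of the suffix stripping described above, written against the standard library. The helper name trimName is hypothetical, and the real function may handle edge cases differently.

```go
package main

import (
	"fmt"
	"strings"
)

// trimName strips one SQLite sidecar suffix, if present, so that the
// journal, SHM, and WAL file names all map back to the database name.
func trimName(name string) string {
	for _, suffix := range []string{"-journal", "-shm", "-wal"} {
		if strings.HasSuffix(name, suffix) {
			return strings.TrimSuffix(name, suffix)
		}
	}
	return name
}

func main() {
	for _, name := range []string{"app.db", "app.db-journal", "app.db-shm", "app.db-wal"} {
		fmt.Println(trimName(name))
	}
}
```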

func ValidateClusterID added in v0.5.0

func ValidateClusterID(id string) error

ValidateClusterID returns nil if id is a valid cluster ID.
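
The ClusterIDLen and ClusterIDPrefix constants suggest the overall shape of a cluster ID: 20 characters beginning with "LFSC". The sketch below validates only those two properties and generates an ID on the assumption that the remaining 16 characters are random uppercase hex. That character set is a guess made for this example, since this section does not specify it, and both function names are hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

const (
	clusterIDLen    = 20     // same value as ClusterIDLen
	clusterIDPrefix = "LFSC" // same value as ClusterIDPrefix
)

// generateClusterID returns the prefix plus 16 random hex characters.
// ASSUMPTION: the hex suffix is illustrative, not a documented format.
func generateClusterID() (string, error) {
	b := make([]byte, (clusterIDLen-len(clusterIDPrefix))/2)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return clusterIDPrefix + strings.ToUpper(hex.EncodeToString(b)), nil
}

// validateClusterID checks only the length and prefix.
func validateClusterID(id string) error {
	if len(id) != clusterIDLen || !strings.HasPrefix(id, clusterIDPrefix) {
		return fmt.Errorf("invalid cluster id: %q", id)
	}
	return nil
}

func main() {
	id, err := generateClusterID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id), validateClusterID(id))
}
```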

func WALChecksum added in v0.3.0

func WALChecksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)

WALChecksum computes a running SQLite WAL checksum over a byte slice.

func WriteStreamFrame

func WriteStreamFrame(w io.Writer, f StreamFrame) error

WriteStreamFrame writes the stream type & frame to the writer.

Types

type BackupClient added in v0.5.0

type BackupClient interface {
	// URL of the backup service.
	URL() string

	// PosMap returns the replication position for all databases on the backup service.
	PosMap(ctx context.Context) (map[string]ltx.Pos, error)

	// WriteTx writes an LTX file to the backup service. The file must be
	// contiguous with the latest LTX file on the backup service or else it
	// will return an ltx.PosMismatchError.
	//
	// Returns the high-water mark that indicates it is safe to remove LTX files
	// before that transaction ID.
	WriteTx(ctx context.Context, name string, r io.Reader) (hwm ltx.TXID, err error)

	// FetchSnapshot requests a full snapshot of the database as it exists on
	// the backup service. This should be used if the LiteFS node has become
	// out of sync with the backup service.
	FetchSnapshot(ctx context.Context, name string) (io.ReadCloser, error)
}

type ChangeSetSubscriber added in v0.5.5

type ChangeSetSubscriber struct {
	// contains filtered or unexported fields
}

ChangeSetSubscriber subscribes to changes to databases in the store.

It tracks a set of "dirty" databases instead of a channel of all events because clients can be slow and we don't want channels to back up. The caller is responsible for determining what changed, which usually just means comparing the client's position against the store's database position.

func (*ChangeSetSubscriber) Close added in v0.5.5

func (s *ChangeSetSubscriber) Close() error

Close removes the subscriber from the store.

func (*ChangeSetSubscriber) DirtySet added in v0.5.5

func (s *ChangeSetSubscriber) DirtySet() map[string]struct{}

DirtySet returns a set of database IDs that have changed since the last call to DirtySet(). This call clears the set.

func (*ChangeSetSubscriber) HandoffCh added in v0.5.5

func (s *ChangeSetSubscriber) HandoffCh() chan string

HandoffCh returns a channel that returns a lease ID on handoff.

func (*ChangeSetSubscriber) MarkDirty added in v0.5.5

func (s *ChangeSetSubscriber) MarkDirty(name string)

MarkDirty marks a database ID as dirty.

func (*ChangeSetSubscriber) NodeID added in v0.5.5

func (s *ChangeSetSubscriber) NodeID() uint64

NodeID returns the ID of the subscribed node.

func (*ChangeSetSubscriber) NotifyCh added in v0.5.5

func (s *ChangeSetSubscriber) NotifyCh() <-chan struct{}

NotifyCh returns a channel that receives a value when the dirty set has changed.
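
The dirty-set design described for ChangeSetSubscriber can be sketched with a mutex-protected map and a notification channel of capacity one, so that a slow reader never blocks writers and repeated changes coalesce. The dirtySet type below is a hypothetical stand-in written for this example; the real type has unexported fields and additional behavior (node ID, handoff channel) that are not modeled here.

```go
package main

import (
	"fmt"
	"sync"
)

// dirtySet is a hypothetical, simplified change tracker.
type dirtySet struct {
	mu       sync.Mutex
	dirty    map[string]struct{}
	notifyCh chan struct{}
}

func newDirtySet() *dirtySet {
	return &dirtySet{
		dirty:    make(map[string]struct{}),
		notifyCh: make(chan struct{}, 1),
	}
}

// MarkDirty records name and signals the reader without ever blocking.
func (s *dirtySet) MarkDirty(name string) {
	s.mu.Lock()
	s.dirty[name] = struct{}{}
	s.mu.Unlock()

	select {
	case s.notifyCh <- struct{}{}:
	default: // a notification is already pending
	}
}

// DirtySet returns the names marked since the last call and clears the set.
func (s *dirtySet) DirtySet() map[string]struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := s.dirty
	s.dirty = make(map[string]struct{})
	return out
}

// NotifyCh receives a value when the dirty set has changed.
func (s *dirtySet) NotifyCh() <-chan struct{} { return s.notifyCh }

func main() {
	s := newDirtySet()
	s.MarkDirty("a.db")
	s.MarkDirty("a.db") // coalesces with the first mark
	s.MarkDirty("b.db")

	<-s.NotifyCh()
	fmt.Println(len(s.DirtySet()), len(s.DirtySet()))
}
```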

type Client

type Client interface {
	// AcquireHaltLock attempts to acquire a remote halt lock on the primary node.
	AcquireHaltLock(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) (*HaltLock, error)

	// ReleaseHaltLock releases a previously held remote halt lock on the primary node.
	ReleaseHaltLock(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) error

	// Commit sends an LTX file to the primary to be committed.
	// Must be holding the halt lock to be successful.
	Commit(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64, r io.Reader) error

	// Stream starts a long-running connection to stream changes from another node.
	// If filter is specified, only those databases will be replicated.
	Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (Stream, error)
}

Client represents a client for connecting to other LiteFS nodes.

type DB

type DB struct {

	// Returns the current time. Used for mocking time in tests.
	Now func() time.Time
	// contains filtered or unexported fields
}

DB represents a SQLite database.

func NewDB

func NewDB(store *Store, name string, path string) *DB

NewDB returns a new instance of DB.

func (*DB) AcquireHaltLock added in v0.4.0

func (db *DB) AcquireHaltLock(ctx context.Context, lockID int64) (_ *HaltLock, retErr error)

AcquireHaltLock acquires the halt lock locally. This implicitly acquires locks required for locking & performs a checkpoint.

func (*DB) AcquireRemoteHaltLock added in v0.4.0

func (db *DB) AcquireRemoteHaltLock(ctx context.Context, lockID int64) (_ *HaltLock, retErr error)

AcquireRemoteHaltLock acquires the remote lock and syncs the database to its position before returning to the caller. Caller should provide a random lock identifier so that the primary can deduplicate retry requests.

func (*DB) AcquireWriteLock added in v0.3.0

func (db *DB) AcquireWriteLock(ctx context.Context, fn func() error) (_ *GuardSet, err error)

AcquireWriteLock acquires the appropriate locks for a write depending on if the database uses a rollback journal or WAL.

func (*DB) ApplyLTXNoLock added in v0.4.0

func (db *DB) ApplyLTXNoLock(path string, fatalOnError bool) (retErr error)

ApplyLTXNoLock applies an LTX file to the database.

func (*DB) CanLock added in v0.3.0

func (db *DB) CanLock(ctx context.Context, owner uint64, lockTypes []LockType) (bool, RWMutexState)

CanLock returns true if all locks can acquire a write lock. If false, also returns the mutex state of the blocking lock.

func (*DB) CanRLock added in v0.3.0

func (db *DB) CanRLock(ctx context.Context, owner uint64, lockTypes []LockType) bool

CanRLock returns true if all locks can acquire a read lock.

func (*DB) Checkpoint added in v0.4.0

func (db *DB) Checkpoint(ctx context.Context) (err error)

Checkpoint acquires locks and copies pages from the WAL into the database and truncates the WAL.

func (*DB) CheckpointNoLock added in v0.4.0

func (db *DB) CheckpointNoLock(ctx context.Context) (err error)

CheckpointNoLock copies pages from the WAL into the database and truncates the WAL. Appropriate locks must be held by the caller.

func (*DB) CloseDatabase added in v0.3.0

func (db *DB) CloseDatabase(ctx context.Context, f *os.File, owner uint64) error

CloseDatabase closes a handle associated with the database file.

func (*DB) CloseJournal added in v0.3.0

func (db *DB) CloseJournal(ctx context.Context, f *os.File, owner uint64) error

CloseJournal closes a handle associated with the journal file.

func (*DB) CloseSHM added in v0.3.0

func (db *DB) CloseSHM(ctx context.Context, f *os.File, owner uint64) error

CloseSHM closes a handle associated with the SHM file.

func (*DB) CloseWAL added in v0.3.0

func (db *DB) CloseWAL(ctx context.Context, f *os.File, owner uint64) error

CloseWAL closes a handle associated with the WAL file.

func (*DB) CommitJournal

func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error)

CommitJournal deletes the journal file which commits or rolls back the transaction.

func (*DB) CommitWAL added in v0.3.0

func (db *DB) CommitWAL(ctx context.Context) (err error)

CommitWAL is called when the client releases the WAL_WRITE_LOCK(120). The transaction data is copied from the WAL into an LTX file and committed.

func (*DB) CreateGuardSetIfNotExists added in v0.3.0

func (db *DB) CreateGuardSetIfNotExists(owner uint64) *GuardSet

CreateGuardSetIfNotExists returns a guard set for the given owner. Creates a new guard set if one is not associated with the owner.

func (*DB) CreateJournal

func (db *DB) CreateJournal() (*os.File, error)

CreateJournal creates a new journal file on disk.

func (*DB) CreateSHM added in v0.3.0

func (db *DB) CreateSHM() (*os.File, error)

CreateSHM creates a new shared memory file on disk.

func (*DB) CreateWAL added in v0.3.0

func (db *DB) CreateWAL() (*os.File, error)

CreateWAL creates a new WAL file on disk.

func (*DB) DatabasePath added in v0.1.1

func (db *DB) DatabasePath() string

DatabasePath returns the path to the underlying database file.

func (*DB) Drop added in v0.5.5

func (db *DB) Drop(ctx context.Context) (err error)

Drop writes a zero "commit" value to indicate that the database has been deleted.

func (*DB) EnforceHaltLockExpiration added in v0.4.0

func (db *DB) EnforceHaltLockExpiration(ctx context.Context)

EnforceHaltLockExpiration unsets the HALT lock if it has expired.

func (*DB) EnforceRetention added in v0.2.0

func (db *DB) EnforceRetention(ctx context.Context, minTime time.Time) error

EnforceRetention removes all LTX files created before minTime.

func (*DB) Export added in v0.4.0

func (db *DB) Export(ctx context.Context, dst io.Writer) (ltx.Pos, error)

Export writes the contents of the database to dst. Returns the current replication position.

func (*DB) GuardSet added in v0.2.0

func (db *DB) GuardSet(owner uint64) *GuardSet

GuardSet returns a guard set for the given owner, if it exists.

func (*DB) HWM added in v0.5.0

func (db *DB) HWM() ltx.TXID

HWM returns the current high-water mark from the backup service.

func (*DB) HasRemoteHaltLock added in v0.4.0

func (db *DB) HasRemoteHaltLock() bool

HasRemoteHaltLock returns true if the node currently has the remote lock acquired.

func (*DB) Import added in v0.3.0

func (db *DB) Import(ctx context.Context, r io.Reader) error

Import replaces the contents of the database with the contents from r. NOTE: LiteFS does not validate the integrity of the imported database!

func (*DB) InWriteTx

func (db *DB) InWriteTx() bool

InWriteTx returns true if the RESERVED lock has an exclusive lock.

func (*DB) JournalPath added in v0.1.1

func (db *DB) JournalPath() string

JournalPath returns the path to the underlying journal file.

func (*DB) LTXDir

func (db *DB) LTXDir() string

LTXDir returns the path to the directory of LTX transaction files.

func (*DB) LTXPath

func (db *DB) LTXPath(minTXID, maxTXID ltx.TXID) string

LTXPath returns the path of an LTX file.

func (*DB) Mode added in v0.4.0

func (db *DB) Mode() DBMode

Mode returns the journaling mode for the database (DBModeWAL or DBModeRollback).

func (*DB) Name

func (db *DB) Name() string

Name returns the name of the database.

func (*DB) Open

func (db *DB) Open() error

Open initializes the database from files in its data directory.

func (*DB) OpenDatabase added in v0.3.0

func (db *DB) OpenDatabase(ctx context.Context) (*os.File, error)

OpenDatabase returns a handle for the database file.

func (*DB) OpenJournal added in v0.3.0

func (db *DB) OpenJournal(ctx context.Context) (*os.File, error)

OpenJournal returns a handle for the journal file.

func (*DB) OpenLTXFile

func (db *DB) OpenLTXFile(txID ltx.TXID) (*os.File, error)

OpenLTXFile returns a file handle to an LTX file that contains the given TXID.

func (*DB) OpenSHM added in v0.3.0

func (db *DB) OpenSHM(ctx context.Context) (*os.File, error)

OpenSHM returns a handle for the shared memory file.

func (*DB) OpenWAL added in v0.3.0

func (db *DB) OpenWAL(ctx context.Context) (*os.File, error)

OpenWAL returns a handle for the write-ahead log file.

func (*DB) PageN added in v0.5.5

func (db *DB) PageN() uint32

PageN returns the number of pages in the database.

func (*DB) Path

func (db *DB) Path() string

Path of the database's data directory.

func (*DB) Pos

func (db *DB) Pos() ltx.Pos

Pos returns the current transaction position of the database.

func (*DB) ReadDatabaseAt added in v0.3.0

func (db *DB) ReadDatabaseAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)

ReadDatabaseAt reads from the database at the specified offset.

func (*DB) ReadJournalAt added in v0.3.0

func (db *DB) ReadJournalAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)

ReadJournalAt reads from the journal at the specified offset.

func (*DB) ReadLTXDir added in v0.2.0

func (db *DB) ReadLTXDir() ([]fs.DirEntry, error)

ReadLTXDir returns DirEntry for every LTX file.

func (*DB) ReadSHMAt added in v0.3.0

func (db *DB) ReadSHMAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)

ReadSHMAt reads from the shared memory at the specified offset.

func (*DB) ReadWALAt added in v0.3.0

func (db *DB) ReadWALAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)

ReadWALAt reads from the WAL at the specified offset.

func (*DB) Recover added in v0.3.0

func (db *DB) Recover(ctx context.Context) error

Recover forces a rollback (journal) or checkpoint (wal).

func (*DB) ReleaseHaltLock added in v0.4.0

func (db *DB) ReleaseHaltLock(ctx context.Context, id int64)

ReleaseHaltLock releases a halt lock by identifier. If the current halt lock does not match the identifier then it has already been released.

func (*DB) ReleaseRemoteHaltLock added in v0.4.0

func (db *DB) ReleaseRemoteHaltLock(ctx context.Context, lockID int64) (retErr error)

ReleaseRemoteHaltLock releases the current remote lock from the primary.

func (*DB) RemoteHaltLock added in v0.4.0

func (db *DB) RemoteHaltLock() *HaltLock

RemoteHaltLock returns a copy of the current remote lock, if any.

func (*DB) RemoveJournal added in v0.3.0

func (db *DB) RemoveJournal(ctx context.Context) error

RemoveJournal deletes the journal file from disk.

func (*DB) RemoveSHM added in v0.3.0

func (db *DB) RemoveSHM(ctx context.Context) error

RemoveSHM removes the SHM file from disk.

func (*DB) RemoveWAL added in v0.3.0

func (db *DB) RemoveWAL(ctx context.Context) (err error)

RemoveWAL deletes the WAL file from disk.

func (*DB) SHMPath added in v0.3.0

func (db *DB) SHMPath() string

SHMPath returns the path to the underlying shared memory file.

func (*DB) SetHWM added in v0.5.0

func (db *DB) SetHWM(txID ltx.TXID)

SetHWM sets the current high-water mark.

func (*DB) Store added in v0.2.0

func (db *DB) Store() *Store

Store returns the store that the database is a member of.

func (*DB) SyncDatabase added in v0.3.0

func (db *DB) SyncDatabase(ctx context.Context) (err error)

SyncDatabase fsync's the database file.

func (*DB) SyncJournal added in v0.3.0

func (db *DB) SyncJournal(ctx context.Context) (err error)

SyncJournal fsync's the journal file.

func (*DB) SyncSHM added in v0.3.0

func (db *DB) SyncSHM(ctx context.Context) (err error)

SyncSHM fsync's the shared memory file.

func (*DB) SyncWAL added in v0.3.0

func (db *DB) SyncWAL(ctx context.Context) (err error)

SyncWAL fsync's the WAL file.

func (*DB) TXID

func (db *DB) TXID() ltx.TXID

TXID returns the current transaction ID.

func (*DB) Timestamp added in v0.5.0

func (db *DB) Timestamp() time.Time

Timestamp returns the timestamp from the last applied LTX file.

func (*DB) TruncateDatabase added in v0.3.0

func (db *DB) TruncateDatabase(ctx context.Context, size int64) (err error)

TruncateDatabase sets the size of the database file.

func (*DB) TruncateJournal added in v0.3.0

func (db *DB) TruncateJournal(ctx context.Context) error

TruncateJournal sets the size of the journal file.

func (*DB) TruncateSHM added in v0.3.0

func (db *DB) TruncateSHM(ctx context.Context, size int64) error

TruncateSHM sets the size of the SHM file.

func (*DB) TruncateWAL added in v0.3.0

func (db *DB) TruncateWAL(ctx context.Context, size int64) (err error)

TruncateWAL sets the size of the WAL file.

func (*DB) TryAcquireWriteLock added in v0.4.0

func (db *DB) TryAcquireWriteLock() (ret *GuardSet)

TryAcquireWriteLock acquires the appropriate locks for a write. If any locks fail then the action is aborted.

func (*DB) TryLocks added in v0.3.0

func (db *DB) TryLocks(ctx context.Context, owner uint64, lockTypes []LockType) (bool, error)

TryLocks attempts to lock one or more locks on the database for a given owner. Returns an error if no locks are supplied.

func (*DB) TryRLocks added in v0.3.0

func (db *DB) TryRLocks(ctx context.Context, owner uint64, lockTypes []LockType) bool

TryRLocks attempts to read lock one or more locks on the database for a given owner. Returns an error if no locks are supplied.

func (*DB) Unlock added in v0.3.0

func (db *DB) Unlock(ctx context.Context, owner uint64, lockTypes []LockType) error

Unlock unlocks one or more locks on the database for a given owner.

func (*DB) UnlockDatabase added in v0.3.0

func (db *DB) UnlockDatabase(ctx context.Context, owner uint64)

UnlockDatabase unlocks all locks from the database file.

func (*DB) UnlockSHM added in v0.3.0

func (db *DB) UnlockSHM(ctx context.Context, owner uint64)

UnlockSHM unlocks all locks from the SHM file.

func (*DB) UnsetRemoteHaltLock added in v0.4.0

func (db *DB) UnsetRemoteHaltLock(ctx context.Context, lockID int64) (retErr error)

UnsetRemoteHaltLock releases the current remote lock because of expiration. This only removes the reference locally as it's assumed it has already been removed on the primary.

func (*DB) WALPath added in v0.3.0

func (db *DB) WALPath() string

WALPath returns the path to the underlying WAL file.

func (*DB) WaitPosExact added in v0.4.0

func (db *DB) WaitPosExact(ctx context.Context, target ltx.Pos) error

WaitPosExact returns once db has reached the target position. Returns an error if ctx is done, TXID is exceeded, or on checksum mismatch.

func (*DB) WriteDatabaseAt added in v0.3.0

func (db *DB) WriteDatabaseAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) error

WriteDatabaseAt writes data to the main database file at the given offset.

func (*DB) WriteJournalAt added in v0.3.0

func (db *DB) WriteJournalAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (err error)

WriteJournalAt writes data to the rollback journal file.

func (*DB) WriteLTXFileAt added in v0.4.0

func (db *DB) WriteLTXFileAt(ctx context.Context, r io.Reader) (string, error)

WriteLTXFileAt atomically writes r to the database's LTX directory but does not apply the file. That should be done after the file is written.

If file is a snapshot, then all other LTX files are removed.

Returns the path of the new LTX file on success.

func (*DB) WriteSHMAt added in v0.3.0

func (db *DB) WriteSHMAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)

WriteSHMAt writes data to the SHM file.

func (*DB) WriteSnapshotTo added in v0.2.0

func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.Header, trailer ltx.Trailer, err error)

WriteSnapshotTo writes an LTX snapshot to dst.

func (*DB) WriteWALAt added in v0.3.0

func (db *DB) WriteWALAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (err error)

WriteWALAt writes data to the WAL file. On final commit write, an LTX file is generated for the transaction.

func (*DB) Writeable added in v0.4.0

func (db *DB) Writeable() bool

Writeable returns true if the node is the primary or if we've acquired the HALT lock from the primary.

type DBMode added in v0.3.0

type DBMode int

DBMode represents either a rollback journal or WAL mode.

func (DBMode) String added in v0.3.0

func (m DBMode) String() string

String returns the string representation of m.

type DropDBStreamFrame added in v0.4.0

type DropDBStreamFrame struct {
	Name string // database name
}

DropDBStreamFrame notifies replicas that a database has been deleted. DEPRECATED: LTX files with a zero "commit" field now represent deletions.

func (*DropDBStreamFrame) ReadFrom added in v0.4.0

func (f *DropDBStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*DropDBStreamFrame) Type added in v0.4.0

func (*DropDBStreamFrame) Type() StreamFrameType

Type returns the type of stream frame.

func (*DropDBStreamFrame) WriteTo added in v0.4.0

func (f *DropDBStreamFrame) WriteTo(w io.Writer) (int64, error)

type EndStreamFrame added in v0.3.0

type EndStreamFrame struct{}

func (*EndStreamFrame) ReadFrom added in v0.3.0

func (f *EndStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*EndStreamFrame) Type added in v0.3.0

func (f *EndStreamFrame) Type() StreamFrameType

func (*EndStreamFrame) WriteTo added in v0.3.0

func (f *EndStreamFrame) WriteTo(w io.Writer) (int64, error)

type Environment added in v0.5.0

type Environment interface {
	// Type returns the name of the environment type.
	Type() string

	// SetPrimaryStatus marks the current node as the primary or not.
	SetPrimaryStatus(ctx context.Context, isPrimary bool)
}

Environment represents an interface for interacting with the host environment.

type Event added in v0.5.5

type Event struct {
	Type string `json:"type"`
	DB   string `json:"db,omitempty"`
	Data any    `json:"data,omitempty"`
}

Event represents a generic event.

func (*Event) UnmarshalJSON added in v0.5.5

func (e *Event) UnmarshalJSON(data []byte) error

type EventSubscriber added in v0.5.5

type EventSubscriber struct {
	// contains filtered or unexported fields
}

EventSubscriber subscribes to generic store events.

func (*EventSubscriber) C added in v0.5.5

func (s *EventSubscriber) C() <-chan Event

C returns a channel that receives event notifications. If the caller cannot read events fast enough, the channel will be closed.

func (*EventSubscriber) Stop added in v0.5.5

func (s *EventSubscriber) Stop()

Stop closes the subscriber and removes it from the store.
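
The "close on slow reader" behavior described for EventSubscriber.C can be sketched with a buffered channel and a non-blocking send: if the buffer is full, the publisher closes the channel instead of waiting. The subscriber type, its string payload, and the notify method are hypothetical and written for this example only; the real implementation is not shown in this section.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical, simplified event subscriber.
type subscriber struct {
	mu     sync.Mutex
	ch     chan string
	closed bool
}

func newSubscriber(bufferSize int) *subscriber {
	return &subscriber{ch: make(chan string, bufferSize)}
}

// C returns the receive side of the event channel.
func (s *subscriber) C() <-chan string { return s.ch }

// notify delivers ev without blocking. If the buffer is full, the channel
// is closed so a slow reader cannot stall the publisher. It reports
// whether the event was queued.
func (s *subscriber) notify(ev string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	select {
	case s.ch <- ev:
		return true
	default:
		close(s.ch)
		s.closed = true
		return false
	}
}

func main() {
	s := newSubscriber(2)
	fmt.Println(s.notify("a"), s.notify("b"), s.notify("c"))

	// Buffered events are still delivered before the reader sees the close.
	for ev := range s.C() {
		fmt.Println(ev)
	}
}
```

A reader that observes the closed channel knows it has missed events and would need to resubscribe and resynchronize its state.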

type FileBackupClient added in v0.5.0

type FileBackupClient struct {
	// contains filtered or unexported fields
}

FileBackupClient is a reference implementation for BackupClient. This implementation is typically only used for testing.

func NewFileBackupClient added in v0.5.0

func NewFileBackupClient(path string) *FileBackupClient

NewFileBackupClient returns a new instance of FileBackupClient.

func (*FileBackupClient) FetchSnapshot added in v0.5.0

func (c *FileBackupClient) FetchSnapshot(ctx context.Context, name string) (_ io.ReadCloser, retErr error)

FetchSnapshot requests a full snapshot of the database as it exists on the backup service. This should be used if the LiteFS node has become out of sync with the backup service.

func (*FileBackupClient) Open added in v0.5.0

func (c *FileBackupClient) Open() (err error)

Open validates & creates the path the client was initialized with.

func (*FileBackupClient) PosMap added in v0.5.0

func (c *FileBackupClient) PosMap(ctx context.Context) (map[string]ltx.Pos, error)

PosMap returns the replication position for all databases on the backup service.

func (*FileBackupClient) URL added in v0.5.0

func (c *FileBackupClient) URL() string

URL of the backup service.

func (*FileBackupClient) WriteTx added in v0.5.0

func (c *FileBackupClient) WriteTx(ctx context.Context, name string, r io.Reader) (hwm ltx.TXID, err error)

WriteTx writes an LTX file to the backup service. The file must be contiguous with the latest LTX file on the backup service or else it will return an ltx.PosMismatchError.

type FileType

type FileType int

FileType represents a type of SQLite file.

func (FileType) IsValid

func (t FileType) IsValid() bool

IsValid returns true if t is a valid file type.

type GuardSet added in v0.2.0

type GuardSet struct {
	// contains filtered or unexported fields
}

GuardSet represents a set of mutex guards by a single owner.

func (*GuardSet) Ckpt added in v0.3.0

func (s *GuardSet) Ckpt() *RWMutexGuard

Ckpt returns a reference to the CKPT mutex guard.

func (*GuardSet) DMS added in v0.3.0

func (s *GuardSet) DMS() *RWMutexGuard

DMS returns a reference to the DMS mutex guard.

func (*GuardSet) Guard added in v0.2.0

func (s *GuardSet) Guard(lockType LockType) *RWMutexGuard

Guard returns a guard by lock type. Panics on an invalid lock type.

func (*GuardSet) Pending added in v0.3.0

func (s *GuardSet) Pending() *RWMutexGuard

Pending returns a reference to the PENDING mutex guard.

func (*GuardSet) Read0 added in v0.3.0

func (s *GuardSet) Read0() *RWMutexGuard

Read0 returns a reference to the READ0 mutex guard.

func (*GuardSet) Read1 added in v0.3.0

func (s *GuardSet) Read1() *RWMutexGuard

Read1 returns a reference to the READ1 mutex guard.

func (*GuardSet) Read2 added in v0.3.0

func (s *GuardSet) Read2() *RWMutexGuard

Read2 returns a reference to the READ2 mutex guard.

func (*GuardSet) Read3 added in v0.3.0

func (s *GuardSet) Read3() *RWMutexGuard

Read3 returns a reference to the READ3 mutex guard.

func (*GuardSet) Read4 added in v0.3.0

func (s *GuardSet) Read4() *RWMutexGuard

Read4 returns a reference to the READ4 mutex guard.

func (*GuardSet) Recover added in v0.3.0

func (s *GuardSet) Recover() *RWMutexGuard

Recover returns a reference to the RECOVER mutex guard.

func (*GuardSet) Reserved added in v0.3.0

func (s *GuardSet) Reserved() *RWMutexGuard

Reserved returns a reference to the RESERVED mutex guard.

func (*GuardSet) Shared added in v0.3.0

func (s *GuardSet) Shared() *RWMutexGuard

Shared returns a reference to the SHARED mutex guard.

func (*GuardSet) Unlock added in v0.2.0

func (s *GuardSet) Unlock()

Unlock unlocks all the guards in the reverse of the order in which SQLite acquires them.

func (*GuardSet) UnlockDatabase added in v0.3.0

func (s *GuardSet) UnlockDatabase()

UnlockDatabase unlocks all the database file guards.

func (*GuardSet) UnlockSHM added in v0.3.0

func (s *GuardSet) UnlockSHM()

UnlockSHM unlocks all the SHM file guards.

func (*GuardSet) Write added in v0.3.0

func (s *GuardSet) Write() *RWMutexGuard

Write returns a reference to the WRITE mutex guard.

type HWMStreamFrame added in v0.5.0

type HWMStreamFrame struct {
	TXID ltx.TXID // high-water mark TXID
	Name string   // database name
}

HWMStreamFrame propagates the high-water mark to replica nodes.

func (*HWMStreamFrame) ReadFrom added in v0.5.0

func (f *HWMStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*HWMStreamFrame) Type added in v0.5.0

Type returns the type of stream frame.

func (*HWMStreamFrame) WriteTo added in v0.5.0

func (f *HWMStreamFrame) WriteTo(w io.Writer) (int64, error)

type HaltLock added in v0.4.0

type HaltLock struct {
	// Unique identifier for the lock.
	ID int64 `json:"id"`

	// Position of the primary when this lock was acquired.
	Pos ltx.Pos `json:"pos"`

	// Time that the halt lock expires at.
	Expires *time.Time `json:"expires"`
}

HaltLock represents a lock remotely held on the primary. This allows the local node to perform writes and send them to the primary while the lock is held.

type HandoffStreamFrame added in v0.4.0

type HandoffStreamFrame struct {
	LeaseID string
}

func (*HandoffStreamFrame) ReadFrom added in v0.4.0

func (f *HandoffStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*HandoffStreamFrame) Type added in v0.4.0

Type returns the type of stream frame.

func (*HandoffStreamFrame) WriteTo added in v0.4.0

func (f *HandoffStreamFrame) WriteTo(w io.Writer) (int64, error)

type HeartbeatStreamFrame added in v0.5.0

type HeartbeatStreamFrame struct {
	Timestamp int64 // ms since unix epoch
}

HeartbeatStreamFrame informs replicas that there have been no recent transactions.

func (*HeartbeatStreamFrame) ReadFrom added in v0.5.0

func (f *HeartbeatStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*HeartbeatStreamFrame) Type added in v0.5.0

Type returns the type of stream frame.

func (*HeartbeatStreamFrame) WriteTo added in v0.5.0

func (f *HeartbeatStreamFrame) WriteTo(w io.Writer) (int64, error)

type InitEventData added in v0.5.5

type InitEventData struct {
	IsPrimary bool   `json:"isPrimary"`
	Hostname  string `json:"hostname,omitempty"`
}

type Invalidator added in v0.1.1

type Invalidator interface {
	InvalidateDB(db *DB) error
	InvalidateDBRange(db *DB, offset, size int64) error
	InvalidateSHM(db *DB) error
	InvalidatePos(db *DB) error
	InvalidateEntry(name string) error
	InvalidateLag() error
}

Invalidator is a callback for the store to use to invalidate the kernel page cache.

type JournalMode

type JournalMode string

JournalMode represents a SQLite journal mode.

type JournalReader added in v0.3.0

type JournalReader struct {
	// contains filtered or unexported fields
}

JournalReader represents a reader of the SQLite journal file format.

func NewJournalReader added in v0.3.0

func NewJournalReader(f *os.File, pageSize uint32) *JournalReader

NewJournalReader returns a new instance of JournalReader.

func (*JournalReader) DatabaseSize added in v0.3.0

func (r *JournalReader) DatabaseSize() int64

DatabaseSize returns the size of the database before the journal transaction, in bytes.

func (*JournalReader) IsValid added in v0.3.0

func (r *JournalReader) IsValid() bool

IsValid returns true if at least one journal header was read.

func (*JournalReader) Next added in v0.3.0

func (r *JournalReader) Next() (err error)

Next reads the next segment of the journal. Returns io.EOF if no more segments exist.

func (*JournalReader) ReadFrame added in v0.3.0

func (r *JournalReader) ReadFrame() (pgno uint32, data []byte, err error)

ReadFrame returns the page number and page data for the next frame. Returns io.EOF after the last frame. Page data should not be retained.

type LTXStreamFrame

type LTXStreamFrame struct {
	Size int64  // payload size
	Name string // database name
}

func (*LTXStreamFrame) ReadFrom

func (f *LTXStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*LTXStreamFrame) Type

Type returns the type of stream frame.

func (*LTXStreamFrame) WriteTo

func (f *LTXStreamFrame) WriteTo(w io.Writer) (int64, error)

type Lease

type Lease interface {
	ID() string
	RenewedAt() time.Time
	TTL() time.Duration

	// Renew attempts to reset the TTL on the lease.
	// Returns ErrLeaseExpired if the lease has expired or was deleted.
	Renew(ctx context.Context) error

	// Marks the lease as handed-off to another node.
	// This should send the nodeID to the channel returned by HandoffCh().
	Handoff(ctx context.Context, nodeID uint64) error
	HandoffCh() <-chan uint64

	// Close attempts to remove the lease from the server.
	Close() error
}

Lease represents an acquired lease from a Leaser.

type Leaser

type Leaser interface {
	io.Closer

	// Type returns the name of the leaser.
	Type() string

	Hostname() string
	AdvertiseURL() string

	// Acquire attempts to acquire the lease to become the primary.
	Acquire(ctx context.Context) (Lease, error)

	// AcquireExisting returns a lease from an existing lease ID.
	// This occurs when the primary is handed off to a replica node.
	AcquireExisting(ctx context.Context, leaseID string) (Lease, error)

	// PrimaryInfo attempts to read the current primary data.
	// Returns ErrNoPrimary if no primary currently has the lease.
	PrimaryInfo(ctx context.Context) (PrimaryInfo, error)

	// ClusterID returns the cluster ID set on the leaser.
	// This is used to ensure two clusters do not accidentally overlap.
	ClusterID(ctx context.Context) (string, error)

	// SetClusterID sets the cluster ID on the leaser.
	SetClusterID(ctx context.Context, clusterID string) error
}

Leaser represents an API for obtaining a lease for leader election.

type LockType

type LockType int

LockType represents a SQLite lock type.

func ParseDatabaseLockRange added in v0.3.0

func ParseDatabaseLockRange(start, end uint64) []LockType

ParseDatabaseLockRange returns a list of SQLite database locks that are within a range.

This does not include the HALT lock as that is specific to LiteFS and we don't want to accidentally include it when locking/unlocking the whole file.
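To make the range mapping concrete, here is a minimal sketch that maps a byte range onto SQLite's standard database lock bytes. The offsets come from SQLite's documented locking scheme. The function locksInRange, its string results, and the treatment of the range as inclusive on both ends are assumptions for illustration. It is not the LiteFS implementation and, like the function above, it ignores the LiteFS-specific HALT lock.

```go
package main

import "fmt"

// Lock byte offsets used by SQLite for rollback-journal locking.
const (
	pendingByte  = 0x40000000
	reservedByte = pendingByte + 1
	sharedFirst  = pendingByte + 2
	sharedSize   = 510
)

// locksInRange returns the names of the SQLite database locks whose bytes
// intersect the byte range [start, end]. Treating end as inclusive is an
// assumption of this sketch.
func locksInRange(start, end uint64) []string {
	var names []string
	if start <= pendingByte && pendingByte <= end {
		names = append(names, "PENDING")
	}
	if start <= reservedByte && reservedByte <= end {
		names = append(names, "RESERVED")
	}
	// The shared lock covers a region of sharedSize bytes.
	if start <= sharedFirst+sharedSize-1 && sharedFirst <= end {
		names = append(names, "SHARED")
	}
	return names
}

func main() {
	fmt.Println(locksInRange(pendingByte, sharedFirst+sharedSize-1)) // [PENDING RESERVED SHARED]
	fmt.Println(locksInRange(reservedByte, reservedByte))            // [RESERVED]
}
```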

func ParseSHMLockRange added in v0.3.0

func ParseSHMLockRange(start, end uint64) []LockType

ParseSHMLockRange returns a list of SQLite WAL locks that are within a range.

func (LockType) String added in v0.3.0

func (t LockType) String() string

String returns the name of the lock type.

type NodeInfo added in v0.4.0

type NodeInfo struct {
	ClusterID string `json:"clusterID,omitempty"` // cluster ID
	IsPrimary bool   `json:"isPrimary"`           // if true, node is currently primary
	Candidate bool   `json:"candidate"`           // if true, node is eligible to be primary
	Path      string `json:"path"`                // data directory

	Primary struct {
		Hostname string `json:"hostname"`
	} `json:"primary"`
}

NodeInfo represents basic info about a node.

type OS added in v0.5.6

type OS interface {
	Create(op, name string) (*os.File, error)
	Mkdir(op, path string, perm os.FileMode) error
	MkdirAll(op, path string, perm os.FileMode) error
	Open(op, name string) (*os.File, error)
	OpenFile(op, opname string, flag int, perm os.FileMode) (*os.File, error)
	ReadDir(op, opname string) ([]os.DirEntry, error)
	ReadFile(op, name string) ([]byte, error)
	Remove(op, name string) error
	RemoveAll(op, name string) error
	Rename(op, oldpath, newpath string) error
	Stat(op, name string) (os.FileInfo, error)
	Truncate(op, name string, size int64) error
	WriteFile(op, name string, data []byte, perm os.FileMode) error
}

OS represents an interface for os package calls so they can be mocked for testing.

type PrimaryChangeEventData added in v0.5.5

type PrimaryChangeEventData struct {
	IsPrimary bool   `json:"isPrimary"`
	Hostname  string `json:"hostname,omitempty"`
}

type PrimaryInfo added in v0.2.0

type PrimaryInfo struct {
	Hostname     string `json:"hostname"`
	AdvertiseURL string `json:"advertise-url"`
}

PrimaryInfo is the JSON object stored in the Consul lease value.

func (*PrimaryInfo) Clone added in v0.2.0

func (info *PrimaryInfo) Clone() *PrimaryInfo

Clone returns a copy of info.

type RWMutex

type RWMutex struct {

	// If set, this function is called when the state transitions.
	// Must be set before use of the mutex or its guards.
	OnLockStateChange func(prevState, newState RWMutexState)
	// contains filtered or unexported fields
}

RWMutex is a reader/writer mutual exclusion lock. It wraps the sync package to provide additional capabilities such as lock upgrades & downgrades. It only supports TryLock() & TryRLock() as that is what's supported by our FUSE file system.

func (*RWMutex) Guard added in v0.2.0

func (rw *RWMutex) Guard() RWMutexGuard

Guard returns an unlocked guard for the mutex.

func (*RWMutex) State

func (rw *RWMutex) State() RWMutexState

State returns whether the mutex has an exclusive lock, one or more shared locks, or if the mutex is unlocked.

type RWMutexGuard

type RWMutexGuard struct {
	// contains filtered or unexported fields
}

RWMutexGuard is a reference to a mutex. Locking, unlocking, upgrading, & downgrading operations are all performed via the guard instead of directly on the RWMutex itself as this works similarly to how POSIX locks work.

func (*RWMutexGuard) CanLock

func (g *RWMutexGuard) CanLock() (canLock bool, mutexState RWMutexState)

CanLock returns true if the guard can become an exclusive lock. Also returns the current state of the underlying mutex to determine if the lock is blocked by a shared or exclusive lock.

func (*RWMutexGuard) CanRLock added in v0.2.0

func (g *RWMutexGuard) CanRLock() bool

CanRLock returns true if the guard can become a shared lock.

func (*RWMutexGuard) Lock added in v0.2.0

func (g *RWMutexGuard) Lock(ctx context.Context) error

Lock attempts to obtain an exclusive lock for the guard. Returns an error if ctx is done.

func (*RWMutexGuard) RLock

func (g *RWMutexGuard) RLock(ctx context.Context) error

RLock attempts to obtain a shared lock for the guard. Returns an error if ctx is done.

func (*RWMutexGuard) State added in v0.3.0

func (g *RWMutexGuard) State() RWMutexState

State returns the current state of the guard.

func (*RWMutexGuard) TryLock

func (g *RWMutexGuard) TryLock() bool

TryLock upgrades the lock from a shared lock to an exclusive lock. This is a no-op if the lock is already an exclusive lock. This function will trigger OnLockStateChange on the mutex, if set, and if state changes.

func (*RWMutexGuard) TryRLock added in v0.2.0

func (g *RWMutexGuard) TryRLock() bool

TryRLock attempts to obtain a shared lock on the mutex for the guard. This will upgrade an unlocked guard and downgrade an exclusive guard. Shared guards are a no-op.

func (*RWMutexGuard) Unlock

func (g *RWMutexGuard) Unlock()

Unlock unlocks the underlying mutex.
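The guard operations above (try-lock, upgrade, downgrade, unlock) can be sketched with a simplified model. The types rwMutex and guard below are hypothetical stand-ins written only to illustrate the semantics described in this section. They omit the blocking Lock and RLock variants and the OnLockStateChange callback, and they are not the LiteFS implementation.

```go
package main

import (
	"fmt"
	"sync"
)

type state int

const (
	unlocked state = iota
	shared
	exclusive
)

// rwMutex is a simplified, hypothetical stand-in for RWMutex.
type rwMutex struct {
	mu      sync.Mutex
	sharedN int    // number of guards holding a shared lock
	excl    *guard // guard holding the exclusive lock, if any
}

// guard tracks the lock state held by a single owner.
type guard struct {
	rw    *rwMutex
	state state
}

func (rw *rwMutex) Guard() *guard { return &guard{rw: rw} }

// TryLock acquires an exclusive lock, or upgrades a shared one, without blocking.
func (g *guard) TryLock() bool {
	g.rw.mu.Lock()
	defer g.rw.mu.Unlock()
	switch g.state {
	case exclusive:
		return true // already exclusive: no-op
	case shared:
		if g.rw.sharedN > 1 {
			return false // other readers still hold the lock
		}
		g.rw.sharedN = 0
	default:
		if g.rw.sharedN > 0 || g.rw.excl != nil {
			return false
		}
	}
	g.rw.excl, g.state = g, exclusive
	return true
}

// TryRLock acquires a shared lock or downgrades an exclusive one.
func (g *guard) TryRLock() bool {
	g.rw.mu.Lock()
	defer g.rw.mu.Unlock()
	switch g.state {
	case shared:
		return true // already shared: no-op
	case exclusive:
		g.rw.excl = nil
	default:
		if g.rw.excl != nil {
			return false
		}
	}
	g.rw.sharedN++
	g.state = shared
	return true
}

// Unlock releases whatever lock the guard currently holds.
func (g *guard) Unlock() {
	g.rw.mu.Lock()
	defer g.rw.mu.Unlock()
	switch g.state {
	case shared:
		g.rw.sharedN--
	case exclusive:
		g.rw.excl = nil
	}
	g.state = unlocked
}

func main() {
	var rw rwMutex
	a, b := rw.Guard(), rw.Guard()
	fmt.Println(a.TryRLock(), b.TryRLock()) // true true
	fmt.Println(a.TryLock())                // false: b still holds a shared lock
	b.Unlock()
	fmt.Println(a.TryLock())                // true: upgraded to exclusive
	fmt.Println(b.TryRLock())               // false: blocked by a's exclusive lock
	fmt.Println(a.TryRLock(), b.TryRLock()) // true true: a downgraded, b joins
}
```

Keeping per-owner state in the guard rather than in the mutex is what lets one owner upgrade or downgrade its own lock, which mirrors how POSIX byte-range locks behave for a single process.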

type RWMutexState

type RWMutexState int

RWMutexState represents the lock state of an RWMutex or RWMutexGuard.

func (RWMutexState) String added in v0.2.0

func (s RWMutexState) String() string

String returns the string representation of the state.

type ReadyStreamFrame added in v0.2.0

type ReadyStreamFrame struct{}

func (*ReadyStreamFrame) ReadFrom added in v0.2.0

func (f *ReadyStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*ReadyStreamFrame) Type added in v0.2.0

func (*ReadyStreamFrame) WriteTo added in v0.2.0

func (f *ReadyStreamFrame) WriteTo(w io.Writer) (int64, error)

type StaticLease added in v0.2.0

type StaticLease struct {
	// contains filtered or unexported fields
}

StaticLease represents a lease for a fixed primary.

func (*StaticLease) Close added in v0.2.0

func (l *StaticLease) Close() error

func (*StaticLease) Handoff added in v0.4.0

func (l *StaticLease) Handoff(ctx context.Context, nodeID uint64) error

Handoff always returns an error.

func (*StaticLease) HandoffCh added in v0.4.0

func (l *StaticLease) HandoffCh() <-chan uint64

HandoffCh always returns a nil channel.

func (*StaticLease) ID added in v0.4.0

func (l *StaticLease) ID() string

ID always returns a blank string.

func (*StaticLease) Renew added in v0.2.0

func (l *StaticLease) Renew(ctx context.Context) error

Renew is a no-op.

func (*StaticLease) RenewedAt added in v0.2.0

func (l *StaticLease) RenewedAt() time.Time

RenewedAt returns the Unix epoch in UTC.

func (*StaticLease) TTL added in v0.2.0

func (l *StaticLease) TTL() time.Duration

TTL returns the duration until the lease expires, which is a time well into the future.

type StaticLeaser added in v0.2.0

type StaticLeaser struct {
	// contains filtered or unexported fields
}

StaticLeaser always returns a lease to a static primary.

func NewStaticLeaser added in v0.2.0

func NewStaticLeaser(isPrimary bool, hostname, advertiseURL string) *StaticLeaser

NewStaticLeaser returns a new instance of StaticLeaser.

func (*StaticLeaser) Acquire added in v0.2.0

func (l *StaticLeaser) Acquire(ctx context.Context) (Lease, error)

Acquire returns a lease if this node is the static primary. Otherwise returns ErrPrimaryExists.

func (*StaticLeaser) AcquireExisting added in v0.4.0

func (l *StaticLeaser) AcquireExisting(ctx context.Context, leaseID string) (Lease, error)

AcquireExisting always returns an error. Static leasing does not support handoff.

func (*StaticLeaser) AdvertiseURL added in v0.2.0

func (l *StaticLeaser) AdvertiseURL() string

AdvertiseURL returns the primary URL if this is the primary. Otherwise returns blank.

func (*StaticLeaser) Close added in v0.2.0

func (l *StaticLeaser) Close() (err error)

Close is a no-op.

func (*StaticLeaser) ClusterID added in v0.5.0

func (l *StaticLeaser) ClusterID(ctx context.Context) (string, error)

ClusterID always returns a blank string for the static leaser.

func (*StaticLeaser) Hostname added in v0.5.7

func (l *StaticLeaser) Hostname() string

func (*StaticLeaser) IsPrimary added in v0.2.0

func (l *StaticLeaser) IsPrimary() bool

IsPrimary returns true if the current node is the primary.

func (*StaticLeaser) PrimaryInfo added in v0.2.0

func (l *StaticLeaser) PrimaryInfo(ctx context.Context) (PrimaryInfo, error)

PrimaryInfo returns the primary's info. Returns ErrNoPrimary if the node is the primary.

func (*StaticLeaser) SetClusterID added in v0.5.0

func (l *StaticLeaser) SetClusterID(ctx context.Context, clusterID string) error

SetClusterID is always a no-op for the static leaser.

func (*StaticLeaser) Type added in v0.5.0

func (l *StaticLeaser) Type() string

Type returns "static".

type Store

type Store struct {

	// The operating system interface to use for system calls. Defaults to SystemOS.
	OS   OS
	Exit func(int)

	// Client used to connect to other LiteFS instances.
	Client Client

	// Leaser manages the lease that controls leader election.
	Leaser Leaser

	// BackupClient is the client to connect to an external backup service.
	BackupClient BackupClient

	// If true, LTX files are compressed using LZ4.
	Compress bool

	// Time to wait after disconnecting from the primary to reconnect.
	ReconnectDelay time.Duration

	// Time to wait after manually demoting before trying to become primary again.
	DemoteDelay time.Duration

	// Length of time to retain LTX files.
	Retention                time.Duration
	RetentionMonitorInterval time.Duration

	// Max time to hold HALT lock and interval between expiration checks.
	HaltLockTTL             time.Duration
	HaltLockMonitorInterval time.Duration

	// Time to wait to acquire the HALT lock.
	HaltAcquireTimeout time.Duration

	// Time after a change is made before it is sent to the backup service.
	// This allows multiple changes in quick succession to be batched together.
	BackupDelay time.Duration

	// Interval between checks to re-fetch the position map. This ensures that
	// restores on the backup server are detected by the LiteFS primary.
	BackupFullSyncInterval time.Duration

	// Callback to notify kernel of file changes.
	Invalidator Invalidator

	// Interface to interact with the host environment.
	Environment Environment

	// Specifies a subset of databases to replicate from the primary.
	DatabaseFilter []string

	// If true, computes and verifies the checksum of the entire database
	// after every transaction. Should only be used during testing.
	StrictVerify bool
	// contains filtered or unexported fields
}

Store represents a collection of databases.

func NewStore

func NewStore(path string, candidate bool) *Store

NewStore returns a new instance of Store.

func (*Store) Candidate added in v0.2.0

func (s *Store) Candidate() bool

Candidate returns true if store is eligible to be the primary.

func (*Store) Close

func (s *Store) Close() (retErr error)

Close signals for the store to shut down.

func (*Store) ClusterID added in v0.5.0

func (s *Store) ClusterID() string

ClusterID returns the cluster ID.

func (*Store) ClusterIDPath added in v0.5.0

func (s *Store) ClusterIDPath() string

ClusterIDPath returns the filename where the cluster ID is stored.

func (*Store) CreateDB

func (s *Store) CreateDB(name string) (db *DB, f *os.File, err error)

CreateDB creates a new database with the given name. The returned file handle must be closed by the caller. Returns an error if a database with the same name already exists.

func (*Store) CreateDBIfNotExists added in v0.2.0

func (s *Store) CreateDBIfNotExists(name string) (*DB, error)

CreateDBIfNotExists creates an empty database with the given name.

func (*Store) DB

func (s *Store) DB(name string) *DB

DB returns a database by name. Returns nil if the database does not exist.

func (*Store) DBDir

func (s *Store) DBDir() string

DBDir returns the folder that stores all databases.

func (*Store) DBPath added in v0.2.0

func (s *Store) DBPath(name string) string

DBPath returns the folder that stores a single database.

func (*Store) DBs

func (s *Store) DBs() []*DB

DBs returns a list of databases.

func (*Store) Demote added in v0.3.0

func (s *Store) Demote()

Demote instructs store to destroy its primary lease, if any. Store will wait momentarily before attempting to become primary again.

func (*Store) EnforceHaltLockExpiration added in v0.4.0

func (s *Store) EnforceHaltLockExpiration(ctx context.Context)

EnforceHaltLockExpiration expires any overdue HALT locks.

func (*Store) EnforceRetention added in v0.2.0

func (s *Store) EnforceRetention(ctx context.Context) (err error)

EnforceRetention enforces retention of LTX files on all databases.

func (*Store) Expvar added in v0.4.0

func (s *Store) Expvar() expvar.Var

Expvar returns a variable for debugging output.

func (*Store) Handoff added in v0.4.0

func (s *Store) Handoff(ctx context.Context, nodeID uint64) error

Handoff instructs store to send its lease to a connected replica.

func (*Store) ID added in v0.2.0

func (s *Store) ID() uint64

ID returns the unique identifier for this instance. Available after Open(). Persistent across restarts if underlying storage is persistent.

func (*Store) IsPrimary

func (s *Store) IsPrimary() bool

IsPrimary returns true if store has a lease to be the primary.

func (*Store) Lag added in v0.5.3

func (s *Store) Lag() time.Duration

Lag returns how far the local instance is lagging behind the primary node, as a time.Duration. Returns 0 if the node is the primary or if the node is not marked as ready yet.

func (*Store) MarkDirty

func (s *Store) MarkDirty(name string)

MarkDirty marks a database dirty on all subscribers.

func (*Store) NotifyEvent added in v0.5.5

func (s *Store) NotifyEvent(event Event)

NotifyEvent sends event to all event subscribers. If a subscriber has no additional buffer space available then it is closed.
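The "close when the buffer is full" policy can be sketched with a non-blocking channel send. The types event, subscriber, and hub are hypothetical and simplified. In particular, this sketch is not safe for concurrent use, whereas a real store would need locking around the subscriber set.

```go
package main

import "fmt"

type event struct{ Type string }

// subscriber receives events on a buffered channel.
type subscriber struct{ ch chan event }

// hub is a hypothetical stand-in for the store's subscriber registry.
type hub struct {
	subs map[*subscriber]struct{}
}

func (h *hub) subscribe(buf int) *subscriber {
	s := &subscriber{ch: make(chan event, buf)}
	h.subs[s] = struct{}{}
	return s
}

// notify delivers e to every subscriber without blocking. A subscriber whose
// buffer is full is closed and removed so it cannot stall the sender.
func (h *hub) notify(e event) {
	for s := range h.subs {
		select {
		case s.ch <- e:
		default:
			close(s.ch)
			delete(h.subs, s)
		}
	}
}

func main() {
	h := &hub{subs: make(map[*subscriber]struct{})}
	fast, slow := h.subscribe(2), h.subscribe(1)

	h.notify(event{Type: "tx"})
	h.notify(event{Type: "primaryChange"}) // slow's buffer is full here

	<-slow.ch               // drain the one buffered event
	_, open := <-slow.ch    // the channel was closed by notify
	fmt.Println(len(fast.ch), len(h.subs), open) // 2 1 false
}
```

A closed subscriber learns that it missed events when its channel closes, and can resubscribe and resynchronize.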

func (*Store) Open

func (s *Store) Open() error

Open initializes the store based on files in the data directory.

func (*Store) Path

func (s *Store) Path() string

Path returns the underlying data directory.

func (*Store) PosMap

func (s *Store) PosMap() map[string]ltx.Pos

PosMap returns a map of databases and their transactional position.

func (*Store) PrimaryCtx added in v0.2.0

func (s *Store) PrimaryCtx(ctx context.Context) context.Context

PrimaryCtx wraps ctx with another context that will cancel when no longer primary.

func (*Store) PrimaryInfo added in v0.2.0

func (s *Store) PrimaryInfo() (isPrimary bool, info *PrimaryInfo)

PrimaryInfo returns info about the current primary.

func (*Store) PrimaryInfoWithContext added in v0.5.7

func (s *Store) PrimaryInfoWithContext(ctx context.Context) (isPrimary bool, info *PrimaryInfo)

PrimaryInfoWithContext continually attempts to fetch the primary info until available. Returns when isPrimary is true, info is non-nil, or when ctx is done.

func (*Store) PrimaryTimestamp added in v0.5.0

func (s *Store) PrimaryTimestamp() int64

PrimaryTimestamp returns the last timestamp (ms since epoch) received from the primary. Returns -1 if we are the primary or if we haven't finished initial replication yet.

func (*Store) ReadyCh added in v0.2.0

func (s *Store) ReadyCh() chan struct{}

ReadyCh returns a channel that is closed once the store has become primary or once it has connected to the primary.

func (*Store) Recover added in v0.3.0

func (s *Store) Recover(ctx context.Context) (err error)

Recover forces a rollback (journal) or checkpoint (wal) on all open databases. This is done when switching the primary/replica state.

func (*Store) SubscribeChangeSet added in v0.5.5

func (s *Store) SubscribeChangeSet(nodeID uint64) *ChangeSetSubscriber

SubscribeChangeSet creates a new subscriber for store changes.

func (*Store) SubscribeEvents added in v0.5.5

func (s *Store) SubscribeEvents() *EventSubscriber

SubscribeEvents creates a new subscriber for store events.

func (*Store) SubscriberByNodeID added in v0.4.0

func (s *Store) SubscriberByNodeID(nodeID uint64) *ChangeSetSubscriber

SubscriberByNodeID returns a subscriber by node ID. Returns nil if the node is not currently subscribed to the store.

func (*Store) SyncBackup added in v0.5.0

func (s *Store) SyncBackup(ctx context.Context) error

SyncBackup connects to a backup server and performs a one-time sync.

func (*Store) UnsubscribeChangeSet added in v0.5.5

func (s *Store) UnsubscribeChangeSet(sub *ChangeSetSubscriber)

UnsubscribeChangeSet removes a subscriber from the store.

func (*Store) UnsubscribeEvents added in v0.5.5

func (s *Store) UnsubscribeEvents(sub *EventSubscriber)

UnsubscribeEvents removes an event subscriber from the store.

type StoreVar added in v0.2.0

type StoreVar Store

func (*StoreVar) String added in v0.2.0

func (v *StoreVar) String() string

type Stream added in v0.5.0

type Stream interface {
	io.ReadCloser

	// ClusterID of the primary node.
	ClusterID() string
}

Stream represents a stream of frames.

type StreamFrame

type StreamFrame interface {
	io.ReaderFrom
	io.WriterTo
	Type() StreamFrameType
}

func ReadStreamFrame

func ReadStreamFrame(r io.Reader) (StreamFrame, error)

ReadStreamFrame reads the stream type & frame from the reader.

type StreamFrameType

type StreamFrameType uint32

type TxEventData added in v0.5.5

type TxEventData struct {
	TXID              ltx.TXID     `json:"txID"`
	PostApplyChecksum ltx.Checksum `json:"postApplyChecksum"`
	PageSize          uint32       `json:"pageSize"`
	Commit            uint32       `json:"commit"`
	Timestamp         time.Time    `json:"timestamp"`
}

type WALReader added in v0.3.0

type WALReader struct {
	// contains filtered or unexported fields
}

WALReader wraps an io.Reader and parses SQLite WAL frames.

This reader verifies the salt & checksum integrity while it reads. It does not enforce transaction boundaries (i.e. it may return uncommitted frames). It is the responsibility of the caller to handle this.

func NewWALReader added in v0.3.0

func NewWALReader(r io.Reader) *WALReader

NewWALReader returns a new instance of WALReader.

func (*WALReader) Offset added in v0.3.0

func (r *WALReader) Offset() int64

Offset returns the file offset of the last read frame. Returns zero if no frames have been read.

func (*WALReader) PageSize added in v0.3.0

func (r *WALReader) PageSize() uint32

PageSize returns the page size from the header. Must call ReadHeader() first.

func (*WALReader) ReadFrame added in v0.3.0

func (r *WALReader) ReadFrame(data []byte) (pgno, commit uint32, err error)

ReadFrame reads the next frame from the WAL and returns the page number. Returns io.EOF at the end of the valid WAL.

func (*WALReader) ReadHeader added in v0.3.0

func (r *WALReader) ReadHeader() error

ReadHeader reads the WAL header into the reader. Returns io.EOF if WAL is invalid.

Directories

Path Synopsis
cmd/litefs