wal

package
v0.29.6
Published: Jan 19, 2023 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Constants

const MagicBytesCheckpointHeader uint16 = 0x2137
const MagicBytesCheckpointSubtrie uint16 = 0x2136
const MagicBytesCheckpointToptrie uint16 = 0x2135
const MaxVersion = VersionV6

MaxVersion is the latest supported checkpoint version. Update MaxVersion whenever a newer version is created.

const SegmentSize = 32 * 1024 * 1024 // 32 MB
const VersionV1 uint16 = 0x01
const VersionV3 uint16 = 0x03

The version number was reset when the trie format changed, so it was bumped to 3 to avoid conflicts. Version 3 contains a file checksum for detecting corrupted checkpoint files.

const VersionV4 uint16 = 0x04

Version 4 contains a footer with node count and trie count (previously in the header). Version 4 also reduces checkpoint data size. See EncodeNode() and EncodeTrie() for more details.

const VersionV5 uint16 = 0x05

Version 5 includes these changes:

  • remove regCount and maxDepth from serialized nodes
  • add allocated register count and size to serialized tries
  • reduce number of bytes used to encode payload value size from 8 bytes to 4 bytes

See EncodeNode() and EncodeTrie() for more details.

const VersionV6 uint16 = 0x06

Version 6 includes these changes:

  • trie nodes are stored in 17 additional checkpoint files, with .0, .1, .2, ... .16 as file name extensions

Variables

var ErrEOFNotReached = errors.New("expect to reach EOF, but actually didn't")

ErrEOFNotReached indicates that the end of the file was expected but not reached.

Functions

func Checkpoints

func Checkpoints(dir string) ([]int, error)

Checkpoints returns all the checkpoint numbers in ascending order.

func CopyCheckpointFile

func CopyCheckpointFile(filename string, from string, to string) (
	[]string,
	error,
)

CopyCheckpointFile copies the checkpoint file, including its part files, from the `from` directory to the `to` directory. It returns the paths of all the copied files. Any error returned is an exception.

func CreateCheckpointWriterForFile

func CreateCheckpointWriterForFile(dir, filename string, logger *zerolog.Logger) (io.WriteCloser, error)

CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it.

func EncodeDelete

func EncodeDelete(rootHash ledger.RootHash) []byte

func EncodeUpdate

func EncodeUpdate(update *ledger.TrieUpdate) []byte

func HasRootCheckpoint

func HasRootCheckpoint(dir string) (bool, error)

func ListCheckpoints

func ListCheckpoints(dir string) ([]int, int, error)

ListCheckpoints returns the numbers of all the checkpoint files, and the number of the last checkpoint. Note that it does not include the root checkpoint file.

func LoadCheckpoint

func LoadCheckpoint(filepath string, logger *zerolog.Logger) (
	tries []*trie.MTrie,
	errToReturn error)

func NumberToFilename

func NumberToFilename(n int) string

func NumberToFilenamePart

func NumberToFilenamePart(n int) string

func OpenAndReadCheckpointV6

func OpenAndReadCheckpointV6(dir string, fileName string, logger *zerolog.Logger) (
	tries []*trie.MTrie,
	errToReturn error,
)

OpenAndReadCheckpointV6 opens the checkpoint file and reads it with readCheckpointV6.

func StoreCheckpointV5

func StoreCheckpointV5(dir string, fileName string, logger *zerolog.Logger, tries ...*trie.MTrie) (
	errToReturn error,
)

StoreCheckpointV5 writes the given tries to a checkpoint file and appends a CRC32 file checksum for integrity checking. The checkpoint file consists of a flattened forest. Specifically, it consists of:

  • a list of encoded nodes, where references to other nodes are by list index.
  • a list of encoded tries, each referencing their respective root node by index.

A reference to index 0 is a special case, meaning nil.

As an important property, the nodes are listed in an order which satisfies the Descendents-First-Relationship: every node is listed after its children. This makes it possible to build the trie on the fly when rebuilding it from the sequence of nodes, because for each node the children have already been encountered.

TODO: evaluate alternatives to CRC32 since checkpoint file is many GB in size.

TODO: add concurrency if the performance gains are enough to offset complexity.
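The flattening described for StoreCheckpointV5 (references by list index, index 0 meaning nil, children listed before their parents) can be illustrated with a minimal sketch. The types node and flatNode and the functions flatten and rebuild are hypothetical names chosen for this illustration; they are not part of this package, and the real encoding of nodes and tries is not shown here.

```go
package main

import "fmt"

// node is a hypothetical binary trie node used only for this sketch.
type node struct {
	name        string
	left, right *node
}

// flatNode is the flattened form: children are referenced by list index,
// and index 0 is reserved to mean "no child" (nil).
type flatNode struct {
	name        string
	left, right int
}

// flatten appends n and everything below it to list, children first,
// and returns the index assigned to n (0 for nil).
func flatten(n *node, list *[]flatNode) int {
	if n == nil {
		return 0
	}
	l := flatten(n.left, list)
	r := flatten(n.right, list)
	*list = append(*list, flatNode{name: n.name, left: l, right: r})
	return len(*list) - 1
}

// rebuild reconstructs all nodes in a single pass. Because children always
// precede their parent, nodes[f.left] and nodes[f.right] are already built.
func rebuild(list []flatNode) []*node {
	nodes := make([]*node, len(list)) // nodes[0] stays nil
	for i := 1; i < len(list); i++ {
		f := list[i]
		nodes[i] = &node{name: f.name, left: nodes[f.left], right: nodes[f.right]}
	}
	return nodes
}

func main() {
	root := &node{
		name:  "root",
		left:  &node{name: "a"},
		right: &node{name: "b", left: &node{name: "c"}},
	}
	list := []flatNode{{}} // index 0 is the nil placeholder
	rootIdx := flatten(root, &list)
	rebuilt := rebuild(list)[rootIdx]
	fmt.Println(rootIdx, rebuilt.name, rebuilt.left.name, rebuilt.right.left.name)
}
```

A trie entry in such a layout only needs to record the index of its root node. The sketch does not deduplicate nodes shared between tries, which a real forest would likely need.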

func StoreCheckpointV6

func StoreCheckpointV6(
	tries []*trie.MTrie, outputDir string, outputFile string, logger *zerolog.Logger, nWorker uint) error

StoreCheckpointV6 stores the checkpoint as a main file and 17 part files. The main file stores:

  • version
  • checksum of each part file (17 in total)
  • checksum of the main file itself

The first 16 part files contain the trie nodes below the subtrieLevel. The last part file contains the top-level trie nodes above the subtrieLevel and all the trie root nodes.

nWorker specifies how many workers encode subtries concurrently. The valid range is [1,16].

func StoreCheckpointV6Concurrently

func StoreCheckpointV6Concurrently(tries []*trie.MTrie, outputDir string, outputFile string, logger *zerolog.Logger) error

StoreCheckpointV6Concurrently stores a v6 checkpoint file using the maximum number of workers. It is useful during state extraction.

func StoreCheckpointV6SingleThread

func StoreCheckpointV6SingleThread(tries []*trie.MTrie, outputDir string, outputFile string, logger *zerolog.Logger) error

StoreCheckpointV6SingleThread stores a v6 checkpoint file in a single-threaded manner. It is useful while the execution node (EN) is executing blocks.

Types

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

func NewCheckpointer

func NewCheckpointer(wal *DiskWAL, keyByteSize int, forestCapacity int) *Checkpointer

func (*Checkpointer) Checkpoint

func (c *Checkpointer) Checkpoint(to int) (err error)

Checkpoint creates a new checkpoint, stopping at the given segment.

func (*Checkpointer) CheckpointWriter

func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error)

func (*Checkpointer) Checkpoints

func (c *Checkpointer) Checkpoints() ([]int, error)

Checkpoints returns the numbers of all the checkpoint files in ascending order. Note that it does not include the root checkpoint file.

func (*Checkpointer) Dir

func (c *Checkpointer) Dir() string

func (*Checkpointer) HasRootCheckpoint

func (c *Checkpointer) HasRootCheckpoint() (bool, error)

func (*Checkpointer) LatestCheckpoint

func (c *Checkpointer) LatestCheckpoint() (int, error)

LatestCheckpoint returns the number of the latest checkpoint, or -1 if there are no checkpoints.

func (*Checkpointer) LoadCheckpoint

func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error)

func (*Checkpointer) LoadRootCheckpoint

func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error)

func (*Checkpointer) NotCheckpointedSegments

func (c *Checkpointer) NotCheckpointedSegments() (from, to int, err error)

NotCheckpointedSegments returns the numbers (from, to) of the segments which are not checkpointed yet, or -1, -1 if there are no segments.

func (*Checkpointer) RemoveCheckpoint

func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error

type Crc32Reader

type Crc32Reader struct {
	// contains filtered or unexported fields
}

func NewCRC32Reader

func NewCRC32Reader(reader io.Reader) *Crc32Reader

func (*Crc32Reader) Crc32

func (c *Crc32Reader) Crc32() uint32

func (*Crc32Reader) Read

func (c *Crc32Reader) Read(p []byte) (int, error)

type Crc32Writer

type Crc32Writer struct {
	Writer io.Writer
	// contains filtered or unexported fields
}

func NewCRC32Writer

func NewCRC32Writer(writer io.Writer) *Crc32Writer

func (*Crc32Writer) Crc32

func (c *Crc32Writer) Crc32() uint32

func (*Crc32Writer) Write

func (c *Crc32Writer) Write(p []byte) (n int, err error)

type DiskWAL

type DiskWAL struct {
	// contains filtered or unexported fields
}

func NewDiskWAL

func NewDiskWAL(logger zerolog.Logger, reg prometheus.Registerer, metrics module.WALMetrics, dir string, forestCapacity int, pathByteSize int, segmentSize int) (*DiskWAL, error)

TODO use real logger and metrics, but that would require passing them to Trie storage

func (*DiskWAL) Done

func (w *DiskWAL) Done() <-chan struct{}

Done implements the module.ReadyDoneAware interface. It closes all the open write-ahead log files.

func (*DiskWAL) NewCheckpointer

func (w *DiskWAL) NewCheckpointer() (*Checkpointer, error)

NewCheckpointer returns a Checkpointer for this WAL

func (*DiskWAL) PauseRecord

func (w *DiskWAL) PauseRecord()

func (*DiskWAL) Ready

func (w *DiskWAL) Ready() <-chan struct{}

func (*DiskWAL) RecordDelete

func (w *DiskWAL) RecordDelete(rootHash ledger.RootHash) error

func (*DiskWAL) RecordUpdate

func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) (segmentNum int, skipped bool, err error)

RecordUpdate writes the trie update to the write-ahead log on disk. If write-ahead logging is not paused, it returns the number of the write-ahead log file that the trie update was written to. The second returned value is false if write-ahead logging is enabled; otherwise it is true, meaning the WAL is disabled.

func (*DiskWAL) Replay

func (w *DiskWAL) Replay(
	checkpointFn func(tries []*trie.MTrie) error,
	updateFn func(update *ledger.TrieUpdate) error,
	deleteFn func(ledger.RootHash) error,
) error

func (*DiskWAL) ReplayLogsOnly

func (w *DiskWAL) ReplayLogsOnly(
	checkpointFn func(tries []*trie.MTrie) error,
	updateFn func(update *ledger.TrieUpdate) error,
	deleteFn func(rootHash ledger.RootHash) error,
) error

func (*DiskWAL) ReplayOnForest

func (w *DiskWAL) ReplayOnForest(forest *mtrie.Forest) error

func (*DiskWAL) Segments

func (w *DiskWAL) Segments() (first, last int, err error)

func (*DiskWAL) UnpauseRecord

func (w *DiskWAL) UnpauseRecord()

type LedgerWAL

type LedgerWAL interface {
	module.ReadyDoneAware

	NewCheckpointer() (*Checkpointer, error)
	PauseRecord()
	UnpauseRecord()
	RecordUpdate(update *ledger.TrieUpdate) (int, bool, error)
	RecordDelete(rootHash ledger.RootHash) error
	ReplayOnForest(forest *mtrie.Forest) error
	Segments() (first, last int, err error)
	Replay(
		checkpointFn func(tries []*trie.MTrie) error,
		updateFn func(update *ledger.TrieUpdate) error,
		deleteFn func(ledger.RootHash) error,
	) error
	ReplayLogsOnly(
		checkpointFn func(tries []*trie.MTrie) error,
		updateFn func(update *ledger.TrieUpdate) error,
		deleteFn func(rootHash ledger.RootHash) error,
	) error
}

type SyncOnCloseRenameFile

type SyncOnCloseRenameFile struct {
	*bufio.Writer
	// contains filtered or unexported fields
}

SyncOnCloseRenameFile is a composite of a buffered writer over a given file, which flushes and syncs on closing and renames the file to `targetName` as the last step. The typical use case is to write data to a temporary file and rename it to the target name only as the last step. This helps avoid the situation where writing is interrupted and an unusable file with the target name exists.

func (*SyncOnCloseRenameFile) Close

func (s *SyncOnCloseRenameFile) Close() error

func (*SyncOnCloseRenameFile) Sync

func (s *SyncOnCloseRenameFile) Sync() error

func (*SyncOnCloseRenameFile) Write

func (s *SyncOnCloseRenameFile) Write(b []byte) (int, error)

type TrieQueue

type TrieQueue struct {
	// contains filtered or unexported fields
}

TrieQueue is a fixed-size FIFO queue of MTrie. It is only used by Compactor for checkpointing, and it is intentionally not thread-safe given its limited use case. It is not a general-purpose queue, to avoid incurring overhead for features not needed for its limited use case.

func NewTrieQueue

func NewTrieQueue(capacity uint) *TrieQueue

NewTrieQueue returns a new TrieQueue with given capacity.

func NewTrieQueueWithValues

func NewTrieQueueWithValues(capacity uint, tries []*trie.MTrie) *TrieQueue

NewTrieQueueWithValues returns a new TrieQueue with given capacity and initial values.

func (*TrieQueue) Count

func (q *TrieQueue) Count() int

Count returns element count.

func (*TrieQueue) Push

func (q *TrieQueue) Push(t *trie.MTrie)

Push pushes a trie onto the queue. If the queue is full, it overwrites the oldest element.

func (*TrieQueue) Tries

func (q *TrieQueue) Tries() []*trie.MTrie

Tries returns elements in queue, starting from the oldest element to the newest element.

type WALOperation

type WALOperation uint8
const WALDelete WALOperation = 2
const WALUpdate WALOperation = 1

func Decode

func Decode(data []byte) (operation WALOperation, rootHash ledger.RootHash, update *ledger.TrieUpdate, err error)

type WriterSeekerCloser

type WriterSeekerCloser interface {
	io.Writer
	io.Seeker
	io.Closer
}
