wal

v0.2.0
Published: Mar 14, 2023 | License: BSD-3-Clause | Imports: 14 | Imported by: 0

README

Go Write-Ahead Log

A write-ahead logging (WAL) implementation in Go.

THIS SOFTWARE IS STILL IN ALPHA AND THERE ARE NO GUARANTEES REGARDING API STABILITY YET.


Package wal implements an efficient write-ahead log for Go applications.

The main goal of a write-ahead log (WAL) is to make an application more durable, so that it does not lose data in case of a crash. WALs are used in applications such as database systems to flush all written data to disk before the changes are applied to the database. After a crash, the WAL enables the application to recover lost in-memory changes by reconstructing all required operations from the log.

Example usage

The code below is a copy of example_test.go. It shows the general usage of this library together with some explanation.

package wal_test

import (
	"fmt"
	"os"

	"github.com/fgrosse/wal"
	"github.com/fgrosse/wal/waltest"
	"go.uber.org/zap"
)

// walEntries is an unexported package level variable that is used to register
// your own wal.Entry implementations. Such an Entry contains the logic of how
// to encode and decode a WAL with your custom data. Each wal.Entry is also
// associated with a unique wal.EntryType so we are able to map the binary
// representation back to your original Go type.
//
// In the example below we use two example implementations which are only
// available in unit tests. You might want to look into their implementation
// (see github.com/fgrosse/wal/waltest) to understand how you can efficiently
// implement your own encoding and decoding logic.
var walEntries = wal.NewEntryRegistry(
	func() wal.Entry { return new(waltest.ExampleEntry1) },
	func() wal.Entry { return new(waltest.ExampleEntry2) },
)

func Example() {
	// The WAL will persist all written entries onto disk in an efficient
	// append-only log file. Entries are split over multiple WAL segment files.
	// To create a new WAL, you have to provide a path to the directory where
	// the segment files will be stored.
	path, err := os.MkdirTemp("", "WALExample")
	check(err)

	// There are a few runtime options for the WAL which have an impact on its
	// performance and durability guarantees. By default, the WAL prefers strong
	// durability and will fsync each write to disk immediately. Under high
	// throughput, such a configuration can make the WAL a bottleneck of your
	// application. Therefore, it might make sense to configure a SyncDelay to
	// let the WAL automatically batch up fsyncs for multiple writes.
	conf := wal.DefaultConfiguration()

	// This library uses go.uber.org/zap for efficient structured logging.
	logger, err := zap.NewProduction()
	check(err)

	// When you create a new WAL instance, it will immediately try and load any
	// existing WAL segments from the path you provided. The `walEntries` parameter
	// that is passed to wal.New(…) is an EntryRegistry which lets the WAL know
	// about your own Entry implementation. This way, you can specify your own types
	// and encoding/decoding logic but the WAL is still able to load entries from
	// the last segment.
	w, err := wal.New(path, conf, walEntries, logger)
	check(err)

	// Now you can finally write your first WAL entry. When this function
	// returns without an error you can be sure that it was fully written to disk.
	offset, err := w.Write(&waltest.ExampleEntry1{
		ID:    42,
		Point: []float32{1, 2, 3},
	})
	check(err)

	// You might use the offset in your application or ignore it altogether.
	fmt.Print(offset)

	// Finally, you need to close the WAL to release any resources and close the
	// open segment file.
	err = w.Close()
	check(err)
}

// check is a simple helper function to check errors in Example().
// In a real application, you should implement proper error handling.
func check(err error) {
	if err != nil {
		panic(err)
	}
}

Encoding your own WAL Entries

Your custom entries must implement the wal.Entry interface:

// Entry is a single record of the Write Ahead Log.
// It is up to the application that uses the WAL to provide at least one concrete
// Entry implementation to the WAL via the EntryRegistry.
type Entry interface {
	Type() EntryType

	// EncodePayload encodes the payload into the provided buffer. In case the
	// buffer is too small to fit the entire payload, this function can grow the
	// old slice and return a new one. Otherwise, the old slice must be returned.
	EncodePayload([]byte) []byte

	// ReadPayload reads the payload from the reader but does not yet decode it.
	// Reading and decoding are separate steps for performance reasons. Sometimes
	// we might want to quickly seek through the WAL without having to decode
	// every entry.
	ReadPayload(r io.Reader) ([]byte, error)

	// DecodePayload decodes an entry from a payload that has previously been read
	// by ReadPayload(…).
	DecodePayload([]byte) error
}

// EntryType is used to distinguish different types of messages that we write
// to the WAL.
type EntryType uint8

You can find an example implementation at entry_test.go.
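As a rough illustration of the interface above, here is a minimal sketch of a custom entry. The `User` type, its payload layout (a 4-byte ID, a 2-byte name length, then the name bytes) and the choice to overwrite the provided buffer from its start are all assumptions made for this example, not something taken from the library. `EntryType` and `Entry` are redeclared locally so the sketch compiles on its own.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// EntryType and Entry are local copies of the declarations quoted above so
// that this sketch compiles without importing the library.
type EntryType uint8

type Entry interface {
	Type() EntryType
	EncodePayload([]byte) []byte
	ReadPayload(r io.Reader) ([]byte, error)
	DecodePayload([]byte) error
}

// User is a hypothetical entry. Assumed payload layout (big endian):
// ID (4B) | name length (2B) | name bytes. Names longer than 65535 bytes
// are not supported by this sketch.
type User struct {
	ID   uint32
	Name string
}

var _ Entry = (*User)(nil) // compile-time interface check

func (*User) Type() EntryType { return 1 }

// EncodePayload reuses buf from its start; append grows it when needed.
func (u *User) EncodePayload(buf []byte) []byte {
	buf = buf[:0]
	buf = append(buf, byte(u.ID>>24), byte(u.ID>>16), byte(u.ID>>8), byte(u.ID))
	buf = append(buf, byte(len(u.Name)>>8), byte(len(u.Name)))
	return append(buf, u.Name...)
}

// ReadPayload reads exactly one payload without interpreting its fields,
// apart from the length it needs to know how many bytes to read.
func (*User) ReadPayload(r io.Reader) ([]byte, error) {
	head := make([]byte, 6)
	if _, err := io.ReadFull(r, head); err != nil {
		return nil, err
	}
	n := int(binary.BigEndian.Uint16(head[4:6]))
	payload := make([]byte, 6+n)
	copy(payload, head)
	if _, err := io.ReadFull(r, payload[6:]); err != nil {
		return nil, err
	}
	return payload, nil
}

// DecodePayload fills the entry from bytes returned by ReadPayload.
func (u *User) DecodePayload(p []byte) error {
	if len(p) < 6 {
		return fmt.Errorf("payload too short: %d bytes", len(p))
	}
	n := int(binary.BigEndian.Uint16(p[4:6]))
	if len(p) != 6+n {
		return fmt.Errorf("payload length mismatch: have %d, want %d", len(p), 6+n)
	}
	u.ID = binary.BigEndian.Uint32(p[:4])
	u.Name = string(p[6:])
	return nil
}

// roundTrip encodes in, reads the payload back and decodes it into a new User.
func roundTrip(in *User) (*User, error) {
	encoded := in.EncodePayload(make([]byte, 0, 16))
	raw, err := in.ReadPayload(bytes.NewReader(encoded))
	if err != nil {
		return nil, err
	}
	out := new(User)
	return out, out.DecodePayload(raw)
}

func main() {
	out, err := roundTrip(&User{ID: 42, Name: "gopher"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.ID, out.Name)
}
```

Keeping the length inside the payload is one way to let ReadPayload know how many bytes to consume without decoding the rest; fixed-size payloads would not need it.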

How it works

Each WAL.Write(…) call creates a binary encoding of the passed wal.Entry which we call the entry's payload. This payload is written to disk together with some metadata such as the entry type, a CRC checksum and an offset number.

The full binary layout looks like the following:

// Every Entry is written using the following binary layout (big endian format):
//
//	  ┌─────────────┬───────────┬──────────┬─────────┐
//	  │ Offset (4B) │ Type (1B) │ CRC (4B) │ Payload │
//	  └─────────────┴───────────┴──────────┴─────────┘
//
//		- Offset = 32-bit WAL entry number for each record in order to implement a low-water mark
//		- Type = Type of WAL entry
//		- CRC = 32-bit hash computed over the payload using CRC
//		- Payload = The actual WAL entry payload data
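The layout above can be sketched with the standard library alone. The helper names `encodeRecord` and `decodeRecord` are hypothetical, and the use of the CRC-32 IEEE polynomial is an assumption, since the text only says "CRC". The sketch also assumes the record slice contains exactly one record, because the header itself carries no payload length.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

const headerSize = 4 + 1 + 4 // Offset (4B) + Type (1B) + CRC (4B)

// encodeRecord lays out one record in big endian format.
// Assumption: the checksum is CRC-32 with the IEEE polynomial.
func encodeRecord(offset uint32, typ uint8, payload []byte) []byte {
	rec := make([]byte, headerSize, headerSize+len(payload))
	binary.BigEndian.PutUint32(rec[0:4], offset)
	rec[4] = typ
	binary.BigEndian.PutUint32(rec[5:9], crc32.ChecksumIEEE(payload))
	return append(rec, payload...)
}

// decodeRecord splits a single record into its fields and verifies that the
// stored checksum matches the payload.
func decodeRecord(rec []byte) (uint32, uint8, []byte, error) {
	if len(rec) < headerSize {
		return 0, 0, nil, fmt.Errorf("record too short: %d bytes", len(rec))
	}
	offset := binary.BigEndian.Uint32(rec[0:4])
	typ := rec[4]
	want := binary.BigEndian.Uint32(rec[5:9])
	payload := rec[headerSize:]
	if got := crc32.ChecksumIEEE(payload); got != want {
		return 0, 0, nil, fmt.Errorf("checksum mismatch: got %08x, want %08x", got, want)
	}
	return offset, typ, payload, nil
}

func main() {
	rec := encodeRecord(1, 2, []byte("hello"))
	offset, typ, payload, err := decodeRecord(rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(offset, typ, string(payload))
}
```

Because the checksum covers only the payload, a flipped bit in the payload is detected at decode time, while the offset and type fields are not protected by it in this sketch.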

This data is appended to a file, and the WAL makes sure that it is actually written to non-volatile storage rather than just being stored in a memory-based write cache that would be lost if power failed (that is, the file is fsynced).

When the WAL file reaches a configurable maximum size, it is closed and the WAL starts to append its records to a new and empty file. These files are called WAL segments. Typically, the WAL is split into multiple segments to enable other processes to take care of cleaning old segments, implement WAL segment backups and more. When the WAL is started, it will resume operation at the end of the last open segment file.
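Size-based segment rotation can be sketched as follows. This is only an illustration of the idea: the `rotatingLog` type, the `%06d.seg` file naming scheme and the choice to rotate just before a write would exceed the limit are assumptions, not the library's actual behavior.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// rotatingLog appends records to numbered segment files and starts a new
// segment once the current one would grow past maxSize.
type rotatingLog struct {
	dir     string
	maxSize int64
	id      int      // ID of the current segment
	size    int64    // bytes written to the current segment
	file    *os.File // current segment, nil before the first write
}

func (l *rotatingLog) Append(rec []byte) error {
	full := l.size > 0 && l.size+int64(len(rec)) > l.maxSize
	if l.file == nil || full {
		if err := l.rotate(); err != nil {
			return err
		}
	}
	n, err := l.file.Write(rec)
	l.size += int64(n)
	return err
}

// rotate closes the current segment (if any) and opens the next one.
func (l *rotatingLog) rotate() error {
	if l.file != nil {
		if err := l.file.Close(); err != nil {
			return err
		}
	}
	l.id++
	name := filepath.Join(l.dir, fmt.Sprintf("%06d.seg", l.id)) // hypothetical naming
	f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	l.file, l.size = f, 0
	return nil
}

func (l *rotatingLog) Close() error {
	if l.file == nil {
		return nil
	}
	return l.file.Close()
}

// segmentCount writes records of recordSize bytes into a temporary directory
// and reports how many segment files were created.
func segmentCount(maxSize int64, records, recordSize int) (int, error) {
	dir, err := os.MkdirTemp("", "segments")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	l := &rotatingLog{dir: dir, maxSize: maxSize}
	for i := 0; i < records; i++ {
		if err := l.Append(make([]byte, recordSize)); err != nil {
			return 0, err
		}
	}
	if err := l.Close(); err != nil {
		return 0, err
	}
	names, err := filepath.Glob(filepath.Join(dir, "*.seg"))
	return len(names), err
}

func main() {
	n, err := segmentCount(10, 5, 4)
	if err != nil {
		panic(err)
	}
	fmt.Println("segments:", n)
}
```

Zero-padded numeric names sort lexicographically in ID order, which keeps "find the last segment" a simple directory listing.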

Installation

$ go get github.com/fgrosse/wal

Built With

  • zap - Blazing fast, structured, leveled logging in Go
  • testify - A simple unit test library
  • and more

Contributing

Please read CONTRIBUTING.md for details on our code of conduct and on the process for submitting pull requests to this repository.

Versioning

THIS SOFTWARE IS STILL IN ALPHA AND THERE ARE NO GUARANTEES REGARDING API STABILITY YET.

All significant (e.g. breaking) changes are documented in the CHANGELOG.md.

After the v1.0 release we plan to use SemVer for versioning. For the versions available, see the releases page.

Authors

  • Friedrich Große - Initial work - fgrosse

See also the list of contributors who participated in this project.

License

This project is licensed under the BSD-3-Clause License - see the LICENSE file for details.

Documentation

Constants

const (
	DefaultWriteBufferSize  = 16 * 1024
	DefaultMaxSegmentSize   = 10 * 1024 * 1024
	DefaultEntryPayloadSize = 128 // TODO: no clue if this is a good default and if a *default* here makes sense generally
)

Default configuration options. You can use the DefaultConfiguration() function to create a Configuration instance that uses these constants.

Functions

func SegmentFileNames (added in v0.2.0)

func SegmentFileNames(dir string) ([]string, error)

SegmentFileNames returns all WAL segment files found in dir, sorted by ascending ID.

Types

type Configuration

type Configuration struct {
	WriteBufferSize  int // the size of the segment write buffer in bytes
	MaxSegmentSize   int // the file size in bytes at which the segment files will be rotated
	EntryPayloadSize int // the default size for entry payloads. can be tuned to reduce allocations

	// SyncDelay is the duration to wait for syncing writes to disk. The default
	// value 0 will cause every write to be synced immediately.
	SyncDelay time.Duration
}

Configuration contains all settings of a write-ahead log.

func DefaultConfiguration

func DefaultConfiguration() Configuration

DefaultConfiguration returns a new Configuration instance that contains all default WAL parameters.

func (Configuration) MarshalLogObject

func (c Configuration) MarshalLogObject(enc zapcore.ObjectEncoder) error

MarshalLogObject implements the zapcore.ObjectMarshaler interface.

type Entry

type Entry interface {
	Type() EntryType

	// EncodePayload encodes the payload into the provided buffer. In case the
	// buffer is too small to fit the entire payload, this function can grow the
	// old slice and return a new one. Otherwise, the old slice must be returned.
	EncodePayload([]byte) []byte

	// ReadPayload reads the payload from the reader but does not yet decode it.
	// Reading and decoding are separate steps for performance reasons. Sometimes
	// we might want to quickly seek through the WAL without having to decode
	// every entry.
	ReadPayload(r io.Reader) ([]byte, error)

	// DecodePayload decodes an entry from a payload that has previously been read
	// by ReadPayload(…).
	DecodePayload([]byte) error
}

Entry is a single record of the Write Ahead Log. It is up to the application that uses the WAL to provide at least one concrete Entry implementation to the WAL via the EntryRegistry.

type EntryConstructor (added in v0.2.0)

type EntryConstructor func() Entry

EntryConstructor is the constructor function of a specific Entry implementation.

type EntryRegistry (added in v0.2.0)

type EntryRegistry struct {
	// contains filtered or unexported fields
}

The EntryRegistry keeps track of all known Entry implementations. This is necessary in order to instantiate the correct types when loading WAL segments.

func NewEntryRegistry (added in v0.2.0)

func NewEntryRegistry(constructors ...EntryConstructor) *EntryRegistry

NewEntryRegistry creates a new EntryRegistry. If you pass any constructor to this function, each must create a unique Entry implementation (i.e. one which returns a unique EntryType). Otherwise, this function panics.

Alternatively, you can register the constructor functions using EntryRegistry.Register(…).

func (*EntryRegistry) New (added in v0.2.0)

func (r *EntryRegistry) New(typ EntryType) (Entry, error)

New instantiates a new Entry implementation that was previously registered for the requested EntryType. An error is returned if no Entry was registered for this type.

func (*EntryRegistry) Register (added in v0.2.0)

func (r *EntryRegistry) Register(constructor EntryConstructor) error

Register an EntryConstructor function. Each Entry will be registered with the EntryType that is returned by the corresponding Entry.Type().

An error is returned if this constructor was already registered; i.e. a constructor was already registered that creates an Entry with the same EntryType as this constructor's Entry.
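The registration rules described above (one constructor per EntryType, an error on duplicates, an error for unknown types) can be sketched with a map. The `registry` type below is a simplified stand-in written for this example, not the library's implementation, and its `Entry` interface is reduced to the single method the registry needs.

```go
package main

import "fmt"

type EntryType uint8

// Entry is reduced to the one method this sketch relies on.
type Entry interface{ Type() EntryType }

type EntryConstructor func() Entry

// registry maps each EntryType to the constructor that creates its Entry.
type registry struct {
	constructors map[EntryType]EntryConstructor
}

// Register calls the constructor once to learn which EntryType it produces
// and rejects a second constructor for the same type.
func (r *registry) Register(c EntryConstructor) error {
	typ := c().Type()
	if _, ok := r.constructors[typ]; ok {
		return fmt.Errorf("entry type %d is already registered", typ)
	}
	if r.constructors == nil {
		r.constructors = make(map[EntryType]EntryConstructor)
	}
	r.constructors[typ] = c
	return nil
}

// New instantiates a fresh Entry for typ, or fails if typ is unknown.
func (r *registry) New(typ EntryType) (Entry, error) {
	c, ok := r.constructors[typ]
	if !ok {
		return nil, fmt.Errorf("no entry registered for type %d", typ)
	}
	return c(), nil
}

// insert and remove are hypothetical entry types used for illustration.
type insert struct{}

func (*insert) Type() EntryType { return 1 }

type remove struct{}

func (*remove) Type() EntryType { return 2 }

func main() {
	var r registry
	fmt.Println(r.Register(func() Entry { return new(insert) }))
	fmt.Println(r.Register(func() Entry { return new(remove) }))
	fmt.Println(r.Register(func() Entry { return new(insert) })) // duplicate type

	e, err := r.New(2)
	fmt.Printf("%T %v\n", e, err)
}
```

Registering constructors rather than values means every decoded record gets its own fresh Entry instance, so decoding one record cannot overwrite another.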

type EntryType

type EntryType uint8

EntryType is used to distinguish different types of messages that we write to the WAL.

type SegmentReader

type SegmentReader struct {
	// contains filtered or unexported fields
}

The SegmentReader is responsible for reading WAL entries from their binary representation, typically from disk. It is used by the WAL to automatically resume the last open segment upon startup, but it can also be used to manually iterate through WAL segments.

The complete usage pattern looks like this:

r, err := NewSegmentReader(…)
…

for r.ReadNext() {
  offset := r.Offset()
  …
  entry, err := r.Decode()
  …
}

if err := r.Err(); err != nil {
  …
}

func NewSegmentReader

func NewSegmentReader(r io.Reader, registry *EntryRegistry) (*SegmentReader, error)

NewSegmentReader creates a new SegmentReader that reads encoded WAL entries from the provided reader. The registry is used to map the entry types that have been read to their Entry implementations which contain the decoding logic.

func (*SegmentReader) Decode (added in v0.2.0)

func (r *SegmentReader) Decode() (Entry, error)

Decode decodes the last entry that was read using SegmentReader.ReadNext().

func (*SegmentReader) Err

func (r *SegmentReader) Err() error

Err returns any error that happened when calling ReadNext(). This function must always be called even if ReadNext() never returned true.

Please refer to the comment on the SegmentReader type to see the full usage pattern.

func (*SegmentReader) Offset (added in v0.2.0)

func (r *SegmentReader) Offset() uint32

Offset returns the offset of the last entry that was read by SegmentReader.ReadNext().

func (*SegmentReader) ReadNext (added in v0.2.0)

func (r *SegmentReader) ReadNext() bool

ReadNext loads the data for the next Entry from the underlying reader. For efficiency reasons, this function neither checks the entry checksum nor decodes the entry bytes. This allows the caller to quickly seek through a WAL up to a specific offset without having to decode each WAL entry.

You can get the offset of the current entry using SegmentReader.Offset(). In order to actually decode the read WAL entry, you need to use SegmentReader.Decode().

func (*SegmentReader) SeekEnd (added in v0.2.0)

func (r *SegmentReader) SeekEnd() (lastOffset uint32, err error)

SeekEnd reads through the entire segment until the end and returns the last offset.

type SegmentWriter

type SegmentWriter struct {
	// contains filtered or unexported fields
}

The SegmentWriter is responsible for writing WAL entry records to disk. This type handles the necessary buffered I/O as well as file system syncing.

Every Entry is written using the following binary layout (big endian format):

  ┌─────────────┬───────────┬──────────┬─────────┐
  │ Offset (4B) │ Type (1B) │ CRC (4B) │ Payload │
  └─────────────┴───────────┴──────────┴─────────┘

	- Offset = 32-bit WAL entry number for each record in order to implement a low-water mark
	- Type = Type of WAL entry
	- CRC = 32-bit hash computed over the payload using CRC
	- Payload = The actual WAL entry payload data

func NewSegmentWriter

func NewSegmentWriter(w io.WriteCloser) *SegmentWriter

NewSegmentWriter returns a new SegmentWriter writing to w, using the default write buffer size.

func NewSegmentWriterSize

func NewSegmentWriterSize(w io.WriteCloser, bufferSize int) *SegmentWriter

NewSegmentWriterSize returns a new SegmentWriter writing to w whose buffer has at least the specified size.

func (*SegmentWriter) Close

func (w *SegmentWriter) Close() error

Close ensures that all buffered data is flushed to disk and then closes the associated writer or file.

func (*SegmentWriter) Sync

func (w *SegmentWriter) Sync() error

Sync writes any buffered data to the underlying io.Writer and, if we are writing to an os.File, syncs the file system's in-memory copy of recently written data to disk.
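The two steps of such a sync (flush the user-space buffer, then ask the operating system to commit the file) can be sketched with bufio and os. The `syncWriter` type and the `demo` helper are hypothetical and only mirror the behavior described above; they are not the library's SegmentWriter.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
)

// syncWriter buffers writes in memory and makes them durable on Sync.
type syncWriter struct {
	dst io.Writer
	buf *bufio.Writer
}

func newSyncWriter(dst io.Writer, size int) *syncWriter {
	return &syncWriter{dst: dst, buf: bufio.NewWriterSize(dst, size)}
}

// Write only fills the buffer; nothing is guaranteed to reach dst yet.
func (w *syncWriter) Write(p []byte) (int, error) {
	return w.buf.Write(p)
}

// Sync flushes the buffer to dst. If dst is an *os.File, it additionally
// calls File.Sync so the data is committed to stable storage.
func (w *syncWriter) Sync() error {
	if err := w.buf.Flush(); err != nil {
		return err
	}
	if f, ok := w.dst.(*os.File); ok {
		return f.Sync()
	}
	return nil
}

// demo reports the file size before and after Sync for one small write.
func demo() (before, after int64, err error) {
	f, err := os.CreateTemp("", "segment")
	if err != nil {
		return 0, 0, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	w := newSyncWriter(f, 4096)
	if _, err = w.Write([]byte("record")); err != nil {
		return 0, 0, err
	}
	fi, err := f.Stat()
	if err != nil {
		return 0, 0, err
	}
	before = fi.Size() // still buffered in memory

	if err = w.Sync(); err != nil {
		return 0, 0, err
	}
	fi, err = f.Stat()
	if err != nil {
		return 0, 0, err
	}
	return before, fi.Size(), nil
}

func main() {
	before, after, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(before, after)
}
```

A write that is smaller than the buffer stays in process memory until Sync runs, which is why a crash between Write and Sync can lose data even though Write returned no error.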

func (*SegmentWriter) Write

func (w *SegmentWriter) Write(offset uint32, typ EntryType, checksum uint32, payload []byte) error

Write writes a new WAL entry.

Note that we do not use the Entry interface here because encoding the payload is done at an earlier stage than actually writing data to the WAL segment.

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL is a write-ahead log implementation.

func New

func New(path string, conf Configuration, registry *EntryRegistry, logger *zap.Logger) (*WAL, error)

New creates a new WAL instance that writes and reads segment files in the directory at the provided path.

func (*WAL) Close

func (w *WAL) Close() error

Close gracefully shuts down the WAL by making sure that all pending writes are completed and synced to disk before closing the WAL segment file. Any write after the WAL has been closed will return an error.

func (*WAL) Offset

func (w *WAL) Offset() uint32

Offset returns the last offset that the WAL has written to disk.

func (*WAL) Write

func (w *WAL) Write(e Entry) (offset uint32, err error)
