Documentation ΒΆ
Overview ΒΆ
Example ΒΆ
package main import ( "fmt" "os" "github.com/fgrosse/wal" "github.com/fgrosse/wal/waltest" "go.uber.org/zap" ) // walEntries is an unexported package level variable that is used to register // your own wal.Entry implementations. Such an Entry contains the logic of how // to encode and decode a WAL with your custom data. Each wal.Entry is also // associated with a unique wa.EntryType so we are able to map the binary // representation back to your original Go type. // // In the example below we use two example implementations which are only // available in unit tests. You might want to look into their implementation // (see github.com/fgrosse/wal/waltest) to understand how you can efficiently // implement your own encoding and decoding logic. var walEntries = wal.NewEntryRegistry( func() wal.Entry { return new(waltest.ExampleEntry1) }, func() wal.Entry { return new(waltest.ExampleEntry2) }, ) func main() { // The WAL will persist all written entries onto disk in an efficient // append-only log file. Entries are split over multiple WAL segment files. // To create a new WAL, you have to provide a path to the directory where // the segment files will be stored. path, err := os.MkdirTemp("", "WALExample") check(err) // There are a few runtime options for the WAL which have an impact on its // performance and durability guarantees. By default, the WAL prefers strong // durability and will fsync each write to disk immediately. Under high // throughput, such a configuration can make the WAL a bottleneck of your // application. Therefore, it might make sense to configure a SyncDelay to // let the WAL automatically badge up fyncs for multiple writes. conf := wal.DefaultConfiguration() // This library uses go.uber.org/zap for efficient structured logging. logger, err := zap.NewProduction() check(err) // When you create a new WAL instance, it will immediately try and load any // existent WAL segments from the path you provided. The `walEntries` parameter // that is passed to wal.New(β¦) is an EntryRegistry which lets the WAL know // about your own Entry implementation. This way, you can specify your own types // and encoding/decoding logic but the WAL is still able to load entries from // the last segment. w, err := wal.New(path, conf, walEntries, logger) check(err) // Now you can finally write your first WAL entry. When this function // returns without an error you can be sure that it was fully written to disk. offset, err := w.Write(&waltest.ExampleEntry1{ ID: 42, Point: []float32{1, 2, 3}, }) // You might use the offset in your application or ignore it altogether. fmt.Print(offset) // Finally, you need to close the WAL to release any resources and close the // open segment file. err = w.Close() check(err) } // check is a simple helper function to check errors in Example(). // In a real application, you should implement proper error handling. func check(err error) { if err != nil { panic(err) } }
Output:
Index ΒΆ
Examples ΒΆ
Constants ΒΆ
const ( DefaultWriteBufferSize = 16 * 1024 DefaultMaxSegmentSize = 10 * 1024 * 1024 DefaultEntryPayloadSize = 128 // TODO: no clue if this a good default and if a *default* here makes sense generally )
Default configuration options. You can use the DefaultConfiguration() function to create a Configuration instance that uses these constants.
Variables ΒΆ
This section is empty.
Functions ΒΆ
func SegmentFileNames ΒΆ added in v0.2.0
SegmentFileNames will return all files that are WAL segment files in sorted order by ascending ID.
Types ΒΆ
type Configuration ΒΆ
type Configuration struct { WriteBufferSize int // the size of the segment write buffer in bytes MaxSegmentSize int // the file size in bytes at which the segment files will be rotated EntryPayloadSize int // the default size for entry payloads. can be tuned to reduce allocations // SyncDelay is the duration to wait for syncing writes to disk. The default // value 0 will cause every write to be synced immediately. SyncDelay time.Duration }
Configuration contains all settings of a write-ahead log.
func DefaultConfiguration ΒΆ
func DefaultConfiguration() Configuration
DefaultConfiguration returns a new Configuration instance that contains all default WAL parameters.
func (Configuration) MarshalLogObject ΒΆ
func (c Configuration) MarshalLogObject(enc zapcore.ObjectEncoder) error
MarshalLogObject implements the zapcore.ObjectMarshaler interface.
type Entry ΒΆ
type Entry interface { Type() EntryType // EncodePayload encodes the payload into the provided buffer. In case the // buffer is too small to fit the entire payload, this function can grow the // old and return a new slice. Otherwise, the old slice must be returned. EncodePayload([]byte) []byte // ReadPayload reads the payload from the reader but does not yet decode it. // Reading and decoding are separate steps for performance reasons. Sometimes // we might want to quickly seek through the WAL without having to decode // every entry. ReadPayload(r io.Reader) ([]byte, error) // DecodePayload decodes an entry from a payload that has previously been read // by ReadPayload(β¦). DecodePayload([]byte) error }
Entry is a single record of the Write Ahead Log. It is up to the application that uses the WAL to provide at least one concrete Entry implementation to the WAL via the EntryRegistry.
type EntryConstructor ΒΆ added in v0.2.0
type EntryConstructor func() Entry
EntryConstructor is the constructor function of a specific Entry implementation.
type EntryRegistry ΒΆ added in v0.2.0
type EntryRegistry struct {
// contains filtered or unexported fields
}
The EntryRegistry keeps track of all known Entry implementations. This is necessary in order to instantiate the correct types when loading WAL segments.
func NewEntryRegistry ΒΆ added in v0.2.0
func NewEntryRegistry(constructors ...EntryConstructor) *EntryRegistry
NewEntryRegistry creates a new EntryRegistry. If you pass any constructor to this function, each must create a unique Entry implementation (i.e. one which returns a unique EntryType). Otherwise, this function panics.
Alternatively, you can register the constructor functions using EntryRegistry.Register(β¦).
func (*EntryRegistry) New ΒΆ added in v0.2.0
func (r *EntryRegistry) New(typ EntryType) (Entry, error)
New instantiates a new Entry implementation that was previously registered for the requested EntryType. An error is returned if no Entry was registered for this type.
func (*EntryRegistry) Register ΒΆ added in v0.2.0
func (r *EntryRegistry) Register(constructor EntryConstructor) error
Register an EntryConstructor function. Each Entry will be registered with the EntryType that is returned by the corresponding Entry.Type().
An error is returned if this constructor was already registered; i.e. a constructor was already registered that creates an Entry with the same EntryType as this constructor's Entry.
type EntryType ΒΆ
type EntryType uint8
EntryType is used to distinguish different types of messages that we write to the WAL.
type SegmentReader ΒΆ
type SegmentReader struct {
// contains filtered or unexported fields
}
The SegmentReader is responsible for reading WAL entries from their binary representation, typically from disk. It is used by the WAL to automatically resume the last open segment upon startup, but it can also be used to manually iterate through WAL segments.
The complete usage pattern looks like this:
r, err := NewSegmentReader(β¦) β¦ for r.ReadNext() { offset := r.Offset() β¦ entry, err := r.Decode() β¦ } if err := r.Err(); err != nil { β¦ }
func NewSegmentReader ΒΆ
func NewSegmentReader(r io.Reader, registry *EntryRegistry) (*SegmentReader, error)
NewSegmentReader creates a new SegmentReader that reads encoded WAL entries from the provided reader. The registry is used to map the entry types that have been read to their Entry implementations which contain the decoding logic.
func (*SegmentReader) Decode ΒΆ added in v0.2.0
func (r *SegmentReader) Decode() (Entry, error)
Decode decodes the last entry that was read using SegmentReader.ReadNext().
func (*SegmentReader) Err ΒΆ
func (r *SegmentReader) Err() error
Err returns any error that happened when calling ReadNext(). This function must always be called even if ReadNext() never returned true.
Please refer to the comment on the SegmentReader type to see the full usage pattern.
func (*SegmentReader) Offset ΒΆ added in v0.2.0
func (r *SegmentReader) Offset() uint32
Offset returns the offset of the last entry that was read by SegmentReader.ReadNext().
func (*SegmentReader) ReadNext ΒΆ added in v0.2.0
func (r *SegmentReader) ReadNext() bool
ReadNext loads the data for the next Entry from the underlying reader. For efficiency reasons, this function neither checks the entry checksum, nor does it decode the entry bytes. This is done, so the caller can quickly seek through a WAL up to a specific offset without having to decode each WAL entry.
You can get the offset of the current entry using SegmentReader.Offset(). In order to actually decode the read WAL entry, you need to use SegmentReader.Decode(β¦).
func (*SegmentReader) SeekEnd ΒΆ added in v0.2.0
func (r *SegmentReader) SeekEnd() (lastOffset uint32, err error)
SeekEnd reads through the entire segment until the end and returns the last offset.
type SegmentWriter ΒΆ
type SegmentWriter struct {
// contains filtered or unexported fields
}
The SegmentWriter is responsible for writing WAL entry records to disk. This type handles the necessary buffered I/O as well as file system syncing.
Every Entry is written, using the following binary layout (big endian format):
βββββββββββββββ¬ββββββββββββ¬βββββββββββ¬ββββββββββ β Offset (4B) β Type (1B) β CRC (4B) β Payload β βββββββββββββββ΄ββββββββββββ΄βββββββββββ΄ββββββββββ - Offset = 32bit WAL entry number for each record in order to implement a low-water mark - Type = Type of WAL entry - CRC = 32bit hash computed over the payload using CRC - Payload = The actual WAL entry payload data
func NewSegmentWriter ΒΆ
func NewSegmentWriter(w io.WriteCloser) *SegmentWriter
NewSegmentWriter returns a new SegmentWriter writing to w, using the default write buffer size.
func NewSegmentWriterSize ΒΆ
func NewSegmentWriterSize(w io.WriteCloser, bufferSize int) *SegmentWriter
NewSegmentWriterSize returns a new SegmentWriter writing to w whose buffer has at least the specified size.
func (*SegmentWriter) Close ΒΆ
func (w *SegmentWriter) Close() error
Close ensures that all buffered data is flushed to disk before and then closes the associated writer or file.
func (*SegmentWriter) Sync ΒΆ
func (w *SegmentWriter) Sync() error
Sync writes any buffered data to the underlying io.Writer and syncs the file systems in-memory copy of recently written data to disk if we are writing to an os.File.
type WAL ΒΆ
type WAL struct {
// contains filtered or unexported fields
}
WAL is a write-ahead log implementation.
func New ΒΆ
func New(path string, conf Configuration, registry *EntryRegistry, logger *zap.Logger) (*WAL, error)
New creates a new WAL instance that writes and reads segment files to a directory at the provided path.
func (*WAL) Close ΒΆ
Close gracefully shuts down the writeAheadLog by making sure that all pending writes are completed and synced to disk before then closing the WAL segment file. Any future writes after the WAL has been closed will lead to an error.