wal

v1.0.5 (Latest)
Warning

This package is not in the latest version of its module.

Published: Mar 7, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

README

yawal

(Yet another) write-ahead logging package, for Go.


Installing

go get -u go.nesv.ca/yawal

Why should I use this package?

I'm not saying you should. However, if you are looking for a fast, flexible WAL package, then this should hopefully provide the initial groundwork for what you need!

Features
  • Pluggable storage back-ends ("sinks");
  • "Extra" functionality that isn't immediately crucial to the operation of the WAL is split out into a utilities package, walutil (e.g. persisting WAL segments after a given time interval).

Overall concepts

The main types for this package are:

  • The Logger, which you write your data to.
  • A Sink, for deciding where to persist your data.
  • And a Reader for, well, reading ("replaying") your data.

For a step-by-step understanding of how this package works:

  • You write your data to a logger. Each []byte of data is herein referred to as a chunk.
  • Chunks have offsets; an offset is simply the timestamp at which the chunk was written, with nanosecond precision. A logger assigns an offset to each chunk automatically when the chunk is written.
  • Chunks are stored in segments; a segment is a size-bounded collection of chunks.
  • When there isn't enough room in a segment for another chunk, the logger passes the segment along to a sink.
  • Sinks do most of the heavy lifting in this package; they handle the writing, and reading, of segments to/from a persistent storage medium.
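The write path above can be modelled with a few toy types. This is a minimal, self-contained sketch, not the package's implementation: the names (logger, segment, memSink, errTooBig) are hypothetical, and segment capacity is counted in raw data bytes, whereas the real package may also account for encoding overhead.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTooBig plays the role of wal.ErrTooBig in this sketch.
var errTooBig = errors.New("chunk is larger than an empty segment")

// chunk is one written []byte plus its offset (a nanosecond Unix timestamp).
type chunk struct {
	offset int64
	data   []byte
}

// segment is a size-bounded collection of chunks.
type segment struct {
	used   int // bytes of chunk data stored so far
	chunks []chunk
}

// memSink stands in for a Sink: it simply keeps every segment it is given.
type memSink struct{ segments []*segment }

func (s *memSink) writeSegment(seg *segment) error {
	s.segments = append(s.segments, seg)
	return nil
}

// logger writes chunks to an active segment and hands full segments to the sink.
type logger struct {
	segSize int
	active  *segment
	sink    *memSink
}

func newLogger(segSize int, sink *memSink) *logger {
	return &logger{segSize: segSize, active: &segment{}, sink: sink}
}

func (l *logger) write(p []byte) error {
	if len(p) > l.segSize {
		return errTooBig // would not fit even in an empty segment
	}
	if len(p) > l.segSize-l.active.used {
		// Not enough room left: pass the segment along and start a new one.
		if err := l.sink.writeSegment(l.active); err != nil {
			return err
		}
		l.active = &segment{}
	}
	// Copy p so the caller can reuse its buffer, and stamp the offset.
	c := chunk{offset: time.Now().UnixNano(), data: append([]byte(nil), p...)}
	l.active.chunks = append(l.active.chunks, c)
	l.active.used += len(p)
	return nil
}

func main() {
	sink := &memSink{}
	l := newLogger(16, sink)
	for i := 0; i < 5; i++ {
		if err := l.write([]byte("12345")); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println(len(sink.segments), len(l.active.chunks)) // 1 2
	fmt.Println(l.write(make([]byte, 17)))                // chunk is larger than an empty segment
}
```

With a 16-byte segment, the fourth 5-byte write no longer fits (15 bytes are used), so the first segment, holding three chunks, is handed to the sink and the write lands in a fresh segment.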

Examples

Create a new disk-backed log
package main

import (
	"log"

	wal "gopkg.in/nesv/yawal.v1"
)

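func main() {
	// Create a new DirectorySink that stores segment files in ./wal.
	sink, err := wal.NewDirectorySink("wal")
	if err != nil {
		log.Fatalln(err)
	}

	// Create a new logger that will store data in 1MB segment files.
	logger, err := wal.New(sink, wal.SegmentSize(1024*1024))
	if err != nil {
		log.Fatalln(err)
	}

	// Note, calling a *wal.Logger's Close() method will also close the
	// underlying Sink. Close also persists the current segment, so its
	// error is worth checking.
	defer func() {
		if err := logger.Close(); err != nil {
			log.Println("error closing logger:", err)
		}
	}()

	// Write data to your logger. Write implements io.Writer, so it
	// returns the number of bytes written as well as an error.
	for i := 0; i < 100; i++ {
		if _, err := logger.Write([]byte("Wooo, data!")); err != nil {
			log.Println("error:", err)
			return
		}
	}
}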
Create an in-memory log
sink, err := wal.NewMemorySink()
if err != nil {
	log.Fatalln(err)
}

logger, err := wal.New(sink, wal.SegmentSize(1024*1024))
if err != nil {
	log.Fatalln(err)
}
defer logger.Close()

// ...write data...
Read data from an existing log
sink, err := wal.NewDirectorySink("wal")
if err != nil {
	log.Fatalln(err)
}
defer sink.Close()

r := wal.NewReader(sink)
for r.Next() {
	data := r.Data()
	offset := r.Offset()

	fmt.Printf("Data at offset %s: %x\n", offset, data)
}
if err := r.Error(); err != nil {
	log.Println("error reading from wal:", err)
}

Goals

  • If it moves, document it.
  • Do not rely on other, third-party packages.
  • Use types and interfaces from the standard library wherever possible (within reason).
  • Be as fast as possible, without sacrificing safety.
  • Do not clutter the core implementation; favour composition of components and functionality.
  • Abstract layers as necessary.

Documentation

Overview

Package wal provides a WAL (write-ahead logging) facility.

This package aims to provide as basic an implementation of a write-ahead logger as possible. For additional functionality, please see the "wal/walutil" package.

When writing to a Logger, the []byte (herein referred to as a "chunk", or "data chunk") is written to a "segment". When a segment is full, or there is not enough room left for a write to fully succeed, the segment is passed to a Sink's WriteSegment method, and the []byte is written to a new, empty segment.

A Sink is a type that is capable of storing and retrieving segments. This package provides two Sink implementations: DirectorySink, which persists segments to a local directory, and MemorySink, which only keeps them in memory. Sinks do most of the "heavy lifting" for this package.

Unlike most WAL implementations, the Logger type does not directly expose a means of persisting segments at a regular time interval. This is intentional, and was separated out to keep the implementation of Logger as simple as possible. If you wish to have segments written at a specific time interval, see the documentation for the wal/walutil.FlushInterval function.

This package also provides the means of replaying a log without creating a Logger. For more details, see the NewReader and NewReaderOffset functions.

Index

Constants

const (
	// DefaultSegmentSize is the default size of a data segment (16MB).
	DefaultSegmentSize uint64 = 16777216
)

Variables

var (
	ErrTooBig       = errors.New("wal: data too large for segment")
	ErrLoggerClosed = errors.New("wal: logger closed")
)
var (
	ErrNotEnoughSpace = errors.New("not enough space in segment")
	ErrSegmentFull    = errors.New("segment full")
)
var ZeroOffset = Offset(0)

ZeroOffset holds the value of the oldest-possible offset within a write-ahead logger.

Functions

This section is empty.

Types

type Analyzer

type Analyzer interface {
	Analyze() error
}

Analyzer defines the interface of a type that can perform analysis on a persistent storage medium for write-ahead logs.

type DirectorySink

type DirectorySink struct {
	// contains filtered or unexported fields
}

DirectorySink implements a Sink that can persist WAL segments to, and load them from, a directory.

The nomenclature of the on-disk WAL segment files is:

<chunkOffset0>-<chunkOffsetN>

where chunkOffset0 is the offset of the first data chunk in the segment, and chunkOffsetN is the offset of the last. As an example, for a segment holding data chunks written between January 1 2017 00:00 UTC and January 1 2017 01:00 UTC, the resulting segment's file name would be:

1483228800000000000-1483232400000000000

Each WAL segment file is accompanied by another file containing a checksum used for verifying the contents of the segment. The checksum file name, for the above segment, would be:

1483228800000000000-1483232400000000000.CHECKSUM

func NewDirectorySink

func NewDirectorySink(dir string) (*DirectorySink, error)

NewDirectorySink returns a *DirectorySink that can read WAL segments from, and write them to, directory dir.

The permissions of dir will be checked to ensure the *DirectorySink can read and write to dir. If the directory does not exist, it will be created with mode 0777 (before umask).

func (*DirectorySink) Analyze

func (ds *DirectorySink) Analyze() error

Analyze scans the directory the *DirectorySink was initialized with, and gathers all of the currently-available offsets.

This method also attempts to verify each found segment, by calculating a checksum of the segment file, and comparing it to the checksum in the segment's checksum file.

func (*DirectorySink) Close

func (ds *DirectorySink) Close() error

Close implements the io.Closer interface.

In this particular Sink implementation, Close does nothing, as a DirectorySink only holds file descriptors open for the duration of a WriteSegment or LoadSegment call.

func (*DirectorySink) LoadSegment

func (ds *DirectorySink) LoadSegment(offset Offset) (*Segment, error)

LoadSegment implements the SegmentLoader interface.

func (*DirectorySink) NumSegments

func (ds *DirectorySink) NumSegments() int

NumSegments implements the Sink interface by returning the number of data segments currently known to the sink.

func (*DirectorySink) Offsets

func (ds *DirectorySink) Offsets() (oldest, newest Offset)

Offsets returns the oldest and newest offsets known to the DirectorySink. Initially, the offsets are gathered by calling the Sink's Analyze() method. After initialization and analysis, each call to WriteSegment extends the offset range.

Offsets implements the Sink interface.

func (*DirectorySink) Truncate

func (ds *DirectorySink) Truncate(offset Offset) error

Truncate implements the Sink interface.

Truncate will delete any on-disk segment files, along with their checksum files, if the last offset in the segment file is older than the given offset.

Should the offset fall within the offsets of a segment file, the segment file will be truncated, re-written to disk, and its checksum re-calculated.
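The delete-or-rewrite decision described above can be expressed as a small planning function over each segment file's offset range (for example, as recovered from its file name). This is a hypothetical sketch that follows the wording above: a segment whose last offset is older than the given offset is deleted, and a segment whose range contains the offset is rewritten.

```go
package main

import "fmt"

// span is the offset range of one segment file.
type span struct{ first, last int64 }

// planTruncate decides, for a truncation at offset, which segment files
// would be deleted outright and which would be rewritten in place.
func planTruncate(segs []span, offset int64) (remove, rewrite []span) {
	for _, s := range segs {
		switch {
		case s.last < offset:
			// Every chunk is older than offset: delete the file and its checksum.
			remove = append(remove, s)
		case s.first <= offset && offset <= s.last:
			// offset falls inside: truncate, rewrite, recompute the checksum.
			rewrite = append(rewrite, s)
		}
		// Segments entirely newer than offset are left untouched.
	}
	return remove, rewrite
}

func main() {
	segs := []span{{10, 19}, {20, 29}, {30, 39}}
	remove, rewrite := planTruncate(segs, 25)
	fmt.Println("remove:", remove)   // remove: [{10 19}]
	fmt.Println("rewrite:", rewrite) // rewrite: [{20 29}]
}
```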

func (*DirectorySink) WriteSegment

func (ds *DirectorySink) WriteSegment(seg *Segment) error

WriteSegment implements the SegmentWriter interface.

It will write each data segment out to a file, along with a second file with a .CHECKSUM extension.

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger is a type that can be used for maintaining write-ahead logs.

A Logger always maintains an "active" segment that data will be written to. For more details, see the Write method's documentation.

func New

func New(sink Sink, options ...Option) (*Logger, error)

New creates a new write-ahead logger that will persist records to sink.

func (*Logger) Close

func (l *Logger) Close() error

Close persists the current segment by writing it to the *Logger's Sink, then closes the Sink.

Close implements the io.Closer interface.

func (*Logger) Flush

func (l *Logger) Flush() error

Flush locks the *Logger for writing, and writes the currently-active data segment to the *Logger's internal Sink. If the segment was successfully written, a new, empty segment is started, and the *Logger will be unlocked.

Attempting to call Flush after Close will return ErrLoggerClosed.

func (*Logger) NewReader

func (l *Logger) NewReader() *Reader

NewReader returns a new *Reader that can sequentially read chunks of data from the earliest-known offset.

func (*Logger) NewReaderOffset

func (l *Logger) NewReaderOffset(offset Offset) *Reader

NewReaderOffset returns a new *Reader that can be used to sequentially read chunks of data, starting at offset.

func (*Logger) Offsets

func (l *Logger) Offsets() (first, last Offset)

Offsets returns the offsets of the first (oldest) and last (newest) data chunks.

func (*Logger) Truncate

func (l *Logger) Truncate(offset Offset) error

Truncate removes all data chunks whose offsets are <= offset.

This method attempts to call the underlying Sink's Truncate method, before truncating the current segment.

func (*Logger) Write

func (l *Logger) Write(p []byte) (int, error)

Write implements the io.Writer interface for a *Logger.

When len(p) exceeds the space left in the current segment, the current segment is written to the *Logger's internal Sink and a new segment is started. Should len(p) be larger than the size of a new, empty segment, this method returns ErrTooBig.

Any attempt to write to a *Logger, after its Close method has been called, will yield ErrLoggerClosed.

type MemorySink

type MemorySink struct {
	// contains filtered or unexported fields
}

MemorySink is a Sink implementation that only stores data in memory.

func NewMemorySink

func NewMemorySink() (*MemorySink, error)

NewMemorySink returns a Sink implementation that stores segments in memory.

func (*MemorySink) Analyze

func (s *MemorySink) Analyze() error

func (*MemorySink) Close

func (s *MemorySink) Close() error

func (*MemorySink) LoadSegment

func (s *MemorySink) LoadSegment(offset Offset) (*Segment, error)

func (*MemorySink) NumSegments

func (s *MemorySink) NumSegments() int

func (*MemorySink) Offsets

func (s *MemorySink) Offsets() (first, last Offset)

func (*MemorySink) Truncate

func (s *MemorySink) Truncate(offset Offset) error

func (*MemorySink) WriteSegment

func (s *MemorySink) WriteSegment(seg *Segment) error

type Offset

type Offset int64

Offset represents the offset of a data chunk within a write-ahead logger.

func NewOffset

func NewOffset() Offset

NewOffset returns a new Offset for the current time. This is a shorthand for:

NewOffsetTime(time.Now())

func NewOffsetTime

func NewOffsetTime(t time.Time) Offset

NewOffsetTime returns a new Offset for the given time.Time.

func ParseOffset

func ParseOffset(s string) (Offset, error)

ParseOffset returns an offset parsed from s.

func (Offset) After

func (o Offset) After(b Offset) bool

After reports whether the offset o is newer than b.

func (Offset) Before

func (o Offset) Before(b Offset) bool

Before reports whether the offset o is older than b.

func (Offset) Equal

func (o Offset) Equal(b Offset) bool

Equal reports whether the offset o is the same as b.

func (Offset) String

func (o Offset) String() string

String implements the fmt.Stringer interface, and provides a means for representing an offset that can be later parsed with ParseOffset.

func (Offset) Within

func (o Offset) Within(a, b Offset) bool

Within reports whether a <= o <= b.

type Option

type Option func(*Logger) error

Option is a functional configuration type that can be used to configure the behaviour of a *Logger.

func SegmentSize

func SegmentSize(n uint64) Option

SegmentSize sets the size of a data segment.

Depending on the Sink provided to the *Logger, setting n too low may cause excessive amounts of I/O, slowing everything down. A low n also means that any write where len(data) > n will fail with ErrTooBig.
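SegmentSize follows Go's functional-options pattern. A minimal sketch of how such an option could be applied is below; the lowercase names are hypothetical stand-ins, and rejecting a zero size is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

const defaultSegmentSize uint64 = 16777216 // 16 MiB, matching DefaultSegmentSize

type logger struct{ segmentSize uint64 }

// option configures a logger, mirroring the shape of wal.Option.
type option func(*logger) error

// segmentSize returns an option that overrides the segment size.
func segmentSize(n uint64) option {
	return func(l *logger) error {
		if n == 0 {
			return errors.New("segment size must be greater than zero")
		}
		l.segmentSize = n
		return nil
	}
}

// newLogger starts from defaults, then applies each option in order.
func newLogger(opts ...option) (*logger, error) {
	l := &logger{segmentSize: defaultSegmentSize}
	for _, opt := range opts {
		if err := opt(l); err != nil {
			return nil, err
		}
	}
	return l, nil
}

func main() {
	l, err := newLogger(segmentSize(1024 * 1024))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(l.segmentSize) // 1048576
	_, err = newLogger(segmentSize(0))
	fmt.Println(err) // segment size must be greater than zero
}
```

Because options return an error, invalid configuration surfaces from the constructor instead of causing failures on the first write.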

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader loads data segments from a Sink, and progresses through a data segment until there are no more chunks to be read. When the end of a current segment is reached, a Reader will attempt to increment the last-known chunk offset by one, and load the next-available data segment.

It is not safe to use a Reader from multiple goroutines.

Example:

r := NewReader(sink)

for r.Next() {
	fmt.Printf("% x\n", r.Data())
}

if err := r.Error(); err != nil {
	log.Println("error:", err)
}

func NewReader

func NewReader(sink Sink) *Reader

NewReader returns a *Reader that reads data chunks from sink, starting at the earliest-possible offset.

func NewReaderOffset

func NewReaderOffset(sink Sink, offset Offset) *Reader

NewReaderOffset returns a *Reader that starts reading data chunks from sink, at the specified offset.

func (*Reader) Data

func (r *Reader) Data() []byte

Data returns the []byte of the current data chunk. Successive calls to Data, without calling Next, will return the same []byte.

func (*Reader) Error

func (r *Reader) Error() error

Error returns the most-recent error encountered by the *Reader.

func (*Reader) Next

func (r *Reader) Next() bool

Next reports whether or not there is another data chunk that can be read using the Data method.

A false return value means there are no more data chunks that can be read from the current segment, and no more segments can be loaded.

func (*Reader) Offset

func (r *Reader) Offset() Offset

Offset returns the offset of the current data chunk. Multiple calls to Offset, without calling Next, will return the same offset.

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

Segment is a size-bounded type that chunks are written to.

While a segment is safe for concurrent reading and writing, it is not recommended to do so.

func NewSegment

func NewSegment() *Segment

func NewSegmentSize

func NewSegmentSize(size uint64) *Segment

func (*Segment) Chunk

func (s *Segment) Chunk() chunk

Chunk returns the current chunk. Successive calls to Chunk will yield the same chunk. To advance to the next chunk in the segment, call the Next() method.

func (*Segment) Chunks

func (s *Segment) Chunks() int

Chunks returns the current number of chunks in this segment.

func (*Segment) CurrentReadOffset

func (s *Segment) CurrentReadOffset() Offset

CurrentReadOffset returns the offset of the chunk that will be returned by Chunk.

CurrentReadOffset will panic if it is called before Next.

func (*Segment) EncodedSize

func (s *Segment) EncodedSize() (int64, error)

EncodedSize returns the encoded size of the segment, in bytes. This is the number of bytes that should be returned by WriteTo, assuming no more chunks are added to the segment.

func (*Segment) Limits

func (s *Segment) Limits() (oldest, newest Offset)

Limits returns the oldest and newest offsets of the data chunks in the segment.

If there are no data chunks in the segment, this method will return ZeroOffset for both offsets.

If there is only one data chunk in the segment, that chunk's offset is returned for both oldest, and newest.

func (*Segment) Next

func (s *Segment) Next() bool

Next reports whether or not there is another chunk that can be read with the Chunk() method.

For example:

for s.Next() {
	c := s.Chunk()
	...
}

func (*Segment) ReadFrom

func (s *Segment) ReadFrom(r io.Reader) (int64, error)

ReadFrom implements the io.ReaderFrom interface, and is primarily used to load a segment from disk.

Calling ReadFrom on a non-empty segment will return a non-nil error.

func (*Segment) Remaining

func (s *Segment) Remaining() int64

Remaining returns the number of bytes left before the segment is at capacity.

func (*Segment) Size

func (s *Segment) Size() int64

Size returns the size of the current segment, in bytes.

func (*Segment) Truncate

func (s *Segment) Truncate(offset Offset)

Truncate removes all chunks whose offsets are <= offset from the segment.

If the segment is currently being read, its internal read position is adjusted accordingly.
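A minimal sketch of the truncation and read-position adjustment described above, assuming chunks are stored oldest first (so the removed chunks always form a prefix). The segment type here is a hypothetical stand-in, not *wal.Segment.

```go
package main

import "fmt"

type chunk struct {
	offset int64
	data   string
}

// segment keeps its chunks oldest first, plus a read position.
type segment struct {
	chunks []chunk
	pos    int // index of the next chunk to be read
}

// truncate removes every chunk whose offset is <= offset.
func (s *segment) truncate(offset int64) {
	n := 0 // number of leading chunks to drop
	for n < len(s.chunks) && s.chunks[n].offset <= offset {
		n++
	}
	s.chunks = s.chunks[n:]
	// Chunks before pos were already read; shift pos by the removed count,
	// clamping at zero when unread chunks were removed too.
	s.pos -= n
	if s.pos < 0 {
		s.pos = 0
	}
}

func main() {
	s := &segment{chunks: []chunk{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}}, pos: 1}
	s.truncate(2)
	fmt.Println(s.chunks, s.pos) // [{3 c} {4 d}] 0
}
```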

func (*Segment) Write

func (s *Segment) Write(p []byte) (int, error)

Write writes a copy of p to the segment, as a new data chunk.

If the length of p is greater than the remaining capacity of the segment, this method will return ErrNotEnoughSpace.

func (*Segment) WriteTo

func (s *Segment) WriteTo(w io.Writer) (int64, error)

WriteTo implements the io.WriterTo interface, and is primarily used to persist a segment to disk.

The returned int64 is the number of bytes that have been written to w, and not the current size of the segment.

type SegmentLoader

type SegmentLoader interface {
	// LoadSegment returns the WAL segment containing the given Offset.
	//
	// If ZeroOffset is specified, then the segment with the lowest
	// offset will be returned.
	//
	// Should the given offset be greater than one contained in any
	// available segments, no segment will be returned, and err will be
	// io.EOF.
	LoadSegment(Offset) (*Segment, error)
}

SegmentLoader defines the interface of a type that can retrieve segments from their persistent storage medium.

type SegmentWriter

type SegmentWriter interface {
	WriteSegment(*Segment) error
}

SegmentWriter defines the interface of a type that is able to store WAL segments.

type Sink

type Sink interface {
	Analyzer
	SegmentLoader
	SegmentWriter
	io.Closer

	// Offsets returns the first, and last (most-recent) offsets known
	// to a Sink.
	Offsets() (first Offset, last Offset)

	// NumSegments returns the number of segments currently known to
	// the sink.
	NumSegments() int

	// Truncate permanently deletes all data chunks prior to the given
	// offset.
	Truncate(Offset) error
}

Sink defines the interface of a type that can persist, and subsequently load, write-ahead logging segments.
