orderedio

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2021 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package orderedio implements an ordered log with explicit ordinals over top of a record Encoder/Decoder.

This can be used as an implementation for a self-describing write-ahead-log.

The Decoder and Encoder types can wrap stuffed recordio.Encoder and recordio.Decoder values.

They are, like the simpler recordio Decoder/Encoder types, straightforward to use. However, they require an index (ordinal) to be passed for each entry when writing, and these indices are returned when reading.

Additionally, each record has a checksum associated with it, and during reads, those checksums are checked.

Repeated indices are allowed, with only the first valid record for that indices being returned on read.

The first record's index must be 1 or higher. Options allow a specific starting index to be enforced (e.g., when the file name indicates the starting point of the log, one can specify that starting point when reading and receive an error if it is long).

Indices must be in sequence, with the exception of repeats.

An example of how this works is below:

buf := new(bytes.Buffer)
e := NewStreamEncoder(buf)

// Write messages.
msgs := []string{
	"This is a message",
	"This is another message",
	"And here's a third",
}

for i, msg := range msgs {
	if _, err := e.Encode(uint64(i)+1, []byte(msg)); err != nil {
		log.Fatalf("Encode error: %v", err)
	}
}

// Now read them back.
d := NewStreamDecoder(buf)
defer d.Close()
for !d.Done() {
	idx, val, err := d.Next()
	if err != nil {
		log.Fatalf("Read error: %v", err)
	}
	fmt.Printf("%d: %q\n", idx, string(val))
}

// Output:
// 1: "This is a message"
// 2: "This is another message"
// 3: "And here's a third"

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder is a write-ahead log reader implemented over a word-stuffed log. The concept adds a couple of important bits of data on top of the unopinionated word-stuffing reader to ensure that records are checksummed and ordered.

In particular, it knows how to skip truly duplicate entries (with the same index and checksum) where possibly only the last is not corrupt, and it knows that record indices should be both compact and monotonic.

func NewDecoder

func NewDecoder(iter RecordIterator, opts ...DecoderOption) *Decoder

NewDecoder creates a Decoder around the given record iterator, for example, a recordio.Decoder.

func NewStreamDecoder

func NewStreamDecoder(r io.Reader, opts ...DecoderOption) *Decoder

NewStreamDecoder creates a Decoder over an io.Reader using the default recordio.Decoder as the underlying RecordIterator.

func (*Decoder) Close

func (d *Decoder) Close() error

Close closes underlying implementations if they are io.Closers.

func (*Decoder) Done

func (d *Decoder) Done() bool

Done returns true when all entries have been returned by Next.

func (*Decoder) Next

func (d *Decoder) Next() (uint64, []byte, error)

Next returns the next entry from the WAL, verifying its checksum and returning an error if it does not match, or if the ordinal value is not the next one in the series. It skips corrupt entries in the underlying log, under the assumption that they would have been retried.

If the underlying stream is done, this returns io.EOF.

type DecoderOption

type DecoderOption func(d *Decoder)

DecoderOption defines options for write-ahead log readers.

func ExpectDescending

func ExpectDescending(desc bool) DecoderOption

ExpectDescending tells the unstuffer to expect indices to be in strictly descending order, rather than ascending. Default is false.

func ExpectFirstIndex

func ExpectFirstIndex(index uint64) DecoderOption

ExpectFirstIndex sets the expected initial index for this reader. A value of zero (the default) indicates that any initial value is allowed, which reduces its ability to check that you are reading from the expected file.

type EncodeCloser

type EncodeCloser interface {
	io.Closer
	Encode([]byte) (int, error)
}

EncodeCloser is an interface for appending records to a stream.

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder implements a write-ahead log around an EncodeCloser (like an Encoder from the recordio package). It requires every appended entry to have an incremented index specified, and it checksums data before stuffing it into the log, allowing a Decoder to detect accidental corruption.

func NewEncoder

func NewEncoder(a EncodeCloser, opts ...EncoderOption) *Encoder

NewEncoder creates a Encoder around the given record appender (for example, a recordio.Encoder).

func NewStreamEncoder

func NewStreamEncoder(w io.Writer, opts ...EncoderOption) *Encoder

NewStreamEncoder creates an Encoder over an io.Writer, using the default recordio.Encoder as the underlying EncodeCloser.

func (*Encoder) Close

func (e *Encoder) Close() error

Close closes underlying implementations if they are io.Closers.

func (*Encoder) Encode

func (e *Encoder) Encode(index uint64, p []byte) (int, error)

Encode writes a new log entry into the WAL.

func (*Encoder) NextIndex

func (e *Encoder) NextIndex() uint64

NextIndex returns the next expected write index for this writer.

func (*Encoder) RegisterClose

func (e *Encoder) RegisterClose(f func() error)

RegisterClose registers a function to call when the Encoder is closed. Can be used, for example, to do atomic renames when snapshotting to a WAL.

type EncoderOption

type EncoderOption func(*Encoder)

EncoderOption specifies options for the write-ahead log writer.

func WithFirstIndex

func WithFirstIndex(idx uint64) EncoderOption

WithFirstIndex sets the first index for this writer. Use this to start appending to an existing log at the proper index (which must be the next one). Default is 0, indicating that any first index will work.

type RecordIterator

type RecordIterator interface {
	Next() ([]byte, error)
	Done() bool
}

RecordIterator is an interface that produces a bytes iterator using Done and Next methods.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL