captain

package
v0.0.0-...-fc15ddb
Published: Sep 27, 2017 License: MPL-2.0 Imports: 16 Imported by: 0

README

Captain's Log

Captain's Log is internal to Workq for the time being. It is intended to become a standalone external package once it has been vetted; this README is incomplete for that reason.

Captain's Log is a write-ahead data log with a small API for Go.

It is useful for projects that require simple operational persistence to log files for recovery, essentially a replay log.

Features

  • Segment support - Split after a minimum file size.
  • CRC32 support - Every record carries a CRC32 checksum.
  • User-defined cleaning strategy.

Getting Started

Stream

The Stream represents a single data stream contained within a log directory and split into multiple segment files. A stream can be appended to, iterated, and cleaned.

A Stream object is created with the NewStream function, which requires a path to the log directory and a magic header containing the magic number and version. The header is written to every new segment file and verified on every segment file when iterating through the data stream.

// 0x6370746e -> "cptn"
stream := NewStream(path, &MagicHeader{Magic: 0x6370746e, Version: 1})
Appender

An Appender writes into the data stream in append-only mode. The data stream is split into segment files within the log directory. The split is controlled by the SegmentSize option, the minimum file size a segment must reach before a new file is started.

The primary benefit of splitting is that old log records can be cleaned while appending continues.

Opening an Appender

An Appender can be opened using the OpenAppender method on the Stream object:

appender, err := stream.OpenAppender(&AppendOptions{
  // SegmentSize: 67108864, // Default 64MiB in bytes
  // SyncPolicy: captain.SyncOS, // Default
  // SyncInterval: 1000, // Sync interval in ms, used when SyncPolicy is set to "captain.SyncInterval"
})
if err != nil {
  // ...
}
Options

A stream can be opened with these additional options passed in during the OpenAppender call (see the sketch after the list):

  • AppendOptions.SegmentSize - The minimum size of a segment file before creating a new one. Defaults to 64 MiB.
  • AppendOptions.SyncPolicy - Disk sync policy to use:
    • captain.SyncOS - (Default) Sync deferred to the operating system.
    • captain.SyncAlways - Sync after every append. This is relatively slow.
    • captain.SyncInterval - Sync at a specified interval in milliseconds set by AppendOptions.SyncInterval.
  • AppendOptions.SyncInterval - Sync interval in milliseconds. Used when AppendOptions.SyncPolicy is set to captain.SyncInterval.
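
For illustration, a sketch of opening an Appender with an explicit interval sync policy; the values shown are arbitrary examples, not recommendations:

appender, err := stream.OpenAppender(&AppendOptions{
  SegmentSize:  16777216,             // roll segments at 16 MiB instead of the 64 MiB default
  SyncPolicy:   captain.SyncInterval, // sync on a timer instead of deferring to the OS
  SyncInterval: 500,                  // sync every 500 ms
})
if err != nil {
  // ...
}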
Appending Data

A data stream can only have a single appender at a time and must be locked before use:

if err := appender.Lock(); err != nil {
  // ...
}
defer appender.Unlock()

err := appender.Append([]byte{})
if err != nil {
  // ...
}

An Appender lock is held across processes (via an advisory file lock) and must be released when finished. Locking an Appender for a stream that already has a locked Appender blocks indefinitely until the first Appender releases it. Appenders do not block cursors or cleaners.
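
To bound the wait rather than block indefinitely, lock acquisition can be wrapped in the package's TimeoutLock helper (documented under Functions below). A sketch, with an arbitrary 5-second timeout and assuming the time package is imported:

if err := TimeoutLock(appender.Lock, 5*time.Second); err != nil {
  if err == ErrLockTimeout {
    // another process still holds the appender lock
  }
  // ...
}
defer appender.Unlock()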

Cursor

A data stream can be iterated using a Cursor, acquired via the OpenCursor method on the Stream object:

cursor, err := stream.OpenCursor()
if err != nil {
  // ...
}

The Next() method returns the next log record in the data stream as a Record object along with an error (*Record, error). The Record object contains the log Time and the original data as the Payload. Next() returns an error if the record could not be fully read or the CRC checksum did not match. When no records remain, nil, nil is returned, signifying the graceful end of the stream.

For a consistent view of the stream data[1], a read lock through Lock() is required before iteration and must be released through Unlock() when finished:

if err := cursor.Lock(); err != nil {
  // ...
}
defer cursor.Unlock()

for {
  rec, err := cursor.Next()
  if err != nil {
    // ... handle err
  }

  if rec == nil {
    // End of stream
    break
  }

  fmt.Printf("Time=%s, Payload=%s", r.Time, r.Payload)
}

[1] Cleaners can rewrite segment files and acquire an exclusive lock, blocking cursors.

Cleaning

Cleaning is the process of removing old log records that are considered stale and requires a user-defined "clean" function. The clean function determines whether a log record should be permanently removed. It is invoked for every log entry with the path of the log segment file and a Record object, and should return true to signal the cleaner to delete the record or false to retain it. If an error is returned, processing stops. The sole exception is ErrSkipSegment, which signals the cleaner to skip the current segment entirely.

A Cleaner only visits past read-only segment files; the segment currently being appended to is not visited until it becomes read-only. Keeping the segment size reasonably small allows the cleaner to work efficiently.

A cleaner can be opened using the OpenCleaner method on the Stream object:

cleaner, err := stream.OpenCleaner()
if err != nil {
  // ...
}

Cleaning is invoked by calling the Clean method with the cleaning function. The cleaning process must be locked through Cleaner.Lock() and released with Cleaner.Unlock() when finished. Cleaning blocks only readers, not appenders, as it acts only on read-only segment files.

if err := cleaner.Lock(); err != nil {
  // ...
}
defer cleaner.Unlock()

clean := func(path string, r *Record) (bool, error) {
  // Clean all records over 24 hours old.
  if time.Since(r.Time) >= 24*time.Hour {
    return true, nil
  }

  return false, nil
}
err := cleaner.Clean(clean)
if err != nil {
  // ...
}

Cleaning works by rewriting the segment file with only the relevant log records.

Implementation

File Names

Captain's Log data is segmented into sequentially named log files.

Sequential log filenames are nine digits, left-padded with zeros, with a ".log" extension:

Format: {SEQUENCE}.log
Example: 000000001.log

The sequence number follows natural order based on write time and is incremented when a segment reaches the minimum segment size and the log rotates to a new file.
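
For illustration, a segment filename for a given sequence number could be produced like this; seq is a hypothetical variable and fmt is assumed imported:

// Nine digits, zero-padded, ".log" extension.
name := fmt.Sprintf("%09d.log", seq) // seq == 1 -> "000000001.log"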

File Format

Log files start with a magic header and contain a sequence of variable size records.

|MAGIC_HEADER|RECORD|RECORD|RECORD|...
Magic header

Every log file starts with the magic header specified in the Stream object (8 bytes). The first four bytes are the magic number and the last four bytes are the version, each encoded as a uint32 in big-endian order.
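
A sketch of the equivalent encoding with encoding/binary, given a MagicHeader h; the package's own MagicHeader.MarshalBinary is the authoritative implementation:

// Encode the header as 8 big-endian bytes.
buf := make([]byte, 8)
binary.BigEndian.PutUint32(buf[0:4], h.Magic)   // magic number, e.g. 0x6370746e ("cptn")
binary.BigEndian.PutUint32(buf[4:8], h.Version) // format version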

Record Format
Record
|TIME|SIZE|PAYLOAD|CRC|
  • TIME (15 Bytes) - Time in UTC (RFC 3339) with nanoseconds:
    • byte 0: version
    • bytes 1-8: seconds
    • bytes 9-12: nanoseconds
    • bytes 13-14: zone offset in minutes (reserved for future use, always 0 for UTC)
  • SIZE (Varint) - Length of payload
  • PAYLOAD - Byte stream with the length of SIZE.
  • CRC (4 Bytes) - 32-bit checksum computed over TIME, SIZE, and PAYLOAD.
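
Putting the pieces together, a sketch of encoding a Record r in this layout, assuming bytes, encoding/binary, and hash/crc32 are imported. The 15-byte TIME field matches what Go's time.Time.MarshalBinary produces for a UTC time; the unsigned varint, the IEEE CRC32 polynomial, and the big-endian CRC byte order are assumptions here, so Record.MarshalBinary remains authoritative:

var buf bytes.Buffer

tb, err := r.Time.MarshalBinary() // TIME: version, seconds, nanoseconds, zone offset
if err != nil {
  // ...
}
buf.Write(tb)

size := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(size, uint64(len(r.Payload))) // SIZE as a varint (assumed unsigned)
buf.Write(size[:n])
buf.Write(r.Payload) // PAYLOAD

crc := crc32.ChecksumIEEE(buf.Bytes())    // CRC over TIME, SIZE, PAYLOAD (assumed IEEE polynomial)
binary.Write(&buf, binary.BigEndian, crc) // assumed big-endian on disk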

Documentation


Constants

const (
	// SyncOS policy defers the sync to the OS.
	SyncOS = 1
	// SyncInterval policy syncs at an interval specified in AppendOptions.SyncInterval.
	SyncInterval = 2
	// SyncAlways policy syncs on every Append.
	SyncAlways = 3
)
const (
	// DefaultSegmentSize 64 MiB
	DefaultSegmentSize = 67108864
	// DefaultSyncPolicy SyncInterval
	DefaultSyncPolicy = SyncInterval
	// DefaultSyncInterval 1000ms
	DefaultSyncInterval = 1000
)

Variables

var ErrCRCMismatch = errors.New("crc mismatch")

ErrCRCMismatch signifies the CRC included in the Record did not match the actual CRC reconstructed from the message.

var ErrLockTimeout = errors.New("lock timeout")

ErrLockTimeout signifies the lock failed to be acquired within the specified timeout.

var ErrSkipSegment = errors.New("skip this segment")

ErrSkipSegment signals the cleaner to stop processing and skip over the current segment.

Functions

func TimeoutLock

func TimeoutLock(fn func() error, dur time.Duration) error

TimeoutLock runs a file mutex lock/unlock function and times out after the specified duration.

Types

type AppendOptions

type AppendOptions struct {
	// Minimum segment size before rolling into a new one.
	SegmentSize uint
	// Specifies which policy to use, one of the Sync* constants.
	SyncPolicy uint
	// Sync interval in ms, valid when sync policy is set to SyncInterval.
	SyncInterval uint
}

AppendOptions represents the options for opening an Appender.

type Appender

type Appender struct {
	// contains filtered or unexported fields
}

Appender represents a stream writer in append-only mode. Appenders must be locked before use and unlocked when done as there can only be a single active appender at a time.

func (*Appender) Append

func (a *Appender) Append(b []byte) error

Append appends the payload to the log, formatted as a log Record.

func (*Appender) Lock

func (a *Appender) Lock() error

Lock locks the appender. Safe across multiple processes via an advisory file lock.

func (*Appender) Unlock

func (a *Appender) Unlock() error

Unlock releases the appender lock.

type CleanFn

type CleanFn func(path string, r *Record) (bool, error)

CleanFn represents the function signature that is passed to the Clean function. Returning true signals the cleaner to remove the record. Returning false signals to retain the record.

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner is responsible for cleaning the read-only portion of the stream. Policy for cleaning is based on the CleanFn passed to the Clean method.

func (*Cleaner) Clean

func (c *Cleaner) Clean(fn CleanFn) error

Clean takes a CleanFn, iterates through every read-only segment file, and invokes CleanFn against every log record:

  • Deleting the record when CleanFn returns true.
  • Retaining the record when CleanFn returns false.
  • Skipping the segment file if ErrSkipSegment is returned.
  • Stopping the cleaning if CleanFn returns any other error.

Cleaning works by rewriting the segment file with only the relevant log records. Requires a cleaning lock via Lock() which is an exclusive lock, blocking cursors.

func (*Cleaner) Lock

func (c *Cleaner) Lock() error

Lock Cleaner across processes (advisory).

func (*Cleaner) Unlock

func (c *Cleaner) Unlock() error

Unlock Cleaner across processes (advisory).

type Cursor

type Cursor struct {
	// contains filtered or unexported fields
}

Cursor represents an iterator that traverses Records in log time order. Cursors can only move forward with the ability to Reset back to the front.

func (*Cursor) Lock

func (c *Cursor) Lock() error

Lock (read) across processes. Ensures a consistent view of the data.

func (*Cursor) Next

func (c *Cursor) Next() (*Record, error)

Next returns the next Record in the cursor. Handles rotating to the next segment file. A record of nil, with an error of nil represents the end of the cursor. Requires a read lock through Lock().

func (*Cursor) Reset

func (c *Cursor) Reset() error

Reset the cursor, rewinding back to the start.
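
For example, a second pass over the stream with a cursor obtained from OpenCursor (sketch):

if err := cursor.Reset(); err != nil {
  // ...
}
// cursor.Next() now starts again from the first record.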

func (*Cursor) Segment

func (c *Cursor) Segment() string

Segment returns the full filename/path to the segment file.

func (*Cursor) Unlock

func (c *Cursor) Unlock() error

Unlock (read) across processes.

type MagicHeader

type MagicHeader struct {
	Magic   uint32
	Version uint32
}

MagicHeader represents the magic bytes and version of a segment file.

func (*MagicHeader) MarshalBinary

func (h *MagicHeader) MarshalBinary() ([]byte, error)

MarshalBinary returns the magic header in BigEndian.

func (*MagicHeader) UnmarshalBinary

func (h *MagicHeader) UnmarshalBinary(data []byte) error

UnmarshalBinary sets the magic header from data in BigEndian.

type Record

type Record struct {
	Time    time.Time
	Payload []byte
}

Record represents a log entry.

func NewRecord

func NewRecord(p []byte) *Record

NewRecord returns a new record with the current time set to UTC.

func (*Record) MarshalBinary

func (r *Record) MarshalBinary() ([]byte, error)

MarshalBinary encodes a Record in its binary form. Appends a CRC32 to the end of the message.

func (*Record) UnmarshalBinaryFromReader

func (r *Record) UnmarshalBinaryFromReader(rdr RecordReader) error

UnmarshalBinaryFromReader decodes a Record from a RecordReader. Returns an error if the data could not be completely decoded. Returns ErrCRCMismatch if the checksum did not match the original appended in the Record.

type RecordReader

type RecordReader interface {
	io.Reader
	io.ByteReader
}

RecordReader is used for UnmarshalBinaryFromReader.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a data stream. Can be read in sequence using a Cursor. Can be appended to using an Appender. Can be cleaned using a Cleaner.

func NewStream

func NewStream(path string, header *MagicHeader) *Stream

NewStream returns a Stream with a specified path and header to use.

func (*Stream) OpenAppender

func (s *Stream) OpenAppender(options *AppendOptions) (*Appender, error)

OpenAppender opens an Appender object on the stream. Nil options will set default options.

func (*Stream) OpenCleaner

func (s *Stream) OpenCleaner() (*Cleaner, error)

OpenCleaner returns a Cleaner on the stream.

func (*Stream) OpenCursor

func (s *Stream) OpenCursor() (*Cursor, error)

OpenCursor returns a cursor for the current stream.
