package record

Version: v0.0.0-...-15ac754
Warning: This package is not in the latest version of its module.

Published: Jun 15, 2022 License: BSD-3-Clause Imports: 11 Imported by: 0

Documentation

Overview

Package record reads and writes sequences of records. Each record is a stream of bytes that completes before the next record starts.

When reading, call Next to obtain an io.Reader for the next record. Next will return io.EOF when there are no more records. It is valid to call Next without reading the current record to exhaustion.

When writing, call Next to obtain an io.Writer for the next record. Calling Next finishes the current record. Call Close to finish the final record.

Optionally, call Flush to finish the current record and flush the underlying writer without starting a new record. To start a new record after flushing, call Next.

Neither Readers nor Writers are safe to use concurrently.

Example code:

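import (
	"io"

	"github.com/cockroachdb/pebble/record"
)

func read(r io.Reader) ([]string, error) {
	var ss []string
	// Assumes r was produced by record.Writer, whose legacy chunks carry no
	// log number, so the logNum argument (0 here) is not checked.
	records := record.NewReader(r, 0)
	for {
		rec, err := records.Next()
		// io.EOF means there are no more records; invalid-record errors
		// (zeroed or invalid chunks) are treated like io.EOF.
		if err == io.EOF || record.IsInvalidRecord(err) {
			break
		}
		if err != nil {
			return nil, err
		}
		s, err := io.ReadAll(rec)
		if err != nil {
			return nil, err
		}
		ss = append(ss, string(s))
	}
	return ss, nil
}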

func write(w io.Writer, ss []string) error {
	records := record.NewWriter(w)
	for _, s := range ss {
		rec, err := records.Next()
		if err != nil {
			return err
		}
		if _, err := rec.Write([]byte(s)); err != nil {
			return err
		}
	}
	return records.Close()
}

In the wire format, the stream is divided into 32 KiB blocks, and each block contains a number of tightly packed chunks. Chunks cannot cross block boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a block must be zero.
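The block rule can be illustrated with a little offset arithmetic. This sketch is not taken from the package: placeChunk and its constants are hypothetical, and the rule that a block tail too short to hold a chunk header is zero-filled (as in LevelDB's log format) is an assumption.

```go
package main

import "fmt"

// blockSize is the 32 KiB block size described above.
const blockSize = 32 * 1024

// legacyHeaderSize is CRC (4B) + Size (2B) + Type (1B).
const legacyHeaderSize = 7

// placeChunk (hypothetical helper) returns the offset at which the next
// chunk header would start and how many zero bytes of padding precede it.
// Assumption: if the current block has fewer than headerSize bytes left,
// that tail is zero-filled and the chunk starts at the next block.
func placeChunk(offset int64, headerSize int) (start int64, padding int) {
	remaining := blockSize - int(offset%blockSize)
	if remaining < headerSize {
		return offset + int64(remaining), remaining
	}
	return offset, 0
}

func main() {
	fmt.Println(placeChunk(0, legacyHeaderSize))           // 0 0
	fmt.Println(placeChunk(blockSize-3, legacyHeaderSize)) // 32768 3
}
```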

A record maps to one or more chunks. There are two chunk formats: legacy and recyclable. The legacy chunk format:

+----------+-----------+-----------+--- ... ---+
| CRC (4B) | Size (2B) | Type (1B) | Payload   |
+----------+-----------+-----------+--- ... ---+

CRC is computed over the type and payload.
Size is the length of the payload in bytes.
Type is the chunk type.

There are four chunk types, indicating whether the chunk is the full record or the first, middle, or last chunk of a multi-chunk record. A multi-chunk record has one first chunk, zero or more middle chunks, and one last chunk.
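How a record might be split into full, first, middle, and last chunks can be sketched as follows. This is an illustrative model, not the package's writer: fragment is hypothetical, it uses the 7-byte legacy header, and it assumes that a block tail too short for a header is zero-padded and skipped.

```go
package main

import "fmt"

const (
	blockSize  = 32 * 1024
	headerSize = 7 // legacy header: CRC + Size + Type
)

// chunkKind names the four chunk types described above.
type chunkKind string

const (
	full   chunkKind = "full"
	first  chunkKind = "first"
	middle chunkKind = "middle"
	last   chunkKind = "last"
)

// fragment splits a record of n payload bytes, written at stream offset off,
// into chunks that never cross a block boundary. It returns each chunk's
// kind and payload length.
func fragment(off int64, n int) (kinds []chunkKind, sizes []int) {
	begin := true
	for {
		left := blockSize - int(off%blockSize)
		if left < headerSize { // zero-pad the tail and move to the next block
			off += int64(left)
			left = blockSize
		}
		size := n
		if avail := left - headerSize; size > avail {
			size = avail
		}
		end := size == n
		var k chunkKind
		switch {
		case begin && end:
			k = full
		case begin:
			k = first
		case end:
			k = last
		default:
			k = middle
		}
		kinds = append(kinds, k)
		sizes = append(sizes, size)
		off += int64(headerSize + size)
		n -= size
		begin = false
		if end {
			return kinds, sizes
		}
	}
}

func main() {
	fmt.Println(fragment(0, 100))   // [full] [100]
	fmt.Println(fragment(0, 70000)) // [first middle last] [32761 32761 4478]
}
```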

The recyclable chunk format is similar to the legacy format, but extends the chunk header with an additional log number field. This allows log files to be reused (recycled), which can significantly improve performance when syncing frequently because it avoids updating the file metadata. Additionally, recycling log files is a prerequisite for using direct I/O when writing logs. The recyclable format is:

+----------+-----------+-----------+----------------+--- ... ---+
| CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload   |
+----------+-----------+-----------+----------------+--- ... ---+

Recyclable chunks are distinguished from legacy chunks by the addition of 4 extra "recyclable" chunk types that map directly to the legacy chunk types (i.e. full, first, middle, last). The CRC is computed over the type, log number, and payload.

The wire format allows for limited recovery in the face of data corruption: on a format error (such as a checksum mismatch), the reader moves to the next block and looks for the next full or first chunk.

Index

Constants

const CapAllocatedBlocks = 16

CapAllocatedBlocks is the maximum number of blocks allocated by the LogWriter.

const (

	// SyncConcurrency is the maximum number of concurrent sync operations that
	// can be performed. Note that a sync operation is initiated either by a call
	// to SyncRecord or by a call to Close. Exported as this value also limits
	// the commit concurrency in commitPipeline.
	SyncConcurrency = 1 << syncConcurrencyBits
)

Variables

var (
	// ErrNotAnIOSeeker is returned if the io.Reader underlying a Reader does not implement io.Seeker.
	ErrNotAnIOSeeker = errors.New("pebble/record: reader does not implement io.Seeker")

	// ErrNoLastRecord is returned if LastRecordOffset is called and there is no previous record.
	ErrNoLastRecord = errors.New("pebble/record: no last record exists")

	// ErrZeroedChunk is returned if a chunk is encountered that is zeroed. This
	// usually occurs due to log file preallocation.
	ErrZeroedChunk = base.CorruptionErrorf("pebble/record: zeroed chunk")

	// ErrInvalidChunk is returned if a chunk is encountered with an invalid
	// header, length, or checksum. This usually occurs when a log is recycled,
	// but can also occur due to corruption.
	ErrInvalidChunk = base.CorruptionErrorf("pebble/record: invalid chunk")
)

Functions

func IsInvalidRecord

func IsInvalidRecord(err error) bool

IsInvalidRecord returns true if the error matches one of the error types returned for invalid records. These are treated in a way similar to io.EOF in recovery code.

Types

type LogWriter

type LogWriter struct {
	// contains filtered or unexported fields
}

LogWriter writes records to an underlying io.Writer. In order to support WAL file reuse, a LogWriter's records are tagged with the WAL's file number. When reading a log file, a record from a previous incarnation of the file will return the error ErrInvalidChunk.

func NewLogWriter

func NewLogWriter(w io.Writer, logNum base.FileNum) *LogWriter

NewLogWriter returns a new LogWriter.

func (*LogWriter) Close

func (w *LogWriter) Close() error

Close flushes and syncs any unwritten data and closes the writer. Where required, external synchronisation is provided by commitPipeline.mu.

func (*LogWriter) Metrics

func (w *LogWriter) Metrics() *LogWriterMetrics

Metrics must be called after Close. The callee will no longer modify the returned LogWriterMetrics.

func (*LogWriter) SetMinSyncInterval

func (w *LogWriter) SetMinSyncInterval(minSyncInterval durationFunc)

SetMinSyncInterval sets the closure to invoke for retrieving the minimum sync duration between syncs.

func (*LogWriter) Size

func (w *LogWriter) Size() int64

Size returns the current size of the file. External synchronisation provided by commitPipeline.mu.

func (*LogWriter) SyncRecord

func (w *LogWriter) SyncRecord(p []byte, wg *sync.WaitGroup, err *error) (int64, error)

SyncRecord writes a complete record. If wg != nil, the record will be asynchronously persisted to the underlying writer, and Done will be called on the wait group upon completion. Returns the offset just past the end of the record. External synchronisation provided by commitPipeline.mu.

func (*LogWriter) WriteRecord

func (w *LogWriter) WriteRecord(p []byte) (int64, error)

WriteRecord writes a complete record. Returns the offset just past the end of the record. External synchronisation provided by commitPipeline.mu.

type LogWriterMetrics

type LogWriterMetrics struct {
	WriteThroughput   base.ThroughputMetric
	PendingBufferLen  base.GaugeSampleMetric
	SyncQueueLen      base.GaugeSampleMetric
	SyncLatencyMicros *hdrhistogram.Histogram
}

LogWriterMetrics contains miscellaneous metrics for the log writer.

func (*LogWriterMetrics) Merge

Merge merges metrics from x. Requires that x is non-nil.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads records from an underlying io.Reader.

func NewReader

func NewReader(r io.Reader, logNum base.FileNum) *Reader

NewReader returns a new reader. If the file contains records encoded using the recyclable record format, then the log number in those records must match the specified logNum.

func (*Reader) Next

func (r *Reader) Next() (io.Reader, error)

Next returns a reader for the next record. It returns io.EOF if there are no more records. The reader returned becomes stale after the next Next call, and should no longer be used.

func (*Reader) Offset

func (r *Reader) Offset() int64

Offset returns the current offset within the file. If called immediately before a call to Next(), Offset() will return the record offset.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer writes records to an underlying io.Writer.

func NewWriter

func NewWriter(w io.Writer) *Writer

NewWriter returns a new Writer.

func (*Writer) Close

func (w *Writer) Close() error

Close finishes the current record and closes the writer.

func (*Writer) Flush

func (w *Writer) Flush() error

Flush finishes the current record, writes to the underlying writer, and flushes it if that writer implements interface{ Flush() error }.

func (*Writer) LastRecordOffset

func (w *Writer) LastRecordOffset() (int64, error)

LastRecordOffset returns the offset in the underlying io.Writer of the last record so far (the one created by the most recent Next call). It is the offset of the first chunk header, suitable to pass to Reader.SeekRecord.

If that io.Writer also implements io.Seeker, the return value is an absolute offset, in the sense of io.SeekStart, regardless of whether the io.Writer was initially at the zero position when passed to NewWriter. Otherwise, the return value is a relative offset: the number of bytes written after the NewWriter call and before the last record.

If there is no last record, i.e. nothing was written, LastRecordOffset will return ErrNoLastRecord.

func (*Writer) Next

func (w *Writer) Next() (io.Writer, error)

Next returns a writer for the next record. The writer returned becomes stale after the next Close, Flush or Next call, and should no longer be used.

func (*Writer) Size

func (w *Writer) Size() int64

Size returns the current size of the file.

func (*Writer) WriteRecord

func (w *Writer) WriteRecord(p []byte) (int64, error)

WriteRecord writes a complete record. Returns the offset just past the end of the record.
