recordio

package
v0.0.11
Published: Mar 7, 2024 License: Apache-2.0 Imports: 10 Imported by: 6

README

RecordIO

A recordio file stores a sequence of items, with optional compression and/or encryption. Recordio also allows an application to generate indices.

API documentation is available at https://godoc.org/github.com/grailbio/base/recordio

RecordIO file structure

The following picture shows the structure of a recordio file.

[Figure: recordio format]

A recordio file logically stores a list of items. Items are grouped into blocks. Each block may be compressed or encrypted, then split into a sequence of chunks and stored in the file.

There are three types of blocks: header, body, and trailer. These block types have a common structure:

block :=
  number of items (varint)
  item 0 size (varint)
  …
  item K-1 size (varint)
  item 0 body (bytes)
  …
  item K-1 body (bytes)
Header block

The header block is the first block in the file and contains exactly one item. That item stores flat key-value mappings of the following form:

header item := List of (metakey, metavalue)
metakey := value
metavalue := value
value := valuetype valuebytes
valuetype := one byte, where
    1 if the valuebytes is a utf-8 string
    2 if the valuebytes is a signed varint
    3 if the valuebytes is an unsigned varint
    4 if the valuebytes is an IEEE float64 LE
valuebytes :=
    For utf-8, length as uvarint, followed by contents.
    For other data types, just encode the data raw.
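
A sketch of this encoding using Go's varint routines is shown below; the value-type constants and the putValue helper are illustrative only and are not exported by the recordio package.

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	typeString = 1 // valuebytes is a utf-8 string
	typeSigned = 2 // valuebytes is a signed varint
)

// putValue appends one "value" (valuetype byte followed by valuebytes) to buf.
func putValue(buf []byte, v interface{}) []byte {
	switch x := v.(type) {
	case string:
		buf = append(buf, typeString)
		buf = binary.AppendUvarint(buf, uint64(len(x)))
		buf = append(buf, x...)
	case int64:
		buf = append(buf, typeSigned)
		buf = binary.AppendVarint(buf, x)
	}
	return buf
}

func main() {
	// Encode one (metakey, metavalue) pair: "transformer" -> "flate".
	var item []byte
	item = putValue(item, "transformer")
	item = putValue(item, "flate")
	fmt.Printf("encoded pair: % x\n", item)
}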

Note: we could have defined the header as a proto message, but we wanted to avoid depending on the proto library, since that would complicate cross-language integration.

The user can add arbitrary (metakey, metavalue) pairs in the header, but a few metakey values are reserved.

Key          Value
trailer      Bool. Whether the file contains a trailer block.
transformer  "flate", "zstd", etc.

TODO: Reserve keys for encryption.

Body block

A body block contains the actual user data.

Trailer block

The trailer block is optional. It contains a single arbitrary item. Typically, it stores an index in an application-specific format so that the application can seek to an arbitrary item if needed.

The recordio library provides a way to read the trailer block in constant time.

Structure of a block

At rest, a block is optionally compressed and encrypted. The resulting data is then split into multiple chunks. The size of a chunk is fixed at 32KiB. The chunk structure allows an application to detect a corrupt chunk and skip to the next chunk or block.

Each chunk contains a 28 byte header.

chunk :=
    magic (8 bytes)
    CRC32 (4 bytes LE)
    flag (4 bytes LE)
    chunk payload size (4 bytes LE)
    totalChunks (4 bytes LE)
    chunk index (4 bytes LE)
    payload (bytes)
  • The 8-byte magic number tells whether the chunk is part of a header, body, or trailer block.

    The current recordio format defines three magic numbers: MagicHeader, MagicPacked, and MagicTrailer.

  • The chunk payload size is (32768 - 28), unless it is for the final chunk of a block. For the final chunk, the "chunk payload size" stores the size of the block contents, and the chunk is filled with garbage to make it 32KiB at rest.

  • totalChunks is the number of chunks in the block. All the chunks in the same block store the same totalChunks value.

  • Chunk index is 0 for the first chunk of the block, 1 for the second chunk of the block, and so on. The index resets to zero at the start of the next block.

  • Flag is a 32-bit bitmap. It is currently unused.

  • CRC is the IEEE CRC32 checksum of the rest of the chunk (payload size, index, flag, plus the payload).
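
The sketch below decodes the fixed 28-byte chunk header laid out above and recomputes the checksum. parseChunkHeader and the placeholder magic value are illustrative only, and the byte range covered by the CRC follows the bullet above rather than the package source.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// chunkHeader mirrors the 28-byte layout above.
type chunkHeader struct {
	magic       [8]byte
	crc         uint32
	flag        uint32
	payloadSize uint32
	totalChunks uint32
	index       uint32
}

// parseChunkHeader decodes the fixed header and verifies the checksum over
// everything after the CRC field (an assumption based on the description above).
func parseChunkHeader(chunk []byte) (chunkHeader, error) {
	var h chunkHeader
	if len(chunk) < 28 {
		return h, fmt.Errorf("chunk too small: %d bytes", len(chunk))
	}
	copy(h.magic[:], chunk[0:8])
	h.crc = binary.LittleEndian.Uint32(chunk[8:12])
	h.flag = binary.LittleEndian.Uint32(chunk[12:16])
	h.payloadSize = binary.LittleEndian.Uint32(chunk[16:20])
	h.totalChunks = binary.LittleEndian.Uint32(chunk[20:24])
	h.index = binary.LittleEndian.Uint32(chunk[24:28])
	if got := crc32.ChecksumIEEE(chunk[12:]); got != h.crc {
		return h, fmt.Errorf("chunk CRC mismatch: got %08x, want %08x", got, h.crc)
	}
	return h, nil
}

func main() {
	// Build a dummy 32KiB chunk holding a 5-byte payload.
	chunk := make([]byte, 32<<10)
	copy(chunk[0:8], "RIOCHNK0")                   // placeholder magic, not a real recordio magic
	binary.LittleEndian.PutUint32(chunk[16:20], 5) // payload size
	binary.LittleEndian.PutUint32(chunk[20:24], 1) // totalChunks
	binary.LittleEndian.PutUint32(chunk[24:28], 0) // chunk index
	copy(chunk[28:], "hello")
	binary.LittleEndian.PutUint32(chunk[8:12], crc32.ChecksumIEEE(chunk[12:]))

	h, err := parseChunkHeader(chunk)
	if err != nil {
		panic(err)
	}
	fmt.Printf("payload size=%d totalChunks=%d index=%d\n", h.payloadSize, h.totalChunks, h.index)
}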

Compression and encryption

A block can be optionally compressed and/or encrypted using transformers. The following example demonstrates the use of flate compression.

https://github.com/grailbio/base/tree/master/recordio/example_basic_test.go

The recordio library provides a few standard transformers: "flate" (package recordioflate) and "zstd" (package recordiozstd).

To register zstd, for example, call

recordiozstd.Init()

somewhere before writing or reading the recordio file. Then, when writing, set "zstd" in WriterOpts.Transformers. The transformer name is recorded in the recordio header block. The recordio reader reads the header, discovers the transformer name, and automatically creates a matching reverse transformer function.
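
For instance, a writer configured for zstd might be set up as in the sketch below; the recordiozstd import path is assumed to mirror recordioflate's, and the newZstdWriter helper is illustrative only.

package main

import (
	"bytes"
	"io"

	"github.com/grailbio/base/recordio"
	"github.com/grailbio/base/recordio/recordiozstd" // assumed import path
)

func init() {
	// Register the "zstd" transformer before any writes or reads.
	recordiozstd.Init()
}

// newZstdWriter is an illustrative helper, not part of the package.
func newZstdWriter(out io.Writer) recordio.Writer {
	return recordio.NewWriter(out, recordio.WriterOpts{
		Transformers: []string{"zstd"},
	})
}

func main() {
	buf := &bytes.Buffer{}
	wr := newZstdWriter(buf)
	// With a nil Marshal, the default marshaler casts the value to []byte.
	wr.Append([]byte("Item0"))
	if err := wr.Finish(); err != nil {
		panic(err)
	}
}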

You can also register your own transformers. To do that, add transformer factories when the application starts, using RegisterTransformer. See recordioflate and recordiozstd source code for examples.

Indexing

An application can arrange a callback function to be run when items are written to storage. Such a callback can be used to build an index in a format of the application's choice. The following example demonstrates indexing.

https://github.com/grailbio/base/tree/master/recordio/example_indexing_test.go

The index is typically written in the trailer block of the recordio file. The recordio scanner provides a feature to read the trailer block.

Legacy file format

The recordio package supports a legacy file format that was in use before 2018-03. recordio.Scanner supports both the current and the legacy file formats transparently. The legacy file can still be produced using the deprecated/LegacyWriter class, but we discourage its use; support for it may be removed entirely in the future.

The legacy file format has the following structure:

<header 0><record 0>
<header 1><record 1>
...

Each header is:

8 bytes: magic number
8 bytes: 64 bit length of payload, little endian
4 bytes: IEEE CRC32 of the length, little endian
<record>: length bytes

The magic number is included to allow for the possibility of scanning to the next record in the case of a corrupted file.
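
Based solely on the layout above, reading and validating one legacy header could look like the sketch below; readLegacyHeader and the placeholder magic are illustrative only, since recordio.Scanner already handles legacy files.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
)

// readLegacyHeader reads one legacy record header: 8-byte magic, 8-byte
// little-endian payload length, and the IEEE CRC32 of those length bytes.
func readLegacyHeader(r io.Reader) (magic [8]byte, payloadLen uint64, err error) {
	var hdr [20]byte
	if _, err = io.ReadFull(r, hdr[:]); err != nil {
		return
	}
	copy(magic[:], hdr[0:8])
	payloadLen = binary.LittleEndian.Uint64(hdr[8:16])
	wantCRC := binary.LittleEndian.Uint32(hdr[16:20])
	if got := crc32.ChecksumIEEE(hdr[8:16]); got != wantCRC {
		err = fmt.Errorf("legacy header: length CRC mismatch")
	}
	return
}

func main() {
	// Build a fake header followed by a 5-byte payload.
	buf := &bytes.Buffer{}
	buf.WriteString("LEGACY00") // placeholder magic, not a real recordio magic
	var lenBytes [8]byte
	binary.LittleEndian.PutUint64(lenBytes[:], 5)
	buf.Write(lenBytes[:])
	var crcBytes [4]byte
	binary.LittleEndian.PutUint32(crcBytes[:], crc32.ChecksumIEEE(lenBytes[:]))
	buf.Write(crcBytes[:])
	buf.WriteString("hello")

	_, n, err := readLegacyHeader(buf)
	if err != nil {
		panic(err)
	}
	fmt.Println("payload length:", n)
}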

For the packed format, each record (i.e., the payload above) is as follows:

uint32 little endian: IEEE CRC32 of all the varints that follow.
uint32 varint: number of items in the record (n)
uint32 varint: size of <item 0>
uint32 varint: size of <item 1>
...
uint32 varint: size of <item n-1>

<item 0>
<item 1>
...
<item n-1>

For the simple recordio format (not packed), indexing is supported via the Index callback, which is called whenever a new record is written:

Index func(offset, length uint64, v interface{}, p []byte) error

offset: the absolute offset in the stream that the record is
        written at, including its header
length: the size of the record being written, including the header.
v:      the object marshaled if Marshal was used to write an object,
        nil otherwise
p:      the byte slice being written

The intended use is to instantiate a new Scanner at the specified offset in the underlying file/stream.

For the packed format, indexing is more involved due to the need to identify the start of each item as well as of the record. To this end, the Index callback is called in two ways, and a second Flush callback is also provided.

At the start of a record:

offset: the absolute offset, including the recordio header
length: the size of the entire record being written (the sum of
        the sizes of the items and associated metadata), including
        the recordio header.
v:      nil
p:      nil

For each item written to a single record:

offset: the offset from the start of the data portion of the record
        that contains this item
length: the size of the item
v:      the object marshaled if Marshal was used to write an object,
        nil otherwise
p:      the byte slice being written

Documentation

Overview

Package recordio implements the recordio file format. A recordio file stores a sequence of items, with optional compression, encryption, and indexing.

See the README.md file (https://github.com/grailbio/base/blob/master/recordio/README.md) for more detailed documentation.

Example (Basic)

Example_basic demonstrates basic reads, writes, and flate compression.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/grailbio/base/recordio"
	"github.com/grailbio/base/recordio/recordioflate"
)

func init() {
	recordioflate.Init()
}

func doWrite(out io.Writer) {
	wr := recordio.NewWriter(out, recordio.WriterOpts{
		Transformers: []string{"flate"},
		Marshal:      func(scratch []byte, v interface{}) ([]byte, error) { return []byte(v.(string)), nil },
	})
	wr.Append("Item0")
	wr.Append("Item1")
	if err := wr.Finish(); err != nil {
		panic(err)
	}
}

func doRead(in io.ReadSeeker) {
	r := recordio.NewScanner(in, recordio.ScannerOpts{
		Unmarshal: func(data []byte) (interface{}, error) { return string(data), nil },
	})
	for r.Scan() {
		fmt.Printf("Item: %s\n", r.Get().(string))
	}
	if err := r.Err(); err != nil {
		panic(err)
	}
}

// Example_basic demonstrates basic reads, writes, and flate compression.
func main() {
	buf := &bytes.Buffer{}
	doWrite(buf)
	doRead(bytes.NewReader(buf.Bytes()))
}
Output:

Item: Item0
Item: Item1
Example (Indexing)
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"

	"github.com/grailbio/base/recordio"
)

type recordioIndex map[string]recordio.ItemLocation

func doWriteWithIndex(out io.Writer) {
	index := make(recordioIndex)
	wr := recordio.NewWriter(out, recordio.WriterOpts{
		Marshal: func(scratch []byte, v interface{}) ([]byte, error) { return []byte(v.(string)), nil },
		Index: func(loc recordio.ItemLocation, val interface{}) error {
			index[val.(string)] = loc
			return nil
		},
	})

	// To store a trailer block, AddHeader(recordio.KeyTrailer, true) must be
	// called beforehand.
	wr.AddHeader(recordio.KeyTrailer, true)
	wr.Append("Item0")
	wr.Append("Item1")
	wr.Append("Item2")
	wr.Flush()
	// Wait for the index callbacks to run.
	wr.Wait()

	// Write the index in the trailer.
	indexBuf := &bytes.Buffer{}
	encoder := gob.NewEncoder(indexBuf)
	if err := encoder.Encode(index); err != nil {
		panic(err)
	}
	wr.SetTrailer(indexBuf.Bytes())
	if err := wr.Finish(); err != nil {
		panic(err)
	}
}

func doReadWithIndex(in io.ReadSeeker) {
	r := recordio.NewScanner(in, recordio.ScannerOpts{
		Unmarshal: func(data []byte) (interface{}, error) { return string(data), nil },
	})
	// Read the trailer, parse it into the recordioIndex.
	decoder := gob.NewDecoder(bytes.NewReader(r.Trailer()))
	index := make(recordioIndex)
	if err := decoder.Decode(&index); err != nil {
		panic(err)
	}
	// Try reading individual items.
	r.Seek(index["Item1"])
	for r.Scan() {
		fmt.Printf("Item: %s\n", r.Get().(string))
	}
	r.Seek(index["Item0"])
	for r.Scan() {
		fmt.Printf("Item: %s\n", r.Get().(string))
	}
	if err := r.Err(); err != nil {
		panic(err)
	}
}

func main() {
	buf := &bytes.Buffer{}
	doWriteWithIndex(buf)
	doReadWithIndex(bytes.NewReader(buf.Bytes()))
}
Output:

Item: Item1
Item: Item2
Item: Item0
Item: Item1
Item: Item2

Constants

const (

	// KeyTrailer must be set to true when the recordio file contains a trailer.
	// value type: bool
	KeyTrailer = "trailer"

	// KeyTransformer defines transformer functions used to encode blocks.
	KeyTransformer = "transformer"
)
const (
	// DefaultFlushParallelism is the default value for WriterOpts.MaxFlushParallelism.
	DefaultFlushParallelism = uint32(8)

	// MaxFlushParallelism is the max allowed value for WriterOpts.MaxFlushParallelism.
	MaxFlushParallelism = uint32(128)

	// MaxPackedItems defines the max items that can be
	// packed into a single record by a PackedWriter.
	MaxPackedItems = uint32(10 * 1024 * 1024)
	// DefaultPackedItems defines the default number of items that can
	// be packed into a single record by a PackedWriter.
	DefaultPackedItems = uint32(16 * 1024)
)

Variables

var MagicPacked = internal.MagicPacked

MagicPacked is the chunk header for legacy and v2 data chunks. Not for general use.

var MaxReadRecordSize = internal.MaxReadRecordSize

MaxReadRecordSize defines a max size for a record when reading to avoid crashes for unreasonable requests.

Functions

func RegisterTransformer

func RegisterTransformer(name string, transformer TransformerFactory, untransformer TransformerFactory)

RegisterTransformer registers a block transformer. The transformer factory should produce a transformer function. The factory is run by NewWriterV2. The transformer function is called by the writer to transform a block just before it is written to storage.

The untransformer factory is the reverse of the transformer factory. It is run by NewScannerV2. The untransformer function is called by the scanner to transform data read from storage into a block.

This function is usually called when the process starts.

The transformer and untransformer factories, as well as the functions they generate, must all be thread safe.

REQUIRES: A transformer or untransformer with the same "name" has not already been registered.
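
As a sketch of the registration plumbing, the toy transformer below XORs every byte with a key taken from the config string; it is not a real cipher, and the name "xor" is purely illustrative. Because XOR is its own inverse, the same factory serves as both transformer and untransformer.

package main

import (
	"strconv"

	"github.com/grailbio/base/recordio"
)

// xorFactory builds a TransformFunc that XORs every byte with a one-byte key
// taken from the config string. XOR is not encryption; this only shows the
// factory plumbing.
func xorFactory(config string) (recordio.TransformFunc, error) {
	key := byte(0x5a) // arbitrary default key
	if config != "" {
		n, err := strconv.Atoi(config)
		if err != nil {
			return nil, err
		}
		key = byte(n)
	}
	return func(scratch []byte, in [][]byte) ([]byte, error) {
		out := scratch[:0]
		for _, buf := range in {
			for _, b := range buf {
				out = append(out, b^key)
			}
		}
		return out, nil
	}, nil
}

func init() {
	// XOR is its own inverse, so the same factory serves both directions.
	recordio.RegisterTransformer("xor", xorFactory, xorFactory)
}

func main() {
	// A writer would then request it via WriterOpts{Transformers: []string{"xor 90"}}.
}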

func RequiredSpaceUpperBound added in v0.0.11

func RequiredSpaceUpperBound(itemSizes []int64, recordSize int64) int64

RequiredSpaceUpperBound returns an upper bound on the space required to store the items whose sizes are given in itemSizes, for a specified record size.
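
A minimal usage sketch, with arbitrary item and record sizes chosen only to show the call shape:

package main

import (
	"fmt"

	"github.com/grailbio/base/recordio"
)

func main() {
	// Arbitrary item sizes and record size, chosen only to show the call shape.
	sizes := []int64{100, 2048, 4096}
	bound := recordio.RequiredSpaceUpperBound(sizes, 32<<10)
	fmt.Println("upper bound in bytes:", bound)
}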

Types

type FormatVersion

type FormatVersion int

FormatVersion defines the file-format version. Not for general use. It may be removed without notice.

const (
	// V1 is pre 2018-02 format
	V1 FormatVersion = 1
	// V2 is post 2018-02 format
	V2 FormatVersion = 2
)

type IndexFunc

type IndexFunc func(loc ItemLocation, item interface{}) error

IndexFunc runs after an item is flushed to storage. Parameter "loc" is the location of the item in the file. It can later be passed to the Scanner.Seek method to seek to the item.

type ItemLocation

type ItemLocation struct {
	// Location of the first byte of the block within the file. Unit is bytes.
	Block uint64
	// Index of the item within the block. The Nth item in the block (N=1,2,...)
	// has value N-1.
	Item int
}

ItemLocation identifies the location of an item in a recordio file.

type KeyValue

type KeyValue struct {
	// Key is the header key
	Key string
	// Value is the value corresponding to Key. The value must be one of int*,
	// uint*, float*, bool, or string type.
	Value interface{}
}

KeyValue defines one entry stored in a recordio header block.

type MarshalFunc

type MarshalFunc func(scratch []byte, v interface{}) ([]byte, error)

MarshalFunc is called to serialize data. Parameter scratch is passed as a performance hint. If the result of the serialization fits in scratch, the function should store the result in scratch and return it as the first return value. Else, it should allocate a new []byte and return it.
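
A minimal MarshalFunc that honors the scratch hint might look like the sketch below; the plain string cast mirrors the examples on this page, while a real application would serialize a struct.

package main

import "github.com/grailbio/base/recordio"

// marshalString reuses scratch when the result fits and allocates otherwise,
// following the contract described above.
var marshalString recordio.MarshalFunc = func(scratch []byte, v interface{}) ([]byte, error) {
	s := v.(string)
	if cap(scratch) >= len(s) {
		return append(scratch[:0], s...), nil // result fits: reuse scratch
	}
	return []byte(s), nil // otherwise allocate
}

func main() {
	_ = marshalString // pass it via recordio.WriterOpts{Marshal: marshalString}
}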

type ParsedHeader

type ParsedHeader []KeyValue

ParsedHeader is the result of parsing the recordio header block contents.

func (*ParsedHeader) HasTrailer

func (h *ParsedHeader) HasTrailer() bool

HasTrailer checks if the header has a "trailer" entry.
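
For example, a reader handed an arbitrary file can consult the parsed header before touching the trailer; readTrailerIfAny below is an illustrative helper, not part of the package.

package main

import "github.com/grailbio/base/recordio"

// readTrailerIfAny returns the trailer bytes only when the parsed header
// declares one.
func readTrailerIfAny(sc recordio.Scanner) []byte {
	h := sc.Header()
	if !h.HasTrailer() {
		return nil
	}
	return sc.Trailer()
}

func main() {}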

type Scanner

type Scanner interface {
	// Header returns the contents of the header block.
	Header() ParsedHeader

	// Scan returns true if a new record was read, false otherwise. It will return
	// false on encountering an error; the error may be retrieved using the Err
	// method. Note that Scan will reuse storage from one invocation to the next.
	Scan() bool

	// Get returns the current item as read by a prior call to Scan.
	//
	// REQUIRES: Preceding Scan calls have returned true. There is no Seek
	// call between the last Scan call and the Get call.
	Get() interface{}

	// Err returns any error encountered by the scanner. Once Err() becomes
	// non-nil, it stays so.
	Err() error

	// Set up so that the next Scan() call causes the pointer to move to the given
	// location.  On any error, Err() will be set.
	//
	// REQUIRES: loc must be one of the values passed to the Index callback
	// during writes.
	Seek(loc ItemLocation)

	// Trailer returns the trailer block contents.  If the trailer does not exist,
	// or is corrupt, it returns nil.  The caller should examine Err() if Trailer
	// returns nil.
	Trailer() []byte

	// Return the file format version. Not for general use.
	Version() FormatVersion

	// Finish should be called exactly once, after the application has finished
	// using the scanner. It returns the value of Err().
	//
	// The Finish method recycles the internal scanner resources for use by other
	// scanners, thereby reducing GC overhead. The application must not touch the
	// scanner object after Finish.
	Finish() error
}

Scanner defines an interface for a recordio scanner.

A Scanner implementation must be thread safe. The legal call sequence is defined by the path expression below. Err, Header, and Trailer can be called at any time.

((Scan Get*) | Seek)* Finish
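
The sketch below follows this path expression end to end, including the Finish call that the examples above omit; it relies on the default Marshal and Unmarshal behavior described in WriterOpts and ScannerOpts.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/grailbio/base/recordio"
)

// scanAll follows the ((Scan Get*) | Seek)* Finish path expression.
func scanAll(in io.ReadSeeker) error {
	sc := recordio.NewScanner(in, recordio.ScannerOpts{})
	for sc.Scan() {
		// With a nil Unmarshal, Get returns the item as a []byte.
		fmt.Printf("item: %s\n", sc.Get().([]byte))
	}
	// Finish reports any error and releases the scanner's internal resources.
	return sc.Finish()
}

func main() {
	buf := &bytes.Buffer{}
	wr := recordio.NewWriter(buf, recordio.WriterOpts{})
	wr.Append([]byte("Item0"))
	if err := wr.Finish(); err != nil {
		panic(err)
	}
	if err := scanAll(bytes.NewReader(buf.Bytes())); err != nil {
		panic(err)
	}
}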

func NewScanner

func NewScanner(in io.ReadSeeker, opts ScannerOpts) Scanner

NewScanner creates a new recordio scanner. The scanner can read both legacy recordio files (packed or unpacked) and the new-format files. Any error is reported through the Scanner.Err method.

func NewShardScanner

func NewShardScanner(in io.ReadSeeker, opts ScannerOpts, start, limit, nshard int) Scanner

NewShardScanner creates a new sharded recordio scanner. The returned scanner reads shard [start,limit) (of [0,nshard)) of the recordio file at the ReadSeeker in. Sharding is only supported for v2 recordio files; an error scanner is returned if NewShardScanner is called for a legacy recordio file.

NewShardScanner with start, limit, and nshard set to 0, 1, and 1 respectively (i.e., a single shard) behaves like NewScanner.
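
A sketch of sharded reading, assuming each shard gets its own ReadSeeker over the same bytes; the item-to-shard assignment is up to the library, so no particular output order is implied.

package main

import (
	"bytes"
	"fmt"
	"sync"

	"github.com/grailbio/base/recordio"
)

func main() {
	// Write a small v2 file with several blocks.
	buf := &bytes.Buffer{}
	wr := recordio.NewWriter(buf, recordio.WriterOpts{})
	for i := 0; i < 4; i++ {
		wr.Append([]byte(fmt.Sprintf("Item%d", i)))
		wr.Flush() // force multiple blocks so sharding has something to split
	}
	if err := wr.Finish(); err != nil {
		panic(err)
	}
	data := buf.Bytes()

	// Read shards [0,1) and [1,2) of 2 concurrently; each goroutine gets its
	// own ReadSeeker over the same bytes.
	var wg sync.WaitGroup
	const nshard = 2
	for s := 0; s < nshard; s++ {
		wg.Add(1)
		go func(s int) {
			defer wg.Done()
			sc := recordio.NewShardScanner(bytes.NewReader(data), recordio.ScannerOpts{}, s, s+1, nshard)
			for sc.Scan() {
				fmt.Printf("shard %d: %s\n", s, sc.Get().([]byte))
			}
			if err := sc.Finish(); err != nil {
				panic(err)
			}
		}(s)
	}
	wg.Wait()
}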

type ScannerOpts

type ScannerOpts struct {
	// LegacyTransform is used only to read the legacy recordio files. For the V2
	// recordio files, this field is ignored, and transformers are constructed
	// from the header metadata.
	LegacyTransform TransformFunc

	// Unmarshal transforms a byte slice into an application object. It is called
	// for every item read from storage. If nil, a function that returns []byte
	// unchanged is used. The return value from Unmarshal can be retrieved using
	// the Scanner.Get method.
	Unmarshal func(in []byte) (out interface{}, err error)
}

ScannerOpts defines options used when creating a new scanner.

type TransformFunc

type TransformFunc func(scratch []byte, in [][]byte) (out []byte, err error)

TransformFunc is called to (un)compress or (un)encrypt data. Parameter scratch is passed as a performance hint. If the result of the transformation fits in scratch, the function should store the result in scratch and return it as the first return value. Else, it should allocate a new []byte and return it.

type TransformerFactory

type TransformerFactory func(config string) (TransformFunc, error)

TransformerFactory is a function that creates a new TransformFunc given an optional config string.

type Writer

type Writer interface {
	// Add arbitrary metadata to the file. This method must be called
	// before any other Append* or Set* functions. If the key has already been
	// added to the header, this method overwrites it with the new value.
	//
	// REQUIRES: Append, SetTrailer, Finish have not been called.
	AddHeader(key string, value interface{})

	// Write one item. The marshaler will eventually be called to
	// serialize the item.  The type of v must match the input type for
	// the Marshal function passed when the writer is created. Note that
	// since marshaling is performed asynchronously, the object passed
	// to Append should be considered owned by the writer, and must not
	// be reused by the caller.
	//
	// The writer flushes items to storage in the order of addition.
	//
	// REQUIRES: Finish and SetTrailer have not been called.
	Append(v interface{})

	// Schedule to flush the current block. The next item will be written in a new
	// block. This method just schedules for flush, and returns before the block
	// is actually written to storage. Call Wait to wait for Flush to finish.
	Flush()

	// Block the caller until all the prior Flush calls finish.
	Wait()

	// Add arbitrary data at the end of the file. After this call, no
	// {Add*,Append*,Set*} functions may be called.
	//
	// REQUIRES: AddHeader(KeyTrailer, true) has been called.
	SetTrailer([]byte)

	// Err returns any error encountered by the writer. Once Err() becomes
	// non-nil, it stays so.
	Err() error

	// Finish must be called at the end of writing. Finish internally calls
	// Flush, then returns the value of Err. No method other than Err may be
	// called afterwards.
	Finish() error
}

Writer defines an interface for a recordio writer. An implementation must be thread safe.

The legal call sequence is defined by the path expression below. Err can be called at any time, so it is not included in the expression. "?" means 0 or 1 calls; "*" means 0 or more calls.

AddHeader*
(Append|Flush)*
SetTrailer?
Finish

func NewWriter

func NewWriter(wr io.Writer, opts WriterOpts) Writer

NewWriter creates a new writer. New users should use this writer instead of Writer, PackedWriter, or ConcurrentPackedWriter.

Caution: files created by this writer cannot be read by a legacy recordio.Scanner.

type WriterOpts

type WriterOpts struct {
	// Marshal is called for every item added by Append. It serializes the
	// record. If Marshal is nil, it defaults to a function that casts the value
	// to []byte and returns it. Marshal may be called concurrently.
	Marshal MarshalFunc

	// Index is called for every item added, just before it is written to
	// storage. Index callback may be called concurrently and out of order of
	// locations.
	//
	// After Index is called, the Writer guarantees that it never touches
	// the value again. The application may recycle the value in a freepool, if it
	// desires. Index may be nil.
	Index IndexFunc

	// Transformer specifies a list of functions to compress, encrypt, or modify
	// data in any other way, just before a block is written to storage.
	//
	// Each entry in Transformers must be of the form "name" or "name config..".  The
	// "name" is matched against the registry (see RegisterTransformer).  The
	// "config" part is passed to the transformer factory function.  If "name" is
	// not registered, the writer will fail immediately.
	//
	// If Transformers contains multiple strings, Transformers[0] is invoked
	// first, then its results are passed to Transformers[1], and so on.
	//
	// If len(Transformers)==0, then an identity transformer is used. It will
	// return the block as is.
	//
	// Recordio package includes the following standard transformers:
	//
	//  "zstd N" (N is -1 or an integer from 0 to 22): zstd compression level N.
	//  If " N" part is omitted or N=-1, the default compression level is used.
	//  To use zstd, import the 'recordiozstd' package and call
	//  'recordiozstd.Init()' in an init() function.
	//
	//  "flate N" (N is -1 or an integer from 0 to 9): flate compression level N.
	//  If " N" part is omitted or N=-1, the default compression level is used.
	//  To use flate, import the 'recordioflate' package and call
	//  'recordioflate.Init()' in an init() function.
	Transformers []string

	// MaxItems is the maximum number of items to pack into a single record.
	// It defaults to DefaultPackedItems if set to 0.
	// If MaxItems exceeds MaxPackedItems, it will silently be capped at MaxPackedItems.
	MaxItems uint32

	// MaxFlushParallelism limits the maximum number of block flush operations in
	// flight before blocking the application. It defaults to
	// DefaultFlushParallelism.
	MaxFlushParallelism uint32

	// REQUIRES: AddHeader(KeyTrailer, true) has been called or the KeyTrailer
	// option set to true.
	KeyTrailer bool

	// SkipHeader skips writing out the header and starts in the
	// `wStateWritingBody` state.
	SkipHeader bool
}

WriterOpts defines options used when creating a new writer.
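
Putting several options together, a writer with an explicit flate level and packing limits might be configured as in the sketch below; the specific values are arbitrary.

package main

import (
	"bytes"

	"github.com/grailbio/base/recordio"
	"github.com/grailbio/base/recordio/recordioflate"
)

func main() {
	recordioflate.Init() // registers the "flate" transformer

	wr := recordio.NewWriter(&bytes.Buffer{}, recordio.WriterOpts{
		// "flate 9": transformer name followed by its config string (level 9).
		Transformers: []string{"flate 9"},
		// Pack at most 1024 items into a single record instead of DefaultPackedItems.
		MaxItems: 1024,
		// Allow up to 4 block flushes in flight before Append blocks.
		MaxFlushParallelism: 4,
	})
	wr.Append([]byte("Item0"))
	if err := wr.Finish(); err != nil {
		panic(err)
	}
}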

Directories

Path           Synopsis
recordioflate  Package recordioflate provides the "flate" transformer.
recordioiov    Package recordioiov provides utility functions for dealing with [][]bytes, used by recordio transformers.
