recordio

package module v0.0.7
Published: May 29, 2019 License: Apache-2.0 Imports: 7 Imported by: 0

README

RecordIO


RecordIO is a file format created for PaddlePaddle Elastic Deep Learning. It is generally useful for distributed computing.

Motivations

Static Sharding vs. Dynamic Sharding

In distributed computing, we often need to partition and dispatch data to worker processes. A commonly-used solution, known as static sharding, is to define each data shard as a file and to map each file to a worker process. However, when we are doing fault-tolerant distributed computing or elastic scheduling of distributed computing jobs, the total number of worker processes might change at runtime, and static sharding doesn't work. In such cases, we want to partition records in a file into data shards -- an approach known as dynamic sharding.

RecordIO and Dynamic Sharding

We define the RecordIO file format to support dynamic sharding. A RecordIO file consists of a sequence of records grouped into chunks. By reading through a file and skipping over chunk contents, we can quickly build an index of all records. We can then use this index to seek to the beginning of any record, and thus locate any dynamic shard efficiently.

The Go API

Writing
f, _ := os.Create("a_file.recordio")
w := recordio.NewWriter(f, -1, -1) // -1, -1: default max chunk size and default compression
w.Write([]byte("Hello"))
w.Write([]byte("World,"))
w.Write([]byte("RecordIO!"))
w.Close() // flush the current chunk before closing the file
f.Close()

Reading

  1. Load the chunk index:

    f, _ := os.Open("a_file.recordio")
    idx, _ := recordio.LoadIndex(f)
    fmt.Println("Total records: ", idx.NumRecords())
    
  2. Create one or more scanners to read ranges of records. The following example reads 2 records, starting from record 1.

    s := recordio.NewScanner(f, idx, 1, 2)
    for s.Scan() {
       fmt.Println(string(s.Record()))
    }
    if s.Error() != nil && s.Error() != io.EOF {
       fmt.Println("Something wrong with scanning: %v", s.Error())
    }
    f.Close()
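
Putting these pieces together, the sketch below shows one way to implement the dynamic sharding described in the Motivations section: split the index's record count into roughly equal ranges and give each worker a scanner over its own range. The shardRanges helper and the even-split policy are illustrative only, not part of the package.

// shardRanges is a hypothetical helper that splits n records into
// numShards roughly equal [start, start+len) ranges.
func shardRanges(n, numShards int) [][2]int {
	per := (n + numShards - 1) / numShards // ceiling division
	var ranges [][2]int
	for start := 0; start < n; start += per {
		l := per
		if start+l > n {
			l = n - start // the last shard may be smaller
		}
		ranges = append(ranges, [2]int{start, l})
	}
	return ranges
}

// Each worker then scans only its own range:
f, _ := os.Open("a_file.recordio")
idx, _ := recordio.LoadIndex(f)
for i, r := range shardRanges(idx.NumRecords(), 4) {
	s := recordio.NewScanner(f, idx, r[0], r[1])
	for s.Scan() {
		fmt.Printf("worker %d read: %s\n", i, s.Record())
	}
}
f.Close()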
    

The Python Binding

We provide a Python binding of the Go implementation. For more information, please refer to python/README.md.

Documentation

Index

Constants

const (
	// NoCompression means writing raw chunk data into files.
	// With other choices, chunks are compressed before being written.
	NoCompression = iota
	// Snappy has been the default compression algorithm widely
	// used at Google.  It trades off between speed and
	// compression ratio.
	Snappy
	// Gzip is a well-known compression algorithm.  It is
	// recommended only if you are optimizing for compression ratio.
	Gzip
)
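
These constants are presumably what the compressor argument of NewWriter (see below) expects. As a minimal sketch, assuming an arbitrary file name, a writer producing Snappy-compressed chunks could be created like this:

f, _ := os.Create("compressed.recordio")
w := recordio.NewWriter(f, -1, recordio.Snappy) // default max chunk size, Snappy-compressed chunks
w.Write([]byte("a record"))
w.Close()
f.Close()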

Variables

This section is empty.

Functions

This section is empty.

Types

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index consists of the offsets and sizes of the consecutive chunks in a RecordIO file.

func LoadIndex

func LoadIndex(r io.ReadSeeker) (*Index, error)

LoadIndex scans the file and parses the chunkOffsets, chunkLens, and len fields of the Index.

func (*Index) Locate

func (r *Index) Locate(recordIndex int) (int, int)

Locate returns the index of the chunk that contains the given record, and the record's index within that chunk. It returns (-1, -1) if the record is out of range.
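
For illustration, a small sketch of mapping a record to its chunk (the record number 42 is arbitrary):

chunk, inChunk := idx.Locate(42)
if chunk == -1 {
	fmt.Println("record 42 is out of range")
} else {
	fmt.Printf("record 42 is record %d within chunk %d\n", inChunk, chunk)
}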

func (*Index) NumChunks

func (r *Index) NumChunks() int

NumChunks returns the total number of chunks in a RecordIO file.

func (*Index) NumRecords

func (r *Index) NumRecords() int

NumRecords returns the total number of records in a RecordIO file.

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

Scanner scans records in a specified range within [0, numRecords).

func NewScanner

func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner

NewScanner creates a scanner that sequentially reads records in the range [start, start+len). If start < 0, it scans from the beginning. If len < 0, it scans to the end of the file.
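
For example, a sketch of scanning an entire file by relying on the negative-argument defaults:

s := recordio.NewScanner(f, idx, -1, -1) // scan all records, from the beginning to the end
for s.Scan() {
	fmt.Println(string(s.Record()))
}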

func (*Scanner) Error

func (s *Scanner) Error() error

Error returns the error that stopped Scan.

func (*Scanner) Record

func (s *Scanner) Record() []byte

Record returns the record under the current cursor.

func (*Scanner) Scan

func (s *Scanner) Scan() bool

Scan moves the cursor forward by one record and loads the chunk containing that record if it is not yet loaded.

type Writer

type Writer struct {
	io.Writer // Set to nil to mark a closed writer.
	// contains filtered or unexported fields
}

Writer creates a RecordIO file.

func NewWriter

func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer

NewWriter creates a RecordIO file writer. Each chunk is compressed using the deflate algorithm at the given compression level. Note that level 0 means no compression and -1 means default compression.

func (*Writer) Close

func (w *Writer) Close() error

Close flushes the current chunk and makes the writer invalid.

func (*Writer) Write

func (w *Writer) Write(record []byte) (int, error)

Write writes a record. It returns an error if Close has been called.

Directories

Path Synopsis
Note: this file is part of the Python extension to recordio.
