sequencefile


README

Sequencefile


This is a native Go implementation of Hadoop's SequenceFile format.

Usage

sf, err := sequencefile.Open("foo.sequencefile")
if err != nil {
  log.Fatal(err)
}

// Iterate through the file.
for sf.Scan() {
  // Do something with sf.Key() and sf.Value()
}

if sf.Err() != nil {
  log.Fatal(sf.Err())
}

Reading files written by Hadoop

Hadoop adds another layer of serialization for individual keys and values, depending on the Writable class used, such as BytesWritable. By default, this library returns the raw key and value bytes, still serialized. You can use the following functions to unwrap them; a short example follows the list:

func BytesWritable(b []byte) []byte
func Text(b []byte) string
func IntWritable(b []byte) int32
func LongWritable(b []byte) int64
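
For example, a minimal sketch for a file written with Text keys and LongWritable values (the file name here is hypothetical):

sf, err := sequencefile.Open("wordcounts.sequencefile") // hypothetical file
if err != nil {
  log.Fatal(err)
}

for sf.Scan() {
  word := sequencefile.Text(sf.Key())
  count := sequencefile.LongWritable(sf.Value())
  fmt.Printf("%s: %d\n", word, count)
}

if sf.Err() != nil {
  log.Fatal(sf.Err())
}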

Documentation

Overview

Package sequencefile provides functionality for reading and writing Hadoop's SequenceFile format, documented here: http://goo.gl/sOSJmJ


Constants

const (
	SyncSize = 16

	GzipClassName   = "org.apache.hadoop.io.compress.GzipCodec"
	SnappyClassName = "org.apache.hadoop.io.compress.SnappyCodec"
	ZlibClassName   = "org.apache.hadoop.io.compress.DefaultCodec"
	ZstdClassName   = "org.apache.hadoop.io.compress.ZStandardCodec"
	Bzip2ClassName  = "org.apache.hadoop.io.compress.BZip2Codec"
)
const (
	BytesWritableClassName = "org.apache.hadoop.io.BytesWritable"
	TextClassName          = "org.apache.hadoop.io.Text"
	IntWritableClassName   = "org.apache.hadoop.io.IntWritable"
	LongWritableClassName  = "org.apache.hadoop.io.LongWritable"
)


Functions

func BytesWritable

func BytesWritable(b []byte) []byte

BytesWritable unwraps a Hadoop BytesWritable and returns the actual bytes.

func IntWritable

func IntWritable(b []byte) int32

IntWritable unwraps an IntWritable and returns the deserialized int32.

func LongWritable

func LongWritable(b []byte) int64

LongWritable unwraps a LongWritable and returns the deserialized int64.

func ReadVInt

func ReadVInt(r io.Reader) (int64, error)

ReadVInt reads an int64 encoded in Hadoop's "VInt" format, described and implemented here: https://goo.gl/1h4mrG. It performs at most two reads on the underlying io.Reader.

func Text

func Text(b []byte) string

Text unwraps a Text and returns the deserialized string.

func WriteVInt

func WriteVInt(w io.Writer, i int64) (err error)

WriteVInt writes an int64 encoded in Hadoop's "VInt" format.
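
A minimal sketch of round-tripping a value through the two functions (assumes the standard bytes, fmt, and log imports):

	// Encode an arbitrary int64, then decode it again.
	var buf bytes.Buffer
	if err := sequencefile.WriteVInt(&buf, 1234567); err != nil {
		log.Fatal(err)
	}
	n, err := sequencefile.ReadVInt(&buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(n) // prints 1234567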

Types

type Compression

type Compression int
const (
	NoCompression Compression = iota + 1
	RecordCompression
	BlockCompression
)

type CompressionCodec

type CompressionCodec int
const (
	GzipCompression CompressionCodec = iota + 1
	SnappyCompression
	ZlibCompression
	ZstdCompression
	Bzip2Compression
)
type Header struct {
	Version                   int
	Compression               Compression
	CompressionCodec          CompressionCodec
	CompressionCodecClassName string
	KeyClassName              string
	ValueClassName            string
	Metadata                  map[string]string
	SyncMarker                string
}

A Header represents the information contained in the header of the SequenceFile.

type Reader

type Reader struct {
	Header Header
	// contains filtered or unexported fields
}

A Reader reads key/value pairs from a SequenceFile input stream.

A reader is valid at any key or block offset; it's safe to start in the middle of a file, or to seek the underlying input stream, as long as the location was recorded between calls to Scan and you call Reset after seeking. Note, however, that with a block-compressed file (Header.Compression set to BlockCompression), the recorded position will be the beginning of the block that holds the key, not the position immediately before the key itself.

Example
package main

import (
	"fmt"
	"log"

	"github.com/colinmarc/sequencefile"
)

func main() {
	sf, err := sequencefile.Open("testdata/block_compressed_snappy.sequencefile")
	if err != nil {
		log.Fatal(err)
	}

	// Iterate through the file.
	for sf.Scan() {
		// Unwrap the BytesWritable values.
		key := sequencefile.BytesWritable(sf.Key())
		value := sequencefile.BytesWritable(sf.Value())
		fmt.Println(string(key), string(value))
	}

	if sf.Err() != nil {
		log.Fatal(sf.Err())
	}

}
Output:

Alice Practice
Bob Hope

func NewReader

func NewReader(r io.Reader) *Reader

NewReader returns a new Reader for a SequenceFile, reading data from r. If the io.Reader is positioned at the start of a file, you should immediately call ReadHeader to read through the header.
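
A minimal sketch, assuming f is an *os.File positioned at the start of a SequenceFile:

	r := sequencefile.NewReader(f)
	if err := r.ReadHeader(); err != nil {
		log.Fatal(err)
	}

	for r.Scan() {
		// Do something with r.Key() and r.Value().
	}
	if r.Err() != nil {
		log.Fatal(r.Err())
	}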

func NewReaderCompression

func NewReaderCompression(r io.Reader, compression Compression, codec CompressionCodec) *Reader

NewReaderCompression returns a new Reader for a SequenceFile, reading data from r. Normally, compression options are inferred from the header of a file, but if the header is unavailable (because you're starting mid-stream), you can call this function with the compression options set explicitly.
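
For example, a sketch of resuming mid-stream when the header is unavailable (the stream variable and the choice of Snappy block compression are assumptions for illustration):

	// stream is an io.Reader positioned at a previously recorded offset.
	r := sequencefile.NewReaderCompression(stream, sequencefile.BlockCompression, sequencefile.SnappyCompression)
	for r.Scan() {
		// ...
	}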

func Open

func Open(path string) (*Reader, error)

Open opens a SequenceFile on disk and immediately reads the header.

func (*Reader) Err

func (r *Reader) Err() error

Err returns the first non-EOF error reached while scanning.

func (*Reader) Key

func (r *Reader) Key() []byte

Key returns the key for the current record. The byte slice will be reused after the next call to Scan.

func (*Reader) ReadHeader

func (r *Reader) ReadHeader() error

ReadHeader parses the SequenceFile header from the input stream, and fills in the Header struct with the values. This should be called when the reader is positioned at the start of the file or input stream, before any records are read.

ReadHeader will also validate that the settings of the SequenceFile (version, compression, key/value serialization, etc.) are supported by this library.

func (*Reader) Reset

func (r *Reader) Reset()

Reset resets the internal state of the reader, but maintains compression settings and header information. You should call Reset if you seek the underlying reader, but should create an entirely new Reader if you are starting a different file.
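
A sketch of the record-and-seek pattern (assuming r wraps an *os.File named f, and that the file's offset between calls to Scan is a valid record location, per the Reader documentation above):

	// Record the stream position between calls to Scan.
	offset, err := f.Seek(0, io.SeekCurrent)
	if err != nil {
		log.Fatal(err)
	}
	r.Scan()

	// ...later, rewind to the recorded position and resume.
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		log.Fatal(err)
	}
	r.Reset()
	r.Scan()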

func (*Reader) Scan

func (r *Reader) Scan() bool

Scan advances the reader to the start of the next record, reading the key and value into memory. These can then be obtained by calling Key and Value. If the end of the file is reached, or there is an error, Scan will return false.

func (*Reader) Value

func (r *Reader) Value() []byte

Value returns the value for the current record. The byte slice will be reused after the next call to Scan.

type WritableWriter

type WritableWriter func(io.Writer, interface{}) error

A WritableWriter knows how to write data wrapped in Hadoop Writables.

Each WritableWriter understands just a single type of data.

func NewWritableWriter

func NewWritableWriter(className string) (WritableWriter, error)

NewWritableWriter gets a WritableWriter for a given Hadoop class name.
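
A sketch (it's an assumption, mirroring the Writer example below where BytesWritable pairs are appended as []byte, that the BytesWritable writer accepts a []byte):

	ww, err := sequencefile.NewWritableWriter(sequencefile.BytesWritableClassName)
	if err != nil {
		log.Fatal(err)
	}

	var buf bytes.Buffer
	if err := ww(&buf, []byte("hello")); err != nil {
		log.Fatal(err)
	}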

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

A Writer writes key/value pairs to a sequence file output stream.

Example
package main

import (
	"bytes"
	"log"

	"github.com/colinmarc/sequencefile"
)

func main() {
	var buf bytes.Buffer

	cfg := &sequencefile.WriterConfig{
		Writer:     &buf,
		KeyClass:   sequencefile.BytesWritableClassName,
		ValueClass: sequencefile.BytesWritableClassName,
	}
	w, err := sequencefile.NewWriter(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	pairs := []struct{ k, v string }{
		{"Alice", "Practice"},
		{"Bob", "Hope"},
	}
	for _, p := range pairs {
		err = w.Append([]byte(p.k), []byte(p.v))
		if err != nil {
			log.Fatal(err)
		}
	}
}
Output:

func NewWriter

func NewWriter(cfg *WriterConfig) (w *Writer, err error)

NewWriter constructs a new Writer.

func (*Writer) Append

func (w *Writer) Append(key interface{}, value interface{}) (err error)

Append adds a key/value pair to this Writer. The types of the key and value must match the KeyClass and ValueClass this Writer was configured with.

func (*Writer) Close

func (w *Writer) Close() error

Close frees resources held by this Writer.

type WriterConfig

type WriterConfig struct {
	// Writer is where data will be written to.
	Writer io.Writer

	// KeyClass is the type of each key to be written.
	KeyClass string

	// ValueClass is the type of each value to be written.
	ValueClass string

	// Compression is the type of compression to be used.
	// Either none, record or block.
	Compression Compression

	// CompressionCodec is the codec to be used for compression.
	// This is only relevant if compression is used.
	CompressionCodec CompressionCodec

	// BlockSize is the size of each block for compression.
	// This is only relevant if block compression is used.
	BlockSize int

	// Metadata contains key/value pairs to be added to the header.
	Metadata map[string]string

	// Rand is a source of random numbers. Should usually be nil, but useful
	// for reproducible output.
	Rand *rand.Rand
}

A WriterConfig specifies the configuration for a Writer.
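
As a sketch, a config for a Snappy block-compressed file with custom metadata might look like this (the codec choice and metadata values are illustrative):

	cfg := &sequencefile.WriterConfig{
		Writer:           f, // any io.Writer, e.g. an *os.File
		KeyClass:         sequencefile.BytesWritableClassName,
		ValueClass:       sequencefile.BytesWritableClassName,
		Compression:      sequencefile.BlockCompression,
		CompressionCodec: sequencefile.SnappyCompression,
		Metadata:         map[string]string{"created-by": "example"},
	}
	w, err := sequencefile.NewWriter(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()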
