package blocks
Published: Oct 26, 2016 License: MIT Imports: 16 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

var ErrMissingPartitions = errors.New("existing block store missing partitions")
var ErrNoManifest = errors.New("no manifest file found")
var ErrPartitionNotFound = errors.New("the block store doesn't have the correct partition for the key")
var ErrWrongPartition = errors.New("the file is cleanly partitioned, but doesn't contain a partition we want")
var ErrWrongVersion = errors.New("wrong manifest version")

Functions

func KeyPartition

func KeyPartition(key string, totalPartitions int) (int, int)

KeyPartition grabs the partition for a key. Most of the time, this will match the way hadoop shuffles keys to reducers. It sometimes returns a second partition where the key may be; see the comment for alternatePathologicalKeyPartition below.

Types

type Block

type Block struct {
	ID        string
	Name      string
	Partition int
	Count     int

	sync.RWMutex
	// contains filtered or unexported fields
}

A Block represents a chunk of data whose keys all match a particular partition number. The data is partitioned using the same method Hadoop uses by default for shuffling data:

key.hashCode % partitions

so blocks can easily line up with files in a dataset.
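The partitioning scheme above can be sketched in plain Go. This is not part of the package; `javaHashCode` and `hadoopPartition` are hypothetical helper names that mirror Java's `String.hashCode` and Hadoop's default `HashPartitioner`, which computes `(key.hashCode() & Integer.MAX_VALUE) % numPartitions`:

```go
package main

import "fmt"

// javaHashCode mirrors Java's String.hashCode: h = 31*h + c for each
// character, in 32-bit signed arithmetic. Iterating over bytes matches
// Java's UTF-16 chars only for ASCII keys, which is enough for a sketch.
func javaHashCode(s string) int32 {
	var h int32
	for i := 0; i < len(s); i++ {
		h = 31*h + int32(s[i])
	}
	return h
}

// hadoopPartition mirrors Hadoop's default HashPartitioner:
// (hashCode & Integer.MAX_VALUE) % numPartitions. Masking with
// MaxInt32 keeps the result non-negative for negative hash codes.
func hadoopPartition(key string, numPartitions int) int {
	return int(javaHashCode(key)&0x7fffffff) % numPartitions
}

func main() {
	// "a" has Java hashCode 97, so with 4 partitions it lands in 97 % 4 = 1.
	fmt.Println(hadoopPartition("a", 4))
}
```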

func (*Block) Close

func (b *Block) Close()

func (*Block) Get

func (b *Block) Get(key []byte) (*Record, error)

type BlockStore

type BlockStore struct {
	Blocks   []*Block
	BlockMap map[int][]*Block
	// contains filtered or unexported fields
}

A BlockStore stores ingested key/value data in discrete blocks, each stored as a separate CDB file. The blocks are arranged and sorted in a way that takes advantage of how the output of Hadoop jobs is laid out.

func New

func New(path string, numPartitions int, selectedPartitions map[int]bool, compression Compression, blockSize int) *BlockStore

func NewFromManifest

func NewFromManifest(path string, selectedPartitions map[int]bool) (*BlockStore, error)

func (*BlockStore) AddFile

func (store *BlockStore) AddFile(reader *sequencefile.Reader, throttle time.Duration) error

AddFile ingests the key/value pairs from the given sequencefile, writing them out to at least one block. If the data is not cleanly partitioned, AddFile sorts it into blocks as it reads.

func (*BlockStore) Close

func (store *BlockStore) Close()

Close closes the BlockStore and any files it has open.

func (*BlockStore) Count

func (store *BlockStore) Count() int

Count returns the total number of records stored.

func (*BlockStore) Delete

func (store *BlockStore) Delete() error

Delete removes any local data the BlockStore has stored.

func (*BlockStore) Get

func (store *BlockStore) Get(key string) (*Record, error)

Get returns the value for a given key. It returns ErrPartitionNotFound if the partition requested is not available locally.
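A minimal usage sketch of the read path, assuming the package is imported under the name `blocks` (the import path is not shown above and is omitted here) and that a saved manifest exists at the placeholder path below:

```go
// Hypothetical: open an existing store from its manifest, requesting
// only partitions 0 and 1.
store, err := blocks.NewFromManifest("/path/to/store", map[int]bool{0: true, 1: true})
if err != nil {
	log.Fatal(err) // e.g. blocks.ErrNoManifest if no manifest file is found
}
defer store.Close()

rec, err := store.Get("some-key")
if err != nil {
	// blocks.ErrPartitionNotFound means the key's partition isn't local.
	log.Fatal(err)
}
defer rec.Close()

// Record has a Read method, so the value can be streamed out as an io.Reader.
val, err := io.ReadAll(rec)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("%d records total, value: %q\n", store.Count(), val)
```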

func (*BlockStore) Save

func (store *BlockStore) Save() error

type Compression

type Compression string

const NoCompression Compression = "none"
const SnappyCompression Compression = "snappy"

type Record

type Record struct {
	ValueLen uint64
	// contains filtered or unexported fields
}

A Record is one key/value pair loaded from a block.

func (*Record) Close

func (r *Record) Close() error

func (*Record) Read

func (r *Record) Read(b []byte) (int, error)

func (*Record) WriteTo

func (r *Record) WriteTo(w io.Writer) (n int64, err error)
