sstables

package
Version: v1.3.1
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

README

Using SSTables

SSTables (sorted string tables) allow you to store a large amount of key/value data on disk and query it efficiently by key or by key range. Unsurprisingly, this format is at the heart of many NoSQL databases (e.g. HBase and Cassandra).
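Both query types benefit from the keys being stored in sorted order. Here is a minimal in-memory sketch of the idea, not this package's implementation: a point lookup is a binary search, and a range scan is a binary search for the lower bound followed by a sequential walk. The `sortedTable` type and its methods are hypothetical; the real reader keeps the values on disk.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// kv is one key/value record; an SSTable stores these sorted by key.
type kv struct {
	key, value []byte
}

// sortedTable is a hypothetical in-memory stand-in for an SSTable.
type sortedTable struct {
	records []kv // sorted ascending by key
}

// lowerBound returns the index of the first record whose key is >= key.
func (t sortedTable) lowerBound(key []byte) int {
	return sort.Search(len(t.records), func(i int) bool {
		return bytes.Compare(t.records[i].key, key) >= 0
	})
}

// Get finds a single key with a binary search (O(log n)).
func (t sortedTable) Get(key []byte) ([]byte, bool) {
	i := t.lowerBound(key)
	if i < len(t.records) && bytes.Equal(t.records[i].key, key) {
		return t.records[i].value, true
	}
	return nil, false
}

// Range returns all records with lo <= key <= hi (both bounds inclusive).
func (t sortedTable) Range(lo, hi []byte) []kv {
	var out []kv
	for i := t.lowerBound(lo); i < len(t.records); i++ {
		if bytes.Compare(t.records[i].key, hi) > 0 {
			break // sorted order: nothing after this can be in range
		}
		out = append(out, t.records[i])
	}
	return out
}

func main() {
	t := sortedTable{records: []kv{
		{[]byte("a"), []byte("1")},
		{[]byte("c"), []byte("3")},
		{[]byte("e"), []byte("5")},
	}}
	v, ok := t.Get([]byte("c"))
	fmt.Println(string(v), ok)                           // 3 true
	fmt.Println(len(t.Range([]byte("b"), []byte("e")))) // 2
}
```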

The flavor implemented in this library favours small keys and large values (e.g. images), since it keeps the key index in memory while the values remain on disk. A fully out-of-core version and secondary indices are not implemented yet. Building blocks such as a bloom filter for faster key look-ups are already in place, so these should not be too difficult to add later on.

Writing an SSTable

All files (key index, bloom filter, metadata) needed to store an SSTable are placed under a given basePath in your filesystem. This means we can start writing by simply creating a directory and appending some key/value pairs.

In the previous section we already saw how to transform a memstore into an SSTable.
This example shows how to stream already sorted data into a file:


path := "/tmp/sstable_example/"
if err := os.MkdirAll(path, 0777); err != nil { log.Fatalf("error: %v", err) }
defer os.RemoveAll(path)

writer, err := sstables.NewSSTableStreamWriter(
    sstables.WriteBasePath(path),
    sstables.WithKeyComparator(skiplist.BytesComparator))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

// keys must be written in ascending order
for i := byte(1); i <= 3; i++ {
    if err := writer.WriteNext([]byte{i}, []byte{i}); err != nil {
        log.Fatalf("error: %v", err)
    }
}

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Keep in mind that streaming requires a key comparator: for safety, the writer returns an error when a key is written out of order.
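The order check itself is simple, because a streaming writer only has to remember the previous key. A minimal sketch, assuming byte-wise ordering via `bytes.Compare`; the `orderedWriter` type is hypothetical, and treating a repeated key as out of order is also an assumption of this sketch.

```go
package main

import (
	"bytes"
	"fmt"
)

// orderedWriter is a hypothetical stand-in for a streaming writer: it only
// remembers the previous key and refuses any key that does not sort after it.
type orderedWriter struct {
	lastKey []byte
	hasLast bool
	count   int // stands in for the records written to disk
}

func (w *orderedWriter) WriteNext(key, value []byte) error {
	if w.hasLast && bytes.Compare(key, w.lastKey) <= 0 {
		return fmt.Errorf("key %v is out of order (previous key %v)", key, w.lastKey)
	}
	// copy the key so later changes to the caller's slice cannot affect the check
	w.lastKey = append(w.lastKey[:0], key...)
	w.hasLast = true
	w.count++ // a real writer would append key and value to its files here
	return nil
}

func main() {
	w := &orderedWriter{}
	fmt.Println(w.WriteNext([]byte{1}, []byte{1})) // <nil>
	fmt.Println(w.WriteNext([]byte{3}, []byte{3})) // <nil>
	fmt.Println(w.WriteNext([]byte{2}, []byte{2})) // key [2] is out of order (previous key [3])
}
```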

Since that is somewhat cumbersome, you can also directly write a full skip list using the SimpleWriter:

path := "/tmp/sstable_example/"
os.MkdirAll(path, 0777)
defer os.RemoveAll(path)

writer, err := sstables.NewSSTableSimpleWriter(
    sstables.WriteBasePath(path),
    sstables.WithKeyComparator(skiplist.BytesComparator))
if err != nil { log.Fatalf("error: %v", err) }

skipListMap := skiplist.NewSkipListMap(skiplist.BytesComparator)
skipListMap.Insert([]byte{1}, []byte{1})
skipListMap.Insert([]byte{2}, []byte{2})
skipListMap.Insert([]byte{3}, []byte{3})

err = writer.WriteSkipListMap(skipListMap)
if err != nil { log.Fatalf("error: %v", err) }

Reading an SSTable

Reading only requires the base path and the respective comparator. The example below shows what metadata is available, how to get values and check whether they exist, and how to do a range scan.

reader, err := sstables.NewSSTableReader(
    sstables.ReadBasePath("/tmp/sstable_example/"),
    sstables.ReadWithKeyComparator(skiplist.BytesComparator))
if err != nil { log.Fatalf("error: %v", err) }
defer reader.Close()

metadata := reader.MetaData()
log.Printf("reading table with %d records, minKey %d and maxKey %d", metadata.NumRecords, metadata.MinKey, metadata.MaxKey)

contains := reader.Contains([]byte{1})
val, err := reader.Get([]byte{1})
if err != nil { log.Fatalf("error: %v", err) }
log.Printf("table contains value for key? %t = %d", contains, val)

it, err := reader.ScanRange([]byte{1}, []byte{2})
if err != nil { log.Fatalf("error: %v", err) }
for {
    k, v, err := it.Next()
    // sstables.Done signals that no records are left to be read
    if err == sstables.Done {
        break
    }
    if err != nil { log.Fatalf("error: %v", err) }

    log.Printf("%d = %d", k, v)
}

You can get the full example from examples/sstables.go.

Merging two (or more) SSTables

One of the great features of SSTables is that they can be merged in linear time with a single sequential pass over each input, which needs only a constant amount of memory per input table, regardless of the table sizes.

In this library, a merge is easily composed of full-table scanners (one per input table) and a writer that outputs the resulting merged table:

// sstablePaths and outputPath are placeholders for your own input/output directories
var iterators []sstables.SSTableIteratorI
var iteratorContext []interface{}
for i := 0; i < numFiles; i++ {
    reader, err := sstables.NewSSTableReader(
        sstables.ReadBasePath(sstablePaths[i]),
        sstables.ReadWithKeyComparator(skiplist.BytesComparator))
    if err != nil { log.Fatalf("error: %v", err) }
    // readers must stay open until the merge has finished
    defer reader.Close()

    it, err := reader.Scan()
    if err != nil { log.Fatalf("error: %v", err) }

    iterators = append(iterators, it)
    // remember which file each iterator came from
    iteratorContext = append(iteratorContext, i)
}

// Merge expects an SSTableStreamWriterI, so the output uses a stream writer
outWriter, err := sstables.NewSSTableStreamWriter(
    sstables.WriteBasePath(outputPath),
    sstables.WithKeyComparator(skiplist.BytesComparator))
if err != nil { log.Fatalf("error: %v", err) }

merger := sstables.NewSSTableMerger(skiplist.BytesComparator)
// Merge takes care of opening and closing outWriter itself
err = merger.Merge(sstables.MergeContext{
    Iterators:       iterators,
    IteratorContext: iteratorContext,
}, outWriter)
if err != nil { log.Fatalf("error: %v", err) }

// do something with the merged sstable

// do something with the merged sstable

The merge logic itself is based on a heap, so it can scale to thousands of files easily.
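A heap scales well here because it holds one entry per input table, so emitting each record costs O(log k) for k tables. Below is a minimal k-way merge sketch over sorted key slices (values omitted for brevity), using Go's `container/heap`. The `cursor` and `cursorHeap` types are hypothetical and are not this package's `PriorityQueue`.

```go
package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

// cursor tracks the next unread key of one sorted input.
type cursor struct {
	keys [][]byte
	pos  int
	src  int // index of the input table
}

// cursorHeap is a min-heap of cursors ordered by their current key.
type cursorHeap []*cursor

func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Less(i, j int) bool {
	if c := bytes.Compare(h[i].keys[h[i].pos], h[j].keys[h[j].pos]); c != 0 {
		return c < 0
	}
	return h[i].src < h[j].src // equal keys: lower input index first
}
func (h cursorHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *cursorHeap) Pop() interface{} {
	old := *h
	c := old[len(old)-1]
	*h = old[:len(old)-1]
	return c
}

// mergeSorted merges k sorted inputs into one sorted output.
func mergeSorted(inputs [][][]byte) [][]byte {
	h := &cursorHeap{}
	for i, in := range inputs {
		if len(in) > 0 {
			*h = append(*h, &cursor{keys: in, src: i})
		}
	}
	heap.Init(h)
	var out [][]byte
	for h.Len() > 0 {
		c := (*h)[0] // cursor with the smallest current key
		out = append(out, c.keys[c.pos])
		c.pos++
		if c.pos == len(c.keys) {
			heap.Pop(h) // this input is exhausted
		} else {
			heap.Fix(h, 0) // restore heap order for the new current key
		}
	}
	return out
}

func main() {
	merged := mergeSorted([][][]byte{
		{{1}, {4}},
		{{2}, {3}},
		{{5}},
	})
	fmt.Println(merged) // [[1] [2] [3] [4] [5]]
}
```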

In some cases you may want to compact while merging the files. This is where MergeCompact comes in handy: you supply a simple reduce function that directly compacts the values for a given key. The example below illustrates this functionality:

reduceFunc := func(key []byte, values [][]byte, context []interface{}) ([]byte, []byte) {
    // always pick the first value
    return key, values[0]
}

merger := sstables.NewSSTableMerger(skiplist.BytesComparator)
// outWriter is a fresh SSTableStreamWriterI for the compacted output
err = merger.MergeCompact(sstables.MergeContext{
    Iterators:       iterators,
    IteratorContext: iteratorContext,
}, outWriter, reduceFunc)
if err != nil { log.Fatalf("error: %v", err) }

The context gives you the ability to figure out which value originated from which file/iterator. The context slice is parallel to the values slice, so the value at index 0 originated from the context at index 0.
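Conceptually, compaction groups adjacent records with equal keys as they come out of the merge and hands each group to the reduce function, with the context slice kept parallel to the values slice. Here is a hypothetical sketch of that grouping step over an already merged, key-sorted slice; `record`, `reduceFunc` and `compact` are illustrative names, not this package's API.

```go
package main

import (
	"bytes"
	"fmt"
)

// record is one merged entry plus the context of the input it came from.
type record struct {
	key, value []byte
	ctx        interface{}
}

type reduceFunc func(key []byte, values [][]byte, context []interface{}) ([]byte, []byte)

// compact expects merged to be sorted by key. It groups consecutive records
// with equal keys and calls reduce once per group; values[i] came from context[i].
func compact(merged []record, reduce reduceFunc) [][2][]byte {
	var out [][2][]byte
	for i := 0; i < len(merged); {
		j := i
		var values [][]byte
		var ctx []interface{}
		for j < len(merged) && bytes.Equal(merged[j].key, merged[i].key) {
			values = append(values, merged[j].value)
			ctx = append(ctx, merged[j].ctx)
			j++
		}
		k, v := reduce(merged[i].key, values, ctx)
		out = append(out, [2][]byte{k, v})
		i = j // continue with the next distinct key
	}
	return out
}

func main() {
	merged := []record{
		{[]byte("a"), []byte("old"), 0},
		{[]byte("a"), []byte("new"), 1},
		{[]byte("b"), []byte("only"), 0},
	}
	pickFirst := func(key []byte, values [][]byte, _ []interface{}) ([]byte, []byte) {
		return key, values[0]
	}
	for _, kv := range compact(merged, pickFirst) {
		fmt.Printf("%s=%s\n", kv[0], kv[1]) // a=old, then b=only
	}
}
```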

Documentation

Index

Constants

This section is empty.

Variables

var BloomFileName = "bloom.bf.gz"
var DataFileName = "data.rio"
var Done = errors.New("no more items in iterator")

iterator pattern as described in https://github.com/GoogleCloudPlatform/google-cloud-go/wiki/Iterator-Guidelines

var IndexFileName = "index.rio"
var MetaFileName = "meta.pb.bin"
var NotFound = errors.New("key was not found")
var Version = uint32(1)

Functions

func ScanReduceLatestWins added in v1.3.0

func ScanReduceLatestWins(key []byte, values [][]byte, context []interface{}) ([]byte, []byte)

ScanReduceLatestWins is a simple version of a merge where the latest value always wins. "Latest" is determined by iterating over the context and picking the value whose context entry is the largest integer (this assumes the context is actually a slice of ints).
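The described behavior can be reimplemented in a few lines. This is an independent sketch of the documented semantics, not the package's source. Like the description, it assumes every context entry is an `int` (it panics otherwise), and on ties it keeps the earliest value; `latestWins` is a hypothetical name.

```go
package main

import "fmt"

// latestWins returns the value whose context entry is the largest int,
// i.e. the value from the newest input. It panics if an entry is not an int.
func latestWins(key []byte, values [][]byte, context []interface{}) ([]byte, []byte) {
	best := 0
	for i := 1; i < len(context); i++ {
		if context[i].(int) > context[best].(int) {
			best = i
		}
	}
	return key, values[best]
}

func main() {
	_, v := latestWins([]byte("k"),
		[][]byte{[]byte("v0"), []byte("v2"), []byte("v1")},
		[]interface{}{0, 2, 1})
	fmt.Println(string(v)) // v2
}
```

Because ScanReduceLatestWins has the same signature as the reduce parameter of MergeCompact, it can be passed there directly when the iterator context holds file indices as ints, as in the README's merge example.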

Types

type Element added in v1.1.0

type Element struct {
	// contains filtered or unexported fields
}

type EmptySSTableIterator added in v1.3.0

type EmptySSTableIterator struct{}

func (EmptySSTableIterator) Next added in v1.3.0

func (EmptySSTableIterator) Next() ([]byte, []byte, error)

type EmptySStableReader added in v1.3.0

type EmptySStableReader struct{}

func (EmptySStableReader) BasePath added in v1.3.0

func (EmptySStableReader) BasePath() string

func (EmptySStableReader) Close added in v1.3.0

func (EmptySStableReader) Close() error

func (EmptySStableReader) Contains added in v1.3.0

func (EmptySStableReader) Contains(_ []byte) bool

func (EmptySStableReader) Get added in v1.3.0

func (EmptySStableReader) Get(_ []byte) ([]byte, error)

func (EmptySStableReader) MetaData added in v1.3.0

func (EmptySStableReader) MetaData() *proto.MetaData

func (EmptySStableReader) Scan added in v1.3.0

func (EmptySStableReader) ScanRange added in v1.3.0

func (EmptySStableReader) ScanRange(_ []byte, _ []byte) (SSTableIteratorI, error)

func (EmptySStableReader) ScanStartingAt added in v1.3.0

func (EmptySStableReader) ScanStartingAt(_ []byte) (SSTableIteratorI, error)

type MergeCompactionIterator added in v1.3.0

type MergeCompactionIterator struct {
	// contains filtered or unexported fields
}

func (*MergeCompactionIterator) Next added in v1.3.0

func (m *MergeCompactionIterator) Next() ([]byte, []byte, error)

type MergeContext added in v1.3.0

type MergeContext struct {
	Iterators       []SSTableIteratorI
	IteratorContext []interface{}
}

type PriorityQueue added in v1.1.0

type PriorityQueue struct {
	// contains filtered or unexported fields
}

func NewPriorityQueue added in v1.1.0

func NewPriorityQueue(comp skiplist.KeyComparator) PriorityQueue

func (*PriorityQueue) Init added in v1.1.0

func (pq *PriorityQueue) Init(ctx MergeContext) error

func (*PriorityQueue) Next added in v1.1.0

func (pq *PriorityQueue) Next() ([]byte, []byte, interface{}, error)

type PriorityQueueI added in v1.1.0

type PriorityQueueI interface {
	Init(iterators []SSTableIteratorI) error // initializes the heap with the initial values from the Iterators
	Next() ([]byte, []byte, error)           // next key/value/error, Done is returned when all elements are exhausted
}

type ReadOption

type ReadOption func(*SSTableReaderOptions)

func ReadBasePath

func ReadBasePath(p string) ReadOption

func ReadWithKeyComparator

func ReadWithKeyComparator(cmp skiplist.KeyComparator) ReadOption

type SSTableFullScanIterator added in v1.3.0

type SSTableFullScanIterator struct {
	// contains filtered or unexported fields
}

This is an optimized iterator that reads the index and data files sequentially, instead of reading the index sequentially and looking up each value in the data file via random access (mmap).

func (*SSTableFullScanIterator) Next added in v1.3.0

func (it *SSTableFullScanIterator) Next() ([]byte, []byte, error)

type SSTableIterator

type SSTableIterator struct {
	// contains filtered or unexported fields
}

func (*SSTableIterator) Next

func (it *SSTableIterator) Next() ([]byte, []byte, error)

type SSTableIteratorI

type SSTableIteratorI interface {
	// returns the next key, value in sequence
	// returns Done as the error when the iterator is exhausted
	Next() ([]byte, []byte, error)
}

type SSTableMerger added in v1.1.0

type SSTableMerger struct {
	// contains filtered or unexported fields
}

func NewSSTableMerger added in v1.1.0

func NewSSTableMerger(comp skiplist.KeyComparator) SSTableMerger

func (SSTableMerger) Merge added in v1.1.0

func (SSTableMerger) MergeCompact added in v1.3.0

func (m SSTableMerger) MergeCompact(ctx MergeContext, writer SSTableStreamWriterI,
	reduce func([]byte, [][]byte, []interface{}) ([]byte, []byte)) error

func (SSTableMerger) MergeCompactIterator added in v1.3.0

func (m SSTableMerger) MergeCompactIterator(ctx MergeContext,
	reduce func([]byte, [][]byte, []interface{}) ([]byte, []byte)) (SSTableIteratorI, error)

type SSTableMergerI added in v1.1.0

type SSTableMergerI interface {
	// Merge merges/writes the given Iterators into a single sorted SSTable
	Merge(iterators []SSTableIteratorI, writer SSTableStreamWriterI) error
	// MergeCompact is like merge, but accumulates values for the same key and presents it as a
	// "reduction" function to compact values for the same key.
	// reduce receives a key and a slice of values - it then needs to return a single key and value.
	MergeCompact(iterators []SSTableIteratorI, writer SSTableStreamWriterI,
		reduce func([]byte, [][]byte) ([]byte, []byte)) error
}

type SSTableReader

type SSTableReader struct {
	// contains filtered or unexported fields
}

func (*SSTableReader) BasePath added in v1.3.0

func (reader *SSTableReader) BasePath() string

func (*SSTableReader) Close

func (reader *SSTableReader) Close() error

func (*SSTableReader) Contains

func (reader *SSTableReader) Contains(key []byte) bool

func (*SSTableReader) Get

func (reader *SSTableReader) Get(key []byte) ([]byte, error)

func (*SSTableReader) MetaData

func (reader *SSTableReader) MetaData() *proto.MetaData

func (*SSTableReader) Scan added in v1.1.0

func (reader *SSTableReader) Scan() (SSTableIteratorI, error)

func (*SSTableReader) ScanRange

func (reader *SSTableReader) ScanRange(keyLower []byte, keyHigher []byte) (SSTableIteratorI, error)

func (*SSTableReader) ScanStartingAt

func (reader *SSTableReader) ScanStartingAt(key []byte) (SSTableIteratorI, error)

type SSTableReaderI

type SSTableReaderI interface {
	// returns true when the given key exists, false otherwise
	Contains(key []byte) bool
	// returns the value associated with the given key, NotFound as the error otherwise
	Get(key []byte) ([]byte, error)
	// Returns an iterator over the whole sorted sequence. Scan uses a more optimized version that iterates the
	// data file sequentially, whereas the other Scan* functions use the index and random access using mmap.
	Scan() (SSTableIteratorI, error)
	// Returns an iterator over the sorted sequence starting at the given key (inclusive if key is in the list).
	// Using a key that is out of the sequence range will result in either an empty iterator or the full sequence.
	ScanStartingAt(key []byte) (SSTableIteratorI, error)
	// Returns an iterator over the sorted sequence starting at the given keyLower (inclusive if key is in the list)
	// and until the given keyHigher was reached (inclusive if key is in the list).
	// Using keys that are out of the sequence range will result in either an empty iterator or the full sequence.
	// If keyHigher is lower than keyLower an error will be returned.
	ScanRange(keyLower []byte, keyHigher []byte) (SSTableIteratorI, error)
	// closes this sstable reader
	Close() error
	// Returns the metadata of this sstable
	MetaData() *proto.MetaData
	// Returns the base path / root path of this sstable that contains all the files.
	BasePath() string
}

func NewSSTableReader

func NewSSTableReader(readerOptions ...ReadOption) (SSTableReaderI, error)

NewSSTableReader creates a new reader. The sstable base path and comparator are mandatory:

    sstables.NewSSTableReader(sstables.ReadBasePath("some_path"), sstables.ReadWithKeyComparator(some_comp))

type SSTableReaderOptions

type SSTableReaderOptions struct {
	// contains filtered or unexported fields
}

read/write options

type SSTableSimpleWriter

type SSTableSimpleWriter struct {
	// contains filtered or unexported fields
}

func NewSSTableSimpleWriter

func NewSSTableSimpleWriter(writerOptions ...WriterOption) (*SSTableSimpleWriter, error)

func (*SSTableSimpleWriter) WriteSkipListMap

func (writer *SSTableSimpleWriter) WriteSkipListMap(skipListMap skiplist.SkipListMapI) error

type SSTableSimpleWriterI

type SSTableSimpleWriterI interface {
	// writes all records of that SkipList to an sstable disk structure, expects []byte as key and value
	WriteSkipList(skipListMap *skiplist.SkipListMap) error
}

type SSTableStreamWriter

type SSTableStreamWriter struct {
	// contains filtered or unexported fields
}

func NewSSTableStreamWriter

func NewSSTableStreamWriter(writerOptions ...WriterOption) (*SSTableStreamWriter, error)

NewSSTableStreamWriter creates a new streamed writer. The minimum options required are the base path and the comparator:

    sstables.NewSSTableStreamWriter(sstables.WriteBasePath("some_existing_folder"), sstables.WithKeyComparator(some_comparator))

func (*SSTableStreamWriter) Close

func (writer *SSTableStreamWriter) Close() error

func (*SSTableStreamWriter) Open

func (writer *SSTableStreamWriter) Open() error

func (*SSTableStreamWriter) WriteNext

func (writer *SSTableStreamWriter) WriteNext(key []byte, value []byte) error

type SSTableStreamWriterI

type SSTableStreamWriterI interface {
	// opens the sstable files.
	Open() error
	// writes the next record to an sstable disk structure, expects keys to be ordered.
	WriteNext(key []byte, value []byte) error
	// closes the sstable files.
	Close() error
}

type SSTableWriterOptions

type SSTableWriterOptions struct {
	// contains filtered or unexported fields
}

type SuperSSTableReader added in v1.3.0

type SuperSSTableReader struct {
	// contains filtered or unexported fields
}

SuperSSTableReader unifies several sstables under a single reader with the same interface. The order of the readers matters: older readers are assumed to come before newer ones (ascending order).

func NewSuperSSTableReader added in v1.3.0

func NewSuperSSTableReader(readers []SSTableReaderI, comp skiplist.KeyComparator) *SuperSSTableReader

func (SuperSSTableReader) BasePath added in v1.3.0

func (s SuperSSTableReader) BasePath() string

func (SuperSSTableReader) Close added in v1.3.0

func (s SuperSSTableReader) Close() error

func (SuperSSTableReader) Contains added in v1.3.0

func (s SuperSSTableReader) Contains(key []byte) bool

func (SuperSSTableReader) Get added in v1.3.0

func (s SuperSSTableReader) Get(key []byte) ([]byte, error)

func (SuperSSTableReader) MetaData added in v1.3.0

func (s SuperSSTableReader) MetaData() *proto.MetaData

func (SuperSSTableReader) Scan added in v1.3.0

func (SuperSSTableReader) ScanRange added in v1.3.0

func (s SuperSSTableReader) ScanRange(keyLower []byte, keyHigher []byte) (SSTableIteratorI, error)

func (SuperSSTableReader) ScanStartingAt added in v1.3.0

func (s SuperSSTableReader) ScanStartingAt(key []byte) (SSTableIteratorI, error)

type V0SSTableFullScanIterator added in v1.3.0

type V0SSTableFullScanIterator struct {
	// contains filtered or unexported fields
}

Deprecated: this iterator is for the v0 protobuf-based sstables. It is an optimized iterator that reads the index and data files sequentially, instead of reading the index sequentially and looking up each value in the data file via random access (mmap).

func (*V0SSTableFullScanIterator) Next added in v1.3.0

func (it *V0SSTableFullScanIterator) Next() ([]byte, []byte, error)

type WriterOption

type WriterOption func(*SSTableWriterOptions)

func BloomExpectedNumberOfElements

func BloomExpectedNumberOfElements(n uint64) WriterOption

func BloomFalsePositiveProbability

func BloomFalsePositiveProbability(fpProbability float64) WriterOption

func DataCompressionType

func DataCompressionType(p int) WriterOption

func EnableBloomFilter

func EnableBloomFilter() WriterOption

func IndexCompressionType

func IndexCompressionType(p int) WriterOption

func WithKeyComparator

func WithKeyComparator(cmp skiplist.KeyComparator) WriterOption

func WriteBasePath

func WriteBasePath(p string) WriterOption

func WriteBufferSizeBytes added in v1.3.0

func WriteBufferSizeBytes(bufSizeBytes int) WriterOption
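BloomExpectedNumberOfElements and BloomFalsePositiveProbability correspond to the two inputs of the standard bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. This page does not state how the package sizes its filter, so the sketch below only shows the textbook calculation; `bloomSize` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math"
)

// bloomSize returns the textbook optimal number of bits m and hash
// functions k for n expected elements and false-positive probability p:
//   m = -n * ln(p) / (ln 2)^2,   k = (m / n) * ln 2
// It returns 0, 0 for invalid inputs (n == 0 or p outside (0, 1)).
func bloomSize(n uint64, p float64) (uint64, uint64) {
	if n == 0 || p <= 0 || p >= 1 {
		return 0, 0
	}
	m := math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2))
	k := math.Round(m / float64(n) * math.Ln2)
	if k < 1 {
		k = 1 // always use at least one hash function
	}
	return uint64(m), uint64(k)
}

func main() {
	m, k := bloomSize(1_000_000, 0.01)
	fmt.Println(m, k) // roughly 9.6 million bits (about 1.2 MB) and 7 hashes
}
```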
