Package etl

License: Apache-2.0

README

ETL

The ETL framework is most commonly used in staged sync.

It implements a pattern where we extract some data from a database, transform it, write it into temp files, and then insert it back into the database in sorted order.

Inserting entries into our KV storage sorted by key minimizes write amplification, so it is much faster, even considering the additional I/O generated by storing the temp files.

It behaves similarly to enterprise Extract, Transform, Load frameworks, hence the name. We use temporary files because that helps keep RAM usage predictable and allows using ETL on large amounts of data.

Example
func keyTransformExtractFunc(transformKey func([]byte) ([]byte, error)) etl.ExtractFunc {
	return func(k, v []byte, next etl.ExtractNextFunc) error {
		newK, err := transformKey(k)
		if err != nil {
			return err
		}
		return next(k, newK, v)
	}
}

err := etl.Transform(
	logPrefix,                                       // prefix for progress logs (any string)
	db,                                              // database transaction (kv.RwTx)
	dbutils.PlainStateBucket,                        // "from" bucket
	dbutils.CurrentStateBucket,                      // "to" bucket
	datadir,                                         // where to store temp files
	keyTransformExtractFunc(transformPlainStateKey), // transform on extraction
	etl.IdentityLoadFunc,                            // transform on load
	etl.TransformArgs{                               // additional arguments
		Quit: quit,
	},
	logger,                                          // log.Logger for progress messages
)
if err != nil {
	return err
}

Data Transformation

The whole flow is shown in the image

Data could be transformed in two places along the pipeline:

  • transform on extraction

  • transform on loading

Transform On Extraction

type ExtractFunc func(k []byte, v []byte, next ExtractNextFunc) error

The transform-on-extraction function receives the current key and value from the source bucket.
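
Because the extract function decides when to call next, it can also filter. A minimal sketch (hypothetical, not part of the package; assumes the bytes and etl imports):

func prefixFilterExtractFunc(prefix []byte) etl.ExtractFunc {
	return func(k, v []byte, next etl.ExtractNextFunc) error {
		if !bytes.HasPrefix(k, prefix) {
			return nil // drop this entry by simply not calling next
		}
		return next(k, k, v) // forward unchanged: originalK, new key, value
	}
}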

Transform On Loading

type LoadFunc func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error

In addition to the current key and value, the transform-on-loading function receives a CurrentTableReader that can read existing data from the destination bucket.

That is used in index generation where we want to extend index entries with new data instead of just adding new ones.
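
A minimal sketch of that pattern (hypothetical, not part of the package; error handling and the "key not found" case are simplified): a LoadFunc that appends the new value to whatever is already stored under the same key in the destination bucket.

func appendingLoadFunc(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
	existing, err := table.Get(k) // read the current entry from the destination bucket (may be empty)
	if err != nil {
		return err
	}
	merged := append(append([]byte{}, existing...), v...) // keep old data, append new
	return next(k, k, merged)                             // originalK, k, v
}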

<...>NextFunc pattern

Sometimes we need to produce multiple entries from a single entry when transforming.

To do that, each of the transform functions receives a next function that should be called to move data further. That means that each transformation can produce any number of outputs for a single input.

It can be one output, like in IdentityLoadFunc:

func IdentityLoadFunc(k []byte, value []byte, _ State, next LoadNextFunc) error {
	return next(k, k, value) // go to the next step
}

It can be multiple outputs like when each entry is a ChangeSet:

func(dbKey, dbValue []byte, next etl.ExtractNextFunc) error {
	blockNum, _ := dbutils.DecodeTimestamp(dbKey)
	return bytes2walker(dbValue).Walk(func(changesetKey, changesetValue []byte) error {
		key := common.CopyBytes(changesetKey)
		v := make([]byte, 9)
		binary.BigEndian.PutUint64(v, blockNum)
		if len(changesetValue) == 0 {
			v[8] = 1
		}
		return next(dbKey, key, v) // go to the next step
	})
}

Buffer Types

Before the data is flushed into temp files, it is collected into a buffer until the buffer overflows (etl.TransformArgs.BufferSize).

There are different types of buffers available with different behaviour (see the selection sketch after the list).

  • SortableSliceBuffer -- just append (k, v1), (k, v2) onto a slice. Duplicate keys will lead to duplicate entries: [(k, v1) (k, v2)].

  • SortableAppendBuffer -- on duplicate keys: merge. (k, v1), (k, v2) will lead to k: [v1 v2]

  • SortableOldestAppearedBuffer -- on duplicate keys: keep the oldest. (k, v1), (k, v2) will lead to k: v1
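
The buffer type and size can be chosen through etl.TransformArgs. A minimal sketch (the values and the quit channel are hypothetical; assumes the datasize package already used by this module and that BufferSize is a byte count):

args := etl.TransformArgs{
	BufferType: etl.SortableAppendBuffer, // merge values of duplicate keys
	BufferSize: int(64 * datasize.MB),    // hypothetical limit before flushing to a temp file
	Quit:       quit,
}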

Transforming Structs

Both transform functions and next functions accept only byte slices. If you need to pass a struct, you will need to marshal it.
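
For example, a hypothetical struct could be encoded with encoding/json (any deterministic encoding such as RLP or CBOR works just as well) before being passed to a next function:

type accountIndexEntry struct {
	BlockNumber uint64
	Incarnation uint64
}

func emitEntry(k []byte, e accountIndexEntry, next etl.ExtractNextFunc) error {
	v, err := json.Marshal(e) // marshal the struct into a byte slice
	if err != nil {
		return err
	}
	return next(k, k, v)
}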

Loading Into Database

We load data from the temp files into a database in batches, limited by IdealBatchSize() of an ethdb.Mutation.

(for tests we can also override it)

Handling Interruptions

ETL processes are long, so we need to be able to handle interruptions.

Handling Ctrl+C

You can pass your quit channel as the Quit parameter of etl.TransformArgs.

When this channel is closed, ETL will be interrupted.
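
A minimal sketch of wiring Ctrl+C to that channel (assumes the os and os/signal imports; variable names are hypothetical):

quit := make(chan struct{})
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
go func() {
	<-sigs
	close(quit) // ETL checks the channel and aborts the run
}()

args := etl.TransformArgs{Quit: quit} // pass to etl.Transform or Collector.Load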

Saving & Restoring State

Interrupting in the middle of loading can lead to an inconsistent state in the database.

To avoid that, the ETL framework allows storing progress by setting OnLoadCommit in etl.TransformArgs.

Then we can use this data to know the progress the ETL transformation made.

You can also specify ExtractStartKey and ExtractEndKey to limit the range of keys that are transformed.
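
A minimal sketch of resuming after an interruption, assuming lastCommittedKey was previously saved by your commit handler (the variable names are hypothetical):

startKey, err := etl.NextKey(lastCommittedKey) // smallest key after the last committed one
if err != nil {
	return err
}
args := etl.TransformArgs{
	ExtractStartKey: startKey,
	ExtractEndKey:   endKey, // optional; the range is [ExtractStartKey, ExtractEndKey)
}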

Ways to work with ETL framework

There are two scenarios for how you might want to work with the ETL framework.

etl.Transform function

In the vast majority of use-cases we extract data from one bucket and, in the end, load it into another bucket. That is the use-case for the etl.Transform function.

etl.Collector struct

If you want more modular behaviour instead of just reading from the DB (like generating intermediate hashes in ../../core/chain_makers.go), you can use the etl.Collector struct directly.

It has a .Collect() method that you can provide your data to.
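
A minimal sketch of that flow (the bucket name, the pairs slice, tmpdir, logger, tx, and quit are hypothetical):

collector := etl.NewCollector("example", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger)
defer collector.Close()

for _, pair := range pairs {
	if err := collector.Collect(pair.Key, pair.Value); err != nil { // accumulate (k, v) pairs
		return err
	}
}

// load everything into the destination bucket in sorted order
if err := collector.Load(tx, "DestinationBucket", etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
	return err
}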

Optimizations

  • if all data fits into a single file, we don't write anything to disk and just use in-memory storage.

Documentation

Index

Constants

const (
	//SliceBuffer - just simple slice w
	SortableSliceBuffer = iota
	//SortableAppendBuffer - map[k] [v1 v2 v3]
	SortableAppendBuffer
	// SortableOldestAppearedBuffer - buffer that keeps only the oldest entries.
	// if first v1 was added under key K, then v2; only v1 will stay
	SortableOldestAppearedBuffer
	SortableMergeBuffer

	//BufIOSize - 128 pages | default is 1 page | increasing over `64 * 4096` doesn't show speedup on SSD/NVMe, but show speedup in cloud drives
	BufIOSize = 128 * 4096
)

Variables

var BufferOptimalSize = 256 * datasize.MB /*  var because we want to sometimes change it from tests or command-line flags */

Functions

func FlushToDisk

func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl log.Lvl) (dataProvider, error)

FlushToDisk - `doFsync` is true only for 'critical' collectors (which should not lose data).

func KeepInRAM

func KeepInRAM(buffer Buffer) dataProvider

func NewAppendBuffer

func NewAppendBuffer(bufferOptimalSize datasize.ByteSize) *appendSortableBuffer

func NewLatestMergedEntryMergedBuffer

func NewLatestMergedEntryMergedBuffer(bufferOptimalSize datasize.ByteSize, merger func([]byte, []byte) []byte) *oldestMergedEntrySortableBuffer

func NewOldestEntryBuffer

func NewOldestEntryBuffer(bufferOptimalSize datasize.ByteSize) *oldestEntrySortableBuffer

func NewSortableBuffer

func NewSortableBuffer(bufferOptimalSize datasize.ByteSize) *sortableBuffer

func NextKey

func NextKey(key []byte) ([]byte, error)

NextKey generates the possible next key without changing the key length. For [0x01, 0x01, 0x01] it will generate [0x01, 0x01, 0x02], etc.

func ProgressFromKey

func ProgressFromKey(k []byte) int

func Transform

func Transform(
	logPrefix string,
	db kv.RwTx,
	fromBucket string,
	toBucket string,
	tmpdir string,
	extractFunc ExtractFunc,
	loadFunc LoadFunc,
	args TransformArgs,
	logger log.Logger,
) error

Types

type AdditionalLogArguments

type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{})

type Buffer

type Buffer interface {
	Put(k, v []byte)
	Get(i int, keyBuf, valBuf []byte) ([]byte, []byte)
	Len() int
	Reset()
	SizeLimit() int
	Prealloc(predictKeysAmount, predictDataAmount int)
	Write(io.Writer) error
	Sort()
	CheckFlushSize() bool
}

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector performs the job of ETL Transform, but can also be used without the "E" (Extract) part, as a Collect-Transform-Load.

func NewCollector

func NewCollector(logPrefix, tmpdir string, sortableBuffer Buffer, logger log.Logger) *Collector

func NewCollectorFromFiles

func NewCollectorFromFiles(logPrefix, tmpdir string, logger log.Logger) (*Collector, error)

NewCollectorFromFiles creates a collector from existing files (left over from a previous unsuccessful loading)

func NewCriticalCollector

func NewCriticalCollector(logPrefix, tmpdir string, sortableBuffer Buffer, logger log.Logger) *Collector

NewCriticalCollector does not clean up temporary files if loading has failed

func (*Collector) Close

func (c *Collector) Close()

func (*Collector) Collect

func (c *Collector) Collect(k, v []byte) error

func (*Collector) Flush

func (c *Collector) Flush() error

Flush - an optional method (usually the user does not need to call it) that forces a sort+flush of the current buffer. It triggers the background sort and flush, reducing RAM usage; it is useful when working with many collectors, to trigger the background sort for all of them.
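
For instance, with several collectors filled in the same loop, a sketch like the following (the collectors slice is hypothetical) releases RAM before any of them starts loading:

for _, c := range collectors {
	if err := c.Flush(); err != nil { // force sort+flush of each collector's buffer
		return err
	}
}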

func (*Collector) Load

func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args TransformArgs) error

func (*Collector) LogLvl

func (c *Collector) LogLvl(v log.Lvl)

type CurrentTableReader

type CurrentTableReader interface {
	Get([]byte) ([]byte, error)
}

type ExtractFunc

type ExtractFunc func(k []byte, v []byte, next ExtractNextFunc) error

type ExtractNextFunc

type ExtractNextFunc func(originalK, k []byte, v []byte) error

type Heap

type Heap struct {
	// contains filtered or unexported fields
}

func (*Heap) Len

func (h *Heap) Len() int

func (*Heap) Less

func (h *Heap) Less(i, j int) bool

func (*Heap) Pop

func (h *Heap) Pop() *HeapElem

func (*Heap) Push

func (h *Heap) Push(x *HeapElem)

func (*Heap) Swap

func (h *Heap) Swap(i, j int)

type HeapElem

type HeapElem struct {
	Key     []byte
	Value   []byte
	TimeIdx int
}

type LoadCommitHandler

type LoadCommitHandler func(db kv.Putter, key []byte, isDone bool) error

LoadCommitHandler is a callback called each time a new batch is loaded from files into a DB.

  • `key`: last committed key in the database (use the etl.NextKey helper to derive LoadStartKey)

  • `isDone`: true if everything is processed

type LoadFunc

type LoadFunc func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error
var IdentityLoadFunc LoadFunc = func(k []byte, value []byte, _ CurrentTableReader, next LoadNextFunc) error {
	return next(k, k, value)
}

IdentityLoadFunc loads entries as they are, without transformation

type LoadNextFunc

type LoadNextFunc func(originalK, k, v []byte) error

type TransformArgs

type TransformArgs struct {
	Quit              <-chan struct{}
	LogDetailsExtract AdditionalLogArguments
	LogDetailsLoad    AdditionalLogArguments
	// [ExtractStartKey, ExtractEndKey)
	ExtractStartKey []byte
	ExtractEndKey   []byte
	BufferType      int
	BufferSize      int
}
