Sif

Sif is a framework for fast, predictable, general-purpose distributed computing in the map/reduce paradigm.

Sif is new, and currently under heavy development. It should be considered alpha software prior to a 1.0.0 release, with the API and behaviours subject to change.

Why Sif?

Sif is offered primarily as a simpler alternative to Apache Spark, with the following goals in mind:

Predictability: An emphasis on fixed-width data and in-place manipulation makes it easier to reason over the compute and memory requirements of a particular job. Sif's API is designed around a "no magic" philosophy, and attempts to make obvious the runtime consequences of any particular line of code.

Scalability: While scaling up to massive datasets is the primary focus of any distributed computing framework, Sif is also designed to scale down to small datasets, with minimal startup time and other cluster-oriented overhead.

Ease of Integration: Rather than deploying Sif as a complex, persistent cluster, Sif is a library designed to ease the integration of cluster computing functionality within a larger application or architectural context. Write your own REST API which manages the initiation of a Sif pipeline!

Ease of Development: Sif applications are traceable and debuggable, and the core Sif codebase is designed to be as small as possible.

API Minimalism: A single representation of distributed data, with a single set of tools for manipulating it.

Architectural Minimalism: Throw away YARN and Mesos. Compile your Sif pipeline, package it as a Docker image, then deploy it to distributed container infrastructure such as Docker Swarm or Kubernetes.

Installation

Note: Sif is developed and tested against Go 1.18, leveraging generics support:

$ go get github.com/go-sif/sif

Getting Started

Sif facilitates the definition and execution of a distributed compute pipeline through the use of a few basic components. For the sake of this example, we will assume that we have JSON Lines-type data with the following format:

{"id": 1234, "meta": {"uuid": "27366d2d-502c-4c03-84c3-55dc5ecedd3f", "name": "John Smith"}}
{"id": 5678, "meta": {"uuid": "f21dec0a-37f3-4b0d-9d92-d26b11c62ed8", "name": "Jane Doe"}}
...
Schemas

A Schema outlines the structure of the data which will be manipulated. In Sif, data is represented as a sequence of Rows, each of which consists of Columns with particular ColumnTypes.

package main

import (
	"log"
	"github.com/go-sif/sif"
	"github.com/go-sif/sif/coltype"
	"github.com/go-sif/sif/schema"
)

func main() {
	id := coltype.Int32("id")
	// Schemas should employ fixed-width ColumnTypes whenever possible
	metaUUID := coltype.String("meta.uuid", 36)
	// or variable-width ColumnTypes, if the size of a field is not known
	metaName := coltype.VarString("meta.name")
	schema, err := schema.CreateSchema(id, metaUUID, metaName)
	if err != nil {
		log.Fatal(err)
	}
	// schema is unused in this snippet; the following examples pass it to a DataSource
	_ = schema
}
DataSources and Parsers

A DataSource represents a source of raw data, which will be partitioned and parsed into Rows via a Parser in parallel across workers in the cluster. Sif contains several example DataSources, primarily for testing purposes, but it is intended that DataSources for common sources such as databases, Kafka, etc. will be provided as separate packages.

DataSources typically use supplied column names to fetch specific data from the underlying source (as shown in the jsonl example below).

Ultimately, a DataSource provides a DataFrame which can be manipulated by Sif operations.

package main

import (
	"log"
	"path"

	"github.com/go-sif/sif"
	"github.com/go-sif/sif/coltype"
	"github.com/go-sif/sif/schema"
	"github.com/go-sif/sif/datasource/file"
	"github.com/go-sif/sif/datasource/parser/jsonl"
)

func main() {
	id := coltype.Int32("id")
	// In this case, since our Schema featured column names with dots,
	// the jsonl parser is smart enough to use these column names to
	// search within each JSON object for a nested field matching
	// these paths.
	metaUUID := coltype.String("meta.uuid", 36)
	metaName := coltype.VarString("meta.name")
	schema, err := schema.CreateSchema(id, metaUUID, metaName)
	if err != nil {
		log.Fatal(err)
	}

	parser := jsonl.CreateParser(&jsonl.ParserConf{
		PartitionSize: 128,
	})
	frame := file.CreateDataFrame("path/to/*.jsonl", parser, schema)
	// frame is unused in this snippet; the next example chains operations onto it
	_ = frame
}
DataFrames

A DataFrame facilitates the definition of an execution plan. Multiple Operations are chained together, and then passed to Sif for evaluation:

package main

import (
	"path"
	"log"
	"strings"

	"github.com/go-sif/sif"
	"github.com/go-sif/sif/schema"
	"github.com/go-sif/sif/datasource/file"
	"github.com/go-sif/sif/datasource/parser/jsonl"
	ops "github.com/go-sif/sif/operations/transform"
)

func main() {
	id := coltype.Int32("id")
	metaUUID := coltype.String("meta.uuid", 36)
	metaName := coltype.VarString("meta.name")
	schema, err := schema.CreateSchema(id, metaUUID, metaName)
	if err != nil {
		log.Fatal(err)
	}

	parser := jsonl.CreateParser(&jsonl.ParserConf{
		PartitionSize: 128,
	})
	frame := file.CreateDataFrame("path/to/*.jsonl", parser, schema)

	lowercaseName := coltype.VarString("lowercase_name") // a column we will add
	frame, err = frame.To(
		ops.AddColumn(lowercaseName),
		ops.Map(func(row sif.Row) error {
			if metaName.IsNil(row) {
				return nil
			}
			name, err := metaName.From(row)
			if err != nil {
				return err
			}
			return lowercaseName.To(row, strings.ToLower(name))
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
}
Execution (Bringing it all Together)

Executing a DataFrame involves starting a Sif cluster and passing the DataFrame to it. Sif clusters, at the moment, consist of a single Coordinator and multiple Workers. Each is an identical binary, with the difference in role determined by the SIF_NODE_TYPE environment variable (set to "coordinator" or "worker"). This makes it easy to compile a single executable which can then be deployed and scaled up or down as one sees fit.

package main

import (
	"path"
	"log"
	"strings"
	"context"

	"github.com/go-sif/sif"
	"github.com/go-sif/sif/schema"
	"github.com/go-sif/sif/datasource/file"
	"github.com/go-sif/sif/datasource/parser/jsonl"
	ops "github.com/go-sif/sif/operations/transform"
	"github.com/go-sif/sif/cluster"
)

func main() {
	id := coltype.Int32("id")
	metaUUID := coltype.String("meta.uuid", 36)
	metaName := coltype.VarString("meta.name")
	schema, err := schema.CreateSchema(id, metaUUID, metaName)
	if err != nil {
		log.Fatal(err)
	}

	parser := jsonl.CreateParser(&jsonl.ParserConf{
		PartitionSize: 128,
	})
	frame := file.CreateDataFrame("path/to/*.jsonl", parser, schema)

	lowercaseName := coltype.VarString("lowercase_name")
	frame, err = frame.To(
		ops.AddColumn(lowercaseName),
		ops.Map(func(row sif.Row) error {
			if metaName.IsNil(row) {
				return nil
			}
			name, err := metaName.From(row)
			if err != nil {
				return err
			}
			return lowercaseName.To(row, strings.ToLower(name))
		}),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Define a node
	// Sif will read the SIF_NODE_TYPE environment variable to
	// determine whether this copy of the binary
	// is a "coordinator" or "worker".
	opts := &cluster.NodeOptions{
		NumWorkers: 2,
		CoordinatorHost: "insert.coordinator.hostname",
	}
	node, err := cluster.CreateNode(opts)
	if err != nil {
		log.Fatal(err)
	}
	// start this node in the background and run the DataFrame
	defer node.GracefulStop()
	go func() {
		err := node.Start(frame)
		if err != nil {
			log.Fatal(err)
		}
	}()
	// node.Run returns a result, but only certain operations
	// (such as Accumulate and Collect) produce one, so it is ignored here.
	_, err = node.Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

Advanced Usage

Operations

Sif includes multiple operations suitable for manipulating DataFrames, which can be found under the github.com/go-sif/sif/operations/transform package.

Additional utility operations, such as Collect() and Accumulate(), are included in the github.com/go-sif/sif/operations/util package. Collect() allows results to be collected on the Coordinator for further, local processing, and Accumulate() applies an Accumulator to the data (see Accumulation below).
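As a brief sketch of how transform operations chain onto a DataFrame, the following fragment drops Rows which have no name before any further processing. It reuses frame and metaName from Getting Started, and assumes the transform package exposes a Filter operation wrapping a sif.FilterOperation (the name ops.Filter is an assumption here, not taken from the package documentation):

// ...
// ops.Filter is assumed to accept a sif.FilterOperation and retain
// only those Rows for which it returns true
frame, err = frame.To(
	ops.Filter(func(row sif.Row) (bool, error) {
		return !metaName.IsNil(row), nil
	}),
)
if err != nil {
	log.Fatal(err)
}
// ...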

A couple of complex Operations are covered in additional detail here:

Reduction

Reduction in Sif is a two step process:

  1. A KeyingOperation labels Rows, with the intention that two Rows with the same key are reduced together
  2. A ReductionOperation defines the mechanism by which two Rows are combined (the "right" Row into the "left" Row)

For example, if we wanted to bucket the JSON Lines data from Getting Started by name, and then produce counts for names beginning with the same letter:

// ...
totalCol := coltype.Uint32("total")
frame, err = frame.To(
	// Add a column to store the total for each first-letter bucket
	ops.AddColumn(totalCol),
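	// Note: for these totals to end up as counts, this sketch assumes each
	// Row's totalCol has first been set to 1 (e.g. via a preceding ops.Map)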
	ops.Reduce(func(row sif.Row) ([]byte, error) {
		// Our KeyingOperation comes first, using the first letter as the key
		name, err := metaName.From(row)
		if err != nil {
			return nil, err
		}
		if len(name) == 0 {
			return []byte{0}, nil
		}
		return []byte{name[0]}, nil
	}, func(lrow sif.Row, rrow sif.Row) error {
		// Our ReductionOperation comes second
		// Since our keys ensure two Rows are only reduced together if they
		// have a matching key, we can just add the totals together.
		lval, err := totalCol.From(lrow)
		if err != nil {
			return err
		}
		rval, err := totalCol.From(rrow)
		if err != nil {
			return err
		}
		return totalCol.To(lrow, lval+rval)
	}),
)
// ...

Tip: ops.KeyColumns(colNames ...string) can be used with ops.Reduce to quickly produce a key (or compound key) from a set of column values.
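For instance, keying directly on the full value of the meta.name column might look like the following sketch, reusing totalCol and the reduction function from the example above (and assuming that KeyColumns produces a KeyingOperation suitable as the first argument to ops.Reduce):

// ...
frame, err = frame.To(
	ops.AddColumn(totalCol),
	ops.Reduce(
		// key Rows by the value of the meta.name column
		ops.KeyColumns("meta.name"),
		// reduce by summing totals, as before
		func(lrow sif.Row, rrow sif.Row) error {
			lval, err := totalCol.From(lrow)
			if err != nil {
				return err
			}
			rval, err := totalCol.From(rrow)
			if err != nil {
				return err
			}
			return totalCol.To(lrow, lval+rval)
		},
	),
)
// ...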

Accumulation

Sif Accumulators are an alternative mechanism for reduction, offering full customization of the reduction technique. The trade-off is that accumulation ends a Sif pipeline: in exchange for losing the ability to further transform and reduce data, Accumulators offer the potential for significant performance benefits.

Sif offers built-in Accumulators in the accumulators package.

For example, we can use accumulators.Counter() to efficiently count records:

// ...
counter := accumulators.Counter()
frame, err = frame.To(
	util.Accumulate(counter),
)
// ...
// In this case, node.Run returns an Accumulator, which can be
// manipulated on the Coordinator node.
result, err := node.Run(context.Background())
if node.IsCoordinator() {
	totalRows, _ := counter.Value(result.Accumulator)
	log.Printf("total rows: %d", totalRows)
}
Collection

Collection is the process of pulling results from distributed data back to the Coordinator for local processing. This is not generally encouraged - rather, it is best if Workers write their results directly to an output destination. But it is occasionally useful, such as when writing tests:

// ...
lowercaseName := coltype.VarString("lowercase_name")
frame, err = frame.To(
	ops.AddColumn(lowercaseName),
	ops.Map(func(row sif.Row) error {
		if metaName.IsNil(row) {
			return nil
		}
		name, err := metaName.From(row)
		if err != nil {
			return err
		}
		return lowercaseName.To(row, strings.ToLower(name))
	}),
	// To discourage overuse and keep results predictable, you must specify
	// exactly how many Partitions of data you wish to Collect:
	util.Collect(1),
)
// ...
// In this case, node.Run returns a CollectedPartition, which can be
// manipulated on the Coordinator node.
result, err := node.Run(context.Background())
if node.IsCoordinator() {
	err = result.Collected.ForEachRow(func(row sif.Row) error {
		// Do something with results
		return nil
	})
}
// ...

Extending Sif

Custom ColumnTypes

See Implementing Custom ColumnTypes for details.

Custom Accumulators

See Implementing Custom Accumulators for details.

Custom DataSources

See Implementing Custom DataSources for details.

Custom Parsers

See Implementing Custom Parsers for details.

License

Sif is licensed under the Apache 2.0 License, found in the LICENSE file.

Documentation

Overview

Package sif contains the core components of Sif, a framework for distributed data processing. This root package defines types which are employed during the regular use of the framework, as well as in the extension of the framework, and is an excellent overview of Sif's key concepts.

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsFixedWidth

func IsFixedWidth(colType ColumnType) (isFixedWidth bool)

IsFixedWidth returns true iff colType is a FixedWidthColumnType

Types

type Accumulator

type Accumulator interface {
	Accumulate(row Row) error                  // Accumulate adds a row to this Accumulator
	Merge(o Accumulator) error                 // Merge merges another Accumulator into this one
	ToBytes() ([]byte, error)                  // ToBytes serializes this Accumulator
	FromBytes(buf []byte) (Accumulator, error) // FromBytes produces a *new* Accumulator from serialized data
}

An Accumulator is an alternative reduction technique, which siphons data from Partitions into a custom data structure. The result is itself an Accumulator, rather than a series of Partitions, thus ending the job (no more operations may be performed against the data). The advantage, however, is full control over the reduction technique, which can yield substantial performance benefits. Because reduction is performed locally on each worker, and the worker results are then all reduced on the Coordinator, Accumulators are best utilized for smaller results. Distributed reductions via Reduce() are more efficient when there is a large reduction result (i.e. a large number of buckets).

type AccumulatorFactory

type AccumulatorFactory interface {
	New() Accumulator
}

AccumulatorFactory produces Accumulators
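As a rough sketch of these two interfaces (a hypothetical, minimal row counter, not the built-in accumulators.Counter), an implementation might look like this:

package myaccumulators

import (
	"encoding/binary"
	"fmt"

	"github.com/go-sif/sif"
)

// compile-time checks that this sketch satisfies both interfaces
var _ sif.Accumulator = (*rowCounter)(nil)
var _ sif.AccumulatorFactory = (*rowCounterFactory)(nil)

// rowCounter is a hypothetical, minimal Accumulator which counts Rows
type rowCounter struct {
	count uint64
}

// Accumulate adds a row to this Accumulator
func (c *rowCounter) Accumulate(row sif.Row) error {
	c.count++
	return nil
}

// Merge merges another Accumulator into this one
func (c *rowCounter) Merge(o sif.Accumulator) error {
	other, ok := o.(*rowCounter)
	if !ok {
		return fmt.Errorf("cannot merge incompatible Accumulators")
	}
	c.count += other.count
	return nil
}

// ToBytes serializes this Accumulator
func (c *rowCounter) ToBytes() ([]byte, error) {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, c.count)
	return buf, nil
}

// FromBytes produces a *new* Accumulator from serialized data
func (c *rowCounter) FromBytes(buf []byte) (sif.Accumulator, error) {
	if len(buf) < 8 {
		return nil, fmt.Errorf("not enough bytes to deserialize rowCounter")
	}
	return &rowCounter{count: binary.LittleEndian.Uint64(buf)}, nil
}

// rowCounterFactory implements AccumulatorFactory
type rowCounterFactory struct{}

// New produces a fresh rowCounter
func (f *rowCounterFactory) New() sif.Accumulator {
	return &rowCounter{}
}

Like the built-in Counter, a factory such as this would presumably be passed to util.Accumulate when building a pipeline.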

type BucketedPartitionIndex

type BucketedPartitionIndex interface {
	PartitionIndex
	GetBucket(bucket uint64) PartitionIndex // return the PartitionIndex associated with the given bucket
}

A BucketedPartitionIndex is a PartitionIndex divided into buckets, which are indexed by uint64s

type BuildablePartition

type BuildablePartition interface {
	Partition
	ForEachRow(fn MapOperation) error            // ForEachRow iterates over Rows in a Partition
	CanInsertRowData(row Row) error              // CanInsertRowData checks if a Row can be inserted into this Partition
	AppendEmptyRowData(tempRow Row) (Row, error) // AppendEmptyRowData is a convenient way to add an empty Row to the end of this Partition, returning the Row so that Row methods can be used to populate it
	AppendRowData(row Row) error                 // AppendRowData adds a Row to the end of this Partition, if it isn't full and if the Row matches this Partition's schema
	InsertRowData(row Row, pos int) error        // InsertRowData inserts a Row at a specific position within this Partition, if it isn't full and if the Row matches this Partition's Schema. Other Rows are shifted as necessary.
	TruncateRowData(numRows int)                 // TruncateRowData zeroes out rows from the current last row towards the beginning of the Partition
	CreateTempRow() Row
}

A BuildablePartition can be built. Used in the implementation of DataSources and Parsers

type CollectedPartition

type CollectedPartition interface {
	Partition
	ForEachRow(fn MapOperation) error // ForEachRow iterates over Rows in a Partition
}

A CollectedPartition has been collected

type Column

type Column interface {
	Clone() Column            // Clone returns a copy of this Column
	Index() int               // Index returns the index of this Column within a Schema
	Type() ColumnType         // Type returns the ColumnType of this Column
	Accessor() ColumnAccessor // Accessor returns a ColumnAccessor for this Column
}

Column is a single column of data in a row

type ColumnAccessor

type ColumnAccessor interface {
	Name() string
	String() string
	Rename(newName string)
	Clone() ColumnAccessor
	Type() ColumnType
	IsNil(row Row) bool   // IsNil returns true iff the given column value is nil in this row. If an error occurs, this function will return false.
	SetNil(row Row) error // SetNil sets the given column value to nil within this row
}

ColumnAccessor defines the non-generic methods for a struct which can access data from a particular Column in a particular Row. Creators of custom column types should not typically need to implement their own ColumnAccessor, and can instead provide a factory function which calls CreateColumnAccessor instead.

type ColumnType

type ColumnType interface {
	ToString(v interface{}) string           // produces a string representation of a value of this Type
	Deserialize([]byte) (interface{}, error) // Defines how this type is deserialized, returning an error if a value of the correct type could not be deserialized from the given bytes
}

ColumnType is the base interface for all ColumnTypes. VariableWidthColumnType should be used for column types with unknown serialized width. FixedWidthColumnType should be used for columns types which have the additional property of known serialized width. Sif provides a variety of built-in ColumnTypes, and additional ColumnTypes may be added by implementing these interfaces

type Compressor

type Compressor interface {
	Compress(source io.Reader, dest io.Writer) error   // Compress compresses data from a read stream to a write stream
	Decompress(source io.Reader, dest io.Writer) error // Decompress decompresses data from a read stream to a write stream
	Destroy()                                          // Destroy cleans up anything relevant when the Compressor is no longer needed
}

A Compressor compresses data (and the inverse). Concurrent use of a Compressor's Compress and Decompress methods should be threadsafe.

type DataFrame

type DataFrame interface {
	GetSchema() Schema                            // GetSchema returns the schema of the data at this task.
	GetDataSource() DataSource                    // GetDataSource returns the DataSource of a DataFrame
	GetParser() DataSourceParser                  // GetParser returns the DataSourceParser of a DataFrame
	To(...*DataFrameOperation) (DataFrame, error) // To is a "functional operations" factory method for DataFrames, chaining operations onto the current one(s).
}

A DataFrame is a tool for constructing a chain of transformations and actions applied to columnar data

type DataFrameOperation

type DataFrameOperation struct {
	TaskType TaskType // the task type
	Do       func(df DataFrame) (*DataFrameOperationResult, error)
}

DataFrameOperation - A generic DataFrame transform, which produces a Task that performs the "work", along with a (potentially) altered Schema.

type DataFrameOperationResult

type DataFrameOperationResult struct {
	Task       Task   // the task
	DataSchema Schema // the new data schema
}

DataFrameOperationResult is the result of a DataFrameOperation

type DataSource

type DataSource interface {
	Analyze() (PartitionMap, error)
	DeserializeLoader([]byte) (PartitionLoader, error)
	IsStreaming() bool
}

DataSource is a source of data which will be manipulated according to transformations and actions defined in a DataFrame. It represents information about how to load data from the source as Partitions.

type DataSourceParser

type DataSourceParser interface {
	PartitionSize() int // returns the maximum size of Partitions produced by this DataSourceParser, in rows
	Parse(
		r io.Reader,
		source DataSource,
		schema Schema,
		onIteratorEnd func(),
	) (PartitionIterator, error) // lazily converts bytes from a Reader into Partitions
}

A DataSourceParser is capable of parsing raw data from a DataSource.Load to produce Partitions

type FilterOperation

type FilterOperation func(row Row) (bool, error)

FilterOperation - A generic function for determining whether or not a Row should be retained

type FixedWidthColumnType

type FixedWidthColumnType interface {
	ColumnType
	Size() int                                       // returns size in bytes of a column type
	SerializeFixed(v interface{}, dest []byte) error // Similar to Serialize, but accepts a reusable buffer of guaranteed Size()
}

FixedWidthColumnType is an interface which is implemented to define a column type for fixed-width data (at least the serialized form should be fixed-width). Sif provides a variety of built-in FixedWidthColumnTypes, and additional FixedWidthColumnTypes may be added by implementing this interface (in addition to GenericFixedWidthColumnType).
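For illustration only, a hypothetical 1-byte boolean column type satisfying both FixedWidthColumnType and GenericFixedWidthColumnType (defined below) might look like the following sketch; see Implementing Custom ColumnTypes for the authoritative guide:

package mycoltypes

import (
	"fmt"

	"github.com/go-sif/sif"
)

// compile-time check that the sketch satisfies GenericFixedWidthColumnType
var _ sif.GenericFixedWidthColumnType[bool] = (*flagType)(nil)

// flagType is a hypothetical fixed-width (1-byte) boolean column type
type flagType struct{}

// Size returns the serialized size of a flag value, in bytes
func (t *flagType) Size() int { return 1 }

// ToString produces a string representation of a value of this type
func (t *flagType) ToString(v interface{}) string {
	return fmt.Sprintf("%v", v)
}

// ToStringT is the typed equivalent of ToString
func (t *flagType) ToStringT(v bool) string {
	return fmt.Sprintf("%v", v)
}

// SerializeFixed writes a flag value into a reusable buffer of guaranteed Size()
func (t *flagType) SerializeFixed(v interface{}, dest []byte) error {
	b, ok := v.(bool)
	if !ok {
		return fmt.Errorf("value is not a bool")
	}
	return t.SerializeFixedT(b, dest)
}

// SerializeFixedT is the typed equivalent of SerializeFixed
func (t *flagType) SerializeFixedT(v bool, dest []byte) error {
	dest[0] = 0
	if v {
		dest[0] = 1
	}
	return nil
}

// Deserialize reads a flag value from serialized bytes
func (t *flagType) Deserialize(buf []byte) (interface{}, error) {
	return t.DeserializeT(buf)
}

// DeserializeT is the typed equivalent of Deserialize
func (t *flagType) DeserializeT(buf []byte) (bool, error) {
	if len(buf) < 1 {
		return false, fmt.Errorf("not enough bytes to deserialize flag")
	}
	return buf[0] == 1, nil
}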

type FlatMapOperation

type FlatMapOperation func(row Row, newRow RowFactory) error

FlatMapOperation - A generic function for turning a Row into multiple Rows. newRow() is used to produce new rows, each of which must be used before calling newRow() again.
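For example, a FlatMapOperation which splits each Row into one Row per whitespace-separated word of the meta.name column from Getting Started might look like the following sketch (it assumes the metaName accessor is in scope, and that new Rows are populated via the same typed accessors):

// splitNames is a sketch of a FlatMapOperation: one output Row per word
var splitNames sif.FlatMapOperation = func(row sif.Row, newRow sif.RowFactory) error {
	name, err := metaName.From(row)
	if err != nil {
		return err
	}
	for _, word := range strings.Fields(name) {
		// each Row returned by newRow() must be fully populated
		// before newRow() is called again
		out := newRow()
		if err := metaName.To(out, word); err != nil {
			return err
		}
	}
	return nil
}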

type GenericAccumulator

type GenericAccumulator[T any] interface {
	Accumulator
	Value() T // Value returns the accumulated value of this Accumulator
}

GenericAccumulator holds the generic methods of Accumulator, allowing typed access to the internal (generally final) value of the Accumulator

type GenericAccumulatorFactory

type GenericAccumulatorFactory[T any] interface {
	AccumulatorFactory
	Value(Accumulator) (T, error) // retrieves the value from the given accumulator or an error if the given accumulator does not match the expected type
}

GenericAccumulatorFactory is the generic version of Accumulator, indicating the value type of the Accumulator and providing access to its value

type GenericColumnAccessor

type GenericColumnAccessor[T any] interface {
	ColumnAccessor
	CloneT() GenericColumnAccessor[T]
	TypeT() GenericColumnType[T]
	From(row Row) (T, error)
	To(row Row, val T) error
}

GenericColumnAccessor defines the generic methods for a struct which can access data from a particular Column in a particular Row. Creators of custom column types should not typically need to implement their own GenericColumnAccessor, and can instead provide a factory function which calls CreateColumnAccessor instead.

func CreateColumnAccessor

func CreateColumnAccessor[T any](colType GenericColumnType[T], colName string) GenericColumnAccessor[T]

CreateColumnAccessor produces a GenericColumnAccessor for the given GenericColumnType
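Continuing the hypothetical flagType sketch from FixedWidthColumnType above, a factory function for a custom column type might simply wrap CreateColumnAccessor, mirroring how the built-in coltype factories are used when defining a Schema:

// Flag is a hypothetical factory function for the flagType sketched earlier
func Flag(colName string) sif.GenericColumnAccessor[bool] {
	return sif.CreateColumnAccessor[bool](&flagType{}, colName)
}

A column created this way, e.g. isActive := Flag("is_active"), could then (presumably) be passed to schema.CreateSchema alongside the built-in column types.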

type GenericColumnType

type GenericColumnType[T any] interface {
	ColumnType
	ToStringT(v T) string           // produces a string representation of a value of this Type
	DeserializeT([]byte) (T, error) // Defines how this type is deserialized, returning an error if a value of the correct type could not be deserialized from the given bytes
}

GenericColumnType presents identical methods to ColumnType, but with typed variations as well. This interface makes for cleaner internal code for Sif, but is also intended to help future-proof the implementation of ColumnTypes (in the event that Golang ever considers adding covariance support to their generics implementation)

type GenericFixedWidthColumnType

type GenericFixedWidthColumnType[T any] interface {
	GenericColumnType[T]
	FixedWidthColumnType
	SerializeFixedT(v T, dest []byte) error // Similar to SerializeT, but accepts a reusable buffer of guaranteed Size()
}

GenericFixedWidthColumnType presents identical methods to FixedWidthColumnType, but with typed variations as well. This interface makes for cleaner internal code for Sif, but is also intended to help future-proof the implementation of FixedWidthColumnTypes (in the event that Golang ever considers adding covariance support to their generics implementation)

type GenericVariableWidthColumnType

type GenericVariableWidthColumnType[T any] interface {
	GenericColumnType[T]
	VariableWidthColumnType
	SerializeT(v T) ([]byte, error) // Similar to Serialize, but typed
}

GenericVariableWidthColumnType presents identical methods to VariableWidthColumnType, but with typed variations as well. This interface makes for cleaner internal code for Sif, but is also intended to help future-proof the implementation of VariableWidthColumnTypes (in the event that Golang ever considers adding covariance support to their generics implementation)

type KeyablePartition

type KeyablePartition interface {
	KeyRows(kfn KeyingOperation) (OperablePartition, error) // KeyRows generates hash keys for a row from a key column. Attempts to manipulate partition in-place, falling back to creating a fresh partition if there are row errors
	IsKeyed() bool                                          // IsKeyed returns true iff this Partition has been keyed with KeyRows
	GetKey(rowNum int) (uint64, error)                      // GetKey returns the key for a particular row number, or returns an error if the Partition is not keyed.
	GetRowKey(row Row) (uint64, error)                      // GetRowKey returns the key for a particular row, or returns an error if the Partition is not keyed.
	GetKeyRange(start int, end int) ([]uint64, error)       // GetKeyRange returns keys for the given range of rows, or returns an error if the Partition is not keyed.
}

A KeyablePartition can be keyed. Used in the implementation of Partition shuffling and reduction

type KeyingOperation

type KeyingOperation func(row Row) ([]byte, error)

KeyingOperation - A generic function for generating a key from a Row

type MapOperation

type MapOperation func(row Row) error

MapOperation - A generic function for manipulating Rows in-place

type Numeric

type Numeric interface {
	int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | float32 | float64
}

Numeric is a type constraint matching Go's sized numeric types

type OperablePartition

type OperablePartition interface {
	Partition
	KeyablePartition
	UpdateSchema(currentSchema Schema)                                                  // UpdateSchema sets the public schema of a Partition
	AddColumn(accessor ColumnAccessor) (OperablePartition, error)                       // AddColumn adds space for a new column to this Partition
	RemoveColumn(colName string) (OperablePartition, error)                             // RemoveColumn removes a column's data and metadata from this Partition
	RenameColumn(colName string, newAccessor ColumnAccessor) (OperablePartition, error) // RenameColumn modifies the name for a column in Partition data
	MapRows(fn MapOperation) (OperablePartition, error)                                 // MapRows runs a MapOperation on each row in this Partition, manipulating them in-place. Will fall back to creating a fresh partition if PartitionRowErrors occur.
	FlatMapRows(fn FlatMapOperation) ([]OperablePartition, error)                       // FlatMapRows runs a FlatMapOperation on each row in this Partition, creating new Partitions
	FilterRows(fn FilterOperation) (OperablePartition, error)                           // FilterRows filters the Rows in the current Partition, creating a new one. Rows are retained if FilterOperation returns true
}

An OperablePartition can be operated on

type Partition

type Partition interface {
	ID() string            // ID retrieves the ID of this Partition
	GetMaxRows() int       // GetMaxRows retrieves the maximum number of rows in this Partition
	GetNumRows() int       // GetNumRows retrieves the number of rows in this Partition
	GetRow(rowNum int) Row // GetRow retrieves a specific row from this Partition
	GetSchema() Schema     // GetSchema retrieves the schema for this Partition
}

A Partition is a portion of a columnar dataset, consisting of multiple Rows. Partitions are not generally interacted with directly, instead being manipulated in parallel by DataFrame Tasks.

type PartitionAssignmentStrategy

type PartitionAssignmentStrategy interface {
}

A PartitionAssignmentStrategy does something useful TODO

type PartitionCache

type PartitionCache interface {
	Destroy()
	Add(key string, value ReduceablePartition)
	Get(key string) (value ReduceablePartition, err error) // removes the partition from the cache and returns it, if present. Returns an error otherwise.
	GetSerialized(key string) ([]byte, error)              // removes the partition from the cache in a serialized format, if present. Returns an error otherwise.
	CurrentSize() int
	Resize(frac float64) bool // resize by a fraction RELATIVE TO THE CURRENT NUMBER OF ITEMS IN THE CACHE
}

PartitionCache is a cache for Partitions

type PartitionIndex

type PartitionIndex interface {
	SetMaxRows(maxRows int)                                                      // Change the maxRows for future partitions created by this index
	GetNextStageSchema() Schema                                                  // Returns the Schema for the Stage which will *read* from this index
	MergePartition(part ReduceablePartition, reducefn ReductionOperation) error  // Merges all the Rows within a keyed Partition into this PartitionIndex. reducefn may be nil, indicating that reduction is not intended.
	MergeRow(tempRow Row, row Row, reducefn ReductionOperation) error            // Merges a keyed Row of data into the PartitionIndex, possibly appending it to an existing/new Partition, or combining it with an existing Row. reducefn may be nil, indicating that reduction is not intended.
	GetPartitionIterator(destructive bool) PartitionIterator                     // Returns the PartitionIterator for this PartitionIndex. Must always return the same PartitionIterator, even if called multiple times.
	GetSerializedPartitionIterator(destructive bool) SerializedPartitionIterator // Returns a SerializedPartitionIterator for this PartitionIndex. Must always return the same SerializedPartitionIterator, even if called multiple times.
	NumPartitions() uint64                                                       // Returns the number of Partitions in this PartitionIndex
	CacheSize() int                                                              // Returns the in-memory size (in Partitions) of the underlying PartitionCache
	ResizeCache(frac float64) bool                                               // Resizes the underlying PartitionCache
	Destroy()                                                                    // Destroys the index
}

A PartitionIndex is an index for Partitions, useful for shuffling, sorting and/or reducing. An implementation of PartitionIndex permits the indexing of Partitions as well as individual rows, and provides a PartitionIterator/SerializedPartitionIterator to iterate over the indexed partitions in a particular order unique to the implementation (e.g. sorted order for an index which sorts Rows). Leverages an underlying PartitionCache for Partition storage, rather than storing Partition data itself.

type PartitionIterator

type PartitionIterator interface {
	HasNextPartition() bool
	// if unlockPartition is not nil, it must be called when one is finished with the returned Partition
	NextPartition() (part Partition, unlockPartition func(), err error)
	OnEnd(onEnd func())
}

PartitionIterator is a generalized interface for iterating over Partitions, regardless of where they come from

type PartitionLoader

type PartitionLoader interface {
	ToString() string                                        // for logging
	Load(parser DataSourceParser) (PartitionIterator, error) // how to actually load data
	GobEncode() ([]byte, error)                              // how to serialize this PartitionLoader
	GobDecode([]byte) error                                  // how to deserialize this PartitionLoader
}

PartitionLoader is a description of how to load specific Partitions of data from a particular DataSource. DataSources implement this interface to implement data-loading logic. PartitionLoaders are assigned round-robin to workers, so an assumption is made that each PartitionLoader will produce a roughly equal number of Partitions

type PartitionMap

type PartitionMap interface {
	HasNext() bool
	Next() PartitionLoader
}

PartitionMap is an interface describing an iterator for PartitionLoaders. Returned by DataSource.Analyze(), a Coordinator will iterate through PartitionLoaders and assign them to Workers.

type ReduceablePartition

type ReduceablePartition interface {
	BuildablePartition
	KeyablePartition
	SerializablePartition
	PopulateTempRow(tempRow Row, idx int)
	FindFirstKey(key uint64) (int, error)                                          // PRECONDITION: Partition must already be sorted by key
	FindLastKey(key uint64) (int, error)                                           // PRECONDITION: Partition must already be sorted by key
	FindFirstRowKey(keyBuf []byte, key uint64, keyfn KeyingOperation) (int, error) // PRECONDITION: Partition must already be sorted by key
	FindLastRowKey(keyBuf []byte, key uint64, keyfn KeyingOperation) (int, error)  // PRECONDITION: Partition must already be sorted by key
	AverageKeyValue() (uint64, error)                                              // AverageKeyValue is the average value of key within this sorted, keyed Partition
	Split(pos int) (ReduceablePartition, ReduceablePartition, error)               // Split splits a Partition into two Partitions. Split position ends up in right Partition.
	BalancedSplit() (uint64, ReduceablePartition, ReduceablePartition, error)      // Split position ends up in right Partition.
}

A ReduceablePartition can be stored in a PartitionIndex. Used in the implementation of Partition reduction

type ReductionOperation

type ReductionOperation func(lrow Row, rrow Row) error

ReductionOperation - A generic function for reducing Rows across workers. rrow is merged into lrow, and rrow is discarded.

type Row

type Row interface {
	GetPartID() string                               // Returns the ID of the containing partition (if there is one)
	GetKey() (uint64, error)                         // Returns the key for this Row, or an error if the Row's Partition isn't keyed
	Schema() Schema                                  // Schema returns a read-only copy of the schema for a row
	String() string                                  // String returns a string representation of this row
	IsNil(colName string) bool                       // IsNil returns true iff the given column value is nil in this row. If an error occurs, this function will return false.
	SetNil(colName string) error                     // SetNil sets the given column value to nil within this row
	Get(colName string) (val interface{}, err error) // Get returns the value of any column as an interface{}, if it exists
	Set(colName string, val interface{}) error       // Set sets the value of any column to an interface{} (or nil), if it exists
}

Row is a representation of a single row of columnar data (a slice of a Partition), along with a reference to the Schema for that Row. In practice, users of Row will call its getter and setter methods to retrieve, manipulate and store data
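For example, a MapOperation might use these untyped getters and setters where a typed ColumnAccessor is not at hand. This sketch reuses the meta.name and lowercase_name columns from the README examples, and assumes the variable-width string value surfaces through Get as a Go string:

// lowercaseNames is a sketch of a MapOperation using Row's untyped Get/Set;
// typed ColumnAccessors (e.g. metaName.From/To) are generally preferable
func lowercaseNames(row sif.Row) error {
	if row.IsNil("meta.name") {
		return nil
	}
	val, err := row.Get("meta.name")
	if err != nil {
		return err
	}
	name, ok := val.(string)
	if !ok {
		return fmt.Errorf("meta.name is not a string")
	}
	return row.Set("lowercase_name", strings.ToLower(name))
}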

type RowFactory

type RowFactory func() Row

RowFactory is a function that produces a fresh Row. Used specifically within a FlatMapOperation, a RowFactory gives the client a mechanism to return more Rows than were originally within a Partition.

type RuntimeStatistics

type RuntimeStatistics interface {
	// GetStartTime returns the start time of the Sif pipeline
	GetStartTime() time.Time
	// GetRuntime returns the running time of the Sif pipeline
	GetRuntime() time.Duration
	// GetNumRowsProcessed returns the number of Rows which have been processed so far, counted by stage
	GetNumRowsProcessed() []int64
	// GetNumPartitionsProcessed returns the number of Partitions which have been processed so far, counted by stage
	GetNumPartitionsProcessed() []int64
	// GetCurrentPartitionProcessingTime returns a rolling average of partition processing time
	GetCurrentPartitionProcessingTime() time.Duration
	// GetStageRuntimes returns all recorded stage runtimes, from the most recent run of each Stage
	GetStageRuntimes() []time.Duration
	// GetStageTransformRuntimes returns all recorded stage transform-phase runtimes, from the most recent run of each Stage
	GetStageTransformRuntimes() []time.Duration
	// GetStageShuffleRuntimes returns all recorded stage shuffle-phase runtimes, from the most recent run of each Stage
	GetStageShuffleRuntimes() []time.Duration
}

RuntimeStatistics facilitates the retrieval of statistics about a running Sif pipeline

type Schema

type Schema interface {
	Equals(otherSchema Schema) error                            // Equals returns nil iff this Schema is identical to otherSchema, and an error otherwise
	Clone() Schema                                              // Clone() returns a deep copy of this Schema
	NumColumns() int                                            // NumColumns returns the number of columns in this Schema.
	NumFixedWidthColumns() int                                  // NumFixedWidthColumns returns the number of fixed-length columns in this Schema.
	NumVariableWidthColumns() int                               // NumVariableWidthColumns returns the number of variable-length columns in this Schema.
	GetColumn(colName string) (col Column, err error)           // GetColumn returns the Column associated with colName, or an error if none exists
	HasColumn(colName string) bool                              // HasColumn returns true iff colName corresponds to a Column in this Schema
	ColumnNames() []string                                      // ColumnNames returns a list of Column names in this Schema, in the order they were created
	ColumnTypes() []ColumnType                                  // ColumnTypes returns a list of Column types in this Schema, in the order they were created
	ColumnAccessors() []ColumnAccessor                          // ColumnAccessors returns a list of ColumnAccessors in this Schema, in the order they were created
	ForEachColumn(fn func(name string, col Column) error) error // ForEachColumn runs a function for each Column in this Schema
}

Schema is a mapping from column names to column definitions within a Row.

type SerializablePartition

type SerializablePartition interface {
	Partition
	SetCompressor(Compressor) // SetCompressor defines the compressor for this Partition. Must be called prior to serializing.
	ToBytes() ([]byte, error) // ToBytes serializes a Partition to a byte array suitable for persistence to disk
}

A SerializablePartition can be compressed and serialized

type SerializedPartitionIterator

type SerializedPartitionIterator interface {
	HasNextSerializedPartition() bool
	NextSerializedPartition() (id string, spart []byte, done func(), err error)
	OnEnd(onEnd func())
}

SerializedPartitionIterator is a generalized interface for iterating over SerializedPartitions, regardless of where they come from

type StageContext

type StageContext interface {
	context.Context
	ShuffleBuckets() []uint64                                // ShuffleBuckets returns the shuffle buckets for this stage, or an empty slice if there are none
	SetShuffleBuckets([]uint64) error                        // SetShuffleBuckets configures the shuffle buckets for this Stage
	OutgoingSchema() Schema                                  // OutgoingSchema returns the initial underlying data schema for the next stage (or the current Stage if this is the last Stage)
	SetOutgoingSchema(schema Schema) error                   // SetOutgoingSchema sets the initial underlying data schema for the next stage within this StageContext (or sets to the current Stage schema if this is the last Stage)
	PartitionCache() PartitionCache                          // PartitionCache returns the configured PartitionCache for this Stage, or nil if none exists
	SetPartitionCache(cache PartitionCache) error            // SetPartitionCache configures the PartitionCache for this Stage, returning an error if one is already set
	PartitionIndex() PartitionIndex                          // PartitionIndex returns the PartitionIndex for this StageContext, or nil if one has not been set
	SetPartitionIndex(idx PartitionIndex) error              // SetPartitionIndex sets the PartitionIndex for this StageContext. An error is returned if one has already been set.
	IncomingPartitionIterator() PartitionIterator            // IncomingPartitionIterator returns the incoming PartitionIterator for this StageContext, or nil if one has not been set
	SetIncomingPartitionIterator(i PartitionIterator) error  // SetIncomingPartitionIterator sets the incoming PartitionIterator for this StageContext. An error is returned if one has already been set.
	KeyingOperation() KeyingOperation                        // KeyingOperation retrieves the KeyingOperation for this Stage (if it exists)
	SetKeyingOperation(keyFn KeyingOperation) error          // Configure the keying operation for the end of this stage
	ReductionOperation() ReductionOperation                  // ReductionOperation retrieves the ReductionOperation for this Stage (if it exists)
	SetReductionOperation(reduceFn ReductionOperation) error // Configure the reduction operation for the end of this stage
	Accumulator() Accumulator                                // Accumulator retrieves the Accumulator for this Stage (if it exists)
	SetAccumulator(acc Accumulator) error                    // Configure the accumulator for the end of this stage
	CollectionLimit() int                                    // CollectionLimit retrieves the CollectionLimit for this Stage (or -1 if unset)
	SetCollectionLimit(limit int) error                      // Configure the CollectionLimit for the end of this stage
	TargetPartitionSize() int                                // TargetPartitionSize returns the intended Partition maxSize for outgoing Partitions
	SetTargetPartitionSize(TargetPartitionSize int) error    // SetTargetPartitionSize configures the intended Partition maxSize for outgoing Partitions
	Destroy() error                                          // Destroys anything using a lot of memory or goroutines within this StageContext
}

A StageContext is a Context enhanced to store Stage state during execution of a Stage

type Task

type Task interface {
	RunInitialize(sctx StageContext) error
	RunWorker(sctx StageContext, previous OperablePartition) ([]OperablePartition, error)
	RunCoordinator(sctx StageContext) error
}

A Task is an action or transformation applied to Partitions of columnar data.

type TaskType

type TaskType string

TaskType describes the type of a Task, used internally to control behaviour

const (
	// WithColumnTaskType indicates that this task adds a column
	WithColumnTaskType TaskType = "add_column"
	// RemoveColumnTaskType indicates that this task removes a column
	RemoveColumnTaskType TaskType = "remove_column"
	// RenameColumnTaskType indicates that this task renames a column
	RenameColumnTaskType TaskType = "rename_column"
	// ExtractTaskType indicates that this task sources data from a DataSource
	ExtractTaskType TaskType = "extract"
	// ShuffleTaskType indicates that this task triggers a Shuffle
	ShuffleTaskType TaskType = "shuffle"
	// AccumulateTaskType indicates that this task triggers an Accumulation
	AccumulateTaskType TaskType = "accumulate"
	// FlatMapTaskType indicates that this task triggers a FlatMap
	FlatMapTaskType TaskType = "flatmap"
	// MapTaskType indicates that this task triggers a Map
	MapTaskType TaskType = "map"
	// FilterTaskType indicates that this task triggers a Filter
	FilterTaskType TaskType = "filter"
	// CollectTaskType indicates that this task triggers a Collect
	CollectTaskType TaskType = "collect"
)

type VariableWidthColumnType

type VariableWidthColumnType interface {
	ColumnType
	Serialize(v interface{}) ([]byte, error) // Defines how this type is serialized, returning an error if the given value is not of the expected type, or if something went wrong during serialization
}

VariableWidthColumnType is an interface which is implemented to define a column type for variable-width data (where the serialized form has a varying size).

Directories

Path	Synopsis
accumulators	Package accumulators provides common accumulators
cluster	Package cluster represents the cluster components of Sif
coltype	Package coltype provides the built-in ColumnTypes for Sif Schemas
datasource	Package datasource contains various built-in DataSource types for Sif
    file	Package file provides a DataSource which reads data from a directory of files on disk.
    memory	Package memory provides a DataSource which reads data from an in-memory buffer.
    memorystream	Package memorystream provides a DataSource which streams data from an in-memory buffer.
    odbc	Package odbc provides a DataSource which reads data from an ODBC connection.
    parser	Package parser features built-in parsers which interpret data from DataSources
    parser/dsv	Package dsv parses Delimiter-separated DataSources
    parser/jsonl	Package jsonl parses JSON Lines DataSources.
errors	Package errors contains built-in error types for Sif
internal
    dataframe	Package dataframe contains code related to the definition and execution of DataFrames
    partition	Package partition comprises code related to the manipulation of Partitioned data, exposed as Rows
    pcache	Package pcache contains an implementation of an LRU cache for Partitions
    pindex	Package pindex contains implementations of PartitionIndex
    pindex/bucketed	Package bucketed provides a tree-based BucketedPartitionIndex
    pindex/hashmap	Package hashmap provides a map-based PartitionIndex which is suitable for collection
    pindex/tree	Package tree provides a tree-based PartitionIndex which is suitable for reduction
    rpc
    stats	Package stats collects statistics about running Sif pipelines
logging	Package logging includes log levels, and logging-related functions
operations	Package operations contains built-in transformations and actions to apply to DataFrames
    transform	Package transform provides Operations which transform DataFrame rows
    util	Package util provides utility Operations for DataFrames
schema	Package schema helps define Schemas for data processed by Sif
stats	Package stats exposes the grpc API for accessing runtime statistics from a Sif coordinator
testing	Package testing contains utilities for testing Sif code
