occult

package module
v0.0.0-...-3e71865
Published: May 21, 2014 License: BSD-3-Clause Imports: 16 Imported by: 0

README

Project Occult

Occult is an open-source, distributed, cache-oriented, array processing architecture for scientific computing.

Quote from Wikipedia: From the scientific perspective, occultism is regarded as unscientific as it does not make use of the standard scientific method to obtain facts.

That's what it feels like to do scientific computing on distributed systems!

Design Goals

  • Simplicity of implementation, deployment, and use.
  • A programming model that hides the complexity of distributed systems.
  • Minimize network traffic and read operations from slow data sources.
  • Cache data in memory to speed up processing. Dynamically route work to where the data is.
  • Avoid centralized management to achieve unlimited scalability and elasticity.
  • Unified platform for batch-, iterative-, and stream-processing.
  • Fault-tolerant.

This is an experimental package to explore distributed array processing architectures using the Go programming language.

The inspiration came from various sources including ideas behind SciDB, Apache Spark, S4 (PDF), and many other open source projects.

Use Cases

  • Process time series data.
  • Detect anomalies in a data center using streams of measurements.
  • Analyze sensor data in Internet of Things (IoT) applications.
  • Detect intruders in a network.
  • Set alarms in a surveillance system that analyzes video using computer vision algorithms.
  • Train a model to predict clicks on a web page.

How It Works

  • Data is ingested by a distributed store. (Not part of the project.)
  • A data source is organized as a sequence of records with a 64-bit integer key.
  • An app is an executable program deployed to all the nodes in the cluster.
  • An app is a graph of simple processing functions where a processor depends on the outputs of other processors.
  • Processors return a Value for a given key. (Pull architecture.)
  • The application graph may depend on slow data sources.
  • Processors cache intermediate results transparently by key.

Example

source: occult_test.go

To run occult_test:

# If you need to install Go, see:
# install: http://golang.org/doc/install
# setup: http://golang.org/doc/code.html
go get -u github.com/akualab/occult
cd $GOPATH/src/github.com/akualab/occult
go test -v

In this example,

  • randomFunc is the data source which provides an array of ints.
  • windowFunc is applied every N samples and returns a slice of ints of length winSize.
  • sortFunc sorts the slice of ints returned by the window.
  • quantileFunc returns a slice with the values of the quantiles (e.g., n=2 returns one value, the median).

The application is a collection of short processing functions. The processing details are hidden. To get a value, for example quantiles for key 33, simply run:

result, err := quantile(33)

See also the collaborative filtering example: README.md

Under the Hood

The core functionality is in occult.go.

The approach is to provide a basic type called ProcFunc:

type ProcFunc func(key uint64, ctx *Context) (Value, error)

with only two arguments, the key and a context object. The wiring of the processors is handled by Context as follows:

aProcessor := ctx.Inputs()[0] // get the first input, which points to aProcessor.
anotherProcessor := ctx.Inputs()[1] // get the second input and as many as required.

to get a value from the input:

var key uint64 = 111 // some key
in0, err := aProcessor(key) // Processors return errors.
if err == occult.ErrEndOfArray {
	break // when we don't know the length of the array, we rely on errors.
}
s := in0.(MyType) // use type assertion to uncover the underlying type

the variable s has the value produced by aProcessor for key 111.

Finally, to build an application and get Processor instances, we add the ProcFunc functions to an app using app.Add() and app.AddSource(). The latter sets a flag to indicate that this is a slow source. This information is used to allocate work to nodes efficiently.

Note that a ProcFunc can be used to create more than one processor. The Processor instances will have the same functionality but may use different inputs and parameters. ProcFunc can be written to be highly reusable or highly customized for the application (one-time use).

As always with Go, we decide to reuse or rewrite using a pragmatic approach. Writing custom code can be much faster and cleaner than writing reusable code. Fewer levels of indirection make code simpler and easier to understand.

Using a Cluster

We implemented initial cluster functionality for experimentation. Any node can do any work, but the router is responsible for distributing work efficiently. For now, the router does a dumb round-robin. To send values across the wire, we use the RPC package. Values are encoded using gob. Custom types must be registered.

Finding Memory

Performance is achieved by distributing work among the nodes in the cluster. However, any node can do any work. A parallel system is responsible for maintaining routing tables that instruct the app where to get the work done for a given index. This information is built dynamically. For example, to get someWork(333), the app looks up the node for the (processor, key) pair. If the info does not exist, a node is chosen based on load or other criteria, and the resulting mapping between work and node is broadcast to all the nodes in the cluster to update the local routing tables.

Each processor instance has a separate LRU cache. Values are cached by key. The code was adapted from the vitess project. For now, all caches have the same capacity (max number of items); however, cache capacity could be managed dynamically based on performance.

Messaging

Because all nodes can do any work, the system feels like a stateless machine, even though state is encoded in the processor graph as a derivative of the original data sources. In other words, messages can get lost and nodes can be added or removed from the cluster without causing failures, only temporary degradation in performance. The only requirement is to have the original data sources available.

Next Steps

  • Get feedback on overall architecture and API.
  • Decide: does the world need this system? Written in Go?
  • Improve cluster functionality (routing, cluster coordination, failure management, config changes).
  • Find a task (sponsor?) to build an app for testing using a very large data set. (Suggestions?)

Thanks! Leo Neumeyer, May 12, 2014.

Documentation

Overview

Occult: A cache-oriented array processing platform.

Index

Constants

View Source
const (
	DefaultCacheCap          = 2000
	NumRetries               = 20 // Num attempts to connect to other nodes.
	DefaultBlockSize  uint64 = 10
	DefaultNumWorkers        = 2
	DefaultGoMaxProcs        = 2
)

Variables

View Source
var (
	ErrEndOfArray = errors.New("reached the end of the array")
)

Functions

This section is empty.

Types

type App

type App struct {
	Name       string `yaml:"name"`
	CacheCap   uint64 `yaml:"cache_cap"`
	NumWorkers int    `yaml:"num_workers"`
	BlockSize  uint64 `yaml:"block_size"`
	NumRetries int    `yaml:"num_retries"`
	GoMaxProcs int    `yaml:"go_max_procs"`
	// contains filtered or unexported fields
}

An App coordinates the execution of a set of processors.

func NewApp

func NewApp(config *Config) *App

Creates a new App.

func (*App) Add

func (app *App) Add(fn ProcFunc, opt interface{}, inputs ...Processor) Processor

Adds a ProcFunc to the app. The instance may use opt to retrieve parameters and is wired using the inputs.

func (*App) AddSource

func (app *App) AddSource(fn ProcFunc, opt interface{}, inputs ...Processor) Processor

Same as Add but indicating that this is a persistent source. The system will attempt to use the same cluster node for a given key. This affinity increases the cache hit rate and minimizes reads from the persistent source.

func (*App) Context

func (app *App) Context(id int) *Context

func (*App) Run

func (app *App) Run()

Run app. Must be called after adding processors.

func (*App) SetServer

func (app *App) SetServer(b bool)

func (*App) Shutdown

func (app *App) Shutdown()

Shutdown all the servers in the cluster.

type Cluster

type Cluster struct {
	Name string
	// All nodes in the cluster.
	Nodes []*Node `yaml:"nodes"`
	// The local node ID.
	NodeID int
}

func (*Cluster) IsLocal

func (c *Cluster) IsLocal(id int) bool

Returns true if node id is the local node.

func (*Cluster) LocalNode

func (c *Cluster) LocalNode() *Node

Returns the local Node.

func (*Cluster) Node

func (c *Cluster) Node(id int) *Node

Returns Node for node id.

type Config

type Config struct {
	App     *App     `yaml:"app"`
	Cluster *Cluster `yaml:"cluster"`
}

Configuration file. Example:

app:
  name: "myapp"
  cache_cap: 1000
cluster:
  nodes:
    - id: 0
      addr: ":33330"
    - id: 1
      addr: ":33331"

func OneNodeConfig

func OneNodeConfig() (config *Config)

func ReadConfig

func ReadConfig(filename string) (config *Config, err error)

Read the occult configuration file.

func (*Config) String

func (c *Config) String() string

type Context

type Context struct {
	// The Options field is made available to apps to pass parameters
	// to proc instances in any way they want.
	Options interface{}
	// contains filtered or unexported fields
}

The context provides internal information for processor instances. Each processor instance has a context.

func (*Context) Inputs

func (ctx *Context) Inputs() []Processor

type Node

type Node struct {
	ID   int    `yaml:"id"`
	Addr string `yaml:"addr"`
	// contains filtered or unexported fields
}

type ProcFunc

type ProcFunc func(key uint64, ctx *Context) (Value, error)

All processors must be implemented using this function type.

type Processor

type Processor func(key uint64) (Value, error)

A Processor instance. Once the processor instance is created, the parameters and inputs cannot be changed.

func (Processor) Map

func (p Processor) Map(start, end uint64) (values []Value, err error)

Map applies the processor to the key range {start..end}. Returns a slice of Values of length (end-start).

func (Processor) MapAll

func (p Processor) MapAll(start uint64, ctx *Context) chan Value

MapAll applies the processor to all values with key range {start..}, streaming the results on the returned channel.

type RArgs

type RArgs struct {
	Start, End uint64
	ProcID     int
}

Use RPC to request values from a remote node. The arg is the key range and processor instance id. If successful, returns a slice of values whose length is between zero and (End-Start).

type RProc

type RProc struct {
	// contains filtered or unexported fields
}

RPC type to get remote values.

func (*RProc) Get

func (rp *RProc) Get(args *RArgs, rv *Slice) error

RPC method to get remote values.

func (*RProc) Ready

func (rp *RProc) Ready(args int, ready *bool) error

Tells the client if the server is ready to start taking requests.

func (*RProc) Shutdown

func (rp *RProc) Shutdown(args int, ready *bool) error

type Router

type Router interface {
	// Target node for processor instance and key.
	Route(key uint64, procID int) *Node
	// Target node for processor slice.
	RouteSlice(start, end uint64, procID int) *Node
}

A router identifies which remote node can do the requested work efficiently for a given processor instance and range of keys.

type Slice

type Slice struct {
	Offset uint64
	Data   []Value
}

A slice of values.

func NewSlice

func NewSlice(start uint64, size, cap int) *Slice

Creates a new Slice.

func ToSlice

func ToSlice(key uint64, vals ...Value) *Slice

Converts Values to a Slice.

func (*Slice) End

func (s *Slice) End() uint64

Offset position after the last value of this Slice.

func (*Slice) Length

func (s *Slice) Length() int

The length of the Slice.

func (*Slice) Start

func (s *Slice) Start() uint64

The offset for this Slice.

type Value

type Value interface{}

The value returned by Processors.

Directories

Path Synopsis
examples
reco
Implement various collaborative filtering algorithms [1] and evaluate using the MovieLens data set [2]. [1] http://www.stanford.edu/~lmackey/papers/cf_slides-pml09.pdf [2] http://grouplens.org/datasets/movielens/ See README.md for details.
A key-value store based on leveldb.
