beam

Published: Aug 26, 2013 License: Apache-2.0

README

Beam

Beam is a protocol and library for service-oriented communication, with an emphasis on real-world patterns, simplicity and not reinventing the wheel.

It was created by Solomon Hykes and is maintained by Guillaume Charmes and the Docker team at dotCloud, inc.

Design principles

Real-world patterns

Modern applications are made of many loosely coupled components which communicate with each other over the network. Typically there are 4 patterns of communication: CRUD, sync, jobs and streams.

  1. CRUD: read and write structured data representing the state of another service.

  2. Sync: same as CRUD, except the state is pushed continuously by the service as it changes.

  3. Jobs: remotely execute "tasks" exposed by another service and interact with their inputs and outputs.

  4. Streams: send and receive continuous streams of data produced or consumed by another service.

Most applications combine more than one pattern, and typically have to use a different tool for each, as communication tools tend to be biased towards one of them.

Simplicity

Beam doesn't try to magically solve all the communication problems in every application. Rather, it offers a robust and low-level primitive which can be used as a foundation to solve higher-level communication problems, like service discovery, job scheduling, queueing, latency management, caching, etc.

Don't reinvent the wheel

Beam relies heavily on existing standards and conventions.

Specifically, it uses the Redis protocol for both CRUD and sync. In other words, Beam is a subset of the Redis protocol, and yes, every Redis client is also a valid Beam client.

Instead of the traditional (and obsolete) RPC metaphor, Beam exposes jobs using the familiar unix process API. And just like processes, jobs can produce and consume streams.

Underlying transport

Beam is a point-to-point protocol designed for structured communication over a reliable, bi-directional octet stream. TCP, Unix domain sockets and TLS sessions are all good transports.

Wire protocol

The Beam protocol is a strict subset of the Redis wire protocol, with additional semantics. You could say it's the "ReST of Redis". In other words, all Beam commands map to a sequence of 1 or more valid Redis commands, but not all Redis commands are valid Beam commands.

For a reference of the Redis wire protocol, see http://redis.io/topics/protocol.
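
Since every Beam command is a plain Redis command, a client only needs the RESP framing: commands travel as arrays of bulk strings. Below is a minimal sketch of that framing; the helper name is hypothetical:

package main

import (
	"bytes"
	"fmt"
)

// encodeCommand frames a command as a RESP array of bulk strings,
// which is how Redis (and therefore Beam) commands travel on the wire.
func encodeCommand(args ...string) []byte {
	var b bytes.Buffer
	fmt.Fprintf(&b, "*%d\r\n", len(args))
	for _, arg := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(arg), arg)
	}
	return b.Bytes()
}

func main() {
	// Prints "*3\r\n$5\r\nRPUSH\r\n$5\r\n/jobs\r\n$4\r\nexec\r\n"
	fmt.Printf("%q\n", encodeCommand("RPUSH", "/jobs", "exec"))
}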

Reading and writing data

Once a session is established by the underlying transport, a context is exposed to the client as a Redis database. The full set of Redis commands is available to read and write values on that context. Beam does not specify which keys must be accessible for read or write - that is up to the context to implement.

Multiple clients may access the context data concurrently, just like they can access a Redis database concurrently. All commands follow the Redis specification.

Keys beginning with "/beam" are reserved for Beam usage, and should not be used.
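
For example, an off-the-shelf Redis client can read and write context data directly. This sketch uses the redigo client; the address and key names are illustrative assumptions:

package main

import (
	"fmt"

	"github.com/garyburd/redigo/redis"
)

func main() {
	// Any Redis client is a valid Beam client; dial the context's transport.
	conn, err := redis.Dial("tcp", "127.0.0.1:6379") // address is an assumption
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Read and write values on the context. Which keys are meaningful
	// is up to the context implementation; these are illustrative.
	if _, err := conn.Do("SET", "services/web/status", "running"); err != nil {
		panic(err)
	}
	status, err := redis.String(conn.Do("GET", "services/web/status"))
	if err != nil {
		panic(err)
	}
	fmt.Println("status:", status)
}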

Data synchronization

A Beam client can subscribe to a continuous feed of updates to the context data, in effect maintaining a local copy of the entire context database.

This can be used to watch for changes on particular fields, for example.

Synchronization is implemented using the SYNC Redis command. This means that synchronization is available "for free" without extra work in the application, and without re-inventing the wheel in the Beam protocol.

Jobs

A Beam context is basically a Redis database which can run jobs. A job is a mechanism for executing code remotely within a context. The implementation of the job is entirely up to the context.

Jobs can be used as an alternative to RPC. The main difference is that jobs are modeled after unix processes, not function calls. This means the API for manipulating jobs is both unfamiliar (if you're used to RPC) and very familiar (if you've ever used a unix shell).

A job has the following inputs:

  • Command-line arguments. This is an array of strings describing the job's arguments. For example: ["ping", "www.google.com"]

  • Environment. This is a set of key-value pairs organized in a hash. They are a form of configuration. For example: {"DEBUG": "1", "retries": "42"}.

  • Input streams. Once the job is started, it may read data from any number of optional binary streams (similar to stdin in unix processes). The streams are framed, so they can be used for discrete messages as well as continuous octet streams.

A job can produce the following outputs:

  • Output streams. Symmetrically to input streams, a job may write data to any number of optional binary streams (similar to stdout and stderr). The streams are framed, so they can be used for discrete messages as well as continuous octet streams.

  • Exit status. When a job exits, it yields an integer as its exit status. Just like unix processes, 0 indicates successful completion. Any other number indicates failure. Similar failure codes should indicate similar kinds of errors.

  • Changes to context. Unlike RPC calls, Beam jobs can't return arbitrary data. Instead, they have shared access to the context.

Creating a job

Pseudo-code for creating a job from a client, using Redis commands:

# Run the job ["exec", "ls", "-l", "/foobar"] with DEBUG=1 in the environment,
# then stream the result.

# Create a new job and get its id
nJobs = RPUSH /jobs exec
id = nJobs - 1

# Set arguments
RPUSH /jobs/$id/args ls -l /foobar

# Set environment
HSET /jobs/$id/env DEBUG 1

# Start the job
RPUSH /jobs/start $id

# Send stdin on an input stream, then close it
for line in readlines(stdin) {
	RPUSH /jobs/$id/in "0:$line"
}
RPUSH /jobs/$id/in "0:"

# Read output streams
while true {
	frame = BLPOP /jobs/$id/out
	stream, data = split(frame, ":")
	if data == "" {
		print "Stream $stream is closed"
	} else if stream == "1" {
		print(stdout, $data)
	} else if stream == "2" {
		print(stderr, $data)
	} else {
		print "Received data on extra channel $stream"
	}
}

# Wait for the job to complete
while true {
	BLPOP /jobs/$id/wait
	status = GET /jobs/$id/status
	if status != "" {
		print "Job $id exited with status $status"
		break
	}
}
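
The same flow is wrapped by the Go client API documented in the index below. A minimal sketch, assuming the import path (the exported names and signatures match the index):

package main

import (
	"log"

	beam "github.com/dotcloud/beam" // import path is an assumption
)

func main() {
	// NetTransport implements Connector over TCP or a unix socket.
	client, err := beam.NewClient(&beam.NetTransport{Network: "tcp", Address: "127.0.0.1:6379"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create and start the job ["exec", "ls", "-l", "/foobar"].
	job, err := client.NewJob("exec", "ls", "-l", "/foobar")
	if err != nil {
		log.Fatal(err)
	}
	if err := job.Start(); err != nil {
		log.Fatal(err)
	}
	// Wait blocks until the job succeeds or fails.
	if err := job.Wait(); err != nil {
		log.Fatal(err)
	}
	log.Printf("job %d exited with status %d", job.Id, job.ExitStatus)
}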

Serving jobs

Pseudo-code for serving jobs from a worker, using Redis commands:

# Main loop: watch for job start instructions and serve each new job in its own thread
while true {
	id = BLPOP /jobs/start
	lock_acquired = SETNX /jobs/$id "$worker_id"
	if lock_acquired {
		serve_job($id)
	}
}

# Handle a job start request (the body of serve_job)
name = LINDEX /jobs $id
args = LRANGE /jobs/$id/args 0 -1
env = HGETALL /jobs/$id/env
handler = lookupHandler($name)
err = handler($name, $args, $env, streamer(/jobs/$id/in, /jobs/$id/out), $db)
if err == nil {
	SET /jobs/$id/status 0
} else {
	SET /jobs/$id/status string(err)
}
RPUSH /jobs/$id/wait $id
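
The Worker type documented in the index below implements this loop. A minimal sketch of wiring it up, assuming the import path and the "/jobs" key prefix from the pseudo-code:

package main

import (
	"log"

	beam "github.com/dotcloud/beam" // import path is an assumption
)

func main() {
	// The prefix ("/jobs" here, mirroring the pseudo-code) namespaces
	// all Redis keys the worker touches.
	worker := beam.NewWorker(&beam.NetTransport{Network: "tcp", Address: "127.0.0.1:6379"}, "/jobs")

	// Register a handler under the name "exec"; see JobHandler below.
	worker.RegisterJob("exec", func(name string, args []string, env map[string]string, streams *beam.Streamer, db beam.DB) error {
		out := streams.OpenWrite("1") // stream "1" as stdout, mirroring the pseudo-code
		defer out.Close()
		_, err := out.Write([]byte("ran " + name + "\n"))
		return err
	})

	// Work blocks, serving jobs until an error occurs.
	if err := worker.Work(); err != nil {
		log.Fatal(err)
	}
}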

Service addressing

Beam does not define how to map a particular context to a particular client connection. That is the responsibility of the server. For example, all clients connecting to a certain TCP port might share the same context (see notes on multi-tenancy), or each new connection might get its own context generated on the fly, or maybe an HTTP gateway selects the right context from the request URL, then upgrades the connection to WebSocket.

Documentation

Index

Constants

const (
	DEFAULTTIMEOUT = 0 // Wait forever
)

Variables

var (
	ErrInvalidResposeType = errors.New("Invalid response type")
)

Functions

func Debugf

func Debugf(format string, a ...interface{})

Debugf prints the formatted debug message if the debug flag is set, and does nothing otherwise. If Docker is in daemon mode, the debug info is also sent on the socket.

Types

type Channel

type Channel struct {
	SendEmptyResponseOnClose bool
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(stream chan []byte, sendEmptyResponseOnClose bool) (*Channel, chan bool)

func (*Channel) Close

func (c *Channel) Close() error

func (*Channel) Read

func (c *Channel) Read(p []byte) (n int, err error)

func (*Channel) ReadFrom

func (c *Channel) ReadFrom(r io.Reader) (int64, error)

func (*Channel) Write

func (c *Channel) Write(p []byte) (int, error)

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(connector Connector) (*Client, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) NewJob

func (c *Client) NewJob(name string, args ...string) (*Job, error)

type Connector

type Connector interface {
	Connect() (net.Conn, error)
}
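
NetTransport (below) covers TCP and unix sockets; any other transport only needs to implement Connect. A hypothetical TLS connector, per the transports suggested in "Underlying transport":

package transport

import (
	"crypto/tls"
	"net"
)

// tlsTransport is a hypothetical Connector that dials a TLS session.
type tlsTransport struct {
	Address string
	Config  *tls.Config
}

func (t *tlsTransport) Connect() (net.Conn, error) {
	// *tls.Conn satisfies net.Conn, so TLS slots in transparently.
	return tls.Dial("tcp", t.Address, t.Config)
}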

type DB

type DB interface {
}

type Job

type Job struct {
	Id         int
	Name       string
	Args       []string
	Env        []string
	Streams    *Streamer
	ExitStatus int
	// contains filtered or unexported fields
}

func (*Job) Start

func (j *Job) Start() error

func (*Job) Wait

func (j *Job) Wait() error

Wait waits for the job to succeed or fail.

type JobHandler

type JobHandler func(name string, args []string, env map[string]string, streams *Streamer, db DB) error

A JobHandler is a function which can be invoked as a job by beam clients. The API for invoking jobs resembles that of unix processes:

  • A job is invoked under a certain <name>.
  • It may receive arguments as a string array (<args>).
  • It may receive an environment as a map of key-value pairs (<env>).
  • It may read from, and write to, streams of binary data. (<streams>).
  • It returns a value which can either indicate "success" or a variety of error conditions.

Additionally, a job may modify the server's database, which is shared with all other jobs. This is similar to how multiple unix processes share access to the same filesystem.
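
A sketch of a handler satisfying this contract; the stream names mirror the pseudo-code above ("0" as stdin, "1" as stdout) and are otherwise assumptions:

package handlers

import (
	"io"

	beam "github.com/dotcloud/beam" // import path is an assumption
)

// echoHandler copies its input stream back to its output stream.
// Returning nil maps to exit status 0; any error marks the job failed.
func echoHandler(name string, args []string, env map[string]string, streams *beam.Streamer, db beam.DB) error {
	in := streams.OpenRead("0")   // stdin-like input stream
	out := streams.OpenWrite("1") // stdout-like output stream
	defer out.Close()
	_, err := io.Copy(out, in)
	return err
}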

type NetTransport

type NetTransport struct {
	Network string
	Address string
}

func (*NetTransport) Connect

func (t *NetTransport) Connect() (net.Conn, error)

type Streamer

type Streamer struct {
	InKey  string
	OutKey string
	// contains filtered or unexported fields
}

func NewStreamer

func NewStreamer(pool *redis.Pool, in, out string) *Streamer

func (*Streamer) Close

func (s *Streamer) Close(name string)

Close closes the stream <name>. All future reads will return io.EOF, and writes will return io.ErrClosedPipe.

func (*Streamer) OpenRead

func (s *Streamer) OpenRead(name string) io.ReadCloser

OpenRead returns a read-only interface to receive data on the stream <name>. If the stream hasn't been opened for read access before, it is advertised as such to the peer.

func (*Streamer) OpenReadWrite

func (s *Streamer) OpenReadWrite(name string) io.ReadWriter

OpenReadWrite returns a read-write interface to send and receive on the stream <name>. If the stream hasn't been opened for read or write access before, it is advertised as such to the peer.

func (*Streamer) OpenWrite

func (s *Streamer) OpenWrite(name string) io.WriteCloser

OpenWrite returns a write-only interface to send data on the stream <name>. If the stream hasn't been opened for write access before, it is advertised as such to the peer.

func (*Streamer) ReadFrom

func (s *Streamer) ReadFrom(src io.Reader, name string) (int64, error)

ReadFrom opens a write interface on the stream <name>, and copies data from <src> to that interface until EOF or error. The return value n is the number of bytes read from <src>. Any error encountered during the copy is also returned.

func (*Streamer) Shutdown

func (s *Streamer) Shutdown() error

Shutdown waits until all streams with read access are closed and all WriteTo and ReadFrom operations are complete, then stops accepting remote messages for its streams and returns.

func (*Streamer) WriteTo

func (s *Streamer) WriteTo(dst io.Writer, name string) (int64, error)

WriteTo opens a read interface on the stream <name>, and copies data from that interface to <dst> until there is no more data or an error occurs. The return value n is the number of bytes written to <dst>. Any error encountered during the copy is also returned.
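
A sketch of driving a Streamer directly, assuming the *redis.Pool parameter is the redigo pool type and using keys that mirror the pseudo-code above:

package main

import (
	"os"

	beam "github.com/dotcloud/beam" // import path is an assumption
	"github.com/garyburd/redigo/redis"
)

func main() {
	pool := &redis.Pool{
		Dial: func() (redis.Conn, error) { return redis.Dial("tcp", "127.0.0.1:6379") },
	}
	streams := beam.NewStreamer(pool, "/jobs/0/in", "/jobs/0/out")

	// Send a message on input stream "0", then close it.
	w := streams.OpenWrite("0")
	w.Write([]byte("hello\n"))
	w.Close()

	// Copy everything the peer sends on stream "1" to stdout.
	streams.WriteTo(os.Stdout, "1")

	// Stop accepting remote messages once all reads are done.
	streams.Shutdown()
}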

type Worker

type Worker struct {
	Prefix string // The prefix for all redis keys
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(transport Connector, prefix string) *Worker

NewWorker initializes a new beam worker.

func (*Worker) KeyPath

func (w *Worker) KeyPath(parts ...string) string

func (*Worker) RegisterJob

func (w *Worker) RegisterJob(name string, h JobHandler)

RegisterJob exposes the function <h> as a remote job to be invoked by clients under the name <name>.

func (*Worker) ServeJob

func (w *Worker) ServeJob(name string, args []string, env map[string]string, streams *Streamer, db DB) error

ServeJob is the server's default job handler. It is called every time a new job is created. It looks up a handler registered at <name>, and calls it with the same arguments. If no handler is registered, it returns an error.

func (*Worker) Work

func (w *Worker) Work() error

Work runs an infinite loop, watching its database for new requests, starting jobs as requested, moving stream data back and forth, and updating job status as it changes.

Directories

Path Synopsis
cmd
