distchan

Version: v0.0.0-...-d86f4d1
Published: Nov 13, 2017 License: MPL-2.0 Imports: 12 Imported by: 0

README

distchan

Package distchan enables Go channels to be used for distributed computation. Improved stable version by covrom.

Why?

While Go's concurrency story around single-process data pipelines is great, its ability to distribute workloads across multiple machines is relatively lacking. There are several options here (notably glow and gleam), but they lack the type-safety and ease-of-use that Go's built-in channels provide.

How?

In a nutshell: standard net primitives, Gob encoding, and reflection.

Architecturally, the package assumes a client-server model of workload distribution. One server can have as many clients connected to it as you want, and work will be distributed across them using a simple round-robin algorithm.
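
The round-robin distribution described above can be illustrated with a small stand-alone sketch. The roundRobin type and the worker names are hypothetical and are not taken from distchan's source; the point is only that each job goes to the next worker in rotation.

```go
package main

import "fmt"

// roundRobin is a hypothetical dispatcher that hands out workers in rotation.
type roundRobin struct {
	workers []string
	next    int
}

// pick returns the next worker in rotation. The boolean is false when no
// workers are registered.
func (r *roundRobin) pick() (string, bool) {
	if len(r.workers) == 0 {
		return "", false
	}
	w := r.workers[r.next]
	r.next = (r.next + 1) % len(r.workers)
	return w, true
}

func main() {
	rr := &roundRobin{workers: []string{"worker-a", "worker-b", "worker-c"}}
	for _, job := range []string{"j1", "j2", "j3", "j4"} {
		w, _ := rr.pick()
		fmt.Printf("%s -> %s\n", job, w)
	}
	// Output:
	// j1 -> worker-a
	// j2 -> worker-b
	// j3 -> worker-c
	// j4 -> worker-a
}
```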

Example

As a simple example, let's say that capitalizing letters in a string is very computationally expensive, and you want to distribute that work across a number of nodes. First, you'll need to create a server in charge of defining the work to be done:

package main

import (
	"log"
	"net"

	"github.com/covrom/distchan"
)

func main() {
	ln, err := net.Listen("tcp", "localhost:5678")
	if err != nil {
		log.Fatal(err)
	}

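	out := make(chan string)
	in := make(chan string)

	// NewServer returns (*Server, error), so check the error before use.
	server, err := distchan.NewServer(ln, out, in)
	if err != nil {
		log.Fatal(err)
	}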

	server.Start()
	server.WaitUntilReady() // wait until we have at least one worker available

	go producer(out)

	for s := range in {
		println(s)
	}
}

func producer(out chan<- string) {
	// send strings to be capitalized to out
	out <- "hello world"
	// don't forget to close the channel! this is how all connected
	// clients know that there's no more work coming.
	close(out)
}

Then you'll need to create a client, or worker. It's just as easy to wire up, so you can focus on the hard part, capitalizing strings:

package main

import (
	"log"
	"net"
	"strings"

	"github.com/covrom/distchan"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:5678") // must be able to connect to the server
	if err != nil {
		log.Fatal(err)
	}

	var (
		out = make(chan string)
		in  = make(chan string)
	)

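	// NewClient returns (*Client, error), so check the error before starting.
	client, err := distchan.NewClient(conn, out, in)
	if err != nil {
		log.Fatal(err)
	}
	client.Start()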

	// Loop over all input from the server...
	for input := range in {
		capitalized := strings.ToUpper(input)
		// ...and send the results back.
		out <- capitalized
	}

	close(out)
	<-client.Done()
}

Check out the example folder for more examples.

Documentation

Constants

This section is empty.

Variables

var (
	ErrorInIsNotChannel  = errors.New("Parameter 'in' is not a channel")
	ErrorOutIsNotChannel = errors.New("Parameter 'out' is not a channel")
	ErrorBadRequest      = errors.New("Bad request format")
)

ErrorInIsNotChannel is returned when 'in' is neither nil nor a channel. ErrorOutIsNotChannel is returned when 'out' is neither nil nor a channel.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a registration between a network connection and a pair of channels. See the documentation for Server for more details.

func NewClient

func NewClient(conn net.Conn, out, in interface{}) (*Client, error)

func (*Client) AddDecoder

func (c *Client) AddDecoder(f Transformer) *Client

AddDecoder adds a new decoder to the client. Any inbound messages will be passed through all registered decoders before being sent to the channel. See the tests for an example of decoding the data using AES encryption.

func (*Client) AddEncoder

func (c *Client) AddEncoder(f Transformer) *Client

AddEncoder adds a new encoder to the client. Any outbound messages will be passed through all registered encoders before being sent over the wire. See the tests for an example of encoding the data using AES encryption.

func (*Client) Done

func (c *Client) Done() <-chan struct{}

Done returns a channel that will be closed once all in-flight data has been handled.
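
The general "done channel" pattern behind a method like this can be sketched as follows. This is an illustration of the pattern only, under the assumption that closing the outbound channel is what triggers completion (as in the README example); the forwarder type and start function are hypothetical and do not reflect distchan's internals.

```go
package main

import "fmt"

// forwarder is a hypothetical type that copies values from one channel to
// another and signals completion through a done channel.
type forwarder struct {
	done chan struct{}
}

// start forwards every value from src to dst in a goroutine. The done
// channel is closed once src has been closed and all values are forwarded.
func start(src <-chan string, dst chan<- string) *forwarder {
	f := &forwarder{done: make(chan struct{})}
	go func() {
		defer close(f.done)
		for v := range src {
			dst <- v
		}
	}()
	return f
}

// Done returns a channel that is closed when forwarding has finished.
func (f *forwarder) Done() <-chan struct{} { return f.done }

func main() {
	src := make(chan string)
	dst := make(chan string, 2)

	f := start(src, dst)
	src <- "HELLO"
	src <- "WORLD"
	close(src) // no more data is coming

	<-f.Done() // wait until everything in flight has been handled
	fmt.Println(len(dst)) // prints 2
}
```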

func (*Client) Logger

func (c *Client) Logger() *log.Logger

Logger exposes the client's internal logger so that it can be configured. For example, if you want the logs to go somewhere besides standard output (the default), you can use c.Logger().SetOutput(...).

func (*Client) Start

func (c *Client) Start() *Client

Start instructs the client to begin serving messages.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents a registration between a network listener and a pair of channels, one for input and one for output.

func NewServer

func NewServer(ln net.Listener, out, in interface{}) (*Server, error)

NewServer registers a pair of channels with an active listener. Gob-encoded messages received on the listener will be passed to in; any values passed to out will be gob-encoded and written to one open connection. The server uses a simple round-robin strategy when deciding which connection to send the message to; no client is favored over any others.

Note that the returned value's Start() method must be called before any messages will be passed. This gives the user an opportunity to register encoders and decoders before any data passes over the network.

func (*Server) AddDecoder

func (s *Server) AddDecoder(f Transformer) *Server

AddDecoder adds a new decoder to the server. Any inbound messages will be passed through all registered decoders before being sent to the channel. See the tests for an example of decoding the data using AES encryption.

func (*Server) AddEncoder

func (s *Server) AddEncoder(f Transformer) *Server

AddEncoder adds a new encoder to the server. Any outbound messages will be passed through all registered encoders before being sent over the wire. See the tests for an example of encoding the data using AES encryption.

func (*Server) Logger

func (s *Server) Logger() *log.Logger

Logger exposes the server's internal logger so that it can be configured. For example, if you want the logs to go somewhere besides standard output (the default), you can use s.Logger().SetOutput(...).

func (*Server) Ready

func (s *Server) Ready() bool

Ready returns true if there are any clients currently connected.

func (*Server) Start

func (s *Server) Start() *Server

Start instructs the server to begin serving messages.

func (*Server) Stop

func (s *Server) Stop()

Stop instructs the server to stop serving messages.

func (*Server) WaitUntilReady

func (s *Server) WaitUntilReady()

WaitUntilReady blocks until the server has at least one client available.

type Transformer

type Transformer func(src io.Reader) io.Reader

Transformer represents a function that does an arbitrary transformation on a piece of data. It's used for defining custom encoders and decoders for modifying how data is sent across the wire.

Directories

Path Synopsis
example
