netchan

package
v0.0.0-...-a2aa5d3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2017 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package netchan enables Go channels to be used over the network. Messages sent over a channel on one machine will be received by a channel of the same type on a different machine. This includes messages that contain channels (i.e. it is possible to "marshal" channels using this package).

There are two fundamental concepts in netchan: "exposing" and "binding". A channel that is exposed, becomes associated with a public name ("ID") and available to receive messages. A channel on a different machine may then be bound to the exposed channel. Messages sent to the bound channel will be transported over a network transport and will become available to be received by the exposed channel. Effectively the two channels become the endpoints of a unidirectional network link.

Exposing a channel

In order to expose a channel under an ID the Expose() function must be used; there is also an Unexpose() function to unexpose a channel. If multiple channels are exposed under the same ID which channel(s) receive a message depends on the ID. ID's that start with a '+' character are considered "broadcast" ID's and messages sent to them are delivered to all channels exposed under that ID. All other ID's are considered "unicast" and messages sent to them are delivered to a single channel exposed under that ID (determined using a pseudo-random algorithm).

To receive exposer errors one can expose error channels (of type chan error) under the special broadcast ID "+err/". All such error channels will receive transport errors, etc.

Binding a channel

In order to bind a channel to an "address" the Bind() function must be used; to unbind the channel simply close it. Addresses in this package depend on the underlying transport and take the form of URI's. For the default TCP transport an address has the syntax: tcp://HOST[:PORT]/ID

When using Bind() an error channel (of type chan error) may also be specified. This error channel will receive transport errors, etc. related to the bound channel.

Marshaling

This package encodes/decodes messages using one of the following builtin encoding formats. These builtin formats can also be used to encode channels into references that can later be decoded and reconstructed on a different machine.

netgob   extension of the standard gob format that also allows for
         channels to be encoded/decoded
         (https://github.com/billziss-gh/netgob)
netjson  extension of the standard json format that also allows for
         channels to be encoded/decoded
         (https://github.com/billziss-gh/netjson)

Channels that are marshaled in this way are also implicitly exposed and bound. When a message that is being sent contains a channel, a reference is computed for that channel and the channel is implicitly exposed under that reference. When the message arrives at the target machine the reference gets decoded and a new channel is constructed and implicitly bound back to the marshaled channel.

It is now possible to use the implicitly bound channel to send messages back to the marshaled and implicitly exposed channel. Implicitly exposed channels that are no longer in use will be eventually garbage collected. Implicitly bound channels must be closed when they will no longer be used for communication.

Transports

This package comes with a number of builtin transports:

tcp      plain TCP transport
tls      secure TLS (SSL) transport
http     sockets over HTTP (similar to net/rpc protocol)
https    sockets over HTTPS (similar to net/rpc protocol)
ws       (optional) WebSocket transport
wss      (optional) secure WebSocket transport

It is possible to add transports by implementing the Transport and Link interfaces.

Example
/*
 * interface_test.go
 *
 * Copyright 2017 Bill Zissimopoulos
 */
/*
 * This file is part of netchan.
 *
 * It is licensed under the MIT license. The full license text can be found
 * in the License.txt file at the root of this project.
 */

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

func ping(wg *sync.WaitGroup, count int) {
	defer wg.Done()

	pingch := make(chan chan struct{})
	errch := make(chan error, 1)
	err := Bind("tcp://127.0.0.1/pingpong", pingch, errch)
	if nil != err {
		panic(err)
	}

	for i := 0; count > i; i++ {
		// send a new pong (response) channel
		pongch := make(chan struct{})
		pingch <- pongch

		fmt.Println("ping")

		// wait for pong response, error or timeout
		select {
		case <-pongch:
		case err = <-errch:
			panic(err)
		case <-time.After(10 * time.Second):
			err = errors.New("timeout")
			panic(err)
		}
	}

	pingch <- nil

	close(pingch)
}

func pong(wg *sync.WaitGroup, exposed chan struct{}) {
	defer wg.Done()

	pingch := make(chan chan struct{})
	err := Expose("pingpong", pingch)
	if nil != err {
		panic(err)
	}

	close(exposed)

	for {
		// receive the pong (response) channel
		pongch := <-pingch
		if nil == pongch {
			fmt.Println("END")
			break
		}

		fmt.Println("pong")

		// send the pong response
		pongch <- struct{}{}
	}

	Unexpose("pingpong", pingch)
}

func main() {
	wg := &sync.WaitGroup{}

	exposed := make(chan struct{})
	wg.Add(1)
	go pong(wg, exposed)
	<-exposed

	wg.Add(1)
	go ping(wg, 10)

	wg.Wait()

}
Output:

ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
END

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrArgumentInvalid             error = MakeErrArgument("netchan: argument: invalid")
	ErrBinderChanBound             error = MakeErrBinder("netchan: binder: chan is bound")
	ErrBinderChanNotBound          error = MakeErrBinder("netchan: binder: chan is not bound")
	ErrTransportInvalid            error = MakeErrTransport("netchan: transport: invalid")
	ErrTransportClosed             error = MakeErrTransport("netchan: transport: closed")
	ErrTransportUnexpectedResponse error = MakeErrTransport("netchan: transport: unexpected response")
	ErrTransportMessageCorrupt     error = MakeErrTransport("netchan: transport: message is corrupt")
	ErrMarshalerNoChanEncoder      error = MakeErrMarshaler("netchan: marshaler: chan encoder not set")
	ErrMarshalerNoChanDecoder      error = MakeErrMarshaler("netchan: marshaler: chan decoder not set")
	ErrMarshalerPanic              error = MakeErrMarshaler("netchan: marshaler: panic")
)

Errors reports by this package. Other errors are also possible. All errors reported implement the Err interface.

View Source
var IdErr = "+err/"

IdErr contains the special error broadcast ID. A channel (of type chan error) exposed under this ID will receive exposer errors. This special broadcast ID is local to the running process and cannot be accessed remotely.

View Source
var IdInv = "+inv/"

IdInv contains the special invalid message broadcast ID. A channel (of type chan Message) exposed under this ID will receive invalid messages. Invalid messages are messages that cannot be delivered to an exposed channel for any of a number of reasons: because they contain the wrong message ID, because their payload is the wrong type, because the destination channels have been closed, etc. This special broadcast ID is local to the running process and cannot be accessed remotely.

Functions

func Bind

func Bind(uri interface{}, ichan interface{}, echan chan error) error

Bind binds a local channel to a URI that is addressing a remote channel. After the binding is established, the bound channel may be used to send messages to the remote channel.

Remotely exposed channels are addressed by URI's. The URI syntax depends on the underlying transport. For the default TCP transport an address has the syntax: tcp://HOST[:PORT]/ID

The uri parameter contains the URI and can be of type string or *url.URL. An error channel (of type chan error) may also be specified. This error channel will receive transport errors, etc. related to the bound channel.

It is also possible to associate a new error channel with an already bound channel. For this purpose use a nil uri and the new error channel to associate with the bound channel.

To unbind a bound channel simply close it.

Bind binds a channel using the DefaultBinder.

func Expose

func Expose(id string, ichan interface{}) error

Expose exposes a channel under an ID. Exposing a channel associates it with the ID and makes it available to receive messages.

If multiple channels are exposed under the same ID which channel(s) receive a message depends on the ID. ID's that start with a '+' character are considered "broadcast" ID's and messages sent to them are delivered to all channels exposed under that ID. All other ID's are considered "unicast" and messages sent to them are delivered to a single channel exposed under that ID (determined using a pseudo-random algorithm).

To receive exposer errors one can expose error channels (of type chan error) under the special broadcast ID "+err/". All such error channels will receive transport errors, etc. This special broadcast ID is local to the running process and cannot be accessed remotely.

It is also possible to receive "invalid" messages on channels (of type chan Message) exposed under the special broadcast ID "+inv/". Invalid messages are messages that cannot be delivered for any of a number of reasons: because they contain the wrong message ID, because their payload is the wrong type, because the destination channels have been closed, etc. As with "+err/" this special broadcast ID is local to the running process and cannot be accessed remotely.

Expose exposes a channel using the DefaultExposer.

func Unexpose

func Unexpose(id string, ichan interface{})

Unexpose unexposes a channel. It disassociates it from the ID and makes it unavailable to receive messages under that ID.

Unexpose unexposes a channel using the DefaultExposer.

Types

type Binder

type Binder interface {
	// Bind binds a local channel to a URI that is addressing a remote
	// channel. After the binding is established, the bound channel may
	// be used to send messages to the remote channel.
	//
	// Remotely exposed channels are addressed by URI's. The URI
	// syntax depends on the underlying transport. For the default TCP
	// transport an address has the syntax: tcp://HOST[:PORT]/ID
	//
	// The uri parameter contains the URI and can be of type string or
	// *url.URL. An error channel (of type chan error) may also be
	// specified. This error channel will receive transport errors, etc.
	// related to the bound channel.
	//
	// It is also possible to associate a new error channel with an
	// already bound channel. For this purpose use a nil uri and
	// the new error channel to associate with the bound channel.
	//
	// To unbind a bound channel simply close it.
	Bind(uri interface{}, ichan interface{}, echan chan error) error
}

Binder is used to bind a local channel to a remote channel.

var DefaultBinder Binder = NewBinder(DefaultTransport)

DefaultBinder is the default binder of the running process. Instead of DefaultBinder you can use the Bind function.

func NewBinder

func NewBinder(transport Transport) Binder

NewBinder creates a new binder that can be used to bind channels. It is usually sufficient to use the DefaultBinder instead.

type ChanDecoder

type ChanDecoder interface {
	ChanDecode(link Link, vchan reflect.Value, buf []byte, accum map[string]reflect.Value) error
	ChanDecodeAccum(link Link, accum map[string]reflect.Value) error
}

ChanDecoder is used to decode a marshaling reference into a channel.

ChanDecoder is useful to users implementing a new Marshaler.

type ChanEncoder

type ChanEncoder interface {
	ChanEncode(link Link, vchan reflect.Value, accum map[string]reflect.Value) ([]byte, error)
	ChanEncodeAccum(link Link, accum map[string]reflect.Value) error
}

ChanEncoder is used to encode a channel as a marshaling reference.

ChanEncoder is useful to users implementing a new Marshaler.

type Config

type Config struct {
	// MaxLinks contains the maximum number of links that may be opened
	// to a particular address/URI.
	MaxLinks int

	// RedialTimeout contains a timeout for "redial" attempts. If it is
	// non-zero a Transport will retry dialing if a dialing error occurs
	// for at least the duration specified in RedialTimeout. If this
	// field is zero no redial attempts will be made.
	RedialTimeout time.Duration

	// IdleTimeout will close connections that have been idle for the
	// specified duration. If this field is zero idle connections will
	// not be closed.
	IdleTimeout time.Duration
}

Config contains configuration parameters for a Transport.

func (*Config) Clone

func (self *Config) Clone() *Config

Clone makes a shallow clone of the receiver Config.

type Err

type Err interface {
	error

	// Nested returns the original error that is the cause of this error.
	// May be nil.
	Nested() error

	// Chan returns a channel that is associated with this error.
	// May be nil.
	Chan() interface{}
}

Err is implemented by errors reported by this package.

type ErrArgument

type ErrArgument struct {
	// contains filtered or unexported fields
}

ErrArgument encapsulates a function/method argument error. ErrArgument implements the Err interface.

func MakeErrArgument

func MakeErrArgument(args ...interface{}) *ErrArgument

MakeErrArgument makes a function/method argument error.

func (*ErrArgument) Chan

func (err *ErrArgument) Chan() interface{}

func (*ErrArgument) Error

func (err *ErrArgument) Error() string

func (*ErrArgument) Nested

func (err *ErrArgument) Nested() error

type ErrBinder

type ErrBinder struct {
	// contains filtered or unexported fields
}

ErrBinder encapsulates a binder error. ErrBinder implements the Err interface.

func MakeErrBinder

func MakeErrBinder(args ...interface{}) *ErrBinder

MakeErrBinder makes a binder error.

func (*ErrBinder) Chan

func (err *ErrBinder) Chan() interface{}

func (*ErrBinder) Error

func (err *ErrBinder) Error() string

func (*ErrBinder) Nested

func (err *ErrBinder) Nested() error

type ErrExposer

type ErrExposer struct {
	// contains filtered or unexported fields
}

ErrExposer encapsulates an exposer error. ErrExposer implements the Err interface.

func MakeErrExposer

func MakeErrExposer(args ...interface{}) *ErrExposer

MakeErrExposer makes an exposer error.

func (*ErrExposer) Chan

func (err *ErrExposer) Chan() interface{}

func (*ErrExposer) Error

func (err *ErrExposer) Error() string

func (*ErrExposer) Nested

func (err *ErrExposer) Nested() error

type ErrMarshaler

type ErrMarshaler struct {
	// contains filtered or unexported fields
}

ErrMarshaler encapsulates a message encoding/decoding error. ErrMarshaler implements the Err interface.

func MakeErrMarshaler

func MakeErrMarshaler(args ...interface{}) *ErrMarshaler

MakeErrMarshaler makes a message encoding/decoding error.

func (*ErrMarshaler) Chan

func (err *ErrMarshaler) Chan() interface{}

func (*ErrMarshaler) Error

func (err *ErrMarshaler) Error() string

func (*ErrMarshaler) Nested

func (err *ErrMarshaler) Nested() error

type ErrTransport

type ErrTransport struct {
	// contains filtered or unexported fields
}

ErrTransport encapsulates a network transport error. ErrTransport implements the Err interface.

func MakeErrTransport

func MakeErrTransport(args ...interface{}) *ErrTransport

MakeErrTransport makes a network transport error.

func (*ErrTransport) Chan

func (err *ErrTransport) Chan() interface{}

func (*ErrTransport) Error

func (err *ErrTransport) Error() string

func (*ErrTransport) Nested

func (err *ErrTransport) Nested() error

type Exposer

type Exposer interface {
	// Expose exposes a channel under an ID. Exposing a channel
	// associates it with the ID and makes it available to receive
	// messages.
	//
	// If multiple channels are exposed under the same ID which
	// channel(s) receive a message depends on the ID. ID's that start
	// with a '+' character are considered "broadcast" ID's and messages
	// sent to them are delivered to all channels exposed under that
	// ID. All other ID's are considered "unicast" and messages sent to
	// them are delivered to a single channel exposed under that ID
	// (determined using a pseudo-random algorithm).
	//
	// To receive exposer errors one can expose error channels (of type
	// chan error) under the special broadcast ID "+err/". All such error
	// channels will receive transport errors, etc. This special broadcast
	// ID is local to the running process and cannot be accessed remotely.
	//
	// It is also possible to receive "invalid" messages on channels (of
	// type chan Message) exposed under the special broadcast ID
	// "+inv/". Invalid messages are messages that cannot be delivered
	// for any of a number of reasons: because they contain the wrong
	// message ID, because their payload is the wrong type, because the
	// destination channels have been closed, etc. As with "+err/" this
	// special broadcast ID is local to the running process and cannot
	// be accessed remotely.
	Expose(id string, ichan interface{}) error

	// Unexpose unexposes a channel. It disassociates it from the ID
	// and makes it unavailable to receive messages under that ID.
	Unexpose(id string, ichan interface{})
}

Exposer is used to expose and unexpose channels.

var DefaultExposer Exposer = NewExposer(DefaultTransport)

DefaultExposer is the default exposer of the running process. Instead of DefaultExposer you can use the Expose and Unexpose functions.

func NewExposer

func NewExposer(transport Transport) Exposer

NewExposer creates a new Exposer that can be used to expose channels. It is usually sufficient to use the DefaultExposer instead.

type Link interface {
	Sigchan() chan struct{}
	Reference()
	Dereference()
	Activate()
	Recv() (id string, vmsg reflect.Value, err error)
	Send(id string, vmsg reflect.Value) (err error)
}

Link encapsulates a network transport link between two machines. It is used to send and receive messages between machines.

Link is useful to users implementing a new Transport.

type Marshaler

type Marshaler interface {
	RegisterType(val interface{})
	SetChanEncoder(chanEnc ChanEncoder)
	SetChanDecoder(chanDec ChanDecoder)
	Marshal(link Link, id string, vmsg reflect.Value, hdrlen int) (buf []byte, err error)
	Unmarshal(link Link, buf []byte, hdrlen int) (id string, vmsg reflect.Value, err error)
}

Marshaler is used to encode/decode messages for transporting over a network.

Marshaler is useful to users implementing a new marshaling layer.

var DefaultMarshaler Marshaler = NewGobMarshaler()

DefaultMarshaler is the default Marshaler of the running process.

func NewGobMarshaler

func NewGobMarshaler() Marshaler

NewGobMarshaler creates a new Marshaler that uses the netgob format for encoding/decoding.

func NewJsonMarshaler

func NewJsonMarshaler() Marshaler

NewJsonMarshaler creates a new Marshaler that uses the json format for encoding/decoding.

type Message

type Message struct {
	// Id contains the ID of the intended recipient.
	Id string

	// Value contains the message payload (as decoded by the marshaling
	// layer).
	Value reflect.Value
}

Message is a struct that contains a message ID and value.

type Recver

type Recver interface {
	Recver(link Link) error
}

Recver is used to receive messages over a network transport.

Recver is useful to users implementing a new Transport.

type Sender

type Sender interface {
	Sender(link Link) error
}

Sender is used to send messages over a network transport.

Sender is useful to users implementing a new Transport.

type Stats

type Stats interface {
	// StatNames returns a list of value names.
	StatNames() []string

	// Stat returns the value associated with a particular name.
	Stat(name string) float64
}

Stats is used to monitor the activity of a netchan component. Stats is a collection of values accessible by name; these values typically represent a count or ratio.

Exposers and binders implement Stats to provide insights into their internal workings.

type Transport

type Transport interface {
	SetRecver(recver Recver)
	SetSender(sender Sender)
	Listen() error
	Dial(uri *url.URL) (id string, link Link, err error)
	Close()
}

Transport is used to transport messages over a network.

Transport is useful to users implementing a new network transport.

var DefaultTransport Transport = newDefaultTransport()

DefaultTransport is the default Transport of the running process.

func NewHttpTransport

func NewHttpTransport(marshaler Marshaler, uri *url.URL, serveMux *http.ServeMux,
	cfg *Config) Transport

NewHttpTransport creates a new HTTP Transport. The URI to listen to should have the syntax http://[HOST]:PORT/PATH. If a ServeMux is provided, it will be used instead of creating a new HTTP server.

func NewHttpTransportTLS

func NewHttpTransportTLS(marshaler Marshaler, uri *url.URL, serveMux *http.ServeMux,
	cfg *Config, tlscfg *tls.Config) Transport

NewHttpTransportTLS creates a new HTTPS Transport. The URI to listen to should have the syntax https://[HOST]:PORT/PATH. If a ServeMux is provided, it will be used instead of creating a new HTTPS server.

func NewNetTransport

func NewNetTransport(marshaler Marshaler, uri *url.URL, cfg *Config) Transport

NewNetTransport creates a new TCP Transport. The URI to listen to should have the syntax tcp://[HOST]:PORT.

func NewNetTransportTLS

func NewNetTransportTLS(marshaler Marshaler, uri *url.URL, cfg *Config,
	tlscfg *tls.Config) Transport

NewNetTransportTLS creates a new TLS Transport. The URI to listen to should have the syntax tls://[HOST]:PORT.

func RegisterTransport

func RegisterTransport(scheme string, transport Transport) Transport

RegisterTransport associates a URI scheme with a network transport.

func UnregisterTransport

func UnregisterTransport(scheme string) Transport

UnregisterTransport disassociates a URI scheme from a network transport.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL