muxado

package module
v0.0.0-...-fc182d9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2016 License: Apache-2.0 Imports: 12 Imported by: 244

README

muxado - Stream multiplexing for Go godoc reference

muxado implements a general purpose stream-multiplexing protocol. muxado allows clients applications to multiplex any io.ReadWriteCloser (like a net.Conn) into multiple, independent full-duplex byte streams.

muxado is a useful protocol for any two communicating processes. It is an excellent base protocol for implementing lightweight RPC. It eliminates the need for custom async/pipeling code from your peers in order to support multiple simultaneous inflight requests between peers. For the same reason, it also eliminates the need to build connection pools for your clients. It enables servers to initiate streams to clients without building any NAT traversal. muxado can also yield performance improvements (especially latency) for protocols that require rapidly opening many concurrent connections.

muxado's API is designed to make it seamless to integrate into existing Go programs. muxado.Session implements the net.Listener interface and muxado.Stream implements net.Conn.

Example

Here's an example client which responds to simple JSON requests from a server.

    conn, _ := net.Dial("tcp", "example.net:1234")
    sess := muxado.Client(conn)
    for {
        stream, _ := sess.Accept()
        go func(str net.Conn) {
            defer str.Close()
            var req Request
            json.NewDecoder(str).Decode(&req)
            response := handleRequest(&req)
            json.NewEncoder(str).Encode(response)
        }(stream)
    }

Maybe the client wants to make a request to the server instead of just responding. This is easy as well:

    stream, _ := sess.Open()
    req := Request{
        Query: "What is the meaning of life, the universe and everything?",
    }
    json.NewEncoder(stream).Encode(&req)
    var resp Response
    json.dec.Decode(&resp)
    if resp.Answer != "42" {
        panic("wrong answer to the ultimate question!")
    }

Terminology

muxado defines the following terms for clarity of the documentation:

A "Transport" is an underlying stream (typically TCP) that is multiplexed by sending frames between muxado peers over this transport.

A "Stream" is any of the full-duplex byte-streams multiplexed over the transport

A "Session" is two peers running the muxado protocol over a single transport

Implementation Design

muxado's design is influenced heavily by the framing layer of HTTP2 and SPDY. However, instead of being specialized for a higher-level protocol, muxado is designed in a protocol agnostic way with simplicity and speed in mind. More advanced features are left to higher-level libraries and protocols.

Extended functionality

muxado ships with two wrappers that add commonly used functionality. The first is a TypedStreamSession which allows a client application to open streams with a type identifier so that the remote peer can identify the protocol that will be communicated on that stream.

The second wrapper is a simple Heartbeat which issues a callback to the application informing it of round-trip latency and heartbeat failure.

Performance

XXX: add perf numbers and comparisons

Any stream-multiplexing library over TCP will suffer from head-of-line blocking if the next packet to service gets dropped. muxado is also a poor choice when sending many large payloads concurrently. It shines best when the application workload needs to quickly open a large number of small-payload streams.

Status

Most of muxado's features are implemented (and tested!), but there are many that are still rough or could be improved. See the TODO file for suggestions on what needs to improve.

License

Apache

Documentation

Overview

muxado implements a general purpose stream-multiplexing protocol. muxado allows clients applications to multiplex any io.ReadWriteCloser (like a net.Conn) into multiple, independent full-duplex streams.

muxado is a useful protocol for any two communicating processes. It is an excellent base protocol for implementing lightweight RPC. It eliminates the need for custom async/pipeling code from your peers in order to support multiple simultaneous inflight requests between peers. For the same reason, it also eliminates the need to build connection pools for your clients. It enables servers to initiate streams to clients without building any NAT traversal. muxado can also yield performance improvements (especially latency) for protocols that require rapidly opening many concurrent connections.

Here's an example client which responds to simple JSON requests from a server.

conn, _ := net.Dial("tcp", "example.net:1234")
sess := muxado.Client(conn)
for {
    stream, _ := sess.Accept()
    go func(str net.Conn) {
        defer str.Close()
        var req Request
        json.NewDecoder(str).Decode(&req)
        response := handleRequest(&req)
        json.NewEncoder(str).Encode(response)
    }(stream)
}

Maybe the client wants to make a request to the server instead of just responding. This is easy as well:

stream, _ := sess.Open()
req := Request{
    Query: "What is the meaning of life, the universe and everything?",
}
json.NewEncoder(stream).Encode(&req)
var resp Response
json.dec.Decode(&resp)
if resp.Answer != "42" {
    panic("wrong answer to the ultimate question!")
}

muxado defines the following terms for clarity of the documentation:

A "Transport" is an underlying stream (typically TCP) that is multiplexed by sending frames between muxado peers over this transport.

A "Stream" is any of the full-duplex byte-streams multiplexed over the transport

A "Session" is two peers running the muxado protocol over a single transport

muxado's design is influenced heavily by the framing layer of HTTP2 and SPDY. However, instead of being specialized for a higher-level protocol, muxado is designed in a protocol agnostic way with simplicity and speed in mind. More advanced features are left to higher-level libraries and protocols.

muxado's API is designed to make it seamless to integrate into existing Go programs. muxado.Session implements the net.Listener interface and muxado.Stream implements net.Conn.

muxado ships with two wrappers that add commonly used functionality. The first is a TypedStreamSession which allows a client application to open streams with a type identifier so that the remote peer can identify the protocol that will be communicated on that stream.

The second wrapper is a simple Heartbeat which issues a callback to the application informing it of round-trip latency and heartbeat failure.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Maximum size of unread data to receive and buffer (per-stream). Default 256KB.
	MaxWindowSize uint32
	// Maximum number of inbound streams to queue for Accept(). Default 128.
	AcceptBacklog uint32
	// Function creating the Session's framer. Deafult frame.NewFramer()
	NewFramer func(io.Reader, io.Writer) frame.Framer
	// contains filtered or unexported fields
}

type ErrorCode

type ErrorCode uint32

ErrorCode is a 32-bit integer indicating the type of an error condition

const (
	NoError ErrorCode = iota
	ProtocolError
	InternalError
	FlowControlError
	StreamClosed
	StreamRefused
	StreamCancelled
	StreamReset
	FrameSizeError
	AcceptQueueFull
	EnhanceYourCalm
	RemoteGoneAway
	StreamsExhausted
	WriteTimeout
	SessionClosed
	PeerEOF

	ErrorUnknown ErrorCode = 0xFF
)

func GetError

func GetError(err error) (ErrorCode, error)

type Heartbeat

type Heartbeat struct {
	TypedStreamSession
	// contains filtered or unexported fields
}

func NewHeartbeat

func NewHeartbeat(sess TypedStreamSession, cb func(time.Duration), config *HeartbeatConfig) *Heartbeat

func (*Heartbeat) Accept

func (h *Heartbeat) Accept() (net.Conn, error)

func (*Heartbeat) AcceptStream

func (h *Heartbeat) AcceptStream() (Stream, error)

func (*Heartbeat) AcceptTypedStream

func (h *Heartbeat) AcceptTypedStream() (TypedStream, error)

func (*Heartbeat) Close

func (h *Heartbeat) Close() error

func (*Heartbeat) Start

func (h *Heartbeat) Start()

type HeartbeatConfig

type HeartbeatConfig struct {
	Interval  time.Duration
	Tolerance time.Duration
	Type      StreamType
}

func NewHeartbeatConfig

func NewHeartbeatConfig() *HeartbeatConfig

type Session

type Session interface {

	// Open initiates a new stream on the session. It is equivalent to
	// OpenStream(0, false)
	Open() (net.Conn, error)

	// OpenStream initiates a new stream on the session. A caller can specify an
	// opaque stream type.  Setting fin to true will cause the stream to be
	// half-closed from the local side immediately upon creation.
	OpenStream() (Stream, error)

	// Accept returns the next stream initiated by the remote side
	Accept() (net.Conn, error)

	// Accept returns the next stream initiated by the remote side
	AcceptStream() (Stream, error)

	// Attempts to close the Session cleanly. Closes the underlying stream transport.
	Close() error

	// LocalAddr returns the local address of the transport stream over which the session is running.
	LocalAddr() net.Addr

	// RemoteAddr returns the address of the remote side of the transport stream over which the session is running.
	RemoteAddr() net.Addr

	// Addr returns the session transport's local address
	Addr() net.Addr

	// Wait blocks until the session has shutdown and returns an error
	// explaining the session termination.
	Wait() (error, error, []byte)
}

Session multiplexes many Streams over a single underlying stream transport. Both sides of a muxado session can open new Streams. Sessions can also accept new streams from the remote side.

A muxado Session implements the net.Listener interface, returning new Streams from the remote side.

func Client

func Client(trans io.ReadWriteCloser, config *Config) Session

Client returns a new muxado client-side connection using trans as the transport.

func Server

func Server(trans io.ReadWriteCloser, config *Config) Session

Server returns a muxado server session using trans as the transport.

type Stream

type Stream interface {
	// Write writes the bytes in the given buffer to the stream
	Write([]byte) (int, error)

	// Read reads the next bytes on the stream into the given buffer
	Read([]byte) (int, error)

	// Closes the stream.
	Close() error

	// Half-closes the stream. Calls to Write will fail after this is invoked.
	CloseWrite() error

	// SetDeadline sets a time after which future Read and Write operations will
	// fail.
	//
	// Some implementation may not support this.
	SetDeadline(time.Time) error

	// SetReadDeadline sets a time after which future Read operations will fail.
	//
	// Some implementation may not support this.
	SetReadDeadline(time.Time) error

	// SetWriteDeadline sets a time after which future Write operations will
	// fail.
	//
	// Some implementation may not support this.
	SetWriteDeadline(time.Time) error

	// Id returns the stream's unique identifier.
	Id() uint32

	// Session returns the session object this stream is running on.
	Session() Session

	// RemoteAddr returns the session transport's remote address.
	RemoteAddr() net.Addr

	// LocalAddr returns the session transport's local address.
	LocalAddr() net.Addr
}

Stream is a full duplex stream-oriented connection that is multiplexed over a Session. Stream implements the net.Conn inteface.

type StreamType

type StreamType uint32

type TypedStream

type TypedStream interface {
	Stream
	StreamType() StreamType
}

type TypedStreamSession

type TypedStreamSession interface {
	Session
	OpenTypedStream(stype StreamType) (Stream, error)
	AcceptTypedStream() (TypedStream, error)
}

func NewTypedStreamSession

func NewTypedStreamSession(s Session) TypedStreamSession

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL