muxado: github.com/inconshreveable/muxado

package muxado

import "github.com/inconshreveable/muxado"

muxado implements a general-purpose stream-multiplexing protocol. muxado allows client applications to multiplex any io.ReadWriteCloser (like a net.Conn) into multiple, independent full-duplex streams.

muxado is a useful protocol for any two communicating processes. It is an excellent base protocol for implementing lightweight RPC. It eliminates the need for custom async/pipelining code in your peers to support multiple simultaneous in-flight requests between them. For the same reason, it also eliminates the need to build connection pools for your clients. It enables servers to initiate streams to clients without building any NAT traversal. muxado can also yield performance improvements (especially latency) for protocols that require rapidly opening many concurrent connections.

Here's an example client which responds to simple JSON requests from a server.

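conn, err := net.Dial("tcp", "example.net:1234")
if err != nil {
    log.Fatal(err)
}
// Client takes a *Config; nil is assumed here to select the defaults.
sess := muxado.Client(conn, nil)
for {
    // Accept blocks until the server opens a new stream.
    stream, err := sess.Accept()
    if err != nil {
        log.Fatal(err)
    }
    go func(str net.Conn) {
        defer str.Close()
        var req Request
        if err := json.NewDecoder(str).Decode(&req); err != nil {
            return
        }
        response := handleRequest(&req)
        json.NewEncoder(str).Encode(response)
    }(stream)
}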

Maybe the client wants to make a request to the server instead of just responding. This is easy as well:

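stream, err := sess.Open()
if err != nil {
    log.Fatal(err)
}
req := Request{
    Query: "What is the meaning of life, the universe and everything?",
}
// Send the request, then decode the reply from the same stream.
if err := json.NewEncoder(stream).Encode(&req); err != nil {
    log.Fatal(err)
}
var resp Response
if err := json.NewDecoder(stream).Decode(&resp); err != nil {
    log.Fatal(err)
}
if resp.Answer != "42" {
    panic("wrong answer to the ultimate question!")
}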

muxado defines the following terms for clarity of the documentation:

A "Transport" is an underlying stream (typically TCP) that is multiplexed by sending frames between muxado peers over this transport.

A "Stream" is any of the full-duplex byte-streams multiplexed over the transport.

A "Session" is two peers running the muxado protocol over a single transport.

muxado's design is influenced heavily by the framing layer of HTTP/2 and SPDY. However, instead of being specialized for a higher-level protocol, muxado is designed in a protocol-agnostic way with simplicity and speed in mind. More advanced features are left to higher-level libraries and protocols.

muxado's API is designed to make it seamless to integrate into existing Go programs. muxado.Session implements the net.Listener interface and muxado.Stream implements net.Conn.
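Because Session satisfies net.Listener, code written against that interface should not need to know it is running over muxado. The sketch below illustrates the idea with a hypothetical serveEcho helper that depends only on net.Listener. To stay runnable without muxado, it is driven by a loopback TCP listener as a stand-in; echoOnce is likewise an illustrative helper, not part of the package.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// serveEcho accepts connections from l until Accept fails and echoes each
// connection's bytes back to the sender. It depends only on net.Listener, so
// a muxado Session could be passed in place of a TCP listener.
func serveEcho(l net.Listener) error {
	for {
		conn, err := l.Accept()
		if err != nil {
			return err
		}
		go func(c net.Conn) {
			defer c.Close()
			io.Copy(c, c) // echo until the peer closes its side
		}(conn)
	}
}

// echoOnce starts serveEcho on a loopback TCP listener (the stand-in for a
// Session), sends msg over one connection, and returns the echoed bytes.
func echoOnce(msg string) (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer l.Close()
	go serveEcho(l)

	conn, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := io.WriteString(conn, msg); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(conn, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	got, err := echoOnce("ping")
	fmt.Println(got, err)
}
```

The same pattern applies to any library function that takes a net.Listener or net.Conn; only the value passed in changes.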

muxado ships with two wrappers that add commonly used functionality. The first is a TypedStreamSession which allows a client application to open streams with a type identifier so that the remote peer can identify the protocol that will be communicated on that stream.

The second wrapper is a simple Heartbeat which issues a callback to the application informing it of round-trip latency and heartbeat failure.

Index

Package Files

buffer.go config.go doc.go errors.go heartbeat.go interface.go session.go stream.go stream_map.go typed_stream.go window_manager.go

type Config

type Config struct {
    // Maximum size of unread data to receive and buffer (per-stream). Default 256KB.
    MaxWindowSize uint32
    // Maximum number of inbound streams to queue for Accept(). Default 128.
    AcceptBacklog uint32
    // Function creating the Session's framer. Default frame.NewFramer()
    NewFramer func(io.Reader, io.Writer) frame.Framer
    // contains filtered or unexported fields
}

type ErrorCode

type ErrorCode uint32

ErrorCode is a 32-bit integer indicating the type of an error condition.

const (
    NoError ErrorCode = iota
    ProtocolError
    InternalError
    FlowControlError
    StreamClosed
    StreamRefused
    StreamCancelled
    StreamReset
    FrameSizeError
    AcceptQueueFull
    EnhanceYourCalm
    RemoteGoneAway
    StreamsExhausted
    WriteTimeout
    SessionClosed
    PeerEOF

    ErrorUnknown ErrorCode = 0xFF
)
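The constants are declared with iota, so they take consecutive values starting at 0 in declaration order, while ErrorUnknown is pinned to 0xFF. The sketch below copies the declaration locally so that it runs without importing muxado; the names table and the name function are hypothetical helpers for illustration and are not part of the package.

```go
package main

import "fmt"

// ErrorCode and its constants are copied from the declaration above so the
// sketch compiles on its own; real code would use the muxado identifiers.
type ErrorCode uint32

const (
	NoError ErrorCode = iota
	ProtocolError
	InternalError
	FlowControlError
	StreamClosed
	StreamRefused
	StreamCancelled
	StreamReset
	FrameSizeError
	AcceptQueueFull
	EnhanceYourCalm
	RemoteGoneAway
	StreamsExhausted
	WriteTimeout
	SessionClosed
	PeerEOF

	ErrorUnknown ErrorCode = 0xFF
)

// names is a hypothetical lookup table in the same order as the constants.
var names = [...]string{
	"NoError", "ProtocolError", "InternalError", "FlowControlError",
	"StreamClosed", "StreamRefused", "StreamCancelled", "StreamReset",
	"FrameSizeError", "AcceptQueueFull", "EnhanceYourCalm", "RemoteGoneAway",
	"StreamsExhausted", "WriteTimeout", "SessionClosed", "PeerEOF",
}

// name returns the identifier for c, or "ErrorUnknown" for any value outside
// the iota range.
func name(c ErrorCode) string {
	if int(c) < len(names) {
		return names[c]
	}
	return "ErrorUnknown"
}

func main() {
	fmt.Println(uint32(NoError), uint32(PeerEOF), uint32(ErrorUnknown)) // 0 15 255
	fmt.Println(name(StreamReset), name(ErrorCode(200)))                // StreamReset ErrorUnknown
}
```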

func GetError

func GetError(err error) (ErrorCode, error)

type Heartbeat

type Heartbeat struct {
    TypedStreamSession
    // contains filtered or unexported fields
}

func NewHeartbeat

func NewHeartbeat(sess TypedStreamSession, cb func(time.Duration), config *HeartbeatConfig) *Heartbeat

func (*Heartbeat) Accept

func (h *Heartbeat) Accept() (net.Conn, error)

func (*Heartbeat) AcceptStream

func (h *Heartbeat) AcceptStream() (Stream, error)

func (*Heartbeat) AcceptTypedStream

func (h *Heartbeat) AcceptTypedStream() (TypedStream, error)

func (*Heartbeat) Close

func (h *Heartbeat) Close() error

func (*Heartbeat) Start

func (h *Heartbeat) Start()

type HeartbeatConfig

type HeartbeatConfig struct {
    Interval  time.Duration
    Tolerance time.Duration
    Type      StreamType
}

func NewHeartbeatConfig

func NewHeartbeatConfig() *HeartbeatConfig

type Session

type Session interface {

    // Open initiates a new stream on the session and returns it as a
    // net.Conn.
    Open() (net.Conn, error)

    // OpenStream initiates a new stream on the session and returns it as a
    // Stream.
    OpenStream() (Stream, error)

    // Accept returns the next stream initiated by the remote side
    Accept() (net.Conn, error)

    // AcceptStream returns the next stream initiated by the remote side
    AcceptStream() (Stream, error)

    // Attempts to close the Session cleanly. Closes the underlying stream transport.
    Close() error

    // LocalAddr returns the local address of the transport stream over which the session is running.
    LocalAddr() net.Addr

    // RemoteAddr returns the address of the remote side of the transport stream over which the session is running.
    RemoteAddr() net.Addr

    // Addr returns the session transport's local address
    Addr() net.Addr

    // Wait blocks until the session has shutdown and returns an error
    // explaining the session termination.
    Wait() (error, error, []byte)
}

Session multiplexes many Streams over a single underlying stream transport. Both sides of a muxado session can open new Streams. Sessions can also accept new streams from the remote side.

A muxado Session implements the net.Listener interface, returning new Streams from the remote side.

func Client

func Client(trans io.ReadWriteCloser, config *Config) Session

Client returns a new muxado client-side connection using trans as the transport.

func Server

func Server(trans io.ReadWriteCloser, config *Config) Session

Server returns a muxado server session using trans as the transport.

type Stream

type Stream interface {
    // Write writes the bytes in the given buffer to the stream
    Write([]byte) (int, error)

    // Read reads the next bytes on the stream into the given buffer
    Read([]byte) (int, error)

    // Closes the stream.
    Close() error

    // Half-closes the stream. Calls to Write will fail after this is invoked.
    CloseWrite() error

    // SetDeadline sets a time after which future Read and Write operations will
    // fail.
    //
    // Some implementations may not support this.
    SetDeadline(time.Time) error

    // SetReadDeadline sets a time after which future Read operations will fail.
    //
    // Some implementations may not support this.
    SetReadDeadline(time.Time) error

    // SetWriteDeadline sets a time after which future Write operations will
    // fail.
    //
    // Some implementations may not support this.
    SetWriteDeadline(time.Time) error

    // Id returns the stream's unique identifier.
    Id() uint32

    // Session returns the session object this stream is running on.
    Session() Session

    // RemoteAddr returns the session transport's remote address.
    RemoteAddr() net.Addr

    // LocalAddr returns the session transport's local address.
    LocalAddr() net.Addr
}

Stream is a full-duplex stream-oriented connection that is multiplexed over a Session. Stream implements the net.Conn interface.

type StreamType

type StreamType uint32

type TypedStream

type TypedStream interface {
    Stream
    StreamType() StreamType
}

type TypedStreamSession

type TypedStreamSession interface {
    Session
    OpenTypedStream(stype StreamType) (Stream, error)
    AcceptTypedStream() (TypedStream, error)
}

func NewTypedStreamSession

func NewTypedStreamSession(s Session) TypedStreamSession
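One way to use typed streams is an accept loop that routes each incoming stream to a handler chosen by its StreamType. The following is a minimal sketch of that pattern, not muxado's own code. To stay runnable without the package, it declares cut-down local mirrors of StreamType and TypedStream; the typedAcceptor interface, the dispatch function, the echoType and shoutType identifiers, and the fake session and stream are all assumptions made for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// Local mirrors of the muxado declarations, reduced to what the sketch needs.
type StreamType uint32

type TypedStream interface {
	io.ReadWriteCloser
	StreamType() StreamType
}

// typedAcceptor is the single TypedStreamSession method used by dispatch.
type typedAcceptor interface {
	AcceptTypedStream() (TypedStream, error)
}

// Hypothetical application-defined type identifiers.
const (
	echoType  StreamType = 1
	shoutType StreamType = 2
)

// dispatch accepts streams until AcceptTypedStream fails. Each stream is
// handed to the handler registered for its type in a new goroutine; streams
// with no registered handler are closed. It returns the accept error after
// all running handlers have finished.
func dispatch(sess typedAcceptor, handlers map[StreamType]func(TypedStream)) error {
	var wg sync.WaitGroup
	defer wg.Wait()
	for {
		str, err := sess.AcceptTypedStream()
		if err != nil {
			return err
		}
		h, ok := handlers[str.StreamType()]
		if !ok {
			str.Close()
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer str.Close()
			h(str)
		}()
	}
}

// fakeStream is an in-memory stand-in: reads consume in, writes fill out.
type fakeStream struct {
	in     *strings.Reader
	out    bytes.Buffer
	typ    StreamType
	closed bool
}

func (f *fakeStream) Read(p []byte) (int, error)  { return f.in.Read(p) }
func (f *fakeStream) Write(p []byte) (int, error) { return f.out.Write(p) }
func (f *fakeStream) Close() error                { f.closed = true; return nil }
func (f *fakeStream) StreamType() StreamType      { return f.typ }

// fakeSession yields its queued streams, then reports io.EOF.
type fakeSession struct{ queue []*fakeStream }

func (s *fakeSession) AcceptTypedStream() (TypedStream, error) {
	if len(s.queue) == 0 {
		return nil, io.EOF
	}
	str := s.queue[0]
	s.queue = s.queue[1:]
	return str, nil
}

// demo dispatches three fake streams and reports what the handlers wrote and
// whether the stream of unregistered type was closed.
func demo() (echo, shout string, unknownClosed bool) {
	a := &fakeStream{in: strings.NewReader("hi"), typ: echoType}
	b := &fakeStream{in: strings.NewReader("hi"), typ: shoutType}
	c := &fakeStream{in: strings.NewReader("hi"), typ: 99}
	handlers := map[StreamType]func(TypedStream){
		echoType: func(s TypedStream) { io.Copy(s, s) },
		shoutType: func(s TypedStream) {
			data, _ := io.ReadAll(s)
			s.Write(bytes.ToUpper(data))
		},
	}
	dispatch(&fakeSession{queue: []*fakeStream{a, b, c}}, handlers)
	return a.out.String(), b.out.String(), c.closed
}

func main() {
	fmt.Println(demo()) // prints: hi HI true
}
```

With the real package, the value returned by NewTypedStreamSession would take the place of fakeSession, and the handler map would be keyed by muxado's StreamType rather than the local mirror.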

Directories

Path     Synopsis
frame

Package muxado imports 12 packages and is imported by 161 packages. Updated 2017-11-16.