emux

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2023 License: MPL-2.0 Imports: 20 Imported by: 0

README

emux

emux (Encrypted Multiplexer) is a multiplexing library for Golang based on Yamux. It relies on an underlying connection to provide reliability and ordering, such as TCP or Unix domain sockets, and provides stream-oriented multiplexing. It is inspired by SPDY but is not interoperable with it.

emux features include:

  • Bi-directional streams
    • Streams can be opened by either client or server
    • Useful for NAT traversal
    • Server-side push support
  • Flow control
    • Avoid starvation
    • Back-pressure to prevent overwhelming a receiver
  • Keep Alives
    • Enables persistent connections over a load balancer
  • Efficient
    • Enables thousands of logical streams with low overhead

Documentation

For complete documentation, see Yamux doc: Godoc.

If you want use encryption, just set emux.Config.CipherType and emux.Config.Password

Available CipherType content:

  • Empty String (Default, No Encryption)
  • aes-128-gcm
  • aes-192-gcm
  • aes-256-gcm

emux has Pool object to provide connection pool feature.

emux.NewPool(remoteAddr string, config *emux.Config, size int) (*emux.Pool, error)

Create new connection pool.

Pool.Size() int

Get Pool size.

Pool.TakeSession() (*emux.Session, error)

Get a Session from Pool.

Pool.Open() (*emux.Stream, error)

Pool.OpenStream() (*emux.Stream, error)

Open a Stream from Pool.

Specification

The full specification for emux is provided in the spec.md file. It can be used as a guide to implementors of interoperable libraries.

Usage

Using emux is remarkably simple:


func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }

    // Setup client side of emux
    session, err := emux.Client(conn, nil)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := session.Open()
    if err != nil {
        panic(err)
    }

    // Stream implements net.Conn
    stream.Write([]byte("ping"))
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Setup server side of emux
    session, err := emux.Server(conn, nil)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.Accept()
    if err != nil {
        panic(err)
    }

    // Listen for a message
    buf := make([]byte, 4)
    stream.Read(buf)
}

For encryption

func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }
    // Setup client side of emux
    cfg := emux.DefaultConfig()
    cfg.CipherType = "aes-128-gcm"
    cfg.Password = "password"
    session, err := emux.Client(conn, cfg)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := session.Open()
    if err != nil {
        panic(err)
    }

    // Stream implements net.Conn
    stream.Write([]byte("ping"))
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Setup server side of emux
    cfg := emux.DefaultConfig()
    cfg.CipherType = "aes-128-gcm"
    cfg.Password = "password"
    session, err := emux.Server(conn, cfg)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.Accept()
    if err != nil {
        panic(err)
    }

    // Listen for a message
    buf := make([]byte, 4)
    stream.Read(buf)
}

Use Pool to create Stream

func client() {
    // Create Config of emux
    cfg := emux.DefaultConfig()
    cfg.CipherType = "aes-128-gcm"
    cfg.Password = "password"

    // Create Pool
    pool, err := emux.NewPool("127.0.0.1:80", cfg, 10)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := pool.Open()
    if err != nil {
        panic(err)
    }

    // Stream implements net.Conn
    stream.Write([]byte("ping"))
}

Documentation

Index

Constants

View Source
const SALT = "cecb7328135c3dede13d08b06aa344c5"

Variables

View Source
var (
	// ErrInvalidVersion means we received a frame with an
	// invalid version
	ErrInvalidVersion = fmt.Errorf("invalid protocol version")

	// ErrInvalidMsgType means we received a frame with an
	// invalid message type
	ErrInvalidMsgType = fmt.Errorf("invalid msg type")

	// ErrSessionShutdown is used if there is a shutdown during
	// an operation
	ErrSessionShutdown = fmt.Errorf("session shutdown")

	// ErrStreamsExhausted is returned if we have no more
	// stream ids to issue
	ErrStreamsExhausted = fmt.Errorf("streams exhausted")

	// ErrDuplicateStream is used if a duplicate stream is
	// opened inbound
	ErrDuplicateStream = fmt.Errorf("duplicate stream initiated")

	// ErrReceiveWindowExceeded indicates the window was exceeded
	ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded")

	// ErrTimeout is used when we reach an IO deadline
	ErrTimeout = fmt.Errorf("i/o deadline reached")

	// ErrStreamClosed is returned when using a closed stream
	ErrStreamClosed = fmt.Errorf("stream closed")

	// ErrUnexpectedFlag is set when we get an unexpected flag
	ErrUnexpectedFlag = fmt.Errorf("unexpected flag")

	// ErrRemoteGoAway is used when we get a go away from the other side
	ErrRemoteGoAway = fmt.Errorf("remote end is not accepting connections")

	// ErrConnectionReset is sent if a stream is reset. This can happen
	// if the backlog is exceeded, or if there was a remote GoAway.
	ErrConnectionReset = fmt.Errorf("connection reset")

	// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
	// timeout writing to the underlying stream connection.
	ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout")

	// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
	ErrKeepAliveTimeout = fmt.Errorf("keepalive timeout")
)
View Source
var CIPHER_TYPE_MAP = map[string]CipherConfig{
	"aes-128-gcm": {16},
	"aes-192-gcm": {24},
	"aes-256-gcm": {32},
}
View Source
var (
	UnkonwnCipherTypeErr = errors.New("Unknown cipher type")
)

Functions

func VerifyConfig

func VerifyConfig(config *Config) error

VerifyConfig is used to verify the sanity of configuration

Types

type Cipher

type Cipher struct {
	// contains filtered or unexported fields
}

func NewBlockCipher

func NewBlockCipher(cipherType string, pass []byte) (*Cipher, error)

func (*Cipher) Decrypt

func (c *Cipher) Decrypt(data []byte) ([]byte, error)

func (*Cipher) Encrypt

func (c *Cipher) Encrypt(data []byte) []byte

func (*Cipher) Overhead

func (c *Cipher) Overhead() int

type CipherConfig

type CipherConfig struct {
	KeySize int
}

type Config

type Config struct {
	// AcceptBacklog is used to limit how many streams may be
	// waiting an accept.
	AcceptBacklog int

	// EnableKeepalive is used to do a period keep alive
	// messages using a ping.
	EnableKeepAlive bool

	// KeepAliveInterval is how often to perform the keep alive
	KeepAliveInterval time.Duration

	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
	// we which will suspect a problem with the underlying connection and
	// close it. This is only applied to writes, where's there's generally
	// an expectation that things will move along quickly.
	ConnectionWriteTimeout time.Duration

	// MaxStreamWindowSize is used to control the maximum
	// window size that we allow for a stream.
	MaxStreamWindowSize uint32

	// LogOutput is used to control the log destination
	LogOutput io.Writer

	// Cipher type
	CipherType string
	Password   string
	// contains filtered or unexported fields
}

Config is used to tune the Yamux session

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(addr string, config *Config, size int) (*Pool, error)

Create new connection pool

func (*Pool) Close

func (p *Pool) Close()

func (*Pool) Open

func (p *Pool) Open() (*Stream, error)

Alias for OpenStream

func (*Pool) OpenStream

func (p *Pool) OpenStream() (*Stream, error)

Take a Session and then create a Stream from Session toked. It will create a new Session when stream id exhausted, and retry create Stream from new created Session.

func (*Pool) Size

func (p *Pool) Size() int

Return pool size

func (*Pool) TakeSession

func (p *Pool) TakeSession() (*Session, error)

Get a session from pool

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is used to wrap a reliable ordered connection and to multiplex it into multiple streams.

func Client

func Client(conn io.ReadWriteCloser, config *Config) (*Session, error)

Client is used to initialize a new client-side connection. There must be at most one client-side connection.

func Server

func Server(conn io.ReadWriteCloser, config *Config) (*Session, error)

Server is used to initialize a new server-side connection. There must be at most one server-side connection. If a nil config is provided, the DefaultConfiguration will be used.

func (*Session) Accept

func (s *Session) Accept() (net.Conn, error)

Accept is used to block until the next available stream is ready to be accepted.

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) Addr

func (s *Session) Addr() net.Addr

Addr is used to get the address of the listener.

func (*Session) Close

func (s *Session) Close() error

Close is used to close the session and all streams. Attempts to send a GoAway before closing the connection.

func (*Session) GoAway

func (s *Session) GoAway() error

GoAway can be used to prevent accepting further connections. It does not close the underlying conn.

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed does a safe check to see if we have shutdown

func (*Session) LocalAddr

func (s *Session) LocalAddr() net.Addr

LocalAddr is used to get the local address of the underlying connection.

func (*Session) NumStreams

func (s *Session) NumStreams() int

NumStreams returns the number of currently open streams

func (*Session) Open

func (s *Session) Open() (net.Conn, error)

Open is used to create a new stream as a net.Conn

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

OpenStream is used to create a new stream

func (*Session) Ping

func (s *Session) Ping() (time.Duration, error)

Ping is used to measure the RTT response time

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() net.Addr

RemoteAddr is used to get the address of remote end of the underlying connection

func (*Session) StreamIDExhausted

func (s *Session) StreamIDExhausted() bool

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to represent a logical stream within a session.

func (*Stream) Close

func (s *Stream) Close() error

Close is used to close the stream

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr returns the local address

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

Read is used to read from the stream

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

LocalAddr returns the remote address

func (*Stream) Session

func (s *Stream) Session() *Session

Session returns the associated stream session

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls

func (*Stream) Shrink

func (s *Stream) Shrink()

Shrink is used to compact the amount of buffers utilized This is useful when using Yamux in a connection pool to reduce the idle memory utilization.

func (*Stream) StreamID

func (s *Stream) StreamID() uint32

StreamID returns the ID of this stream

func (*Stream) Write

func (s *Stream) Write(b []byte) (n int, err error)

Write is used to write to the stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL