smux

package module
v0.0.0-...-37c006e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2022 License: MIT Imports: 11 Imported by: 0

README

smux

<<<<<<< HEAD fork@xtaci/smux and @cs8425/smux smux

GoDoc MIT licensed Build Status Go Report Card Coverage Statusd Sourcegraph

smux

Introduction

Smux ( Simple MUltipleXing) is a multiplexing library for Golang. It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP, and provides stream-oriented multiplexing. The original intention of this library is to power the connection management for kcp-go.

Features

  1. Token bucket controlled receiving, which provides smoother bandwidth graph(see picture below).
  2. Session-wide receive buffer, shared among streams, fully controlled overall memory usage.
  3. Minimized header(8Bytes), maximized payload.
  4. Well-tested on millions of devices in kcptun.
  5. Builtin fair queue traffic shaping.
  6. Per-stream sliding window to control congestion.(protocol version 2+).

smooth bandwidth curve

Documentation

For complete documentation, see the associated Godoc.

Benchmark

$ go test -v -run=^$ -bench .
goos: darwin
goarch: amd64
pkg: github.com/xtaci/smux
BenchmarkMSB-4           	30000000	        51.8 ns/op
BenchmarkAcceptClose-4   	   50000	     36783 ns/op
BenchmarkConnSmux-4      	   30000	     58335 ns/op	2246.88 MB/s	    1208 B/op	      19 allocs/op
BenchmarkConnTCP-4       	   50000	     25579 ns/op	5124.04 MB/s	       0 B/op	       0 allocs/op
PASS
ok  	github.com/xtaci/smux	7.811s

Specification

VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)  

VALUES FOR LATEST VERSION:
VERSION:
    1/2
    
CMD:
    cmdSYN(0)
    cmdFIN(1)
    cmdPSH(2)
    cmdNOP(3)
    cmdUPD(4)	// only supported on version 2
    cmdACK(5)	// only supported on version 2
    CmdMax(6)	// only supported on version 2, for custom ID start
    
STREAMID:
    client use odd numbers starts from 1
    server use even numbers starts from 0
    
cmdUPD:
    | CONSUMED(4B) | WINDOW(4B) |

Usage

=======

socket multiplexing, 基于可靠连接tcp的多路复用。

基于 xtaci/smux 修改、新增部分逻辑。相较于原项目,传输效率有所下降。

下降原因,项目增加了发送字节确认机制。

1 修改及新增

  1. 某一个 stream 不读,但对端一直写导致 tcp 缓冲区被写满,进而堵塞其他的 stream

  2. 对于tcp的超时写,可能已经写了部分数据才超时。 返回到stream应该表现出来。由问题1引出,如果设置了写超时, 本次的写入数据只写入了一部分,返回应用层写入0(但实际tcp会将本次数据写完),如果应用层选择重发,导致数据重复或者混乱。

  3. streamID 可能溢出,虽然uint32的容量挺大的,但还是有可能。同时降低包头 streamID占用的长度,提高传输效率。

  4. 新增检测对端是否使用多路复用

  5. 新增 aio

2 项目设计

基于可靠有序的连接

2.1 协议包头

cmd (byte) + id (uint16) + length (uint32)

cmd 命令类型:cmdSYN 打开一个streamcmdFIN 关闭一个 streamcmdPSH 数据推送;cmdCFM 数据接受确认;

id streamID。

length 视情况而定。 cmdPSH 代表携带的数据长度。cmdCFM 代表确认的数据长度。其他情况为0,无意义。

2.2 StreamID

根据协议包头可知,最大能支持的ID值为 65535

streamID 采用回收复用的方式,避免累加后超过上限值。用位图来存储使用、空闲的id,减少内存占用。

也意味着同时能开启的stream数量为65535个,超过这个数字就没有可使用的id了。经测试这个值较为合适, 连接数达到一定阀值,即tcp的传输效率已经到达最大,更多的stream数反而会带来性能瓶颈。这个时候建议多开tcp连接。

2.3 打开连接

idBitmap 中申请一个id,发往对端 cmdSYN命令。

正常情况下对端新建 stream 将其放入 accept channel中,对端通过 Accept 获取新stream。 存在两端同时 Open 且 分配到相同的ID。两端都会收到 cmdSYN,直接忽略命令(相当于直接绑定到现有stream上)。

2.4 数据传输

给每个stream 设置读buffer,通过窗口调节发送频率。

由于通信链路只有一条,而有多条虚拟链路。需保证通信链路畅通,不被任意 stream 堵塞。 通信链路的 buffer 大小固定为 64kstream 的读写缓冲区大小固定为 512k

A端发送数据累计到 waitConfirm 中,往对端发送的窗口大小为 streamWindowSize - waitConfirm, 结果为0 时,B端读缓冲区已满。本端不再发送数据,等待B端读数据。

B端应用层调用 Read 后,推送已读数据长度 bufferRead 。A端 waitConfirm -= bufferBuffer, A端的发送窗口不为0,继续发送数据。

2.5 写超时

Stream端可能会多次调用 tcp.Write(发送数据大于frameSize将会分包)。 已经调用了tcp.Write 的数据默认成功发送(除非tcp连接报错,否则一定能到达对端,即使会堵塞一段时间)。

如果 stream 的写超时到达,累计已经调用 tcp.Write 数据的长度返回,还未调用的不再调用。由应用层选择是否继续发送超时数据。

2.6 关闭连接

主动关闭连接方发送 cmdFIN到对端,并等待 cmdFIN回来。 整个流程全部完成,释放资源并回收streamID

需要确认关闭的原因: 存在 A 调用Close,已经释放资源并回收streamIDcmdFIN正在发送B端。 这时,A端 Open 一个相同ID 的 Stream,B端还没有收到 cmdFIN,仍在向A端发送数据。 导致数据混乱。

同时关闭的情况,对于任意一端来说,都相当于发出并接收到cmdFIN,完成了整个关闭流程。

2.7 判断对端是否使用多路复用

cmdVRM 。id 和 length 为随机数,对端经过算法验证返回两个随机数的确定值。

IsSmux(conn net.Conn) bool

/*
	v1:uint16, v2:uint32。
	v1 分4个4位(b1,b2,b3,b4): (b1,b2,b3,b4) -> (b3,b4,b2,b1)
	v2 分4个8位(b1,b2,b3,b4): (b1,b2,b3,b4) -> (b1,b4,b2,b3)
*/
func verifyCode(v1 uint16, v2 uint32) (uint16, uint32) {
	v11 := ((v1 & 0xFF) << 8) | ((v1 & 0xF000) >> 16) | ((v1 & 0xF00) >> 4)
	v22 := (v2 & 0xFF000000) | ((v2 & 0xFF0000) >> 8) | ((v2 & 0xFF00) >> 8) | ((v2 & 0xFF) << 16)
	return v11, v22
}

3 Usage

77536ae649e2d3704f212458bca23979f9ab7990


func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }

<<<<<<< HEAD
    // Setup client side of smux
    session, err := smux.Client(conn, nil)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := session.OpenStream()
=======
    // Session of smux
    session := smux.SmuxSession(conn)

    // Open a new stream
    stream, err := session.Open()
>>>>>>> 77536ae649e2d3704f212458bca23979f9ab7990
    if err != nil {
        panic(err)
    }

    // Stream implements io.ReadWriteCloser
    stream.Write([]byte("ping"))
    stream.Close()
    session.Close()
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

<<<<<<< HEAD
    // Setup server side of smux
    session, err := smux.Server(conn, nil)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.AcceptStream()
=======
   // Session of smux
   session := smux.SmuxSession(conn)

    // Accept a stream
    stream, err := session.Accept()
>>>>>>> 77536ae649e2d3704f212458bca23979f9ab7990
    if err != nil {
        panic(err)
    }

    // Listen for a message
    buf := make([]byte, 4)
    stream.Read(buf)
    stream.Close()
    session.Close()
}
<<<<<<< HEAD

Status

Stable


## example

网关服务 https://github.com/yddeng/gate
>>>>>>> 77536ae649e2d3704f212458bca23979f9ab7990

Documentation

Overview

Package smux is a multiplexing library for Golang.

It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP, and provides stream-oriented multiplexing over a single channel.

Index

Constants

View Source
const (
	CmdMax byte // for custom ID start
)

Variables

View Source
var (
	ErrInvalidProtocol = errors.New("invalid protocol")
	ErrConsumed        = errors.New("peer consumed more than sent")
	ErrGoAway          = errors.New("stream id overflows, should start a new connection")
	ErrTimeout         = errors.New("timeout")
	ErrWouldBlock      = errors.New("operation would block on IO")
)
View Source
var (
	ErrTimeout    = errors.New("timeout. ")
	ErrClosedPipe = errors.New("the stream has closed. ")
	ErrBrokenPipe = errors.New("write on closed stream. ")
)
View Source
var ErrNonblock = errors.New("nonblock none. ")

只有在非堵塞模式下才有的返回错误 读情况下,没有数据可读;写情况下,不能写且已发送为0

View Source
var (
	ErrServiceClosed = errors.New("aioService is closed. ")
)

Functions

func IsSmux

func IsSmux(conn net.Conn, timeout time.Duration) bool

判断对端是否是多路复用

func Listen

func Listen(address string, callback func(session *MuxSession)) error

func NewIDBitmap

func NewIDBitmap() *idBitmap

func VerifyConfig

func VerifyConfig(config *Config) error

VerifyConfig is used to verify the sanity of configuration

Types

type AIOService

type AIOService struct {
	// contains filtered or unexported fields
}

func OpenAIOService

func OpenAIOService(mux *MuxSession, worker int) *AIOService

func (*AIOService) Close

func (this *AIOService) Close()

func (*AIOService) Unwatch

func (this *AIOService) Unwatch(fd uint16)

func (*AIOService) Watch

func (this *AIOService) Watch(fd uint16, callback func(event Event)) error

type Allocator

type Allocator struct {
	// contains filtered or unexported fields
}

Allocator for incoming frames, optimized to prevent overwriting after zeroing

func NewAllocator

func NewAllocator() *Allocator

NewAllocator initiates a []byte allocator for frames less than 65536 bytes, the waste(memory fragmentation) of space allocation is guaranteed to be no more than 50%.

func (*Allocator) Get

func (alloc *Allocator) Get(size int) []byte

Get a []byte from pool with most appropriate cap

func (*Allocator) Put

func (alloc *Allocator) Put(buf []byte) error

Put returns a []byte to pool for future use, which the cap must be exactly 2^n

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(size int) *Buffer

func (*Buffer) Cap

func (b *Buffer) Cap() int

func (*Buffer) Clear

func (b *Buffer) Clear()

func (*Buffer) Empty

func (b *Buffer) Empty() bool

func (*Buffer) Full

func (b *Buffer) Full() bool

func (*Buffer) Grow

func (b *Buffer) Grow(n int)

func (*Buffer) Len

func (b *Buffer) Len() int

func (*Buffer) Read

func (b *Buffer) Read(buf []byte) (n int, err error)

func (*Buffer) Write

func (b *Buffer) Write(buf []byte) (n int, err error)

type Config

type Config struct {
	// SMUX Protocol version, support 1,2
	Version int

	// Disabled keepalive
	KeepAliveDisabled bool

	// Disabled keepalive
	KeepAliveCloseDisabled bool

	// KeepAliveInterval is how often to send a NOP command to the remote
	KeepAliveInterval time.Duration

	// KeepAliveTimeout is how long the session
	// will be closed if no data has arrived
	// v2 can smaller than keep-alive interval
	// and will closed if RTT test > KeepAliveTimeout
	KeepAliveTimeout time.Duration

	// MaxFrameSize is used to control the maximum
	// frame size to sent to the remote
	MaxFrameSize int

	// MaxReceiveBuffer is used to control the maximum
	// number of data in the buffer pool
	MaxReceiveBuffer int

	// MaxStreamBuffer is used to control the maximum
	// number of data per stream
	MaxStreamBuffer int
}

Config is used to tune the Smux session

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration

func DefaultConfigV1

func DefaultConfigV1() *Config

type Event

type Event byte
const (
	EV_READ  Event = 1 << 1
	EV_WRITE Event = 1 << 2
	EV_ERROR Event = 1 << 3
)

Event poller 返回事件值

func (Event) Readable

func (e Event) Readable() bool

func (Event) Writable

func (e Event) Writable() bool

type Frame

type Frame struct {
	// contains filtered or unexported fields
}

Frame defines a packet from or to be multiplexed into a single connection

func (Frame) Cmd

func (f Frame) Cmd() byte

func (Frame) Data

func (f Frame) Data() []byte

func (Frame) Sid

func (f Frame) Sid() uint32

type MuxConn

type MuxConn struct {
	// contains filtered or unexported fields
}

func (*MuxConn) Close

func (this *MuxConn) Close() error

func (*MuxConn) ID

func (this *MuxConn) ID() uint16

func (*MuxConn) LocalAddr

func (this *MuxConn) LocalAddr() net.Addr

LocalAddr satisfies net.Conn interface

func (*MuxConn) Read

func (this *MuxConn) Read(b []byte) (n int, err error)

func (*MuxConn) Readable

func (this *MuxConn) Readable() (int, bool)

Readable returns length of read buffer

func (*MuxConn) RemoteAddr

func (this *MuxConn) RemoteAddr() net.Addr

RemoteAddr satisfies net.Conn interface

func (*MuxConn) SetDeadline

func (this *MuxConn) SetDeadline(t time.Time) error

SetDeadline sets both read and write deadlines as defined by net.Conn.SetDeadline. A zero time value disables the deadlines.

func (*MuxConn) SetNonblock

func (this *MuxConn) SetNonblock(nonblocking bool)

func (*MuxConn) SetReadDeadline

func (this *MuxConn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline as defined by net.Conn.SetReadDeadline. A zero time value disables the deadline.

func (*MuxConn) SetWriteDeadline

func (this *MuxConn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline as defined by net.Conn.SetWriteDeadline. A zero time value disables the deadline.

func (*MuxConn) Writable

func (this *MuxConn) Writable() (int, bool)

Writable returns write windows

func (*MuxConn) Write

func (this *MuxConn) Write(b []byte) (n int, err error)

type MuxSession

type MuxSession struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(address string, timeout time.Duration) (*MuxSession, error)

func NewMuxSession

func NewMuxSession(conn net.Conn) *MuxSession

func (*MuxSession) Accept

func (this *MuxSession) Accept() (*MuxConn, error)

func (*MuxSession) Addr

func (this *MuxSession) Addr() net.Addr

func (*MuxSession) Close

func (this *MuxSession) Close()

func (*MuxSession) GetMuxConn

func (this *MuxSession) GetMuxConn(connID uint16) *MuxConn

func (*MuxSession) IsClosed

func (this *MuxSession) IsClosed() bool

IsClosed does a safe check to see if we have shutdown

func (*MuxSession) NumMuxConn

func (this *MuxSession) NumMuxConn() int

func (*MuxSession) Open

func (this *MuxSession) Open() (*MuxConn, error)

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session defines a multiplexed connection for streams

func Client

func Client(conn io.ReadWriteCloser, config *Config) (*Session, error)

Client is used to initialize a new client-side connection.

func Server

func Server(conn io.ReadWriteCloser, config *Config) (*Session, error)

Server is used to initialize a new server-side connection.

func (*Session) Accept

func (s *Session) Accept() (io.ReadWriteCloser, error)

Accept Returns a generic ReadWriteCloser instead of smux.Stream

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) CheckRTT

func (s *Session) CheckRTT(sendTimeout time.Duration) uint32

func (*Session) Close

func (s *Session) Close() error

Close is used to close the session and all streams.

func (*Session) GetCustomFrame

func (s *Session) GetCustomFrame() (*Frame, error)

GetCustomFrame is used to block until the next available Custom Frame exist TODO: put buffer back

func (*Session) GetRTT

func (s *Session) GetRTT() time.Duration

func (*Session) GetRTTCh

func (s *Session) GetRTTCh() <-chan uint32

GetRTTCh returns a readonly chan which can be readable when got ack

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed does a safe check to see if we have shutdown

func (*Session) LocalAddr

func (s *Session) LocalAddr() net.Addr

LocalAddr satisfies net.Conn interface

func (*Session) NumStreams

func (s *Session) NumStreams() int

NumStreams returns the number of currently open streams

func (*Session) Open

func (s *Session) Open() (io.ReadWriteCloser, error)

Open returns a generic ReadWriteCloser

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

OpenStream is used to create a new stream

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() net.Addr

RemoteAddr satisfies net.Conn interface

func (*Session) SetDeadline

func (s *Session) SetDeadline(t time.Time) error

SetDeadline sets a deadline used by Accept* calls. A zero time value disables the deadline.

func (*Session) WriteCustomFrame

func (s *Session) WriteCustomFrame(cmd byte, sid uint32, data []byte) (n int, err error)

func (*Session) WriteCustomFrameTimeout

func (s *Session) WriteCustomFrameTimeout(cmd byte, sid uint32, data []byte, deadline <-chan time.Time) (n int, err error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream implements net.Conn

func (*Stream) Close

func (s *Stream) Close() error

Close implements net.Conn

func (*Stream) GetDieCh

func (s *Stream) GetDieCh() <-chan struct{}

GetDieCh returns a readonly chan which can be readable when the stream is to be closed.

func (*Stream) ID

func (s *Stream) ID() uint32

ID returns the unique stream ID.

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr satisfies net.Conn interface

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

Read implements net.Conn

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr satisfies net.Conn interface

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets both read and write deadlines as defined by net.Conn.SetDeadline. A zero time value disables the deadlines.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline as defined by net.Conn.SetReadDeadline. A zero time value disables the deadline.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline as defined by net.Conn.SetWriteDeadline. A zero time value disables the deadline.

func (*Stream) Write

func (s *Stream) Write(b []byte) (n int, err error)

Write implements net.Conn

Note that the behavior when multiple goroutines write concurrently is not deterministic, frames may interleave in random way.

func (*Stream) WriteTo

func (s *Stream) WriteTo(w io.Writer) (n int64, err error)

WriteTo implements io.WriteTo

Notes

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL