mpx

package module
v0.0.0-...-eca4e8b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2023 License: GPL-3.0 Imports: 17 Imported by: 0

README

mpx

Build Status go report go.dev Licenses

通用连接多路复用协议

用途

  • 复用已有连接,省去握手时间。可用于解决websocket+CDN代理场景下建立连接握手时间太长
  • 聚合多个IP/服务器,同时聚合带宽,多个低带宽服务器聚合成高带宽
  • 某些情况下聚合多条TCP连接可以提高峰值带宽

使用 mpx-tunnel

mpx-tunnel使用多个TCP连接承载

使用docker部署聚合2台服务区带宽

假设我们有两台服务器server-1, server-2

公网IP 内网IP
server-1 public-ip-1 private-ip-1
server-2 public-ip-2 private-ip-2
server-1 部署
docker run -d --restart=always --name=ssgo -p 2551:2551 -t niiv0832/go-shadowsocks2:latest -u -s 'ss://AEAD_CHACHA20_POLY1305:password@:2551'
docker run -d --restart=always --name mpx -p 5512:5512 -e LISTEN_ADDR="0.0.0.0:5512" -e TARGET_ADDR="private-ip-1:2551" fregie/mpx:latest
server-2 部署
echo "net.ipv4.ip_forward = 1" >> /etc/sysctl.conf
sysctl -p
iptables -t nat -I POSTROUTING  -j MASQUERADE
iptables -t nat -I PREROUTING -p tcp --dport 6666 -j DNAT --to-destination private-ip-1:5512
iptables -t nat -I POSTROUTING -d private-ip-1 -p tcp --dport 5512 -j SNAT --to-source private-ip-2
本地部署
docker run -d --restart=always --name mpx -p 5513:5513 -e LISTEN_ADDR="0.0.0.0:5513" -e SERVER_ADDR="public-ip-1:5512|1,public-ip-2:6666|1" fregie/mpx:latest

使用以下配置启动任意shadowsocks客户端

server: 127.0.0.1
port: 5513
method: AEAD_CHACHA20_POLY1305
password: password
源码编译安装部署
安装
go install github.com/fregie/mpx/mpx-tunnel@latest
服务端

以go-shadowsocks2举例,首先在服务器启动ss服务端:

go-shadowsocks2 -s 'ss://AEAD_CHACHA20_POLY1305:your-password@:8488' -verbose

在服务器启动mpx server,转发连接到ss服务端端口

# 指定target则为服务端
mpx-tunnel -listen 0.0.0.0:5512 -target 127.0.0.1:8848

或使用docker部署

docker run -d --restart=always --name mpx -p 5512:5512 -e LISTEN_ADDR="0.0.0.0:5512" -e TARGET_ADDR="127.0.0.1:8848" fregie/mpx:latest
客户端

启动mpx client

# 指定server则为客户端 
mpx-tunnel -listen 0.0.0.0:5513 --server server-ip:5512 -p 4

或使用docker部署

docker run -d --restart=always --name mpx -p 5512:5512 -e LISTEN_ADDR="0.0.0.0:5512" -e SERVER_ADDR="server-ip:5512" fregie/mpx:latest

-p:配置保持的长连接数量
启动ss客户端

go-shadowsocks2 -c 'ss://AEAD_CHACHA20_POLY1305:your-password@127.0.0.1:5513' -socks :1080 -verbose
聚合不同IP的连接

假如你有两台XX云30mbps轻量级服务器server-1,server-2,在同一个内网中,公网ip分别为public-ip-1,public-ip-2,内网ip分别为private-ip-1,private-ip-2

  1. server-1按以上方法部署mpx-ser以及ss server
  2. server-2配置通过iptables端口转发,转发6666端口到server-15512端口
echo "net.ipv4.ip_forward = 1" >> /etc/sysctl.conf
sysctl -p
iptables -t nat -I POSTROUTING  -j MASQUERADE
iptables -t nat -I PREROUTING -p tcp --dport 6666 -j DNAT --to-destination private-ip-1:5512
iptables -t nat -I POSTROUTING -d private-ip-1 -p tcp --dport 5512 -j SNAT --to-source private-ip-2
  1. 在客户端使用以下命令启动mpx-cli:
mpx-tunnel -listen 0.0.0.0:5513 -server "public-ip-1:5512|2,public-ip-2:6666|2" -p 4
# 使用docker部署
docker run -d --restart=always --name mpx -p 5512:5512 -e LISTEN_ADDR="0.0.0.0:5512" -e SERVER_ADDR="public-ip-1:5512|2,public-ip-2:6666|2" fregie/mpx:latest

-s: 使用,分割不同服务端,|前为服务端地址ip:port,|之后为权重,会根据其权重来分配对应地址承载的带宽。例如你有一台30m和一台60m的服务器,那你应该将其权重配为1:2

  1. 按上面的方法启动ss client

原理

mpx

mpx是一个基于标准库 net 中interface同时也实现了 net 库中部分interface的连接多路复用库。

输入

mpx接受任何实现了 net.Conn 接口的连接作为输入。
可以直接调用 AddConn 方法将Conn输入,也可以调用 ServeWithListener 输入一个 net.Listener ,调用 StartWithDialer 输入一个 dailer (mpx库中的一个interface)来使用mpx

输出

mpx提供给调用者一个名为 ConnPool 的struct。
该struct实现了 net.Listener 供服务端调用,同时提供一个 dial 方法供客户端建立连接(返回一个 net.Conn )。

TODO

  • 支持丢包重传
  • 输入连接可用性检测
  • 支持配置输入连接权重

集成

安装
go get github.com/fregie/mpx
服务端(TCP)
import (
  "github.com/fregie/mpx"
  "net"
)

func main(){
  lis, _ := net.Listen("tcp", "0.0.0.0:5512")
  // Skip exception handling here
  cp := mpx.NewConnPool()
  go cp.ServeWithListener(lis)
  for {
    conn, _ := cp.Accept()
    // Skip exception handling here
    go func(){
      defer conn.Close()
      // Do something with conn(net.Conn)
    }
  }
}
客户端(TCP)
import (
  "github.com/fregie/mpx"
  "net"
)

type TCPDialer struct {
  ServerAddr string
}
func (t *TCPDialer) Dial() (net.Conn, error) {
	return net.Dial("tcp", t.ServerAddr)
}

func main(){
  cp := mpx.NewConnPool()
  cp.StartWithDialer(&TCPDialer{ServerAddr: "ip:port"}, 5)
  conn, _ := cp.Dial(nil)
  // Skip exception handling here
  defer conn.Close()
  // Do something with conn(net.Conn)
  conn.Write([]byte("something"))
}

Documentation

Index

Constants

View Source
const (
	Connect packetType = iota
	Disconnect
	Data
	RST
	Heartbeat
	SetWeight
	ACK
)
View Source
const (
	Connected state = iota
	Closed
)

Variables

View Source
var (
	ErrClosed = errors.New("closed")
)
View Source
var (
	MaxCachedNum = 65535
)

Functions

func NewAckPacket

func NewAckPacket(tunnelID, seq, length uint32) *mpxPacket

func NewHeartbeatPacket

func NewHeartbeatPacket() *mpxPacket

func NewRSTPacket

func NewRSTPacket(tunnID uint32, data []byte) *mpxPacket

func NewSetWeightPacket

func NewSetWeightPacket(weight uint32) *mpxPacket

func PacketFromReader

func PacketFromReader(r io.Reader) (*mpxPacket, error)

func ParseType

func ParseType(t uint8) packetType

func Verbose

func Verbose(enable bool)

Types

type Conn

type Conn struct {
	net.Conn
	ID int
}

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

ConnPool 是使用mpx主要用到的结构体 接受任何实现了 net.Conn 接口的连接作为输入 可以直接调用 AddConn 方法将Conn输入,也可以调用 ServeWithListener 输入一个 net.Listener ,调用 StartWithDialer 输入一个 dailer (mpx库中的一个interface)来使用mpx

func NewConnPool

func NewConnPool() *ConnPool

NewConnPool 创建一个新的 *ConnPool

func (*ConnPool) Accept

func (p *ConnPool) Accept() (*Tunnel, error)

Accept 阻塞直到有新的连接可以返回 返回的 *Tunnel 实现了 net.Conn

func (*ConnPool) AddConn

func (p *ConnPool) AddConn(conn net.Conn) error

AddConn 向ConnPool中添加一个新的net.Conn

func (*ConnPool) AddConnWithWeight

func (p *ConnPool) AddConnWithWeight(conn net.Conn, weight uint32) error

func (*ConnPool) Addr

func (p *ConnPool) Addr() net.Addr

Addr returns the listener's network address.

func (*ConnPool) Close

func (p *ConnPool) Close() error

Close closes the listener. Any blocked Accept operations will be unblocked and return errors.

func (*ConnPool) ConnCount

func (p *ConnPool) ConnCount() int

func (*ConnPool) Connect

func (p *ConnPool) Connect(data []byte) (*Tunnel, error)

Connect 同Dial

func (*ConnPool) Dial

func (p *ConnPool) Dial(data []byte) (*Tunnel, error)

Dial 建立并返回一个新的mpx连接(net.Conn) 参数中data为在建立连接的时候携带要传输的数据,可以为nil

func (*ConnPool) Serve

func (p *ConnPool) Serve() error

Serve 启用服务,适用于使用 AddConn 方法输入连接的情况下启用服务 如果已经调用 ServeWithListener 或 StartWithDialer,请勿调用该方法

func (*ConnPool) ServeWithListener

func (p *ConnPool) ServeWithListener(lis net.Listener) error

ServeWithListener 启用服务,通过 net.Listener 输入连接 请勿和 Serve 同时调用

func (*ConnPool) StartWithDialer

func (p *ConnPool) StartWithDialer(dialer Dialer, connNum int) (err error)

StartWithDialer 启用服务,通过 Dailer 输入连接 注意:请勿和 Serve 同时调用 注意:err != nil 代表建立第一个连接失败,但是仍然会启动服务,若想停止服务请调用Close()

type Dialer

type Dialer interface {
	Dial() (net.Conn, uint32, error)
}

Dialer 用于建立net.Conn连接

type Tunnel

type Tunnel struct {
	ID uint32
	// contains filtered or unexported fields
}

func (*Tunnel) Close

func (t *Tunnel) Close() error

Close closes the connection. Any blocked Read or Write operations will be unblocked and return errors.

func (*Tunnel) LastSeen

func (t *Tunnel) LastSeen() time.Time

func (*Tunnel) LocalAddr

func (t *Tunnel) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Tunnel) Read

func (t *Tunnel) Read(buf []byte) (int, error)

func (*Tunnel) RemoteAddr

func (t *Tunnel) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*Tunnel) RemoteClose

func (t *Tunnel) RemoteClose()

func (*Tunnel) SetDeadline

func (t *Tunnel) SetDeadline(ti time.Time) error

SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline.

A deadline is an absolute time after which I/O operations fail with a timeout (see type Error) instead of blocking. The deadline applies to all future and pending I/O, not just the immediately following call to Read or Write. After a deadline has been exceeded, the connection can be refreshed by setting a deadline in the future.

An idle timeout can be implemented by repeatedly extending the deadline after successful Read or Write calls.

A zero value for t means I/O operations will not time out.

Note that if a TCP connection has keep-alive turned on, which is the default unless overridden by Dialer.KeepAlive or ListenConfig.KeepAlive, then a keep-alive failure may also return a timeout error. On Unix systems a keep-alive failure on I/O can be detected using errors.Is(err, syscall.ETIMEDOUT).

func (*Tunnel) SetReadDeadline

func (t *Tunnel) SetReadDeadline(ti time.Time) error

SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.

func (*Tunnel) SetWriteDeadline

func (t *Tunnel) SetWriteDeadline(ti time.Time) error

SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.

func (*Tunnel) Write

func (t *Tunnel) Write(b []byte) (n int, err error)

Write writes data to the connection. Write can be made to time out and return an Error with Timeout() == true after a fixed time limit; see SetDeadline and SetWriteDeadline.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL