pool

package module
v2.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2022 License: MIT Imports: 11 Imported by: 0

README

这是什么?(WTF it is?)

这是一个 为 客户端 设计的 TCP连接池。(没有服务端,是因为我觉得Golang服务端不需要连接池,每一个连接独立一个goroutine即可。

为了解决多Goroutine下对TCP读写的问题。

例如

Relay等工具。

他们往往一下子涌入一大堆连接。此时,解决办法有Ring Buffer,或者新建连接。

但是随意滥开连接会导致TCP连接太多触发系统 HARD/SOFT LIMITS。

那么为什么不用ring buffer?我认为,goroutine就是一个设计很好的队列,其次,数据就不应该被放入内存中等待太久,就应该立刻发送,以便降低时延。

所以这个连接池就诞生了。目的就是为了解决在高并发环境下TCP写问题。

我是如何解决写问题的?

通过互斥锁,从池子里拿出连接,就上锁,放入就解锁。但是互斥锁会自旋,会休眠,如果连接很多,这些在抢锁的过程中也会产生较大时延。

但是得益于Go 1.18新加入的TryLock函数,我们可以不进行抢锁的过程,如果这个连接繁忙,跳过,进行下一个。

如果全部都很繁忙(也就是互斥锁全都锁上了),那么就会尝试新建连接。

默认会创建16个连接,在我本人测试下(8核心机器),开500个goroutine,16个连接完全绰绰有余,最多只会用到前3个。

那么读呢?

V1版本的连接池并没有考虑到这个问题。

而那么多个连接,不可能一个个开一个goroutine去等待。

因此,我引入了系统的epoll通知钩子,通过epoll来获取可以读的连接,并且将它们放入一个Readable Queue(可读队列,默认长度1024)中,来供不同的goroutine读。

Documentation

Index

Constants

View Source
const (
	UP int32 = iota
	DOWN
)
View Source
const (
	MIN_SIZE                = 8
	EPOLL_MAX_SIZE          = 1024
	MIN_READABLE_QUEUE_SIZE = 1024
	ATTEMPT_RECONNECT       = 100
	RECONNECT_TIMEOUT       = 300
	// it seems that the value of syscall.EPOLLET is wrong
	EPOLLET = 0x80000000
)

Variables

View Source
var (
	NO_AVAILABLE_CONN = errors.New("no available connections!")
	POOL_CLOSED       = errors.New("pool has been closed")
)

Functions

func CreatePool

func CreatePool(remote string) (net.Conn, error)

for those lazy people like me

func GetSysMax

func GetSysMax() int32

func NetConn

func NetConn(p *Pool) net.Conn

func Wrapper

func Wrapper(p *Pool) io.ReadWriteCloser

Types

type ConnNode

type ConnNode struct {
	Conn net.Conn
	Lock sync.Mutex
	// contains filtered or unexported fields
}

func (*ConnNode) After

func (cn *ConnNode) After(n *ConnNode)

move the node after n

func (*ConnNode) Before

func (cn *ConnNode) Before(n *ConnNode)

move the node ahead n

func (*ConnNode) Down added in v2.1.0

func (cn *ConnNode) Down() bool

set the connection down

func (*ConnNode) IsAvailable added in v2.0.3

func (cn *ConnNode) IsAvailable() bool

connection is available or not

func (*ConnNode) IsBusy added in v2.1.4

func (cn *ConnNode) IsBusy() bool

the connection is busy or not

func (*ConnNode) IsClosed added in v2.1.0

func (cn *ConnNode) IsClosed() bool

the connection is closed or not

func (*ConnNode) IsDown added in v2.1.0

func (cn *ConnNode) IsDown() bool

the connection is down or not

func (*ConnNode) MoveTo

func (cn *ConnNode) MoveTo(next *ConnNode, prev *ConnNode)

Move the node between next and prev

func (*ConnNode) Up added in v2.1.0

func (cn *ConnNode) Up() bool

set the connection up

func (*ConnNode) Wait added in v2.1.4

func (cn *ConnNode) Wait()

wait until the connection is available

type Opts

type Opts []interface{}

func DefaultOpts

func DefaultOpts() Opts

func (Opts) Parse

func (o Opts) Parse() (*net.Dialer, int32, context.Context)

func (Opts) WithContext

func (o Opts) WithContext(c context.Context)

func (Opts) WithDialer

func (o Opts) WithDialer(d *net.Dialer)

func (Opts) WithMinSize

func (o Opts) WithMinSize(m int32)

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func New

func New(remote string, opts Opts) (*Pool, error)

func (*Pool) AddConn added in v2.0.2

func (p *Pool) AddConn(cn *ConnNode)

this is NOT A THREAD-SAFE method please check the Reconnect/Push function for the correct usage.

func (*Pool) Close

func (p *Pool) Close()

close all connection.

func (*Pool) EpollClose

func (p *Pool) EpollClose()

func (*Pool) Get

func (p *Pool) Get() (*ConnNode, error)

get a writable connection.

func (*Pool) GetReadableConn

func (p *Pool) GetReadableConn() (net.Conn, error)

Deprecated

func (*Pool) MoveToHead

func (p *Pool) MoveToHead(cp *ConnNode)

move the node to the head

func (*Pool) MoveToTail

func (p *Pool) MoveToTail(cp *ConnNode)

move the node to the tail

func (*Pool) Push

func (p *Pool) Push(c net.Conn) *ConnNode

this is for creating a new connection. push the new connection into the pool

func (*Pool) Put

func (p *Pool) Put(c *ConnNode)

put the writable connection into the pool.

func (*Pool) Read added in v2.1.2

func (p *Pool) Read(b []byte) (n int, err error)

func (*Pool) Reconnect

func (p *Pool) Reconnect(cn *ConnNode)

func (*Pool) Remote

func (p *Pool) Remote() string

func (*Pool) Remove added in v2.0.1

func (p *Pool) Remove(n *ConnNode)

func (*Pool) RemoveConn added in v2.0.2

func (p *Pool) RemoveConn(cn *ConnNode)

this is NOT A THREAD-SAFE method please check the Reconnect/Push function for the correct usage. before calling this, you MUST lock the mutex lock of ConnNode. otherwise, it will cause the data race problem. if this connection is in used, removing this will cause nil pointer panic.

func (*Pool) ReplaceRemote added in v2.0.2

func (p *Pool) ReplaceRemote(remote string) error

func (*Pool) SetDialContext added in v2.0.2

func (p *Pool) SetDialContext(ctx context.Context)

set new dial context

func (*Pool) SetDialer added in v2.0.2

func (p *Pool) SetDialer(d *net.Dialer)

set new dialer

func (*Pool) SetMaxConn added in v2.0.2

func (p *Pool) SetMaxConn(max int32)

set new max conn

func (*Pool) SetMinConn added in v2.0.2

func (p *Pool) SetMinConn(min int32) error

set new min conn

func (*Pool) SetRemote added in v2.0.2

func (p *Pool) SetRemote(remote string)

set dial new remote

type PoolNetconn

type PoolNetconn struct {
	// contains filtered or unexported fields
}

THIS IS NOT THREAD-SAFE How to use? this is for one goroutine to wrapper the pool but which only requires one connection.

func (*PoolNetconn) Close

func (p *PoolNetconn) Close() error

func (*PoolNetconn) LocalAddr

func (p *PoolNetconn) LocalAddr() net.Addr

func (*PoolNetconn) Read

func (p *PoolNetconn) Read(b []byte) (n int, err error)

func (*PoolNetconn) ReadFrom added in v2.0.5

func (p *PoolNetconn) ReadFrom(r io.Reader) (n int64, err error)

func (*PoolNetconn) RemoteAddr

func (p *PoolNetconn) RemoteAddr() net.Addr

func (*PoolNetconn) SetDeadline

func (p *PoolNetconn) SetDeadline(t time.Time) error

func (*PoolNetconn) SetReadDeadline

func (p *PoolNetconn) SetReadDeadline(t time.Time) error

func (*PoolNetconn) SetWriteDeadline

func (p *PoolNetconn) SetWriteDeadline(t time.Time) error

func (*PoolNetconn) Write

func (p *PoolNetconn) Write(b []byte) (n int, err error)

this function will keep the same one connection unless the writing function returns an error. In the most case, it only requires one connection.

func (*PoolNetconn) WriteTo added in v2.0.9

func (p *PoolNetconn) WriteTo(w io.Writer) (n int64, err error)

type PoolWrapper

type PoolWrapper struct {
	// contains filtered or unexported fields
}

func (*PoolWrapper) Close

func (p *PoolWrapper) Close() error

func (*PoolWrapper) Read

func (p *PoolWrapper) Read(b []byte) (n int, err error)

func (*PoolWrapper) ReadFrom added in v2.0.5

func (p *PoolWrapper) ReadFrom(r io.Reader) (n int64, err error)

func (*PoolWrapper) Write

func (p *PoolWrapper) Write(b []byte) (n int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL