nbio

package module
v1.5.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: MIT Imports: 18 Imported by: 39

README

NBIO - NON-BLOCKING IO

Mentioned in Awesome Go MIT licensed Go Version Build Status Go Report Card

Contents

Features

Cross Platform
  • Linux: Epoll with LT/ET/ET+ONESHOT supported, LT as default
  • BSD(MacOS): Kqueue
  • Windows: Based on std net, for debugging only
Protocols Supported
  • TCP/UDP/Unix Socket supported
  • TLS supported
  • HTTP/HTTPS 1.x supported
  • Websocket supported, Passes the Autobahn Test Suite, OnOpen/OnMessage/OnClose order guaranteed
Interfaces
  • Implements a non-blocking net.Conn(except windows)
  • SetDeadline/SetReadDeadline/SetWriteDeadline supported
  • Concurrent Write/Close supported(both nbio.Conn and nbio/nbhttp/websocket.Conn)

Quick Start

package main

import (
	"log"

	"github.com/lesismal/nbio"
)

func main() {
	engine := nbio.NewEngine(nbio.Config{
		Network:            "tcp",//"udp", "unix"
		Addrs:              []string{":8888"},
		MaxWriteBufferSize: 6 * 1024 * 1024,
	})

	// hanlde new connection
	engine.OnOpen(func(c *nbio.Conn) {
		log.Println("OnOpen:", c.RemoteAddr().String())
	})
	// hanlde connection closed
	engine.OnClose(func(c *nbio.Conn, err error) {
		log.Println("OnClose:", c.RemoteAddr().String(), err)
	})
	// handle data
	engine.OnData(func(c *nbio.Conn, data []byte) {
		c.Write(append([]byte{}, data...))
	})

	err := engine.Start()
	if err != nil {
		log.Fatalf("nbio.Start failed: %v\n", err)
		return
	}
	defer engine.Stop()

	<-make(chan int)
}

Examples

TCP Echo Examples
UDP Echo Examples
TLS Examples
HTTP Examples
HTTPS Examples
Websocket Examples
Websocket TLS Examples
Use With Other STD Based Frameworkds
More Examples

1M Websocket Connections Benchmark

For more details: go-websocket-benchmark

# lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.6 LTS
Release:        20.04
Codename:       focal

# free
              total        used        free      shared  buff/cache   available
Mem:       24969564    15656352     3422212        1880     5891000     8899604
Swap:             0           0           0

# cat /proc/cpuinfo | grep processor
processor       : 0
processor       : 1
processor       : 2
processor       : 3
processor       : 4
processor       : 5
processor       : 6
processor       : 7
processor       : 8
processor       : 9
processor       : 10
processor       : 11
processor       : 12
processor       : 13
processor       : 14
processor       : 15


# taskset
run nbio_nonblocking server on cpu 0-7

--------------------------------------------------------------
BenchType  : BenchEcho
Framework  : nbio_nonblocking
TPS        : 104713
EER        : 280.33
Min        : 56.90us
Avg        : 95.36ms
Max        : 2.29s
TP50       : 62.82ms
TP75       : 65.38ms
TP90       : 89.38ms
TP95       : 409.55ms
TP99       : 637.95ms
Used       : 47.75s
Total      : 5000000
Success    : 5000000
Failed     : 0
Conns      : 1000000
Concurrency: 10000
Payload    : 1024
CPU Min    : 0.00%
CPU Avg    : 373.53%
CPU Max    : 602.33%
MEM Min    : 978.70M
MEM Avg    : 979.88M
MEM Max    : 981.14M
--------------------------------------------------------------

Magics For HTTP and Websocket

Different IOMod
IOMod Remarks
IOModNonBlocking There's no difference between this IOMod and the old version with no IOMod. All the connections will be handled by poller.
IOModBlocking All the connections will be handled by at least one goroutine, for websocket, we can set Upgrader.BlockingModAsyncWrite=true to handle writing with a separated goroutine and then avoid Head-of-line blocking on broadcasting scenarios.
IOModMixed We set the Engine.MaxBlockingOnline, if the online num is smaller than it, the new connection will be handled by single goroutine as IOModBlocking, else the new connection will be handled by poller.

The IOModBlocking aims to improve the performance for low online service, it runs faster than std. The IOModMixed aims to keep a balance between performance and cpu/mem cost in different scenarios: when there are not too many online connections, it performs better than std, or else it can serve lots of online connections and keep healthy.

Using Websocket With Std Server
package main

import (
	"fmt"
	"net/http"

	"github.com/lesismal/nbio/nbhttp/websocket"
)

var (
	upgrader = newUpgrader()
)

func newUpgrader() *websocket.Upgrader {
	u := websocket.NewUpgrader()
	u.OnOpen(func(c *websocket.Conn) {
		// echo
		fmt.Println("OnOpen:", c.RemoteAddr().String())
	})
	u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		// echo
		fmt.Println("OnMessage:", messageType, string(data))
		c.WriteMessage(messageType, data)
	})
	u.OnClose(func(c *websocket.Conn, err error) {
		fmt.Println("OnClose:", c.RemoteAddr().String(), err)
	})
	return u
}

func onWebsocket(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("Upgraded:", conn.RemoteAddr().String())
}

func main() {
	mux := &http.ServeMux{}
	mux.HandleFunc("/ws", onWebsocket)
	server := http.Server{
		Addr:    "localhost:8080",
		Handler: mux,
	}
	fmt.Println("server exit:", server.ListenAndServe())
}

Credits

Contributors

Thanks Everyone:

Star History

Star History Chart

Documentation

Index

Constants

View Source
const (
	// DefaultReadBufferSize .
	DefaultReadBufferSize = 1024 * 64

	// DefaultMaxWriteBufferSize .
	DefaultMaxWriteBufferSize = 1024 * 1024

	// DefaultMaxConnReadTimesPerEventLoop .
	DefaultMaxConnReadTimesPerEventLoop = 3

	// DefaultUDPReadTimeout .
	DefaultUDPReadTimeout = 120 * time.Second
)
View Source
const (
	// EPOLLLT .
	EPOLLLT = 0

	// EPOLLET .
	EPOLLET = 0x80000000

	// EPOLLONESHOT .
	EPOLLONESHOT = syscall.EPOLLONESHOT
)
View Source
const (
	IPPROTO_TCP   = syscall.IPPROTO_TCP
	TCP_KEEPINTVL = syscall.TCP_KEEPINTVL
	TCP_KEEPIDLE  = syscall.TCP_KEEPIDLE
)

Variables

View Source
var (
	ErrReadTimeout = errors.New("read timeout")

	ErrWriteTimeout = errors.New("write timeout")

	ErrOverflow = errors.New("write overflow")

	ErrUnsupported = errors.New("unsupported operation")
)
View Source
var (
	// MaxOpenFiles .
	MaxOpenFiles = 1024 * 1024 * 2
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// Name describes your gopher name for logging, it's set to "NB" by default.
	Name string

	// Network is the listening protocol, used with Addrs together.
	// tcp* supported only by now, there's no plan for other protocol such as udp,
	// because it's too easy to write udp server/client.
	Network string

	// Addrs is the listening addr list for a nbio server.
	// if it is empty, no listener created, then the Engine is used for client by default.
	Addrs []string

	// NPoller represents poller goroutine num, it's set to runtime.NumCPU() by default.
	NPoller int

	// ReadBufferSize represents buffer size for reading, it's set to 64k by default.
	ReadBufferSize int

	// MaxWriteBufferSize represents max write buffer size for Conn, it's set to 1m by default.
	// if the connection's Send-Q is full and the data cached by nbio is
	// more than MaxWriteBufferSize, the connection would be closed by nbio.
	MaxWriteBufferSize int

	// MaxConnReadTimesPerEventLoop represents max read times in one poller loop for one fd
	MaxConnReadTimesPerEventLoop int

	// LockListener represents whether to lock thread for listener's goroutine, false by default.
	LockListener bool

	// LockPoller represents whether to lock thread for poller's goroutine, false by default.
	LockPoller bool

	// EpollMod sets the epoll mod, EPOLLLT by default.
	EpollMod uint32

	// EPOLLONESHOT sets EPOLLONESHOT, 0 by default.
	EPOLLONESHOT uint32

	// UDPReadTimeout sets the timeout for udp sessions.
	UDPReadTimeout time.Duration

	// Listen is used to create listener for Engine.
	// Users can set this func to customize listener, such as reuseport.
	Listen func(network, addr string) (net.Listener, error)

	// ListenUDP is used to create udp listener for Engine.
	ListenUDP func(network string, laddr *net.UDPAddr) (*net.UDPConn, error)

	// AsyncReadInPoller represents how the reading events and reading are handled
	// by epoll goroutine:
	// true : epoll goroutine handles the reading events only, another goroutine
	//        pool will handles the reading.
	// false: epoll goroutine handles both the reading events and the reading.
	AsyncReadInPoller bool
	// IOExecute is used to handle the aysnc reading, users can customize it.
	IOExecute func(f func([]byte))

	// BodyAllocator sets the buffer allocator for write cache.
	BodyAllocator mempool.Allocator
}

Config Of Engine.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn implements net.Conn with non-blocking interfaces.

func Dial

func Dial(network string, address string) (*Conn, error)

Dial calls net.Dial to make a net.Conn and convert it to *nbio.Conn.

func DialTimeout

func DialTimeout(network string, address string, timeout time.Duration) (*Conn, error)

Dial calls net.DialTimeout to make a net.Conn and convert it to *nbio.Conn.

func NBConn

func NBConn(conn net.Conn) (*Conn, error)

NBConn converts net.Conn to *Conn.

func (*Conn) AsyncRead added in v1.4.0

func (c *Conn) AsyncRead()

AsyncReadInPoller is used for reading data async.

func (*Conn) Close

func (c *Conn) Close() error

Close implements closes connection.

func (*Conn) CloseWithError

func (c *Conn) CloseWithError(err error) error

CloseWithError closes connection with user specified error.

func (*Conn) DataHandler added in v1.2.4

func (c *Conn) DataHandler() func(conn *Conn, data []byte)

DataHandler returns Conn's data handler.

func (*Conn) Execute added in v1.2.0

func (c *Conn) Execute(job func()) bool

Execute is used to run the job.

How it works: If the job is the head/first of the Conn's job list, it will call the nbio.Engine.Execute to run all the jobs in the job list that include:

  1. This job
  2. New jobs that are pushed to the back of the list before this job is done.
  3. nbio.Engine.Execute returns until there's no more jobs in the job list.

Else if the job is not the head/first of the job list, it will push the job to the back of the job list and wait to be called. This guarantees there's at most one flow or goroutine running job/jobs for each Conn. This guarantees all the jobs are executed in order.

Notice:

  1. The job wouldn't run or pushed to the back of the job list if the connection is closed.
  2. nbio.Engine.Execute is handled by a goroutine pool by default, users can customize it.

func (*Conn) ExecuteLen added in v1.2.0

func (c *Conn) ExecuteLen() int

ExecuteLen returns the length of the Conn's job list.

func (*Conn) Hash

func (c *Conn) Hash() int

Hash returns a hash code of this connection.

func (*Conn) IsClosed added in v1.1.22

func (c *Conn) IsClosed() (bool, error)

IsClosed returns whether the Conn is closed.

func (*Conn) IsTCP added in v1.3.7

func (c *Conn) IsTCP() bool

IsTCP returns whether this Conn is a TCP Conn.

func (*Conn) IsUDP added in v1.3.0

func (c *Conn) IsUDP() bool

IsUDP returns whether this Conn is a UDP Conn.

func (*Conn) IsUnix added in v1.3.7

func (c *Conn) IsUnix() bool

IsUnix returns whether this Conn is a Unix Conn.

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns the local network address, if known.

func (*Conn) Lock added in v1.1.22

func (c *Conn) Lock()

Lock .

func (*Conn) MustExecute added in v1.2.0

func (c *Conn) MustExecute(job func())

MustExecute implements a similar function as Execute did, but will still execute or push the job to the back of the job list no matter whether Conn has been closed, it guarantees the job to be executed. This is used to handle the close event in nbio/nbhttp.

func (*Conn) OnData added in v1.2.4

func (c *Conn) OnData(h func(conn *Conn, data []byte))

OnData registers Conn's data handler. Notice:

  1. The data readed by the poller is not handled by this Conn's data handler by default.
  2. The data readed by the poller is handled by nbio.Engine's data handler which is registered by nbio.Engine.OnData by default.
  3. This Conn's data handler is used to customize your implementation, you can set different data handler for different Conns, and call Conn's data handler in nbio.Engine's data handler. For example: engine.OnData(func(c *nbio.Conn, data byte){ c.DataHandler()(c, data) }) conn1.OnData(yourDatahandler1) conn2.OnData(yourDatahandler2)

func (*Conn) Read

func (c *Conn) Read(b []byte) (int, error)

Read . Depracated . It was used to customize users' reading implementation, but better to use `ReadAndGetConn` instead, which can handle different types of connection and returns the consistent connection instance for UDP. Notice: non-blocking interface, should not be used as you use std.

func (*Conn) ReadAndGetConn added in v1.3.1

func (c *Conn) ReadAndGetConn(b []byte) (*Conn, int, error)

ReadAndGetConn handles reading for different types of connection. It returns the real connection:

  1. For Non-UDP connection, it returns the Conn itself.
  2. For UDP connection, it may be a UDP Server fd, then it returns consistent Conn for the same socket which has the same local addr and remote addr.

Notice: non-blocking interface, should not be used as you use std.

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address, if known.

func (*Conn) ResetPollerEvent added in v1.3.16

func (c *Conn) ResetPollerEvent()

func (*Conn) Sendfile

func (c *Conn) Sendfile(f *os.File, remain int64) (int64, error)

Sendfile .

func (*Conn) Session

func (c *Conn) Session() interface{}

Session returns user session.

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

SetDeadline sets deadline for both read and write. If it is time.Zero, SetDeadline will clear the deadlines.

func (*Conn) SetKeepAlive

func (c *Conn) SetKeepAlive(keepalive bool) error

SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.

func (*Conn) SetKeepAlivePeriod

func (c *Conn) SetKeepAlivePeriod(d time.Duration) error

SetKeepAlivePeriod sets period between keep-alives.

func (*Conn) SetLinger

func (c *Conn) SetLinger(onoff int32, linger int32) error

SetLinger .

func (*Conn) SetNoDelay

func (c *Conn) SetNoDelay(nodelay bool) error

SetNoDelay controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

func (*Conn) SetReadBuffer

func (c *Conn) SetReadBuffer(bytes int) error

SetReadBuffer sets the size of the operating system's receive buffer associated with the connection.

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls. When the user doesn't update the deadline and the deadline exceeds, the connection will be closed. If it is time.Zero, SetReadDeadline will clear the deadline.

Notice:

  1. Users should update the read deadline in time.
  2. For example, call SetReadDeadline whenever a new WebSocket message is received.

func (*Conn) SetSession

func (c *Conn) SetSession(session interface{})

SetSession sets user session.

func (*Conn) SetWriteBuffer

func (c *Conn) SetWriteBuffer(bytes int) error

SetWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future data writing. If it is time.Zero, SetReadDeadline will clear the deadline.

If the next Write call writes all the data successfully and there's no data left to bewritten, the deadline timer will be cleared automatically; Else when the user doesn't update the deadline and the deadline exceeds, the connection will be closed.

func (*Conn) SyscallConn added in v1.5.5

func (c *Conn) SyscallConn() (syscall.RawConn, error)

func (*Conn) Type added in v1.3.0

func (c *Conn) Type() ConnType

Type .

func (*Conn) Unlock added in v1.1.22

func (c *Conn) Unlock()

Unlock .

func (*Conn) Write

func (c *Conn) Write(b []byte) (int, error)

Write writes data to the connection. Notice:

  1. This is a non-blocking interface, but you can use it as you use std.
  2. When it can't write all the data now, the connection will cache the data left to be written and wait for the writing event then try to flush it.

func (*Conn) Writev

func (c *Conn) Writev(in [][]byte) (int, error)

Writev does similar things as Write, but with [][]byte input arg. Notice: doesn't support UDP if more than 1 []byte.

type ConnType added in v1.3.0

type ConnType = int8

ConnType is used to identify different types of Conn.

const (
	// ConnTypeTCP represents TCP Conn.
	ConnTypeTCP ConnType = iota + 1
	// ConnTypeUDPServer represents UDP Conn used as a listener.
	ConnTypeUDPServer
	// ConnTypeUDPClientFromRead represents UDP connection that
	// is sending data to our UDP Server from peer.
	ConnTypeUDPClientFromRead
	// ConnTypeUDPClientFromDial represents UDP Conn that is sending
	// data to other UDP Server from ourselves.
	ConnTypeUDPClientFromDial
	// ConnTypeUnix represents Unix Conn.
	ConnTypeUnix
)

type Engine added in v1.2.14

type Engine struct {
	Config
	*timer.Timer
	sync.WaitGroup

	Execute func(f func())
	// contains filtered or unexported fields
}

Engine is a manager of poller.

func NewEngine added in v1.2.14

func NewEngine(conf Config) *Engine

NewEngine creates an Engine and init default configurations.

func (*Engine) AddConn added in v1.2.14

func (g *Engine) AddConn(conn net.Conn) (*Conn, error)

AddConn adds conn to a poller.

func (*Engine) OnClose added in v1.2.14

func (g *Engine) OnClose(h func(c *Conn, err error))

OnClose registers callback for disconnected.

func (*Engine) OnData added in v1.2.14

func (g *Engine) OnData(h func(c *Conn, data []byte))

OnData registers callback for data.

func (*Engine) OnOpen added in v1.2.14

func (g *Engine) OnOpen(h func(c *Conn))

OnOpen registers callback for new connection.

func (*Engine) OnRead added in v1.2.14

func (g *Engine) OnRead(h func(c *Conn))

OnRead registers callback for reading event.

func (*Engine) OnReadBufferAlloc added in v1.2.14

func (g *Engine) OnReadBufferAlloc(h func(c *Conn) []byte)

OnReadBufferAlloc registers callback for memory allocating.

func (*Engine) OnReadBufferFree added in v1.2.14

func (g *Engine) OnReadBufferFree(h func(c *Conn, b []byte))

OnReadBufferFree registers callback for memory release.

func (*Engine) OnStop added in v1.2.14

func (g *Engine) OnStop(h func())

OnStop registers callback before Engine is stopped.

func (*Engine) OnUDPListen added in v1.5.5

func (g *Engine) OnUDPListen(h func(c *Conn))

OnOpen registers callback for new connection.

func (*Engine) OnWrittenSize added in v1.4.0

func (g *Engine) OnWrittenSize(h func(c *Conn, b []byte, n int))

OnWrittenSize registers callback for written size. If len(b) is bigger than 0, it represents that it's writing a buffer, else it's operating by Sendfile.

func (*Engine) PollerBuffer added in v1.2.14

func (g *Engine) PollerBuffer(c *Conn) []byte

PollerBuffer returns Poller's buffer by Conn, can be used on linux/bsd.

func (*Engine) SetETAsyncRead added in v1.5.4

func (e *Engine) SetETAsyncRead()

SetETAsyncRead .

func (*Engine) SetLTSyncRead added in v1.5.4

func (e *Engine) SetLTSyncRead()

SetLTSyncRead .

func (*Engine) Shutdown added in v1.2.14

func (g *Engine) Shutdown(ctx context.Context) error

Shutdown stops Engine gracefully with context.

func (*Engine) Start added in v1.2.14

func (g *Engine) Start() error

Start inits and starts pollers.

func (*Engine) Stop added in v1.2.14

func (g *Engine) Stop()

Stop closes listeners/pollers/conns/timer.

type Gopher

type Gopher = Engine

Gopher keeps old type to compatible with new name Engine.

func NewGopher

func NewGopher(conf Config) *Gopher

Directories

Path Synopsis
autobahn
extension
tls

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL