gnet

package module
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2020 License: MIT Imports: 16 Imported by: 0

README

gnet

English | 🇨🇳中文

📖 Introduction

gnet is an event-driven networking framework that is fast and lightweight. It makes direct epoll and kqueue syscalls rather than using the standard Go net package, and works in a similar manner as netty and libuv.

The goal of this project is to create a server framework for Go that performs on par with Redis and Haproxy for packet handling.

gnet sells itself as a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go which works on transport layer with TCP/UDP protocols and Unix Domain Socket , so it allows developers to implement their own protocols(HTTP, RPC, WebSocket, Redis, etc.) of application layer upon gnet for building diversified network applications, for instance, you get an HTTP Server or Web Framework if you implement HTTP protocol upon gnet while you have a Redis Server done with the implementation of Redis protocol upon gnet and so on.

gnet derives from the project: evio while having a much higher performance and more features.

🚀 Features

  • High-performance event-loop under networking model of multiple threads/goroutines
  • Built-in load balancing algorithm: Round-Robin
  • Built-in goroutine pool powered by the library ants
  • Built-in memory pool with bytes powered by the library bytebufferpool
  • Concise APIs
  • Efficient memory usage: Ring-Buffer
  • Supporting multiple protocols/IPC mechanism: TCP, UDP and Unix Domain Socket
  • Supporting two event-driven mechanisms: epoll on Linux and kqueue on FreeBSD
  • Supporting asynchronous write operation
  • Flexible ticker event
  • SO_REUSEPORT socket option
  • Built-in multiple codecs to encode/decode network frames into/from TCP stream: LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec and LengthFieldBasedFrameCodec, referencing netty codec, also supporting customized codecs
  • Supporting Windows platform with event-driven mechanism of IOCP Go stdlib: net
  • Additional load-balancing algorithms: Random, Least-Connections, Consistent-hashing and so on
  • TLS support
  • Implementation of gnet Client

💡 Key Designs

Networking Model of Multiple Threads/Goroutines

Multiple Reactors

gnet redesigns and implements a new built-in networking model of multiple threads/goroutines: 『multiple reactors』 which is also the default networking model of multiple threads in netty, Here's the schematic diagram:

multi_reactor

and it works as the following sequence diagram:

reactor

Multiple Reactors + Goroutine-Pool

You may ask me a question: what if my business logic in EventHandler.React contains some blocking code which leads to blocking in event-loop of gnet, what is the solution for this kind of situation?

As you know, there is a most important tenet when writing code under gnet: you should never block the event-loop goroutine in the EventHandler.React, which is also the most important tenet in netty, otherwise, it will result in a low throughput in your gnet server.

And the solution to that could be found in the subsequent networking model of multiple threads/goroutines in gnet: 『multiple reactors with thread/goroutine pool』which pulls you out from the blocking mire, it will construct a worker-pool with fixed capacity and put those blocking jobs in EventHandler.React into the worker-pool to make the event-loop goroutines non-blocking.

The networking model:『multiple reactors with thread/goroutine pool』dissolves the blocking jobs by introducing a goroutine pool, as shown below:

multi_reactor_thread_pool

and it works as the following sequence diagram:

multi-reactors

gnet implements the networking model:『multiple reactors with thread/goroutine pool』by the aid of a high-performance goroutine pool called ants that allows you to manage and recycle a massive number of goroutines in your concurrent programs, the full features and usages in ants are documented here.

gnet integrates ants and provides the pool.goroutine.Default() method that you can call to instantiate a ants pool where you are able to put your blocking code logic and call the function gnet.Conn.AsyncWrite([]byte) to send out data asynchronously after you finish the blocking process and get the output data, which makes the goroutine of event-loop non-blocking.

The details about integrating gnet with ants are shown here.

Auto-scaling Ring Buffer

There are two ring-buffers inside gnet: inbound buffer and outbound buffer to buffer and manage inbound/outbound network data.

The purpose of implementing inbound and outbound ring-buffers in gnet is to transfer the logic of buffering and managing network data based on application protocol upon TCP stream from business server to framework and unify the network data buffer, which minimizes the complexity of business code so that developers are able to concentrate on business logic instead of the underlying implementation.

🎉 Getting Started

Prerequisites

gnet requires Go 1.9 or later.

Installation

go get -u github.com/panjf2000/gnet

gnet is available as a Go module, with Go 1.11 Modules support (Go 1.11+), just simply import "github.com/panjf2000/gnet" in your source code and go [build|run|test] will download the necessary dependencies automatically.

Usage Examples

The detailed documentation is located in here: docs of gnet, but let's pass through the brief instructions first.

It is easy to create a network server with gnet. All you have to do is just to make your implementation of gnet.EventHandler interface and register your event-handler functions to it, then pass it to the gnet.Serve function along with the binding address(es). Each connection is represented as a gnet.Conn interface that is passed to various events to differentiate the clients. At any point you can close a connection or shutdown the server by return a Close or Shutdown action from an event function.

The simplest example to get you started playing with gnet would be the echo server. So here you are, a simplest echo server upon gnet that is listening on port 9000:

Echo server without blocking logic
Old version(<=v1.0.0-rc.4)
package main

import (
	"log"

	"github.com/panjf2000/gnet"
)

type echoServer struct {
	*gnet.EventServer
}

func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
	out = c.Read()
	c.ResetBuffer()
	return
}

func main() {
	echo := new(echoServer)
	log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
}
package main

import (
	"log"

	"github.com/panjf2000/gnet"
)

type echoServer struct {
	*gnet.EventServer
}

func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	out = frame
	return
}

func main() {
	echo := new(echoServer)
	log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
}

As you can see, this example of echo server only sets up the EventHandler.React function where you commonly write your main business code and it will be called once the server receives input data from a client. What you should know is that the input parameter: frame is a complete packet which has been decoded by the codec, as a general rule, you should implement the gnet codec interface as the business codec to packet and unpacket TCP stream, but if you don't, your gnet server is going to work with the default codec under the acquiescence, which means all data inculding latest data and previous data in buffer will be stored in the input parameter: frame when EventHandler.React is being triggered. The output data will be then encoded and sent back to that client by assigning the out variable and returning it after your business code finish processing data(in this case, it just echo the data back).

Echo server with blocking logic
Old version(<=v1.0.0-rc.4)
package main

import (
	"log"
	"time"

	"github.com/panjf2000/gnet"
	"github.com/panjf2000/gnet/pool/goroutine"
)

type echoServer struct {
	*gnet.EventServer
	pool *goroutine.Pool
}

func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
	data := append([]byte{}, c.Read()...)
	c.ResetBuffer()

	// Use ants pool to unblock the event-loop.
	_ = es.pool.Submit(func() {
		time.Sleep(1 * time.Second)
		c.AsyncWrite(data)
	})

	return
}

func main() {
	p := goroutine.Default()
	defer p.Release()
	
	echo := &echoServer{pool: p}
	log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
}
package main

import (
	"log"
	"time"

	"github.com/panjf2000/gnet"
	"github.com/panjf2000/gnet/pool/goroutine"
)

type echoServer struct {
	*gnet.EventServer
	pool *goroutine.Pool
}

func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	data := append([]byte{}, frame...)

	// Use ants pool to unblock the event-loop.
	_ = es.pool.Submit(func() {
		time.Sleep(1 * time.Second)
		c.AsyncWrite(data)
	})

	return
}

func main() {
	p := goroutine.Default()
	defer p.Release()

	echo := &echoServer{pool: p}
	log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
}

Like I said in the 『Multiple Reactors + Goroutine-Pool』section, if there are blocking code in your business logic, then you ought to turn them into non-blocking code in any way, for instance you can wrap them into a goroutine, but it will result in a massive amount of goroutines if massive traffic is passing through your server so I would suggest you utilize a goroutine pool like ants to manage those goroutines and reduce the cost of system resources.

All gnet examples:

TCP Echo Server
package main

import (
	"flag"
	"fmt"
	"log"

	"github.com/panjf2000/gnet"
)

type echoServer struct {
	*gnet.EventServer
}

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
		srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
	return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	// Echo synchronously.
	out = frame
	return

	/*
		// Echo asynchronously.
		data := append([]byte{}, frame...)
		go func() {
			time.Sleep(time.Second)
			c.AsyncWrite(data)
		}()
		return
	*/
}

func main() {
	var port int
	var multicore bool

	// Example command: go run echo.go --port 9000 --multicore=true
	flag.IntVar(&port, "port", 9000, "--port 9000")
	flag.BoolVar(&multicore, "multicore", false, "--multicore true")
	flag.Parse()
	echo := new(echoServer)
	log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore)))
}
UDP Echo Server
package main

import (
	"flag"
	"fmt"
	"log"

	"github.com/panjf2000/gnet"
)

type echoServer struct {
	*gnet.EventServer
}

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	log.Printf("UDP Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
		srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
	return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	// Echo synchronously.
	out = frame
	return

	/*
		// Echo asynchronously.
		data := append([]byte{}, frame...)
		go func() {
			time.Sleep(time.Second)
			c.SendTo(data)
		}()
		return
	*/
}

func main() {
	var port int
	var multicore, reuseport bool

	// Example command: go run echo.go --port 9000 --multicore=true --reuseport=true
	flag.IntVar(&port, "port", 9000, "--port 9000")
	flag.BoolVar(&multicore, "multicore", false, "--multicore true")
	flag.BoolVar(&reuseport, "reuseport", false, "--reuseport true")
	flag.Parse()
	echo := new(echoServer)
	log.Fatal(gnet.Serve(echo, fmt.Sprintf("udp://:%d", port), gnet.WithMulticore(multicore), gnet.WithReusePort(reuseport)))
}
UDS Echo Server
package main

import (
	"flag"
	"fmt"
	"log"

	"github.com/panjf2000/gnet"
)

type echoServer struct {
	*gnet.EventServer
}

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
		srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
	return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	// Echo synchronously.
	out = frame
	return

	/*
		// Echo asynchronously.
		data := append([]byte{}, frame...)
		go func() {
			time.Sleep(time.Second)
			c.AsyncWrite(data)
		}()
		return
	*/
}

func main() {
	var addr string
	var multicore bool

	// Example command: go run echo.go --sock echo.sock --multicore=true
	flag.StringVar(&addr, "sock", "echo.sock", "--port 9000")
	flag.BoolVar(&multicore, "multicore", false, "--multicore true")
	flag.Parse()

	echo := new(echoServer)
	log.Fatal(gnet.Serve(echo, fmt.Sprintf("unix://%s", addr), gnet.WithMulticore(multicore)))
}
HTTP Server
package main

import (
	"flag"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"
	"unsafe"

	"github.com/panjf2000/gnet"
)

var res string

type request struct {
	proto, method string
	path, query   string
	head, body    string
	remoteAddr    string
}

type httpServer struct {
	*gnet.EventServer
}

var errMsg = "Internal Server Error"
var errMsgBytes = []byte(errMsg)

type httpCodec struct {
	req request
}

func (hc *httpCodec) Encode(c gnet.Conn, buf []byte) (out []byte, err error) {
	if c.Context() == nil {
		return buf, nil
	}
	return appendResp(out, "500 Error", "", errMsg+"\n"), nil
}

func (hc *httpCodec) Decode(c gnet.Conn) (out []byte, err error) {
	buf := c.Read()
	c.ResetBuffer()

	// process the pipeline
	var leftover []byte
pipeline:
	leftover, err = parseReq(buf, &hc.req)
	// bad thing happened
	if err != nil {
		c.SetContext(err)
		return nil, err
	} else if len(leftover) == len(buf) {
		// request not ready, yet
		return
	}
	out = appendHandle(out, res)
	buf = leftover
	goto pipeline
}

func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n",
		srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
	return
}

func (hs *httpServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	if c.Context() != nil {
		// bad thing happened
		out = errMsgBytes
		action = gnet.Close
		return
	}
	// handle the request
	out = frame
	return
}

func main() {
	var port int
	var multicore bool

	// Example command: go run http.go --port 8080 --multicore=true
	flag.IntVar(&port, "port", 8080, "server port")
	flag.BoolVar(&multicore, "multicore", true, "multicore")
	flag.Parse()

	res = "Hello World!\r\n"

	http := new(httpServer)
	hc := new(httpCodec)

	// Start serving!
	log.Fatal(gnet.Serve(http, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithCodec(hc)))
}

// appendHandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendHandle(b []byte, res string) []byte {
	return appendResp(b, "200 OK", "", res)
}

// appendResp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendResp(b []byte, status, head, body string) []byte {
	b = append(b, "HTTP/1.1"...)
	b = append(b, ' ')
	b = append(b, status...)
	b = append(b, '\r', '\n')
	b = append(b, "Server: gnet\r\n"...)
	b = append(b, "Date: "...)
	b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, "Content-Length: "...)
		b = strconv.AppendInt(b, int64(len(body)), 10)
		b = append(b, '\r', '\n')
	}
	b = append(b, head...)
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, body...)
	}
	return b
}

func b2s(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

// parseReq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parseReq(data []byte, req *request) (leftover []byte, err error) {
	sdata := b2s(data)
	var i, s int
	var head string
	var clen int
	var q = -1
	// method, path, proto line
	for ; i < len(sdata); i++ {
		if sdata[i] == ' ' {
			req.method = sdata[s:i]
			for i, s = i+1, i+1; i < len(sdata); i++ {
				if sdata[i] == '?' && q == -1 {
					q = i - s
				} else if sdata[i] == ' ' {
					if q != -1 {
						req.path = sdata[s:q]
						req.query = req.path[q+1 : i]
					} else {
						req.path = sdata[s:i]
					}
					for i, s = i+1, i+1; i < len(sdata); i++ {
						if sdata[i] == '\n' && sdata[i-1] == '\r' {
							req.proto = sdata[s:i]
							i, s = i+1, i+1
							break
						}
					}
					break
				}
			}
			break
		}
	}
	if req.proto == "" {
		return data, fmt.Errorf("malformed request")
	}
	head = sdata[:s]
	for ; i < len(sdata); i++ {
		if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
			line := sdata[s : i-1]
			s = i + 1
			if line == "" {
				req.head = sdata[len(head)+2 : i+1]
				i++
				if clen > 0 {
					if len(sdata[i:]) < clen {
						break
					}
					req.body = sdata[i : i+clen]
					i += clen
				}
				return data[i:], nil
			}
			if strings.HasPrefix(line, "Content-Length:") {
				n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
				if err == nil {
					clen = int(n)
				}
			}
		}
	}
	// not enough data
	return data, nil
}
Push Server
package main

import (
	"flag"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/panjf2000/gnet"
)

type pushServer struct {
	*gnet.EventServer
	tick             time.Duration
	connectedSockets sync.Map
}

func (ps *pushServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	log.Printf("Push server is listening on %s (multi-cores: %t, loops: %d), "+
		"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumEventLoop, ps.tick.String())
	return
}
func (ps *pushServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	log.Printf("Socket with addr: %s has been opened...\n", c.RemoteAddr().String())
	ps.connectedSockets.Store(c.RemoteAddr().String(), c)
	return
}
func (ps *pushServer) OnClosed(c gnet.Conn, err error) (action gnet.Action) {
	log.Printf("Socket with addr: %s is closing...\n", c.RemoteAddr().String())
	ps.connectedSockets.Delete(c.RemoteAddr().String())
	return
}
func (ps *pushServer) Tick() (delay time.Duration, action gnet.Action) {
	log.Println("It's time to push data to clients!!!")
	ps.connectedSockets.Range(func(key, value interface{}) bool {
		addr := key.(string)
		c := value.(gnet.Conn)
		c.AsyncWrite([]byte(fmt.Sprintf("heart beating to %s\n", addr)))
		return true
	})
	delay = ps.tick
	return
}
func (ps *pushServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	out = frame
	return
}

func main() {
	var port int
	var multicore bool
	var interval time.Duration
	var ticker bool

	// Example command: go run push.go --port 9000 --tick 1s --multicore=true
	flag.IntVar(&port, "port", 9000, "server port")
	flag.BoolVar(&multicore, "multicore", true, "multicore")
	flag.DurationVar(&interval, "tick", 0, "pushing tick")
	flag.Parse()
	if interval > 0 {
		ticker = true
	}
	push := &pushServer{tick: interval}
	log.Fatal(gnet.Serve(push, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithTicker(ticker)))
}
Codec Client/Server

Client:

// Reference https://github.com/smallnest/goframe/blob/master/_examples/goclient/client.go

package main

import (
	"encoding/binary"
	"fmt"
	"net"

	"github.com/smallnest/goframe"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:9000")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	encoderConfig := goframe.EncoderConfig{
		ByteOrder:                       binary.BigEndian,
		LengthFieldLength:               4,
		LengthAdjustment:                0,
		LengthIncludesLengthFieldLength: false,
	}

	decoderConfig := goframe.DecoderConfig{
		ByteOrder:           binary.BigEndian,
		LengthFieldOffset:   0,
		LengthFieldLength:   4,
		LengthAdjustment:    0,
		InitialBytesToStrip: 4,
	}

	fc := goframe.NewLengthFieldBasedFrameConn(encoderConfig, decoderConfig, conn)
	err = fc.WriteFrame([]byte("hello"))
	if err != nil {
		panic(err)
	}
	err = fc.WriteFrame([]byte("world"))
	if err != nil {
		panic(err)
	}

	buf, err := fc.ReadFrame()
	if err != nil {
		panic(err)
	}
	fmt.Println("received: ", string(buf))
	buf, err = fc.ReadFrame()
	if err != nil {
		panic(err)
	}
	fmt.Println("received: ", string(buf))
}

Server:

package main

import (
	"encoding/binary"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/panjf2000/gnet"
	"github.com/panjf2000/gnet/pool/goroutine"
)

type codecServer struct {
	*gnet.EventServer
	addr       string
	multicore  bool
	async      bool
	codec      gnet.ICodec
	workerPool *goroutine.Pool
}

func (cs *codecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	log.Printf("Test codec server is listening on %s (multi-cores: %t, loops: %d)\n",
		srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
	return
}

func (cs *codecServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	if cs.async {
		data := append([]byte{}, frame...)
		_ = cs.workerPool.Submit(func() {
			c.AsyncWrite(data)
		})
		return
	}
	out = frame
	return
}

func testCodecServe(addr string, multicore, async bool, codec gnet.ICodec) {
	var err error
	if codec == nil {
		encoderConfig := gnet.EncoderConfig{
			ByteOrder:                       binary.BigEndian,
			LengthFieldLength:               4,
			LengthAdjustment:                0,
			LengthIncludesLengthFieldLength: false,
		}
		decoderConfig := gnet.DecoderConfig{
			ByteOrder:           binary.BigEndian,
			LengthFieldOffset:   0,
			LengthFieldLength:   4,
			LengthAdjustment:    0,
			InitialBytesToStrip: 4,
		}
		codec = gnet.NewLengthFieldBasedFrameCodec(encoderConfig, decoderConfig)
	}
	cs := &codecServer{addr: addr, multicore: multicore, async: async, codec: codec, workerPool: goroutine.Default()}
	err = gnet.Serve(cs, addr, gnet.WithMulticore(multicore), gnet.WithTCPKeepAlive(time.Minute*5), gnet.WithCodec(codec))
	if err != nil {
		panic(err)
	}
}

func main() {
	var port int
	var multicore bool

	// Example command: go run server.go --port 9000 --multicore=true
	flag.IntVar(&port, "port", 9000, "server port")
	flag.BoolVar(&multicore, "multicore", true, "multicore")
	flag.Parse()
	addr := fmt.Sprintf("tcp://:%d", port)
	testCodecServe(addr, multicore, false, nil)
}

For more details, check out here: examples of gnet.

I/O Events

Current supported I/O events in gnet:

  • EventHandler.OnInitComplete fires when the server has been initialized and ready to accept new connections.
  • EventHandler.OnOpened fires once a connection has been opened.
  • EventHandler.OnClosed fires after a connection has been closed.
  • EventHandler.React fires when the server receives inbound data from a socket/connection. (usually it is where you write the code of business logic)
  • EventHandler.Tick fires right after the server starts and then fires every specified interval.
  • EventHandler.PreWrite fires just before any data has been written to client.

Ticker

The EventHandler.Tick event fires ticks at a specified interval. The first tick fires right after the gnet server starts up and if you intend to set up a ticker event, don't forget to pass an option: gnet.WithTicker(true) to gnet.Serve.

events.Tick = func() (delay time.Duration, action Action){
	log.Printf("tick")
	delay = time.Second
	return
}

UDP

gnet supports UDP protocol so the gnet.Serve method can bind to UDP addresses.

  • All incoming and outgoing packets will not be buffered but read and sent directly.
  • The EventHandler.OnOpened and EventHandler.OnClosed events are not available for UDP sockets, only the React event.
  • The UDP equivalents of AsyncWrite([]byte) in TCP is SendTo([]byte).

Unix Domain Socket

gnet also supports UDS(Unix Domain Socket), just pass the UDS addresses like "unix://xxx" to the gnet.Serve method and you could play with it.

It is nothing different from making use of TCP when doing stuff with UDS, so the gnet UDS servers are able to leverage all event functions which are available under TCP protocol.

Multi-threads

The gnet.WithMulticore(true) indicates whether the server will be effectively created with multi-cores, if so, then you must take care of synchronizing memory between all event callbacks, otherwise, it will run the server with a single thread. The number of threads in the server will be automatically assigned to the value of runtime.NumCPU().

Load Balancing

The current built-in load balancing algorithm in gnet is Round-Robin.

SO_REUSEPORT

gnet server is able to utilize the SO_REUSEPORT option which allows multiple sockets on the same host to bind to the same port and the OS kernel takes care of the load balancing for you, it wakes one socket per connect event coming to resolved the thundering herd.

By default, gnet is not going to be haunted by the thundering herd under its networking model:『multiple reactors』which gets only one main reactor to listen on "address:port" and accept new sockets. So this SO_REUSEPORT option is trivial in gnet but note that it will fall back to the old networking model of evio when you enable the SO_REUSEPORT option.

Just use functional options to set up SO_REUSEPORT and you can enjoy this feature:

gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true), gnet.WithReusePort(true)))

Multiple built-in codecs for TCP stream

There are multiple built-in codecs in gnet which allow you to encode/decode frames into/from TCP stream.

So far gnet has four kinds of built-in codecs: LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec and LengthFieldBasedFrameCodec, which generally meets most scenarios, but still gnet allows users to customize their own codecs in their gnet servers by implementing the interface gnet.ICodec and replacing the default codec in gnet with customized codec via functional options.

Here is an example with codec, showing you how to leverage codec to encode/decode network frames into/from TCP stream.

📊 Performance

Benchmarks on TechEmpower

# Hardware
CPU: 28 HT Cores Intel(R) Xeon(R) Gold 5120 CPU @ 2.20GHz
Mem: 32GB RAM
OS : Ubuntu 18.04.3 4.15.0-88-generic #88-Ubuntu
Net: Switched 10-gigabit ethernet
Go : go1.14 linux/amd64

All language

This is the top 50 on the framework ranking of all programming languages (386 frameworks in total).

Golang

This is the full framework ranking of Golang.

To see the full ranking list, visit Full ranking list of Plaintext.

Contrasts to the similar networking libraries

On Linux (epoll)

Test Environment
# Machine information
        OS : Ubuntu 18.04/x86_64
       CPU : 8 Virtual CPUs
    Memory : 16.0 GiB

# Go version and configurations
Go Version : go1.12.9 linux/amd64
GOMAXPROCS=8
Echo Server

HTTP Server

On FreeBSD (kqueue)

Test Environment
# Machine information
        OS : macOS Mojave 10.14.6/x86_64
       CPU : 4 CPUs
    Memory : 8.0 GiB

# Go version and configurations
Go Version : go version go1.12.9 darwin/amd64
GOMAXPROCS=4
Echo Server

HTTP Server

📄 License

Source code in gnet is available under the MIT License.

👏 Contributors

Please read our Contributing Guidelines before opening a PR and thank you to all the developers who already made contributions to gnet!

🙏 Acknowledgments

📚 Relevant Articles

JetBrains OS licenses

gnet had been being developed with GoLand IDE under the free JetBrains Open Source license(s) granted by JetBrains s.r.o., hence I would like to express my thanks here.

💰 Backers

Support us with a monthly donation and help us continue our activities.

💎 Sponsors

Become a bronze sponsor with a monthly donation of $10 and get your logo on our README on Github.

☕️ Buy me a coffee

        

Documentation

Overview

gnet is an event-driven networking framework that is fast and small. It makes direct epoll and kqueue syscalls rather than using the standard Go net package, and works in a similar manner as netty and libuv.

The goal of this project is to create a server framework for Go that performs on par with Redis and Haproxy for packet handling.

gnet sells itself as a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go which works on transport layer with TCP/UDP/Unix-Socket protocols, so it allows developers to implement their own protocols of application layer upon gnet for building diversified network applications, for instance, you get an HTTP Server or Web Framework if you implement HTTP protocol upon gnet while you have a Redis Server done with the implementation of Redis protocol upon gnet and so on.

Echo server built upon gnet is shown below:

package main

import (
	"log"

	"github.com/panlibin/gnet"
)

type echoServer struct {
	*gnet.EventServer
}

func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	out = frame
	return
}

func main() {
	echo := new(echoServer)
	log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
}

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrProtocolNotSupported occurs when trying to use protocol that is not supported.
	ErrProtocolNotSupported = errors.New("not supported protocol on this platform")
	// ErrServerShutdown occurs when server is closing.
	ErrServerShutdown = errors.New("server is going to be shutdown")
	// ErrInvalidFixedLength occurs when the output data have invalid fixed length.
	ErrInvalidFixedLength = errors.New("invalid fixed length of bytes")
	// ErrUnexpectedEOF occurs when no enough data to read by codec.
	ErrUnexpectedEOF = errors.New("there is no enough data")
	// ErrDelimiterNotFound occurs when no such a delimiter is in input data.
	ErrDelimiterNotFound = errors.New("there is no such a delimiter")
	// ErrCRLFNotFound occurs when a CRLF is not found by codec.
	ErrCRLFNotFound = errors.New("there is no CRLF")
	// ErrUnsupportedLength occurs when unsupported lengthFieldLength is from input data.
	ErrUnsupportedLength = errors.New("unsupported lengthFieldLength. (expected: 1, 2, 3, 4, or 8)")
	// ErrTooLessLength occurs when adjusted frame length is less than zero.
	ErrTooLessLength = errors.New("adjusted frame length is less than zero")
)
View Source
var CRLFByte = byte('\n')

CRLFByte represents a byte of CRLF.

Functions

func Serve

func Serve(eventHandler EventHandler, addr string, opts ...Option) error

Types

type Action

type Action int

Action is an action that occurs after the completion of an event.

const (
	// None indicates that no action should occur following an event.
	None Action = iota

	// Close closes the connection.
	Close

	// Shutdown shutdowns the server.
	Shutdown
)

type BuiltInFrameCodec

type BuiltInFrameCodec struct {
}

BuiltInFrameCodec is the built-in codec which will be assigned to gnet server when customized codec is not set up.

func (*BuiltInFrameCodec) Decode

func (cc *BuiltInFrameCodec) Decode(c Conn) ([]byte, error)

Decode ...

func (*BuiltInFrameCodec) Encode

func (cc *BuiltInFrameCodec) Encode(c Conn, buf []byte) ([]byte, error)

Encode ...

type Conn

type Conn interface {
	// Context returns a user-defined context.
	Context() (ctx interface{})

	// SetContext sets a user-defined context.
	SetContext(ctx interface{})

	// LocalAddr is the connection's local socket address.
	LocalAddr() (addr net.Addr)

	// RemoteAddr is the connection's remote peer address.
	RemoteAddr() (addr net.Addr)

	// Read reads all data from inbound ring-buffer and event-loop-buffer without moving "read" pointer, which means
	// it does not evict the data from ring-buffer actually and those data will present in ring-buffer until the
	// ResetBuffer method is invoked.
	Read() (buf []byte)

	// ResetBuffer resets the inbound ring-buffer, which means all data in the inbound ring-buffer has been evicted.
	ResetBuffer()

	// ReadN reads bytes with the given length from inbound ring-buffer and event-loop-buffer without moving
	// "read" pointer, which means it will not evict the data from buffer until the ShiftN method is invoked,
	// it reads data from the inbound ring-buffer and event-loop-buffer when the length of the available data is equal
	// to the given "n", otherwise, it will not read any data from the inbound ring-buffer.
	//
	// So you should use this method only if you know exactly the length of subsequent TCP stream based on the protocol,
	// like the Content-Length attribute in an HTTP request which indicates you how much data you should read
	// from inbound ring-buffer.
	ReadN(n int) (size int, buf []byte)

	// ShiftN shifts "read" pointer in buffer with the given length.
	ShiftN(n int) (size int)

	// BufferLength returns the length of available data in the inbound ring-buffer.
	BufferLength() (size int)

	// SendTo writes data for UDP sockets, it allows you to send data back to UDP socket in individual goroutines.
	SendTo(buf []byte) error

	// AsyncWrite writes data to client/connection asynchronously, usually you would invoke it in individual goroutines
	// instead of the event-loop goroutines.
	AsyncWrite(buf []byte) error

	// Wake triggers a React event for this connection.
	Wake() error

	// Close closes the current connection.
	Close() error
}

Conn is a interface of gnet connection.

type DecoderConfig

type DecoderConfig struct {
	// ByteOrder is the ByteOrder of the length field.
	ByteOrder binary.ByteOrder
	// LengthFieldOffset is the offset of the length field
	LengthFieldOffset int
	// LengthFieldLength is the length of the length field
	LengthFieldLength int
	// LengthAdjustment is the compensation value to add to the value of the length field
	LengthAdjustment int
	// InitialBytesToStrip is the number of first bytes to strip out from the decoded frame
	InitialBytesToStrip int
}

DecoderConfig config for decoder.

type DelimiterBasedFrameCodec

type DelimiterBasedFrameCodec struct {
	// contains filtered or unexported fields
}

DelimiterBasedFrameCodec encodes/decodes specific-delimiter-separated frames into/from TCP stream.

func NewDelimiterBasedFrameCodec

func NewDelimiterBasedFrameCodec(delimiter byte) *DelimiterBasedFrameCodec

NewDelimiterBasedFrameCodec instantiates and returns a codec with a specific delimiter.

func (*DelimiterBasedFrameCodec) Decode

func (cc *DelimiterBasedFrameCodec) Decode(c Conn) ([]byte, error)

Decode ...

func (*DelimiterBasedFrameCodec) Encode

func (cc *DelimiterBasedFrameCodec) Encode(c Conn, buf []byte) ([]byte, error)

Encode ...

type EncoderConfig

type EncoderConfig struct {
	// ByteOrder is the ByteOrder of the length field.
	ByteOrder binary.ByteOrder
	// LengthFieldLength is the length of the length field.
	LengthFieldLength int
	// LengthAdjustment is the compensation value to add to the value of the length field
	LengthAdjustment int
	// LengthIncludesLengthFieldLength is true, the length of the prepended length field is added to the value of the prepended length field
	LengthIncludesLengthFieldLength bool
}

EncoderConfig config for encoder.

type EventHandler

type EventHandler interface {
	// OnInitComplete fires when the server is ready for accepting connections.
	// The server parameter has information and various utilities.
	OnInitComplete(server Server) (action Action)

	// OnOpened fires when a new connection has been opened.
	// The info parameter has information about the connection such as
	// it's local and remote address.
	// Use the out return value to write data to the connection.
	OnOpened(c Conn) (out []byte, action Action)

	// OnClosed fires when a connection has been closed.
	// The err parameter is the last known connection error.
	OnClosed(c Conn, err error) (action Action)

	// PreWrite fires just before any data is written to any client socket.
	PreWrite()

	// React fires when a connection sends the server data.
	// Invoke c.Read() or c.ReadN(n) within the parameter c to read incoming data from client/connection.
	// Use the out return value to write data to the client/connection.
	React(frame []byte, c Conn) (out []byte, action Action)

	// Tick fires immediately after the server starts and will fire again
	// following the duration specified by the delay return value.
	Tick() (delay time.Duration, action Action)
}

EventHandler represents the server events' callbacks for the Serve call. Each event has an Action return value that is used manage the state of the connection and server.

type EventServer

type EventServer struct {
}

EventServer is a built-in implementation of EventHandler which sets up each method with a default implementation, you can compose it with your own implementation of EventHandler when you don't want to implement all methods in EventHandler.

func (*EventServer) OnClosed

func (es *EventServer) OnClosed(c Conn, err error) (action Action)

OnClosed fires when a connection has been closed. The err parameter is the last known connection error.

func (*EventServer) OnInitComplete

func (es *EventServer) OnInitComplete(svr Server) (action Action)

OnInitComplete fires when the server is ready for accepting connections. The server parameter has information and various utilities.

func (*EventServer) OnOpened

func (es *EventServer) OnOpened(c Conn) (out []byte, action Action)

OnOpened fires when a new connection has been opened. The info parameter has information about the connection such as it's local and remote address. Use the out return value to write data to the connection.

func (*EventServer) PreWrite

func (es *EventServer) PreWrite()

PreWrite fires just before any data is written to any client socket.

func (*EventServer) React

func (es *EventServer) React(frame []byte, c Conn) (out []byte, action Action)

React fires when a connection sends the server data. Invoke c.Read() or c.ReadN(n) within the parameter c to read incoming data from client/connection. Use the out return value to write data to the client/connection.

func (*EventServer) Tick

func (es *EventServer) Tick() (delay time.Duration, action Action)

Tick fires immediately after the server starts and will fire again following the duration specified by the delay return value.

type FixedLengthFrameCodec

type FixedLengthFrameCodec struct {
	// contains filtered or unexported fields
}

FixedLengthFrameCodec encodes/decodes fixed-length-separated frames into/from TCP stream.

func NewFixedLengthFrameCodec

func NewFixedLengthFrameCodec(frameLength int) *FixedLengthFrameCodec

NewFixedLengthFrameCodec instantiates and returns a codec with fixed length.

func (*FixedLengthFrameCodec) Decode

func (cc *FixedLengthFrameCodec) Decode(c Conn) ([]byte, error)

Decode ...

func (*FixedLengthFrameCodec) Encode

func (cc *FixedLengthFrameCodec) Encode(c Conn, buf []byte) ([]byte, error)

Encode ...

type GServer

type GServer struct {
	// contains filtered or unexported fields
}

func (*GServer) Serve

func (s *GServer) Serve(eventHandler EventHandler, addr string, opts ...Option) error

Serve starts handling events for the specified addresses.

Addresses should use a scheme prefix and be formatted like `tcp://192.168.0.10:9851` or `unix://socket`. Valid network schemes:

tcp   - bind to both IPv4 and IPv6
tcp4  - IPv4
tcp6  - IPv6
udp   - bind to both IPv4 and IPv6
udp4  - IPv4
udp6  - IPv6
unix  - Unix Domain Socket

The "tcp" network scheme is assumed when one is not specified.

func (*GServer) SignalShutdown

func (s *GServer) SignalShutdown()

func (*GServer) WaitShutdown

func (s *GServer) WaitShutdown()

type ICodec

type ICodec interface {
	// Encode encodes frames upon server responses into TCP stream.
	Encode(c Conn, buf []byte) ([]byte, error)
	// Decode decodes frames from TCP stream via specific implementation.
	Decode(c Conn) ([]byte, error)
}

ICodec is the interface of gnet codec.

type IEventLoopGroup

type IEventLoopGroup interface {
	// contains filtered or unexported methods
}

IEventLoopGroup represents a set of event-loops.

type LengthFieldBasedFrameCodec

type LengthFieldBasedFrameCodec struct {
	// contains filtered or unexported fields
}

LengthFieldBasedFrameCodec is the refactoring from https://github.com/smallnest/goframe/blob/master/length_field_based_frameconn.go, licensed by Apache License 2.0. It encodes/decodes frames into/from TCP stream with value of the length field in the message. Original implementation: https://github.com/netty/netty/blob/4.1/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java

func NewLengthFieldBasedFrameCodec

func NewLengthFieldBasedFrameCodec(encoderConfig EncoderConfig, decoderConfig DecoderConfig) *LengthFieldBasedFrameCodec

NewLengthFieldBasedFrameCodec instantiates and returns a codec based on the length field. It is the go implementation of netty LengthFieldBasedFrameecoder and LengthFieldPrepender. you can see javadoc of them to learn more details.

func (*LengthFieldBasedFrameCodec) Decode

func (cc *LengthFieldBasedFrameCodec) Decode(c Conn) ([]byte, error)

Decode ...

func (*LengthFieldBasedFrameCodec) Encode

func (cc *LengthFieldBasedFrameCodec) Encode(c Conn, buf []byte) (out []byte, err error)

Encode ...

type LineBasedFrameCodec

type LineBasedFrameCodec struct {
}

LineBasedFrameCodec encodes/decodes line-separated frames into/from TCP stream.

func (*LineBasedFrameCodec) Decode

func (cc *LineBasedFrameCodec) Decode(c Conn) ([]byte, error)

Decode ...

func (*LineBasedFrameCodec) Encode

func (cc *LineBasedFrameCodec) Encode(c Conn, buf []byte) ([]byte, error)

Encode ...

type Logger

type Logger interface {
	// Printf must have the same semantics as log.Printf.
	Printf(format string, args ...interface{})
}

Logger is used for logging formatted messages.

type Option

type Option func(opts *Options)

Option is a function that will set up option.

func WithCodec

func WithCodec(codec ICodec) Option

WithCodec sets up a codec to handle TCP stream.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets up a customized logger.

func WithMulticore

func WithMulticore(multicore bool) Option

WithMulticore sets up multi-cores in gnet server.

func WithNumEventLoop

func WithNumEventLoop(numEventLoop int) Option

WithNumEventLoop sets up NumEventLoop in gnet server.

func WithOptions

func WithOptions(options Options) Option

WithOptions sets up all options.

func WithReusePort

func WithReusePort(reusePort bool) Option

WithReusePort sets up SO_REUSEPORT socket option.

func WithTCPKeepAlive

func WithTCPKeepAlive(tcpKeepAlive time.Duration) Option

WithTCPKeepAlive sets up SO_KEEPALIVE socket option.

func WithTicker

func WithTicker(ticker bool) Option

WithTicker indicates that a ticker is set.

type Options

type Options struct {
	// Multicore indicates whether the server will be effectively created with multi-cores, if so,
	// then you must take care with synchronizing memory between all event callbacks, otherwise,
	// it will run the server with single thread. The number of threads in the server will be automatically
	// assigned to the value of runtime.NumCPU().
	Multicore bool

	// NumEventLoop is set up to start the given number of event-loop goroutine.
	// Note: Setting up NumEventLoop will override Multicore.
	NumEventLoop int

	// ReusePort indicates whether to set up the SO_REUSEPORT socket option.
	ReusePort bool

	// Ticker indicates whether the ticker has been set up.
	Ticker bool

	// TCPKeepAlive (SO_KEEPALIVE) socket option.
	TCPKeepAlive time.Duration

	// ICodec encodes and decodes TCP stream.
	Codec ICodec

	// Logger is the customized logger for logging info, if it is not set, default standard logger from log package is used.
	Logger Logger
}

Options are set when the client opens.

type Server

type Server struct {
	// Multicore indicates whether the server will be effectively created with multi-cores, if so,
	// then you must take care of synchronizing the shared data between all event callbacks, otherwise,
	// it will run the server with single thread. The number of threads in the server will be automatically
	// assigned to the value of runtime.NumCPU().
	Multicore bool

	// The Addr parameter is the listening address that align
	// with the addr string passed to the Serve function.
	Addr net.Addr

	// NumEventLoop is the number of event-loops that the server is using.
	NumEventLoop int

	// ReusePort indicates whether SO_REUSEPORT is enable.
	ReusePort bool

	// TCPKeepAlive (SO_KEEPALIVE) socket option.
	TCPKeepAlive time.Duration
}

Server represents a server context which provides information about the running server and has control functions for managing state.

Directories

Path Synopsis
examples
pool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL