netpoll

Version: v0.6.0
Published: Mar 4, 2024 License: Apache-2.0 Imports: 20 Imported by: 59

README

CloudWeGo-Netpoll

Introduction

Netpoll is a high-performance, non-blocking I/O networking framework developed by ByteDance that focuses on RPC scenarios.

RPC is usually heavy on processing logic and therefore cannot handle I/O serially. But Go's standard library net is designed around blocking I/O APIs, so an RPC framework built on it can only follow the One Conn One Goroutine design. Under high concurrency, the large number of goroutines makes context switching expensive. In addition, net.Conn has no API for checking whether a connection is alive, so it is difficult to build an efficient connection pool for an RPC framework, because the pool may hold a large number of failed connections.

On the other hand, the open source community currently lacks Go network libraries that focus on RPC scenarios. Similar repositories, such as evio and gnet, all focus on scenarios like Redis and HAProxy.

Netpoll was created to solve these problems. It draws inspiration from the design of evio and netty, has excellent Performance, and is better suited to microservice architectures. Netpoll also provides a number of Features, and it is recommended as a replacement for net in some RPC scenarios.

We developed the RPC framework Kitex and HTTP framework Hertz based on Netpoll, both with industry-leading performance.

Examples show how to build an RPC client and server using Netpoll.

For more information, please refer to the Document.

Features

  • Already

    • LinkBuffer provides nocopy API for streaming reading and writing
    • gopool provides high-performance goroutine pool
    • mcache provides efficient memory reuse
    • IsActive supports checking whether the connection is alive
    • Dialer supports building clients
    • EventLoop supports building a server
    • TCP, Unix Domain Socket
    • Linux, macOS (operating system)
  • Future

  • Unsupported

    • Windows (operating system)

Performance

Benchmarks should meet the requirements of industrial use. In RPC scenarios, support for concurrency and timeouts is essential.

We provide the netpoll-benchmark project to track and compare the performance of Netpoll and other frameworks under different conditions for reference.

For more benchmarks, see kitex-benchmark and hertz-benchmark.

Reference

Documentation

Index

Constants

const (
	// The connection closed when in use.
	ErrConnClosed = syscall.Errno(0x101)
	// Read I/O buffer timeout, calling by Connection.Reader
	ErrReadTimeout = syscall.Errno(0x102)
	// Dial timeout
	ErrDialTimeout = syscall.Errno(0x103)
	// Calling dialer without timeout.
	ErrDialNoDeadline = syscall.Errno(0x104) // TODO: no-deadline support in future
	// The calling function not support.
	ErrUnsupported = syscall.Errno(0x105)
	// Same as io.EOF
	ErrEOF = syscall.Errno(0x106)
	// Write I/O buffer timeout, calling by Connection.Writer
	ErrWriteTimeout = syscall.Errno(0x107)
)

These error codes extend syscall.Errno; their range is set to 0x100-0x1FF.
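As a rough illustration of the reserved range, the sketch below checks whether an error code falls inside 0x100-0x1FF. The two constants are copied from the list above and redeclared locally so the example builds with the standard library alone; `inCustomRange` is a hypothetical helper, not part of netpoll.

```go
package main

import (
	"fmt"
	"syscall"
)

// Values copied from the constant list above; redeclared locally so the
// sketch compiles without importing netpoll.
const (
	errConnClosed  = syscall.Errno(0x101)
	errReadTimeout = syscall.Errno(0x102)
)

// inCustomRange is a hypothetical helper: it reports whether e lies in the
// 0x100-0x1FF range reserved for the extended error codes.
func inCustomRange(e syscall.Errno) bool {
	return e >= 0x100 && e <= 0x1FF
}

func main() {
	fmt.Println(inCustomRange(errConnClosed))       // true
	fmt.Println(inCustomRange(errReadTimeout))      // true
	fmt.Println(inCustomRange(syscall.Errno(0x0B))) // false
}
```

Keeping the extended codes in their own window means they are unlikely to collide with ordinary operating system errno values, which are small integers.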

const (
	SO_ZEROCOPY       = 60
	SO_ZEROBLOCKTIMEO = 69
	MSG_ZEROCOPY      = 0x4000000
)

const BinaryInplaceThreshold = block4k

BinaryInplaceThreshold is the minimum slice length for nocopy handling. Slices shorter than this threshold are copied instead, which minimizes overhead.

const EPOLLET = -syscall.EPOLLET

const ErrnoMask = 0xFF

Variables

var LinkBufferCap = block4k

LinkBufferCap is the minimum size of each LinkBuffer node. It can be modified.

Functions

func DisableGopool added in v0.0.4

func DisableGopool() error

DisableGopool removes gopool (the goroutine pool used to run OnRequest), which means that OnRequest will be run via `go OnRequest(...)`. Usually, OnRequest causes stack expansion, which reusing goroutines avoids. But if you can confirm that OnRequest will not cause stack expansion, DisableGopool is recommended to reduce redundancy and improve performance.

func EpollCreate added in v0.3.2

func EpollCreate(flag int) (fd int, err error)

EpollCreate implements epoll_create1.

func EpollCtl

func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error)

EpollCtl implements epoll_ctl.

func EpollWait

func EpollWait(epfd int, events []epollevent, msec int) (n int, err error)

EpollWait implements epoll_wait.

func Exception

func Exception(err error, suffix string) error

Exception wraps an Errno with a suffix. The returned error implements xerrors.Wrapper.
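To show why implementing a wrapper interface matters, here is a minimal sketch of an error type that carries an Errno plus a suffix and exposes the Errno through Unwrap, so that errors.Is can still match it. The `wrapped` type and `isReadTimeout` helper are hypothetical stand-ins written for this example; they are not netpoll's own types, and the constant is copied from the Constants section.

```go
package main

import (
	"errors"
	"fmt"
	"syscall"
)

// errReadTimeout mirrors the ErrReadTimeout value listed under Constants.
const errReadTimeout = syscall.Errno(0x102)

// wrapped is a hypothetical stand-in for an Errno wrapped with a suffix.
type wrapped struct {
	no     syscall.Errno
	suffix string
}

func (w *wrapped) Error() string {
	return fmt.Sprintf("errno %#x %s", uintptr(w.no), w.suffix)
}

// Unwrap exposes the underlying Errno, which is what lets errors.Is match it.
func (w *wrapped) Unwrap() error { return w.no }

// isReadTimeout reports whether err is, or wraps, errReadTimeout.
func isReadTimeout(err error) bool {
	return errors.Is(err, errReadTimeout)
}

func main() {
	var err error = &wrapped{no: errReadTimeout, suffix: "when reading"}
	fmt.Println(isReadTimeout(err)) // true
}
```

With this shape, callers can compare against the plain error code without parsing the message text.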

func GetSysFdPairs

func GetSysFdPairs() (r, w int)

GetSysFdPairs creates and returns the fds of a pair of sockets.

func Initialize added in v0.6.0

func Initialize()

Initialize initializes the pollers eagerly. By default, they are initialized lazily. It is safe to call Initialize multiple times.
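One common way to get "lazy by default, safe to call repeatedly" behavior in Go is sync.Once. The sketch below illustrates that pattern only; it is an assumption about the general technique, not a description of how netpoll implements Initialize, and `initialize` and `initCount` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	once      sync.Once
	initCount int // number of times the setup body actually ran
)

// initialize is a hypothetical idempotent setup routine: the body passed
// to once.Do runs at most once, however many times initialize is called.
func initialize() {
	once.Do(func() { initCount++ })
}

func main() {
	initialize()
	initialize()
	fmt.Println(initCount) // 1
}
```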

func NewIOReadWriter

func NewIOReadWriter(rw ReadWriter) io.ReadWriter

NewIOReadWriter converts a ReadWriter to an io.ReadWriter.

func NewIOReader

func NewIOReader(r Reader) io.Reader

NewIOReader converts a Reader to an io.Reader.

func NewIOWriter

func NewIOWriter(w Writer) io.Writer

NewIOWriter converts a Writer to an io.Writer.

func SetKeepAlive

func SetKeepAlive(fd, secs int) error

SetKeepAlive supports IPv4 only.

func SetLoadBalance

func SetLoadBalance(lb LoadBalance) error

SetLoadBalance sets the load balancing method. Load balancing is always a best-effort attempt to distribute incoming connections across multiple pollers. This option only works when numLoops is set.

func SetLoggerOutput added in v0.3.2

func SetLoggerOutput(w io.Writer)

func SetNumLoops

func SetNumLoops(numLoops int) error

SetNumLoops sets the number of pollers. You generally do not need to set it yourself. By default, the number of pollers is runtime.GOMAXPROCS(0)/20+1. If your service process has fewer than 20 cores, one poller is theoretically enough. Otherwise, you may need to adjust the number of pollers to get the best results. As a rule of thumb, assign one poller per 20 cores.

SetNumLoops may only be called before any connection is created. An example usage:

func init() {
    netpoll.SetNumLoops(...)
}
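To make the default concrete, the sketch below evaluates the documented formula GOMAXPROCS/20+1 for a few processor counts. `defaultNumLoops` is a hypothetical helper written for this example; it only reproduces the arithmetic stated above.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultNumLoops applies the documented default: procs/20 + 1,
// using integer division.
func defaultNumLoops(procs int) int {
	return procs/20 + 1
}

func main() {
	for _, procs := range []int{4, 19, 20, 64} {
		fmt.Printf("GOMAXPROCS=%d -> %d poller(s)\n", procs, defaultNumLoops(procs))
	}
	fmt.Println("this machine:", defaultNumLoops(runtime.GOMAXPROCS(0)))
}
```

With integer division, anything below 20 yields a single poller, 20 to 39 yields two, and so on.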

func SetRunner added in v0.6.0

func SetRunner(f func(ctx context.Context, f func()))

SetRunner sets the runner function used for every OnRequest/OnConnect callback.

Types

type CloseCallback

type CloseCallback func(connection Connection) error

CloseCallback will be called after the connection is closed. Return: the error is unused and ignored.

type Conn

type Conn interface {
	net.Conn

	// Fd return conn's fd, used by poll
	Fd() (fd int)
}

Conn extends net.Conn, but supports getting the conn's fd.

type Connection

type Connection interface {
	// Connection extends net.Conn, just for interface compatibility.
	// It's not recommended to use net.Conn API except for io.Closer.
	net.Conn

	// The recommended API for nocopy reading and writing.
	// Reader will return nocopy buffer data, or error after timeout which set by SetReadTimeout.
	Reader() Reader
	// Writer will write data to the connection by NIO mode,
	// so it will return an error only when the connection isn't Active.
	Writer() Writer

	// IsActive checks whether the connection is active or not.
	IsActive() bool

	// SetReadTimeout sets the timeout for future Read calls wait.
	// A zero value for timeout means Reader will not timeout.
	SetReadTimeout(timeout time.Duration) error

	// SetWriteTimeout sets the timeout for future Write calls wait.
	// A zero value for timeout means Writer will not timeout.
	SetWriteTimeout(timeout time.Duration) error

	// SetIdleTimeout sets the idle timeout of connections.
	// Idle connections that exceed the set timeout are no longer guaranteed to be active,
	// but can be checked by calling IsActive.
	SetIdleTimeout(timeout time.Duration) error

	// SetOnRequest can set or replace the OnRequest method for a connection, but can't be set to nil.
	// Although SetOnRequest avoids data race, it should still be used before transmitting data.
	// Replacing OnRequest while processing data may cause unexpected behavior and results.
	// Generally, the server side should uniformly set the OnRequest method for each connection via NewEventLoop,
	// which is set when the connection is initialized.
	// On the client side, if necessary, make sure that OnRequest is set before sending data.
	SetOnRequest(on OnRequest) error

	// AddCloseCallback can add hangup callback for a connection, which will be called when connection closing.
	// This is very useful for cleaning up idle connections. For instance, you can use callbacks to clean up
	// the local resources, which bound to the idle connection, when hangup by the peer. No need another goroutine
	// to polling check connection status.
	AddCloseCallback(callback CloseCallback) error
}

Connection supports reading and writing simultaneously, but does not support simultaneous reading or writing by multiple goroutines. It maintains its own input/output buffer, and provides nocopy API for reading and writing.

func DialConnection

func DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)

DialConnection is a default implementation of Dialer.

type Dialer

type Dialer interface {
	DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)

	DialTimeout(network, address string, timeout time.Duration) (conn net.Conn, err error)
}

Dialer extends net.Dialer's API, just for interface compatibility. DialConnection is recommended, but of course all functions are practically the same. The returned net.Conn can be directly asserted as Connection if error is nil.

func NewDialer

func NewDialer() Dialer

NewDialer currently supports only TCP and Unix domain sockets.

type EventLoop

type EventLoop interface {
	// Serve registers a listener and runs blockingly to provide services, including listening to ports,
	// accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
	// Serve will return an error which describes the specific reason.
	Serve(ln net.Listener) error

	// Shutdown is used to graceful exit.
	// It will close all idle connections on the server, but will not change the underlying pollers.
	//
	// Argument: ctx set the waiting deadline, after which an error will be returned,
	// but will not force the closing of connections in progress.
	Shutdown(ctx context.Context) error
}

An EventLoop is a network server.

func NewEventLoop

func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error)

NewEventLoop creates an EventLoop that handles connections with onRequest and applies the given options.

type FDOperator

type FDOperator struct {
	// FD is file descriptor, poll will bind when register.
	FD int

	// The FDOperator provides three operations of reading, writing, and hanging.
	// The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
	OnRead  func(p Poll) error
	OnWrite func(p Poll) error
	OnHup   func(p Poll) error

	// The following is the required fn, which must exist when used, or directly panic.
	// Fns are only called by the poll when handles connection events.
	Inputs   func(vs [][]byte) (rs [][]byte)
	InputAck func(n int) (err error)

	// Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
	Outputs   func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
	OutputAck func(n int) (err error)
	// contains filtered or unexported fields
}

FDOperator is a collection of operations on file descriptors.

func (*FDOperator) Control

func (op *FDOperator) Control(event PollEvent) error

func (*FDOperator) Free added in v0.3.2

func (op *FDOperator) Free()

type LinkBuffer

type LinkBuffer struct {
	// contains filtered or unexported fields
}

LinkBuffer implements ReadWriter.

func NewLinkBuffer

func NewLinkBuffer(size ...int) *LinkBuffer

NewLinkBuffer creates a LinkBuffer. The size defines the initial capacity, but the new buffer holds no readable data.

func (*LinkBuffer) Append

func (b *LinkBuffer) Append(w Writer) (err error)

Append implements Writer.

func (*LinkBuffer) Bytes

func (b *LinkBuffer) Bytes() []byte

Bytes returns all the readable bytes of this LinkBuffer.

func (*LinkBuffer) Close

func (b *LinkBuffer) Close() (err error)

Close will recycle all buffer.

func (*LinkBuffer) Flush

func (b *LinkBuffer) Flush() (err error)

Flush submits all malloc data. The caller must make sure that the allocated bytes have been correctly assigned beforehand.

func (*LinkBuffer) GetBytes

func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte)

GetBytes will read and fill the slice p as much as possible.

func (*LinkBuffer) IsEmpty

func (b *LinkBuffer) IsEmpty() (ok bool)

IsEmpty checks whether this LinkBuffer is empty.

func (*LinkBuffer) Len

func (b *LinkBuffer) Len() int

Len implements Reader.

func (*LinkBuffer) Malloc

func (b *LinkBuffer) Malloc(n int) (buf []byte, err error)

Malloc pre-allocates memory that is not yet readable. It becomes readable data after submission (e.g. Flush).

func (*LinkBuffer) MallocAck

func (b *LinkBuffer) MallocAck(n int) (err error)

MallocAck will keep the first n malloc bytes and discard the rest.
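The Malloc, MallocAck, and Flush descriptions above can be modeled with a toy buffer. The sketch below is a simplified illustration under stated assumptions: `toyBuffer` is a hypothetical type, not LinkBuffer, and it copies on Flush for simplicity, whereas the real buffer is described as nocopy.

```go
package main

import "fmt"

// toyBuffer is a simplified, hypothetical model of the write side.
type toyBuffer struct {
	readable []byte   // data visible to readers
	pending  [][]byte // malloc'd chunks not yet submitted
}

// Malloc reserves n bytes that stay invisible to readers until Flush.
func (b *toyBuffer) Malloc(n int) []byte {
	buf := make([]byte, n)
	b.pending = append(b.pending, buf)
	return buf
}

// MallocLen returns the number of reserved, unsubmitted bytes.
func (b *toyBuffer) MallocLen() int {
	total := 0
	for _, p := range b.pending {
		total += len(p)
	}
	return total
}

// MallocAck keeps the first n pending bytes and discards the rest.
func (b *toyBuffer) MallocAck(n int) error {
	if n < 0 || n > b.MallocLen() {
		return fmt.Errorf("ack %d out of range [0, %d]", n, b.MallocLen())
	}
	for i, p := range b.pending {
		if n < len(p) {
			b.pending[i] = p[:n]
		}
		n -= len(b.pending[i])
	}
	return nil
}

// Flush submits all pending bytes so that they become readable.
func (b *toyBuffer) Flush() {
	for _, p := range b.pending {
		b.readable = append(b.readable, p...)
	}
	b.pending = nil
}

// Len returns the number of readable bytes.
func (b *toyBuffer) Len() int { return len(b.readable) }

func main() {
	var b toyBuffer
	buf := b.Malloc(8)
	n := copy(buf, "hello")
	fmt.Println(b.Len()) // 0: nothing is readable before Flush
	_ = b.MallocAck(n)   // keep only the 5 bytes actually written
	b.Flush()
	fmt.Println(b.Len(), string(b.readable)) // 5 hello
}
```

This pattern suits writers that over-reserve space, fill part of it, and then trim the reservation to the bytes actually produced before submitting.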

func (*LinkBuffer) MallocLen

func (b *LinkBuffer) MallocLen() (length int)

MallocLen implements Writer.

func (*LinkBuffer) Next

func (b *LinkBuffer) Next(n int) (p []byte, err error)

Next implements Reader.

func (*LinkBuffer) Peek

func (b *LinkBuffer) Peek(n int) (p []byte, err error)

Peek does not have an independent lifecycle, and there is no signal to indicate that Peek content can be released, so Peek will not introduce mcache for now.

func (*LinkBuffer) ReadBinary

func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error)

ReadBinary implements Reader.

func (*LinkBuffer) ReadByte

func (b *LinkBuffer) ReadByte() (p byte, err error)

ReadByte implements Reader.

func (*LinkBuffer) ReadString

func (b *LinkBuffer) ReadString(n int) (s string, err error)

ReadString implements Reader.

func (*LinkBuffer) Release

func (b *LinkBuffer) Release() (err error)

Release the node that has been read. b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice

func (*LinkBuffer) Skip

func (b *LinkBuffer) Skip(n int) (err error)

Skip implements Reader.

func (*LinkBuffer) Slice

func (b *LinkBuffer) Slice(n int) (r Reader, err error)

Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer, and only holds the ability of Reader.

Slice will automatically execute a Release.

func (*LinkBuffer) Until added in v0.2.0

func (b *LinkBuffer) Until(delim byte) (line []byte, err error)

Until returns a slice of the buffer that ends with delim.
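The delimiter semantics can be sketched over a plain byte slice. The function below is a hypothetical illustration that follows the behavior described in the Reader interface comments (the returned slice includes the delimiter, and a missing delimiter yields all data plus an error). `until` and `errNoDelim` are names invented for this example, and the toy version does not block or wait for more input.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errNoDelim is a hypothetical error for this sketch.
var errNoDelim = errors.New("delimiter not found")

// until returns the prefix of data up to and including delim.
// If delim is absent, it returns all of data together with an error.
func until(data []byte, delim byte) ([]byte, error) {
	i := bytes.IndexByte(data, delim)
	if i < 0 {
		return data, errNoDelim
	}
	return data[:i+1], nil
}

func main() {
	line, err := until([]byte("PING\r\nPONG"), '\n')
	fmt.Printf("%q %v\n", line, err) // "PING\r\n" <nil>
}
```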

func (*LinkBuffer) WriteBinary

func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error)

WriteBinary implements Writer.

func (*LinkBuffer) WriteBuffer

func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error)

WriteBuffer will not submit (e.g. Flush) the data, to ensure normal use of MallocLen. You must actively submit the data before reading it. The argument buf can't be used after calling WriteBuffer (set it to nil).

func (*LinkBuffer) WriteByte

func (b *LinkBuffer) WriteByte(p byte) (err error)

WriteByte implements Writer.

func (*LinkBuffer) WriteDirect

func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error

WriteDirect cannot be mixed with WriteString or WriteBinary functions.

func (*LinkBuffer) WriteString

func (b *LinkBuffer) WriteString(s string) (n int, err error)

WriteString implements Writer.

type Listener

type Listener interface {
	net.Listener

	// Fd return listener's fd, used by poll.
	Fd() (fd int)
}

Listener extends net.Listener, but supports getting the listener's fd.

func ConvertListener added in v0.0.4

func ConvertListener(l net.Listener) (nl Listener, err error)

ConvertListener converts a net.Listener to a Listener.

func CreateListener

func CreateListener(network, addr string) (l Listener, err error)

CreateListener returns a new Listener.

type LoadBalance

type LoadBalance int

LoadBalance sets the load balancing method.

const (
	// Random requests that connections are randomly distributed.
	Random LoadBalance = iota
	// RoundRobin requests that connections are distributed to a Poll
	// in a round-robin fashion.
	RoundRobin
)
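The two strategies can be illustrated with small index pickers. This is a sketch of the general techniques only, not netpoll's internal code; `randomPicker` and `roundRobinPicker` are hypothetical types, and n stands for the number of pollers.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// randomPicker chooses a poller index uniformly at random.
type randomPicker struct{}

func (randomPicker) pick(n int) int { return rand.Intn(n) }

// roundRobinPicker cycles through poller indexes in order.
// The atomic counter keeps it safe for concurrent callers.
type roundRobinPicker struct{ next uint32 }

func (p *roundRobinPicker) pick(n int) int {
	return int((atomic.AddUint32(&p.next, 1) - 1) % uint32(n))
}

func main() {
	rr := &roundRobinPicker{}
	for i := 0; i < 5; i++ {
		fmt.Print(rr.pick(3), " ") // 0 1 2 0 1
	}
	fmt.Println()
	fmt.Println(randomPicker{}.pick(3) < 3) // true
}
```

Round-robin gives an even spread of connection counts, while random selection avoids shared state between callers. Neither accounts for how busy each connection is, which fits the best-effort wording of SetLoadBalance.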

type OnConnect added in v0.2.0

type OnConnect func(ctx context.Context, connection Connection) context.Context

OnConnect is called once the connection is created. It may read from, write to, or close the connection, and it may return a ctx that will be passed to OnRequest. OnConnect does not block the poller because it is executed asynchronously. OnRequest can be executed only after OnConnect has finished.

An example usage in TCP Proxy scenario:

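func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context {
	// Dial the downstream peer once per upstream connection.
	// The dial error is ignored here for brevity.
	downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second)
	return context.WithValue(ctx, downstreamKey, downstream)
}

func onRequest(ctx context.Context, upstream netpoll.Connection) error {
	// Retrieve the downstream connection stored by onConnect.
	downstream := ctx.Value(downstreamKey).(netpoll.Connection)
	_ = downstream // forward data between upstream and downstream here
	return nil
}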

type OnDisconnect added in v0.6.0

type OnDisconnect func(ctx context.Context, connection Connection)

OnDisconnect is called when the connection is about to be closed. OnDisconnect must return as quickly as possible because it blocks the poller. OnDisconnect is different from CloseCallback; see the "The Connection Callback Sequence Diagram" section for details.

type OnPrepare

type OnPrepare func(connection Connection) context.Context

OnPrepare is used to inject custom preparation at connection initialization, which is optional but important in some scenarios. For example, a qps limiter can be set by closing overloaded connections directly in OnPrepare.

Return: context will become the argument of OnRequest. Usually, custom resources can be initialized in OnPrepare and used in OnRequest.

PLEASE NOTE: OnPrepare is executed without any data in the connection, so Reader() or Writer() cannot be used here, but may be supported in the future.

type OnRequest

type OnRequest func(ctx context.Context, connection Connection) error

OnRequest defines the function for handling connection. When data is sent from the connection peer, netpoll actively reads the data in LT mode and places it in the connection's input buffer. Generally, OnRequest starts handling the data in the following way:

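	func OnRequest(ctx context.Context, connection Connection) error {
		// n, l and output are placeholders for application-specific values.
		input, err := connection.Reader().Next(n)
		if err != nil {
			return err
		}
		// handle the input data ...
		send, _ := connection.Writer().Malloc(l)
		copy(send, output)
		return connection.Writer().Flush()
	}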

OnRequest will run in a separate goroutine and it is guaranteed that there is one and only one OnRequest running at the same time. The underlying logic is similar to:

go func() {
	// Keep calling OnRequest while unread input remains.
	for connection.Reader().Len() > 0 {
		OnRequest(ctx, connection)
	}
}()

PLEASE NOTE: OnRequest must either eventually read all the input data or actively Close the connection, otherwise the goroutine will fall into a dead loop.

Return: the error is unused and ignored.
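The dead-loop warning can be demonstrated with a toy driver over a bytes.Buffer. This sketch mimics the loop shape described above under simplifying assumptions: `drive`, `consumeFrame`, and `consumeNothing` are hypothetical names, and unlike the real loop, the driver bails out when a call makes no progress so that the example always terminates.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// consumeFrame is a well-behaved handler: it reads one 4-byte frame per call.
func consumeFrame(b *bytes.Buffer) { b.Next(4) }

// consumeNothing is a misbehaving handler: it never reads its input.
func consumeNothing(b *bytes.Buffer) {}

// drive calls handler while input remains. It returns an error as soon as
// a call consumes nothing, which is the situation that would spin forever.
func drive(data string, handler func(*bytes.Buffer)) (calls int, err error) {
	input := bytes.NewBufferString(data)
	for input.Len() > 0 {
		before := input.Len()
		handler(input)
		calls++
		if input.Len() == before {
			return calls, errors.New("handler consumed nothing")
		}
	}
	return calls, nil
}

func main() {
	fmt.Println(drive("aaaabbbbcccc", consumeFrame)) // 3 <nil>

	calls, err := drive("aaaa", consumeNothing)
	fmt.Println(calls, err != nil) // 1 true
}
```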

type Option

type Option struct {
	// contains filtered or unexported fields
}

Option configures an EventLoop. Options are passed to NewEventLoop.

func WithIdleTimeout

func WithIdleTimeout(timeout time.Duration) Option

WithIdleTimeout sets the idle timeout of connections.

func WithOnConnect added in v0.2.0

func WithOnConnect(onConnect OnConnect) Option

WithOnConnect registers the OnConnect method to EventLoop.

func WithOnDisconnect added in v0.6.0

func WithOnDisconnect(onDisconnect OnDisconnect) Option

WithOnDisconnect registers the OnDisconnect method to EventLoop.

func WithOnPrepare

func WithOnPrepare(onPrepare OnPrepare) Option

WithOnPrepare registers the OnPrepare method to EventLoop.

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) Option

WithReadTimeout sets the read timeout of connections.

func WithWriteTimeout added in v0.3.0

func WithWriteTimeout(timeout time.Duration) Option

WithWriteTimeout sets the write timeout of connections.

type Poll

type Poll interface {
	// Wait will poll all registered fds, and schedule processing based on the triggered event.
	// The call will block, so the usage can be like:
	//
	//  go wait()
	//
	Wait() error

	// Close the poll and shutdown Wait().
	Close() error

	// Trigger can be used to actively refresh the loop where Wait is located when no event is triggered.
	// On linux systems, eventfd is used by default, and kevent by default on bsd systems.
	Trigger() error

	// Control the event of file descriptor and the operations is defined by PollEvent.
	Control(operator *FDOperator, event PollEvent) error

	// Alloc the operator from cache.
	Alloc() (operator *FDOperator)

	// Free the operator from cache.
	Free(operator *FDOperator)
}

Poll monitors fds (file descriptors), calls the FDOperator to perform specific actions, and shields underlying differences. On Linux systems, Poll uses epoll by default; on BSD systems, it uses kevent.

type PollEvent

type PollEvent int

PollEvent defines the operation of poll.Control.

const (
	// PollReadable is used to monitor whether the FDOperator registered by
	// listener and connection is readable or closed.
	PollReadable PollEvent = 0x1

	// PollWritable is used to monitor whether the FDOperator created by the dialer is writable or closed.
	// ET mode must be used (still need to poll hup after being writable)
	PollWritable PollEvent = 0x2

	// PollDetach is used to remove the FDOperator from poll.
	PollDetach PollEvent = 0x3

	// PollR2RW is used to monitor writable for FDOperator,
	// which is only called when the socket write buffer is full.
	PollR2RW PollEvent = 0x5

	// PollRW2R is used to remove the writable monitor of FDOperator, generally used with PollR2RW.
	PollRW2R PollEvent = 0x6
)

type ReadWriter

type ReadWriter interface {
	Reader
	Writer
}

ReadWriter is a combination of Reader and Writer.

func NewReadWriter

func NewReadWriter(rw io.ReadWriter) ReadWriter

NewReadWriter converts an io.ReadWriter to a nocopy ReadWriter.

type Reader

type Reader interface {
	// Next returns a slice containing the next n bytes from the buffer,
	// advancing the buffer as if the bytes had been returned by Read.
	//
	// If there are fewer than n bytes in the buffer, Next returns will be blocked
	// until data enough or an error occurs (such as a wait timeout).
	//
	// The slice p is only valid until the next call to the Release method.
	// Next is not globally optimal, and Skip, ReadString, ReadBinary methods
	// are recommended for specific scenarios.
	//
	// Return: len(p) must be n or 0, and p and error cannot be nil at the same time.
	Next(n int) (p []byte, err error)

	// Peek returns the next n bytes without advancing the reader.
	// Other behavior is the same as Next.
	Peek(n int) (buf []byte, err error)

	// Skip the next n bytes and advance the reader, which is
	// a faster implementation of Next when the next data is not used.
	Skip(n int) (err error)

	// Until reads until the first occurrence of delim in the input,
	// returning a slice stops with delim in the input buffer.
	// If Until encounters an error before finding a delimiter,
	// it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
	// Until returns err != nil only if line does not end in delim.
	Until(delim byte) (line []byte, err error)

	// ReadString is a faster implementation of Next when a string needs to be returned.
	// It replaces:
	//
	//  var p, err = Next(n)
	//  return string(p), err
	//
	ReadString(n int) (s string, err error)

	// ReadBinary is a faster implementation of Next when it needs to
	// return a copy of the slice that is not shared with the underlying layer.
	// It replaces:
	//
	//  var p, err = Next(n)
	//  var b = make([]byte, n)
	//  copy(b, p)
	//  return b, err
	//
	ReadBinary(n int) (p []byte, err error)

	// ReadByte is a faster implementation of Next when a byte needs to be returned.
	// It replaces:
	//
	//  var p, err = Next(1)
	//  return p[0], err
	//
	ReadByte() (b byte, err error)

	// Slice returns a new Reader containing the Next n bytes from this Reader.
	//
	// If you want to make a new Reader using the []byte returned by Next, Slice already does that,
	// and the operation is zero-copy. Besides, Slice would also Release this Reader.
	// The logic pseudocode is similar:
	//
	//  var p, err = this.Next(n)
	//  var reader = new Reader(p) // pseudocode
	//  this.Release()
	//  return reader, err
	//
	Slice(n int) (r Reader, err error)

	// Release the memory space occupied by all read slices. This method needs to be executed actively to
	// recycle the memory after confirming that the previously read data is no longer in use.
	// After invoking Release, the slices obtained by the method such as Next, Peek, Skip will
	// become an invalid address and cannot be used anymore.
	Release() (err error)

	// Len returns the total length of the readable data in the reader.
	Len() (length int)
}

Reader is a collection of operations for nocopy reads.

For ease of use, it is recommended to implement Reader as a blocking interface, rather than simply fetching the buffer. For example, a call to Next(n) should block if there are fewer than n bytes available, unless a timeout occurs. Either the return value is guaranteed to meet the requirements, or an error is returned.
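The difference between a nocopy view (Next) and an owned copy (ReadBinary) can be shown with ordinary slices. The sketch below is an analogy built on plain Go slice aliasing; `next` and `readBinary` are hypothetical helpers, and overwriting `buf` stands in for the underlying memory being recycled after Release.

```go
package main

import "fmt"

// next returns a view that shares memory with buf, like a nocopy read.
func next(buf []byte, n int) []byte { return buf[:n] }

// readBinary returns an independent copy, following the expansion of
// ReadBinary given in the Reader interface comments.
func readBinary(buf []byte, n int) []byte {
	b := make([]byte, n)
	copy(b, next(buf, n))
	return b
}

func main() {
	buf := []byte("hello world")
	view := next(buf, 5)
	owned := readBinary(buf, 5)

	// Simulate the underlying memory being recycled and reused.
	copy(buf, "XXXXX")

	fmt.Println(string(view))  // XXXXX
	fmt.Println(string(owned)) // hello
}
```

This is why slices obtained from nocopy reads should not be kept past Release, while copies can be retained freely.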

func NewReader

func NewReader(r io.Reader) Reader

NewReader converts an io.Reader to a nocopy Reader.

type TCPAddr

type TCPAddr struct {
	net.TCPAddr
}

TCPAddr represents the address of a TCP end point.

func ResolveTCPAddr

func ResolveTCPAddr(network, address string) (*TCPAddr, error)

ResolveTCPAddr returns an address of TCP end point.

The network must be a TCP network name.

If the host in the address parameter is not a literal IP address or the port is not a literal port number, ResolveTCPAddr resolves the address to an address of TCP end point. Otherwise, it parses the address as a pair of literal IP address and port number. The address parameter can use a host name, but this is not recommended, because it will return at most one of the host name's IP addresses.

See func Dial for a description of the network and address parameters.

type TCPConnection

type TCPConnection struct {
	// contains filtered or unexported fields
}

TCPConnection implements Connection.

func DialTCP

func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error)

DialTCP acts like Dial for TCP networks.

The network must be a TCP network name; see func Dial for details.

If laddr is nil, a local address is automatically chosen. If the IP field of raddr is nil or an unspecified IP address, the local system is assumed.

func (*TCPConnection) AddCloseCallback added in v0.2.5

func (c *TCPConnection) AddCloseCallback(callback CloseCallback) error

AddCloseCallback adds a CloseCallback to this connection.

func (*TCPConnection) Append

func (c *TCPConnection) Append(w Writer) (err error)

Append implements Connection.

func (*TCPConnection) Close

func (c *TCPConnection) Close() error

Close implements Connection.

func (*TCPConnection) Detach added in v0.4.0

func (c *TCPConnection) Detach() error

Detach detaches the connection from poller but doesn't close it.

func (*TCPConnection) Flush

func (c *TCPConnection) Flush() error

Flush sends all malloc data to the peer, so the caller must make sure that the allocated bytes have been correctly assigned beforehand.

Flush first checks whether the out buffer is empty. If empty, it will call syscall.Write to send data directly, otherwise the buffer will be sent asynchronously by the epoll trigger.

func (*TCPConnection) IsActive

func (c *TCPConnection) IsActive() bool

IsActive implements Connection.

func (*TCPConnection) Len

func (c *TCPConnection) Len() (length int)

Len implements Connection.

func (*TCPConnection) Malloc

func (c *TCPConnection) Malloc(n int) (buf []byte, err error)

Malloc implements Connection.

func (*TCPConnection) MallocAck

func (c *TCPConnection) MallocAck(n int) (err error)

MallocAck implements Connection.

func (*TCPConnection) MallocLen

func (c *TCPConnection) MallocLen() (length int)

MallocLen implements Connection.

func (*TCPConnection) Next

func (c *TCPConnection) Next(n int) (p []byte, err error)

Next implements Connection.

func (*TCPConnection) Peek

func (c *TCPConnection) Peek(n int) (buf []byte, err error)

Peek implements Connection.

func (*TCPConnection) Read

func (c *TCPConnection) Read(p []byte) (n int, err error)

Read behaves the same as net.Conn's Read. It returns io.EOF if the buffer is empty.

func (*TCPConnection) ReadBinary

func (c *TCPConnection) ReadBinary(n int) (p []byte, err error)

ReadBinary implements Connection.

func (*TCPConnection) ReadByte

func (c *TCPConnection) ReadByte() (b byte, err error)

ReadByte implements Connection.

func (*TCPConnection) ReadString

func (c *TCPConnection) ReadString(n int) (s string, err error)

ReadString implements Connection.

func (*TCPConnection) Reader

func (c *TCPConnection) Reader() Reader

Reader implements Connection.

func (*TCPConnection) Release

func (c *TCPConnection) Release() (err error)

Release implements Connection.

func (*TCPConnection) SetIdleTimeout

func (c *TCPConnection) SetIdleTimeout(timeout time.Duration) error

SetIdleTimeout implements Connection.

func (*TCPConnection) SetOnConnect added in v0.2.5

func (c *TCPConnection) SetOnConnect(onConnect OnConnect) error

SetOnConnect sets the OnConnect callback.

func (*TCPConnection) SetOnDisconnect added in v0.6.0

func (c *TCPConnection) SetOnDisconnect(onDisconnect OnDisconnect) error

SetOnDisconnect sets the OnDisconnect callback.

func (*TCPConnection) SetOnRequest added in v0.2.5

func (c *TCPConnection) SetOnRequest(onRequest OnRequest) error

SetOnRequest initializes ctx when setting OnRequest.

func (*TCPConnection) SetReadTimeout

func (c *TCPConnection) SetReadTimeout(timeout time.Duration) error

SetReadTimeout implements Connection.

func (*TCPConnection) SetWriteTimeout added in v0.3.0

func (c *TCPConnection) SetWriteTimeout(timeout time.Duration) error

SetWriteTimeout implements Connection.

func (*TCPConnection) Skip

func (c *TCPConnection) Skip(n int) (err error)

Skip implements Connection.

func (*TCPConnection) Slice

func (c *TCPConnection) Slice(n int) (r Reader, err error)

Slice implements Connection.

func (*TCPConnection) Until added in v0.2.0

func (c *TCPConnection) Until(delim byte) (line []byte, err error)

Until implements Connection.

func (*TCPConnection) Write

func (c *TCPConnection) Write(p []byte) (n int, err error)

Write will Flush soon.

func (*TCPConnection) WriteBinary

func (c *TCPConnection) WriteBinary(b []byte) (n int, err error)

WriteBinary implements Connection.

func (*TCPConnection) WriteByte

func (c *TCPConnection) WriteByte(b byte) (err error)

WriteByte implements Connection.

func (*TCPConnection) WriteDirect

func (c *TCPConnection) WriteDirect(p []byte, remainCap int) (err error)

WriteDirect implements Connection.

func (*TCPConnection) WriteString

func (c *TCPConnection) WriteString(s string) (n int, err error)

WriteString implements Connection.

func (*TCPConnection) Writer

func (c *TCPConnection) Writer() Writer

Writer implements Connection.

type UnixAddr

type UnixAddr struct {
	net.UnixAddr
}

UnixAddr represents the address of a Unix domain socket end point.

func ResolveUnixAddr

func ResolveUnixAddr(network, address string) (*UnixAddr, error)

ResolveUnixAddr returns an address of Unix domain socket end point.

The network must be a Unix network name.

See func Dial for a description of the network and address parameters.

type UnixConnection

type UnixConnection struct {
	// contains filtered or unexported fields
}

UnixConnection implements Connection.

func DialUnix

func DialUnix(network string, laddr, raddr *UnixAddr) (*UnixConnection, error)

DialUnix acts like Dial for Unix networks.

The network must be a Unix network name; see func Dial for details.

If laddr is non-nil, it is used as the local address for the connection.
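The parameter layout (network, laddr, raddr) matches net.DialUnix in the standard library, so the roles of laddr and raddr can be shown without netpoll. This is a minimal sketch using only the standard library on a Unix-like system; the helper echoOnce, the temporary directory, and the socket file name are hypothetical, and netpoll's DialUnix returns a *UnixConnection rather than the *net.UnixConn used here.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
)

// echoOnce is a hypothetical helper: it starts a one-shot echo server on a
// temporary Unix socket, dials it with the standard library, and returns
// the reply.
func echoOnce(msg string) (string, error) {
	dir, err := os.MkdirTemp("", "unixdemo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	raddr, err := net.ResolveUnixAddr("unix", filepath.Join(dir, "echo.sock"))
	if err != nil {
		return "", err
	}
	ln, err := net.ListenUnix("unix", raddr)
	if err != nil {
		return "", err
	}
	defer ln.Close()

	go func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		buf := make([]byte, 64)
		n, _ := conn.Read(buf)
		conn.Write(buf[:n]) // echo the message back, then close
	}()

	// laddr is nil, so the client side of the connection stays unnamed.
	conn, err := net.DialUnix("unix", nil, raddr)
	if err != nil {
		return "", err
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	reply, err := io.ReadAll(conn) // reads until the server closes
	return string(reply), err
}

func main() {
	fmt.Println(echoOnce("ping"))
}
```

Passing a non-nil laddr would bind the client end to that path before connecting, which is rarely needed for stream sockets.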

func (*UnixConnection) AddCloseCallback added in v0.2.5

func (c *UnixConnection) AddCloseCallback(callback CloseCallback) error

AddCloseCallback adds a CloseCallback to this connection.

func (*UnixConnection) Append

func (c *UnixConnection) Append(w Writer) (err error)

Append implements Connection.

func (*UnixConnection) Close

func (c *UnixConnection) Close() error

Close implements Connection.

func (*UnixConnection) Detach added in v0.4.0

func (c *UnixConnection) Detach() error

Detach detaches the connection from poller but doesn't close it.

func (*UnixConnection) Flush

func (c *UnixConnection) Flush() error

Flush sends all data allocated with Malloc to the peer, so the caller must first confirm that the allocated bytes have been correctly assigned.

Flush first checks whether the out buffer is empty. If it is empty, Flush calls syscall.Write to send the data directly; otherwise the buffer is sent asynchronously when epoll triggers.
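The two paths described above can be modeled with a toy type. This is only a sketch under stated assumptions, not netpoll's implementation: toyConn, its fields, and OnWritable are hypothetical names, the "socket" is a byte budget instead of a file descriptor, and the handling of a partial direct write (queueing the remainder) is an assumption made for the illustration.

```go
package main

import "fmt"

// toyConn is a hypothetical model of the decision made in Flush.
type toyConn struct {
	room int    // bytes the simulated socket accepts before it would block
	wire []byte // bytes that have reached the simulated peer
	out  []byte // bytes still waiting in the out buffer
}

// send stands in for syscall.Write: it accepts at most c.room bytes.
func (c *toyConn) send(p []byte) int {
	n := len(p)
	if n > c.room {
		n = c.room
	}
	c.wire = append(c.wire, p[:n]...)
	c.room -= n
	return n
}

// Flush reports whether p was sent entirely on the direct path.
func (c *toyConn) Flush(p []byte) bool {
	if len(c.out) > 0 {
		// Data is already pending: queue behind it to preserve ordering.
		c.out = append(c.out, p...)
		return false
	}
	n := c.send(p)
	c.out = append(c.out, p[n:]...) // assumed: keep any unsent remainder
	return n == len(p)
}

// OnWritable is what the simulated poller calls once the socket has room.
func (c *toyConn) OnWritable(room int) {
	c.room = room
	n := c.send(c.out)
	c.out = c.out[n:]
}

// run drives the model through a direct, a queued, and another direct flush.
func run() (string, []bool) {
	c := &toyConn{room: 3}
	var direct []bool
	direct = append(direct, c.Flush([]byte("abcde"))) // only "abc" fits
	direct = append(direct, c.Flush([]byte("fg")))    // queued behind "de"
	c.OnWritable(16)                                  // poller drains "defg"
	direct = append(direct, c.Flush([]byte("h")))     // out buffer is empty again
	return string(c.wire), direct
}

func main() {
	fmt.Println(run()) // prints: abcdefgh [false false true]
}
```

The point of the model is ordering: once anything is pending, later data must go through the same queue, otherwise bytes could reach the peer out of order.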

func (*UnixConnection) IsActive

func (c *UnixConnection) IsActive() bool

IsActive implements Connection.

func (*UnixConnection) Len

func (c *UnixConnection) Len() (length int)

Len implements Connection.

func (*UnixConnection) Malloc

func (c *UnixConnection) Malloc(n int) (buf []byte, err error)

Malloc implements Connection.

func (*UnixConnection) MallocAck

func (c *UnixConnection) MallocAck(n int) (err error)

MallocAck implements Connection.

func (*UnixConnection) MallocLen

func (c *UnixConnection) MallocLen() (length int)

MallocLen implements Connection.

func (*UnixConnection) Next

func (c *UnixConnection) Next(n int) (p []byte, err error)

Next implements Connection.

func (*UnixConnection) Peek

func (c *UnixConnection) Peek(n int) (buf []byte, err error)

Peek implements Connection.

func (*UnixConnection) Read

func (c *UnixConnection) Read(p []byte) (n int, err error)

Read behaves the same as net.Conn's Read; it returns io.EOF if the buffer is empty.

func (*UnixConnection) ReadBinary

func (c *UnixConnection) ReadBinary(n int) (p []byte, err error)

ReadBinary implements Connection.

func (*UnixConnection) ReadByte

func (c *UnixConnection) ReadByte() (b byte, err error)

ReadByte implements Connection.

func (*UnixConnection) ReadString

func (c *UnixConnection) ReadString(n int) (s string, err error)

ReadString implements Connection.

func (*UnixConnection) Reader

func (c *UnixConnection) Reader() Reader

Reader implements Connection.

func (*UnixConnection) Release

func (c *UnixConnection) Release() (err error)

Release implements Connection.

func (*UnixConnection) SetIdleTimeout

func (c *UnixConnection) SetIdleTimeout(timeout time.Duration) error

SetIdleTimeout implements Connection.

func (*UnixConnection) SetOnConnect added in v0.2.5

func (c *UnixConnection) SetOnConnect(onConnect OnConnect) error

SetOnConnect sets the OnConnect callback.

func (*UnixConnection) SetOnDisconnect added in v0.6.0

func (c *UnixConnection) SetOnDisconnect(onDisconnect OnDisconnect) error

SetOnDisconnect sets the OnDisconnect callback.

func (*UnixConnection) SetOnRequest added in v0.2.5

func (c *UnixConnection) SetOnRequest(onRequest OnRequest) error

SetOnRequest initializes ctx when setting OnRequest.

func (*UnixConnection) SetReadTimeout

func (c *UnixConnection) SetReadTimeout(timeout time.Duration) error

SetReadTimeout implements Connection.

func (*UnixConnection) SetWriteTimeout added in v0.3.0

func (c *UnixConnection) SetWriteTimeout(timeout time.Duration) error

SetWriteTimeout implements Connection.

func (*UnixConnection) Skip

func (c *UnixConnection) Skip(n int) (err error)

Skip implements Connection.

func (*UnixConnection) Slice

func (c *UnixConnection) Slice(n int) (r Reader, err error)

Slice implements Connection.

func (*UnixConnection) Until added in v0.2.0

func (c *UnixConnection) Until(delim byte) (line []byte, err error)

Until implements Connection.

func (*UnixConnection) Write

func (c *UnixConnection) Write(p []byte) (n int, err error)

Write writes p to the connection and flushes it promptly.

func (*UnixConnection) WriteBinary

func (c *UnixConnection) WriteBinary(b []byte) (n int, err error)

WriteBinary implements Connection.

func (*UnixConnection) WriteByte

func (c *UnixConnection) WriteByte(b byte) (err error)

WriteByte implements Connection.

func (*UnixConnection) WriteDirect

func (c *UnixConnection) WriteDirect(p []byte, remainCap int) (err error)

WriteDirect implements Connection.

func (*UnixConnection) WriteString

func (c *UnixConnection) WriteString(s string) (n int, err error)

WriteString implements Connection.

func (*UnixConnection) Writer

func (c *UnixConnection) Writer() Writer

Writer implements Connection.

type Writer

type Writer interface {
	// Malloc returns a slice containing the next n bytes from the buffer,
	// which will be written after submission (e.g. Flush).
	//
	// The slice buf is only valid until the next submission (e.g. Flush).
	// Therefore, make sure that all data has been written into the slice before submission.
	Malloc(n int) (buf []byte, err error)

	// WriteString is a faster implementation of Malloc when a string needs to be written.
	// It replaces:
	//
	//  var buf, err = Malloc(len(s))
	//  n = copy(buf, s)
	//  return n, err
	//
	// The argument string s will be referenced based on the original address and will not be copied,
	// so make sure that the string s will not be changed.
	WriteString(s string) (n int, err error)

	// WriteBinary is a faster implementation of Malloc when a slice needs to be written.
	// It replaces:
	//
	//  var buf, err = Malloc(len(b))
	//  n = copy(buf, b)
	//  return n, err
	//
	// The argument slice b will be referenced based on the original address and will not be copied,
	// so make sure that the slice b will not be changed.
	WriteBinary(b []byte) (n int, err error)

	// WriteByte is a faster implementation of Malloc when a byte needs to be written.
	// It replaces:
	//
	//  var buf, _ = Malloc(1)
	//  buf[0] = b
	//
	WriteByte(b byte) (err error)

	// WriteDirect is used to insert an additional slice of data on the current write stream.
	// For example, if you plan to execute:
	//
	//  var bufA, _ = Malloc(nA)
	//  WriteBinary(b)
	//  var bufB, _ = Malloc(nB)
	//
	// It can be replaced by:
	//
	//  var buf, _ = Malloc(nA+nB)
	//  WriteDirect(b, nB)
	//
	// where buf[:nA] = bufA, buf[nA:nA+nB] = bufB.
	WriteDirect(p []byte, remainCap int) error

	// MallocAck will keep the first n malloc bytes and discard the rest.
	// The following behavior:
	//
	//  var buf, _ = Malloc(8)
	//  buf = buf[:5]
	//  MallocAck(5)
	//
	// is equivalent to:
	//  var buf, _ = Malloc(5)
	//
	MallocAck(n int) (err error)

	// Append appends the argument writer to the tail of this writer and sets the argument writer to nil.
	// The operation is zero-copy, similar to p = append(p, w.p).
	Append(w Writer) (err error)

	// Flush submits all data allocated with Malloc, so the caller must confirm that the allocated bytes have been correctly assigned.
	// Its behavior is equivalent to an io.Writer that already has its argument (slice b).
	Flush() (err error)

	// MallocLen returns the total length of the writable data that has not yet been submitted in the writer.
	MallocLen() (length int)
}

Writer is a collection of operations for nocopy writes.

Writing is a two-step operation: first request a section of memory and fill it, then submit it. For example:

var buf, _ = Malloc(n)
buf = append(buf[:0], ...)
Flush()

Note that submitting self-managed buffers to a Writer is not recommended. Because the writer processes data asynchronously, a self-managed buffer that is recycled after submission may still be referenced by the writer, which leads to inconsistent lifetimes. Such usage is outside the scope of the design.
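To make the allocate, fill, submit flow concrete, here is a minimal sketch built on a toy type. It is not netpoll's Writer: toyWriter and demo are hypothetical names, each Malloc simply allocates a fresh chunk, and Flush writes synchronously to an io.Writer. Only the calling pattern (Malloc, MallocAck, Flush) mirrors the interface above.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// toyWriter is a hypothetical stand-in that models only the
// Malloc -> fill -> Flush flow; it makes no attempt to avoid copies.
type toyWriter struct {
	dst    io.Writer
	chunks [][]byte // regions handed out by Malloc and not yet flushed
}

// Malloc reserves n bytes for the caller to fill before the next Flush.
func (w *toyWriter) Malloc(n int) []byte {
	buf := make([]byte, n)
	w.chunks = append(w.chunks, buf)
	return buf
}

// MallocAck keeps the first n pending bytes and discards the rest.
func (w *toyWriter) MallocAck(n int) {
	for i, c := range w.chunks {
		if n <= len(c) {
			w.chunks[i] = c[:n]
			w.chunks = w.chunks[:i+1]
			return
		}
		n -= len(c)
	}
}

// Flush submits every pending chunk in order and resets the writer.
func (w *toyWriter) Flush() error {
	for _, c := range w.chunks {
		if _, err := w.dst.Write(c); err != nil {
			return err
		}
	}
	w.chunks = nil
	return nil
}

// demo over-allocates 8 bytes, fills 5 of them, trims, and submits.
func demo() (string, error) {
	var sink bytes.Buffer
	w := &toyWriter{dst: &sink}

	buf := w.Malloc(8)
	n := copy(buf, "hello")
	w.MallocAck(n) // keep only the bytes that were actually filled

	if err := w.Flush(); err != nil {
		return "", err
	}
	return sink.String(), nil
}

func main() {
	fmt.Println(demo()) // prints: hello <nil>
}
```

Because the slice returned by Malloc belongs to the writer, the caller never has to decide when it is safe to reuse that memory, which is the lifetime problem the note above warns about for self-managed buffers.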

func NewWriter

func NewWriter(w io.Writer) Writer

NewWriter converts an io.Writer into a nocopy Writer.

Notes

Bugs

  • On JS, NaCl and Plan 9, methods and functions related to UnixConn and UnixListener are not implemented.

  • On Windows, methods and functions related to UnixConn and UnixListener don't work for "unixgram" and "unixpacket".

