shmipc

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

README

Shmipc

English | 中文

Introduction

Shmipc is a high performance inter-process communication library developed by ByteDance. It is built on Linux's shared memory technology and uses unix or tcp connection to do process synchronization and finally implements zero copy communication across inter-processes. In IO-intensive or large-package scenarios, it has better performance.

Features

Zero copy

In an industrial production environment, the unix domain socket and tcp loopback are often used in inter-process communication.The read operation or the write operation need copy data between user space buffer and kernel space buffer.But shmipc directly store data to the share memory, so no copy compared to the former.

batch IO

An IO queue was mapped to share memory, which describe the metadata of communication data. so that a process could put many request to the IO queue, and other process could handle a batch IO per synchronization. It could effectively reduce the system calls which was brought by process synchronization.

Performance Testing

The source code bench_test.go, doing a performance comparison between shmipc and unix domain in ping-pong scenario with different package size. The result is as follows: having a performance improvement whatever small package or large package.

go test -bench=BenchmarkParallelPingPong -run BenchmarkParallelPingPong
goos: linux
goarch: amd64
pkg: github.com/cloudwego/shmipc-go
cpu: Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz
BenchmarkParallelPingPongByShmipc64B-40      	  733821	      1970 ns/op	  64.97 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc512B-40     	  536190	      1990 ns/op	 514.45 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc1KB-40      	  540517	      2045 ns/op	1001.62 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc4KB-40      	  509047	      2063 ns/op	3970.91 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc16KB-40     	  590398	      1996 ns/op	16419.46 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc32KB-40     	  607756	      1937 ns/op	33829.82 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc64KB-40     	  609824	      1995 ns/op	65689.31 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc256KB-40    	  622755	      1793 ns/op	292363.56 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc512KB-40    	  695401	      1993 ns/op	526171.77 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc1MB-40      	  538208	      1873 ns/op	1119401.64 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByShmipc4MB-40      	  606144	      1891 ns/op	4436936.93 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds64B-40         	  446019	      2657 ns/op	  48.18 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds512B-40        	  450124	      2665 ns/op	 384.30 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds1KB-40         	  446389	      2680 ns/op	 764.29 MB/s	       0 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds4KB-40         	  383552	      3093 ns/op	2648.83 MB/s	       1 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds16KB-40        	  307816	      3884 ns/op	8436.27 MB/s	       8 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds64KB-40        	  103027	     10259 ns/op	12776.17 MB/s	     102 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds256KB-40       	   25286	     46352 ns/op	11311.01 MB/s	    1661 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds512KB-40       	    9788	    122873 ns/op	8533.84 MB/s	    8576 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds1MB-40         	    4177	    283729 ns/op	7391.38 MB/s	   40178 B/op	       0 allocs/op
BenchmarkParallelPingPongByUds4MB-40         	     919	   1253338 ns/op	6693.01 MB/s	  730296 B/op	       1 allocs/op
PASS
ok  	github.com/cloudwego/shmipc	42.138s

  • BenchmarkParallelPingPongByUds, the ping-pong communication base on Unix domain socket.
  • BenchmarkParallelPingPongByShmipc, the ping-pong communication base on shmipc.
  • the suffix of the testing case name is the package size of communication, which from 64 Byte to 4 MB.
Quick start
HelloWorld
Integrate with application
HotRestart

hot restart demo

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	//ErrInvalidVersion means that we received a message with an invalid version
	ErrInvalidVersion = errors.New("invalid protocol version")

	//ErrInvalidMsgType means that we received a message with an invalid message type
	ErrInvalidMsgType = errors.New("invalid msg type")

	//ErrSessionShutdown means that the session is shutdown
	ErrSessionShutdown = errors.New("session shutdown")

	//ErrStreamsExhausted means that the stream id was used out and maybe have some streams leaked.
	ErrStreamsExhausted = errors.New("streams exhausted")

	//ErrTimeout is used when we reach an IO deadline
	ErrTimeout = errors.New("i/o deadline reached")

	//ErrStreamClosed was returned when using a closed stream
	ErrStreamClosed = errors.New("stream closed")

	//ErrConnectionWriteTimeout means that the write timeout was happened in tcp/unix connection.
	ErrConnectionWriteTimeout = errors.New("connection write timeout")

	//ErrEndOfStream means that the stream is end, user shouldn't to read from the stream.
	ErrEndOfStream = errors.New("end of stream")

	//ErrSessionUnhealthy occurred at Session.OpenStream(), which mean that the session is overload.
	//user should retry after 60 seconds(now). the followings situation will result in ErrSessionUnhealthy.
	//on client side:
	//  1. when local share memory is not enough, client send request data via unix domain socket.
	//  2. when peer share memory is not enough, client receive response data from unix domain socket.
	ErrSessionUnhealthy = errors.New("now the session is unhealthy, please retry later")

	//ErrNotEnoughData means that the real read size < expect read size.
	ErrNotEnoughData = errors.New("current buffer is not enough data to read")

	//ErrNoMoreBuffer means that the share memory is busy, and not more buffer to allocate.
	ErrNoMoreBuffer = errors.New("share memory not more buffer")

	//ErrShareMemoryHadNotLeftSpace means that reached the limitation of the file system when using MemMapTypeDevShm.
	ErrShareMemoryHadNotLeftSpace = errors.New("share memory had not left space")

	//ErrStreamCallbackHadExisted was returned if the Stream'Callbacks had existed
	ErrStreamCallbackHadExisted = errors.New("stream callbacks had existed")

	//ErrOSNonSupported means that shmipc couldn't work in current OS. (only support Linux now)
	ErrOSNonSupported = errors.New("shmipc just support linux OS now")

	//ErrHotRestartInProgress was returned by Listener.HotRestart when the Session had under the hot restart state
	ErrHotRestartInProgress = errors.New("hot restart in progress, try again later")

	//ErrInHandshakeStage was happened in the case that the uninitialized session doing hot restart.
	ErrInHandshakeStage = errors.New("session in handshake stage, try again later")

	//ErrFileNameTooLong mean that eht Config.ShareMemoryPathPrefixFile's length reached the limitation of the OS.
	ErrFileNameTooLong = errors.New("share memory path prefix too long")

	//ErrQueueFull mean that the server is so busy that the io queue is full
	ErrQueueFull = errors.New("the io queue is full")
)

Functions

func DebugBufferListDetail

func DebugBufferListDetail(path string)

DebugBufferListDetail print all BufferList's status in share memory located in the `path` if MemMapType is MemMapTypeMemFd, you could using the command that `lsof -p $PID` to found the share memory which was mmap by memfd, and the command `cat /proc/$PID/$MEMFD > $path` dump the share memory to file system.

func DebugQueueDetail

func DebugQueueDetail(path string)

DebugQueueDetail print IO-Queue's status which was mmap in the `path`

func Listen

func Listen(shmIPCAddress string) (net.Listener, error)

Listen create listener with default backlog size(4096) shmIPCAddress is uds address used as underlying connection, the returned value is net.Listener Remember close the listener if it is created successfully, or goroutine may leak Should I use Listen?

If you want the best performance, you should use low level API(not this one) to marshal and unmarshal manually,
which can achieve better batch results.
If you just care about the compatibility, you can use this high level API. For example, you can hardly change grpc
and protobuf, then you can use this listener to make it compatible with a little bit improved performance.

func ListenWithBacklog

func ListenWithBacklog(shmIPCAddress string, backlog int) (net.Listener, error)

ListenWithBacklog create listener with given backlog size shmIPCAddress is uds address used as underlying connection, the returned value is net.Listener Remember close the listener if it is created successfully, or goroutine may leak Should I use ListenWithBacklog?

If you want the best performance, you should use low level API(not this one) to marshal and unmarshal manually,
which can achieve better batch results.
If you just care about the compatibility, you can use this high level API. For example, you can hardly change grpc
and protobuf, then you can use this listener to make it compatible with a little bit improved performance.

func MemfdCreate

func MemfdCreate(name string, flags int) (fd int, err error)

linux 3.17+ provided

func SetLogLevel

func SetLogLevel(l int)

SetLogLevel used to change the internal logger's level and the default level is Warning. The process env `SHMIPC_LOG_LEVEL` also could set log level

func VerifyConfig

func VerifyConfig(config *Config) error

VerifyConfig is used to verify the sanity of configuration

Types

type BufferReader

type BufferReader interface {
	io.ByteReader

	//Len() return the current unread size of buffer.
	//It will traverse all underlying slices to compute the unread size, please don't call frequently.
	Len() int

	//Read `size` bytes from share memory, which maybe block if size is greater than Len().
	//Notice: when ReleasePreviousRead() was called, the results of previous ReadBytes() will be invalid.
	ReadBytes(size int) ([]byte, error)

	//Peek `size` bytes from share memory. the different between Peek() and ReadBytes() is that
	//Peek() don't influence the return value of Len(), but the ReadBytes() will decrease the unread size.
	//eg: the buffer is [0,1,2,3]
	//1. after Peek(2), the buffer is also [0,1,2,3], and the Len() is 4.
	//2. after ReadBytes(3), the buffer is [3], and the Len() is 1.
	//Notice: when ReleasePreviousRead was called, the results of previous Peek call is invalid .
	Peek(size int) ([]byte, error)

	//Drop data of given length. If there's no that much data, will block until the data is enough to discard
	Discard(size int) (int, error)

	/* Call ReleasePreviousRead when it is safe to drop all previous result of ReadBytes and Peek, otherwise shm memory will leak.
	  eg:
	    buf, err := BufferReader.ReadBytes(size) // or Buffer.
		//do
	*/
	ReleasePreviousRead()

	//If you would like to read string from the buffer, ReadString(size) is better than string(ReadBytes(size)).
	ReadString(size int) (string, error)
}

BufferReader used to read data from stream.

type BufferWriter

type BufferWriter interface {
	//Len() return the current wrote size of buffer.
	//It will traverse all underlying slices to compute the unread size, please don't call frequently.
	Len() int
	io.ByteWriter
	//Reserve `size` byte share memory space, user could use it implement zero copy write.
	Reserve(size int) ([]byte, error)
	//Copy data to share memory.
	//return value: `n` is the written size
	//return value: `err`, is nil mean that succeed, otherwise failure.
	WriteBytes(data []byte) (n int, err error)
	//Copy string to share memory
	WriteString(string) error
}

BufferWriter used to write data to stream.

type Config

type Config struct {
	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
	// we which will suspect a problem with the underlying connection and
	// close it. This is only applied to writes, where's there's generally
	// an expectation that things will move along quickly.
	ConnectionWriteTimeout time.Duration

	//In the initialization phase, client and server will exchange metadata and mapping share memory.
	//the InitializeTimeout specify how long time could use in this phase.
	InitializeTimeout time.Duration

	//The max number of pending io request. default is 8192
	QueueCap uint32

	//Share memory path of the underlying queue.
	QueuePath string

	//The capacity of buffer in share memory. default is 32MB
	ShareMemoryBufferCap uint32

	//The share memory path prefix of buffer.
	ShareMemoryPathPrefix string

	//LogOutput is used to control the log destination.
	LogOutput io.Writer

	//BufferSliceSizes could adjust share memory buffer slice size.
	//which could improve performance if most of all request or response's could write into single buffer slice instead of multi buffer slices.
	//Because multi buffer slices mean that more allocate and free operation,
	//and if the payload cross different buffer slice, it mean that payload in memory isn't continuous.
	//Default value is:
	// 1. 50% share memory hold on buffer slices that every slice is near to 8KB.
	// 2. 30% share memory hold on buffer slices that every slice is near to 32KB.
	// 3. 20% share memory hold on buffer slices that every slice is near to 128KB.
	BufferSliceSizes []*SizePercentPair

	//MemMapTypeDevShmFile or MemMapTypeMemFd (client set)
	MemMapType MemMapType

	//Session will emit some metrics to the Monitor with periodically (default 30s)
	Monitor Monitor
	// contains filtered or unexported fields
}

Config is used to tune the shmipc session

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration

type ListenCallback

type ListenCallback interface {
	//OnNewStream was called when accept a new stream
	OnNewStream(s *Stream)
	//OnShutdown was called when the listener was stopped
	OnShutdown(reason string)
}

ListenCallback is server's asynchronous API

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener listen socket and accept connection as shmipc server connection

func NewListener

func NewListener(callback ListenCallback, config *ListenerConfig) (*Listener, error)

NewListener will try listen the ListenPath of the configuration, and return the Listener if no error happened.

func (*Listener) Accept

func (l *Listener) Accept() (net.Conn, error)

Accept doesn't work, whose existence just adapt to the net.Listener interface.

func (*Listener) Addr

func (l *Listener) Addr() net.Addr

Addr returns the listener's network address.

func (*Listener) Close

func (l *Listener) Close() error

Close closes the listener. Any blocked Accept operations will be unblocked and return errors.

func (*Listener) HotRestart

func (l *Listener) HotRestart(epoch uint64) error

HotRestart will do shmipc server hot restart

func (*Listener) IsHotRestartDone

func (l *Listener) IsHotRestartDone() bool

IsHotRestartDone return whether the Listener is under the hot restart state.

func (*Listener) Run

func (l *Listener) Run() error

Run starting a loop to listen socket

func (*Listener) SetUnlinkOnClose

func (l *Listener) SetUnlinkOnClose(unlink bool)

SetUnlinkOnClose sets whether unlink unix socket path when Listener was stopped

type ListenerConfig

type ListenerConfig struct {
	*Config
	Network string //Only support unix or tcp
	//If Network is "tcp', the ListenPath is ip address and port, such as 0.0.0.0:6666(ipv4), [::]:6666 (ipv6)
	//If Network is "unix", the ListenPath is a file path, such as /your/socket/path/xx_shmipc.sock
	ListenPath string
}

ListenerConfig is the configuration of Listener

func NewDefaultListenerConfig

func NewDefaultListenerConfig(listenPath string, network string) *ListenerConfig

NewDefaultListenerConfig return the default Listener's config

type MemMapType

type MemMapType uint8

MemMapType is the mapping type of shared memory

const (
	// MemMapTypeDevShmFile maps share memory to /dev/shm (tmpfs)
	MemMapTypeDevShmFile MemMapType = 0
	// MemMapTypeMemFd maps share memory to memfd (Linux OS v3.17+)
	MemMapTypeMemFd MemMapType = 1
)

type Monitor

type Monitor interface {
	// OnEmitSessionMetrics was called by Session with periodically.
	OnEmitSessionMetrics(PerformanceMetrics, StabilityMetrics, ShareMemoryMetrics, *Session)
	// flush metrics
	Flush() error
}

Monitor could emit some metrics with periodically

type PerformanceMetrics

type PerformanceMetrics struct {
	ReceiveSyncEventCount uint64 //the SyncEvent count that session had received
	SendSyncEventCount    uint64 //the SyncEvent count that session had sent
	OutFlowBytes          uint64 //the out flow in bytes that session had sent
	InFlowBytes           uint64 //the in flow in bytes that session had receive
	SendQueueCount        uint64 //the pending count of send queue
	ReceiveQueueCount     uint64 //the pending count of receive queue
}

PerformanceMetrics is the metrics about performance

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is used to wrap a reliable ordered connection and to multiplex it into multiple streams.

func Server

func Server(conn net.Conn, conf *Config) (*Session, error)

Server return a shmipc server with the giving connection and configuration

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) Close

func (s *Session) Close() error

Close is used to close the session and all streams. Attempts to send a GoAway before closing the connection.

func (*Session) CloseChan

func (s *Session) CloseChan() <-chan struct{}

CloseChan returns a read-only channel which is closed as soon as the session is closed.

func (*Session) GetActiveStreamCount

func (s *Session) GetActiveStreamCount() int

GetActiveStreamCount returns the number of currently open streams

func (*Session) GetMetrics

GetMetrics return the session's metrics for monitoring

func (*Session) ID

func (s *Session) ID() string

ID return the a string to identify unique shmipc session in a process

func (*Session) IsClient

func (s *Session) IsClient() bool

IsClient return the session whether is a client

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed does a safe check to see if we have shutdown

func (*Session) IsHealthy

func (s *Session) IsHealthy() bool

IsHealthy return whether the session is healthy

func (*Session) LocalAddr

func (s *Session) LocalAddr() net.Addr

LocalAddr is used to get the local address of the underlying connection.

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

OpenStream is used to create a new stream

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() net.Addr

RemoteAddr is used to get the address of remote end of the underlying connection

type SessionManager

type SessionManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SessionManager will start multi Session with the peer process. when peer process was crashed or the underlying connection was closed, SessionManager could retry connect. and SessionManager could cooperate with peer process to finish hot restart.

func GlobalSessionManager

func GlobalSessionManager() *SessionManager

GlobalSessionManager return a global SessionManager. return nil if global SessionManager hadn't initialized

func InitGlobalSessionManager

func InitGlobalSessionManager(config *SessionManagerConfig) (*SessionManager, error)

InitGlobalSessionManager initializes a global SessionManager and could use in every where in process

func NewSessionManager

func NewSessionManager(config *SessionManagerConfig) (*SessionManager, error)

NewSessionManager return a SessionManager with giving configuration

func (*SessionManager) Close

func (sm *SessionManager) Close() error

Close will shutdown the SessionManager's background goroutine and close all stream in stream pool

func (*SessionManager) GetStream

func (sm *SessionManager) GetStream() (*Stream, error)

GetStream return a shmipc's Stream from stream pool. Every stream should explicitly call PutBack() to return it to SessionManager for next time using, otherwise it will cause resource leak.

func (*SessionManager) PutBack

func (sm *SessionManager) PutBack(stream *Stream)

PutBack is used to return unused stream to stream pool for next time using.

type SessionManagerConfig

type SessionManagerConfig struct {
	*Config
	UnixPath string //Deprecated , please use Network and Address.
	Network  string //tcp or unix
	Address  string //tcp address or unix domain socket path

	//SessionNum is similar to concurrency.
	//A session have dependent io queue, dependent tcp/unix connection.
	//we recommend the value equal peer process' thread count, and every thread keep a session.
	//if the peer process written by golang, recommend SessionNum = cpu cores / 4
	SessionNum int
	//Max number of stream per session's stream pool
	MaxStreamNum int
	//The idle time to close a stream
	StreamMaxIdleTime time.Duration
}

SessionManagerConfig is the configuration of SessionManager

func DefaultSessionManagerConfig

func DefaultSessionManagerConfig() *SessionManagerConfig

DefaultSessionManagerConfig return the default SessionManager's configuration

type ShareMemoryMetrics

type ShareMemoryMetrics struct {
	CapacityOfShareMemoryInBytes uint64 //capacity of all share memory
	AllInUsedShareMemoryInBytes  uint64 //current in-used share memory
}

ShareMemoryMetrics is the metrics about share memory's status

type SizePercentPair

type SizePercentPair struct {
	//A single buffer slice's capacity of buffer list,
	Size uint32
	//The used percent of buffer list in the total share memory
	Percent uint32
}

A SizePercentPair describe a buffer list's specification

type StabilityMetrics

type StabilityMetrics struct {
	AllocShmErrorCount uint64 //the error count of allocating share memory
	FallbackWriteCount uint64 //the count of the fallback data write to unix/tcp connection
	FallbackReadCount  uint64 //the error count of receiving fallback data from unix/tcp connection every period

	//the error count of unix/tcp connection
	//which usually happened in that the peer's process exit(crashed or other reason)
	EventConnErrorCount uint64

	//the error count due to the IO-Queue(SendQueue or ReceiveQueue) is full
	//which usually happened in that the peer was busy
	QueueFullErrorCount uint64

	//current all active stream count
	ActiveStreamCount uint64

	//the successful count of hot restart
	HotRestartSuccessCount uint64
	//the failed count of hot restart
	HotRestartErrorCount uint64
}

StabilityMetrics is the metrics about stability

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to represent a logical stream within a session.

func (*Stream) BufferReader

func (s *Stream) BufferReader() BufferReader

BufferReader return the buffer to read.

func (*Stream) BufferWriter

func (s *Stream) BufferWriter() BufferWriter

BufferWriter return the buffer to write, and after wrote done you should call Stream.Flush(endStream).

func (*Stream) Close

func (s *Stream) Close() error

Close used to close the stream, which maybe block if there is StreamCallbacks running. if a stream was leaked, it's also mean that some share memory was leaked.

func (*Stream) Flush

func (s *Stream) Flush(endStream bool) error

Flush the buffered stream data to peer. If the endStream is true, it mean that this stream hadn't send any data to peer after flush, and the peer could close stream after receive data

func (*Stream) IsOpen

func (s *Stream) IsOpen() bool

IsOpen return whether the stream is open

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr returns the local address

func (*Stream) Read

func (s *Stream) Read(p []byte) (int, error)

Low performance api, it just adapt to the interface net.Conn, which will copy data from read buffer to `p` please use BufferReader() API to implement zero copy read

func (*Stream) ReleaseReadAndReuse

func (s *Stream) ReleaseReadAndReuse()

ReleaseReadAndReuse used to Release the data previous read by Stream.BufferReader(), and reuse the last share memory buffer slice of read buffer for next write by Stream.BufferWriter()

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr returns the remote address

func (*Stream) Session

func (s *Stream) Session() *Session

Session returns the associated stream session

func (*Stream) SetCallbacks

func (s *Stream) SetCallbacks(callback StreamCallbacks) error

SetCallbacks used to set the StreamCallbacks. Notice: It was just called only once, or return the error named ErrStreamCallbackHadExisted.

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read timeout for blocked and future Read calls.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline is the same as net.Conn

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline is the same as net.Conn

func (*Stream) StreamID

func (s *Stream) StreamID() uint32

StreamID returns the ID of this stream

func (*Stream) Write

func (s *Stream) Write(p []byte) (int, error)

Low performance api, it just adapt to the interface net.Conn, which will do copy data from `p` to write buffer please use BufferWriter() API to implement zero copy write

type StreamCallbacks

type StreamCallbacks interface {
	//OnData() will be call by new goroutine, When the stream receive new data
	OnData(reader BufferReader)
	//OnLocalClose was called when the stream was closed by local process
	OnLocalClose()
	//OnRemoteClose was called when the stream was closed by remote process
	OnRemoteClose()
}

StreamCallbacks provide asynchronous programming mode for improving performance in some scenarios

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL