Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeBodyByFlags(flags MsgFlag, body []byte, decrypt Encryptor) ([]byte, error)
- func DecodeHTTPRequestBody(req *http.Request, ptr interface{}) error
- func DecodeMsgFrom(rd io.Reader, maxSize uint32, decrypt Encryptor, netMsg *NetMessage) error
- func DecodeNetMsg(head NetV1Header, body []byte, decrypt Encryptor, netMsg *NetMessage) error
- func EncodeMsgTo(netMsg *NetMessage, encrypt Encryptor, w io.Writer) error
- func FreeNetMessage(netMsg *NetMessage)
- func GetHTTPRequestIP(req *http.Request) string
- func GetLocalIPList() []net.IP
- func ReadHeadBody(rd io.Reader, head NetV1Header, maxSize uint32) ([]byte, error)
- func ReadLenData(r io.Reader, maxSize uint16) ([]byte, error)
- func ReadProtoFromHTTPRequest(req *http.Request, msg codec.Message) error
- func ReadProtoMessage(conn net.Conn, msg codec.Message) error
- func RequestProtoMessage(conn net.Conn, req, ack codec.Message) error
- func TryEnqueueMsg(queue chan<- *NetMessage, msg *NetMessage) bool
- func WriteLenData(w io.Writer, body []byte) error
- func WriteProtoHTTPResponse(w http.ResponseWriter, msg codec.Message, contentType string) error
- func WriteProtoMessage(w io.Writer, msg codec.Message) error
- type Buffer
- func (b *Buffer) Bytes() []byte
- func (b *Buffer) Free()
- func (b *Buffer) MustReadBool() bool
- func (b *Buffer) MustReadFloat32() float32
- func (b *Buffer) MustReadFloat64() float64
- func (b *Buffer) MustReadInt16() int16
- func (b *Buffer) MustReadInt32() int32
- func (b *Buffer) MustReadInt64() int64
- func (b *Buffer) MustReadInt8() int8
- func (b *Buffer) MustReadUint16() uint16
- func (b *Buffer) MustReadUint32() uint32
- func (b *Buffer) MustReadUint64() uint64
- func (b *Buffer) MustReadUint8() uint8
- func (b *Buffer) PeekBool() (bool, error)
- func (b *Buffer) PeekFloat32() (float32, error)
- func (b *Buffer) PeekFloat64() (float64, error)
- func (b *Buffer) PeekInt16() (int16, error)
- func (b *Buffer) PeekInt32() (int32, error)
- func (b *Buffer) PeekInt64() (int64, error)
- func (b *Buffer) PeekInt8() (int8, error)
- func (b *Buffer) PeekUint16() (uint16, error)
- func (b *Buffer) PeekUint32() (uint32, error)
- func (b *Buffer) PeekUint64() (uint64, error)
- func (b *Buffer) PeekUint8() (uint8, error)
- func (b *Buffer) ReadBool() (bool, error)
- func (b *Buffer) ReadFloat32() (float32, error)
- func (b *Buffer) ReadFloat64() (float64, error)
- func (b *Buffer) ReadInt16() (int16, error)
- func (b *Buffer) ReadInt32() (int32, error)
- func (b *Buffer) ReadInt64() (int64, error)
- func (b *Buffer) ReadInt8() (int8, error)
- func (b *Buffer) ReadNBytes(n int) (r []byte, err error)
- func (b *Buffer) ReadNString(n int) (string, error)
- func (b *Buffer) ReadUint16() (uint16, error)
- func (b *Buffer) ReadUint32() (n uint32, err error)
- func (b *Buffer) ReadUint64() (n uint64, err error)
- func (b *Buffer) ReadUint8() (uint8, error)
- func (b *Buffer) Reset()
- func (b *Buffer) WriteBool(v bool)
- func (b *Buffer) WriteBytes(buf []byte)
- func (b *Buffer) WriteFloat32(f float32)
- func (b *Buffer) WriteFloat64(f float64)
- func (b *Buffer) WriteInt16(n int16)
- func (b *Buffer) WriteInt32(n int32)
- func (b *Buffer) WriteInt64(n int64)
- func (b *Buffer) WriteInt8(n int8)
- func (b *Buffer) WriteString(s string)
- func (b *Buffer) WriteUint16(n uint16)
- func (b *Buffer) WriteUint32(n uint32)
- func (b *Buffer) WriteUint64(n uint64)
- func (b *Buffer) WriteUint8(n uint8)
- type Encryptor
- type Endpoint
- type EndpointMap
- func (m *EndpointMap) Clear()
- func (m *EndpointMap) Foreach(action func(Endpoint) bool)
- func (m *EndpointMap) Get(node NodeID) Endpoint
- func (m *EndpointMap) Has(node NodeID) bool
- func (m *EndpointMap) Init() *EndpointMap
- func (m *EndpointMap) IsEmpty() bool
- func (m *EndpointMap) Keys() []NodeID
- func (m *EndpointMap) Put(node NodeID, endpoint Endpoint)
- func (m *EndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *EndpointMap) Remove(node NodeID)
- func (m *EndpointMap) Size() int
- type Error
- type MsgFlag
- type NetMessage
- func (m *NetMessage) Ack(ack codec.Message) error
- func (m *NetMessage) Clone() *NetMessage
- func (m *NetMessage) DecodeTo(msg codec.Message) error
- func (m *NetMessage) Encode() error
- func (m *NetMessage) Refuse(ec int32) error
- func (m *NetMessage) Reply(cmd uint32, data []byte) error
- func (m *NetMessage) Reset()
- type NetV1Header
- func (h NetV1Header) CRC() uint32
- func (h NetV1Header) CalcCRC(body []byte) uint32
- func (h NetV1Header) Clear()
- func (h NetV1Header) Command() uint32
- func (h NetV1Header) Flag() MsgFlag
- func (h NetV1Header) Len() uint32
- func (h NetV1Header) Pack(size uint32, flag MsgFlag, seq, cmd uint32)
- func (h NetV1Header) Seq() uint32
- func (h NetV1Header) SetCRC(v uint32)
- type NodeID
- type NodeIDSet
- type SessionMessage
- type StreamConnBase
- func (c *StreamConnBase) GetNode() NodeID
- func (c *StreamConnBase) GetRemoteAddr() string
- func (c *StreamConnBase) GetUserData() any
- func (c *StreamConnBase) IsRunning() bool
- func (c *StreamConnBase) SendMsg(msg *NetMessage, mode int) error
- func (c *StreamConnBase) SendNonBlock(msg *NetMessage) bool
- func (c *StreamConnBase) SetEncryption(encrypt, decrypt Encryptor)
- func (c *StreamConnBase) SetNode(node NodeID)
- func (c *StreamConnBase) SetSendQueue(sendQueue chan *NetMessage)
- func (c *StreamConnBase) SetUserData(val any)
- type TcpSession
Constants ¶
const ( V1HeaderLength = 16 // 消息头大小 MaxPacketSize = 0x00FFFFFF // 最大消息大小,~16MB MaxPayloadSize = MaxPacketSize - V1HeaderLength // )
const ( SendBlock = 0 SendNonblock = 1 )
const ( DefaultRecvQueueSize = 1 << 16 DefaultBackendSendQueueSize = 1 << 14 DefaultSessionSendQueueSize = 448 DefaultBacklogSize = 128 DefaultErrorChanSize = 64 )
const ( NodeInstanceShift = 32 NodeTypeShift = 48 MaxNodeInstance = (1 << NodeInstanceShift) - 1 NodeBackendTypeMask NodeID = 1 << NodeTypeShift )
const ErrCodeField = "Code"
const PrefixLength = 2 // sizeof uint16
const (
UrlFormKey = "pb-data"
)
Variables ¶
var ( DefaultCompressThreshold = 1 << 12 // 压缩阈值,4KB MaxClientUpStreamSize = 1 << 18 // 最大client上行消息大小,256KB )
var ( ErrPktSizeOutOfRange = errors.New("packet size out of range") ErrPktChecksumMismatch = errors.New("packet checksum mismatch") ErrCannotDecryptPkt = errors.New("cannot decrypt packet") ErrConnNotRunning = errors.New("connection not running") ErrConnOutboundOverflow = errors.New("connection outbound queue overflow") ErrConnForceClose = errors.New("connection forced to close") ErrBufferOutOfRange = errors.New("buffer out of range") )
var DefaultMsgIDReflector = func(msg codec.Message) uint32 { var fullname string var rType = reflect.TypeOf(msg) if rType.Kind() == reflect.Ptr { fullname = rType.Elem().String() } else { fullname = rType.String() } return codec.NameHash(fullname) }
DefaultMsgIDReflector get message ID by reflection
var (
TCPReadTimeout = 300 * time.Second // 默认读超时
)
Functions ¶
func DecodeBodyByFlags ¶ added in v1.0.5
func DecodeHTTPRequestBody ¶
DecodeHTTPRequestBody 解析http请求的body为json
func DecodeMsgFrom ¶
DecodeMsgFrom decode message from reader
func DecodeNetMsg ¶ added in v1.0.5
func DecodeNetMsg(head NetV1Header, body []byte, decrypt Encryptor, netMsg *NetMessage) error
func EncodeMsgTo ¶
func EncodeMsgTo(netMsg *NetMessage, encrypt Encryptor, w io.Writer) error
EncodeMsgTo encode message to writer
func FreeNetMessage ¶ added in v1.0.4
func FreeNetMessage(netMsg *NetMessage)
func GetHTTPRequestIP ¶
GetHTTPRequestIP 获取http请求的来源IP
func ReadHeadBody ¶
ReadHeadBody read header and body less than `maxSize`
func ReadLenData ¶ added in v1.0.4
ReadLenData 读取长度[2字节]开头的数据
func ReadProtoFromHTTPRequest ¶
ReadProtoFromHTTPRequest 从http请求中读取proto消息
func RequestProtoMessage ¶
RequestProtoMessage send req and wait for ack
func TryEnqueueMsg ¶ added in v1.0.3
func TryEnqueueMsg(queue chan<- *NetMessage, msg *NetMessage) bool
TryEnqueueMsg 尝试将消息放入队列,如果队列已满返回false
func WriteLenData ¶ added in v1.0.4
WriteLenData 写入长度[2字节]开头的数据
func WriteProtoHTTPResponse ¶
WriteProtoHTTPResponse 写入proto消息到http响应
Types ¶
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
func (*Buffer) MustReadBool ¶ added in v1.0.4
func (*Buffer) MustReadFloat32 ¶ added in v1.0.4
func (*Buffer) MustReadFloat64 ¶ added in v1.0.4
func (*Buffer) MustReadInt16 ¶ added in v1.0.4
func (*Buffer) MustReadInt32 ¶ added in v1.0.4
func (*Buffer) MustReadInt64 ¶ added in v1.0.4
func (*Buffer) MustReadInt8 ¶ added in v1.0.4
func (*Buffer) MustReadUint16 ¶ added in v1.0.4
func (*Buffer) MustReadUint32 ¶ added in v1.0.4
func (*Buffer) MustReadUint64 ¶ added in v1.0.4
func (*Buffer) MustReadUint8 ¶ added in v1.0.4
func (*Buffer) PeekFloat32 ¶
func (*Buffer) PeekFloat64 ¶
func (*Buffer) PeekUint16 ¶
func (*Buffer) PeekUint32 ¶
func (*Buffer) PeekUint64 ¶
func (*Buffer) ReadFloat32 ¶
func (*Buffer) ReadFloat64 ¶
func (*Buffer) ReadNBytes ¶ added in v1.0.4
func (*Buffer) ReadUint16 ¶
func (*Buffer) ReadUint32 ¶
func (*Buffer) ReadUint64 ¶
func (*Buffer) WriteBytes ¶ added in v1.0.4
func (*Buffer) WriteFloat32 ¶
func (*Buffer) WriteFloat64 ¶
func (*Buffer) WriteInt16 ¶
func (*Buffer) WriteInt32 ¶
func (*Buffer) WriteInt64 ¶
func (*Buffer) WriteString ¶ added in v1.0.4
func (*Buffer) WriteUint16 ¶
func (*Buffer) WriteUint32 ¶
func (*Buffer) WriteUint64 ¶
func (*Buffer) WriteUint8 ¶ added in v1.0.4
type Endpoint ¶
type Endpoint interface { GetNode() NodeID SetNode(NodeID) GetRemoteAddr() string UnderlyingConn() net.Conn GetUserData() any SetEncryption(Encryptor, Encryptor) SetSendQueue(chan *NetMessage) Go(ctx context.Context, reader, writer bool) // 开启read/write线程 SendMsg(*NetMessage, int) error Close() error ForceClose(error) }
Endpoint 网络端点
type EndpointMap ¶
type EndpointMap struct {
// contains filtered or unexported fields
}
EndpointMap 线程安全的Endpoint Map
func NewEndpointMap ¶
func NewEndpointMap() *EndpointMap
func (*EndpointMap) Clear ¶
func (m *EndpointMap) Clear()
func (*EndpointMap) Foreach ¶
func (m *EndpointMap) Foreach(action func(Endpoint) bool)
Foreach action应该对map是read-only
func (*EndpointMap) Get ¶
func (m *EndpointMap) Get(node NodeID) Endpoint
func (*EndpointMap) Has ¶
func (m *EndpointMap) Has(node NodeID) bool
func (*EndpointMap) Init ¶
func (m *EndpointMap) Init() *EndpointMap
func (*EndpointMap) IsEmpty ¶
func (m *EndpointMap) IsEmpty() bool
func (*EndpointMap) Keys ¶
func (m *EndpointMap) Keys() []NodeID
func (*EndpointMap) Put ¶
func (m *EndpointMap) Put(node NodeID, endpoint Endpoint)
func (*EndpointMap) PutIfAbsent ¶
func (m *EndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*EndpointMap) Remove ¶
func (m *EndpointMap) Remove(node NodeID)
func (*EndpointMap) Size ¶
func (m *EndpointMap) Size() int
type NetMessage ¶
type NetMessage struct { CreatedAt int64 `json:"created_at,omitempty"` // microseconds Command uint32 `json:"cmd"` Seq uint32 `json:"seq,omitempty"` Data []byte `json:"data,omitempty"` Body codec.Message `json:"body,omitempty"` Session Endpoint `json:"-"` }
func AllocNetMessage ¶
func AllocNetMessage() *NetMessage
func CreateNetMessage ¶ added in v1.0.3
func CreateNetMessage(cmd, seq uint32, body codec.Message) *NetMessage
func CreateNetMessageWith ¶ added in v1.0.3
func CreateNetMessageWith(body codec.Message) *NetMessage
func NewNetMessage ¶
func NewNetMessage(cmd, seq uint32, data []byte) *NetMessage
func TryDequeueMsg ¶ added in v1.0.3
func TryDequeueMsg(queue <-chan *NetMessage) *NetMessage
TryDequeueMsg 尝试从队列中取出消息,如果队列为空返回nil
func (*NetMessage) Clone ¶
func (m *NetMessage) Clone() *NetMessage
func (*NetMessage) DecodeTo ¶
func (m *NetMessage) DecodeTo(msg codec.Message) error
DecodeTo decode `Data` to `msg`
func (*NetMessage) Reset ¶
func (m *NetMessage) Reset()
type NetV1Header ¶ added in v1.0.3
type NetV1Header []byte
NetV1Header 协议头
func NewNetV1Header ¶ added in v1.0.3
func NewNetV1Header() NetV1Header
func (NetV1Header) CRC ¶ added in v1.0.3
func (h NetV1Header) CRC() uint32
func (NetV1Header) CalcCRC ¶ added in v1.0.3
func (h NetV1Header) CalcCRC(body []byte) uint32
CalcCRC checksum = f(head) and f(body)
func (NetV1Header) Clear ¶ added in v1.0.5
func (h NetV1Header) Clear()
func (NetV1Header) Command ¶ added in v1.0.3
func (h NetV1Header) Command() uint32
func (NetV1Header) Flag ¶ added in v1.0.3
func (h NetV1Header) Flag() MsgFlag
func (NetV1Header) Len ¶ added in v1.0.3
func (h NetV1Header) Len() uint32
func (NetV1Header) Pack ¶ added in v1.0.3
func (h NetV1Header) Pack(size uint32, flag MsgFlag, seq, cmd uint32)
func (NetV1Header) Seq ¶ added in v1.0.3
func (h NetV1Header) Seq() uint32
func (NetV1Header) SetCRC ¶ added in v1.0.3
func (h NetV1Header) SetCRC(v uint32)
type NodeID ¶
type NodeID uint64
NodeID 节点ID 一个64位整数表示的节点号,用以标识一个service,低32位为服务实例编号,32-48位为服务类型; 或者一个客户端session,低32位为GATE内部的session编号,32-48位为GATE编号;
func MakeBackendNode ¶ added in v1.0.4
MakeBackendNode 根据服务号和实例号创建一个节点ID
func MakeGateSession ¶ added in v1.0.4
MakeGateSession `instance`指GATE的实例编号,限定为16位
type SessionMessage ¶
type StreamConnBase ¶
type StreamConnBase struct { Node NodeID // Running atomic.Bool // RecvQueue chan<- *NetMessage // 收消息队列 SendQueue chan *NetMessage // 发消息队列 ErrChan chan *Error // error signal Encrypt Encryptor // 加密 Decrypt Encryptor // 解密 RemoteAddr string // 缓存的远端地址 Userdata any // user data }
StreamConnBase base stream connection
func (*StreamConnBase) GetNode ¶ added in v1.0.3
func (c *StreamConnBase) GetNode() NodeID
func (*StreamConnBase) GetRemoteAddr ¶ added in v1.0.3
func (c *StreamConnBase) GetRemoteAddr() string
func (*StreamConnBase) GetUserData ¶ added in v1.0.3
func (c *StreamConnBase) GetUserData() any
func (*StreamConnBase) IsRunning ¶
func (c *StreamConnBase) IsRunning() bool
func (*StreamConnBase) SendMsg ¶
func (c *StreamConnBase) SendMsg(msg *NetMessage, mode int) error
func (*StreamConnBase) SendNonBlock ¶
func (c *StreamConnBase) SendNonBlock(msg *NetMessage) bool
func (*StreamConnBase) SetEncryption ¶
func (c *StreamConnBase) SetEncryption(encrypt, decrypt Encryptor)
func (*StreamConnBase) SetNode ¶
func (c *StreamConnBase) SetNode(node NodeID)
func (*StreamConnBase) SetSendQueue ¶
func (c *StreamConnBase) SetSendQueue(sendQueue chan *NetMessage)
func (*StreamConnBase) SetUserData ¶
func (c *StreamConnBase) SetUserData(val any)
type TcpSession ¶
type TcpSession struct { StreamConnBase // contains filtered or unexported fields }
func NewTcpSession ¶
func NewTcpSession(conn net.Conn, sendQSize int) *TcpSession
func (*TcpSession) Close ¶
func (t *TcpSession) Close() error
func (*TcpSession) ForceClose ¶
func (t *TcpSession) ForceClose(reason error)
func (*TcpSession) SetIntranet ¶
func (t *TcpSession) SetIntranet(v bool)
func (*TcpSession) UnderlyingConn ¶
func (t *TcpSession) UnderlyingConn() net.Conn