qnet

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2024 License: BSD-3-Clause Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// Size limits of the V1 wire format.
const (
	V1HeaderLength = 16                             // size of the message header
	MaxPacketSize  = 0x00FFFFFF                     // maximum message size, ~16MB
	MaxPayloadSize = MaxPacketSize - V1HeaderLength // maximum packet size minus the header length
)
View Source
// NOTE(review): presumably the accepted values for the `mode` argument of
// SendMsg (blocking vs. non-blocking enqueue) — confirm against the senders.
const (
	SendBlock    = 0
	SendNonblock = 1
)
View Source
// Default sizes used by the package.
const (
	DefaultRecvQueueSize        = 1 << 16 // 65536
	DefaultBackendSendQueueSize = 1 << 14 // 16384
	DefaultSessionSendQueueSize = 448
	DefaultBacklogSize          = 128
	DefaultErrorChanSize        = 64
)
View Source
// Bit-layout constants for NodeID.
const (
	// Bit offsets used when composing a NodeID.
	NodeInstanceShift          = 32
	NodeTypeShift              = 48
	// MaxNodeInstance has all of the low NodeInstanceShift (32) bits set.
	MaxNodeInstance            = (1 << NodeInstanceShift) - 1
	// NodeBackendTypeMask is the single bit at position NodeTypeShift (bit 48).
	NodeBackendTypeMask NodeID = 1 << NodeTypeShift
)
View Source
const ErrCodeField = "Code"
View Source
const PrefixLength = 2 // sizeof uint16
View Source
const (
	UrlFormKey = "pb-data"
)

Variables

View Source
var (
	DefaultCompressThreshold = 1 << 12 // compression threshold, 4KB
	MaxClientUpStreamSize    = 1 << 18 // maximum client upstream message size, 256KB
)
View Source
var (
	ErrPktSizeOutOfRange    = errors.New("packet size out of range")
	ErrPktChecksumMismatch  = errors.New("packet checksum mismatch")
	ErrCannotDecryptPkt     = errors.New("cannot decrypt packet")
	ErrConnNotRunning       = errors.New("connection not running")
	ErrConnOutboundOverflow = errors.New("connection outbound queue overflow")
	ErrConnForceClose       = errors.New("connection forced to close")
	ErrBufferOutOfRange     = errors.New("buffer out of range")
)
View Source
// DefaultMsgIDReflector obtains a message ID by reflection: it takes the
// reflected type name of msg (the pointed-to type's name when msg is a
// pointer) and passes it to codec.NameHash.
var DefaultMsgIDReflector = func(msg codec.Message) uint32 {
	typ := reflect.TypeOf(msg)
	// A pointer message is named after the type it points to.
	if typ.Kind() == reflect.Ptr {
		typ = typ.Elem()
	}
	return codec.NameHash(typ.String())
}

DefaultMsgIDReflector gets the message ID by reflection.

View Source
var (
	TCPReadTimeout = 300 * time.Second // default read timeout
)

Functions

func DecodeBodyByFlags added in v1.0.5

func DecodeBodyByFlags(flags MsgFlag, body []byte, decrypt Encryptor) ([]byte, error)

func DecodeHTTPRequestBody

func DecodeHTTPRequestBody(req *http.Request, ptr interface{}) error

DecodeHTTPRequestBody 解析http请求的body为json

func DecodeMsgFrom

func DecodeMsgFrom(rd io.Reader, maxSize uint32, decrypt Encryptor, netMsg *NetMessage) error

DecodeMsgFrom decodes a message from the reader.

func DecodeNetMsg added in v1.0.5

func DecodeNetMsg(head NetV1Header, body []byte, decrypt Encryptor, netMsg *NetMessage) error

func EncodeMsgTo

func EncodeMsgTo(netMsg *NetMessage, encrypt Encryptor, w io.Writer) error

EncodeMsgTo encodes a message to the writer.

func FreeNetMessage added in v1.0.4

func FreeNetMessage(netMsg *NetMessage)

func GetHTTPRequestIP

func GetHTTPRequestIP(req *http.Request) string

GetHTTPRequestIP 获取http请求的来源IP

func GetLocalIPList

func GetLocalIPList() []net.IP

GetLocalIPList 获取本地IP列表

func ReadHeadBody

func ReadHeadBody(rd io.Reader, head NetV1Header, maxSize uint32) ([]byte, error)

ReadHeadBody read header and body less than `maxSize`

func ReadLenData added in v1.0.4

func ReadLenData(r io.Reader, maxSize uint16) ([]byte, error)

ReadLenData 读取长度[2字节]开头的数据

func ReadProtoFromHTTPRequest

func ReadProtoFromHTTPRequest(req *http.Request, msg codec.Message) error

ReadProtoFromHTTPRequest 从http请求中读取proto消息

func ReadProtoMessage

func ReadProtoMessage(conn net.Conn, msg codec.Message) error

func RequestProtoMessage

func RequestProtoMessage(conn net.Conn, req, ack codec.Message) error

RequestProtoMessage sends req and waits for ack.

func TryEnqueueMsg added in v1.0.3

func TryEnqueueMsg(queue chan<- *NetMessage, msg *NetMessage) bool

TryEnqueueMsg 尝试将消息放入队列,如果队列已满返回false

func WriteLenData added in v1.0.4

func WriteLenData(w io.Writer, body []byte) error

WriteLenData 写入长度[2字节]开头的数据

func WriteProtoHTTPResponse

func WriteProtoHTTPResponse(w http.ResponseWriter, msg codec.Message, contentType string) error

WriteProtoHTTPResponse 写入proto消息到http响应

func WriteProtoMessage

func WriteProtoMessage(w io.Writer, msg codec.Message) error

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer added in v1.0.4

func NewBuffer(b []byte, pool *pool.ObjectPool[Buffer]) *Buffer

func (*Buffer) Bytes added in v1.0.4

func (b *Buffer) Bytes() []byte

func (*Buffer) Free added in v1.0.4

func (b *Buffer) Free()

func (*Buffer) MustReadBool added in v1.0.4

func (b *Buffer) MustReadBool() bool

func (*Buffer) MustReadFloat32 added in v1.0.4

func (b *Buffer) MustReadFloat32() float32

func (*Buffer) MustReadFloat64 added in v1.0.4

func (b *Buffer) MustReadFloat64() float64

func (*Buffer) MustReadInt16 added in v1.0.4

func (b *Buffer) MustReadInt16() int16

func (*Buffer) MustReadInt32 added in v1.0.4

func (b *Buffer) MustReadInt32() int32

func (*Buffer) MustReadInt64 added in v1.0.4

func (b *Buffer) MustReadInt64() int64

func (*Buffer) MustReadInt8 added in v1.0.4

func (b *Buffer) MustReadInt8() int8

func (*Buffer) MustReadUint16 added in v1.0.4

func (b *Buffer) MustReadUint16() uint16

func (*Buffer) MustReadUint32 added in v1.0.4

func (b *Buffer) MustReadUint32() uint32

func (*Buffer) MustReadUint64 added in v1.0.4

func (b *Buffer) MustReadUint64() uint64

func (*Buffer) MustReadUint8 added in v1.0.4

func (b *Buffer) MustReadUint8() uint8

func (*Buffer) PeekBool

func (b *Buffer) PeekBool() (bool, error)

func (*Buffer) PeekFloat32

func (b *Buffer) PeekFloat32() (float32, error)

func (*Buffer) PeekFloat64

func (b *Buffer) PeekFloat64() (float64, error)

func (*Buffer) PeekInt16

func (b *Buffer) PeekInt16() (int16, error)

func (*Buffer) PeekInt32

func (b *Buffer) PeekInt32() (int32, error)

func (*Buffer) PeekInt64

func (b *Buffer) PeekInt64() (int64, error)

func (*Buffer) PeekInt8

func (b *Buffer) PeekInt8() (int8, error)

func (*Buffer) PeekUint16

func (b *Buffer) PeekUint16() (uint16, error)

func (*Buffer) PeekUint32

func (b *Buffer) PeekUint32() (uint32, error)

func (*Buffer) PeekUint64

func (b *Buffer) PeekUint64() (uint64, error)

func (*Buffer) PeekUint8

func (b *Buffer) PeekUint8() (uint8, error)

func (*Buffer) ReadBool

func (b *Buffer) ReadBool() (bool, error)

func (*Buffer) ReadFloat32

func (b *Buffer) ReadFloat32() (float32, error)

func (*Buffer) ReadFloat64

func (b *Buffer) ReadFloat64() (float64, error)

func (*Buffer) ReadInt16

func (b *Buffer) ReadInt16() (int16, error)

func (*Buffer) ReadInt32

func (b *Buffer) ReadInt32() (int32, error)

func (*Buffer) ReadInt64

func (b *Buffer) ReadInt64() (int64, error)

func (*Buffer) ReadInt8

func (b *Buffer) ReadInt8() (int8, error)

func (*Buffer) ReadNBytes added in v1.0.4

func (b *Buffer) ReadNBytes(n int) (r []byte, err error)

func (*Buffer) ReadNString added in v1.0.4

func (b *Buffer) ReadNString(n int) (string, error)

func (*Buffer) ReadUint16

func (b *Buffer) ReadUint16() (uint16, error)

func (*Buffer) ReadUint32

func (b *Buffer) ReadUint32() (n uint32, err error)

func (*Buffer) ReadUint64

func (b *Buffer) ReadUint64() (n uint64, err error)

func (*Buffer) ReadUint8

func (b *Buffer) ReadUint8() (uint8, error)

func (*Buffer) Reset added in v1.0.4

func (b *Buffer) Reset()

func (*Buffer) WriteBool

func (b *Buffer) WriteBool(v bool)

func (*Buffer) WriteBytes added in v1.0.4

func (b *Buffer) WriteBytes(buf []byte)

func (*Buffer) WriteFloat32

func (b *Buffer) WriteFloat32(f float32)

func (*Buffer) WriteFloat64

func (b *Buffer) WriteFloat64(f float64)

func (*Buffer) WriteInt16

func (b *Buffer) WriteInt16(n int16)

func (*Buffer) WriteInt32

func (b *Buffer) WriteInt32(n int32)

func (*Buffer) WriteInt64

func (b *Buffer) WriteInt64(n int64)

func (*Buffer) WriteInt8

func (b *Buffer) WriteInt8(n int8)

func (*Buffer) WriteString added in v1.0.4

func (b *Buffer) WriteString(s string)

func (*Buffer) WriteUint16

func (b *Buffer) WriteUint16(n uint16)

func (*Buffer) WriteUint32

func (b *Buffer) WriteUint32(n uint32)

func (*Buffer) WriteUint64

func (b *Buffer) WriteUint64(n uint64)

func (*Buffer) WriteUint8 added in v1.0.4

func (b *Buffer) WriteUint8(n uint8)

type Encryptor

type Encryptor interface {
	Encrypt([]byte) ([]byte, error)
	Decrypt([]byte) ([]byte, error)
}

type Endpoint

// Endpoint is a network endpoint.
type Endpoint interface {
	GetNode() NodeID
	SetNode(NodeID)
	GetRemoteAddr() string
	UnderlyingConn() net.Conn

	GetUserData() any
	SetEncryption(Encryptor, Encryptor)
	SetSendQueue(chan *NetMessage)
	Go(ctx context.Context, reader, writer bool) // start the read/write threads

	SendMsg(*NetMessage, int) error
	Close() error
	ForceClose(error)
}

Endpoint 网络端点

type EndpointMap

type EndpointMap struct {
	// contains filtered or unexported fields
}

EndpointMap 线程安全的Endpoint Map

func NewEndpointMap

func NewEndpointMap() *EndpointMap

func (*EndpointMap) Clear

func (m *EndpointMap) Clear()

func (*EndpointMap) Foreach

func (m *EndpointMap) Foreach(action func(Endpoint) bool)

Foreach action应该对map是read-only

func (*EndpointMap) Get

func (m *EndpointMap) Get(node NodeID) Endpoint

func (*EndpointMap) Has

func (m *EndpointMap) Has(node NodeID) bool

func (*EndpointMap) Init

func (m *EndpointMap) Init() *EndpointMap

func (*EndpointMap) IsEmpty

func (m *EndpointMap) IsEmpty() bool

func (*EndpointMap) Keys

func (m *EndpointMap) Keys() []NodeID

func (*EndpointMap) Put

func (m *EndpointMap) Put(node NodeID, endpoint Endpoint)

func (*EndpointMap) PutIfAbsent

func (m *EndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)

func (*EndpointMap) Remove

func (m *EndpointMap) Remove(node NodeID)

func (*EndpointMap) Size

func (m *EndpointMap) Size() int

type Error

type Error struct {
	Err      error
	Endpoint Endpoint
}

func NewError

func NewError(err error, endpoint Endpoint) *Error

func (Error) Error

func (e Error) Error() string

type MsgFlag

type MsgFlag uint8
// MsgFlag bit values; each flag occupies one bit of the upper nibble.
const (
	FlagCompress MsgFlag = 0x10 // bit 4
	FlagEncrypt  MsgFlag = 0x20 // bit 5
	FlagError    MsgFlag = 0x40 // bit 6
	FlagExtent   MsgFlag = 0x80 // bit 7
)

func (MsgFlag) Clear

func (g MsgFlag) Clear(n MsgFlag) MsgFlag

func (MsgFlag) Has

func (g MsgFlag) Has(n MsgFlag) bool

type NetMessage

// NetMessage carries a command, a sequence number and a payload that is held
// both as raw bytes (Data) and as a codec.Message (Body).
type NetMessage struct {
	CreatedAt int64         `json:"created_at,omitempty"` // microseconds
	Command   uint32        `json:"cmd"`
	Seq       uint32        `json:"seq,omitempty"`
	// Data is the byte form of the payload: Encode encodes Body into Data and
	// DecodeTo decodes Data into a message.
	Data      []byte        `json:"data,omitempty"`
	Body      codec.Message `json:"body,omitempty"`
	// Session is excluded from JSON output (tag "-").
	Session   Endpoint      `json:"-"`
}

func AllocNetMessage

func AllocNetMessage() *NetMessage

func CreateNetMessage added in v1.0.3

func CreateNetMessage(cmd, seq uint32, body codec.Message) *NetMessage

func CreateNetMessageWith added in v1.0.3

func CreateNetMessageWith(body codec.Message) *NetMessage

func NewNetMessage

func NewNetMessage(cmd, seq uint32, data []byte) *NetMessage

func TryDequeueMsg added in v1.0.3

func TryDequeueMsg(queue <-chan *NetMessage) *NetMessage

TryDequeueMsg 尝试从队列中取出消息,如果队列为空返回nil

func (*NetMessage) Ack added in v1.0.4

func (m *NetMessage) Ack(ack codec.Message) error

func (*NetMessage) Clone

func (m *NetMessage) Clone() *NetMessage

func (*NetMessage) DecodeTo

func (m *NetMessage) DecodeTo(msg codec.Message) error

DecodeTo decodes `Data` into `msg`.

func (*NetMessage) Encode

func (m *NetMessage) Encode() error

Encode encodes `Body` into `Data`.

func (*NetMessage) Refuse

func (m *NetMessage) Refuse(ec int32) error

Refuse 返回一个带错误码的Ack

func (*NetMessage) Reply

func (m *NetMessage) Reply(cmd uint32, data []byte) error

func (*NetMessage) Reset

func (m *NetMessage) Reset()

type NetV1Header added in v1.0.3

type NetV1Header []byte

NetV1Header 协议头

func NewNetV1Header added in v1.0.3

func NewNetV1Header() NetV1Header

func (NetV1Header) CRC added in v1.0.3

func (h NetV1Header) CRC() uint32

func (NetV1Header) CalcCRC added in v1.0.3

func (h NetV1Header) CalcCRC(body []byte) uint32

CalcCRC checksum = f(head) and f(body)

func (NetV1Header) Clear added in v1.0.5

func (h NetV1Header) Clear()

func (NetV1Header) Command added in v1.0.3

func (h NetV1Header) Command() uint32

func (NetV1Header) Flag added in v1.0.3

func (h NetV1Header) Flag() MsgFlag

func (NetV1Header) Len added in v1.0.3

func (h NetV1Header) Len() uint32

func (NetV1Header) Pack added in v1.0.3

func (h NetV1Header) Pack(size uint32, flag MsgFlag, seq, cmd uint32)

func (NetV1Header) Seq added in v1.0.3

func (h NetV1Header) Seq() uint32

func (NetV1Header) SetCRC added in v1.0.3

func (h NetV1Header) SetCRC(v uint32)

type NodeID

type NodeID uint64

NodeID 节点ID 一个64位整数表示的节点号,用以标识一个service,低32位为服务实例编号,32-48位为服务类型; 或者一个客户端session,低32位为GATE内部的session编号,32-48位为GATE编号;

func MakeBackendNode added in v1.0.4

func MakeBackendNode(service uint16, instance uint32) NodeID

MakeBackendNode 根据服务号和实例号创建一个节点ID

func MakeGateSession added in v1.0.4

func MakeGateSession(instance uint16, session uint32) NodeID

MakeGateSession `instance`指GATE的实例编号,限定为16位

func (NodeID) GateID

func (n NodeID) GateID() uint16

GateID client会话的网关ID

func (NodeID) Instance

func (n NodeID) Instance() uint32

Instance service节点的实例编号

func (NodeID) IsBackend

func (n NodeID) IsBackend() bool

IsBackend 是否backend节点

func (NodeID) IsSession

func (n NodeID) IsSession() bool

IsSession 是否client会话

func (NodeID) Service

func (n NodeID) Service() uint16

Service 服务类型

func (NodeID) Session added in v1.0.4

func (n NodeID) Session() uint32

func (NodeID) String

func (n NodeID) String() string

type NodeIDSet

type NodeIDSet = []NodeID

NodeIDSet 没有重复ID的有序集合

type SessionMessage

type SessionMessage struct {
	Session Endpoint
	MsgId   uint32
	MsgBody codec.Message
}

type StreamConnBase

// StreamConnBase is the base stream connection.
type StreamConnBase struct {
	Node       NodeID             //
	Running    atomic.Bool        //
	RecvQueue  chan<- *NetMessage // receive message queue
	SendQueue  chan *NetMessage   // send message queue
	ErrChan    chan *Error        // error signal
	Encrypt    Encryptor          // encryption
	Decrypt    Encryptor          // decryption
	RemoteAddr string             // cached remote address
	Userdata   any                // user data
}

StreamConnBase base stream connection

func (*StreamConnBase) GetNode added in v1.0.3

func (c *StreamConnBase) GetNode() NodeID

func (*StreamConnBase) GetRemoteAddr added in v1.0.3

func (c *StreamConnBase) GetRemoteAddr() string

func (*StreamConnBase) GetUserData added in v1.0.3

func (c *StreamConnBase) GetUserData() any

func (*StreamConnBase) IsRunning

func (c *StreamConnBase) IsRunning() bool

func (*StreamConnBase) SendMsg

func (c *StreamConnBase) SendMsg(msg *NetMessage, mode int) error

func (*StreamConnBase) SendNonBlock

func (c *StreamConnBase) SendNonBlock(msg *NetMessage) bool

func (*StreamConnBase) SetEncryption

func (c *StreamConnBase) SetEncryption(encrypt, decrypt Encryptor)

func (*StreamConnBase) SetNode

func (c *StreamConnBase) SetNode(node NodeID)

func (*StreamConnBase) SetSendQueue

func (c *StreamConnBase) SetSendQueue(sendQueue chan *NetMessage)

func (*StreamConnBase) SetUserData

func (c *StreamConnBase) SetUserData(val any)

type TcpSession

type TcpSession struct {
	StreamConnBase
	// contains filtered or unexported fields
}

func NewTcpSession

func NewTcpSession(conn net.Conn, sendQSize int) *TcpSession

func (*TcpSession) Close

func (t *TcpSession) Close() error

func (*TcpSession) ForceClose

func (t *TcpSession) ForceClose(reason error)

func (*TcpSession) Go

func (t *TcpSession) Go(ctx context.Context, reader, writer bool)

func (*TcpSession) SetIntranet

func (t *TcpSession) SetIntranet(v bool)

func (*TcpSession) UnderlyingConn

func (t *TcpSession) UnderlyingConn() net.Conn

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL