proto

package
v0.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package proto 若涉及到字节序,则全部为大端序

Index

Constants

View Source
const (
	FrameHead = 0x3C // 0x3C (可见字符: <)
	FrameTail = 0x0D // 0x0D (回车符)
)
View Source
const FrameMinLength int = 7
View Source
const TotalNumberOfMessages int = 256

Variables

View Source
var (
	ErrMethodNotImplemented   = errors.New("method not implemented")
	ErrMessageNotFull         = errors.New("message is not full")
	ErrMessageSplitNotAllowed = errors.New("split is not allowed")
)

Functions

func AddDescriptor

func AddDescriptor(m Message, ack Message, text string) bool

AddDescriptor 添加描述符号表, 对于已经实现的协议则不允许修改

func CalcChecksum

func CalcChecksum(data []byte) uint16

CalcChecksum 经典校验和算法

func CalcSHA added in v0.3.1

func CalcSHA(text string, shaf ...func(stream []byte) []byte) string

CalcSHA 计算一个字符串hash值的十六进制字符串, 若字符串为空,则直接返回

func CalcSHA1 added in v0.3.1

func CalcSHA1(stream []byte) []byte

CalcSHA1 计算字符串的SHA-1值

func CalcSHA256 added in v0.3.1

func CalcSHA256(stream []byte) []byte

CalcSHA256 计算字符串的SHA-256值

func FrameCombine added in v0.3.3

func FrameCombine[T Message](f *TransferFrame, msgs []T, encrypt ...EncryptFunc) error

FrameCombine 组合消息帧,将若干个消息,组合到一个帧内 在调用此方法之前需首先设置帧类型 TransferFrame.SetType 组合空消息帧无意义, 因此需自行保证 msgs 不为空

func FrameSplit added in v0.3.3

func FrameSplit[T Message](f *TransferFrame, msgs *[]T, decrypt ...DecryptFunc) error

FrameSplit 消息帧拆分,将消息帧内的数据解析提取为若干个消息 因此在调用此方法之前不得调用 TransferFrame.SetType

func GetMessageResponseStatusText added in v0.3.1

func GetMessageResponseStatusText(status MessageResponseStatus) string

func IsMessageDefined added in v0.3.3

func IsMessageDefined(typ MessageType) bool

IsMessageDefined 此消息是否可以被自由定义

func JsonMessageParseFrom added in v0.3.3

func JsonMessageParseFrom(reader io.Reader, m Message) error

JsonMessageParseFrom 从reader解析消息,此操作不够优化,应考虑使用 parse 方法

Types

type AckType

type AckType string
const (
	NoConfirm     AckType = "0"
	LeaderConfirm AckType = "1"
	AllConfirm    AckType = "all"
)

type CMessage

type CMessage struct {
	Offset      []byte // uint64
	ProductTime []byte // time.Time.Unix() 消息创建的Unix时间戳
	PM          *PMessage
}

CMessage 消费者消息记录, 不允许复制

消息结构:
	|   TopicLen   |      Topic      |   KeyLen   |        key        |   ValueLen   |   Value   |   Offset   |   ProductTime   |
	|--------------|-----------------|------------|-------------------|--------------|-----------|------------|-----------------|
len	|      1       | N [1-255] bytes |      1     |  N [1-255] bytes  |       2      |     N     |      8     |         8       |
   	|--------------|-----------------|------------|-------------------|--------------|-----------|------------|-----------------|

func (*CMessage) MarshalMethod

func (m *CMessage) MarshalMethod() MarshalMethodType

func (*CMessage) MessageType

func (m *CMessage) MessageType() MessageType

func (*CMessage) Reset

func (m *CMessage) Reset()

func (*CMessage) String

func (m *CMessage) String() string

type CPMPool

type CPMPool struct {
	// contains filtered or unexported fields
}

func NewCPMPool

func NewCPMPool() *CPMPool

func (*CPMPool) GetCM

func (p *CPMPool) GetCM() *CMessage

GetCM Attention: PM is nil

func (*CPMPool) GetPM

func (p *CPMPool) GetPM() *PMessage

func (*CPMPool) PutCM

func (p *CPMPool) PutCM(v *CMessage)

func (*CPMPool) PutPM

func (p *CPMPool) PutPM(v *PMessage)

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter 计数器

func NewCounter

func NewCounter() *Counter

NewCounter 创建一个新的计数器

func (*Counter) Increment

func (c *Counter) Increment()

Increment 计数器 +1,并返回新的值

func (*Counter) Value

func (c *Counter) Value() uint64

Value 获取当前计数器的数值

func (*Counter) ValueBeforeIncrement

func (c *Counter) ValueBeforeIncrement() uint64

ValueBeforeIncrement 首先获取当前计数器的数值,然后将计数器 +1

type Crypto

type Crypto interface {
	Encrypt(stream []byte) ([]byte, error) // 加密数据体
	Decrypt(stream []byte) ([]byte, error) // 解密数据体
	String() string                        // 名称描述等
}

Crypto 加解密支持

func CreateCrypto added in v0.3.4

func CreateCrypto(option string, key ...string) Crypto

CreateCrypto 设置加密方案

@param	option	string		加密方案, 支持token/no (令牌加密和不加密)
@param	key 	[]string	其他加密参数

func DefaultCrypto

func DefaultCrypto() Crypto

DefaultCrypto 默认的加解密器,就是不加密

type DecryptFunc

type DecryptFunc = func(stream []byte) ([]byte, error)

DecryptFunc 解密方法签名

type Descriptor

type Descriptor struct {
	// contains filtered or unexported fields
}

func GetDescriptor

func GetDescriptor(typ MessageType) *Descriptor

func (Descriptor) MessageType

func (m Descriptor) MessageType() MessageType

MessageType 协议类别

func (Descriptor) NeedACK added in v0.3.3

func (m Descriptor) NeedACK() bool

NeedACK 消息交互是否需要有返回值

func (Descriptor) Text

func (m Descriptor) Text() string

Text 协议类别的文字描述

func (Descriptor) UserDefined

func (m Descriptor) UserDefined() bool

UserDefined 是否是用户自定义协议

type EncryptFunc

type EncryptFunc = func(stream []byte) ([]byte, error)

EncryptFunc 加密方法签名

type FramePool

type FramePool struct {
	// contains filtered or unexported fields
}

func NewFramePool

func NewFramePool() *FramePool

func (*FramePool) Get

func (p *FramePool) Get() *TransferFrame

func (*FramePool) HistoryNum added in v0.3.3

func (p *FramePool) HistoryNum() uint64

HistoryNum 历史数量

func (*FramePool) Put

func (p *FramePool) Put(v *TransferFrame)

type HeartbeatMessage added in v0.3.3

type HeartbeatMessage struct {
	Type      LinkType `json:"type" description:"客户端类型"`
	CreatedAt int64    `json:"created_at" description:"客户端创建时间戳"`
}

HeartbeatMessage 心跳

func (*HeartbeatMessage) MarshalMethod added in v0.3.3

func (m *HeartbeatMessage) MarshalMethod() MarshalMethodType

func (*HeartbeatMessage) MessageType added in v0.3.3

func (m *HeartbeatMessage) MessageType() MessageType

func (*HeartbeatMessage) Reset added in v0.3.3

func (m *HeartbeatMessage) Reset()

func (*HeartbeatMessage) String added in v0.3.3

func (m *HeartbeatMessage) String() string

type LinkType

type LinkType string
const (
	ConsumerLinkType LinkType = "CONSUMER"
	ProducerLinkType LinkType = "PRODUCER"
)

type MarshalMethodType

type MarshalMethodType string
const (
	JsonMarshalMethod   MarshalMethodType = "JSON"
	BinaryMarshalMethod MarshalMethodType = "BINARY"
)

type Message

type Message interface {
	MessageType() MessageType         // 消息类别
	MarshalMethod() MarshalMethodType // 消息序列化方法
	String() string                   // 类别和消息解码方法
	Reset()                           // 重置消息体
	// contains filtered or unexported methods
}

Message 消息定义 消息编解码过程中不支持加解密操作, 消息的加解密是对编码后的字节序列进行的操作,与消息定义无关

type MessageResponse

type MessageResponse struct {
	Type MessageType `json:"-"`
	// 仅当 AcceptedStatus 时才认为服务器接受了请求并下方了有效的参数
	Status      MessageResponseStatus `json:"status"`
	Offset      uint64                `json:"offset"`
	ReceiveTime int64                 `json:"receive_time"`
	// 定时器间隔,单位ms,仅生产者有效,生产者需要按照此间隔发送帧消息
	TickerInterval int `json:"ticker_duration" description:"定时器间隔,单位ms"`
	// 消费者需要按照此参数,在此周期内向服务端发送心跳
	// 生产者在此周期内若没有数据产生,也应发送心跳
	Keepalive float64 `json:"keepalive" description:"心跳间隔,单位s"`
}

MessageResponse 消息响应, P和C通用

func (*MessageResponse) Accepted added in v0.3.1

func (m *MessageResponse) Accepted() bool

func (*MessageResponse) MarshalMethod

func (m *MessageResponse) MarshalMethod() MarshalMethodType

func (*MessageResponse) MessageType

func (m *MessageResponse) MessageType() MessageType

MessageType 依据偏移量字段判断消息类型

func (*MessageResponse) Reset

func (m *MessageResponse) Reset()

func (*MessageResponse) String

func (m *MessageResponse) String() string

type MessageResponseStatus added in v0.3.1

type MessageResponseStatus string
const (
	AcceptedStatus       MessageResponseStatus = "0" // 已接受,正常状态
	RefusedStatus        MessageResponseStatus = "1"
	TokenIncorrectStatus MessageResponseStatus = "10" // 密钥不正确
	ReRegisterStatus     MessageResponseStatus = "11" // 令客户端重新发起注册流程, 无消息体
)

type MessageType

type MessageType byte
const (
	NotImplementMessageType MessageType = 0
	RegisterMessageType     MessageType = 1   // 客户端消费者/生产者注册消息类别 c -> s RegisterMessage
	RegisterMessageRespType MessageType = 2   // s -> c MessageResponse
	HeartbeatMessageType    MessageType = 4   // c -> s HeartbeatMessage
	MessageRespType         MessageType = 100 // 生产者消息响应 s -> c MessageResponse
	PMessageType            MessageType = 101 // 生产者消息类别 c -> s PMessage
	CMessageType            MessageType = 102 // 消费者消息类别 s -> c CMessage
)

如果增加了新的协议代码,都需要在 descriptors 中添加其类型

func (MessageType) CombinationAllowed added in v0.3.3

func (m MessageType) CombinationAllowed() bool

CombinationAllowed 是否允许组合多个消息为一个传输帧 TransferFrame

func (MessageType) EncryptionAllowed added in v0.3.3

func (m MessageType) EncryptionAllowed() bool

EncryptionAllowed 是否允许加密消息体

type NoCopy

type NoCopy struct{}

func (*NoCopy) Lock

func (*NoCopy) Lock()

func (*NoCopy) Unlock

func (*NoCopy) Unlock()

type NoCrypto added in v0.3.3

type NoCrypto struct{}

NoCrypto 不加密

func (NoCrypto) Decrypt added in v0.3.3

func (e NoCrypto) Decrypt(stream []byte) ([]byte, error)

func (NoCrypto) Encrypt added in v0.3.3

func (e NoCrypto) Encrypt(stream []byte) ([]byte, error)

func (NoCrypto) String added in v0.3.4

func (e NoCrypto) String() string

type NotImplementMessage

type NotImplementMessage struct{}

func (NotImplementMessage) MarshalMethod

func (m NotImplementMessage) MarshalMethod() MarshalMethodType

func (NotImplementMessage) MessageType

func (m NotImplementMessage) MessageType() MessageType

func (NotImplementMessage) Reset

func (m NotImplementMessage) Reset()

func (NotImplementMessage) String

func (m NotImplementMessage) String() string

type PMessage

type PMessage struct {
	Topic []byte // 字符串转字节
	Key   []byte
	Value []byte
	// contains filtered or unexported fields
}

PMessage 生产者消息数据, 不允许复制

消息结构:
	|   TopicLen   |      Topic      |   KeyLen   |        key        |   ValueLen   |   Value   |
	|--------------|-----------------|------------|-------------------|--------------|-----------|
len	|      1       | N [1-255] bytes |      1     |  N [1-255] bytes  |       2      |     N     |
   	|--------------|-----------------|------------|-------------------|--------------|-----------|

func (*PMessage) MarshalMethod

func (m *PMessage) MarshalMethod() MarshalMethodType

func (*PMessage) MessageType

func (m *PMessage) MessageType() MessageType

func (*PMessage) Reset

func (m *PMessage) Reset()

func (*PMessage) String

func (m *PMessage) String() string

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(capacity int) *Queue

func (*Queue) Append

func (q *Queue) Append(value any)

func (*Queue) Capacity

func (q *Queue) Capacity() int

func (*Queue) Left added in v0.3.7

func (q *Queue) Left() any

Left 获取最左端/最旧的元素

func (*Queue) Length

func (q *Queue) Length() int

func (*Queue) PopLeft

func (q *Queue) PopLeft() any

func (*Queue) Right added in v0.3.7

func (q *Queue) Right() any

Right 获取最右端/最新的元素

type RegisterMessage

type RegisterMessage struct {
	Topics []string `json:"topics"` // 对于生产者,无意义
	Ack    AckType  `json:"ack"`
	Type   LinkType `json:"type"`
	Token  string   `json:"token,omitempty"` // 认证密钥的hash值,当此值不为空时强制有效
}

RegisterMessage 消息注册,适用于生产者和消费者

func (*RegisterMessage) MarshalMethod

func (m *RegisterMessage) MarshalMethod() MarshalMethodType

func (*RegisterMessage) MessageType

func (m *RegisterMessage) MessageType() MessageType

func (*RegisterMessage) Reset

func (m *RegisterMessage) Reset()

func (*RegisterMessage) String

func (m *RegisterMessage) String() string

type TokenCrypto added in v0.3.3

type TokenCrypto struct {
	Token string `json:"token"` // 原始密钥的sha值
}

TokenCrypto 基于Token的加解密器,用于加密注册消息 也可用于加密传输消息

func (TokenCrypto) Decrypt added in v0.3.3

func (c TokenCrypto) Decrypt(stream []byte) ([]byte, error)

Decrypt 解密函数

func (TokenCrypto) Encrypt added in v0.3.3

func (c TokenCrypto) Encrypt(stream []byte) ([]byte, error)

Encrypt 加密函数

func (TokenCrypto) String added in v0.3.4

func (c TokenCrypto) String() string

type TransferFrame

type TransferFrame struct {
	// contains filtered or unexported fields
}

TransferFrame 传输协议帧

帧与底层的传输协议无关, 帧是包含了具体消息的增加了冗余校验等信息的数据报; 帧内部可以包含一条或多条相同类型的消息, 因此帧可一次性传输多条相同的消息; 帧的大小没有限制, 真正对帧大小有限制的是底层的传输层, 具体是 transfer.Transfer 的要求;

帧结构:
	|   head   |   mType  |   dataSize   |        data        |   checksum   |   tail   |
	|----------|----------|--------------|--------------------|--------------|----------|
len	|     1	   |    1     |       2      |      N bytes       |       2      |     1    |
   	|----------|----------|--------------|--------------------|--------------|----------|
取值	|   0x3C   |          |              |                    |              |   0x0D   |
	|----------|----------|--------------|--------------------|--------------|----------|

# Usage:

将帧载荷解析成具体的协议:
	frame := TransferFrame{}
	frame.Unmarshal(X)
	X := frame.UnmarshalTo()

从消息构建帧:
	frame := TransferFrame{}
	frame.BuildWith(proto.MessageType, []byte{})
	frame.BuildFrom(X)

func (*TransferFrame) Build

func (f *TransferFrame) Build() []byte

Build 编码消息帧用于显示获取构建好的字节流

func (*TransferFrame) BuildFrom

func (f *TransferFrame) BuildFrom(m Message, encrypt ...EncryptFunc) error

BuildFrom 从协议中构建消息帧, 仅适用于构建包含单个消息的帧 若需要包含多个消息, 需使用 FrameCombine 方法

func (*TransferFrame) BuildWith

func (f *TransferFrame) BuildWith(typ MessageType, data []byte, encrypt ...EncryptFunc) error

BuildWith 补充字段,编码消息帧, 仅适用于构建包含单个消息的帧 若需要包含多个消息, 需使用 FrameCombine 方法

func (*TransferFrame) Checksum

func (f *TransferFrame) Checksum() uint16

Checksum 获取帧校验和, 由 checksum 标识

func (*TransferFrame) DataSize

func (f *TransferFrame) DataSize() int

DataSize 获得消息的总长度, 由 dataSize 标识

func (*TransferFrame) Head

func (f *TransferFrame) Head() byte

func (*TransferFrame) Length

func (f *TransferFrame) Length() int

Length 获得帧总长

func (*TransferFrame) MarshalMethod

func (f *TransferFrame) MarshalMethod() MarshalMethodType

func (*TransferFrame) MessageText added in v0.3.3

func (f *TransferFrame) MessageText() string

MessageText 获取帧内消息的文字描述

func (*TransferFrame) Parse

func (f *TransferFrame) Parse(stream []byte) error

Parse 从字节序中解析数据帧

func (*TransferFrame) ParseFrom

func (f *TransferFrame) ParseFrom(reader io.Reader) error

ParseFrom 从流中解析数据帧

func (*TransferFrame) Payload added in v0.3.3

func (f *TransferFrame) Payload() []byte

func (*TransferFrame) Reset

func (f *TransferFrame) Reset()

func (*TransferFrame) SetPayload added in v0.3.3

func (f *TransferFrame) SetPayload(data []byte) *TransferFrame

SetPayload 修改帧载荷信息

func (*TransferFrame) String

func (f *TransferFrame) String() string

func (*TransferFrame) Tail

func (f *TransferFrame) Tail() byte

func (*TransferFrame) Text added in v0.3.3

func (f *TransferFrame) Text() string

Text 获取帧的文字描述

func (*TransferFrame) Type

func (f *TransferFrame) Type() MessageType

func (*TransferFrame) Unmarshal

func (f *TransferFrame) Unmarshal(msg Message, decrypt ...DecryptFunc) error

Unmarshal 反序列化帧消息体

func (*TransferFrame) UnmarshalTo

func (f *TransferFrame) UnmarshalTo(decrypt ...DecryptFunc) (Message, error)

UnmarshalTo 将帧消息解析成某一个具体的协议消息 此方法与 Unmarshal 的区别在于:Unmarshal 会显式的依据消息类型从流中解析数据 而 UnmarshalTo 则首先会依据 TransferFrame.mType 推断消息类型并创建一个新的实例,解析后返回 因此在调用 UnmarshalTo 之前不得修改 TransferFrame.mType 的值

func (*TransferFrame) WriteTo added in v0.3.3

func (f *TransferFrame) WriteTo(writer io.Writer) (int64, error)

WriteTo 将预构建的消息流直接写入 io.Writer 内 相比于返回[]byte的 BuildWith 和 BuildFrom 方法而言,能减少一倍的内存分配

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL