message

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: BSD-3-Clause Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// XServiceError contains error info of service invocation
	XServiceError  = "X-Service-Error"
	XServicePath   = "X-Service-Path"
	XServiceMethod = "X-Service-Method"
)
View Source
const (
	Epoch int64 = 1625068800 // 2021.7.1 00:00:00
)
View Source
const HeaderLength = 12
View Source
const (
	MsgVersionOne byte = 1
)

Variables

View Source
var (
	// ErrMetaKVMissing some keys or values are missing.
	ErrMetaKVMissing = errors.New("wrong metadata lines. some keys or values are missing")
	// ErrMessageTooLong message is too long
	ErrMessageTooLong        = errors.New("message size too long")
	ErrUnsupportedCompressor = errors.New("unsupported compressor")
	ErrMetadataTooLong       = errors.New("metadata size too long")
	ErrVersionNotMatch       = errors.New("version is not match")
	ErrInvalidMessage        = errors.New("invalid message")
)
View Source
var Codecs = map[SerializeType]Codec{
	SerializeNone: &ByteCodec{},
	ProtoBuffer:   &PBCodec{},
}

Codecs are codecs supported by mq. You can add customized codecs in Codecs.

View Source
var Compressors = map[CompressType]Compressor{}

Compressors are compressors supported by rpc. You can add customized compressor in Compressors.

Functions

func CreateID

func CreateID(sid int, seq uint32) uint64

func DecodeContentContext

func DecodeContentContext(ctx context.Context) interface{}

func DecodeMetaContext

func DecodeMetaContext(ctx context.Context) metadata.MD

func DecodeReqMetaContext

func DecodeReqMetaContext(ctx context.Context) map[string]string

func DecodeResMetaContext

func DecodeResMetaContext(ctx context.Context) map[string]string

func EncodeContentContext

func EncodeContentContext(ctx context.Context, content interface{}) context.Context

func EncodeMetaContext

func EncodeMetaContext(ctx context.Context, meta metadata.MD) context.Context

func EncodeReqMetaContext

func EncodeReqMetaContext(ctx context.Context, meta map[string]string) context.Context

func EncodeResMetaContext

func EncodeResMetaContext(ctx context.Context, meta map[string]string) context.Context

func PutMessage

func PutMessage(m *Message)

func RegisterCodec

func RegisterCodec(t SerializeType, c Codec)

RegisterCodec register customized codec.

func SizeMeta

func SizeMeta(m map[string]string) int

func SizeMetadata

func SizeMetadata(meta metadata.MD) (n int)

Types

type BufferRead

type BufferRead struct {
	// contains filtered or unexported fields
}

func NewBufferRead

func NewBufferRead(buf []byte) *BufferRead

func (*BufferRead) Buf

func (b *BufferRead) Buf() ([]byte, error)

func (*BufferRead) Byte

func (b *BufferRead) Byte() byte

func (*BufferRead) Bytes

func (b *BufferRead) Bytes(size int) []byte

func (*BufferRead) Double

func (b *BufferRead) Double() float64

func (*BufferRead) Dump

func (b *BufferRead) Dump() string

func (*BufferRead) DumpSize

func (b *BufferRead) DumpSize(size int) string

func (*BufferRead) Error

func (b *BufferRead) Error() error

func (*BufferRead) Int

func (b *BufferRead) Int() int

func (*BufferRead) Int16

func (b *BufferRead) Int16() int16

func (*BufferRead) Int32

func (b *BufferRead) Int32() int32

func (*BufferRead) Int64

func (b *BufferRead) Int64() int64

func (*BufferRead) Len

func (b *BufferRead) Len() int

func (*BufferRead) Offset

func (b *BufferRead) Offset() int

func (*BufferRead) Reset

func (b *BufferRead) Reset()

func (*BufferRead) SmallVarchar

func (b *BufferRead) SmallVarchar() string

func (*BufferRead) String

func (b *BufferRead) String() string

func (*BufferRead) UInt

func (b *BufferRead) UInt() uint

func (*BufferRead) UInt16

func (b *BufferRead) UInt16() uint16

func (*BufferRead) UInt32

func (b *BufferRead) UInt32() uint32

func (*BufferRead) UInt64

func (b *BufferRead) UInt64() uint64

func (*BufferRead) Varchar

func (b *BufferRead) Varchar() string

type BufferWrite

type BufferWrite struct {
	// contains filtered or unexported fields
}

func NewBufferWrite

func NewBufferWrite(buf []byte) *BufferWrite

func (*BufferWrite) AppendByte

func (b *BufferWrite) AppendByte(s byte)

func (*BufferWrite) AppendBytes

func (b *BufferWrite) AppendBytes(s []byte)

func (*BufferWrite) AppendDouble

func (b *BufferWrite) AppendDouble(s float64)

func (*BufferWrite) AppendInt

func (b *BufferWrite) AppendInt(s int)

func (*BufferWrite) AppendInt16

func (b *BufferWrite) AppendInt16(s int16)

func (*BufferWrite) AppendInt32

func (b *BufferWrite) AppendInt32(s int32)

func (*BufferWrite) AppendInt64

func (b *BufferWrite) AppendInt64(s int64)

func (*BufferWrite) AppendSmallVarchar

func (b *BufferWrite) AppendSmallVarchar(s string)

func (*BufferWrite) AppendString

func (b *BufferWrite) AppendString(s string)

func (*BufferWrite) AppendUInt

func (b *BufferWrite) AppendUInt(s uint)

func (*BufferWrite) AppendUInt16

func (b *BufferWrite) AppendUInt16(s uint16)

func (*BufferWrite) AppendUInt32

func (b *BufferWrite) AppendUInt32(s uint32)

func (*BufferWrite) AppendUInt64

func (b *BufferWrite) AppendUInt64(s uint64)

func (*BufferWrite) AppendVarchar

func (b *BufferWrite) AppendVarchar(s string)

func (*BufferWrite) Buf

func (b *BufferWrite) Buf() []byte

func (*BufferWrite) Detach

func (b *BufferWrite) Detach() []byte

func (*BufferWrite) Len

func (b *BufferWrite) Len() int

func (*BufferWrite) Reset

func (b *BufferWrite) Reset()

type ByteCodec

type ByteCodec struct{}

ByteCodec uses raw slice pf bytes and don't encode/decode.

func (ByteCodec) Decode

func (c ByteCodec) Decode(data []byte, i interface{}) error

Decode returns raw slice of bytes.

func (ByteCodec) Encode

func (c ByteCodec) Encode(i interface{}) ([]byte, error)

Encode returns raw slice of bytes.

type Codec

type Codec interface {
	Encode(i interface{}) ([]byte, error)
	Decode(data []byte, i interface{}) error
}

Codec defines the interface that decode/encode payload.

type CompressType

type CompressType byte

CompressType defines decompression type.

const (
	// None does not compress.
	None CompressType = iota
	// Gzip uses gzip compression.
	Gzip
	Snappy
	ZStd
)

type Compressor

type Compressor interface {
	Zip([]byte) (Stream, error)
	Unzip([]byte) (Stream, error)
}

Compressor defines a common compression interface.

type Header [HeaderLength]byte

Header is the first part of Message and has fixed size. Format:

version uint8
protocolType uint8:7
oneway uint8:6
statusType uint8:4-5
serializeType uint8:0-3
compressType uint8
reserved uint8
messageID uint64

func NewHeader

func NewHeader(h []byte) *Header

func (*Header) CompressType

func (h *Header) CompressType() CompressType

CompressType returns compression type of messages.

func (*Header) IsOneway

func (h *Header) IsOneway() bool

IsOneway returns whether the message is one-way message. If true, server won't send responses.

func (*Header) MessageType

func (h *Header) MessageType() ProtocolType

MessageType returns the message protocol type.

func (*Header) MsgID

func (h *Header) MsgID() uint64

MsgID returns sequence number of messages.

func (*Header) SerializeType

func (h *Header) SerializeType() SerializeType

SerializeType returns serialization type of payload.

func (*Header) SetCompressType

func (h *Header) SetCompressType(ct CompressType)

SetCompressType sets the compression type.

func (*Header) SetMessageType

func (h *Header) SetMessageType(mt ProtocolType)

SetMessageType sets message type.

func (*Header) SetMsgID

func (h *Header) SetMsgID(id uint64)

SetMsgID sets sequence number.

func (*Header) SetOneway

func (h *Header) SetOneway(oneway bool)

SetOneway sets the oneway flag.

func (*Header) SetSerializeType

func (h *Header) SetSerializeType(st SerializeType)

SetSerializeType sets the serialization type.

func (*Header) SetStatusType

func (h *Header) SetStatusType(mt StatusType)

SetStatusType sets message status type.

func (*Header) SetVersion

func (h *Header) SetVersion(v byte)

SetVersion sets version for this header.

func (*Header) StatusType

func (h *Header) StatusType() StatusType

StatusType returns the message status type.

func (*Header) Version

func (h *Header) Version() byte

Version returns version of app protocol.

type Marshaler

type Marshaler interface {
	Marshal() ([]byte, error)
}

Marshaler is the interface representing objects that can marshal themselves.

type Message

type Message struct {
	Header
	ServicePath   string
	ServiceMethod string
	Metadata      map[string]string
	Payload       []byte
	Content       interface{}
}

Message is the generic type of Request and Response. Version-One Format:

header [12]byte
metadata_length int
metadata []KV
payload_length int
payload  [payload_length-4]byte

func GetMessage

func GetMessage() *Message

func (*Message) AddMeta

func (m *Message) AddMeta(k, v string)

func (*Message) Clone

func (m *Message) Clone() *Message

Clone clones from a message.

func (*Message) Decode

func (m *Message) Decode(data []byte) error

Decode decodes a message from reader.

func (*Message) Encode

func (m *Message) Encode(data []byte) []byte

Encode encodes messages.

func (*Message) Free

func (m *Message) Free()

func (*Message) Reset

func (m *Message) Reset()

Reset clean data of this message but keep allocated data

type PBCodec

type PBCodec struct{}

PBCodec uses protobuf marshaler and unmarshaler.

func (PBCodec) Decode

func (c PBCodec) Decode(data []byte, i interface{}) error

Decode decodes an object from slice of bytes.

func (PBCodec) Encode

func (c PBCodec) Encode(i interface{}) ([]byte, error)

Encode encodes an object into slice of bytes.

type ProtocolType

type ProtocolType byte

ProtocolType is message type of requests and responses.

const (
	// Request is message type of request
	Request ProtocolType = iota
	// Response is message type of response
	Response
)

type ReqMetaKey

type ReqMetaKey struct{}

type ResMetaKey

type ResMetaKey struct{}

type SerializeType

type SerializeType byte

SerializeType defines serialization type of payload.

const (
	// SerializeNone uses raw []byte and don't serialize/deserialize
	SerializeNone SerializeType = iota
	// ProtoBuffer for payload.
	ProtoBuffer
	// JSON for payload.
	JSON
	// JSONIter for payload
	JSONIter
)

type StatusType

type StatusType byte

StatusType is status of messages.

const (
	// Normal is normal requests and responses.
	Normal StatusType = iota
	// Error indicates some errors occur.
	Error
)

type Stream

type Stream interface {
	io.Writer
	io.Reader
	Bytes() []byte
	Detach() []byte
	Reset()
	Free()
}

type Unmarshaler

type Unmarshaler interface {
	Unmarshal([]byte) error
}

Unmarshaler is the interface representing objects that can unmarshal themselves. The argument points to data that may be overwritten, so implementations should not keep references to the buffer. Unmarshal implementations should not clear the receiver. Any unmarshaled data should be merged into the receiver. Callers of Unmarshal that do not want to retain existing data should Reset the receiver before calling Unmarshal.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL