message

package
v0.0.0-...-d86056c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2018 License: Apache-2.0 Imports: 12 Imported by: 4

Documentation

Overview

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// Property names used as keys in a message's Properties map, plus the
// separator used between multiple message keys.
const (
	// Message keys; multiple keys are separated by KEY_SEPARATOR (used when querying messages).
	PROPERTY_KEYS = "KEYS"

	// Message tag; only a single tag may be set (used for server-side message filtering).
	PROPERTY_TAGS = "TAGS"

	// Whether to wait for the server to finish storing the message before returning
	// (this may mean waiting for the disk flush to complete, or for synchronous
	// replication to other servers).
	PROPERTY_WAIT_STORE_MSG_OK = "WAIT"

	// Delay level for delayed delivery: 0 means no delay, a value greater than 0
	// selects a specific delay level (the levels themselves are defined on the server side).
	PROPERTY_DELAY_TIME_LEVEL = "DELAY"

	// For internal use.
	PROPERTY_RETRY_TOPIC          = "RETRY_TOPIC"
	PROPERTY_REAL_TOPIC           = "REAL_TOPIC"
	PROPERTY_REAL_QUEUE_ID        = "REAL_QID"
	PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"
	PROPERTY_PRODUCER_GROUP       = "PGROUP"
	PROPERTY_MIN_OFFSET           = "MIN_OFFSET"
	PROPERTY_MAX_OFFSET           = "MAX_OFFSET"
	PROPERTY_BUYER_ID             = "BUYER_ID"
	PROPERTY_ORIGIN_MESSAGE_ID    = "ORIGIN_MESSAGE_ID"
	PROPERTY_TRANSFER_FLAG        = "TRANSFER_FLAG"
	PROPERTY_CORRECTION_FLAG      = "CORRECTION_FLAG"
	PROPERTY_MQ2_FLAG             = "MQ2_FLAG"
	PROPERTY_RECONSUME_TIME       = "RECONSUME_TIME"
	KEY_SEPARATOR                 = " "
)
View Source
const (
	// Fixed length of a message ID.
	MSG_ID_LENGTH = 8 + 8

	// Positions of individual fields within a stored record.
	// NOTE(review): "Postion" looks like a misspelling of "Position"; the
	// exported names are left as-is so existing callers keep compiling.
	MessageMagicCodePostion      = 4
	MessageFlagPostion           = 16
	MessagePhysicOffsetPostion   = 28
	MessageStoreTimestampPostion = 56

	// Used when serializing message properties.
	NAME_VALUE_SEPARATOR = 1
	PROPERTY_SEPARATOR   = 2
)

MessageDecoder: message decoding. Author: yintongqiang. Since: 2017/8/16.

Variables

View Source
// MessageMagicCode evaluates to 0xDAA320B7 (3668123831). Go gives ^ and + the
// same precedence and groups them left to right, so the XOR is applied first
// and 8 is added afterwards; the parentheses only make that order explicit.
//
// NOTE(review): in a language where + binds tighter than ^ (e.g. Java or C)
// the unparenthesized text would mean 0xAABBCCDD ^ (1880681586 + 8), which is
// 0xDAA320A7. If this value is meant to match such an implementation, confirm
// which result peers and stored data expect before changing it.
var MessageMagicCode = (0xAABBCCDD ^ 1880681586) + 8

Functions

func Bytes2messageProperties

func Bytes2messageProperties(propertiesBuf []byte) map[string]string

func ClearProperty

func ClearProperty(msg *Message, name string)

func CreateMessageId

func CreateMessageId(addr string, offset int64) (string, error)

CreateMessageId produces a message ID (msgId) string from addr and offset; addr has the form host:port.

func GetOriginMessageId

func GetOriginMessageId(msg Message) string

func GetReconsumeTime

func GetReconsumeTime(msg *Message) string

Author: yintongqiang Since: 2017/8/16

func JoinHostPort

func JoinHostPort(hostBytes []byte, port int32) string

JoinHostPort joins a host and a port into a host:port string.

func MessageProperties2Bytes

func MessageProperties2Bytes(properties map[string]string) []byte

func MessageProperties2String

func MessageProperties2String(properties map[string]string) string

Fixes the problem of invisible characters in the string by using []byte. Modify: jerrylou, <gunsluo@gmail.com>. Since: 2017-08-25.

func PutProperty

func PutProperty(msg *Message, name string, value string)

func SetOriginMessageId

func SetOriginMessageId(msg *Message, originMessageId string)

func SetProperties

func SetProperties(msg *Message, name string, value string)

func SetPropertiesMap

func SetPropertiesMap(msg *Message, properties map[string]string)

func SetReconsumeTime

func SetReconsumeTime(msg *Message, reconsumeTimes string)

func SplitHostPort

func SplitHostPort(addr string) (string, int32, error)

SplitHostPort parses a host:port address.

func String2messageProperties

func String2messageProperties(propertiesStr string) map[string]string

Fixes the problem of invisible characters in the string by using []byte. Modify: jerrylou, <gunsluo@gmail.com>. Since: 2017-08-25.

Types

type Message

// Message is the message struct: a topic, an application-defined flag,
// a string-to-string property map and the raw body bytes.
type Message struct {
	Topic      string            // message topic
	Flag       int32             // message flag; the system does not interfere, its use is entirely up to the application
	Properties map[string]string // message properties; all are system properties, applications must not set them
	Body       []byte            // message body
}

Message: the message struct. Author: yintongqiang. Since: 2017/8/9.

func NewMessage

func NewMessage(topic string, tags string, body []byte) *Message

func (*Message) ClearProperty

func (msg *Message) ClearProperty(name string)

func (*Message) GetKeys

func (msg *Message) GetKeys() string

func (*Message) GetOriginMessageID

func (msg *Message) GetOriginMessageID() string

func (*Message) GetProperty

func (msg *Message) GetProperty(name string) string

func (*Message) GetTags

func (msg *Message) GetTags() string

func (*Message) PutProperty

func (msg *Message) PutProperty(name string, value string)

func (*Message) SetDelayTimeLevel

func (msg *Message) SetDelayTimeLevel(level int)

func (*Message) SetKeys

func (msg *Message) SetKeys(keys string)

func (*Message) SetTags

func (msg *Message) SetTags(tags string)

func (*Message) SetWaitStoreMsgOK

func (msg *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool)

type MessageExt

// MessageExt is the extended message struct: it embeds Message and adds
// queue, storage, host and transaction related fields.
type MessageExt struct {
	Message                          // embedded message struct
	QueueId                   int32  // queue ID <PUT>
	StoreSize                 int32  // size of the stored record
	QueueOffset               int64  // queue offset
	SysFlag                   int32  // message flag bits <PUT>
	BornTimestamp             int64  // timestamp at which the message was created on the client <PUT>
	BornHost                  string // where the message came from <PUT>
	StoreTimestamp            int64  // timestamp at which the message was stored on the server
	StoreHost                 string // which server stores the message <PUT>
	MsgId                     string // message ID
	CommitLogOffset           int64  // commit log offset corresponding to the message
	BodyCRC                   int32  // CRC of the message body
	ReconsumeTimes            int32  // number of times a subscription group has re-consumed this message (counted independently per subscription group)
	PreparedTransactionOffset int64  // prepared-transaction offset
}

MessageExt is the extended message struct.

func DecodeMessageExt

func DecodeMessageExt(buffer []byte, isReadBody, isCompressBody bool) (*MessageExt, error)

DecodeMessageExt parses the message body and returns a MessageExt.

func DecodesMessageExt

func DecodesMessageExt(buffer []byte, isReadBody bool) ([]*MessageExt, error)

DecodesMessageExt parses the message body and returns multiple messages.

func (*MessageExt) Encode

func (msgExt *MessageExt) Encode() ([]byte, error)

Encode encodes the MessageExt.

func (*MessageExt) String

func (m *MessageExt) String() string

String prints the data of the Message.

type MessageId

// MessageId pairs a storage address with a storage offset.
type MessageId struct {
	Address string // where the message is persisted: the brokerAddr corresponding to the storeHost role
	Offset  uint64 // where the message is persisted: the physical offset, i.e. physicOffset / commitLogOffset
}

func DecodeMessageId

func DecodeMessageId(msgId string) (*MessageId, error)

DecodeMessageId parses a messageId. Author: jerrylou, <gunsluo@gmail.com>. Since: 2017-08-23.

type MessageQueue

// MessageQueue holds a topic, a broker name and a queue ID. The struct tags
// give the JSON field names "topic", "brokerName" and "queueId".
type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int    `json:"queueId"`
}

func NewMessageQueue

func NewMessageQueue(topic, brokerName string, queueId int) *MessageQueue

func (*MessageQueue) Equal

func (mq *MessageQueue) Equal(other *MessageQueue) bool

func (*MessageQueue) Equals

func (mq *MessageQueue) Equals(v interface{}) bool

func (*MessageQueue) HashBytes

func (mq *MessageQueue) HashBytes() []byte

func (*MessageQueue) Key

func (mq *MessageQueue) Key() string

func (*MessageQueue) String

func (mq *MessageQueue) String() string

type MessageQueues

// MessageQueues is a slice of *MessageQueue. Its Len, Less and Swap methods
// have the signatures required by sort.Interface.
type MessageQueues []*MessageQueue

func (MessageQueues) Len

func (mqs MessageQueues) Len() int

func (MessageQueues) Less

func (mqs MessageQueues) Less(i, j int) bool

func (MessageQueues) Swap

func (mqs MessageQueues) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL