Documentation ¶
Index ¶
- Constants
- Variables
- func FindBrokerAddrByName(brokerName string) string
- func FindBrokerAddrByTopic(topic string) string
- func GetRetryTopic(group string) string
- func ValidateGroup(group string)
- type BrokerData
- type ClientOption
- type FindBrokerResult
- type GetConsumerList
- type GetMaxOffsetRequest
- type GetRouteInfoRequest
- type InnerConsumer
- type InnerProducer
- type Message
- type MessageExt
- type MessageQueue
- type PullMessageRequest
- type PullMessageResponse
- type PullResult
- type PullStatus
- type QueryConsumerOffsetRequest
- type QueueData
- type RMQClient
- func (c *RMQClient) CheckClientInBroker()
- func (c *RMQClient) ClientID() string
- func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error
- func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error)
- func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*Message) *SendResult
- func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error)
- func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, ...) error
- func (c *RMQClient) RebalanceImmediately()
- func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error
- func (c *RMQClient) RegisterProducer(group string, producer InnerProducer)
- func (c *RMQClient) SelectConsumer(group string) InnerConsumer
- func (c *RMQClient) SelectProducer(group string) InnerProducer
- func (c *RMQClient) SendHeartbeatToAllBrokerWithLock()
- func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, ...) error
- func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, ...) (*SendResult, error)
- func (c *RMQClient) Shutdown()
- func (c *RMQClient) Start()
- func (c *RMQClient) UnregisterConsumer(group string)
- func (c *RMQClient) UnregisterProducer(group string)
- func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData)
- func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData)
- func (c *RMQClient) UpdateTopicRouteInfo()
- type SearchOffsetRequest
- type SendMessageRequest
- type SendMessageResponse
- type SendResult
- type SendStatus
- type ServiceState
- type SubscriptionData
- type TopicPublishInfo
- type TopicRouteData
- type TransactionListener
- type UpdateConsumerOffsetRequest
Constants ¶
View Source
const ( RetryGroupTopicPrefix = "%RETRY%" DefaultConsumerGroup = "DEFAULT_CONSUMER" )
View Source
const ( PropertyKeySeparator = " " PropertyKeys = "KEYS" PropertyTags = "TAGS" PropertyWaitStoreMsgOk = "WAIT" PropertyDelayTimeLevel = "DELAY" PropertyRetryTopic = "RETRY_TOPIC" PropertyRealTopic = "REAL_TOPIC" PropertyRealQueueId = "REAL_QID" PropertyTransactionPrepared = "TRAN_MSG" PropertyProducerGroup = "PGROUP" PropertyMinOffset = "MIN_OFFSET" PropertyMaxOffset = "MAX_OFFSET" PropertyBuyerId = "BUYER_ID" PropertyOriginMessageId = "ORIGIN_MESSAGE_ID" PropertyTransferFlag = "TRANSFER_FLAG" PropertyCorrectionFlag = "CORRECTION_FLAG" PropertyMQ2Flag = "MQ2_FLAG" PropertyReconsumeTime = "RECONSUME_TIME" PropertyMsgRegion = "MSG_REGION" PropertyTraceSwitch = "TRACE_ON" PropertyUniqueClientMessageIdKeyIndex = "UNIQ_KEY" PropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES" PropertyConsumeStartTime = "CONSUME_START_TIME" PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET" PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES" PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS" )
View Source
const ( ReqSendMessage = int16(10) ReqPullMessage = int16(11) ReqQueryConsumerOffset = int16(14) ReqUpdateConsumerOffset = int16(15) ReqSearchOffsetByTimestamp = int16(30) ReqGetMaxOffset = int16(30) ReqHeartBeat = int16(34) ReqGetConsumerListByGroup = int16(38) ReqLockBatchMQ = int16(41) ReqUnlockBatchMQ = int16(42) ReqGetRouteInfoByTopic = int16(105) ReqSendBatchMessage = int16(320) ReqCheckTransactionState = int16(39) ReqNotifyConsumerIdsChanged = int16(40) ReqResetConsuemrOffset = int16(220) ReqGetConsumerRunningInfo = int16(307) ReqConsumeMessageDirectly = int16(309) )
View Source
const ( ResSuccess = int16(0) ResFlushDiskTimeout = int16(10) ResSlaveNotAvailable = int16(11) ResFlushSlaveTimeout = int16(12) ResTopicNotExist = int16(17) ResPullNotFound = int16(19) ResPullRetryImmediately = int16(20) ResPullOffsetMoved = int16(21) )
View Source
const ( EnvNameServerAddr = "NAMESRV_ADDR" MasterId = int64(0) )
View Source
const (
V4_1_0 = 0
)
Variables ¶
View Source
var (
ErrServiceState = errors.New("service state is not running, please check")
)
View Source
var (
ErrTopicNotExist = errors.New("topic not exist")
)
Functions ¶
func FindBrokerAddrByName ¶
func FindBrokerAddrByTopic ¶
func GetRetryTopic ¶
func ValidateGroup ¶
func ValidateGroup(group string)
Types ¶
type BrokerData ¶
type BrokerData struct { Cluster string `json:"cluster"` BrokerName string `json:"brokerName"` BrokerAddresses map[int64]string `json:"brokerAddrs"` // contains filtered or unexported fields }
BrokerData BrokerData
func (*BrokerData) Equals ¶
func (b *BrokerData) Equals(bd *BrokerData) bool
type ClientOption ¶
type ClientOption struct { NameServerAddr string ClientIP string InstanceName string UnitMode bool UnitName string VIPChannelEnabled bool UseTLS bool }
func (*ClientOption) ChangeInstanceNameToPID ¶
func (opt *ClientOption) ChangeInstanceNameToPID()
func (*ClientOption) String ¶
func (opt *ClientOption) String() string
type FindBrokerResult ¶
func FindBrokerAddressInSubscribe ¶
func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult
type GetConsumerList ¶
type GetConsumerList struct {
ConsumerGroup string `json:"consumerGroup"`
}
func (*GetConsumerList) Encode ¶
func (request *GetConsumerList) Encode() map[string]string
type GetMaxOffsetRequest ¶
func (*GetMaxOffsetRequest) Encode ¶
func (request *GetMaxOffsetRequest) Encode() map[string]string
type GetRouteInfoRequest ¶
type GetRouteInfoRequest struct {
Topic string `json:"topic"`
}
func (*GetRouteInfoRequest) Decode ¶
func (request *GetRouteInfoRequest) Decode(properties map[string]string) error
func (*GetRouteInfoRequest) Encode ¶
func (request *GetRouteInfoRequest) Encode() map[string]string
type InnerConsumer ¶
type InnerConsumer interface { PersistConsumerOffset() UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue) IsSubscribeTopicNeedUpdate(topic string) bool SubscriptionDataList() []*SubscriptionData Rebalance() IsUnitMode() bool }
type InnerProducer ¶
type Message ¶
type Message struct { Topic string Body []byte Flag int32 Properties map[string]string TransactionId string Batch bool }
func NewMessage ¶
func (*Message) PutProperty ¶
func (*Message) RemoveProperty ¶
type MessageExt ¶
type MessageExt struct { Message MsgId string QueueId int32 StoreSize int32 QueueOffset int64 SysFlag int32 BornTimestamp int64 BornHost string StoreTimestamp int64 StoreHost string CommitLogOffset int64 BodyCRC int32 ReconsumeTimes int32 PreparedTransactionOffset int64 }
func (*MessageExt) GetTags ¶
func (msgExt *MessageExt) GetTags() string
func (*MessageExt) String ¶
func (msgExt *MessageExt) String() string
type MessageQueue ¶
type MessageQueue struct { Topic string `json:"topic"` BrokerName string `json:"brokerName"` QueueId int `json:"queueId"` }
MessageQueue message queue
func FetchSubscribeMessageQueues ¶
func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error)
func (*MessageQueue) Equals ¶
func (mq *MessageQueue) Equals(queue *MessageQueue) bool
func (*MessageQueue) HashCode ¶
func (mq *MessageQueue) HashCode() int
func (*MessageQueue) String ¶
func (mq *MessageQueue) String() string
type PullMessageRequest ¶
type PullMessageRequest struct { ConsumerGroup string `json:"consumerGroup"` Topic string `json:"topic"` QueueId int32 `json:"queueId"` QueueOffset int64 `json:"queueOffset"` MaxMsgNums int32 `json:"maxMsgNums"` SysFlag int32 `json:"sysFlag"` CommitOffset int64 `json:"commitOffset"` SuspendTimeoutMillis time.Duration `json:"suspendTimeoutMillis"` SubExpression string `json:"subscription"` SubVersion int64 `json:"subVersion"` ExpressionType string `json:"expressionType"` }
func (*PullMessageRequest) Encode ¶
func (request *PullMessageRequest) Encode() map[string]string
type PullMessageResponse ¶
type PullResult ¶
type PullResult struct { NextBeginOffset int64 MinOffset int64 MaxOffset int64 Status PullStatus SuggestWhichBrokerId int64 // contains filtered or unexported fields }
PullResult the pull result
func (*PullResult) GetMessageExts ¶
func (result *PullResult) GetMessageExts() []*MessageExt
func (*PullResult) GetMessages ¶
func (result *PullResult) GetMessages() []*Message
func (*PullResult) SetMessageExts ¶
func (result *PullResult) SetMessageExts(msgExts []*MessageExt)
func (*PullResult) String ¶
func (result *PullResult) String() string
type PullStatus ¶
type PullStatus int
PullStatus pull status
const ( PullFound PullStatus = iota PullNoNewMsg PullNoMsgMatched PullOffsetIllegal PullBrokerTimeout )
predefined pull status
type QueryConsumerOffsetRequest ¶
type QueryConsumerOffsetRequest struct { ConsumerGroup string `json:"consumerGroup"` Topic string `json:"topic"` QueueId int `json:"queueId"` }
func (*QueryConsumerOffsetRequest) Encode ¶
func (request *QueryConsumerOffsetRequest) Encode() map[string]string
type QueueData ¶
type QueueData struct { BrokerName string `json:"brokerName"` ReadQueueNums int `json:"readQueueNums"` WriteQueueNums int `json:"writeQueueNums"` Perm int `json:"perm"` TopicSynFlag int `json:"topicSynFlag"` }
QueueData QueueData
type RMQClient ¶
type RMQClient struct {
// contains filtered or unexported fields
}
func GetOrNewRocketMQClient ¶
func GetOrNewRocketMQClient(option ClientOption) *RMQClient
func (*RMQClient) CheckClientInBroker ¶
func (c *RMQClient) CheckClientInBroker()
func (*RMQClient) InvokeOneWay ¶
func (*RMQClient) InvokeSync ¶
func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error)
func (*RMQClient) ProcessSendResponse ¶
func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*Message) *SendResult
func (*RMQClient) PullMessage ¶
func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error)
PullMessage with sync
func (*RMQClient) PullMessageAsync ¶
func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error
PullMessageAsync pull message async
func (*RMQClient) RebalanceImmediately ¶
func (c *RMQClient) RebalanceImmediately()
func (*RMQClient) RegisterConsumer ¶
func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error
func (*RMQClient) RegisterProducer ¶
func (c *RMQClient) RegisterProducer(group string, producer InnerProducer)
func (*RMQClient) SelectConsumer ¶
func (c *RMQClient) SelectConsumer(group string) InnerConsumer
func (*RMQClient) SelectProducer ¶
func (c *RMQClient) SelectProducer(group string) InnerProducer
func (*RMQClient) SendHeartbeatToAllBrokerWithLock ¶
func (c *RMQClient) SendHeartbeatToAllBrokerWithLock()
TODO
func (*RMQClient) SendMessageAsync ¶
func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, msgs []*Message, f func(result *SendResult)) error
SendMessageAsync send message with batch by async
func (*RMQClient) SendMessageOneWay ¶
func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, msgs []*Message) (*SendResult, error)
func (*RMQClient) UnregisterConsumer ¶
func (*RMQClient) UnregisterProducer ¶
func (*RMQClient) UpdatePublishInfo ¶
func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData)
func (*RMQClient) UpdateSubscribeInfo ¶
func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData)
func (*RMQClient) UpdateTopicRouteInfo ¶
func (c *RMQClient) UpdateTopicRouteInfo()
type SearchOffsetRequest ¶
type SearchOffsetRequest struct { Topic string `json:"topic"` QueueId int `json:"queueId"` Timestamp int64 `json:"timestamp"` }
func (*SearchOffsetRequest) Encode ¶
func (request *SearchOffsetRequest) Encode() map[string]string
type SendMessageRequest ¶
type SendMessageRequest struct { ProducerGroup string `json:"producerGroup"` Topic string `json:"topic"` QueueId int `json:"queueId"` SysFlag int `json:"sysFlag"` BornTimestamp int64 `json:"bornTimestamp"` Flag int32 `json:"flag"` Properties string `json:"properties"` ReconsumeTimes int `json:"reconsumeTimes"` UnitMode bool `json:"unitMode"` MaxReconsumeTimes int `json:"maxReconsumeTimes"` Batch bool }
func (*SendMessageRequest) Decode ¶
func (request *SendMessageRequest) Decode(properties map[string]string) error
func (*SendMessageRequest) Encode ¶
func (request *SendMessageRequest) Encode() map[string]string
type SendMessageResponse ¶
type SendMessageResponse struct { MsgId string QueueId int32 QueueOffset int64 TransactionId string MsgRegion string }
func (*SendMessageResponse) Decode ¶
func (response *SendMessageResponse) Decode(properties map[string]string)
type SendResult ¶
type SendResult struct { Status SendStatus MsgID string MessageQueue *MessageQueue QueueOffset int64 TransactionID string OffsetMsgID string RegionID string TraceOn bool }
SendResult RocketMQ send result
func (*SendResult) String ¶
func (result *SendResult) String() string
SendResult send message result to string(detail result)
type SendStatus ¶
type SendStatus int
SendStatus of message
const ( SendOK SendStatus = iota SendFlushDiskTimeout SendFlushSlaveTimeout SendSlaveNotAvailable FlagCompressed = 0x1 MsgIdLength = 8 + 8 )
type ServiceState ¶
type ServiceState int
const ( StateCreateJust ServiceState = iota StateStartFailed StateRunning StateShutdown )
type SubscriptionData ¶
type TopicPublishInfo ¶
type TopicPublishInfo struct { OrderTopic bool HaveTopicRouterInfo bool MqList []*MessageQueue RouteData *TopicRouteData TopicQueueIndex int32 }
key is topic, value is TopicPublishInfo
type TopicRouteData ¶
type TopicRouteData struct { OrderTopicConf string QueueDataList []*QueueData `json:"queueDatas"` BrokerDataList []*BrokerData `json:"brokerDatas"` }
TopicRouteData TopicRouteData
func UpdateTopicRouteInfo ¶
func UpdateTopicRouteInfo(topic string) *TopicRouteData
func (*TopicRouteData) String ¶
func (routeData *TopicRouteData) String() string
type TransactionListener ¶
type TransactionListener interface { }
type UpdateConsumerOffsetRequest ¶
type UpdateConsumerOffsetRequest struct { ConsumerGroup string `json:"consumerGroup"` Topic string `json:"topic"` QueueId int `json:"queueId"` CommitOffset int64 `json:"commitOffset"` }
func (*UpdateConsumerOffsetRequest) Encode ¶
func (request *UpdateConsumerOffsetRequest) Encode() map[string]string
Click to show internal directories.
Click to hide internal directories.