kernel

package
v0.0.0-...-dbcf9a4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2020 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RetryGroupTopicPrefix = "%RETRY%"
	DefaultConsumerGroup  = "DEFAULT_CONSUMER"
)
View Source
const (
	PropertyKeySeparator                   = " "
	PropertyKeys                           = "KEYS"
	PropertyTags                           = "TAGS"
	PropertyWaitStoreMsgOk                 = "WAIT"
	PropertyDelayTimeLevel                 = "DELAY"
	PropertyRetryTopic                     = "RETRY_TOPIC"
	PropertyRealTopic                      = "REAL_TOPIC"
	PropertyRealQueueId                    = "REAL_QID"
	PropertyTransactionPrepared            = "TRAN_MSG"
	PropertyProducerGroup                  = "PGROUP"
	PropertyMinOffset                      = "MIN_OFFSET"
	PropertyMaxOffset                      = "MAX_OFFSET"
	PropertyBuyerId                        = "BUYER_ID"
	PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
	PropertyTransferFlag                   = "TRANSFER_FLAG"
	PropertyCorrectionFlag                 = "CORRECTION_FLAG"
	PropertyMQ2Flag                        = "MQ2_FLAG"
	PropertyReconsumeTime                  = "RECONSUME_TIME"
	PropertyMsgRegion                      = "MSG_REGION"
	PropertyTraceSwitch                    = "TRACE_ON"
	PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
	PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
	PropertyConsumeStartTime               = "CONSUME_START_TIME"
	PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
)
View Source
const (
	ReqSendMessage              = int16(10)
	ReqPullMessage              = int16(11)
	ReqQueryConsumerOffset      = int16(14)
	ReqUpdateConsumerOffset     = int16(15)
	ReqSearchOffsetByTimestamp  = int16(30)
	ReqGetMaxOffset             = int16(30)
	ReqHeartBeat                = int16(34)
	ReqGetConsumerListByGroup   = int16(38)
	ReqLockBatchMQ              = int16(41)
	ReqUnlockBatchMQ            = int16(42)
	ReqGetRouteInfoByTopic      = int16(105)
	ReqSendBatchMessage         = int16(320)
	ReqCheckTransactionState    = int16(39)
	ReqNotifyConsumerIdsChanged = int16(40)
	ReqResetConsuemrOffset      = int16(220)
	ReqGetConsumerRunningInfo   = int16(307)
	ReqConsumeMessageDirectly   = int16(309)
)
View Source
const (
	ResSuccess              = int16(0)
	ResFlushDiskTimeout     = int16(10)
	ResSlaveNotAvailable    = int16(11)
	ResFlushSlaveTimeout    = int16(12)
	ResTopicNotExist        = int16(17)
	ResPullNotFound         = int16(19)
	ResPullRetryImmediately = int16(20)
	ResPullOffsetMoved      = int16(21)
)
View Source
const (
	EnvNameServerAddr = "NAMESRV_ADDR"

	MasterId = int64(0)
)
View Source
const (
	V4_1_0 = 0
)

Variables

View Source
var (
	ErrServiceState = errors.New("service state is not running, please check")
)
View Source
var (
	ErrTopicNotExist = errors.New("topic not exist")
)

Functions

func FindBrokerAddrByName

func FindBrokerAddrByName(brokerName string) string

func FindBrokerAddrByTopic

func FindBrokerAddrByTopic(topic string) string

func GetRetryTopic

func GetRetryTopic(group string) string

func ValidateGroup

func ValidateGroup(group string)

Types

type BrokerData

type BrokerData struct {
	Cluster         string           `json:"cluster"`
	BrokerName      string           `json:"brokerName"`
	BrokerAddresses map[int64]string `json:"brokerAddrs"`
	// contains filtered or unexported fields
}

BrokerData BrokerData

func (*BrokerData) Equals

func (b *BrokerData) Equals(bd *BrokerData) bool

type ClientOption

type ClientOption struct {
	NameServerAddr    string
	ClientIP          string
	InstanceName      string
	UnitMode          bool
	UnitName          string
	VIPChannelEnabled bool
	UseTLS            bool
}

func (*ClientOption) ChangeInstanceNameToPID

func (opt *ClientOption) ChangeInstanceNameToPID()

func (*ClientOption) String

func (opt *ClientOption) String() string

type FindBrokerResult

type FindBrokerResult struct {
	BrokerAddr    string
	Slave         bool
	BrokerVersion int32
}

func FindBrokerAddressInSubscribe

func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult

type GetConsumerList

type GetConsumerList struct {
	ConsumerGroup string `json:"consumerGroup"`
}

func (*GetConsumerList) Encode

func (request *GetConsumerList) Encode() map[string]string

type GetMaxOffsetRequest

type GetMaxOffsetRequest struct {
	Topic   string `json:"topic"`
	QueueId int    `json:"queueId"`
}

func (*GetMaxOffsetRequest) Encode

func (request *GetMaxOffsetRequest) Encode() map[string]string

type GetRouteInfoRequest

type GetRouteInfoRequest struct {
	Topic string `json:"topic"`
}

func (*GetRouteInfoRequest) Decode

func (request *GetRouteInfoRequest) Decode(properties map[string]string) error

func (*GetRouteInfoRequest) Encode

func (request *GetRouteInfoRequest) Encode() map[string]string

type InnerConsumer

type InnerConsumer interface {
	PersistConsumerOffset()
	UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
	IsSubscribeTopicNeedUpdate(topic string) bool
	SubscriptionDataList() []*SubscriptionData
	Rebalance()
	IsUnitMode() bool
}

type InnerProducer

type InnerProducer interface {
	PublishTopicList() []string
	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
	IsPublishTopicNeedUpdate(topic string) bool
	//GetTransactionListener() TransactionListener
	IsUnitMode() bool
}

type Message

type Message struct {
	Topic         string
	Body          []byte
	Flag          int32
	Properties    map[string]string
	TransactionId string
	Batch         bool
}

func NewMessage

func NewMessage(topic string, body []byte) *Message

func (*Message) PutProperty

func (msg *Message) PutProperty(key, value string)

func (*Message) RemoveProperty

func (msg *Message) RemoveProperty(key string) string

func (*Message) String

func (msg *Message) String() string

type MessageExt

type MessageExt struct {
	Message
	MsgId                     string
	QueueId                   int32
	StoreSize                 int32
	QueueOffset               int64
	SysFlag                   int32
	BornTimestamp             int64
	BornHost                  string
	StoreTimestamp            int64
	StoreHost                 string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
}

func (*MessageExt) GetTags

func (msgExt *MessageExt) GetTags() string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageQueue

type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int    `json:"queueId"`
}

MessageQueue message queue

func FetchSubscribeMessageQueues

func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error)

func (*MessageQueue) Equals

func (mq *MessageQueue) Equals(queue *MessageQueue) bool

func (*MessageQueue) HashCode

func (mq *MessageQueue) HashCode() int

func (*MessageQueue) String

func (mq *MessageQueue) String() string

type PullMessageRequest

type PullMessageRequest struct {
	ConsumerGroup        string        `json:"consumerGroup"`
	Topic                string        `json:"topic"`
	QueueId              int32         `json:"queueId"`
	QueueOffset          int64         `json:"queueOffset"`
	MaxMsgNums           int32         `json:"maxMsgNums"`
	SysFlag              int32         `json:"sysFlag"`
	CommitOffset         int64         `json:"commitOffset"`
	SuspendTimeoutMillis time.Duration `json:"suspendTimeoutMillis"`
	SubExpression        string        `json:"subscription"`
	SubVersion           int64         `json:"subVersion"`
	ExpressionType       string        `json:"expressionType"`
}

func (*PullMessageRequest) Encode

func (request *PullMessageRequest) Encode() map[string]string

type PullMessageResponse

type PullMessageResponse struct {
	SuggestWhichBrokerId int64
	NextBeginOffset      int64
	MinOffset            int64
	MaxOffset            int64
}

type PullResult

type PullResult struct {
	NextBeginOffset      int64
	MinOffset            int64
	MaxOffset            int64
	Status               PullStatus
	SuggestWhichBrokerId int64
	// contains filtered or unexported fields
}

PullResult the pull result

func (*PullResult) GetMessageExts

func (result *PullResult) GetMessageExts() []*MessageExt

func (*PullResult) GetMessages

func (result *PullResult) GetMessages() []*Message

func (*PullResult) SetMessageExts

func (result *PullResult) SetMessageExts(msgExts []*MessageExt)

func (*PullResult) String

func (result *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus pull status

const (
	PullFound PullStatus = iota
	PullNoNewMsg
	PullNoMsgMatched
	PullOffsetIllegal
	PullBrokerTimeout
)

predefined pull status

type QueryConsumerOffsetRequest

type QueryConsumerOffsetRequest struct {
	ConsumerGroup string `json:"consumerGroup"`
	Topic         string `json:"topic"`
	QueueId       int    `json:"queueId"`
}

func (*QueryConsumerOffsetRequest) Encode

func (request *QueryConsumerOffsetRequest) Encode() map[string]string

type QueueData

type QueueData struct {
	BrokerName     string `json:"brokerName"`
	ReadQueueNums  int    `json:"readQueueNums"`
	WriteQueueNums int    `json:"writeQueueNums"`
	Perm           int    `json:"perm"`
	TopicSynFlag   int    `json:"topicSynFlag"`
}

QueueData QueueData

func (*QueueData) Equals

func (q *QueueData) Equals(qd *QueueData) bool

type RMQClient

type RMQClient struct {
	// contains filtered or unexported fields
}

func GetOrNewRocketMQClient

func GetOrNewRocketMQClient(option ClientOption) *RMQClient

func (*RMQClient) CheckClientInBroker

func (c *RMQClient) CheckClientInBroker()

func (*RMQClient) ClientID

func (c *RMQClient) ClientID() string

func (*RMQClient) InvokeOneWay

func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
	timeoutMillis time.Duration) error

func (*RMQClient) InvokeSync

func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
	timeoutMillis time.Duration) (*remote.RemotingCommand, error)

func (*RMQClient) ProcessSendResponse

func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*Message) *SendResult

func (*RMQClient) PullMessage

func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error)

PullMessage with sync

func (*RMQClient) PullMessageAsync

func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error

PullMessageAsync pull message async

func (*RMQClient) RebalanceImmediately

func (c *RMQClient) RebalanceImmediately()

func (*RMQClient) RegisterConsumer

func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error

func (*RMQClient) RegisterProducer

func (c *RMQClient) RegisterProducer(group string, producer InnerProducer)

func (*RMQClient) SelectConsumer

func (c *RMQClient) SelectConsumer(group string) InnerConsumer

func (*RMQClient) SelectProducer

func (c *RMQClient) SelectProducer(group string) InnerProducer

func (*RMQClient) SendHeartbeatToAllBrokerWithLock

func (c *RMQClient) SendHeartbeatToAllBrokerWithLock()

TODO

func (*RMQClient) SendMessageAsync

func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
	msgs []*Message, f func(result *SendResult)) error

SendMessageAsync send message with batch by async

func (*RMQClient) SendMessageOneWay

func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
	msgs []*Message) (*SendResult, error)

func (*RMQClient) Shutdown

func (c *RMQClient) Shutdown()

func (*RMQClient) Start

func (c *RMQClient) Start()

func (*RMQClient) UnregisterConsumer

func (c *RMQClient) UnregisterConsumer(group string)

func (*RMQClient) UnregisterProducer

func (c *RMQClient) UnregisterProducer(group string)

func (*RMQClient) UpdatePublishInfo

func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData)

func (*RMQClient) UpdateSubscribeInfo

func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData)

func (*RMQClient) UpdateTopicRouteInfo

func (c *RMQClient) UpdateTopicRouteInfo()

type SearchOffsetRequest

type SearchOffsetRequest struct {
	Topic     string `json:"topic"`
	QueueId   int    `json:"queueId"`
	Timestamp int64  `json:"timestamp"`
}

func (*SearchOffsetRequest) Encode

func (request *SearchOffsetRequest) Encode() map[string]string

type SendMessageRequest

type SendMessageRequest struct {
	ProducerGroup     string `json:"producerGroup"`
	Topic             string `json:"topic"`
	QueueId           int    `json:"queueId"`
	SysFlag           int    `json:"sysFlag"`
	BornTimestamp     int64  `json:"bornTimestamp"`
	Flag              int32  `json:"flag"`
	Properties        string `json:"properties"`
	ReconsumeTimes    int    `json:"reconsumeTimes"`
	UnitMode          bool   `json:"unitMode"`
	MaxReconsumeTimes int    `json:"maxReconsumeTimes"`
	Batch             bool
}

func (*SendMessageRequest) Decode

func (request *SendMessageRequest) Decode(properties map[string]string) error

func (*SendMessageRequest) Encode

func (request *SendMessageRequest) Encode() map[string]string

type SendMessageResponse

type SendMessageResponse struct {
	MsgId         string
	QueueId       int32
	QueueOffset   int64
	TransactionId string
	MsgRegion     string
}

func (*SendMessageResponse) Decode

func (response *SendMessageResponse) Decode(properties map[string]string)

type SendResult

type SendResult struct {
	Status        SendStatus
	MsgID         string
	MessageQueue  *MessageQueue
	QueueOffset   int64
	TransactionID string
	OffsetMsgID   string
	RegionID      string
	TraceOn       bool
}

SendResult RocketMQ send result

func (*SendResult) String

func (result *SendResult) String() string

SendResult send message result to string(detail result)

type SendStatus

type SendStatus int

SendStatus of message

const (
	SendOK SendStatus = iota
	SendFlushDiskTimeout
	SendFlushSlaveTimeout
	SendSlaveNotAvailable

	FlagCompressed = 0x1
	MsgIdLength    = 8 + 8
)

type ServiceState

type ServiceState int
const (
	StateCreateJust ServiceState = iota
	StateStartFailed
	StateRunning
	StateShutdown
)

type SubscriptionData

type SubscriptionData struct {
	ClassFilterMode bool
	Topic           string
	SubString       string
	Tags            map[string]bool
	Codes           map[int32]bool
	SubVersion      int64
	ExpType         string
}

type TopicPublishInfo

type TopicPublishInfo struct {
	OrderTopic          bool
	HaveTopicRouterInfo bool
	MqList              []*MessageQueue
	RouteData           *TopicRouteData
	TopicQueueIndex     int32
}

key is topic, value is TopicPublishInfo

type TopicRouteData

type TopicRouteData struct {
	OrderTopicConf string
	QueueDataList  []*QueueData  `json:"queueDatas"`
	BrokerDataList []*BrokerData `json:"brokerDatas"`
}

TopicRouteData TopicRouteData

func UpdateTopicRouteInfo

func UpdateTopicRouteInfo(topic string) *TopicRouteData

func (*TopicRouteData) String

func (routeData *TopicRouteData) String() string

type TransactionListener

type TransactionListener interface {
}

type UpdateConsumerOffsetRequest

type UpdateConsumerOffsetRequest struct {
	ConsumerGroup string `json:"consumerGroup"`
	Topic         string `json:"topic"`
	QueueId       int    `json:"queueId"`
	CommitOffset  int64  `json:"commitOffset"`
}

func (*UpdateConsumerOffsetRequest) Encode

func (request *UpdateConsumerOffsetRequest) Encode() map[string]string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL