rocketmq

package module
v0.0.0-...-f74f85a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2017 License: Apache-2.0 Imports: 20 Imported by: 0

README

rocketmq-go

rocketmq client for golang

Documentation

Index

Constants

View Source
// RETRY_GROUP_TOPIC_PREFIX is the literal string "%RETRY%"; presumably it is
// prepended to a consumer group name to form that group's retry topic —
// verify against the consumer code. MASTER_ID is 0; presumably the broker ID
// that identifies the master in BrokerData.BrokerAddrs — TODO confirm.
const (
	RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"
	MASTER_ID                = 0
)
View Source
// BrokerSuspendMaxTimeMillis evaluates to 15000 (the name indicates
// milliseconds). The FLAG_* constants are int32 single-bit masks occupying
// bits 0 through 3, so they can be OR-ed together; presumably they are
// combined into PullMessageRequestHeader.SysFlag — verify against the caller.
const (
	BrokerSuspendMaxTimeMillis       = 1000 * 15
	FLAG_COMMIT_OFFSET         int32 = 0x1 << 0
	FLAG_SUSPEND               int32 = 0x1 << 1
	FLAG_SUBSCRIPTION          int32 = 0x1 << 2
	FLAG_CLASS_FILTER          int32 = 0x1 << 3
)
View Source
// Namespace name strings. Presumably used as the Namespace value of KV-config
// requests such as GetKVConfigRequestHeader — TODO confirm.
const (
	NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG"
	NAMESPACE_PROJECT_CONFIG     = "PROJECT_CONFIG"
)
View Source
// RPC_TYPE is a typed int constant (0); RPC_ONEWAYint is an untyped constant
// (1); LANGUAGE is the string "JAVA".
//
// NOTE(review): RPC_ONEWAYint is the only identifier here without a type and
// its name ends in "int", so it looks like `RPC_ONEWAY int` with the space
// lost. It is left unchanged because renaming it would alter the exported API.
const (
	RPC_TYPE      int = 0
	RPC_ONEWAYint     = 1
	LANGUAGE          = "JAVA"
)
View Source
// Request codes. Each comment names the component the request concerns
// (Broker or Namesrv) followed by what the request does.
const (
	// Broker: send a message
	SEND_MESSAGE = 10
	// Broker: subscribe to (pull) messages
	PULL_MESSAGE = 11
	// Broker: query messages
	QUERY_MESSAGE = 12
	// Broker: query the broker offset
	QUERY_BROKER_OFFSET = 13
	// Broker: query the consumer offset
	QUERY_CONSUMER_OFFSET = 14
	// Broker: update the consumer offset
	UPDATE_CONSUMER_OFFSET = 15
	// Broker: update or add a topic
	UPDATE_AND_CREATE_TOPIC = 17
	// Broker: get the configuration of all topics (both slaves and Namesrv request this configuration from the master)
	GET_ALL_TOPIC_CONFIG = 21
	// Broker: get the configuration of all topics (both slaves and Namesrv request this configuration from the master)
	GET_TOPIC_CONFIG_LIST = 22
	// Broker: get the list of all topic names
	GET_TOPIC_NAME_LIST = 23
	// Broker: update the configuration on the broker
	UPDATE_BROKER_CONFIG = 25
	// Broker: get the configuration on the broker
	GET_BROKER_CONFIG = 26
	// Broker: trigger the broker to delete files
	TRIGGER_DELETE_FILES = 27
	// Broker: get the broker's runtime information
	GET_BROKER_RUNTIME_INFO = 28
	// Broker: look up a queue's offset by timestamp
	SEARCH_OFFSET_BY_TIMESTAMP = 29
	// Broker: query the maximum offset of a queue
	GET_MAX_OFFSET = 30
	// Broker: query the minimum offset of a queue
	GET_MIN_OFFSET = 31
	// Broker: query the store time of the earliest message in a queue
	GET_EARLIEST_MSG_STORETIME = 32
	// Broker: look up a message by its message ID
	VIEW_MESSAGE_BY_ID = 33
	// Broker: the client sends a heartbeat and registers itself
	// NOTE(review): the original comment read "client to client"; presumably client to broker — verify.
	HEART_BEAT = 34
	// Broker: the client unregisters
	UNREGISTER_CLIENT = 35
	// Broker: the consumer sends messages it could not process back to the server
	CONSUMER_SEND_MSG_BACK = 36
	// Broker: commit or roll back a transaction
	END_TRANSACTION = 37
	// Broker: get the list of consumer IDs by group name
	GET_CONSUMER_LIST_BY_GROUP = 38
	// Broker: the broker proactively checks the transaction state with the producer
	CHECK_TRANSACTION_STATE = 39
	// Broker: the broker notifies consumers that the consumer list changed
	NOTIFY_CONSUMER_IDS_CHANGED = 40
	// Broker: the consumer locks queues on the master
	LOCK_BATCH_MQ = 41
	// Broker: the consumer unlocks queues on the master
	UNLOCK_BATCH_MQ = 42
	// Broker: get all consumer offsets
	GET_ALL_CONSUMER_OFFSET = 43
	// Broker: get all delay (scheduled-delivery) offsets
	GET_ALL_DELAY_OFFSET = 45
	// Namesrv: add a KV config to Namesrv
	PUT_KV_CONFIG = 100
	// Namesrv: get a KV config from Namesrv
	GET_KV_CONFIG = 101
	// Namesrv: get a KV config from Namesrv
	// NOTE(review): the original comment duplicates GET_KV_CONFIG; the constant name suggests deletion — verify.
	DELETE_KV_CONFIG = 102
	// Namesrv: register a broker; the data is persisted, and an existing configuration is overwritten
	REGISTER_BROKER = 103
	// Namesrv: unregister a broker; the data is persisted
	UNREGISTER_BROKER = 104
	// Namesrv: get broker names and queue counts (read queues and write queues) by topic
	GET_ROUTEINTO_BY_TOPIC = 105
	// Namesrv: get cluster information for all brokers registered with the name server
	GET_BROKER_CLUSTER_INFO             = 106
	UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
	GET_ALL_SUBSCRIPTIONGROUP_CONFIG    = 201
	GET_TOPIC_STATS_INFO                = 202
	GET_CONSUMER_CONNECTION_LIST        = 203
	GET_PRODUCER_CONNECTION_LIST        = 204
	WIPE_WRITE_PERM_OF_BROKER           = 205

	// Get the complete topic list from the name server
	GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
	// Delete a subscription group from the broker
	DELETE_SUBSCRIPTIONGROUP = 207
	// Get consume stats (progress) from the broker
	GET_CONSUME_STATS = 208
	// Suspend the consumer's consumption
	SUSPEND_CONSUMER = 209
	// Resume the consumer's consumption
	RESUME_CONSUMER = 210
	// Reset the consumer offset
	RESET_CONSUMER_OFFSET_IN_CONSUMER = 211
	// Reset the consumer offset
	RESET_CONSUMER_OFFSET_IN_BROKER = 212
	// Adjust the size of the consumer thread pool
	ADJUST_CONSUMER_THREAD_POOL = 213
	// Query which consumer groups consumed a message
	WHO_CONSUME_THE_MESSAGE = 214

	// Delete a topic's configuration from the broker
	DELETE_TOPIC_IN_BROKER = 215
	// Delete a topic's configuration from Namesrv
	DELETE_TOPIC_IN_NAMESRV = 216
	// Namesrv: get all server IP information by project
	GET_KV_CONFIG_BY_VALUE = 217
	// Namesrv: delete all server IP information under the specified project group
	DELETE_KV_CONFIG_BY_VALUE = 218
	// Get the full KV list by namespace
	GET_KVLIST_BY_NAMESPACE = 219

	// Reset offset
	RESET_CONSUMER_CLIENT_OFFSET = 220
	// The client subscribes to messages
	GET_CONSUMER_STATUS_FROM_CLIENT = 221
	// Tell the broker to invoke offset-reset handling
	INVOKE_BROKER_TO_RESET_OFFSET = 222
	// Tell the broker to invoke the client message-subscription handling
	INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223

	// Broker: query who consumes a topic
	// 2014-03-21 Add By shijia
	QUERY_TOPIC_CONSUME_BY_WHO = 300

	// Get all topics in the specified cluster
	// 2014-03-26
	GET_TOPICS_BY_CLUSTER = 224

	// Register a filter server with the broker
	// 2014-04-06 Add By shijia
	REGISTER_FILTER_SERVER = 301
	// Register a class with the filter server
	// 2014-04-06 Add By shijia
	REGISTER_MESSAGE_FILTER_CLASS = 302
	// Get the time span of messages by topic and group
	QUERY_CONSUME_TIME_SPAN = 303
	// Get the list of all built-in system topics
	GET_SYSTEM_TOPIC_LIST_FROM_NS     = 304
	GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305

	// Clean up expired queues
	CLEAN_EXPIRED_CONSUMEQUEUE = 306

	// Query a consumer's in-memory data via the broker
	// 2014-07-19 Add By shijia
	GET_CONSUMER_RUNNING_INFO = 307

	// Look up the corrected offset (forwarding component)
	QUERY_CORRECTION_OFFSET = 308

	// Send one message via the broker directly to a specific consumer, consume it immediately,
	// return the result to the broker, and then return it to the caller
	// 2014-08-11 Add By shijia
	CONSUME_MESSAGE_DIRECTLY = 309

	// Broker: send a message, with an optimized network packet
	SEND_MESSAGE_V2 = 310

	// Unit-mode related topics
	GET_UNIT_TOPIC_LIST = 311
	// Get the list of topics that have unit-mode subscription groups
	GET_HAS_UNIT_SUB_TOPIC_LIST = 312
	// Get the list of non-unit-mode topics that have unit-mode subscription groups
	GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
	// Clone one group's consume progress to a new group
	CLONE_GROUP_OFFSET = 314

	// View the various statistics on the broker
	VIEW_BROKER_STATS_DATA = 315
)
View Source
// Response codes, followed by a few message-encoding constants
// (MSG_ID_LENGTH, the property separators and the property key names) that
// share this declaration group.
const (
	// Success
	SUCCESS = 0
	// An uncaught exception occurred
	SYSTEM_ERROR = 1
	// The system is busy because the thread pool is congested
	SYSTEM_BUSY = 2
	// The request code is not supported
	REQUEST_CODE_NOT_SUPPORTED = 3
	// The transaction failed: adding to the DB failed
	TRANSACTION_FAILED = 4
	// Broker: flushing to disk timed out
	FLUSH_DISK_TIMEOUT = 10
	// Broker: synchronous double write, the slave is unavailable
	SLAVE_NOT_AVAILABLE = 11
	// Broker: synchronous double write, timed out waiting for the slave's acknowledgement
	FLUSH_SLAVE_TIMEOUT = 12
	// Broker: the message is illegal
	MESSAGE_ILLEGAL = 13
	// Broker, Namesrv: the service is unavailable, possibly because it is shutting down or because of a permission problem
	SERVICE_NOT_AVAILABLE = 14
	// Broker, Namesrv: the version is not supported
	VERSION_NOT_SUPPORTED = 15
	// Broker, Namesrv: no permission to perform this operation, which may be a send, a receive, or another operation
	NO_PERMISSION = 16
	// Broker: the topic does not exist
	TOPIC_NOT_EXIST = 17
	// Broker: the topic already exists (when creating a topic)
	TOPIC_EXIST_ALREADY = 18
	// Broker: the pull found no message (the requested offset equals the maximum offset, which has no corresponding message)
	PULL_NOT_FOUND = 19
	// Broker: the message may have been filtered out, or the notification was spurious, etc.
	PULL_RETRY_IMMEDIATELY = 20
	// Broker: the offset requested by the pull is invalid, either too small or too large
	PULL_OFFSET_MOVED = 21
	// Broker: the queried message was not found
	QUERY_NOT_FOUND = 22
	// Broker: failed to parse the subscription
	SUBSCRIPTION_PARSE_FAILED = 23
	// Broker: the subscription does not exist
	SUBSCRIPTION_NOT_EXIST = 24
	// Broker: the subscription is not the latest
	SUBSCRIPTION_NOT_LATEST = 25
	// Broker: the subscription group does not exist
	SUBSCRIPTION_GROUP_NOT_EXIST = 26
	// Producer: the transaction should be committed
	TRANSACTION_SHOULD_COMMIT = 200
	// Producer: the transaction should be rolled back
	TRANSACTION_SHOULD_ROLLBACK = 201
	// Producer: the transaction state is unknown
	TRANSACTION_STATE_UNKNOW = 202
	// Producer: the producer group is wrong
	TRANSACTION_STATE_GROUP_WRONG = 203
	// Unit-mode message: buyerId must be set
	NO_BUYER_ID = 204

	// Unit-mode message: the message does not belong to the current unit
	NOT_IN_CURRENT_UNIT = 205

	// The consumer is not online
	CONSUMER_NOT_ONLINE = 206

	// The consumer timed out while consuming the message
	CONSUME_MSG_TIMEOUT = 207

	// Fixed length of a message ID. Add: tianyuliang Since: 2017/4/18
	MSG_ID_LENGTH = 8 + 8

	// Separator between a property's key and value in the byte array
	NAME_VALUE_SEPARATOR = 0x1

	// Separator between message properties in the byte array
	PROPERTY_SEPARATOR = 0x2

	// Message key
	PROPERTY_KEYS = "KEYS"

	// Message tag
	PROPERTY_TAGS = "TAGS"
)
View Source
// Read-source selectors with values 0, 1 and 2. Judging by the names they
// choose between reading from memory, from the store, or memory first with a
// fallback to the store; presumably used when reading consume offsets —
// TODO confirm against the OffsetStore implementation.
const (
	MEMORY_FIRST_THEN_STORE = 0
	READ_FROM_MEMORY        = 1
	READ_FROM_STORE         = 2
)
View Source
// CompressedFlag is a single-bit mask with only bit 0 set (value 1).
// Presumably it is tested against a message's SysFlag to detect a compressed
// body — verify against the decoding code.
const (
	CompressedFlag = (0x1 << 0)
)

Variables

View Source
// Action holds the consume-result codes as fields of an anonymous struct:
// CommitMessage is 1 and ReconsumeLater is 2 (assigned positionally below).
var Action = struct {
	CommitMessage  int // consumed successfully
	ReconsumeLater int // consumption failed (the message enters the retry queue and is consumed again later)
}{
	1,
	2,
}

Action 共享申请

View Source
// ConfigVersion is a package-level int variable initialised to -1.
// NOTE(review): where it is read or updated is not visible here.
var (
	ConfigVersion int = -1
)
View Source
// ConsumeTypes holds the consume-type names as fields of an anonymous struct:
// Actively is "CONSUME_ACTIVELY" and Passively is "CONSUME_PASSIVELY"
// (assigned positionally below).
var ConsumeTypes = struct {
	Actively  string // active consumption
	Passively string // passive consumption
}{
	"CONSUME_ACTIVELY",
	"CONSUME_PASSIVELY",
}
View Source
// DEFAULT_IP is set once, at package initialisation, to the result of GetLocalIp4().
var DEFAULT_IP = GetLocalIp4()
View Source
// TrackTypes holds the message-track type codes as fields of an anonymous
// struct; the values 0 through 4 are assigned positionally in field order.
var TrackTypes = struct {
	SubscribedAndConsumed      int // subscribed and consumed (the offset has passed the message)
	SubscribedButFilterd       int // subscribed, but filtered out
	SubscribedButPull          int // subscribed, but in PULL mode, so the result is unknown
	SubscribedAndNotConsumeYet int // subscribed, but not consumed yet (the offset is smaller)
	UnknowExeption             int // unknown exception
}{
	0,
	1,
	2,
	3,
	4,
}

Functions

func Debug

func Debug(format string, args ...interface{})

Debug 打印debug级别日志

func Error

func Error(format string, args ...interface{})

Error 打印error级别日志

func Fatal

func Fatal(format string, args ...interface{})

Fatal 打印critical级别日志

func GetLocalIp4

func GetLocalIp4() (ip string)

func HashCode

func HashCode(s string) int32

HashCode returns the hash code of a string (equal to Java's String.hashCode).

func Info

func Info(format string, args ...interface{})

Info 打印info级别日志

func SetLog

func SetLog(log *logs.BeeLogger)

SetLog 日志函数

func Trace

func Trace(format string, args ...interface{})

Trace 打印trace级别日志

func UnixNano

func UnixNano() int64

UnixNano returns the current Unix time.

func Warn

func Warn(format string, args ...interface{})

Warn 打印warn级别日志

Types

type Admin

type Admin interface {
	SearchTopic() (*TopicData, error)
	SearchClusterInfo() (*ClusterInfo, error)
	CreateTopic(topicName, clusterName string, readQueueNum int, writeQueueNum int, order bool) (bool, error)
	QueryMessageById(msgId string) (*MessageExt, error)
	MessageTrackDetail(msg *MessageExt) ([]*MessageTrack, error)
	QueryConsumerGroupByTopic(topic string) ([]string, error)
	QueryConsumerProgress(consumeGroupId string) (*ConsumerProgress, error)
	QueryConsumerConnection(consumeGroupId string) (*ConsumerConnection, error)
	FetchBrokerRuntimeStats(brokerAddr string) (*BrokerRuntimeInfo, error)
	Close() error
}

func NewDefaultAdmin

func NewDefaultAdmin(conf *Config) (Admin, error)

type AllocateMessageQueueAveragely

type AllocateMessageQueueAveragely struct{}

type AllocateMessageQueueStrategy

type AllocateMessageQueueStrategy interface {
	// contains filtered or unexported methods
}

type BrokerData

type BrokerData struct {
	BrokerName      string           `json:"brokerName"`
	BrokerAddrs     map[int64]string `json:"brokerAddrs"`
	BrokerAddrsLock sync.RWMutex     `json:"-"`
}

func (*BrokerData) MarshalJSON

func (mj *BrokerData) MarshalJSON() ([]byte, error)

func (*BrokerData) MarshalJSONBuf

func (mj *BrokerData) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*BrokerData) UnmarshalJSON

func (uj *BrokerData) UnmarshalJSON(input []byte) error

func (*BrokerData) UnmarshalJSONFFLexer

func (uj *BrokerData) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type BrokerRuntimeInfo

type BrokerRuntimeInfo struct {
	BrokerVersionDesc           string  `json:"brokerVersionDesc"`
	BrokerVersion               string  `json:"brokerVersion"`
	MsgPutTotalYesterdayMorning string  `json:"msgPutTotalYesterdayMorning"`
	MsgPutTotalTodayMorning     string  `json:"msgPutTotalTodayMorning"`
	MsgPutTotalTodayNow         string  `json:"msgPutTotalTodayNow"`
	MsgGetTotalYesterdayMorning string  `json:"msgGetTotalYesterdayMorning"`
	MsgGetTotalTodayNow         string  `json:"msgGetTotalTodayNow"`
	SendThreadPoolQueueSize     string  `json:"sendThreadPoolQueueSize"`
	SendThreadPoolQueueCapacity string  `json:"sendThreadPoolQueueCapacity"`
	MsgGetTotalTodayMorning     string  `json:"msgGetTotalTodayMorning"`
	InTps                       float64 `json:"inTps"`
	OutTps                      float64 `json:"outTps"`
}

type ClusterInfo

type ClusterInfo struct {
	BrokerAddrTable  map[string]BrokerData `json:"brokerAddrTable"`
	ClusterAddrTable map[string][]string   `json:"clusterAddrTable"`
}

ClusterInfo cluster info

func (*ClusterInfo) MarshalJSON

func (mj *ClusterInfo) MarshalJSON() ([]byte, error)

func (*ClusterInfo) MarshalJSONBuf

func (mj *ClusterInfo) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*ClusterInfo) UnmarshalJSON

func (uj *ClusterInfo) UnmarshalJSON(input []byte) error

func (*ClusterInfo) UnmarshalJSONFFLexer

func (uj *ClusterInfo) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type Config

type Config struct {
	Nameserver   string
	ClientIp     string
	InstanceName string
}

type Connection

type Connection struct {
	ClientId   string `json:"clientId"`
	ClientAddr string `json:"clientAddr"`
	Language   string `json:"language"`
	Version    int    `json:"version"`
}

type ConsumeStats

type ConsumeStats struct {
	OffsetTable map[MessageQueue]OffsetWrapper `json:"offsetTable"`
	ConsumeTps  int64                          `json:"consumeTps"`
}

func (*ConsumeStats) MarshalJSON

func (mj *ConsumeStats) MarshalJSON() ([]byte, error)

func (*ConsumeStats) MarshalJSONBuf

func (mj *ConsumeStats) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*ConsumeStats) UnmarshalJSON

func (uj *ConsumeStats) UnmarshalJSON(input []byte) error

func (*ConsumeStats) UnmarshalJSONFFLexer

func (uj *ConsumeStats) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type Consumer

type Consumer interface {
	//Admin
	Start() error
	Shutdown()
	RegisterMessageListener(listener MessageListener)
	Subscribe(topic string, subExpression string)
	UnSubcribe(topic string)
	SendMessageBack(msg MessageExt, delayLevel int) error
	SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
	// contains filtered or unexported methods
}

func NewDefaultConsumer

func NewDefaultConsumer(name string, conf *Config) (Consumer, error)

type ConsumerConnection

type ConsumerConnection struct {
	ConnectionSet     []*Connection                `json:"connectionSet"`
	SubscriptionTable map[string]*SubscriptionData `json:"subscriptionTable"`
	ConsumeType       string                       `json:"consumeType"`
	MessageModel      string                       `json:"messageModel"`
	ConsumeFromWhere  string                       `json:"consumeFromWhere"`
}

type ConsumerData

type ConsumerData struct {
	GroupName           string
	ConsumerType        string
	MessageModel        string
	ConsumeFromWhere    string
	SubscriptionDataSet []*SubscriptionData
	UnitMode            bool
}

type ConsumerGroup

type ConsumerGroup struct {
	BrokerOffset   int64 `json:"brokerOffset"`
	ConsumerOffset int64 `json:"consumerOffset"`
	Diff           int64 `json:"diff"`
	MessageQueue
}

type ConsumerIdSorter

type ConsumerIdSorter []string

func (ConsumerIdSorter) Len

func (self ConsumerIdSorter) Len() int

func (ConsumerIdSorter) Less

func (self ConsumerIdSorter) Less(i, j int) bool

func (ConsumerIdSorter) Swap

func (self ConsumerIdSorter) Swap(i, j int)

type ConsumerProgress

type ConsumerProgress struct {
	GroupId string           `json:"groupId"`
	Tps     int64            `json:"tps"`
	Diff    int64            `json:"diff"`
	List    []*ConsumerGroup `json:"list"`
}

type DefalutRemotingClient

type DefalutRemotingClient struct {
	// contains filtered or unexported fields
}

func (*DefalutRemotingClient) ScanResponseTable

func (self *DefalutRemotingClient) ScanResponseTable()

type DefaultAdmin

type DefaultAdmin struct {
	// contains filtered or unexported fields
}

func (*DefaultAdmin) Close

func (admin *DefaultAdmin) Close() error

func (*DefaultAdmin) CreateTopic

func (admin *DefaultAdmin) CreateTopic(topicName, clusterName string, readQueueNum int, writeQueueNum int, order bool) (bool, error)

CreateTopic 创建topic

func (*DefaultAdmin) FetchBrokerRuntimeStats

func (admin *DefaultAdmin) FetchBrokerRuntimeStats(brokerAddr string) (*BrokerRuntimeInfo, error)

FetchBrokerRuntimeStats 根据broker addr查询在线broker运行统计信息

func (*DefaultAdmin) MessageTrackDetail

func (admin *DefaultAdmin) MessageTrackDetail(msg *MessageExt) ([]*MessageTrack, error)

MessageTrackDetail 根据msgid查询消息轨迹

func (*DefaultAdmin) QueryConsumerConnection

func (admin *DefaultAdmin) QueryConsumerConnection(consumeGroupId string) (*ConsumerConnection, error)

QueryConsumerConnection 根据consumeGroupId查询消费进程

func (*DefaultAdmin) QueryConsumerGroupByTopic

func (admin *DefaultAdmin) QueryConsumerGroupByTopic(topic string) ([]string, error)

QueryConsumerGroupByTopic 根据topic查询消费组

func (*DefaultAdmin) QueryConsumerProgress

func (admin *DefaultAdmin) QueryConsumerProgress(consumeGroupId string) (*ConsumerProgress, error)

QueryConsumerProgress 根据consumeGroupId查询消费进度

func (*DefaultAdmin) QueryMessageById

func (admin *DefaultAdmin) QueryMessageById(msgId string) (*MessageExt, error)

QueryMessageById 根据msgid查询消息详情

func (*DefaultAdmin) SearchClusterInfo

func (admin *DefaultAdmin) SearchClusterInfo() (*ClusterInfo, error)

SearchClusterInfo 查询集群的信息

func (*DefaultAdmin) SearchTopic

func (admin *DefaultAdmin) SearchTopic() (*TopicData, error)

SearchTopic 查询topic列表接口

type DefaultConsumer

type DefaultConsumer struct {
	// contains filtered or unexported fields
}

func (*DefaultConsumer) RegisterMessageListener

func (self *DefaultConsumer) RegisterMessageListener(messageListener MessageListener)

func (*DefaultConsumer) SendMessageBack

func (self *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error

func (*DefaultConsumer) SendMessageBack1

func (self *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error

func (*DefaultConsumer) Shutdown

func (self *DefaultConsumer) Shutdown()

func (*DefaultConsumer) Start

func (self *DefaultConsumer) Start() error

func (*DefaultConsumer) Subscribe

func (self *DefaultConsumer) Subscribe(topic string, subExpression string)

func (*DefaultConsumer) UnSubcribe

func (self *DefaultConsumer) UnSubcribe(topic string)

type GetConsumeStatsRequestHeader

type GetConsumeStatsRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
	Topic         string `json:"topic"`
}

type GetConsumerConnectionListRequestHeader

type GetConsumerConnectionListRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
}

type GetConsumerListByGroupRequestHeader

type GetConsumerListByGroupRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
}

type GetConsumerListByGroupResponseBody

type GetConsumerListByGroupResponseBody struct {
	ConsumerIdList []string `json:"consumerIdList"`
}

type GetKVConfigRequestHeader

type GetKVConfigRequestHeader struct {
	Namespace string `json:"namespace"`
	Key       string `json:"key"`
}

type GetKVConfigResponseHeader

type GetKVConfigResponseHeader struct {
	Value string `json:"value"`
}

type GetRouteInfoRequestHeader

type GetRouteInfoRequestHeader struct {
	Topic string `json:"topic"`
}

type GroupList

type GroupList struct {
	GroupList []string `json:"groupList"`
}

type HeartbeatData

type HeartbeatData struct {
	ClientId        string
	ConsumerDataSet []*ConsumerData
}

type InvokeCallback

type InvokeCallback func(responseFuture *ResponseFuture)

type Message

type Message struct {
	Topic      string            `json:"topic"`
	Flag       int32             `json:"flag"`
	Body       []byte            `json:"body"`
	Properties map[string]string `json:"properties"`
}

func (*Message) Key

func (msg *Message) Key() string

func (*Message) Propertie

func (msg *Message) Propertie(key string) string

func (*Message) Tag

func (msg *Message) Tag() string

type MessageExt

type MessageExt struct {
	Message
	QueueId                   int32  `json:"queueId"`
	StoreSize                 int32  `json:"storeSize"`
	QueueOffset               int64  `json:"queueOffset"`
	SysFlag                   int32  `json:"sysFlag"`
	BornTimestamp             int64  `json:"bornTimestamp"`
	BornHost                  string `json:"bornHost"`
	StoreTimestamp            int64  `json:"storeTimestamp"`
	StoreHost                 string `json:"storeHost"`
	MsgId                     string `json:"msgId"`
	CommitLogOffset           int64  `json:"commitLogOffset"`
	BodyCRC                   int32  `json:"bodyCRC"`
	ReconsumeTimes            int32  `json:"reconsumeTimes"`
	PreparedTransactionOffset int64  `json:"preparedTransactionOffset"`
}

type MessageListener

type MessageListener func(msgs []*MessageExt) (int, error)

type MessageQueue

type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int32  `json:"queueId"`
}

func (*MessageQueue) MarshalJSON

func (mj *MessageQueue) MarshalJSON() ([]byte, error)

func (*MessageQueue) MarshalJSONBuf

func (mj *MessageQueue) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*MessageQueue) UnmarshalJSON

func (uj *MessageQueue) UnmarshalJSON(input []byte) error

func (*MessageQueue) UnmarshalJSONFFLexer

func (uj *MessageQueue) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type MessageQueues

type MessageQueues []*MessageQueue

func (MessageQueues) Len

func (self MessageQueues) Len() int

func (MessageQueues) Less

func (self MessageQueues) Less(i, j int) bool

func (MessageQueues) Swap

func (self MessageQueues) Swap(i, j int)

type MessageTrack

type MessageTrack struct {
	ConsumerGroup string `json:"consumerGroup"`
	TrackType     int    `json:"trackType"`
	ExceptionDesc string `json:"exceptionDesc"`
}

type MqClient

type MqClient struct {
	// contains filtered or unexported fields
}

func NewMqClient

func NewMqClient() *MqClient

func (*MqClient) Close

func (self *MqClient) Close() error

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods
}

type OffsetWrapper

// OffsetWrapper carries a broker offset, a consumer offset and a timestamp,
// each as an int64 with the JSON key shown in its tag.
type OffsetWrapper struct {
	BrokerOffset   int64 `json:"brokerOffset"`
	ConsumerOffset int64 `json:"consumerOffset"`
	LastTimestamp  int64 `json:"lastTimestamp"` // timestamp of the last consumed message
}

func (*OffsetWrapper) MarshalJSON

func (mj *OffsetWrapper) MarshalJSON() ([]byte, error)

func (*OffsetWrapper) MarshalJSONBuf

func (mj *OffsetWrapper) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*OffsetWrapper) UnmarshalJSON

func (uj *OffsetWrapper) UnmarshalJSON(input []byte) error

func (*OffsetWrapper) UnmarshalJSONFFLexer

func (uj *OffsetWrapper) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type PullMessageRequestHeader

type PullMessageRequestHeader struct {
	ConsumerGroup        string `json:"consumerGroup"`
	Topic                string `json:"topic"`
	QueueId              int32  `json:"queueId"`
	QueueOffset          int64  `json:"queueOffset"`
	MaxMsgNums           int32  `json:"maxMsgNums"`
	SysFlag              int32  `json:"sysFlag"`
	CommitOffset         int64  `json:"commitOffset"`
	SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
	Subscription         string `json:"subscription"`
	SubVersion           int64  `json:"subVersion"`
}

type PullMessageService

type PullMessageService struct {
	// contains filtered or unexported fields
}

func NewPullMessageService

func NewPullMessageService() *PullMessageService

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields
}

type QueryConsumerOffsetRequestHeader

type QueryConsumerOffsetRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
	Topic         string `json:"topic"`
	QueueId       int32  `json:"queueId"`
}

type QueryTopicConsumeByWhoRequestHeader

type QueryTopicConsumeByWhoRequestHeader struct {
	Topic string `json:"topic"`
}

type QueueData

type QueueData struct {
	BrokerName     string
	ReadQueueNums  int32
	WriteQueueNums int32
	Perm           int32
	TopicSynFlag   int32
}

func (*QueueData) MarshalJSON

func (mj *QueueData) MarshalJSON() ([]byte, error)

func (*QueueData) MarshalJSONBuf

func (mj *QueueData) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*QueueData) UnmarshalJSON

func (uj *QueueData) UnmarshalJSON(input []byte) error

func (*QueueData) UnmarshalJSONFFLexer

func (uj *QueueData) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type Rebalance

type Rebalance struct {
	// contains filtered or unexported fields
}

func NewRebalance

func NewRebalance() *Rebalance

type RemoteOffsetStore

type RemoteOffsetStore struct {
	// contains filtered or unexported fields
}

type RemotingClient

type RemotingClient interface {
	ScanResponseTable()
	// contains filtered or unexported methods
}

func NewDefaultRemotingClient

func NewDefaultRemotingClient() RemotingClient

type RemotingCommand

type RemotingCommand struct {
	//header
	Code     int    `json:"code"`
	Language string `json:"language"`
	Version  int    `json:"version"`
	Opaque   int32  `json:"opaque"`
	Flag     int    `json:"flag"`

	ExtFields interface{} `json:"extFields"`
	//body
	Body []byte `json:"body,omitempty"`
	// contains filtered or unexported fields
}

type ResponseFuture

type ResponseFuture struct {
	// contains filtered or unexported fields
}

type SubscriptionData

type SubscriptionData struct {
	Topic           string   `json:"topic"`
	SubString       string   `json:"subString"`
	ClassFilterMode bool     `json:"classFilterMode"`
	TagsSet         []string `json:"tagsSet"`
	CodeSet         []int    `json:"codeSet"`
	SubVersion      int64    `json:"subVersion"`
}

type TopicData

type TopicData struct {
	List []string `json:"topicList"`
}

TopicData topic data

type TopicRouteData

type TopicRouteData struct {
	OrderTopicConf string
	QueueDatas     []*QueueData
	BrokerDatas    []*BrokerData
}

func (*TopicRouteData) MarshalJSON

func (mj *TopicRouteData) MarshalJSON() ([]byte, error)

func (*TopicRouteData) MarshalJSONBuf

func (mj *TopicRouteData) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*TopicRouteData) UnmarshalJSON

func (uj *TopicRouteData) UnmarshalJSON(input []byte) error

func (*TopicRouteData) UnmarshalJSONFFLexer

func (uj *TopicRouteData) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type UpdateConsumerOffsetRequestHeader

type UpdateConsumerOffsetRequestHeader struct {
	// contains filtered or unexported fields
}

type ViewMessageRequestHeader

type ViewMessageRequestHeader struct {
	Offset uint64 `json:"offset"`
}

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL