stgbroker

package
v0.0.0-...-ba2213e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 54 Imported by: 0

README

smartgo broker

Read the docs

环境变量

export SMARTGO_HOME="/home/smartgo/" # registry日志配置文件

编译broker

cd $GOPATH/src/git.oschina.net/cloudzone/smartgo/stgbroker/start
go get ./...
go build 
mv start broker

单实例默认启动broker

cd $SMARTGO_HOME
mkdir -p $SMARTGO_HOME/conf
cp $GOPATH/src/git.oschina.net/cloudzone/smartgo/conf/broker-a.toml $SMARTGO_HOME/conf/
mkdir ./logs
touch ./logs/broker.log  # 第一次启动确保./logs/registry.log文件存在
nohup ./broker &

多实例启动broker

# 编辑broker-a.toml, 确保每个brokerRole、brokerId不相同
# (1)如果在一台服务器启动多个broker实例,请在broker-a.toml文件的brokerPort字段更新监听端口
# (2)如果在一台服务器启动多个broker实例,请在broker-a.toml文件的smartgoDataPath字段设置不同实例的数据目录
vim $SMARTGO_HOME/conf/smartgoBroker.toml

cd $SMARTGO_HOME
mkdir -p $SMARTGO_HOME/conf
cp $GOPATH/src/git.oschina.net/cloudzone/smartgo/conf/broker-a.toml $SMARTGO_HOME/conf/
mkdir ./logs
touch ./logs/broker.log  # 第一次启动确保./logs/broker.log文件存在

# 启动brokerA实例
nohup ./brokerA -c $SMARTGO_HOME/conf/smartgoBrokerA.toml &

# 启动brokerB实例
nohup ./brokerB -c $SMARTGO_HOME/conf/smartgoBrokerB.toml &

查看broker日志

tailf nohup.out
tailf ./logs/broker.log

smartgo部署包结构

smartgo/
├── stgbroker
│   ├── broker
│   ├── conf
│   │   └── broker-a.toml
│   ├── logs
│   │   └── broker.log
│   └── store
│       ├── abort
│       ├── checkpoint
│       ├── commitlog
│       ├── config
│       │   ├── consumerOffset.json
│       │   ├── consumerOffset.json.bak
│       │   ├── subscriptionGroup.json
│       │   ├── topics.json
│       │   └── topics.json.bak
│       └── consumequeue
├── stgclient
│   ├── consumer
│   │   └── push
│   ├── producer
│   │   └── producer
│   ├── topic
│   │   └── topic
│   └── tps
│       └── tps
└── stgregistry
    ├── cfg.json
    ├── logs
    │   └── registry.log
    ├── nohup.out
    └── register
    

环境变量SMARTGO_HOME-c指令说明问题

  • 1.-c指令优先级高于SMARTGO_HOME环境变量
  • 2.启动命令如果是./broker -c xxx.toml,则优先使用-c指令对应的toml文件
  • 3.如果-c之类对应的toml找不到,并且已配置SMARTGO_HOME环境变量,则尝试读取$SMARTGO_HOME/conf/broker-a.toml
  • 4.如果$SMARTGO_HOME/conf/broker-a.toml读取失败,则尝试读取./conf/broker-a.toml
  • 5.如果./conf/broker-a.toml读取失败,则读取$GOPATH/src/git.oschina.net/cloudzone/smartgo/conf/broker-a.toml
  • 6.如果以上步骤都无法读取toml文件,则启动broker报错

关于IDEA编辑器特殊启动问题

  • IDEA的 golang-sdk,在windows系统执行目录是C:\\Users\\xxxxx\\AppData\\Local\\Temp
  • 配置环境变量SMARTGO_HOME="$GOPATH/src/git.oschina.net/cloudzone/smartgo/"
  • 通过IDEA编辑器启动broker后自动寻找$SMARTGO_HOME/conf/broker-a.toml配置文件

broker数据目录、监听端口

在测试服务器不够的情况下,需要在1台服务器启动多个broker,因此需要配置多个broker的数据目录、以及各自broker的监听端口

  • broker监听端口,支持在toml文件brokerPort字段配置。如果不配置,则默认监听10911端口
  • broker数据目录,支持在toml文件storePathRootDir字段配置。如果不配置,则默认目录是$HOME/store

Documentation

Index

Constants

View Source
const (
	TOPIC_GROUP_SEPARATOR = "@"
	MAX_VALUE             = 0x7fffffffffffffff
)
View Source
const (
	DLQ_NUMS_PER_GROUP = 1
)

Variables

This section is empty.

Functions

func DoResponse

func DoResponse(ctx netm.Context,
	request *protocol.RemotingCommand, response *protocol.RemotingCommand)

func GetConsumerOffsetPath

func GetConsumerOffsetPath(rootDir string) string

GetConsumerOffsetPath 获取consumerOffset.json路径 Author gaoyanlei Since 2017/8/21

func GetSubscriptionGroupPath

func GetSubscriptionGroupPath(rootDir string) string

GetSubscriptionGroupPath 获取subscriptionGroup.json路径 Author gaoyanlei Since 2017/8/21

func GetTopicConfigPath

func GetTopicConfigPath(rootDir string) string

GetTopicConfigPath 获取topic.json路径 Author gaoyanlei Since 2017/8/21

Types

type AbstractSendMessageProcessor

type AbstractSendMessageProcessor struct {
	BrokerController *BrokerController
	Rand             *rand.Rand
	StoreHost        string
	// contains filtered or unexported fields
}

AbstractSendMessageProcessor 发送处理类 Author gaoyanlei Since 2017/8/14

func NewAbstractSendMessageProcessor

func NewAbstractSendMessageProcessor(brokerController *BrokerController) *AbstractSendMessageProcessor

NewAbstractSendMessageProcessor 初始化ConsumerOffsetManager Author gaoyanlei Since 2017/8/14

func (*AbstractSendMessageProcessor) ExecuteSendMessageHookAfter

func (asmp *AbstractSendMessageProcessor) ExecuteSendMessageHookAfter(response *protocol.RemotingCommand, context *mqtrace.SendMessageContext)

ExecuteSendMessageHookAfter 发送消息后执行回调函数 Author rongzhihong Since 2017/9/11

func (*AbstractSendMessageProcessor) ExecuteSendMessageHookBefore

func (asmp *AbstractSendMessageProcessor) ExecuteSendMessageHookBefore(ctx netm.Context, request *protocol.RemotingCommand, context *mqtrace.SendMessageContext)

ExecuteSendMessageHookBefore 发送消息前执行回调函数 Author rongzhihong Since 2017/9/11

func (*AbstractSendMessageProcessor) HasSendMessageHook

func (asmp *AbstractSendMessageProcessor) HasSendMessageHook() bool

hasSendMessageHook 检查SendMessageHookList的长度 Author rongzhihong Since 2017/9/11

func (*AbstractSendMessageProcessor) RegisterSendMessageHook

func (asmp *AbstractSendMessageProcessor) RegisterSendMessageHook(sendMessageHookList []mqtrace.SendMessageHook)

RegisterSendMessageHook 注册赋值 Author rongzhihong Since 2017/9/11

type AdminBrokerProcessor

type AdminBrokerProcessor struct {
	BrokerController *BrokerController
}

AdminBrokerProcessor 管理类请求处理 Author gaoyanlei Since 2017/8/23

func NewAdminBrokerProcessor

func NewAdminBrokerProcessor(controller *BrokerController) *AdminBrokerProcessor

NewAdminBrokerProcessor 初始化 Author gaoyanlei Since 2017/8/23

func (*AdminBrokerProcessor) ProcessRequest

func (self *AdminBrokerProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)

ProcessRequest 请求入口 Author rongzhihong Since 2017/8/23

func (*AdminBrokerProcessor) ViewBrokerStatsData

func (abp *AdminBrokerProcessor) ViewBrokerStatsData(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)

ViewBrokerStatsData 查看Broker统计信息 Author rongzhihong Since 2017/9/19

type Broker2Client

type Broker2Client struct {
	BrokerController *BrokerController
}

Broker2Client Broker主动调用客户端接口 Author gaoyanlei Since 2017/8/9

func NewBroker2Clientr

func NewBroker2Clientr(brokerController *BrokerController) *Broker2Client

NewBroker2Clientr Broker2Client Author gaoyanlei Since 2017/8/9

func (*Broker2Client) CallClient

func (b2c *Broker2Client) CallClient(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)

CallClient 调用客户端 Author rongzhihong Since 2017/9/18

func (*Broker2Client) CheckProducerTransactionState

func (b2c *Broker2Client) CheckProducerTransactionState(channel netm.Context, requestHeader *header.CheckTransactionStateRequestHeader,
	selectMapedBufferResult *stgstorelog.SelectMapedBufferResult)

CheckProducerTransactionState Broker主动回查Producer事务状态,Oneway Author rongzhihong Since 2017/9/11

func (*Broker2Client) GetConsumeStatus

func (b2c *Broker2Client) GetConsumeStatus(topic, group, originClientId string) *protocol.RemotingCommand

GetConsumeStatus Broker主动获取Consumer端的消息情况 Author rongzhihong Since 2017/9/18

func (*Broker2Client) ResetOffset

func (b2c *Broker2Client) ResetOffset(topic, group string, timeStamp int64, isForce bool) *protocol.RemotingCommand

ResetOffset Broker 主动通知 Consumer,offset列表发生变化,需要进行重置 Author rongzhihong Since 2017/9/18

type BrokerAllConfig

type BrokerAllConfig struct {
	BrokerConfig       *stgcommon.BrokerConfig         `json:"brokerConfig"`
	MessageStoreConfig *stgstorelog.MessageStoreConfig `json:"messageStoreConfig"`
}

BrokerAllConfig Broker配置文件信息 Author rongzhihong Since 2017/9/12

func NewBrokerAllConfig

func NewBrokerAllConfig() *BrokerAllConfig

NewBrokerAllConfig Broker配置文件信息初始化 Author rongzhihong Since 2017/9/12

func NewDefaultBrokerAllConfig

func NewDefaultBrokerAllConfig(brokerConfig *stgcommon.BrokerConfig, messageStoreConfig *stgstorelog.MessageStoreConfig) *BrokerAllConfig

NewDefaultBrokerAllConfig Broker配置文件信息初始化 Author: tianyuliang Since: 2017/9/27

type BrokerController

type BrokerController struct {
	BrokerConfig                         *stgcommon.BrokerConfig
	MessageStoreConfig                   *stgstorelog.MessageStoreConfig
	ConfigDataVersion                    *stgcommon.DataVersion
	ConsumerOffsetManager                *ConsumerOffsetManager
	ConsumerManager                      *client.ConsumerManager
	ProducerManager                      *client.ProducerManager
	ClientHousekeepingService            *ClientHouseKeepingService
	DefaultTransactionCheckExecuter      *DefaultTransactionCheckExecuter
	PullMessageProcessor                 *PullMessageProcessor
	PullRequestHoldService               *PullRequestHoldService
	Broker2Client                        *Broker2Client
	SubscriptionGroupManager             *SubscriptionGroupManager
	ConsumerIdsChangeListener            rebalance.ConsumerIdsChangeListener
	RebalanceLockManager                 *RebalanceLockManager
	BrokerOuterAPI                       *out.BrokerOuterAPI
	SlaveSynchronize                     *SlaveSynchronize
	MessageStore                         *stgstorelog.DefaultMessageStore
	RemotingClient                       *remoting.DefalutRemotingClient
	RemotingServer                       *remoting.DefalutRemotingServer
	TopicConfigManager                   *TopicConfigManager
	UpdateMasterHAServerAddrPeriodically bool

	FilterServerManager *FilterServerManager

	StoreHost  string
	ConfigFile string
	// contains filtered or unexported fields
}

BrokerController broker服务控制器 Author gaoyanlei Since 2017/8/25

func CreateBrokerController

func CreateBrokerController(smartgoBrokerFilePath ...string) *BrokerController

CreateBrokerController 创建BrokerController对象 Author: tianyuliang Since: 2017/9/20

func NewBrokerController

func NewBrokerController(brokerConfig *stgcommon.BrokerConfig, messageStoreConfig *stgstorelog.MessageStoreConfig, remotingClient *remoting.DefalutRemotingClient) *BrokerController

NewBrokerController 初始化broker服务控制器 Author gaoyanlei Since 2017/8/25

func Start

func Start(stopChan chan bool, smartgoBrokerFilePath string) *BrokerController

Start 启动BrokerController Author: tianyuliang Since: 2017/9/20

func (*BrokerController) EncodeAllConfig

func (self *BrokerController) EncodeAllConfig() string

EncodeAllConfig 读取所有配置文件信息 Author rongzhihong Since 2017/9/12

func (*BrokerController) GetBrokerAddr

func (self *BrokerController) GetBrokerAddr() string

GetBrokerAddr 获得brokerAddr Author rongzhihong Since 2017/9/5

func (*BrokerController) GetStoreHost

func (self *BrokerController) GetStoreHost() string

GetStoreHost 获取StoreHost Author: tianyuliang, <tianyuliang@gome.com.cn> Since: 2017/9/26

func (*BrokerController) Initialize

func (self *BrokerController) Initialize() bool

Initialize 初始化broker必要操作 Author rongzhihong Since 2017/9/12

func (*BrokerController) RegisterBrokerAll

func (self *BrokerController) RegisterBrokerAll(checkOrderConfig bool, oneway bool)

RegisterBrokerAll 注册所有broker Author rongzhihong Since 2017/9/12

func (*BrokerController) RegisterConsumeMessageHook

func (self *BrokerController) RegisterConsumeMessageHook(hook mqtrace.ConsumeMessageHook)

RegisterSendMessageHook 注册消费消息的回调 Author rongzhihong Since 2017/9/11

func (*BrokerController) RegisterSendMessageHook

func (self *BrokerController) RegisterSendMessageHook(hook mqtrace.SendMessageHook)

RegisterSendMessageHook 注册发送消息的回调 Author rongzhihong Since 2017/9/11

func (*BrokerController) Shutdown

func (self *BrokerController) Shutdown()

Shutdown BrokerController停止入口 Author rongzhihong Since 2017/9/12

func (*BrokerController) Start

func (self *BrokerController) Start()

Start BrokerController控制器的start启动入口 Author rongzhihong Since 2017/9/12

func (*BrokerController) UpdateAllConfig

func (self *BrokerController) UpdateAllConfig(properties []byte)

UpdateAllConfig 更新所有文件 Author rongzhihong Since 2017/9/12

type BrokerControllerTask

type BrokerControllerTask struct {
	BrokerController            *BrokerController
	DeleteTopicTask             *timeutil.Ticker
	BrokerStatsRecordTask       *timeutil.Ticker
	PersistConsumerOffsetTask   *timeutil.Ticker
	ScanUnSubscribedTopicTask   *timeutil.Ticker
	FetchNameServerAddrTask     *timeutil.Ticker
	SlaveSynchronizeTask        *timeutil.Ticker
	PrintMasterAndSlaveDiffTask *timeutil.Ticker
	RegisterAllBrokerTask       *timeutil.Ticker
}

BrokerControllerTask broker控制器的各种任务 Author: tianyuliang Since: 2017/10/11

func NewBrokerControllerTask

func NewBrokerControllerTask(controller *BrokerController) *BrokerControllerTask

func (*BrokerControllerTask) Shutdown

func (self *BrokerControllerTask) Shutdown() bool

type ClientHouseKeepingService

type ClientHouseKeepingService struct {
	// contains filtered or unexported fields
}

ClientHousekeepingService 定期检测客户端连接,清除不活动的连接 Author rongzhihong Since 2017/9/8

func NewClientHousekeepingService

func NewClientHousekeepingService(controller *BrokerController) *ClientHouseKeepingService

NewClientHousekeepingService 初始化定期检查客户端连接的服务 Author rongzhihong Since 2017/9/8

func (*ClientHouseKeepingService) OnContextClose

func (self *ClientHouseKeepingService) OnContextClose(ctx netm.Context)

OnContextClose 监听通道关闭 Author rongzhihong Since 2017/9/8

func (*ClientHouseKeepingService) OnContextConnect

func (self *ClientHouseKeepingService) OnContextConnect(ctx netm.Context)

OnContextConnect 监听通道连接 Author rongzhihong Since 2017/9/8

func (*ClientHouseKeepingService) OnContextError

func (self *ClientHouseKeepingService) OnContextError(ctx netm.Context)

OnContextError 监听通道异常 Author rongzhihong Since 2017/9/8

func (*ClientHouseKeepingService) OnContextIdle

func (self *ClientHouseKeepingService) OnContextIdle(ctx netm.Context)

OnContextIdle 监听通道闲置 Author rongzhihong Since 2017/9/8

func (*ClientHouseKeepingService) Shutdown

func (self *ClientHouseKeepingService) Shutdown()

Shutdown 停止定时扫描过期的连接的服务 Author rongzhihong Since 2017/9/8

func (*ClientHouseKeepingService) Start

func (self *ClientHouseKeepingService) Start()

Start 启动定时扫描过期的连接的服务 Author rongzhihong Since 2017/9/8

type ClientManageProcessor

type ClientManageProcessor struct {
	BrokerController *BrokerController
	// contains filtered or unexported fields
}

func NewClientManageProcessor

func NewClientManageProcessor(controller *BrokerController) *ClientManageProcessor

NewClientManageProcessor 初始化ClientManageProcessor Author gaoyanlei Since 2017/8/9

func (*ClientManageProcessor) ExecuteConsumeMessageHookAfter

func (cmp *ClientManageProcessor) ExecuteConsumeMessageHookAfter(context *mqtrace.ConsumeMessageContext)

ExecuteConsumeMessageHookAfter 消费消息后执行的回调函数 Author rongzhihong Since 2017/9/14

func (*ClientManageProcessor) HasConsumeMessageHook

func (cmp *ClientManageProcessor) HasConsumeMessageHook() bool

hasConsumeMessageHook 判断是否有回调函数 Author rongzhihong Since 2017/9/14

func (*ClientManageProcessor) ProcessRequest

func (*ClientManageProcessor) RegisterConsumeMessageHook

func (cmp *ClientManageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)

RegisterConsumeMessageHook 注册回调函数 Author rongzhihong Since 2017/9/14

type ConfigManager

type ConfigManager interface {
	Encode(prettyFormat bool) string

	Decode(jsonString []byte)

	ConfigFilePath() string
}

type ConfigManagerExt

type ConfigManagerExt struct {
	ConfigManager ConfigManager
	sync.RWMutex
}

func NewConfigManagerExt

func NewConfigManagerExt(configManager ConfigManager) *ConfigManagerExt

func (*ConfigManagerExt) Load

func (cme *ConfigManagerExt) Load() bool

func (*ConfigManagerExt) Persist

func (cme *ConfigManagerExt) Persist()

type ConsumerOffsetManager

type ConsumerOffsetManager struct {
	TOPIC_GROUP_SEPARATOR string
	Offsets               *OffsetTable
	BrokerController      *BrokerController
	// contains filtered or unexported fields
}

ConsumerOffsetManager Consumer消费进度管理 Author gaoyanlei Since 2017/8/9

func NewConsumerOffsetManager

func NewConsumerOffsetManager(brokerController *BrokerController) *ConsumerOffsetManager

NewConsumerOffsetManager 初始化ConsumerOffsetManager Author gaoyanlei Since 2017/8/9

func (*ConsumerOffsetManager) CloneOffset

func (com *ConsumerOffsetManager) CloneOffset(srcGroup, destGroup, topic string)

CloneOffset 克隆偏移量 Author rongzhihong Since 2017/9/18

func (*ConsumerOffsetManager) CommitOffset

func (com *ConsumerOffsetManager) CommitOffset(group, topic string, queueId int, offset int64)

CommitOffset 提交offset Author gaoyanlei Since 2017/9/10

func (*ConsumerOffsetManager) ConfigFilePath

func (com *ConsumerOffsetManager) ConfigFilePath() string

func (*ConsumerOffsetManager) Decode

func (com *ConsumerOffsetManager) Decode(buf []byte)

func (*ConsumerOffsetManager) Encode

func (com *ConsumerOffsetManager) Encode(prettyFormat bool) string

func (*ConsumerOffsetManager) Load

func (com *ConsumerOffsetManager) Load() bool

func (*ConsumerOffsetManager) Persist

func (com *ConsumerOffsetManager) Persist()

func (*ConsumerOffsetManager) QueryMinOffsetInAllGroup

func (com *ConsumerOffsetManager) QueryMinOffsetInAllGroup(topic, filterGroups string) map[int]int64

QueryMinOffsetInAllGroup 查询所有组中最小偏移量 Author rongzhihong Since 2017/9/18

func (*ConsumerOffsetManager) QueryOffset

func (com *ConsumerOffsetManager) QueryOffset(group, topic string, queueId int) int64

QueryOffset 获取group下topic queueId 的offset Author gaoyanlei Since 2017/9/10

func (*ConsumerOffsetManager) QueryOffsetByGroupAndTopic

func (com *ConsumerOffsetManager) QueryOffsetByGroupAndTopic(group, topic string) map[int]int64

QueryOffsetByGroupAndTopic 获取group与topuic所有队列offset Author rongzhihong Since 2017/9/12

func (*ConsumerOffsetManager) ScanUnsubscribedTopic

func (self *ConsumerOffsetManager) ScanUnsubscribedTopic()

ScanUnsubscribedTopic 扫描被删除Topic,并删除该Topic对应的Offset Author gaoyanlei Since 2017/8/22

func (*ConsumerOffsetManager) WhichGroupByTopic

func (com *ConsumerOffsetManager) WhichGroupByTopic(topic string) set.Set

WhichGroupByTopic 获得Topic的消费者 Author rongzhihong Since 2017/9/18

func (*ConsumerOffsetManager) WhichTopicByConsumer

func (com *ConsumerOffsetManager) WhichTopicByConsumer(group string) set.Set

WhichTopicByConsumer 获得消费者的Topic Author rongzhihong Since 2017/9/18

type DefaultConsumerIdsChangeListener

type DefaultConsumerIdsChangeListener struct {
	BrokerController *BrokerController
}

DefaultConsumerIdsChangeListener ConsumerId列表变化,通知所有Consumer Author gaoyanlei Since 2017/8/9

func NewDefaultConsumerIdsChangeListener

func NewDefaultConsumerIdsChangeListener(brokerController *BrokerController) *DefaultConsumerIdsChangeListener

NewDefaultConsumerIdsChangeListener 初始化 Author gaoyanlei Since 2017/8/9

func (*DefaultConsumerIdsChangeListener) ConsumerIdsChanged

func (listener *DefaultConsumerIdsChangeListener) ConsumerIdsChanged(group string, channels []netm.Context)

ConsumerIdsChanged 通知Consumer改变 Author gaoyanlei Since 2017/8/9

type DefaultTransactionCheckExecuter

type DefaultTransactionCheckExecuter struct {
	// contains filtered or unexported fields
}

DefaultTransactionCheckExecuter 存储层回调此接口,用来主动回查Producer的事务状态 Author rongzhihong Since 2017/9/17

func NewDefaultTransactionCheckExecuter

func NewDefaultTransactionCheckExecuter(brokerController *BrokerController) *DefaultTransactionCheckExecuter

NewDefaultTransactionCheckExecuter 初始化事务 Author rongzhihong Since 2017/9/17

func (*DefaultTransactionCheckExecuter) GotoCheck

func (trans *DefaultTransactionCheckExecuter) GotoCheck(producerGroupHashCode int, tranStateTableOffset, commitLogOffset int64, msgSize int)

GotoCheck 回调检查方法 Author rongzhihong Since 2017/9/17

type EndTransactionProcessor

type EndTransactionProcessor struct {
	BrokerController *BrokerController
}

EndTransactionProcessor Commit或Rollback事务 Author rongzhihong Since 2017/9/18

func (*EndTransactionProcessor) ProcessRequest

ProcessRequest 请求 Author rongzhihong Since 2017/9/18

type FilterServerInfo

type FilterServerInfo struct {
	// contains filtered or unexported fields
}

FilterServerInfo FilterServer基本信息 Author rongzhihong Since 2017/9/8

type FilterServerManager

type FilterServerManager struct {
	FilterServerMaxIdleTimeMills int64
	// contains filtered or unexported fields
}

FilterServerManager FilterServer管理 Author rongzhihong Since 2017/9/8

func NewFilterServerManager

func NewFilterServerManager(bc *BrokerController) *FilterServerManager

NewFilterServerManager 初始化FilterServerManager Author rongzhihong Since 2017/9/8

func (*FilterServerManager) BuildNewFilterServerList

func (fsm *FilterServerManager) BuildNewFilterServerList() (filterServerAdds []string)

BuildNewFilterServerList FilterServer地址列表 Author rongzhihong Since 2017/9/8

func (*FilterServerManager) RegisterFilterServer

func (fsm *FilterServerManager) RegisterFilterServer(ctx netm.Context, filterServerAddr string)

RegisterFilterServer 注册FilterServer Author rongzhihong Since 2017/9/8

func (*FilterServerManager) ScanNotActiveChannel

func (fsm *FilterServerManager) ScanNotActiveChannel()

ScanNotActiveChannel 10s向Broker注册一次,Broker如果发现30s没有注册,则删除它 Author rongzhihong Since 2017/9/8

func (*FilterServerManager) Shutdown

func (fsm *FilterServerManager) Shutdown()

Shutdown 停止检查Filter Server的定时任务 Author rongzhihong Since 2017/9/8

func (*FilterServerManager) Start

func (fsm *FilterServerManager) Start()

Start 启动;定时检查Filter Server个数,数量不符合,则创建 Author rongzhihong Since 2017/9/8

type OffsetTable

type OffsetTable struct {
	Offsets      map[string]map[int]int64 `json:"offsets"`
	sync.RWMutex `json:"-"`
}

func (*OffsetTable) Foreach

func (table *OffsetTable) Foreach(fn func(k string, v map[int]int64))

func (*OffsetTable) Get

func (table *OffsetTable) Get(k string) map[int]int64

func (*OffsetTable) MarshalJSON

func (j *OffsetTable) MarshalJSON() ([]byte, error)

MarshalJSON marshal bytes to json - template

func (*OffsetTable) MarshalJSONBuf

func (j *OffsetTable) MarshalJSONBuf(buf fflib.EncodingBuffer) error

MarshalJSONBuf marshal buff to json - template

func (*OffsetTable) Put

func (table *OffsetTable) Put(k string, v map[int]int64)

func (*OffsetTable) PutAll

func (table *OffsetTable) PutAll(offsetMap *syncmap.Map)

PutAll 同步Offset配置文件 Author rongzhihong Since 2017/9/18

func (*OffsetTable) Remove

func (table *OffsetTable) Remove(k string) map[int]int64

func (*OffsetTable) RemoveByFlag

func (table *OffsetTable) RemoveByFlag(fn func(k string, v map[int]int64) bool)

func (*OffsetTable) Size

func (table *OffsetTable) Size() int

func (*OffsetTable) UnmarshalJSON

func (j *OffsetTable) UnmarshalJSON(input []byte) error

UnmarshalJSON umarshall json - template of ffjson

func (*OffsetTable) UnmarshalJSONFFLexer

func (j *OffsetTable) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

UnmarshalJSONFFLexer fast json unmarshall - template ffjson

type PullMessageProcessor

type PullMessageProcessor struct {
	BrokerController       *BrokerController
	ConsumeMessageHookList []mqtrace.ConsumeMessageHook
}

PullMessageProcessor 拉消息请求处理 Author gaoyanlei Since 2017/8/10

func NewEndTransactionProcessor

func NewEndTransactionProcessor(brokerController *BrokerController) *PullMessageProcessor

NewEndTransactionProcessor 初始化EndTransactionProcessor Author rongzhihong Since 2017/9/18

func NewPullMessageProcessor

func NewPullMessageProcessor(brokerController *BrokerController) *PullMessageProcessor

NewPullMessageProcessor 初始化PullMessageProcessor Author gaoyanlei Since 2017/8/9

func (*PullMessageProcessor) ExecuteConsumeMessageHookBefore

func (pull *PullMessageProcessor) ExecuteConsumeMessageHookBefore(context *mqtrace.ConsumeMessageContext)

ExecuteConsumeMessageHookBefore 消费消息前,执行回调 Author rongzhihong Since 2017/9/11

func (*PullMessageProcessor) ExecuteRequestWhenWakeup

func (pull *PullMessageProcessor) ExecuteRequestWhenWakeup(ctx netm.Context, request *protocol.RemotingCommand)

ExecuteRequestWhenWakeup 唤醒拉取消息的请求 Author rongzhihong Since 2017/9/5

func (*PullMessageProcessor) ProcessRequest

func (pull *PullMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)

func (*PullMessageProcessor) RegisterConsumeMessageHook

func (pull *PullMessageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)

ConsumeMessageHook 消费消息回调 Author rongzhihong Since 2017/9/11

type PullRequestHoldService

type PullRequestHoldService struct {
	TOPIC_QUEUEID_SEPARATOR string
	// contains filtered or unexported fields
}

PullRequestHoldService 拉消息请求管理,如果拉不到消息,则在这里Hold住,等待消息到来 Author rongzhihong Since 2017/9/5

func NewPullRequestHoldService

func NewPullRequestHoldService(brokerController *BrokerController) *PullRequestHoldService

NewPullRequestHoldService 初始化拉消息请求服务 Author rongzhihong Since 2017/9/5

func (*PullRequestHoldService) Shutdown

func (serv *PullRequestHoldService) Shutdown()

Shutdown 停止 Author rongzhihong Since 2017/9/5

func (*PullRequestHoldService) Start

func (serv *PullRequestHoldService) Start()

Start 启动入口 Author rongzhihong Since 2017/9/5

func (*PullRequestHoldService) SuspendPullRequest

func (serv *PullRequestHoldService) SuspendPullRequest(topic string, queueId int32, pullRequest *longpolling.PullRequest)

SuspendPullRequest 延缓拉请求 Author rongzhihong Since 2017/9/5

type QueryMessageProcessor

type QueryMessageProcessor struct {
	BrokerController *BrokerController
}

QueryMessageProcessor 查询消息请求处理 Author rongzhihong Since 2017/9/18

func NewQueryMessageProcessor

func NewQueryMessageProcessor(brokerController *BrokerController) *QueryMessageProcessor

NewQueryMessageProcessor 初始化QueryMessageProcessor Author rongzhihong Since 2017/9/18

func (*QueryMessageProcessor) ProcessRequest

ProcessRequest 请求 Author rongzhihong Since 2017/9/18

func (*QueryMessageProcessor) QueryMessage

ProcessRequest 查询请求 Author rongzhihong Since 2017/9/18

func (*QueryMessageProcessor) ViewMessageById

func (qmp *QueryMessageProcessor) ViewMessageById(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)

ProcessRequest 根据MsgId查询消息 Author rongzhihong Since 2017/9/18

type RebalanceLockManager

type RebalanceLockManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RebalanceLockManager 平衡锁管理 Author rongzhihong Since 2017/9/20

func NewRebalanceLockManager

func NewRebalanceLockManager() *RebalanceLockManager

NewRebalanceLockManager 初始化 Author rongzhihong Since 2017/9/20

func (*RebalanceLockManager) TryLock

func (manager *RebalanceLockManager) TryLock(group string, mq *message.MessageQueue, clientId string) bool

TryLock 尝试锁住 Author rongzhihong Since 2017/9/20

func (*RebalanceLockManager) TryLockBatch

func (manager *RebalanceLockManager) TryLockBatch(group string, mqs set.Set, clientId string) set.Set

TryLockBatch 批量方式锁队列,返回锁定成功的队列集合 Author rongzhihong Since 2017/9/20

func (*RebalanceLockManager) UnlockBatch

func (manager *RebalanceLockManager) UnlockBatch(group string, mqs set.Set, clientId string)

UnlockBatch 批量方式解锁队列 Author rongzhihong Since 2017/9/20

type SendMessageProcessor

type SendMessageProcessor struct {
	BrokerController *BrokerController
	// contains filtered or unexported fields
}

SendMessageProcessor 处理客户端发送消息的请求 Author gaoyanlei Since 2017/8/24

func NewSendMessageProcessor

func NewSendMessageProcessor(brokerController *BrokerController) *SendMessageProcessor

func (*SendMessageProcessor) ConsumerSendMsgBack

func (smp *SendMessageProcessor) ConsumerSendMsgBack(conn netm.Context,
	request *protocol.RemotingCommand) (remotingCommand *protocol.RemotingCommand)

consumerSendMsgBack 客户端返回未消费消息 Author gaoyanlei Since 2017/8/17

func (*SendMessageProcessor) ExecuteConsumeMessageHookAfter

func (smp *SendMessageProcessor) ExecuteConsumeMessageHookAfter(context *mqtrace.ConsumeMessageContext)

ExecuteConsumeMessageHookAfter 消费消息后执行回调 Author rongzhihong Since 2017/9/5

func (*SendMessageProcessor) HasConsumeMessageHook

func (smp *SendMessageProcessor) HasConsumeMessageHook() bool

HasConsumeMessageHook 判断是否存在消费消息回调 Author rongzhihong Since 2017/9/5

func (*SendMessageProcessor) HasSendMessageHook

func (smp *SendMessageProcessor) HasSendMessageHook() bool

HasSendMessageHook 判断是否存在发送消息回调 Author rongzhihong Since 2017/9/5

func (*SendMessageProcessor) ProcessRequest

func (smp *SendMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)

func (*SendMessageProcessor) RegisterConsumeMessageHook

func (smp *SendMessageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)

RegisterSendMessageHook 注册赋值消费消息回调 Author rongzhihong Since 2017/9/5

func (*SendMessageProcessor) RegisterSendMessageHook

func (smp *SendMessageProcessor) RegisterSendMessageHook(sendMessageHookList []mqtrace.SendMessageHook)

RegisterSendMessageHook 注册赋值发送消息回调 Author rongzhihong Since 2017/9/5

func (*SendMessageProcessor) SendMessage

sendMessage 正常消息 Author gaoyanlei Since 2017/8/17

type SlaveSynchronize

type SlaveSynchronize struct {
	BrokerController *BrokerController
	// contains filtered or unexported fields
}

SlaveSynchronize Slave从Master同步信息(非消息) Author gaoyanlei Since 2017/8/10

func NewSlaveSynchronize

func NewSlaveSynchronize(brokerController *BrokerController) *SlaveSynchronize

NewSlaveSynchronize 初始化SubscriptionGroupManager Author gaoyanlei Since 2017/8/9

type SubscriptionGroupManager

type SubscriptionGroupManager struct {
	BrokerController       *BrokerController
	SubscriptionGroupTable *subscription.SubscriptionGroupTable
	ConfigManagerExt       *ConfigManagerExt
}

SubscriptionGroupManager 用来管理订阅组,包括订阅权限等 Author gaoyanlei Since 2017/8/9

func NewSubscriptionGroupManager

func NewSubscriptionGroupManager(brokerController *BrokerController) *SubscriptionGroupManager

NewSubscriptionGroupManager 创建SubscriptionGroupManager Author gaoyanlei Since 2017/8/9

func (*SubscriptionGroupManager) ConfigFilePath

func (self *SubscriptionGroupManager) ConfigFilePath() string

func (*SubscriptionGroupManager) Decode

func (self *SubscriptionGroupManager) Decode(buf []byte)

func (*SubscriptionGroupManager) DeleteSubscriptionGroupConfig

func (self *SubscriptionGroupManager) DeleteSubscriptionGroupConfig(groupName string)

deleteSubscriptionGroupConfig 删除某个订阅组的配置 Author rongzhihong Since 2017/9/18

func (*SubscriptionGroupManager) Encode

func (self *SubscriptionGroupManager) Encode(prettyFormat bool) string

func (*SubscriptionGroupManager) FindSubscriptionGroupConfig

func (self *SubscriptionGroupManager) FindSubscriptionGroupConfig(group string) *subscription.SubscriptionGroupConfig

FindSubscriptionGroupConfig 查找订阅关系 Author gaoyanlei Since 2017/8/17

func (*SubscriptionGroupManager) Load

func (self *SubscriptionGroupManager) Load() bool

func (*SubscriptionGroupManager) UpdateSubscriptionGroupConfig

func (self *SubscriptionGroupManager) UpdateSubscriptionGroupConfig(config *subscription.SubscriptionGroupConfig)

UpdateSubscriptionGroupConfig 更新订阅组配置 Author rongzhihong Since 2017/9/18

type TopicConfigManager

type TopicConfigManager struct {
	LockTimeoutMillis int64

	BrokerController            *BrokerController
	TopicConfigSerializeWrapper *body.TopicConfigSerializeWrapper
	SystemTopicList             set.Set
	ConfigManagerExt            *ConfigManagerExt
	DataVersion                 *stgcommon.DataVersion
	// contains filtered or unexported fields
}

func NewTopicConfigManager

func NewTopicConfigManager(brokerController *BrokerController) *TopicConfigManager

NewTopicConfigManager 初始化TopicConfigManager Author gaoyanlei Since 2017/8/9

func (*TopicConfigManager) ConfigFilePath

func (self *TopicConfigManager) ConfigFilePath() string

func (*TopicConfigManager) CreateTopicInSendMessageBackMethod

func (tcm *TopicConfigManager) CreateTopicInSendMessageBackMethod(topic string, clientDefaultTopicQueueNums int32, perm, topicSysFlag int) (topicConfig *stgcommon.TopicConfig, err error)

createTopicInSendMessageBackMethod 该方法没有判断broker权限. Author gaoyanlei Since 2017/8/11

func (*TopicConfigManager) CreateTopicInSendMessageMethod

func (tcm *TopicConfigManager) CreateTopicInSendMessageMethod(topic, defaultTopic,
	remoteAddress string, clientDefaultTopicQueueNums int32, topicSysFlag int) (topicConfig *stgcommon.TopicConfig, err error)

createTopicInSendMessageMethod 创建topic Author gaoyanlei Since 2017/8/10

func (*TopicConfigManager) Decode

func (self *TopicConfigManager) Decode(content []byte)

func (*TopicConfigManager) DeleteTopicConfig

func (tcm *TopicConfigManager) DeleteTopicConfig(topic string)

deleteTopicConfig 删除topic Author gaoyanlei Since 2017/8/10

func (*TopicConfigManager) Encode

func (self *TopicConfigManager) Encode(prettyFormat bool) string

func (*TopicConfigManager) IsOrderTopic

func (tcm *TopicConfigManager) IsOrderTopic(topic string) bool

isOrderTopic 判断是否是顺序topic Author gaoyanlei Since 2017/8/10

func (*TopicConfigManager) Load

func (tcm *TopicConfigManager) Load() bool

func (*TopicConfigManager) SelectTopicConfig

func (tcm *TopicConfigManager) SelectTopicConfig(topic string) *stgcommon.TopicConfig

selectTopicConfig 根据topic查找 Author gaoyanlei Since 2017/8/11

func (*TopicConfigManager) UpdateTopicConfig

func (tcm *TopicConfigManager) UpdateTopicConfig(topicConfig *stgcommon.TopicConfig)

updateTopicConfig 更新topic信息 Author gaoyanlei Since 2017/8/10

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL