Documentation ¶
Index ¶
- Constants
- Variables
- func TrimSuffix(s, suffix string) string
- type AllocateMessageQueueStrategy
- type ConsumeConcurrentlyStatus
- type Consumer
- type ConsumerConfig
- type DefaultConsumer
- func (c *DefaultConsumer) Callback(f func())
- func (c *DefaultConsumer) RegisterMessageListener(topic string, listener func(message Message) ConsumeConcurrentlyStatus)
- func (c *DefaultConsumer) Shutdown()
- func (c *DefaultConsumer) ShutdownCallback(callback func())
- func (c *DefaultConsumer) Start() error
- func (c *DefaultConsumer) Subscribe(topic string) bool
- func (c *DefaultConsumer) Unsubscribe(topics ...string)
- type DefaultProducer
- type FactoryConfig
- type Message
- type MessageQueue
- type MessageQueueSelector
- type Monitor
- type MonitorListener
- type Mq
- type MqConfig
- type MqFactory
- func (m *MqFactory) GetConsumer() Consumer
- func (m *MqFactory) GetConsumerByConfig(config *ConsumerConfig) Consumer
- func (m *MqFactory) GetConsumerByGroup(groupName string) Consumer
- func (m *MqFactory) GetProducer() Producer
- func (m *MqFactory) GetProducerByConfig(config *ProducerConfig) Producer
- func (m *MqFactory) GetProducerByGroup(groupName string) Producer
- type Msg
- type MsgQueueSelector
- type Pool
- type Producer
- type ProducerConfig
- type SelectMessageQueueByPolling
- type SelectMessageQueueByRandom
- type ServiceState
- type ToPicConfig
- type TopicPublishInfo
Constants ¶
View Source
const ( DefaultProducerGroup = "DEFAULT_PRODUCER" DefaultConsumerGroup = "DEFAULT_CONSUMER" )
View Source
const ( DefaultMessageQueueLength = 5 DefaultMessageCapLength = 1 DefaultPoolSize = 5 )
Variables ¶
View Source
var ( TopicEmpty = errors.New("please specify a topic to send messages ") ServiceStateNoStart = errors.New("producer service start createJust for not send messages") )
View Source
var ( // ErrShutdownAlready the producer service state not OK, maybe started once ErrShutdownAlready = errors.New("the producer service state not OK, maybe started once") // ConsumerConfigEmpty Consumer configuration required for MQ is empty ConsumerConfigEmpty = errors.New("consumer configuration required for MQ is empty") //ConsumerConfigMessageListenersEmpty consumer configuration MessageListeners required for MQ is empty ConsumerConfigMessageListenersEmpty = errors.New("consumer configuration MessageListeners required for MQ is empty") )
Functions ¶
func TrimSuffix ¶
Types ¶
type AllocateMessageQueueStrategy ¶
type AllocateMessageQueueStrategy interface { // Name The strategy name Name() string // Allocate To allocate result of given strategy Allocate() []MessageQueue }
AllocateMessageQueueStrategy is the strategy algorithm for allocating messages between consumers.
type ConsumeConcurrentlyStatus ¶
type ConsumeConcurrentlyStatus int
const ( // ConsumeSuccess Success consumption ConsumeSuccess ConsumeConcurrentlyStatus = iota // ReconsumeLater Failure consumption,later try to consume ReconsumeLater )
type Consumer ¶
type Consumer interface { // Start the consumer with the given parameters and return immediately Start() error // Shutdown Stop the consumer with the given parameters and return immediately Shutdown() ShutdownCallback(callback func()) // Subscribe with the given parameters and return immediately Subscribe(topic string) bool // Unsubscribe with the given parameters and return immediately Unsubscribe(topics ...string) // RegisterMessageListener with the given parameters and return immediately and RegisterMessageListener(topic string, l func(message Message) ConsumeConcurrentlyStatus) }
type ConsumerConfig ¶
type ConsumerConfig struct { PoolSize int ConsumerGroup string MessageListeners map[string]func(message Message) ConsumeConcurrentlyStatus }
type DefaultConsumer ¶
type DefaultConsumer struct { ConsumerGroup string ServiceState ServiceState Listeners map[string]func(message Message) ConsumeConcurrentlyStatus // contains filtered or unexported fields }
func (*DefaultConsumer) Callback ¶
func (c *DefaultConsumer) Callback(f func())
func (*DefaultConsumer) RegisterMessageListener ¶
func (c *DefaultConsumer) RegisterMessageListener(topic string, listener func(message Message) ConsumeConcurrentlyStatus)
func (*DefaultConsumer) Shutdown ¶
func (c *DefaultConsumer) Shutdown()
func (*DefaultConsumer) ShutdownCallback ¶
func (c *DefaultConsumer) ShutdownCallback(callback func())
func (*DefaultConsumer) Start ¶
func (c *DefaultConsumer) Start() error
func (*DefaultConsumer) Subscribe ¶
func (c *DefaultConsumer) Subscribe(topic string) bool
func (*DefaultConsumer) Unsubscribe ¶
func (c *DefaultConsumer) Unsubscribe(topics ...string)
type DefaultProducer ¶
type DefaultProducer struct { ProducerGroup string ServiceState ServiceState MqFactory *MqFactory }
func (*DefaultProducer) Send ¶
func (p *DefaultProducer) Send(msg Message) error
func (*DefaultProducer) Shutdown ¶
func (p *DefaultProducer) Shutdown()
func (*DefaultProducer) Start ¶
func (p *DefaultProducer) Start() error
type FactoryConfig ¶
type FactoryConfig struct { ToPicConfigs []*ToPicConfig //Topic 队列选择器 选填默认随机 优先ToPicConfigs 中 Selector MsgQueueSelector }
type MessageQueue ¶
type MessageQueueSelector ¶
type MessageQueueSelector interface {
Select(messageQueues []*MessageQueue) *MessageQueue
}
func MessageQueueSelectorCreate ¶
func MessageQueueSelectorCreate(selector MsgQueueSelector) MessageQueueSelector
type Monitor ¶
type Monitor struct {
// contains filtered or unexported fields
}
func NewMonitor ¶ added in v0.3.0
func (*Monitor) CloseMonitor ¶
func (m *Monitor) CloseMonitor()
func (*Monitor) Surround ¶ added in v0.3.0
func (m *Monitor) Surround(listener func(message Message) ConsumeConcurrentlyStatus, msg Message)
func (*Monitor) TurnMonitor ¶
func (m *Monitor) TurnMonitor()
type MonitorListener ¶
type MonitorListener interface { Surround(messageListener func(message Message) ConsumeConcurrentlyStatus, msg Message) Print() Info() map[string]*Msg TurnMonitor() CloseMonitor() }
type Mq ¶
type Mq struct { Consumer Consumer Producer Producer MonitorListener MonitorListener }
type MqConfig ¶
type MqConfig struct { //Topic 队列选择器 //默认随机 ToPicConfig 不配置 默认去mq Selector MsgQueueSelector ProducerConfig *ProducerConfig ConsumerConfig *ConsumerConfig ToPicConfigs []*ToPicConfig DefaultTopicPoolSize int }
type MqFactory ¶
type MqFactory struct { TopicPublishInfoTable map[string]*TopicPublishInfo // contains filtered or unexported fields }
func Instance ¶
func Instance(config *FactoryConfig) *MqFactory
func (*MqFactory) GetConsumer ¶
GetConsumer 获取一个topic 对应的消费者 采用延迟初始化
func (*MqFactory) GetConsumerByConfig ¶
func (m *MqFactory) GetConsumerByConfig(config *ConsumerConfig) Consumer
func (*MqFactory) GetConsumerByGroup ¶
GetConsumerByGroup 按消费组名称获取一个topic 对应的消费者 采用延迟初始化
func (*MqFactory) GetProducer ¶
GetProducer 获取一个topic 对应的生产者 采用延迟初始化
func (*MqFactory) GetProducerByConfig ¶
func (m *MqFactory) GetProducerByConfig(config *ProducerConfig) Producer
func (*MqFactory) GetProducerByGroup ¶
GetProducerByGroup 按生产组名称获取一个topic 对应的生产者 采用延迟初始化
type Msg ¶
type Msg struct { // topicName name for monitoring TopicName string // consumerGroup name for monitoring ConsumerGroup string // taskCount task count for monitoring TaskCount int64 // timeCount time count for monitoring TimeCount int64 // maxTime max time task for monitoring MaxTime int64 // minTime min time task for monitoring MinTime int64 Running int64 Waiting int64 // started StartTime time.Time // LastTaskTime int64 // contains filtered or unexported fields }
type MsgQueueSelector ¶
type MsgQueueSelector int
const ( // Random 随机 Random MsgQueueSelector = iota Polling )
type ProducerConfig ¶
type ProducerConfig struct {
ProducerGroupName string
}
type SelectMessageQueueByPolling ¶
type SelectMessageQueueByPolling struct {
// contains filtered or unexported fields
}
func (*SelectMessageQueueByPolling) Select ¶
func (s *SelectMessageQueueByPolling) Select(messageQueues []*MessageQueue) *MessageQueue
Select 通过轮询选择消息队列
type SelectMessageQueueByRandom ¶
type SelectMessageQueueByRandom struct{}
func (*SelectMessageQueueByRandom) Select ¶
func (s *SelectMessageQueueByRandom) Select(messageQueues []*MessageQueue) *MessageQueue
Select 随机选择消息队列
type ServiceState ¶
type ServiceState int
const ( // CreateJust Service just created,not start CreateJust ServiceState = iota // Running Service Running Running // ShutdownAlready Service shutdown ShutdownAlready // StartFailed Service Start failure StartFailed )
type ToPicConfig ¶
type ToPicConfig struct { //名称 TopicName string //队列长度 MessageQueueLength int //队列缓存容量 MessageCapLength int //Topic 队列选择器 //默认随机 Selector MsgQueueSelector // 可用线程数 PoolSize int // contains filtered or unexported fields }
ToPicConfig Topic 配置
func NewToPicConfig ¶
func NewToPicConfig(topicName string) *ToPicConfig
func NewToPicConfigByPoolSize ¶ added in v0.3.0
func NewToPicConfigByPoolSize(topicName string, DefaultTopicPoolSize int) *ToPicConfig
type TopicPublishInfo ¶
type TopicPublishInfo struct { ToPicConfig *ToPicConfig MessageQueueSelector MessageQueueSelector // contains filtered or unexported fields }
func (*TopicPublishInfo) TopicBlockageMessageQueueCount ¶
func (topic *TopicPublishInfo) TopicBlockageMessageQueueCount() int64
Source Files ¶
Click to show internal directories.
Click to hide internal directories.