mqe

package
v0.0.0-...-276be26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2019 License: MIT Imports: 6 Imported by: 1

README

mqe - MQ extensions

帮助连接到 MQ,管理 MQ 连接,与其交互。

帮助实现 Topic Publish/Subscribe 模型。

具有自动重连机制。

实现发布者

注意要提供一个全局的退出信号,在主程序退出时发出该信号能够安全地停止发布者的内部循环。

var mqEngine *mqe.MqHub

func main(){
	mqEngine = mqe.Open(common.AppExitCh)
	
	// 使能内部的发件定时器以发送调试用的消息包
	// mqe.WithDebug(true)
	
	go func() {
		ticker := time.NewTicker(3 * time.Second)
		defer func() {
			ticker.Stop()
		}()
	
		for {
			select {
			case tm := <-ticker.C:
				msg := fmt.Sprintf("Hello World! %v", tm)
				mqEngine.Publish(IM_EVENT_BUS, "ev.fx.im.test", "text/plain", []byte(msg))
			}
		}
	}()
	
	mqEngine.CloseAll()
}
实现消费者

注意要提供一个全局的退出信号,在主程序退出时发出该信号能够安全地停止消费者的内部循环。

mqe.NewClient(mqe.IM_EVENT_BUS, common.AppExitCh).
	// WithExitSignal(common.AppExitCh).
	NewConsumer("abc", mqe.IM_EVENT_BUS, func(d amqp.Delivery) {
		logrus.Debugf(" [x] %v | %v", string(d.Body), d.Body)
	})

// optional: mqe.CloseClient(client)

fanout

实现 fanout 消息的订阅,在发布端,需要调整bus配置参数,此处略过;在客户端代码上有一点不同。

实现多个消费者以便同时消费fanout消息
mqe.NewClient(DEFAULT_BUS, common.AppExitCh).
	NewConsumerWithQueueName("abc", "fx.q.recv1", DEFAULT_BUS, func(d amqp.Delivery) {
		logrus.Debugf(" [x] %v | %v", string(d.Body), d.Body)
	}).
	NewConsumerWithQueueName("def", "fx.q.recv3", DEFAULT_BUS, func(d amqp.Delivery) {
		logrus.Debugf(" [-] %v | %v", string(d.Body), d.Body)
	}).
	NewConsumerWithQueueName("ghi", "fx.q.recv2", DEFAULT_BUS, func(d amqp.Delivery) {
		logrus.Debugf(" [+] %v | %v", string(d.Body), d.Body)
	})

由于 fanout 消息在被发布到 exchange 时,自动复制给 exchange 的所有绑定队列,所以客户端应该使用不同的队列名才能达到接收的效果。

按照示例代码的写法,每当发布者向 DEFAULT_BUS 发布一条消息时,所有消费者将会分别同时收到该消息。

也可以使用 NewConsumerWith(consumerName, queueName, busName, autoAck, exclusive, noLocal, noWait, args, onRecv):

mqe.NewClient(DEFAULT_BUS, common.AppExitCh).
	NewConsumerWith("abc", "fx.q.recv1", DEFAULT_BUS, true, false, false, false, nil, func(d amqp.Delivery) {
		logrus.Debugf(" [x] %v | %v", string(d.Body), d.Body)
	})

配置文件章节

根据配置文件的 server.mq.publish 章节定义的 bus 清单,mqe.Open(...) 会 建立和管理所有的 exchanges, queues。

你无需为每个 exchange, queue, 或者 bus 与 bind 建立一个 mqe.Open(...) 实例 对象,所有的 总线(bus) 使用同一个 MqHub 对象完成管理。

预定义的配置章节结构如下:

server:
  mq:
    backend: rabbitmq    # current backend
    env: devel           # current mode: devel/staging/prod, ...
    debug: true          # uses debug mode
    backends:
      rabbitmq:
        devel:
          url: "amqp://guest:guest@localhost:5672/"
          connectionTimeout: 30000
          maxOpenConns: 100
          maxIdleConns: 10
        prod:
          url: "amqp://guest:guest@localhost:5672/"
          connectionTimeout: 30000
          maxOpenConns: 100
          maxIdleConns: 10

    clients:
      - im_event_bus

    publish:
      logger_bus:
      monitor_bus:
      config_cast:

      # im-platform event cast
      im_event_bus:
        exchange:
          exchange:   fx.ex.event_bus
          type:       topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.event_bus
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.im.#
          noWait:     false
          arguments:  {}

      im_hook_event_bus:
        exchange:
          exchange:   fx.ex.event_bus
          type:       topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.event_bus.hooks
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.im.hooks.#
          noWait:     false
          arguments:  {}

      im_app_event_bus:
        exchange:
          exchange:   fx.ex.event_bus
          type:       topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.event_bus.apps
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.im.apps.#
          noWait:     false
          arguments:  {}

      # im-platform event bus
      im_event_cast:
        exchange:
          exchange:   fx.ex.event_cast
          type:       fanout # direct, fanout, topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.event_cast
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.im.# # as a sample: info,warning,error
          noWait:     false
          arguments:  {}

      sms_req:
        exchange:
          exchange:   fx.ex.sms_req
          type:       topic # direct, fanout, topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.sms_req
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.sms.#
          noWait:     false
          arguments:  {}
      mail_req:
        exchange:
          exchange:   fx.ex.mail_req
          type:       topic       # direct, fanout, topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.email_req
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.mail.#      # fx.mail.{user.{login,register,find-pwd},org.{sns.{like,fav,mentioned,...},ann.{publish,revoke}}}
          noWait:     false
          arguments:  {}
      cmdlet:
        exchange:
          exchange:   fx.ex.cmdlet
          type:       topic       # direct, fanout, topic
          passive:    false
          durable:    false
          autoDelete: false
          internal:   false
          noWait:     false
          arguments:  {}
        queue:
          queue:      fx.q.cmdlet
          passive:    false
          durable:    false
          exclusive:  false
          autoDelete: false
          noWait:     false
          arguments:  {}
        bind:
          queue:
          exchange:
          routingKey: fx.exec.#
          noWait:     false
          arguments:  {}

Documentation

Index

Constants

View Source
const (
	IM_EVENT_BUS      = "im_event_bus"      // IM 标准事件总线,未单列的全局事件一律走此交换机; 单列的则独立配置交换机;
	IM_EVENT_CAST     = "im_event_cast"     // 广播事件
	IM_HOOK_EVENT_BUS = "im_hook_event_bus" // webhooks' eventBus
)

Variables

This section is empty.

Functions

func ClearChanBuffer

func ClearChanBuffer(ch chan struct{}, duration time.Duration)

func CloseClient

func CloseClient(c *MqClient)

stops the daemon by `NewClient()`

Types

type Bus

type Bus struct {
	Exchange ExchangeDeclare
	Queue    QueueDeclare
	Bind     QueueBind
}

type ExchangeDeclare

type ExchangeDeclare struct {
	Exchange   string
	Type       string
	Passive    bool
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Arguments  amqp.Table
	// contains filtered or unexported fields
}

type MqBase

type MqBase struct {
}

type MqClient

type MqClient struct {
	MqBase
	// contains filtered or unexported fields
}

func NewClient

func NewClient(defaultBusName string, chGlobalExit chan struct{}) *MqClient

create an new consumer

func (*MqClient) AckFor

func (s *MqClient) AckFor(msg amqp.Delivery, multiple bool) (err error)

func (*MqClient) Close

func (s *MqClient) Close()

func (*MqClient) CloseAll

func (s *MqClient) CloseAll()

close all resources and reconnect looper by `mqe.StartPublisherDaemon()`, `mqe.NewClient()`, `MqHub.Open()`, `MqClient.Open()`

func (*MqClient) DropAll

func (s *MqClient) DropAll()

func (*MqClient) EnableReconnectLoop

func (s *MqClient) EnableReconnectLoop()

func (*MqClient) NewConsumer

func (s *MqClient) NewConsumer(consumerName, busName string, onRecv func(d amqp.Delivery)) *MqClient

func (*MqClient) NewConsumerWith

func (s *MqClient) NewConsumerWith(consumerName, queueName, busName string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table, onRecv func(d amqp.Delivery)) *MqClient

func (*MqClient) NewConsumerWithQueueName

func (s *MqClient) NewConsumerWithQueueName(consumerName, queueName, busName string, onRecv func(d amqp.Delivery)) *MqClient

func (*MqClient) Open

func (s *MqClient) Open(runLooper func()) (err error)

func (*MqClient) SetAndEnableReconnectLoop

func (s *MqClient) SetAndEnableReconnectLoop(fn func())

func (*MqClient) WithExitSignal

func (s *MqClient) WithExitSignal(chExit chan struct{}) *MqClient

type MqHub

type MqHub struct {
	MqBase
	// contains filtered or unexported fields
}

func StartPublisherDaemon

func StartPublisherDaemon(chExitGlobal chan struct{}) *MqHub

starts the daemon for producer/publisher

func (*MqHub) AckFor

func (s *MqHub) AckFor(msg amqp.Delivery, multiple bool) (err error)

func (*MqHub) Close

func (s *MqHub) Close()

func (*MqHub) CloseAll

func (s *MqHub) CloseAll()

close all resources and reconnect looper by `mqe.StartPublisherDaemon()`, `mqe.NewClient()`, `MqHub.Open()`, `MqClient.Open()`

func (*MqHub) DropAll

func (s *MqHub) DropAll()

func (*MqHub) EnableReconnectLoop

func (s *MqHub) EnableReconnectLoop()

func (*MqHub) Open

func (s *MqHub) Open(runLooper func()) (err error)

func (*MqHub) Publish

func (s *MqHub) Publish(msg []byte, busName, routingKey, contentType string)

func (*MqHub) PublishX

func (s *MqHub) PublishX(msg []byte, busName, routingKey, contentType string, mandatory, immediate bool)

func (*MqHub) SetAndEnableReconnectLoop

func (s *MqHub) SetAndEnableReconnectLoop(fn func())

func (*MqHub) WithDebug

func (s *MqHub) WithDebug(debug bool, busName string) *MqHub

func (*MqHub) WithExitSignal

func (s *MqHub) WithExitSignal(chExit chan struct{}) *MqHub

type Publishes

type Publishes struct {
	Publish map[string]*Bus

	Backend string
	Debug   bool
	// contains filtered or unexported fields
}

type QueueBind

type QueueBind struct {
	Queue      string
	Exchange   string
	RoutingKey string
	NoWait     bool
	Arguments  amqp.Table
	// contains filtered or unexported fields
}

type QueueDeclare

type QueueDeclare struct {
	Queue      string
	Passive    bool
	Durable    bool
	Exclusive  bool
	AutoDelete bool
	NoWait     bool
	Arguments  amqp.Table
	// contains filtered or unexported fields
}

type Table

type Table map[string]interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL