goodmq

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

README

GoodMQ

GoDoc

A good streadway/amqp wrapper that supports automatic connection reconnection and channel recovery.

Thread safety: using one channel per thread (goroutine) is recommended. A single connection can manage many channels.

Feature

Issues and pull requests are welcome to help make this project more stable and robust.

  • Auto recovering
  • More flexible configuration
  • Auto restart consume
  • Connection/Channel Pool
  • TODO

Consumer Example

  1. After the connection closes, it tries to reconnect every 5 seconds.
  2. When the connection reconnects successfully, all channels are notified to recover.
// ListenHeartbeat consumes messages from "heartbeat.queue" (bound to the
// "apiServers" exchange) and logs each message body. It never returns on its
// own: every 5 seconds it either drains the current delivery channel or, if
// consuming is not active, calls Consume again. It panics if the consumer
// cannot be created.
func ListenHeartbeat() {
	conn := goodmq.NewAmqpConnection(config.AmqpAddress)
	defer conn.Close()

	hb, err := conn.NewConsumer()
	if err != nil {
		panic(err)
	}
	defer hb.Close()

	hb.QueName = "heartbeat.queue"
	hb.Exchange = "apiServers"
	deliveries, consuming := hb.Consume()

	// Consuming is retried by the caller: on every tick, either re-issue
	// Consume or drain the deliveries of the current subscription.
	retry := time.Tick(5 * time.Second)
	for range retry {
		if !consuming {
			log.Println("Heartbeat connection closed! Recovering...")
			deliveries, consuming = hb.Consume()
			continue
		}

		log.Println("Heartbeat connect success")
		for d := range deliveries {
			log.Printf("Receive heartbeat from %v\n", string(d.Body))
		}
		// The range above only ends once the delivery channel is closed,
		// so the next tick has to start consuming again.
		consuming = false
	}
}
Logs
2022/04/12 18:42:09 Hearbeat connect success
2022/04/12 18:42:43 Hearbeat connection closed! Recovering...
2022/04/12 18:42:43 Reconnect to amqp://gdfs:gdfs@localhost:5672/goodfs
2022/04/12 18:42:43 Consume heartbeat.queue error Exception (504) Reason: "channel/connection is not open"
2022/04/12 18:42:44 Hearbeat connection closed! Recovering...
2022/04/12 18:42:44 Consume heartbeat.queue error Exception (504) Reason: "channel/connection is not open"
2022/04/12 18:42:48 Reconnect to amqp://gdfs:gdfs@localhost:5672/goodfs fail
2022/04/12 18:42:49 Hearbeat connection closed! Recovering...
2022/04/12 18:42:49 Consume heartbeat.queue error Exception (504) Reason: "channel/connection is not open"
2022/04/12 18:42:53 Reconnect to amqp://gdfs:gdfs@localhost:5672/goodfs fail
2022/04/12 18:42:54 Hearbeat connection closed! Recovering...
2022/04/12 18:42:54 Consume heartbeat.queue error Exception (504) Reason: "channel/connection is not open"
2022/04/12 18:42:58 Reconnect to amqp://gdfs:gdfs@localhost:5672/goodfs fail
2022/04/12 18:42:59 Hearbeat connection closed! Recovering...
2022/04/12 18:42:59 Consume heartbeat.queue error Exception (504) Reason: "channel/connection is not open"
2022/04/12 18:43:03 Reconnect AMQP success
2022/04/12 18:43:03 Broadcasting recovering message..
2022/04/12 18:43:04 Hearbeat connection closed! Recovering...
2022/04/12 18:43:04 Consume heartbeat.queue error Exception (504) Reason: "channel/connection is not open"
2022/04/12 18:43:08 Recovering 4f62b158-b847-4ce6-b4c4-1cbe9451c78b channel...
2022/04/12 18:43:08 Recovering 4f62b158-b847-4ce6-b4c4-1cbe9451c78b channel success
2022/04/12 18:43:09 Hearbeat connection closed! Recovering...
2022/04/12 18:43:14 Hearbeat connect success

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level loggers, one per severity. All of them write to standard
// error with the standard date/time flags and differ only in their prefix.
var (
	// Info is the logger for informational messages.
	Info = log.New(os.Stderr, "[GoodMQ] |INFO | ", log.LstdFlags)

	// Warn is the logger for warnings.
	Warn = log.New(os.Stderr, "[GoodMQ] |WARN | ", log.LstdFlags)

	// Error is the logger for error messages.
	Error = log.New(os.Stderr, "[GoodMQ] |ERROR| ", log.LstdFlags)
)
View Source
// RecoverDelay is a package-level delay of five seconds.
// NOTE(review): presumably the wait between reconnect/recover attempts, in
// line with the README's "every 5 seconds" — confirm against its users.
var RecoverDelay = 5 * time.Second

Functions

func DailSync

func DailSync(addr string) <-chan *amqp.Connection

func DailWithTimeout

func DailWithTimeout(addr string, timeout time.Duration) (*amqp.Connection, error)

Types

type AmqpChannel

// AmqpChannel is the package's channel type; NewAmqpChannel creates one from
// an *AmqpConnection. All of its fields are unexported and are not shown in
// this listing.
type AmqpChannel struct {
	// contains filtered or unexported fields
}

AmqpChannel TODO: a channel may also be closed by other errors (the exchange does not exist, the queue does not exist, ...).

Currently a channel is only recovered when its connection has been recovered.

func NewAmqpChannel

func NewAmqpChannel(c *AmqpConnection) (*AmqpChannel, error)

func (*AmqpChannel) Ack added in v0.0.6

func (ch *AmqpChannel) Ack(tag uint64, multiply bool) error

func (*AmqpChannel) Close

func (ch *AmqpChannel) Close() error

func (*AmqpChannel) Consume

func (ch *AmqpChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool) (<-chan amqp.Delivery, error)

func (*AmqpChannel) ExchangeDeclare added in v0.0.5

func (ch *AmqpChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool) error

func (*AmqpChannel) Nack added in v0.0.6

func (ch *AmqpChannel) Nack(tag uint64, multiply, requeue bool) error

func (*AmqpChannel) Publish

func (ch *AmqpChannel) Publish(exchange, routeKey string, msg amqp.Publishing) error

func (*AmqpChannel) QueueBind

func (ch *AmqpChannel) QueueBind(name, key, exchange string, noWait bool) error

func (*AmqpChannel) QueueDeclare

func (ch *AmqpChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool) (amqp.Queue, error)

type AmqpConnection

// AmqpConnection is the package's connection type; NewAmqpConnection creates
// one from an address string. All of its fields are unexported and are not
// shown in this listing.
type AmqpConnection struct {
	// contains filtered or unexported fields
}

func NewAmqpConnection

func NewAmqpConnection(addr string) *AmqpConnection

func (*AmqpConnection) Close

func (c *AmqpConnection) Close() error

func (*AmqpConnection) NewChannel

func (c *AmqpConnection) NewChannel() (*amqp.Channel, error)

func (*AmqpConnection) NewConsumer

func (c *AmqpConnection) NewConsumer() (*AmqpConsumer, error)

func (*AmqpConnection) NewProvider

func (c *AmqpConnection) NewProvider() (*AmqpProvider, error)

func (*AmqpConnection) RemoveChan

func (c *AmqpConnection) RemoveChan(chanId uuid.UUID)

type AmqpConsumer

// AmqpConsumer holds an AmqpChannel together with the queue, binding and
// consume settings used by its methods. The exported fields below are the
// configuration knobs; further unexported fields are not shown in this listing.
type AmqpConsumer struct {
	Channel *AmqpChannel

	QueName      string // QueName defaults to empty; a unique queue is then generated automatically and its name assigned
	ConsumerName string // ConsumerName is used by Consume; defaults to empty, in which case a unique identifier is generated automatically
	AutoAck      bool   // AutoAck is used by Consume; defaults to true
	RouteKey     string // RouteKey is used for BindQueue; defaults to empty, in which case the queue name is used
	Exchange     string // Exchange is used for BindQueue; defaults to empty, in which case binding returns an error
	Durable      bool   // Durable is used for QueueDeclare; defaults to false
	DeleteUnused bool   // DeleteUnused (auto-delete) is used for QueueDeclare; defaults to false
	// contains filtered or unexported fields
}

func (*AmqpConsumer) AckOne added in v0.0.6

func (cm *AmqpConsumer) AckOne(tag uint64) bool

AckOne multiply=false

func (*AmqpConsumer) Bind added in v0.0.5

func (cm *AmqpConsumer) Bind() error

func (*AmqpConsumer) Close

func (cm *AmqpConsumer) Close() error

func (*AmqpConsumer) Consume

func (cm *AmqpConsumer) Consume() (<-chan amqp.Delivery, bool)

func (*AmqpConsumer) ConsumeAuto added in v0.0.6

func (cm *AmqpConsumer) ConsumeAuto(fn func(delivery amqp.Delivery), interval time.Duration)

func (*AmqpConsumer) DeclareQueue added in v0.0.5

func (cm *AmqpConsumer) DeclareQueue() error

func (*AmqpConsumer) NackSafe added in v0.0.6

func (cm *AmqpConsumer) NackSafe(tag uint64) bool

NackSafe multiply=false, requeue=true

func (*AmqpConsumer) SetQueue added in v0.0.5

func (cm *AmqpConsumer) SetQueue(q amqp.Queue)

type AmqpProvider

// AmqpProvider is the publishing counterpart of AmqpConsumer. It holds an
// AmqpChannel plus an Exchange and RouteKey.
// NOTE(review): Publish takes only the message, so it presumably publishes to
// Exchange with RouteKey, while PublishDirect takes both explicitly — confirm.
type AmqpProvider struct {
	Channel  *AmqpChannel
	Exchange string
	RouteKey string
}

func (*AmqpProvider) Close

func (p *AmqpProvider) Close() error

func (*AmqpProvider) Publish

func (p *AmqpProvider) Publish(msg amqp.Publishing) bool

func (*AmqpProvider) PublishDirect added in v0.0.5

func (p *AmqpProvider) PublishDirect(exchange, routeKey string, msg amqp.Publishing) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL