rabbitmq

package module
v0.0.0-...-5d5887e Latest
Published: Aug 8, 2020 License: MIT Imports: 7 Imported by: 0

README

rabbitmq

Friendly RabbitMQ Wrapper.

Installation

  1. Create a RabbitMQ Docker container:
$ docker run --name rabbitmq --hostname rabbitmq-test-node-1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123123 -d rabbitmq:3.8.5-management
  2. Download the rabbitmq package:
$ go get github.com/sliveryou/rabbitmq

Documentation

Constants

const (
	// DefaultRetryTimes represents default retry times of safe publish
	DefaultRetryTimes = 3
	// DefaultRepublishRoutine represents default number of quick republish goroutine
	DefaultRepublishRoutine = 10
)
const (
	// DefaultDelaySeconds represents default delay retry seconds of reconnect or recreate
	DefaultDelaySeconds = 3
)
const (
	// DefaultMapGC represents default number of keys to start map garbage collection
	DefaultMapGC = 200
)

Variables

This section is empty.

Functions

func DefaultPublishNotifier

func DefaultPublishNotifier(p *SafeProducer, message amqp.Publishing, confirm amqp.Confirmation)

DefaultPublishNotifier is the default AMQP message confirmation handler. If a nack confirmation is received, the message is sent again.
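
The resend-on-nack behavior can be sketched without a broker. The confirmation and message types below are hypothetical stand-ins for amqp.Confirmation and amqp.Publishing, and resendOnNack is an illustrative name, not the package's code.

```go
package main

import "fmt"

// confirmation is a hypothetical stand-in for amqp.Confirmation.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// message is a hypothetical stand-in for amqp.Publishing.
type message struct{ Body string }

// resendOnNack calls republish only when the broker negatively
// acknowledged the message; acked messages need no further work.
func resendOnNack(republish func(message) error, msg message, c confirmation) error {
	if c.Ack {
		return nil
	}
	return republish(msg)
}

func main() {
	var resent []string
	republish := func(m message) error {
		resent = append(resent, m.Body)
		return nil
	}

	_ = resendOnNack(republish, message{Body: "a"}, confirmation{DeliveryTag: 1, Ack: true})
	_ = resendOnNack(republish, message{Body: "b"}, confirmation{DeliveryTag: 2, Ack: false})

	fmt.Println(resent) // only the nacked message was republished
}
```

A custom PublishNotifier could follow the same shape but log or drop nacked messages instead of resending them.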

func EnableDebug

func EnableDebug(isEnable bool)

EnableDebug enables or disables debug logging.

Types

type BindOptions

type BindOptions struct {
	BindingKey string
	NoWait     bool
	Args       amqp.Table
}

BindOptions represents queue bind options.

type Channel

type Channel struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

Channel is amqp.Channel wrapper.

func (*Channel) Close

func (ch *Channel) Close() error

Close closes the channel and sets the closed flag.

func (*Channel) Consume

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

Consume wraps amqp.Channel.Consume. The returned delivery channel ends only when the channel is closed by the developer.

func (*Channel) DoMethod

func (ch *Channel) DoMethod(channel *amqp.Channel, methodName string) []reflect.Value

DoMethod executes the channel method registered under methodName, using the registered params.

func (*Channel) IsClosed

func (ch *Channel) IsClosed() bool

IsClosed reports whether the channel has been closed by the developer.

func (*Channel) RegisterMethod

func (ch *Channel) RegisterMethod(methodName string, params ...interface{})

RegisterMethod registers a channel method and its params so that the method can be executed again when the channel is recreated.
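
The register-then-replay idea behind RegisterMethod and DoMethod can be sketched with the reflect package. Everything below is a hypothetical illustration: fakeChannel stands in for *amqp.Channel, and methodRegistry is not the package's internal type.

```go
package main

import (
	"fmt"
	"reflect"
)

// fakeChannel is a hypothetical stand-in for *amqp.Channel.
type fakeChannel struct{ prefetch int }

// Qos records the prefetch count, mimicking a channel setup call.
func (f *fakeChannel) Qos(prefetchCount int) error {
	f.prefetch = prefetchCount
	return nil
}

// methodRegistry remembers method names and their arguments so they
// can be replayed against a freshly created channel.
type methodRegistry struct {
	params map[string][]interface{}
}

func newMethodRegistry() *methodRegistry {
	return &methodRegistry{params: make(map[string][]interface{})}
}

// register stores the arguments for methodName.
func (r *methodRegistry) register(methodName string, params ...interface{}) {
	r.params[methodName] = params
}

// do looks up methodName on target by reflection and calls it with the
// stored arguments. It returns nil if the method was never registered
// or does not exist on target.
func (r *methodRegistry) do(target interface{}, methodName string) []reflect.Value {
	params, ok := r.params[methodName]
	if !ok {
		return nil
	}
	m := reflect.ValueOf(target).MethodByName(methodName)
	if !m.IsValid() {
		return nil
	}
	in := make([]reflect.Value, len(params))
	for i, p := range params {
		in[i] = reflect.ValueOf(p)
	}
	return m.Call(in)
}

func main() {
	reg := newMethodRegistry()
	reg.register("Qos", 5)

	ch := &fakeChannel{} // pretend this is the recreated channel
	out := reg.do(ch, "Qos")
	fmt.Println(ch.prefetch, out[0].IsNil())
}
```

Storing the arguments rather than a closure keeps the registry independent of any one channel value, which matters when the underlying channel is replaced after a failure.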

func (Channel) SetDelay

func (d Channel) SetDelay(seconds int)

SetDelay sets the retry delay, in seconds, for the delayer.

type Config

type Config struct {
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

Config represents a RabbitMQ AMQP URI config.
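
As an illustration of how such fields map onto an AMQP URI, here is a minimal sketch using net/url. The uriConfig type and buildURI function are hypothetical; how the package itself formats or escapes the URI is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// uriConfig mirrors the fields of Config for illustration only.
// It is a hypothetical stand-in, not the package's type.
type uriConfig struct {
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

// buildURI assembles an "amqp://user:pass@host:port/vhost" URI.
// Assumption: the vhost contains no "/" characters; a vhost such as
// "/" would need percent-encoding, which this sketch does not handle.
func buildURI(c uriConfig) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(c.Username, c.Password),
		Host:   net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
		Path:   "/" + c.Vhost,
	}
	return u.String()
}

func main() {
	fmt.Println(buildURI(uriConfig{
		Host:     "localhost",
		Port:     5672,
		Username: "root",
		Password: "123123",
		Vhost:    "orders", // hypothetical vhost name
	}))
}
```

The credentials and port match the Docker command in the Installation section; the vhost name is made up for the example.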

type Connection

type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}

Connection is amqp.Connection wrapper.

func Dial

func Dial(url string) (*Connection, error)

Dial wraps amqp.Dial and returns a connection that reconnects automatically.

func (*Connection) Channel

func (c *Connection) Channel() (*Channel, error)

Channel wraps amqp.Connection.Channel and returns a channel that is recreated automatically.

func (Connection) SetDelay

func (d Connection) SetDelay(seconds int)

SetDelay sets the retry delay, in seconds, for the delayer.

type ConsumeOptions

type ConsumeOptions struct {
	Tag       string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
}

ConsumeOptions represents consumer consume options.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents an AMQP consumer.

func (Consumer) Close

func (e Consumer) Close() error

Close closes the AMQP channel.

func (*Consumer) Consume

func (c *Consumer) Consume(handler MessageHandler) error

Consume starts consuming AMQP deliveries with handler until the consumer channel is closed by the developer.
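
The handler-driven loop can be pictured as a range over a delivery channel. The sketch below uses hypothetical delivery and handler types in place of amqp.Delivery and MessageHandler, and does not reproduce the package's reconnect logic.

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for amqp.Delivery.
type delivery struct{ Body []byte }

// handler is a hypothetical stand-in for MessageHandler.
type handler func(delivery)

// consume invokes h for every delivery and returns once the
// deliveries channel has been closed and drained.
func consume(deliveries <-chan delivery, h handler) {
	for d := range deliveries {
		h(d)
	}
}

func main() {
	deliveries := make(chan delivery, 2)
	deliveries <- delivery{Body: []byte("first")}
	deliveries <- delivery{Body: []byte("second")}
	close(deliveries) // simulates the developer closing the channel

	var got []string
	consume(deliveries, func(d delivery) {
		got = append(got, string(d.Body))
	})
	fmt.Println(got)
}
```

Because the loop blocks until the channel closes, callers that need to keep working typically run it in its own goroutine.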

func (Consumer) DeclareAndBind

func (e Consumer) DeclareAndBind() error

DeclareAndBind declares an exchange and a queue, then binds the exchange to the queue.

func (*Consumer) Deliveries

func (c *Consumer) Deliveries() <-chan amqp.Delivery

Deliveries returns the consumer's AMQP message delivery channel.

func (*Consumer) DoMethod

func (c *Consumer) DoMethod(methodName string) []reflect.Value

DoMethod executes the method registered under methodName on the consumer channel, using the registered params.

func (Consumer) ExchangeDeclare

func (e Consumer) ExchangeDeclare() error

ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.

func (*Consumer) Get

func (c *Consumer) Get(handler MessageHandler) error

Get consumes a single AMQP delivery from the head of a queue and passes it to handler.

func (*Consumer) Handler

func (c *Consumer) Handler() MessageHandler

Handler returns the consumer's message handler.

func (*Consumer) Qos

func (c *Consumer) Qos(prefetchCount int) error

Qos controls how many messages the server will try to keep on the network for consumers before receiving delivery acks.

func (Consumer) QueueBind

func (e Consumer) QueueBind() error

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.

func (Consumer) QueueDeclare

func (e Consumer) QueueDeclare() error

QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.

func (*Consumer) RegisterMethod

func (c *Consumer) RegisterMethod(methodName string, params ...interface{})

RegisterMethod registers a channel method and its params on the consumer channel so that the method can be executed again when the channel is recreated.

func (Consumer) SetDelay

func (e Consumer) SetDelay(seconds int)

SetDelay sets the retry delay, in seconds, for the AMQP channel.

type DeliveryCacher

type DeliveryCacher interface {
	Store(deliveryTag uint64, publishing amqp.Publishing)
	Load(deliveryTag uint64) amqp.Publishing
	Republish() <-chan amqp.Publishing
}

DeliveryCacher represents an object that can store and load AMQP publishings by deliveryTag. When the channel is recreated, it also returns, through an AMQP publishing channel, the cached publishings that have not received an ack confirmation and need to be republished.

type DeliveryMapCache

type DeliveryMapCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DeliveryMapCache implements the DeliveryCacher interface, and it can store, load and republish AMQP publishings by the thread-safe deliveryMap.

func NewDeliveryMapCache

func NewDeliveryMapCache() *DeliveryMapCache

NewDeliveryMapCache creates a new *DeliveryMapCache.

func (*DeliveryMapCache) Load

func (c *DeliveryMapCache) Load(deliverTag uint64) amqp.Publishing

Load loads the AMQP publishing by deliveryTag and deletes the deliveryTag key in deliveryMap.

func (*DeliveryMapCache) Republish

func (c *DeliveryMapCache) Republish() <-chan amqp.Publishing

Republish returns an AMQP publishing channel and sends on it the publishings in deliveryMap that have not received an ack confirmation and need to be republished.

func (*DeliveryMapCache) Store

func (c *DeliveryMapCache) Store(deliveryTag uint64, publishing amqp.Publishing)

Store stores the AMQP publishing in deliveryMap by deliveryTag. If the number of deliveryTag keys reaches DefaultMapGC, deliveryMap is garbage-collected and recreated.
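
A store/load cache that periodically rebuilds its map might look like the sketch below. It is a simplified, hypothetical model: payloads are plain strings rather than amqp.Publishing, the threshold is scaled down from DefaultMapGC, and the exact condition that triggers the rebuild in the package is an assumption (here, a count of Store calls).

```go
package main

import (
	"fmt"
	"sync"
)

// mapGCThreshold plays the role of DefaultMapGC, scaled down for the demo.
const mapGCThreshold = 4

// tagCache is a hypothetical, simplified cache keyed by delivery tag.
type tagCache struct {
	mu      sync.Mutex
	entries map[uint64]string
	stored  int // Store calls since the map was last rebuilt
}

func newTagCache() *tagCache {
	return &tagCache{entries: make(map[uint64]string)}
}

// Store saves the payload. After mapGCThreshold stores, the live entries
// are copied into a fresh map so the runtime can reclaim the old one,
// since deleting keys from a Go map does not shrink it.
func (c *tagCache) Store(tag uint64, payload string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[tag] = payload
	c.stored++
	if c.stored >= mapGCThreshold {
		fresh := make(map[uint64]string, len(c.entries))
		for k, v := range c.entries {
			fresh[k] = v
		}
		c.entries = fresh
		c.stored = 0
	}
}

// Load returns the payload for tag and removes it from the cache.
func (c *tagCache) Load(tag uint64) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	payload, ok := c.entries[tag]
	delete(c.entries, tag)
	return payload, ok
}

// Pending reports how many payloads are still awaiting confirmation.
func (c *tagCache) Pending() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.entries)
}

func main() {
	c := newTagCache()
	for tag := uint64(1); tag <= 5; tag++ {
		c.Store(tag, fmt.Sprintf("msg-%d", tag))
	}
	c.Load(1)
	c.Load(2)
	fmt.Println(c.Pending()) // entries still unconfirmed
}
```

Whatever remains in the cache after a channel failure is the set a Republish-style method would need to send again.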

type Exchange

type Exchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

Exchange represents exchange declare config.

type MessageHandler

type MessageHandler func(amqp.Delivery)

MessageHandler represents an AMQP delivery message handler function.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents an AMQP producer.

func (Producer) Close

func (e Producer) Close() error

Close closes the AMQP channel.

func (Producer) DeclareAndBind

func (e Producer) DeclareAndBind() error

DeclareAndBind declares an exchange and a queue, then binds the exchange to the queue.

func (Producer) ExchangeDeclare

func (e Producer) ExchangeDeclare() error

ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.

func (*Producer) NotifyReturn

func (p *Producer) NotifyReturn(notifier ReturnNotifier)

NotifyReturn registers notifier as a listener for AMQP return messages. The server can send these when a publishing with the mandatory or immediate flag is undeliverable.

func (*Producer) Publish

func (p *Producer) Publish(publishing amqp.Publishing) error

Publish sends an amqp.Publishing from the producer to an exchange on the server. If an error occurs, the producer retries the publish according to retryTimes and delaySeconds. When retryTimes < 0, it retries until the publish succeeds.
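
The retry rule (a bounded number of retries, or unbounded when retryTimes is negative) can be sketched as follows. publishWithRetry and failNTimes are hypothetical helpers, and whether the package counts the first attempt as one of the retries is an assumption; here it does not.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// publishWithRetry calls publish until it succeeds. With retryTimes < 0
// it retries forever; otherwise it makes one attempt plus up to
// retryTimes retries, sleeping delay before each retry.
func publishWithRetry(publish func() error, retryTimes int, delay time.Duration) error {
	var err error
	for attempt := 0; retryTimes < 0 || attempt <= retryTimes; attempt++ {
		if attempt > 0 {
			time.Sleep(delay)
		}
		if err = publish(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("publish failed after %d retries: %w", retryTimes, err)
}

// failNTimes returns a publish function that fails n times before
// succeeding, plus a pointer to its call counter.
func failNTimes(n int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("broker unavailable")
		}
		return nil
	}, &calls
}

func main() {
	publish, calls := failNTimes(2)
	err := publishWithRetry(publish, 3, 10*time.Millisecond)
	fmt.Println(err, *calls)
}
```

A negative retryTimes trades bounded latency for delivery persistence, so it suits background publishers better than request handlers.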

func (Producer) QueueBind

func (e Producer) QueueBind() error

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.

func (Producer) QueueDeclare

func (e Producer) QueueDeclare() error

QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.

func (*Producer) RetryTimes

func (p *Producer) RetryTimes() int

RetryTimes returns the producer's publish retry times.

func (*Producer) ReturnNotifier

func (p *Producer) ReturnNotifier() ReturnNotifier

ReturnNotifier returns the producer's AMQP return message handler.

func (Producer) SetDelay

func (e Producer) SetDelay(seconds int)

SetDelay sets the retry delay, in seconds, for the AMQP channel.

func (*Producer) SetRetryTimes

func (p *Producer) SetRetryTimes(times int)

SetRetryTimes sets publish retry times for the producer.

type PublishNotifier

type PublishNotifier func(*SafeProducer, amqp.Publishing, amqp.Confirmation)

PublishNotifier represents an AMQP message confirmation handler function.

type PublishOptions

type PublishOptions struct {
	RoutingKey string
	Mandatory  bool
	Immediate  bool
}

PublishOptions represents producer publish options.

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

Queue represents queue declare config.

type RabbitMQ

type RabbitMQ struct {
	// contains filtered or unexported fields
}

RabbitMQ represents an AMQP connection with its URI config.

func New

func New(c Config) (*RabbitMQ, error)

New returns a new *RabbitMQ that holds a new AMQP connection created from the config.

func (*RabbitMQ) Close

func (r *RabbitMQ) Close() error

Close closes the AMQP connection.

func (*RabbitMQ) Connect

func (r *RabbitMQ) Connect() error

Connect dials if the AMQP connection is nil, otherwise returns nil.

func (*RabbitMQ) Connection

func (r *RabbitMQ) Connection() *Connection

Connection returns the AMQP connection.

func (*RabbitMQ) Dial

func (r *RabbitMQ) Dial() error

Dial dials a new AMQP connection over TCP using the AMQP URI config.

func (*RabbitMQ) NewConsumer

func (r *RabbitMQ) NewConsumer(session Session) (*Consumer, error)

NewConsumer returns a new *Consumer that holds a new AMQP channel configured by session.

func (*RabbitMQ) NewProducer

func (r *RabbitMQ) NewProducer(session Session) (*Producer, error)

NewProducer returns a new *Producer that holds a new AMQP channel configured by session.

func (*RabbitMQ) NewSafeProducer

func (r *RabbitMQ) NewSafeProducer(session Session, deliveryCache DeliveryCacher, notifier ...PublishNotifier) (*SafeProducer, error)

NewSafeProducer returns a new *SafeProducer that holds a new AMQP channel configured by session. The *SafeProducer sets retryTimes to DefaultRetryTimes and runs a publish listener with notifier for reliable publishing. If notifier is nil, DefaultPublishNotifier is used.

func (*RabbitMQ) SetDelay

func (r *RabbitMQ) SetDelay(seconds int)

SetDelay sets the retry delay, in seconds, for the AMQP connection.

type ReturnNotifier

type ReturnNotifier func(amqp.Return)

ReturnNotifier represents an AMQP return message handler function.

type SafeProducer

type SafeProducer struct {
	*Producer
	// contains filtered or unexported fields
}

SafeProducer represents an AMQP safe producer.

func (SafeProducer) Close

func (e SafeProducer) Close() error

Close closes the AMQP channel.

func (SafeProducer) DeclareAndBind

func (e SafeProducer) DeclareAndBind() error

DeclareAndBind declares an exchange and a queue, then binds the exchange to the queue.

func (SafeProducer) ExchangeDeclare

func (e SafeProducer) ExchangeDeclare() error

ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.

func (*SafeProducer) IsConfirmed

func (p *SafeProducer) IsConfirmed() bool

IsConfirmed reports whether the channel is in confirm mode.

func (*SafeProducer) Publish

func (p *SafeProducer) Publish(publishing amqp.Publishing) error

Publish sends an amqp.Publishing from the safe producer to an exchange on the server. If an error occurs, the safe producer retries the publish according to retryTimes and delaySeconds. When retryTimes < 0, it retries until the publish succeeds. The safe producer records each message sent to the server in deliveryCache, and the deliveryTag increases each time a message is sent.

func (*SafeProducer) PublishNotifier

func (p *SafeProducer) PublishNotifier() PublishNotifier

PublishNotifier returns the producer's AMQP message confirmation handler.

func (SafeProducer) QueueBind

func (e SafeProducer) QueueBind() error

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.

func (SafeProducer) QueueDeclare

func (e SafeProducer) QueueDeclare() error

QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.

func (SafeProducer) SetDelay

func (e SafeProducer) SetDelay(seconds int)

SetDelay sets the retry delay, in seconds, for the AMQP channel.

type Session

type Session struct {
	Exchange       Exchange
	Queue          Queue
	BindOptions    BindOptions
	ConsumeOptions ConsumeOptions
	PublishOptions PublishOptions
}

Session represents executer configs and options.

Directories

Path Synopsis
examples
