rabbitmq

package module
v0.0.0-...-45aa0bc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2020 License: MIT Imports: 8 Imported by: 0

README

RabbitMQ

Convenient RabbitMQ wrapper for amqp package.

This package is used mostly for internal purposes.

License

The MIT License (MIT), see LICENSE for more details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BindingOptions

type BindingOptions struct {
	// Publishings messages to given Queue with matching -RoutingKey-
	// Every Queue has a default binding to Default Exchange with their Qeueu name
	// So you can send messages to a queue over default exchange
	RoutingKey string

	// Do not wait for a consumer
	NoWait bool

	// App specific data
	Args amqp.Table
}

BindingOptions type

type Closer

type Closer interface {
	RegisterSignalHandler()
	Shutdown() error
}

Closer interface is for handling reconnection logic in a sane way Every reconnection supported struct should implement those methods in order to work properly

type Consumer

type Consumer struct {
	// Base struct for Producer
	*RabbitMQ
	// contains filtered or unexported fields
}

Consumer type

func (*Consumer) Consume

func (c *Consumer) Consume(handler func(delivery amqp.Delivery)) error

Consume accepts a handler function for every message streamed from RabbitMq will be called within this handler func

func (*Consumer) Deliveries

func (c *Consumer) Deliveries() <-chan amqp.Delivery

Deliveries handler

func (*Consumer) Get

func (c *Consumer) Get(handler func(delivery amqp.Delivery)) error

Get ConsumeMessage accepts a handler function and only consumes one message stream from RabbitMq

func (*Consumer) QOS

func (c *Consumer) QOS(messageCount int) error

QOS controls how many messages the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.

func (*Consumer) Shutdown

func (c *Consumer) Shutdown() error

Shutdown gracefully closes all connections and waits for handler to finish its messages

type ConsumerOptions

type ConsumerOptions struct {
	// The consumer is identified by a string that is unique and scoped for all
	// consumers on this channel.
	Tag string

	// When autoAck (also known as noAck) is true, the server will acknowledge
	// deliveries to this consumer prior to writing the delivery to the network.  When
	// autoAck is true, the consumer should not call Delivery.Ack
	AutoAck bool // autoAck

	// Check Queue struct documentation
	Exclusive bool // exclusive

	// When noLocal is true, the server will not deliver publishing sent from the same
	// connection to this consumer. (Do not use Publish and Consume from same channel)
	NoLocal bool // noLocal

	// Check Queue struct documentation
	NoWait bool // noWait

	// Check Exchange comments for Args
	Args amqp.Table // arguments
}

ConsumerOptions type

type Exchange

type Exchange struct {
	// Exchange name
	Name string

	// Exchange type
	Type string

	// Durable exchanges will survive server restarts
	Durable bool

	// Will remain declared when there are no remaining bindings.
	AutoDelete bool

	// Exchanges declared as `internal` do not accept accept publishings.Internal
	// exchanges are useful for when you wish to implement inter-exchange topologies
	// that should not be exposed to users of the broker.
	Internal bool

	// When noWait is true, declare without waiting for a confirmation from the server.
	NoWait bool

	// amqp.Table of arguments that are specific to the server's implementation of
	// the exchange can be sent for exchange types that require extra parameters.
	Args amqp.Table
}

Exchange type

type Producer

type Producer struct {
	// Base struct for Producer
	*RabbitMQ
	// contains filtered or unexported fields
}

Producer type

func (*Producer) NotifyReturn

func (p *Producer) NotifyReturn(notifier func(message amqp.Return))

NotifyReturn captures a message when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

func (*Producer) Publish

func (p *Producer) Publish(publishing amqp.Publishing) error

Publish sends a Publishing from the client to an exchange on the server.

func (*Producer) Shutdown

func (p *Producer) Shutdown() error

Shutdown gracefully closes all connections

type PublishingOptions

type PublishingOptions struct {
	// The key that when publishing a message to a exchange/queue will be only delivered to
	// given routing key listeners
	RoutingKey string

	// Publishing tag
	Tag string

	// Queue should be on the server/broker
	Mandatory bool

	// Consumer should be bound to server
	Immediate bool
}

PublishingOptions type

type Queue

type Queue struct {
	// The queue name may be empty, in which the server will generate a unique name
	// which will be returned in the Name field of Queue struct.
	Name string

	// Check Exchange comments for durable
	Durable bool

	// Check Exchange comments for autodelete
	AutoDelete bool

	// Exclusive queues are only accessible by the connection that declares them and
	// will be deleted when the connection closes.  Channels on other connections
	// will receive an error when attempting declare, bind, consume, purge or delete a
	// queue with the same name.
	Exclusive bool

	// When noWait is true, the queue will assume to be declared on the server.  A
	// channel exception will arrive if the conditions are met for existing queues
	// or attempting to modify an existing queue from a different connection.
	NoWait bool

	// Check Exchange comments for Args
	Args amqp.Table
}

Queue type

type RabbitMQ

type RabbitMQ struct {
	// contains filtered or unexported fields
}

RabbitMQ type

func New

func New(uri string) *RabbitMQ

New RabbitMQ wrapper.

func (*RabbitMQ) Conn

func (r *RabbitMQ) Conn() *amqp.Connection

Conn returns RMQ connection

func (*RabbitMQ) Connect

func (r *RabbitMQ) Connect() (*RabbitMQ, error)

Connect opens a connection to RabbitMq. This function is idempotent

TODO this should not return RabbitMQ struct - cihangir,arslan config changes

func (*RabbitMQ) Dial

func (r *RabbitMQ) Dial() error

Dial dials the RMQ server

func (*RabbitMQ) NewConsumer

func (r *RabbitMQ) NewConsumer(e Exchange, q Queue, bo BindingOptions, co ConsumerOptions) (*Consumer, error)

NewConsumer is a constructor for consumer creation Accepts Exchange, Queue, BindingOptions and ConsumerOptions

func (*RabbitMQ) NewProducer

func (r *RabbitMQ) NewProducer(e Exchange, q Queue, po PublishingOptions) (*Producer, error)

NewProducer is a constructor function for producer creation Accepts Exchange, Queue, PublishingOptions. On the other hand we are not declaring our topology on both the publisher and consumer to be able to change the settings only in one place. We can declare those settings on both place to ensure they are same. But this package will not support it.

func (*RabbitMQ) RegisterSignalHandler

func (r *RabbitMQ) RegisterSignalHandler()

RegisterSignalHandler watchs for interrupt signals and gracefully closes connection

func (*RabbitMQ) Shutdown

func (r *RabbitMQ) Shutdown() error

Shutdown closes the RabbitMQ connection

type Session

type Session struct {
	// Exchange declaration settings
	Exchange Exchange

	// Queue declaration settings
	Queue Queue

	// Binding options for current exchange to queue binding
	BindingOptions BindingOptions

	// Consumer options for a queue or exchange
	ConsumerOptions ConsumerOptions

	// Publishing options for a queue or exchange
	PublishingOptions PublishingOptions
}

Session is holding the current Exchange, Queue, Binding Consuming and Publishing settings for enclosed rabbitmq connection

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL