rabbit

package module
v0.0.0-...-8cd25d7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2017 License: MIT Imports: 8 Imported by: 0

README

rabbit for Go

RabbitMQ Topic Subscriber for Go.

Installation

Dowload rabbit using go get

go get github.com/brettallred/rabbit

Import rabbit into your package

import github.com/brettallred/rabbit

Getting Started

rabbit for Go consists of Subscribers and Handlers. First you need to create a Subscriber

var subscriber = rabbit.Subscriber{
	Concurrency: 5,
	Durable:     true,
	Exchange:    "events",
	Queue:       "test.sample.event.created",
	RoutingKey:  "sample.event.created",
}

Next, you need to create a Handler that will handle the messages your subscriber receives

func sampleTestEventCreatedHandler(delivery amqp.Delivery) bool {
	log.Printf("%s", delivery.Body)
	return true
}

Now, register your Subscriber and Handler with rabbit

rabbit.Register(subscriber, sampleTestEventCreatedHandler)

Finally, fire up the subscribers

rabbit.StartSubscribers()

Publishing

rabbit includes a simple Publisher

publisher := rabbit.NewPublisher()
publisher.Publish("My Message", subscriber)

or, if you are publishing something that isn't a string

publisher.PublishBytes([]byte("My Message"), subscriber)

##Contributing

This is my first project in Go. Feedback and contributions are appreciated. Before submitting a Pull Request, please open an issue outlining the problem and the proposed enhancement. Please reference the issue in your Pull Request.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Subscribers is a map of all of the registered Subscribers
	Subscribers map[string]Subscriber
	// Handlers is a map of all of the registered Subscriber Handlers
	Handlers map[string]func(amqp.Delivery) bool
)

Functions

func CloseSubscribers

func CloseSubscribers()

CloseSubscribers removes all subscribers, handlers, and closes the amqp connection

func CreateQueue

func CreateQueue(channel *amqp.Channel, subscriber *Subscriber) error

CreateQueue creates a queue and binds it

func DeleteQueue

func DeleteQueue(s Subscriber) error

DeleteQueue does what it says, deletes a queue in rabbit

func ExposeSubscriberConnectionForTests

func ExposeSubscriberConnectionForTests() *amqp.Connection

ExposeSubscriberConnectionForTests returns the subscriber connection for testing purposes

func InitPop

func InitPop()

InitPop intializes the RabbitMQ Connection and Channel for popping messages off of a queue.

func IsDevelopmentEnv

func IsDevelopmentEnv() bool

IsDevelopmentEnv tells you if you are currently running in a development environment

func Pop

func Pop(subscriber *Subscriber) (string, error)

Pop returns a single item from a RabbitMQ queue. It uses the Subscriber to know which queue to pop the item off. This is currently a helper function for the tests so you can pop a message off the queue and test it.

func Register

func Register(s Subscriber, handler func(amqp.Delivery) bool)

Register adds a subscriber and handler to the subscribers pool

func ResetPopConnection

func ResetPopConnection()

ResetPopConnection sets popConnection=nil and popChannel=nil for testing purposes

func StartSubscribers

func StartSubscribers() error

StartSubscribers spins up all of the registered Subscribers and consumes messages on their respective queues.

Types

type AssuredPublisher

type AssuredPublisher struct {
	Publisher
	// contains filtered or unexported fields
}

AssuredPublisher allows you to publish events to RabbitMQ with implicit delivery confirmation

func NewAssuredPublisher

func NewAssuredPublisher(cancelChannel <-chan bool) *AssuredPublisher

NewAssuredPublisher constructs a new AssuredPublisher instance

func NewAssuredPublisherWithConnection

func NewAssuredPublisherWithConnection(connection *Connection, cancelChannel <-chan bool) *AssuredPublisher

NewAssuredPublisherWithConnection constructs a new AssuredPublisher instance

func (*AssuredPublisher) DisableRepublishing

func (p *AssuredPublisher) DisableRepublishing()

DisableRepublishing disables messages republishing

func (*AssuredPublisher) Publish

func (p *AssuredPublisher) Publish(message string, subscriber *Subscriber) bool

Publish pushes items on to a RabbitMQ Queue. For AssuredPublisher it waits for delivery confirmaiton and retries on failures

func (*AssuredPublisher) PublishBytes

func (p *AssuredPublisher) PublishBytes(message []byte, subscriber *Subscriber) bool

PublishBytes is the same as Publish but accepts a []byte instead of a string. For AssuredPublisher it waits for delivery confirmaiton and retries on failures

func (*AssuredPublisher) PublishBytesWithArg

func (p *AssuredPublisher) PublishBytesWithArg(message []byte, subscriber *Subscriber, arg interface{}) bool

PublishBytesWithArg is the same as Publish but accepts a []byte instead of a string. The argument will be stored for passing into the confirmation handler. For AssuredPublisher it waits for delivery confirmaiton and retries on failures

func (*AssuredPublisher) PublishWithArg

func (p *AssuredPublisher) PublishWithArg(message string, subscriber *Subscriber, arg interface{}) bool

PublishWithArg pushes items on to a RabbitMQ Queue. The argument will be stored for passing into the confirmation handler. For AssuredPublisher it waits for delivery confirmaiton and retries on failures

func (*AssuredPublisher) ReceiveAllConfirmations

func (p *AssuredPublisher) ReceiveAllConfirmations() bool

ReceiveAllConfirmations explicitly receives all awaiting confirmations

func (*AssuredPublisher) SetConfirmationHandler

func (p *AssuredPublisher) SetConfirmationHandler(confirmationHandler func(amqp.Confirmation, interface{}))

SetConfirmationHandler sets the handler which is called for every confirmation received

func (*AssuredPublisher) SetExplicitWaiting

func (p *AssuredPublisher) SetExplicitWaiting()

SetExplicitWaiting disables implicit waiting for a confirmation after each publishing

func (*AssuredPublisher) WaitForAllConfirmations

func (p *AssuredPublisher) WaitForAllConfirmations() bool

WaitForAllConfirmations waits for all confirmations and retries publishing if needed. Returns false only if is cancelled.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents an autorecovering connection

func NewConnectionWithURL

func NewConnectionWithURL(url string) *Connection

NewConnectionWithURL creates a new connection with a custom RabbitMQ URL

func (*Connection) Close

func (connection *Connection) Close()

Close closes a connection

func (*Connection) GetConnection

func (connection *Connection) GetConnection() *amqp.Connection

GetConnection returns an amqp.Connection stored in Connection. It establishes a new connection if needed.

func (*Connection) ReplaceConnection

func (connection *Connection) ReplaceConnection(newConnection *amqp.Connection)

ReplaceConnection replaces the internal connection with a given one. For testing purposes only

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher allows you to publish events to RabbitMQ

func NewPublisher

func NewPublisher(cancelChannel <-chan bool) *Publisher

NewPublisher constructs a new Publisher instance.

func NewPublisherWithConnection

func NewPublisherWithConnection(connection *Connection, cancelChannel <-chan bool) *Publisher

NewPublisherWithConnection constructs a new Publisher instance with a custom connection

func (*Publisher) Close

func (p *Publisher) Close()

Close will close the connection and channel for the Publisher

func (*Publisher) Confirm

func (p *Publisher) Confirm(wait bool) error

Confirm enables reliable mode for the publisher.

func (*Publisher) GetChannel

func (p *Publisher) GetChannel() *amqp.Channel

GetChannel returns a publisher's channel. It opens a new channel if needed.

func (*Publisher) NotifyPublish

func (p *Publisher) NotifyPublish(size int) chan amqp.Confirmation

NotifyPublish registers a listener for reliable publishing.

func (*Publisher) Publish

func (p *Publisher) Publish(message string, subscriber *Subscriber) error

Publish pushes items on to a RabbitMQ Queue.

func (*Publisher) PublishBytes

func (p *Publisher) PublishBytes(message []byte, subscriber *Subscriber) error

PublishBytes is the same as Publish but accepts a []byte instead of a string

type Subscriber

type Subscriber struct {
	Concurrency   int
	Durable       bool
	Exchange      string
	Queue         string
	RoutingKey    string
	PrefetchCount int
	AutoDelete    bool
	ManualAck     bool
}

Subscriber contains all of the necessary data for Publishing and Subscriber to RabbitMQ Topics

func (*Subscriber) AutoDeleteInDev

func (subscriber *Subscriber) AutoDeleteInDev()

AutoDeleteInDev will set the Subscribers AutoDelete setting to true as long as you are in a development environement. Non production environements have a APP_ENV value that isn't ("production", "prod", "staging", "stage"). This is used for running a worker in your local environment but connecting to a stage or prodution rabbit server. You want to ensure the Subscriber gets AutoDeleted on the remote server.

func (*Subscriber) PrefixQueueInDev

func (subscriber *Subscriber) PrefixQueueInDev()

PrefixQueueInDev will prefix the queue name with the name of your current user if of the APP_ENV variable is set to a non production value ("production", "prod", "staging", "stage"). This is used for running a worker in your local environment but connecting to a stage or prodution rabbit server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL