amqpClient

package module
v0.0.0-...-2857195 Latest
Published: Apr 25, 2017 License: MIT Imports: 14 Imported by: 0

README

amqp-client

A RabbitMQ client wrapper library built on streadway/amqp.

Installation

go get github.com/almasry/amqp-client

Documentation

Documentation is available on godoc.org. Reading this document should also help you get started with the package quickly, and further examples are available at amqp-client/examples.

Pkg design

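The package decouples its API from the actual implementation. Application code works against the MessageBrokerClientInterface and SerializableEventInterface interfaces rather than against streadway/amqp directly, so the underlying implementation can be replaced without changing the callers.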


Getting started

Create a struct that holds your queues as fields (probably in a separate file). The struct can have any name, but all of its fields must be of type string. Field names may start with a 'Queue' prefix, but this is not mandatory.

type Queues struct {
	QueueUserAccount    string
	QueueOrderWorkflow  string
	QueueServiceChanges string
	// ... 
	// ... all other queues should go here ..
}

  • In your application's init function (or whichever function you use to bootstrap), create an instance of the queues struct you just defined. The field values are the names of the queues as they will appear in RabbitMQ.

  • Copy the examples/config.yml file to your application directory, or copy just the [rabbitmq] configuration block into your existing .yml file (if you use one). After editing the parameters, call amqpClient.Initialize( .. , ..), passing the config file location and the new instance of the queues struct.

  • Initialize creates all the queues provided, so you do not need to create them at run time. You can publish to or consume from any of these queues without worrying about how they were created or which exchange they belong to.

You can still create queues dynamically at run time by calling the client method CreateQueue( name ) anywhere in your application.

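var AmqpQueues Queues

func init() {
	// setting values of queues
	AmqpQueues = Queues{
		QueueUserAccount:    "user_account",
		QueueOrderWorkflow:  "order_workflow",
		QueueServiceChanges: "service_changes",
	}
	// initializing the amqp client library; Initialize returns an error,
	// so stop early if the config cannot be parsed or a queue cannot be created
	if err := amqpClient.Initialize("./config.yml", AmqpQueues); err != nil {
		log.Fatalf("Failed to initialize the amqp client: %v", err)
	}
}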

Hooray! Now that you're done with configuration, let's get to the fun part.


Example Publisher

Publishing to any of the previously created queues is as simple as the following:


	// creating a new client
	cl, err := amqpClient.New()
	failOnError(err, "Failed to create a new amqp client")
	defer cl.Disconnect()

	// creating an instance of amqpClient.Event and publishing it
	orderPlacedEvent := amqpClient.Event{
		ID:   "1",
		Name: "new_order.placed", // a notification to start shipping the newly placed order
		Date: time.Now(),
	}
	err = cl.Publish(&orderPlacedEvent, AmqpQueues.QueueOrderWorkflow)
	failOnError(err, "Failed to publish the event")

  • The default simple event, amqpClient.Event, uses JSON marshaling and unmarshaling for serialization and deserialization. Why? If you have other consumers written in other programming languages, they don't need to know about any other serialization format; all they need to do is decode a simple JSON message.

That's all there is to publishing events, very simple! The full example is available at examples/publisher.
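The snippets in this README call failOnError without defining it. A minimal sketch of such a helper, matching the two-argument form used in the publisher example, could look like the following. The helper is hypothetical and not part of amqpClient; describeFailure and errRefused exist only so the behaviour can be observed without crashing the program.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errRefused is a sample error used only for this demonstration.
var errRefused = errors.New("connection refused")

// failOnError is a hypothetical helper (not part of amqpClient): it logs msg
// together with err and panics when err is non-nil, and does nothing otherwise.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %v", msg, err)
	}
}

// describeFailure runs failOnError and returns the panic text, or an empty
// string when failOnError returned normally.
func describeFailure(err error, msg string) (text string) {
	defer func() {
		if r := recover(); r != nil {
			text = fmt.Sprint(r)
		}
	}()
	failOnError(err, msg)
	return ""
}

func main() {
	fmt.Printf("%q\n", describeFailure(nil, "Failed to create a new amqp client"))
	fmt.Printf("%q\n", describeFailure(errRefused, "Failed to create a new amqp client"))
}
```

In a short-lived command-line program, log.Fatalf would work equally well. Panicking is used here only because it lets deferred calls such as Disconnect run.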


Example Consumer

Creating workers and consuming events is also very simple. In fact, both happen in a single step. All you need to know is:

  • the name of the queue you want to consume messages from
  • the number of concurrent workers you need (1 or more)
  • the event handler, or callback function, that will handle the event once it's received

	// create a new client
	client, err := amqpClient.New()
	failOnError(err, "Failed to create a new amqp client")
	defer client.Disconnect()

	// AmqpQueues is the global queues configuration we declared in the previous steps
	// AmqpQueues.QueueServiceChanges is the queue you want to consume from
	// 2 is the number of concurrent consumers (workers) that will be reading messages from QueueServiceChanges
	err = client.Consume(AmqpQueues.QueueServiceChanges, 2, func(msg []byte, consumer string) {
		eventHandler(msg, consumer)
	})
	failOnError(err, "Failed to start the consumers")

	// Do some other business logic here ...
	// ....
	// ....

	client.WatchWorkersStream()

  • Workers are blocking by nature, but the Consume method runs asynchronously and does not block. At the end of your function you need to call client.WatchWorkersStream() to wait on the consumers; otherwise the function returns and the consumers exit.

  • You can create as many consumers as you like, each with any number of workers, and run them simultaneously without having to worry about how their concurrency works or how their results are merged into one channel.


	// create a new client
	client, err := amqpClient.New()
	failOnError(err, "Failed to create a new amqp client")
	defer client.Disconnect()

	err = client.Consume(AmqpQueues.QueueServiceChanges, 5, func(msg []byte, consumer string) {
		eventHandler(msg, consumer)
	})
	failOnError(err, "Failed to start the service changes consumers")

	err = client.Consume(AmqpQueues.QueueUserAccount, 20, func(msg []byte, consumer string) {
		eventHandler(msg, consumer)
	})
	failOnError(err, "Failed to start the user account consumers")

	err = client.Consume(AmqpQueues.QueueOrderWorkflow, 9, func(msg []byte, consumer string) {
		eventHandler(msg, consumer)
	})
	failOnError(err, "Failed to start the order workflow consumers")

	// Do some other business logic here ...
	// ....

	client.WatchWorkersStream()

  • The callback function of the Consume method receives two parameters: 'msg' in its raw format, which you need to deserialize, and the name of the consumer (the hash). You may ignore the latter or use it for debugging if needed.

func eventHandler(msg []byte, consumer string) {
	// assuming you already know the type of event you're receiving from the channel ..
	var event amqpClient.Event
	if err := event.Deserialize(msg); err != nil {
		log.Printf("Consumer %s could not decode event: %v", consumer, err)
		return
	}

	log.Printf("Consumer %s just processed event : %s", consumer, event.Name)
}

That's all there is to consuming events and creating workers. The full example is available at examples/consumer.
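To make the "non-blocking Consume, blocking WatchWorkersStream" behaviour more concrete, here is a minimal standard-library sketch of the general worker-pool and fan-in pattern. It is not the library's implementation: startWorkers, watch, runDemo and the worker naming scheme are hypothetical names chosen for illustration, and a plain Go channel stands in for the RabbitMQ queue.

```go
package main

import (
	"fmt"
	"sync"
)

// startWorkers launches `workers` goroutines that read from messages and pass
// each message to callback. It returns immediately. The returned channel
// carries one log line per handled message and is closed once all workers
// have finished.
func startWorkers(queue string, messages <-chan []byte, workers uint,
	callback func(msg []byte, consumer string)) <-chan string {

	results := make(chan string)
	var wg sync.WaitGroup
	for i := uint(0); i < workers; i++ {
		wg.Add(1)
		consumer := fmt.Sprintf("%s-worker-%d", queue, i) // hypothetical naming
		go func() {
			defer wg.Done()
			for msg := range messages {
				callback(msg, consumer)
				results <- consumer + " handled " + string(msg)
			}
		}()
	}
	// Close the shared results channel when the last worker exits.
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

// watch blocks until the results stream is closed and returns the number of
// log lines it saw.
func watch(results <-chan string) int {
	handled := 0
	for line := range results {
		fmt.Println(line)
		handled++
	}
	return handled
}

// runDemo pushes n messages through a pool of workers and returns how many
// were handled.
func runDemo(n int, workers uint) int {
	messages := make(chan []byte)
	results := startWorkers("service_changes", messages, workers,
		func(msg []byte, consumer string) { /* application logic goes here */ })
	go func() {
		for i := 0; i < n; i++ {
			messages <- []byte(fmt.Sprintf("event-%d", i))
		}
		close(messages)
	}()
	return watch(results)
}

func main() {
	fmt.Println("handled:", runDemo(5, 2))
}
```

The order of the printed lines varies between runs because the workers run concurrently. Only the total count is deterministic.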


FAQ

Why do you create queues while initializing the application ?

It's good practice to create all the queues while bootstrapping the application. RabbitMQ usually tries to allocate disk space to save the queues and their messages redundantly to disk, and if that fails for any reason you want to find out and fix it as early as possible, not at run time.

PS. The default configuration of the library sets all queues as 'durable'. Durable queues survive server restarts and remain even when there are no remaining consumers or bindings.

Why do I need to create a struct for queue names? Why bother when you could just make them part of the yml configuration?

Referring to queue names through the fields of a struct lets the compiler catch mistyped names: a misspelled field does not compile. Declaring the queues in the weakly typed yaml configuration leaves more room for error, because every time you need a queue you have to look it up in the yml file, copy a string and paste it somewhere else, which is a very error-prone process.

A small bug such as publishing to the wrong queue can lead to losing thousands of events, which is very risky in an event-driven architecture.

Documentation

Functions

func Initialize

func Initialize(configFile string, queues interface{}) error

Initialize : function takes care of two things : it parses the config of the AMQP system, and it creates the queues and binds them to the default exchange
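Initialize receives the queues struct as an interface{}, and the documentation does not show how the field values are read. One plausible way to do it with the standard reflect package is sketched below. This is an assumption for illustration, not the library's code: queueNames is a hypothetical function, and Queues is a shortened copy of the struct from the README.

```go
package main

import (
	"fmt"
	"reflect"
)

// Queues is a shortened copy of the example struct from the README.
type Queues struct {
	QueueUserAccount   string
	QueueOrderWorkflow string
}

// queueNames returns the value of every field of a struct (or pointer to a
// struct), in declaration order. It returns an error if the argument is not
// a struct or if any field is not a string.
func queueNames(queues interface{}) ([]string, error) {
	v := reflect.ValueOf(queues)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, fmt.Errorf("queues must be a struct, got %s", v.Kind())
	}
	names := make([]string, 0, v.NumField())
	for i := 0; i < v.NumField(); i++ {
		field := v.Field(i)
		if field.Kind() != reflect.String {
			return nil, fmt.Errorf("field %s must be a string, got %s",
				v.Type().Field(i).Name, field.Kind())
		}
		names = append(names, field.String())
	}
	return names, nil
}

func main() {
	names, err := queueNames(Queues{
		QueueUserAccount:   "user_account",
		QueueOrderWorkflow: "order_workflow",
	})
	fmt.Println(names, err)
}
```

A check like this also explains why every field must be a string: a field of any other kind has no queue name to offer.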

Types

type Config

type Config struct {
	RabbitMq RabbitMq
}

Config struct : describes the structure of the .yml file configuration

type Event

type Event struct {
	ID   string
	Name string
	Date time.Time
}

Event : is the simplest form of event, provided as a quick way to

func (*Event) Deserialize

func (e *Event) Deserialize(jsonMessage []byte) error

Deserialize implements how an event of type Event is deserialized

func (*Event) Serialize

func (e *Event) Serialize() ([]byte, error)

Serialize implements how an event of type Event is serialized

type MessageBrokerClientInterface

type MessageBrokerClientInterface interface {
	Connect() error
	Disconnect() error
	CreateQueue(queueName string) error
	Publish(event SerializableEventInterface, queueName string) error
	Consume(queueName string, workers uint, callback func(msg []byte, consumer string)) error
	WatchWorkersStream()
}

MessageBrokerClientInterface : to create a message broker client, the client must implement this interface so that it can be used by other system components
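Because callers depend only on this interface, a test double can stand in for RabbitMQ in unit tests. The sketch below is one possible in-memory fake. fakeBroker, note and notifyOrderPlaced are hypothetical names, and the two interfaces are copied from this page so the example compiles on its own. The fake delivers messages synchronously and ignores the workers argument, which is a simplification and not how the real client behaves.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Both interfaces are copied from the package documentation.
type SerializableEventInterface interface {
	Serialize() ([]byte, error)
	Deserialize(jsonString []byte) error
}

type MessageBrokerClientInterface interface {
	Connect() error
	Disconnect() error
	CreateQueue(queueName string) error
	Publish(event SerializableEventInterface, queueName string) error
	Consume(queueName string, workers uint, callback func(msg []byte, consumer string)) error
	WatchWorkersStream()
}

// fakeBroker is a hypothetical in-memory client intended for unit tests.
type fakeBroker struct {
	queues map[string][][]byte
}

func newFakeBroker() *fakeBroker {
	return &fakeBroker{queues: map[string][][]byte{}}
}

func (b *fakeBroker) Connect() error      { return nil }
func (b *fakeBroker) Disconnect() error   { return nil }
func (b *fakeBroker) WatchWorkersStream() {}

func (b *fakeBroker) CreateQueue(queueName string) error {
	if _, ok := b.queues[queueName]; !ok {
		b.queues[queueName] = nil
	}
	return nil
}

// Publish serializes the event and stores the bytes under the queue name.
func (b *fakeBroker) Publish(event SerializableEventInterface, queueName string) error {
	if _, ok := b.queues[queueName]; !ok {
		return fmt.Errorf("unknown queue %q", queueName)
	}
	body, err := event.Serialize()
	if err != nil {
		return err
	}
	b.queues[queueName] = append(b.queues[queueName], body)
	return nil
}

// Consume replays the stored messages synchronously and then empties the
// queue. The workers argument is ignored in this fake.
func (b *fakeBroker) Consume(queueName string, workers uint, callback func(msg []byte, consumer string)) error {
	msgs, ok := b.queues[queueName]
	if !ok {
		return fmt.Errorf("unknown queue %q", queueName)
	}
	for _, m := range msgs {
		callback(m, "fake-consumer")
	}
	b.queues[queueName] = nil
	return nil
}

// note is a tiny hypothetical event type.
type note struct{ Text string }

func (n *note) Serialize() ([]byte, error)  { return json.Marshal(n) }
func (n *note) Deserialize(b []byte) error { return json.Unmarshal(b, n) }

// Compile-time check that fakeBroker satisfies the interface.
var _ MessageBrokerClientInterface = (*fakeBroker)(nil)

// notifyOrderPlaced is application code that depends only on the interface.
func notifyOrderPlaced(client MessageBrokerClientInterface, queue string) error {
	return client.Publish(&note{Text: "order placed"}, queue)
}

func main() {
	broker := newFakeBroker()
	broker.CreateQueue("order_workflow")
	fmt.Println(notifyOrderPlaced(broker, "order_workflow"))
	broker.Consume("order_workflow", 1, func(msg []byte, consumer string) {
		fmt.Println(consumer, string(msg))
	})
}
```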

func New

New function creates a new client with a new connection (channel) to the message broker

type RabbitMq

type RabbitMq struct {
	Connection struct {
		Host     string `yaml:"host"`
		Port     string `yaml:"port"`
		User     string `yaml:"user"`
		Password string `yaml:"password"`
	}
	Logs struct {
		Logfile string `yaml:"logfile"`
	}
}

RabbitMq struct : describes the rabbitmq configuration (connection, logging settings)
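The four connection settings are the pieces of a standard AMQP URI. How the library combines them internally is not documented here, so the following is only a sketch under that assumption: Connection is a local copy of the connection fields, and amqpURL is a hypothetical helper. Building the URI with net/url, rather than string concatenation, keeps special characters in the password correctly escaped.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// Connection is a local copy of the connection settings in the RabbitMq struct.
type Connection struct {
	Host     string
	Port     string
	User     string
	Password string
}

// amqpURL is a hypothetical helper that assembles an AMQP URI from the
// connection settings, escaping the credentials where necessary.
func amqpURL(c Connection) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(c.User, c.Password),
		Host:   net.JoinHostPort(c.Host, c.Port),
	}
	return u.String()
}

func main() {
	fmt.Println(amqpURL(Connection{
		Host:     "localhost",
		Port:     "5672",
		User:     "guest",
		Password: "guest",
	}))
}
```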

type RabbitMqClient

type RabbitMqClient struct {
	// contains filtered or unexported fields
}

RabbitMqClient is the default message broker client. It wraps the streadway/amqp implementation of the RabbitMQ API. streadway/amqp can easily be replaced with another implementation: all that is needed is a type that implements MessageBrokerClientInterface

func (*RabbitMqClient) Connect

func (cl *RabbitMqClient) Connect() error

Connect connects the client to the amqp server

func (*RabbitMqClient) Consume

func (cl *RabbitMqClient) Consume(queueName string, workers uint, callback func(msg []byte, consumer string)) error

Consume is used to consume events / messages from a specific queue. A callback function is required as a parameter, and it's called when a new message has been received

func (*RabbitMqClient) CreateQueue

func (cl *RabbitMqClient) CreateQueue(queueName string) error

CreateQueue is used to create a queue. By default all the queues created are durable, which means that if the message broker restarts, the queues are restored automatically upon recovery. Note that in RabbitMQ the messages in a durable queue survive a restart only if they were also published as persistent

func (*RabbitMqClient) Disconnect

func (cl *RabbitMqClient) Disconnect() error

Disconnect closes the connection (channel) with the amqp server

func (*RabbitMqClient) Publish

func (cl *RabbitMqClient) Publish(event SerializableEventInterface, queueName string) error

Publish : publishes an event of type SerializableEventInterface to a specific queue

func (*RabbitMqClient) WatchWorkersStream

func (cl *RabbitMqClient) WatchWorkersStream()

WatchWorkersStream does a couple of things : it multiplexes the logs of all the successfully consumed and handled events in the system, and it blocks the calling function so that it does not return and stop the consumers. Without this function, the developer would have to start each consumer in a goroutine [ go client.Consume(..)] and use a WaitGroup from the standard sync package to prevent the program from exiting.

type SerializableEventInterface

type SerializableEventInterface interface {
	Serialize() ([]byte, error)
	Deserialize(jsonString []byte) error
}

SerializableEventInterface : to create an event struct that can be published and consumed by the message broker, the struct has to implement SerializableEventInterface, that is, provide both the Serialize() and Deserialize() methods
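As an illustration, a custom event type can satisfy this interface with a few lines of encoding/json. In the sketch below, OrderPlaced, its fields, its JSON tags and the roundTrip helper are all hypothetical. The interface is copied from this page so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SerializableEventInterface is copied from the package documentation.
type SerializableEventInterface interface {
	Serialize() ([]byte, error)
	Deserialize(jsonString []byte) error
}

// OrderPlaced is a hypothetical application event.
type OrderPlaced struct {
	OrderID    string `json:"order_id"`
	TotalCents int    `json:"total_cents"`
}

// Serialize encodes the event as JSON.
func (e *OrderPlaced) Serialize() ([]byte, error) {
	return json.Marshal(e)
}

// Deserialize decodes a JSON message into the event.
func (e *OrderPlaced) Deserialize(jsonString []byte) error {
	return json.Unmarshal(jsonString, e)
}

// Compile-time check that *OrderPlaced implements the interface.
var _ SerializableEventInterface = (*OrderPlaced)(nil)

// roundTrip serializes an event and decodes the bytes into a fresh value,
// returning both the decoded event and the JSON payload.
func roundTrip(in OrderPlaced) (OrderPlaced, []byte, error) {
	body, err := in.Serialize()
	if err != nil {
		return OrderPlaced{}, nil, err
	}
	var out OrderPlaced
	if err := out.Deserialize(body); err != nil {
		return OrderPlaced{}, nil, err
	}
	return out, body, nil
}

func main() {
	out, body, err := roundTrip(OrderPlaced{OrderID: "42", TotalCents: 1999})
	fmt.Println(string(body), out, err)
}
```

The methods use pointer receivers, so a pointer to the event (for example &OrderPlaced{...}) is what satisfies the interface and what would be passed to Publish.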

Directories

Path Synopsis
examples
