amqp

package
v0.0.0-...-68bc43f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2017 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SingletonFactory

func SingletonFactory() transport.Provider

SingletonFactory is a factory for creating singleton AMQP transport instances. This function returns back a Transport interface allowing it to be used as usrv.DefaultTransportFactory.

Types

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements a usrv transport using AMQP. The transport operates in both a client and a server mode and utilizes reference counting to share a single transport instance with multiple clients and servers. The transport maintains and reuses an open connection until both reference counters reach zero. It is therefore important that the application always closes the transport before exiting to ensure that the AMQP logs do not fill up with "client unexpectedly closed TCP connection" wanrings.

In both modes, the transport declares a direct amqp exchange called usrv. All usrv messages are routed through this particular exchange. The transport supoprts endpoint versioning and generates AMQP routing keys with format "$version/$service/$endpoint".

When operating in server mode, the transport allocates a private queue and for each defined endpoint it binds its routing key to the private queue. This allows the transport to use a single consumer channel for processing incoming requests. For each incoming request, the transport uses the routing key to figure out which handler it should invoke and spawns a go-routine to handle the request.

When operating in client mode, the transport allocates a private queue for receiving responses. Whenever the client sends an outgoing request it populates the following AMQP message fields:

  • AppId: set to the the outgoing message Sender() value.
  • Type: set to the outgoing message SenderEndpoint() value.
  • ReplyTo: set to the private queue name for receiving responses.
  • CorrelationId: set to the outgoing message ID() value.

Since the transport handles responses asynchronously, the correlation ID serves as a unique ID for matching pending requests to their responses.

The client also listens for failed deliveries. This allows the transport to fail pending requests if no servers are available or if the broker cannot route the request.

The transport receives its broker connection URL from a parameter with name: "transport/amqp/uri". It's default value is set to "amqp://guest:guest@localhost:5672/" which corresponds to a rabbitMQ instance running on localhost. At the moment, TLS connections to the broker are not supported.

As with other usrv transports, once connected, the AMQP transport monitors the URI config for changes and automatically attempts to re-dial the connection whenever its value changes.

func New

func New() *Transport

New creates a new amqp transport instance.

func (*Transport) Bind

func (t *Transport) Bind(version, service, endpoint string, handler transport.Handler) error

Bind listens for messages send to a particular service and endpoint tuple and invokes the supplied handler to process them.

func (*Transport) Close

func (t *Transport) Close(mode transport.Mode) error

Close shuts down the transport.

func (*Transport) Dial

func (t *Transport) Dial(mode transport.Mode) error

Dial connects the transport and starts relaying messages.

func (*Transport) Request

func (t *Transport) Request(msg transport.Message) <-chan transport.ImmutableMessage

Request performs an RPC and returns back a read-only channel for receiving the result.

func (*Transport) Unbind

func (t *Transport) Unbind(version, service, endpoint string)

Unbind removes a handler previously registered via a call to Bind().

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL