rabbitmq

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2022 License: BSD-3-Clause Imports: 19 Imported by: 2

README

RabbitMQ adaptor

The RabbitMQ adaptor is capable of consuming and publishing JSON data.

When being used to publish data, you need to configure the routing_key and the exchange is pulled from the message namespace (i.e. database collection/table). If key_in_field is set to true, transporter will use the field defined routing_key to lookup the value from the data.

NOTE key_in_field defaults to false and will therefore use the static routing_key, if you set routing_key to an empty string, no routing key will be set in the published message.

Configuration
rmq = rabbitmq({
  "uri": "amqp://127.0.0.1:5672/",
  "routing_key": "test",
  "key_in_field": false
  // "delivery_mode": 1, // non-persistent (1) or persistent (2)
  // "api_port": 15672,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"]
})

Run adaptor test

Spin up required containers (rabbitmq, haproxy)
# From transporter's root folder
cd config/rabbitmq
docker build . -t rabbit_haproxy
cd test_setup
docker-compose up -d
# Wait ~30s
Run the tests
# From transporter's root folder
go test -v ./adaptor/rabbitmq/

Documentation

Index

Constants

View Source
const (
	// DefaultDeliveryMode is used when writing messages to an exchange.
	DefaultDeliveryMode = amqp.Transient

	// DefaultRoutingKey is set to an empty string so all messages published to the exchange will
	// get routed to whatever queues are bound to it.
	DefaultRoutingKey = ""
)
View Source
const (
	// DefaultAPIPort is the default API port for RabbitMQ
	DefaultAPIPort = 15672
)
View Source
const (
	// DefaultURI is the default endpoint of RabbitMQ on the local machine.
	// Primarily used when initializing a new Client without a specific URI.
	DefaultURI = "amqp://guest:guest@localhost:5672/"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps the underlying connection to a RabbitMQ cluster.

func NewClient

func NewClient(options ...ClientOptionFunc) (*Client, error)

NewClient creates a new client to work with RabbitMQ.

The caller can configure the new client by passing configuration options to the func.

Example:

client, err := NewClient(
  WithURI("mongodb://localhost:27017"))

If no URI is configured, it uses DefaultURI.

An error is also returned when a configuration option is invalid

func (*Client) Close

func (c *Client) Close()

Close implements necessary calls to cleanup the underlying connection.

func (*Client) Connect

func (c *Client) Connect() (client.Session, error)

Connect satisfies the client.Client interface.

type ClientOptionFunc

type ClientOptionFunc func(*Client) error

ClientOptionFunc is a function that configures a Client. It is used in NewClient.

func WithCACerts

func WithCACerts(certs []string) ClientOptionFunc

WithCACerts configures the RootCAs for the underlying TLS connection

func WithSSL

func WithSSL(ssl bool) ClientOptionFunc

WithSSL configures the database connection to connect via TLS.

func WithURI

func WithURI(uri string) ClientOptionFunc

WithURI defines the full connection string for the RabbitMQ connection

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements client.Reader by consuming messages from the cluster based on its configuration.

func (*Reader) Read

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session encapsulates an amqp.Connection and amqp.Channel for use by a Reader/Writer.

type Writer

type Writer struct {
	DeliveryMode uint8
	RoutingKey   string
	KeyInField   bool
}

Writer implements client.Writer by publishing messages to the cluster based on its configuration.

func (*Writer) Write

func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL