rabbitmq

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2017 License: BSD-3-Clause Imports: 18 Imported by: 0

README

RabbitMQ adaptor

The RabbitMQ adaptor is capable of consuming and publishing JSON data.

When being used to publish data, you need to configure the routing_key and the exchange is pulled from the message namespace (i.e. database collection/table). If key_in_field is set to true, transporter will use the field defined routing_key to lookup the value from the data.

NOTE key_in_field defaults to false and will therefore use the static routing_key, if you set routing_key to an empty string, no routing key will be set in the published message.

Configuration:
rmq = rabbitmq({
  "uri": "amqp://127.0.0.1:5672/",
  "routing_key": "test",
  "key_in_field": false
  // "delivery_mode": 1, // non-persistent (1) or persistent (2)
  // "api_port": 15672,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"]
})

Documentation

Index

Constants

View Source
const (
	// DefaultDeliveryMode is used when writing messages to an exchange.
	DefaultDeliveryMode = amqp.Transient

	// DefaultRoutingKey is set to an empty string so all messages published to the exchange will
	// get routed to whatever queues are bound to it.
	DefaultRoutingKey = ""
)
View Source
const (
	// DefaultAPIPort is the default API port for RabbitMQ
	DefaultAPIPort = 15672
)
View Source
const (
	// DefaultURI is the default endpoint of RabbitMQ on the local machine.
	// Primarily used when initializing a new Client without a specific URI.
	DefaultURI = "amqp://guest:guest@localhost:5672/"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps the underlying connection to a RabbitMQ cluster.

func NewClient

func NewClient(options ...ClientOptionFunc) (*Client, error)

NewClient creates a new client to work with RabbitMQ.

The caller can configure the new client by passing configuration options to the func.

Example:

client, err := NewClient(
  WithURI("mongodb://localhost:27017"))

If no URI is configured, it uses DefaultURI.

An error is also returned when a configuration option is invalid

func (*Client) Close

func (c *Client) Close()

Close implements necessary calls to cleanup the underlying connection.

func (*Client) Connect

func (c *Client) Connect() (client.Session, error)

Connect satisfies the client.Client interface.

type ClientOptionFunc

type ClientOptionFunc func(*Client) error

ClientOptionFunc is a function that configures a Client. It is used in NewClient.

func WithCACerts

func WithCACerts(certs []string) ClientOptionFunc

WithCACerts configures the RootCAs for the underlying TLS connection

func WithSSL

func WithSSL(ssl bool) ClientOptionFunc

WithSSL configures the database connection to connect via TLS.

func WithURI

func WithURI(uri string) ClientOptionFunc

WithURI defines the full connection string for the RabbitMQ connection

type RabbitMQ

type RabbitMQ struct {
	adaptor.BaseConfig
	RoutingKey   string   `json:"routing_key"`
	KeyInField   bool     `json:"key_in_field"`
	DeliveryMode uint8    `json:"delivery_mode"`
	APIPort      int      `json:"api_port"`
	SSL          bool     `json:"ssl"`
	CACerts      []string `json:"cacerts"`
}

RabbitMQ defines all configurable elements for connecting to and sending/receiving JSON.

func (*RabbitMQ) Client

func (r *RabbitMQ) Client() (client.Client, error)

Client creates an instance of Client to be used for connecting to RabbitMQ.

func (*RabbitMQ) Description

func (r *RabbitMQ) Description() string

Description for file adaptor

func (*RabbitMQ) Reader

func (r *RabbitMQ) Reader() (client.Reader, error)

Reader instantiates a Reader for use with subscribing to one or more topics.

func (*RabbitMQ) SampleConfig

func (r *RabbitMQ) SampleConfig() string

SampleConfig for file adaptor

func (*RabbitMQ) Writer

func (r *RabbitMQ) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error)

Writer instantiates a Writer for use with publishing to one or more exchanges.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements client.Reader by consuming messages from the cluster based on its configuration.

func (*Reader) Read

func (r *Reader) Read(filterFn client.NsFilterFunc) client.MessageChanFunc

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session encapsulates an amqp.Connection and amqp.Channel for use by a Reader/Writer.

type Writer

type Writer struct {
	DeliveryMode uint8
	RoutingKey   string
	KeyInField   bool
}

Writer implements client.Writer by publishing messages to the cluster based on its configuration.

func (*Writer) Write

func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL