rabbit_hole

package
v0.0.0-...-92f3312
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2020 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Overview

Rabbit_hole creates a simple interface to a RabbitMQ exchange allowing setup via one function call to Mk_mqreader() for reading and Mk_mqwriter() for sending. Rabbit_hole also provides for a channel listening interface which can be paused by the user.

The user programme can create a listener via Mk_mqreader() and then either listen directly on listener.Port for amqp.Delivery messages, or invoke listener.Start_eating() and supply a channel to which received messages are written (allowing a central user function to process all messages from multiple listeners).

The user programme creates a sender via Mk_mqwriter(), which returns a struct that is used to start the driver. Once the driver is started, the user can pass messages on the struct's Port and the driver will push them out to the message exchange.

Constants

const (
	DURABLE   bool = true // bloody amqp parms are true false w/o constants to make code readable
	AUTO_DEL  bool = true
	INTERNAL  bool = true
	WAIT      bool = false
	EXCLUSIVE bool = true
	AUTO_ACK  bool = true // automatically ack messages so we/user does not have to
	LOCAL     bool = true
	MANDITORY bool = true
	IMMED     bool = true
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Mq_msg

type Mq_msg struct {
	Key  string
	Data []byte
}

The user can send an Mq_msg on the writer's Port to publish with a specific key rather than the default.

type Mq_reader

type Mq_reader struct {
	Port <-chan amqp.Delivery // this is exposed so that the user can listen directly
	// contains filtered or unexported fields
}

Manages a reader connection.

func Mk_mqreader

func Mk_mqreader(host string, port string, user string, pw string, ex string, ex_type string, key *string) (rdr *Mq_reader, err error)

Create a reader for a given user/pw host/port exchange 5-tuple. Creates a connection and channel.

The exchange type has the following syntax:

[type][+eop+eop...][>[qname][+qop+qop...]]

Type is the exchange type (fanout etc.). Eop is one of the following exchange options, and can be written as !eop to negate it:

ad == autodel
du == durable
in == internal

qname is the name of the queue; if >+qop is coded (no name given), then a random name is generated. The qops are:

ad == auto delete
du == durable
ex == exclusive

Any attribute may be prefixed with ! to negate it (e.g. !ad) and the order of either type of op is NOT important.

The queue type must match the exchange type (durable or !durable). Setting the type for the exchange also sets the type for the queue, so setting du or !du on the queue is optional.

The key is a string of one or more comma-separated keys which will be associated with the queue when it is bound to the exchange. For a direct exchange, this allows the same queue to be used to receive multiple message types based on key.
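The exchange type syntax described above can be illustrated with a small parser. This is only a sketch of how such a string might be broken apart, not the package's own code: exSpec, setOps and parseExType are hypothetical names, and the assumption that every option defaults to false when not given is made for the example only.

```go
package main

import (
	"fmt"
	"strings"
)

// exSpec is a hypothetical holder for the parsed options; it is not a type
// exported by rabbit_hole. All options are assumed to default to false.
type exSpec struct {
	Type       string // exchange type, e.g. "fanout" or "direct"
	ExAutoDel  bool
	ExDurable  bool
	ExInternal bool
	HasQueue   bool   // true when a '>' section was present
	QName      string // empty means a random name would be generated
	QAutoDel   bool
	QDurable   bool
	QExclusive bool
}

// setOps applies a list of ops to the matching flags; a leading '!' negates.
func setOps(ops []string, targets map[string]*bool) error {
	for _, op := range ops {
		if op == "" {
			continue
		}
		val := !strings.HasPrefix(op, "!")
		name := strings.TrimPrefix(op, "!")
		flag, ok := targets[name]
		if !ok {
			return fmt.Errorf("unknown option %q", op)
		}
		*flag = val
	}
	return nil
}

// parseExType splits "[type][+eop...][>[qname][+qop...]]" into its parts.
func parseExType(s string) (exSpec, error) {
	var spec exSpec
	exPart, qPart, hasQ := strings.Cut(s, ">")
	spec.HasQueue = hasQ

	exTokens := strings.Split(exPart, "+")
	spec.Type = exTokens[0]
	err := setOps(exTokens[1:], map[string]*bool{
		"ad": &spec.ExAutoDel, "du": &spec.ExDurable, "in": &spec.ExInternal,
	})
	if err != nil {
		return spec, err
	}

	// Per the description, the exchange's durable setting carries to the queue.
	spec.QDurable = spec.ExDurable
	if hasQ {
		qTokens := strings.Split(qPart, "+")
		spec.QName = qTokens[0]
		err = setOps(qTokens[1:], map[string]*bool{
			"ad": &spec.QAutoDel, "du": &spec.QDurable, "ex": &spec.QExclusive,
		})
	}
	return spec, err
}

func main() {
	spec, err := parseExType("direct+du+!ad>myqueue+ex")
	fmt.Printf("%+v %v\n", spec, err)
}
```

Because each op is looked up by name, the order of the ops does not matter, which matches the description above. A string such as "fanout>+ad" leaves QName empty, the case in which a random queue name is generated.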

func (*Mq_reader) Close

func (rdr *Mq_reader) Close()

Ensure that everything is closed down before we go.

func (*Mq_reader) Pause

func (rdr *Mq_reader) Pause(onoff bool)

Pause sets the pause flag in the reader based on the on/off state that is passed in. While paused, messages received from the Rabbit exchange are dropped. When paused and quiet is off, periodic dropped-message counts are written to the standard error device.

func (*Mq_reader) Quiet

func (rdr *Mq_reader) Quiet(onoff bool)

Quiet sets the reader's quiet option using the given on/off flag.

func (*Mq_reader) Start_eating

func (rdr *Mq_reader) Start_eating(usr_ch chan amqp.Delivery)

Start_eating causes the package to begin processing messages from the rabbit channel and passing them on the user channel, or dropping them if paused.
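The interaction between Pause and Start_eating can be pictured as a forwarding loop that checks a pause flag before each hand-off. The following is a minimal sketch under stated assumptions, not the package's implementation: forwarder and eat are hypothetical names, and plain strings stand in for amqp.Delivery so that the example runs without a RabbitMQ server.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// forwarder is a hypothetical stand-in for the reader; it is not part of
// rabbit_hole. Strings replace amqp.Delivery to keep the sketch self-contained.
type forwarder struct {
	paused  atomic.Bool  // set by Pause, checked before every hand-off
	dropped atomic.Int64 // number of messages discarded while paused
}

// Pause turns dropping on or off.
func (f *forwarder) Pause(onoff bool) { f.paused.Store(onoff) }

// eat copies messages from in to out until in is closed, dropping each
// message that arrives while the pause flag is set.
func (f *forwarder) eat(in <-chan string, out chan<- string) {
	for msg := range in {
		if f.paused.Load() {
			f.dropped.Add(1)
			continue
		}
		out <- msg
	}
	close(out)
}

func main() {
	f := &forwarder{}
	f.Pause(true)

	in := make(chan string, 2)
	out := make(chan string, 2)
	in <- "a"
	in <- "b"
	close(in)

	f.eat(in, out)
	fmt.Println("dropped while paused:", f.dropped.Load())
}
```

In a real programme the loop would run in its own goroutine and Pause would be called from another, which is why the flag and counter are atomic values here.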

func (*Mq_reader) Stop

func (rdr *Mq_reader) Stop()

Stop sets the stop flag in the reader. If the reader is active it will stop and return.

type Mq_writer

type Mq_writer struct {
	Port chan interface{} // anything user writes to port is sent to the exchange (accepts string, *string and []byte blobs)

	Notify chan string // user can send request to us (quiet, stop, etc.)
	// contains filtered or unexported fields
}

Struct which manages a connection to an exchange for writing.

func Mk_mqwriter

func Mk_mqwriter(host string, port string, user string, pw string, ex string, ex_type string, key *string) (wrtr *Mq_writer, err error)

Create a new writer which connects to the RMQ server and binds to the named exchange.

func (*Mq_writer) Close

func (wrtr *Mq_writer) Close()

Ensure that everything is closed down before we go.

func (*Mq_writer) Delete

func (wrtr *Mq_writer) Delete(force bool) (err error)

Delete the queue and exchange associated with the writer. If force is not true, then deletion only happens if there are no consumers on the queue.

func (*Mq_writer) Driver

func (wrtr *Mq_writer) Driver()

Listens on the channel for some kind of 'blob' and then sends it off to the exchange. The blob can be string, *string, []byte, or Mq_msg. Mq_msg is the only way of using a key other than the default given when the driver was started.

If the write channel encounters an error, an attempt to reconnect is made and writing resumes once a connection is established; however, any messages received on the inbound channel during the outage are dropped. The user can test the state after each burst (or individual message) using the Is_alive() function, which returns false if the connection is down. No attempt is made to preserve messages during an outage.
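The blob handling described for Driver amounts to mapping each accepted type onto a routing key and a message body. One way this might look is sketched below; toKeyAndBody is a hypothetical helper, not a function in the package, and Mq_msg is redeclared locally only so that the example compiles on its own. Rejecting a nil *string and unknown types with an error is an assumption of the sketch.

```go
package main

import "fmt"

// Mq_msg mirrors the struct documented above so the sketch is self-contained.
type Mq_msg struct {
	Key  string
	Data []byte
}

// toKeyAndBody is a hypothetical helper: it maps one blob from the Port onto
// the key and body that would be published. Only Mq_msg carries its own key;
// every other accepted type uses the default key.
func toKeyAndBody(blob interface{}, defKey string) (string, []byte, error) {
	switch b := blob.(type) {
	case string:
		return defKey, []byte(b), nil
	case *string:
		if b == nil {
			return "", nil, fmt.Errorf("nil *string blob")
		}
		return defKey, []byte(*b), nil
	case []byte:
		return defKey, b, nil
	case Mq_msg:
		return b.Key, b.Data, nil
	default:
		return "", nil, fmt.Errorf("unsupported blob type %T", blob)
	}
}

func main() {
	blobs := []interface{}{
		"plain text",
		[]byte("raw bytes"),
		Mq_msg{Key: "alerts", Data: []byte("keyed message")},
	}
	for _, blob := range blobs {
		key, body, err := toKeyAndBody(blob, "default")
		fmt.Printf("key=%s body=%s err=%v\n", key, body, err)
	}
}
```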

func (*Mq_writer) Is_alive

func (wrtr *Mq_writer) Is_alive() bool

Is_alive checks the known state of the connection and returns true if it is thought to be alive.

func (*Mq_writer) Quiet

func (wrtr *Mq_writer) Quiet(onoff bool)

Quiet sets the writer's quiet option using the given on/off flag.

func (*Mq_writer) Start_writer

func (wrtr *Mq_writer) Start_writer(key string)

Starts the writer driver for the exchange using the given key as the default.

func (*Mq_writer) Stop

func (wrtr *Mq_writer) Stop()

Stop sets the stop flag in the writer. If the writer is active it will stop and return.
