lanternmq

Version: v0.0.0-...-3a37e37
Published: Oct 11, 2023 License: Apache-2.0 Imports: 1 Imported by: 0

README

LanternMQ

LanternMQ is a Go package that provides the messaging infrastructure for the Lantern microservices. It offers a simple interface for sending and receiving messages on a queue and for sending and receiving topic messages.

The package includes a RabbitMQ implementation of the LanternMQ interface.

The package also includes a mock implementation of the LanternMQ interface to support testing.

To test the package, see the testing instructions.

Updating Users for RabbitMQ

The default users, their password hashes, and each user's permissions can be found in lantern/definitions.json.

To update users from the RabbitMQ browser interface:

  1. Log in as an admin at localhost:15672
  2. Go to the Admin tab
  3. Click the specific user to update
  4. Near the bottom of the page click Update this User
  5. Set a new password

A new user can be added by clicking Add a user in the Admin tab.

To update users from the command line, run:

	docker exec -it lantern-back-end_lantern-mq_1 rabbitmqctl change_password <username> <new password>

A new user can be added by replacing change_password in the above command with add_user.

The definitions.json file must be updated to persist these changes. To update the definitions.json file:

  1. Get the updated JSON object by using the RabbitMQ API
    (e.g. curl -H "Accept:application/json" -u <management_username>:<management_password> "localhost:15672/api/definitions")
  2. Replace the current definitions file with the response from Step 1

The two steps can also be combined into one command:

	curl -H "Accept:application/json" -u <management_username>:<management_password> "localhost:15672/api/definitions" > lanternmq/definitions.json

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelID

type ChannelID interface{}

ChannelID is the identifier for a channel.

type MessageHandler

type MessageHandler func([]byte, *map[string]interface{}) error

MessageHandler is a function to process an individual message.
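As an illustration of the signature, the sketch below shows one way a handler might use the args map to carry state between messages. The type is redeclared locally only so the block compiles on its own; countingHandler and the "count" key are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// MessageHandler mirrors the type documented above. It is redeclared here
// only so the sketch compiles without importing the package.
type MessageHandler func([]byte, *map[string]interface{}) error

// countingHandler is a hypothetical handler: it rejects empty messages and
// keeps a running total under the assumed "count" key of args.
func countingHandler(msg []byte, args *map[string]interface{}) error {
	if len(msg) == 0 {
		return errors.New("empty message")
	}
	if args == nil || *args == nil {
		return errors.New("args map is required")
	}
	n, _ := (*args)["count"].(int) // zero when the key is missing
	(*args)["count"] = n + 1
	return nil
}

func main() {
	var h MessageHandler = countingHandler
	args := map[string]interface{}{}
	for _, m := range []string{"first", "second"} {
		if err := h([]byte(m), &args); err != nil {
			fmt.Println("handler error:", err)
		}
	}
	fmt.Println("messages handled:", args["count"])
}
```

Returning an error instead of panicking lets the caller decide what to do with a failed message, which matches how ProcessMessages is documented to forward handler errors.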

type MessageQueue

type MessageQueue interface {
	// Connect opens a connection with the underlying queuing service.
	Connect(username string, password string, host string, port string) error
	// CreateChannel opens a channel associated with the connected queuing service.
	CreateChannel() (ChannelID, error)
	// NumConcurrentMsgs defines how many messages can be processed in parallel.
	NumConcurrentMsgs(chID ChannelID, num int) error
	// QueueExists checks whether or not a queue already exists
	QueueExists(chID ChannelID, qName string) (bool, error)
	// DeclareQueue creates a queue with the name 'qName' on the channel with ID 'chID' if one
	// does not exist.
	DeclareQueue(chID ChannelID, qName string) error
	// PublishToQueue sends 'message' to the queue with name 'qName' over the channel with ID
	// 'chID'.
	PublishToQueue(chID ChannelID, qName string, message string) error
	// ConsumeFromQueue returns an instance of Messages, which acts like the receiving channel
	// for any messages that are present on queue 'qName' on the channel with ID 'chID'.
	ConsumeFromQueue(chID ChannelID, qName string) (Messages, error)
	// ProcessMessages applies the 'handler' MessageHandler with arguments 'args' to each
	// message that is received through 'msgs'. Sends any errors to the 'errs' channel.
	ProcessMessages(ctx context.Context, msgs Messages, handler MessageHandler, args *map[string]interface{}, errs chan<- error)
	// DeclareExchange creates an exchange with the name 'name' and type 'exchangeType' on the channel with
	// ID 'chID' if one does not exist.
	DeclareExchange(chID ChannelID, name string, exchangeType string) error
	// PublishToExchange sends 'message' to the exchange 'name' on channel with ID 'chID', which will be
	// routed to receivers using 'routingKey'.
	PublishToExchange(chID ChannelID, name string, routingKey string, message string) error
	// DeclareExchangeReceiveQueue creates a queue with name 'qName' associated with the exchange with name
	// 'typeName' on the channel with ID 'chID' to receive messages routed with the routing key 'routingKey'.
	DeclareExchangeReceiveQueue(chID ChannelID, typeName string, qName string, routingKey string) error
	// Close closes the MessageQueue and any associated resources including associated channels and the
	// connection to the underlying queuing service.
	Close()
}

MessageQueue is an interface for writing messages to either a basic queue or a topic. Below are some usage examples.

Example: Publish a message to a queue

	// Error checks are omitted for brevity.
	mq := <implementation of MessageQueue>
	err := mq.Connect("guest", "guest", "localhost", "5672")
	chID, err := mq.CreateChannel()
	err = mq.DeclareQueue(chID, "queueName")
	err = mq.PublishToQueue(chID, "queueName", "message")

Example: Read a message from a queue

	// Error checks are omitted for brevity.
	mq := <implementation of MessageQueue>
	err := mq.Connect("guest", "guest", "localhost", "5672")
	chID, err := mq.CreateChannel()
	err = mq.DeclareQueue(chID, "queueName")
	msgs, err := mq.ConsumeFromQueue(chID, "queueName")
	forever := make(chan bool)
	errs := make(chan error)
	go mq.ProcessMessages(
		context.Background(),
		msgs,
		func(msg []byte, _ *map[string]interface{}) error {
			fmt.Printf("Received message: %s\n", msg)
			return nil
		},
		nil,
		errs)

	// Block so the goroutine keeps processing messages.
	<-forever

Example: Publish a message to a topic

	// Error checks are omitted for brevity.
	mq := <implementation of MessageQueue>
	err := mq.Connect("guest", "guest", "localhost", "5672")
	chID, err := mq.CreateChannel()
	err = mq.DeclareExchange(chID, "topicName", "topic")
	err = mq.PublishToExchange(chID, "topicName", "topicRoutingKey", "message")

Example: Read a message from a topic

	// Error checks are omitted for brevity.
	mq := <implementation of MessageQueue>
	err := mq.Connect("guest", "guest", "localhost", "5672")
	chID, err := mq.CreateChannel()
	err = mq.DeclareExchange(chID, "topicName", "topic")
	err = mq.DeclareExchangeReceiveQueue(chID, "topicName", "queueName", "topicRoutingKey")
	msgs, err := mq.ConsumeFromQueue(chID, "queueName")
	forever := make(chan bool)
	errs := make(chan error)
	go mq.ProcessMessages(
		context.Background(),
		msgs,
		func(msg []byte, _ *map[string]interface{}) error {
			fmt.Printf("Received message: %s\n", msg)
			return nil
		},
		nil,
		errs)

	// Block so the goroutine keeps processing messages.
	<-forever

type Messages

type Messages interface{}

Messages is the stream of messages that will be received from a queue.

Directories

Path Synopsis
pkg
test
receive
Receives messages from the queue named 'hello' and from the topic with target name 'logs_topic' and topics 'error' and 'warning'.
sendQueue
Sends a message to a queue.
sendTopic
Sends a message as a queue topic.
