gmq

package module
v0.0.9
Published: Dec 22, 2023 License: MIT Imports: 11 Imported by: 2

README

general-mq

This is the Go implementation (the original project is here).

General-purpose interfaces for message queues. The following implementations are currently provided:

  • AMQP 0-9-1
  • MQTT

By using these types, you can configure queues with the following properties:

  • Unicast or broadcast.
  • Reliable or best-effort.

Notes

  • MQTT uses shared queues to implement unicast.
  • AMQP uses confirm channels to implement reliable publish, and MQTT uses QoS 1 to implement reliable publish/subscribe.

Relationships of Connections and Queues

The term connection describes a TCP/TLS connection to the message broker. The term queue describes a message queue or a topic within a connection. You can use one connection to manage multiple queues, or one connection to manage one queue.

A queue can only be a receiver or a sender at a time.

Connections for sender/receiver queues with the same name

The sender and the receiver are usually different programs, so there are two connections, each holding one of the two queues.

For the special case where a program acts as both the sender and the receiver using the same queue:

  • The AMQP implementation uses one Channel for one queue, so the program can manage all queues with one connection.
  • The MQTT implementation MUST use one connection for one queue, or both the sender and the receiver will receive packets.

Test

Please prepare a RabbitMQ broker and an EMQX broker at localhost for testing.

  • To install using Docker:

    $ docker run --rm --name rabbitmq -d -p 5672:5672 rabbitmq:management-alpine
    $ docker run --rm --name emqx -d -p 1883:1883 emqx/emqx
    

Then run the test:

$ go test -tags= -v ./tests

Example

Launch RabbitMQ and then run the AMQP example:

$ go run examples/simple.go

Launch EMQX and then run the MQTT example:

$ RUN_MQTT= go run examples/simple.go

Documentation

Types

type AmqpConnection

type AmqpConnection struct {
	// contains filtered or unexported fields
}

Manages an AMQP connection.

func NewAmqpConnection

func NewAmqpConnection(opts AmqpConnectionOptions) (*AmqpConnection, error)

Create an AMQP connection instance.

func (*AmqpConnection) AddHandler

func (c *AmqpConnection) AddHandler(handler ConnectionHandler) string

func (*AmqpConnection) Close

func (c *AmqpConnection) Close() error

func (*AmqpConnection) Connect

func (c *AmqpConnection) Connect() error

func (*AmqpConnection) RemoveHandler

func (c *AmqpConnection) RemoveHandler(id string)

func (*AmqpConnection) Status

func (c *AmqpConnection) Status() Status

type AmqpConnectionOptions

type AmqpConnectionOptions struct {
	// Connection URI. Use `amqp|amqps://username:password@host:port/vhost` format.
	//
	// Default is `amqp://localhost/%2f`.
	URI string
	// Connection timeout in milliseconds.
	//
	// Default or zero value is `3000`.
	ConnectTimeoutMS uint64
	// Time in milliseconds from disconnection to reconnection.
	//
	// Default or zero value is `1000`.
	ReconnectMS uint64
}

The connection options.
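The defaults documented above can be sketched with the standard library alone. The `connOptions` type and the `withDefaults` and `vhostOf` helpers below are hypothetical names used for illustration; they mirror the documented fields but are not part of this package, and the library's own handling of the URI may differ.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
	"time"
)

// connOptions is a local mirror of the documented AmqpConnectionOptions fields.
type connOptions struct {
	URI              string
	ConnectTimeoutMS uint64
	ReconnectMS      uint64
}

// withDefaults replaces zero values with the documented defaults.
func withDefaults(o connOptions) connOptions {
	if o.URI == "" {
		o.URI = "amqp://localhost/%2f"
	}
	if o.ConnectTimeoutMS == 0 {
		o.ConnectTimeoutMS = 3000
	}
	if o.ReconnectMS == 0 {
		o.ReconnectMS = 1000
	}
	return o
}

// vhostOf returns the percent-decoded virtual host of an amqp/amqps URI.
func vhostOf(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	// EscapedPath keeps "%2f" encoded, so the leading "/" can be trimmed safely.
	return url.PathUnescape(strings.TrimPrefix(u.EscapedPath(), "/"))
}

func main() {
	opts := withDefaults(connOptions{})
	vhost, err := vhostOf(opts.URI)
	if err != nil {
		fmt.Println("invalid URI:", err)
		return
	}
	timeout := time.Duration(opts.ConnectTimeoutMS) * time.Millisecond
	fmt.Printf("vhost=%q timeout=%s\n", vhost, timeout)
}
```

Converting the millisecond fields to `time.Duration` at the edge keeps the rest of a program in Go's usual duration type.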

type AmqpQueue

type AmqpQueue struct {
	// contains filtered or unexported fields
}

Manages an AMQP queue.

func NewAmqpQueue

func NewAmqpQueue(opts AmqpQueueOptions, conn *AmqpConnection) (*AmqpQueue, error)

Create a queue instance.

func (*AmqpQueue) Close

func (q *AmqpQueue) Close() error

func (*AmqpQueue) Connect

func (q *AmqpQueue) Connect() error

func (*AmqpQueue) IsRecv

func (q *AmqpQueue) IsRecv() bool

func (*AmqpQueue) Name

func (q *AmqpQueue) Name() string

func (*AmqpQueue) SendMsg

func (q *AmqpQueue) SendMsg(payload []byte) error

func (*AmqpQueue) SetHandler

func (q *AmqpQueue) SetHandler(handler QueueEventHandler)

func (*AmqpQueue) SetMsgHandler added in v0.0.6

func (q *AmqpQueue) SetMsgHandler(handler QueueMessageHandler) error

func (*AmqpQueue) Status

func (q *AmqpQueue) Status() Status

type AmqpQueueOptions

type AmqpQueueOptions struct {
	// The queue name that is used to map an AMQP queue (unicast) or an exchange (broadcast).
	//
	// The pattern is `^[a-z0-9_-]+([\\.]{1}[a-z0-9_-]+)*$`.
	Name string
	// `true` for the receiver and `false` for the sender.
	IsRecv bool
	// Reliable by selecting the confirm channel (for publish).
	Reliable bool
	// `true` for broadcast and `false` for unicast.
	Broadcast bool
	// Time in milliseconds from disconnection to reconnection.
	//
	// Default or zero value is `1000`.
	ReconnectMS uint64
	// The QoS of the receiver queue.
	//
	// `Note`: this value MUST be a positive value.
	Prefetch uint16
	// Use persistent delivery mode.
	Persistent bool
}

The queue options.
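To check a candidate name against the documented pattern before creating a queue, a small validator can help. This is a sketch, assuming the doubled backslash in the documented pattern is string-literal escaping for a literal dot; `validQueueName` is a hypothetical helper, not a function of this package.

```go
package main

import (
	"fmt"
	"regexp"
)

// namePattern is the documented pattern, written as a raw string so that
// the dot separator needs only a single backslash.
var namePattern = regexp.MustCompile(`^[a-z0-9_-]+([\.]{1}[a-z0-9_-]+)*$`)

// validQueueName reports whether name consists of lowercase segments
// (letters, digits, '_' or '-') separated by single dots.
func validQueueName(name string) bool {
	return namePattern.MatchString(name)
}

func main() {
	for _, name := range []string{"sensor.data-1", "Sensor", "a..b", ".a"} {
		fmt.Printf("%-16q valid=%v\n", name, validQueueName(name))
	}
}
```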

type ConnectionHandler

type ConnectionHandler interface {
	// Triggered when the connection status changes.
	OnStatus(handlerID string, conn GmqConnection, status Status)

	// Triggered when there are errors.
	OnError(handlerID string, conn GmqConnection, err error)
}

The event handler for connections.

type GmqConnection

type GmqConnection interface {
	// To get the connection status.
	Status() Status

	// To add a connection event handler. This will return an identifier for applications to manage
	// handlers.
	AddHandler(handler ConnectionHandler) string

	// To remove a handler with an identifier from `AddHandler`.
	RemoveHandler(id string)

	// To connect to the message broker. The `GmqConnection` will connect to the broker using
	// a goroutine and report status with `Status`.
	Connect() error

	// To close the connection.
	Close() error
}

The operations for connections.
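Because `Connect` works in the background and reports progress through status events, an application may want to block until `Connected` is reported. The following is a minimal sketch under stated assumptions: `Status` and `GmqConnection` are local, reduced copies of the documented types so the example compiles without the library, and `readyHandler` is a hypothetical helper whose methods match the documented `ConnectionHandler` signatures.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Status mirrors the documented constants in their documented order.
type Status int

const (
	Closing Status = iota
	Closed
	Connecting
	Connected
	Disconnected
)

// GmqConnection is reduced to the single method this sketch needs.
type GmqConnection interface {
	Status() Status
}

// readyHandler closes its channel the first time Connected is reported.
type readyHandler struct {
	once  sync.Once
	ready chan struct{}
}

func newReadyHandler() *readyHandler {
	return &readyHandler{ready: make(chan struct{})}
}

func (h *readyHandler) OnStatus(handlerID string, conn GmqConnection, status Status) {
	if status == Connected {
		h.once.Do(func() { close(h.ready) })
	}
}

func (h *readyHandler) OnError(handlerID string, conn GmqConnection, err error) {
	fmt.Println("connection error:", err)
}

// wait blocks until Connected has been seen or the timeout expires.
func (h *readyHandler) wait(timeout time.Duration) error {
	select {
	case <-h.ready:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for Connected")
	}
}

func main() {
	h := newReadyHandler()
	// Stand-in for the library's background goroutine reporting progress.
	go func() {
		h.OnStatus("id", nil, Connecting)
		h.OnStatus("id", nil, Connected)
	}()
	fmt.Println("wait result:", h.wait(time.Second))
}
```

With the real package, such a handler would be registered through `AddHandler` before calling `Connect`. The `sync.Once` guards against reconnections reporting `Connected` more than once.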

type GmqError

type GmqError int

Error type of general-mq.

const (
	// The queue does not have a message handler.
	NoMsgHandler GmqError = iota
	// The connection is not connected or the queue (topic) is not connected (declared/subscribed).
	NotConnected
	// The queue is a receiver that cannot send messages.
	QueueIsReceiver
)

general-mq error.

func (GmqError) String

func (e GmqError) String() string

type GmqQueue

type GmqQueue interface {
	// To get the queue name.
	Name() string

	// Is the queue a receiver.
	IsRecv() bool

	// To get the connection status.
	Status() Status

	// To set the queue event handler. Use `nil` to remove the handler.
	SetHandler(handler QueueEventHandler)

	// To set the queue message handler.
	SetMsgHandler(handler QueueMessageHandler) error

	// To connect to the message queue. The `GmqQueue` will connect to the queue using a goroutine
	// and report status with `Status`.
	//
	// `Note`: You MUST call `SetMsgHandler()` before `Connect()`.
	Connect() error

	// To close the connection.
	Close() error

	// To send a message (for senders only).
	SendMsg(payload []byte) error
}

The operations for queues.
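The call-order rules in the comments above can be illustrated with an in-memory stand-in. This is a sketch, not the library's implementation: `fakeQueue` and the three sentinel errors are hypothetical, named after the documented `GmqError` values. It also assumes that the message handler requirement applies to receiver queues only and that the receiver check precedes the connection check in `SendMsg`; neither detail is confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors named after the documented GmqError values.
var (
	errNoMsgHandler    = errors.New("no message handler")
	errNotConnected    = errors.New("not connected")
	errQueueIsReceiver = errors.New("queue is a receiver")
)

// fakeQueue tracks only the state needed to show the call-order rules.
type fakeQueue struct {
	isRecv     bool
	hasHandler bool
	connected  bool
}

// SetMsgHandler records that a message handler has been installed.
func (q *fakeQueue) SetMsgHandler() { q.hasHandler = true }

// Connect refuses a receiver that has no message handler (assumption:
// senders do not need one).
func (q *fakeQueue) Connect() error {
	if q.isRecv && !q.hasHandler {
		return errNoMsgHandler
	}
	q.connected = true
	return nil
}

// SendMsg is for senders only and requires a connected queue.
func (q *fakeQueue) SendMsg(payload []byte) error {
	if q.isRecv {
		return errQueueIsReceiver
	}
	if !q.connected {
		return errNotConnected
	}
	return nil
}

func main() {
	recv := &fakeQueue{isRecv: true}
	fmt.Println("connect without handler:", recv.Connect())
	recv.SetMsgHandler()
	fmt.Println("connect with handler:", recv.Connect())
	fmt.Println("send on receiver:", recv.SendMsg([]byte("hi")))

	send := &fakeQueue{}
	fmt.Println("send before connect:", send.SendMsg([]byte("hi")))
	_ = send.Connect()
	fmt.Println("send after connect:", send.SendMsg([]byte("hi")))
}
```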

type Message

type Message interface {
	// To get the payload.
	Payload() []byte

	// Use this if the message is processed successfully.
	Ack() error

	// To requeue the message and the broker will send the message in the future.
	//
	// `Note`: this is only effective for AMQP or other protocols that support requeuing.
	Nack() error
}

The operations for incoming messages.

type MqttConnection

type MqttConnection struct {
	// contains filtered or unexported fields
}

Manages an MQTT connection.

func NewMqttConnection

func NewMqttConnection(opts MqttConnectionOptions) (*MqttConnection, error)

Create an MQTT connection instance.

func (*MqttConnection) AddHandler

func (c *MqttConnection) AddHandler(handler ConnectionHandler) string

func (*MqttConnection) Close

func (c *MqttConnection) Close() error

func (*MqttConnection) Connect

func (c *MqttConnection) Connect() error

func (*MqttConnection) RemoveHandler

func (c *MqttConnection) RemoveHandler(id string)

func (*MqttConnection) Status

func (c *MqttConnection) Status() Status

type MqttConnectionOptions

type MqttConnectionOptions struct {
	// Connection URI. Use `mqtt|mqtts://username:password@host:port` format.
	//
	// Default is `mqtt://localhost`.
	URI string
	// Connection timeout in milliseconds.
	//
	// Default or zero value is `3000`.
	ConnectTimeoutMS uint64
	// Time in milliseconds from disconnection to reconnection.
	//
	// Default or zero value is `1000`.
	ReconnectMS uint64
	// Client identifier. Empty to generate a random client identifier.
	ClientID string
	// Do not clean session flag.
	//
	// `Note`: this is not stable.
	NotCleanSession bool
}

The connection options.

type MqttQueue

type MqttQueue struct {
	// contains filtered or unexported fields
}

Manages an MQTT queue.

func NewMqttQueue

func NewMqttQueue(opts MqttQueueOptions, conn *MqttConnection) (*MqttQueue, error)

Create a queue instance.

func (*MqttQueue) Close

func (q *MqttQueue) Close() error

func (*MqttQueue) Connect

func (q *MqttQueue) Connect() error

func (*MqttQueue) IsRecv

func (q *MqttQueue) IsRecv() bool

func (*MqttQueue) Name

func (q *MqttQueue) Name() string

func (*MqttQueue) SendMsg

func (q *MqttQueue) SendMsg(payload []byte) error

func (*MqttQueue) SetHandler

func (q *MqttQueue) SetHandler(handler QueueEventHandler)

func (*MqttQueue) SetMsgHandler added in v0.0.6

func (q *MqttQueue) SetMsgHandler(handler QueueMessageHandler) error

func (*MqttQueue) Status

func (q *MqttQueue) Status() Status

type MqttQueueOptions

type MqttQueueOptions struct {
	// The queue name that is used to map an MQTT topic.
	//
	// The pattern is `^[a-z0-9_-]+([\\.]{1}[a-z0-9_-]+)*$`.
	Name string
	// `true` for the receiver and `false` for the sender.
	IsRecv bool
	// Reliable by using QoS 1 (for publish/subscribe).
	Reliable bool
	// `true` for broadcast and `false` for unicast.
	//
	// `Note`: the unicast queue relies on `shared queue`. See the `SharedPrefix` option.
	Broadcast bool
	// Time in milliseconds from disconnection to reconnection.
	//
	// Default or zero value is `1000`.
	ReconnectMS uint64
	// Used for `broadcast=false`.
	SharedPrefix string
}

The queue options.
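How these options might translate into an MQTT subscription can be sketched as follows. This page does not show how the library composes topics internally, so the concatenation of `SharedPrefix` and `Name` is an assumption, as is the example prefix `$share/general-mq/`. `mqttQueueOpts`, `subscribeTopic` and `qosFor` are hypothetical names. The QoS mapping follows the README note that reliable MQTT queues use QoS 1; QoS 0 for best-effort is assumed.

```go
package main

import "fmt"

// mqttQueueOpts is a local mirror of the documented option fields used here.
type mqttQueueOpts struct {
	Name         string
	Reliable     bool
	Broadcast    bool
	SharedPrefix string
}

// subscribeTopic returns the topic a receiver would subscribe to.
// Unicast relies on a shared subscription, so the prefix is prepended
// (assumed composition); broadcast subscribes to the plain topic.
func subscribeTopic(o mqttQueueOpts) string {
	if o.Broadcast {
		return o.Name
	}
	return o.SharedPrefix + o.Name
}

// qosFor maps the Reliable flag to an MQTT QoS level.
func qosFor(o mqttQueueOpts) byte {
	if o.Reliable {
		return 1
	}
	return 0
}

func main() {
	unicast := mqttQueueOpts{
		Name:         "sensor.data",
		Reliable:     true,
		SharedPrefix: "$share/general-mq/", // example value, not a confirmed default
	}
	broadcast := unicast
	broadcast.Broadcast = true

	fmt.Println(subscribeTopic(unicast), qosFor(unicast))
	fmt.Println(subscribeTopic(broadcast), qosFor(broadcast))
}
```

With a shared subscription the broker delivers each message to only one subscriber in the group, which is what gives the unicast behavior.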

type QueueEventHandler added in v0.0.6

type QueueEventHandler interface {
	// Triggered when the queue status changes.
	OnStatus(queue GmqQueue, status Status)

	// Triggered when there are errors.
	OnError(queue GmqQueue, err error)
}

The event handler for queues.

type QueueMessageHandler added in v0.0.6

type QueueMessageHandler interface {
	// Triggered for new incoming `Message`s.
	OnMessage(queue GmqQueue, message Message)
}

The message handler for queues.
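A message handler typically acknowledges a message once processing succeeds and requeues it otherwise. The sketch below shows that pattern. `Message` is a local copy of the documented interface, `GmqQueue` is reduced to `Name()`, and `funcHandler`, `rejectEmpty`, `fakeMessage` and `fakeQueue` are hypothetical helpers so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local copy of the documented interface.
type Message interface {
	Payload() []byte
	Ack() error
	Nack() error
}

// GmqQueue is reduced to the single method this sketch needs.
type GmqQueue interface {
	Name() string
}

// funcHandler adapts a processing function to the OnMessage signature.
type funcHandler struct {
	process func(payload []byte) error
}

// OnMessage acks on success and nacks (requeues) on failure.
func (h funcHandler) OnMessage(queue GmqQueue, message Message) {
	if err := h.process(message.Payload()); err != nil {
		fmt.Printf("queue %s: processing failed, requeuing: %v\n", queue.Name(), err)
		if err := message.Nack(); err != nil {
			fmt.Println("nack failed:", err)
		}
		return
	}
	if err := message.Ack(); err != nil {
		fmt.Println("ack failed:", err)
	}
}

// rejectEmpty is an example processing function that fails on empty payloads.
func rejectEmpty(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// fakeMessage and fakeQueue are in-memory stand-ins for testing the handler.
type fakeMessage struct {
	payload       []byte
	acked, nacked bool
}

func (m *fakeMessage) Payload() []byte { return m.payload }
func (m *fakeMessage) Ack() error      { m.acked = true; return nil }
func (m *fakeMessage) Nack() error     { m.nacked = true; return nil }

type fakeQueue string

func (q fakeQueue) Name() string { return string(q) }

func main() {
	h := funcHandler{process: rejectEmpty}
	ok, bad := &fakeMessage{payload: []byte("hello")}, &fakeMessage{}
	h.OnMessage(fakeQueue("demo"), ok)
	h.OnMessage(fakeQueue("demo"), bad)
	fmt.Println("ok acked:", ok.acked, "bad nacked:", bad.nacked)
}
```

Requeuing suits transient failures. A payload that can never be processed would be redelivered repeatedly, so an application may prefer to acknowledge and discard such messages.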

type Status

type Status int

Status of connections and queues.

const (
	// The connection/queue is closing.
	Closing Status = iota
	// The connection/queue is closed by the program.
	Closed
	// Connecting to the message broker or the queue.
	Connecting
	// Connected to the message broker or queue.
	Connected
	// The connection/queue is not connected. It will retry connecting to the broker or queue
	// automatically.
	Disconnected
)

Status definitions for connections and queues.

func (Status) String

func (s Status) String() string
