Documentation ¶
Overview ¶
Package msgbus implements a generic PubSub message bus that follows MQTT guidelines.
The main difference with MQTT topic is the support for relative message on rebased bus. See RebasePub() for more details.
Spec ¶
The MQTT specification lives at http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Bus ¶
type Bus interface { io.Closer // Publish publishes a message to a topic. // // If msg.Payload is empty, the topic is deleted if it was retained. // // It is not guaranteed that messages are propagated in order, unless // qos ExactlyOnce is used. Publish(msg Message, qos QOS) error // Subscribe sends updates to this topic query through the returned channel. Subscribe(topicQuery string, qos QOS) (<-chan Message, error) // Unsubscribe removes a previous subscription. // // Trying to unsubscribe from an invalid topic or a topic not currently // subscribed is ignored. // // BUG: while Subscribe() can be called multiple times with a topic query, a // single Unsubscribe() call will unregister all subscriptions. Unsubscribe(topicQuery string) }
Bus is a publisher-subscriber bus.
The topics are expected to use the MQTT definition. "Mosquitto" has good documentation about this: https://mosquitto.org/man/mqtt-7.html
For more information about retained message behavior, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages
Implementation of Bus are expected to implement fmt.Stringer.
func New ¶
func New() Bus
New returns a local thread safe memory backed Bus.
This Bus is thread safe. It is useful for unit tests or as a local broker.
func NewMQTT ¶
NewMQTT returns an initialized active MQTT connection.
The connection timeouts are fine tuned for a LAN. It will likely fail on a slower connection or when used over the internet.
will is the message to send if the connection is not closed correctly; when Close() is not called.
order determines is messages are processed in order or not. Out of order processing means that a subscription will not be blocked by another one that fails to process its queue in time.
This main purpose of this library is to create a layer that is simpler, more usable and more Go-idiomatic than paho.mqtt.golang.
func RebasePub ¶
RebasePub rebases a Bus when publishing messages.
All Message published have their Topic prefixed with root.
Messages retrieved are unaffected.
Returns nil if root is an invalid topic or if it is a topic query.
It is possible to publish a message topic outside of root with:
- "../" to backtrack closer to root
- "//" to ignore the root
func RebaseSub ¶
RebaseSub rebases a Bus when subscribing or getting topics.
All Message retrieved have their Topic prefix root stripped.
Messages published are unaffected.
Returns nil if root is an invalid topic or if it is a topic query.
It is possible to subscribe to a message topic outside of root with:
- "../" to backtrack closer to root
- "//" to ignore the root
type Message ¶
type Message struct { // Topic is the MQTT topic. It may have a prefix stripped by RebaseSub() or // inserted by RebasePub(). Topic string // Payload is the application specific data. // // Publishing a message with no Payload deleted a retained Topic, and has no // effect on non-retained topic. Payload []byte // Retained signifies that the message is permanent until explicitly changed. // Otherwise it is ephemeral. Retained bool }
Message represents a single message to a single topic.
type QOS ¶
type QOS int8
QOS defines the quality of service to use when publishing and subscribing to messages.
The normative definition is http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180912
const ( // BestEffort means the broker/client will deliver the message at most once, // with no confirmation. BestEffort QOS = 0 // MinOnce means the broker/client will deliver the message at least once, // potentially duplicate. // // Do not use if message duplication is problematic. MinOnce QOS = 1 // ExactlyOnce means the broker/client will deliver the message exactly once // by using a four step handshake. ExactlyOnce QOS = 2 )