msgbus

v2.0.0+incompatible
Published: Nov 1, 2017 | License: Apache-2.0 | Imports: 10 | Imported by: 7

README

msgbus

A simplified interface to MQTT.

More than a simple MQTT client implementation, it also supports rooting a topic tree, logging, and retrieval of retained topics.

Uses https://github.com/maruel/paho.mqtt.golang, a fork of https://github.com/eclipse/paho.mqtt.golang that drops websocket support to reduce the dependency set.


Documentation

Overview

Package msgbus implements a generic PubSub message bus that follows MQTT guidelines.

The main difference from MQTT topics is the support for relative messages on a rebased bus. See RebasePub() for more details.

Spec

The MQTT specification lives at http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html

Functions

func Retained

func Retained(b Bus, d time.Duration, topic ...string) (map[string][]byte, error)

Retained retrieves all matching messages for one or multiple topics.

Topic queries cannot be used.

If a topic is missing, Retained waits up to d for it to become available. If all topics are available, it returns as soon as they are all retrieved.
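The waiting behavior described above can be sketched with a channel and a timer. This is a minimal illustration, not the package's implementation: `message` is a local stand-in for Message, `collectRetained` is a hypothetical helper, and returning an error on timeout is an assumption, since the documentation does not say what Retained returns when d elapses.

```go
package main

import (
	"fmt"
	"time"
)

// message is a local stand-in for the documented Message type.
type message struct {
	Topic   string
	Payload []byte
}

// collectRetained (hypothetical) reads from ch until every topic in want has
// been seen or d has elapsed. Returning an error on timeout is an assumption.
func collectRetained(ch <-chan message, d time.Duration, want ...string) (map[string][]byte, error) {
	missing := make(map[string]struct{}, len(want))
	for _, t := range want {
		missing[t] = struct{}{}
	}
	out := make(map[string][]byte, len(want))
	timeout := time.After(d)
	for len(missing) > 0 {
		select {
		case m, ok := <-ch:
			if !ok {
				return out, fmt.Errorf("channel closed with %d topic(s) missing", len(missing))
			}
			if _, found := missing[m.Topic]; found {
				out[m.Topic] = m.Payload
				delete(missing, m.Topic)
			}
		case <-timeout:
			return out, fmt.Errorf("timed out with %d topic(s) missing", len(missing))
		}
	}
	// All topics were retrieved; return without waiting for the timer.
	return out, nil
}

func main() {
	ch := make(chan message, 2)
	ch <- message{Topic: "sensor/temp", Payload: []byte("21")}
	ch <- message{Topic: "sensor/hum", Payload: []byte("40")}
	got, err := collectRetained(ch, 50*time.Millisecond, "sensor/temp", "sensor/hum")
	fmt.Println(len(got), err)
}
```

The loop exits as soon as the set of missing topics is empty, which matches the "returns as soon as they are all retrieved" wording.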

Types

type Bus

type Bus interface {
	io.Closer

	// Publish publishes a message to a topic.
	//
	// If msg.Payload is empty, the topic is deleted if it was retained.
	//
	// It is not guaranteed that messages are propagated in order, unless
	// qos ExactlyOnce is used.
	Publish(msg Message, qos QOS) error

	// Subscribe sends updates to this topic query through the returned channel.
	Subscribe(topicQuery string, qos QOS) (<-chan Message, error)

	// Unsubscribe removes a previous subscription.
	//
	// Trying to unsubscribe from an invalid topic or a topic not currently
	// subscribed is ignored.
	//
	// BUG: while Subscribe() can be called multiple times with a topic query, a
	// single Unsubscribe() call will unregister all subscriptions.
	Unsubscribe(topicQuery string)
}

Bus is a publisher-subscriber bus.

The topics are expected to use the MQTT definition. "Mosquitto" has good documentation about this: https://mosquitto.org/man/mqtt-7.html
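To make the distinction between a topic and a topic query concrete, here is a small sketch of MQTT wildcard matching as defined by the MQTT 3.1.1 specification: "+" matches exactly one level and "#" matches all remaining levels. `matchTopic` is a hypothetical helper written for this illustration and is not part of the package; it does not validate the query and ignores the special handling of topics starting with "$".

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic (hypothetical) reports whether topic matches an MQTT topic query.
// "+" matches exactly one level; "#" matches any number of remaining levels,
// including the parent level, and must be the last level of the query.
func matchTopic(query, topic string) bool {
	q := strings.Split(query, "/")
	t := strings.Split(topic, "/")
	for i, level := range q {
		if level == "#" {
			// Only meaningful as the final level of the query.
			return i == len(q)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(q) == len(t)
}

func main() {
	fmt.Println(matchTopic("sensor/+/temp", "sensor/kitchen/temp"))
	fmt.Println(matchTopic("sensor/#", "sensor/kitchen/temp"))
	fmt.Println(matchTopic("sensor/+", "sensor/kitchen/temp"))
}
```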

For more information about retained message behavior, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

Implementations of Bus are expected to implement fmt.Stringer.

func Log

func Log(b Bus) Bus

Log returns a Bus that logs all operations done on it, via the standard log package.

func New

func New() Bus

New returns a local, thread-safe, memory-backed Bus.

It is useful for unit tests or as a local broker.

func NewMQTT

func NewMQTT(server, clientID, user, password string, will Message, order bool) (Bus, error)

NewMQTT returns an initialized active MQTT connection.

The connection timeouts are fine-tuned for a LAN. It will likely fail on a slower connection or when used over the internet.

will is the message to send if the connection is not closed correctly, that is, when Close() is not called.

order determines whether messages are processed in order. Out-of-order processing means that a subscription will not be blocked by another one that fails to process its queue in time.

The main purpose of this library is to create a layer that is simpler, more usable and more Go-idiomatic than paho.mqtt.golang.

func RebasePub

func RebasePub(b Bus, root string) Bus

RebasePub rebases a Bus when publishing messages.

All published Messages have their Topic prefixed with root.

Messages retrieved are unaffected.

Returns nil if root is an invalid topic or if it is a topic query.

It is possible to publish to a topic outside of root by starting the topic with:

  • "../" to backtrack closer to root
  • "//" to ignore the root
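One plausible reading of these rules can be sketched as a pure function. `rebaseTopic` is a hypothetical helper, not the package's code, and the exact resolution semantics (for example, what happens when "../" backtracks past the top of the tree) are assumptions made for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// rebaseTopic (hypothetical) resolves topic against root:
//   - a leading "//" ignores root entirely,
//   - each leading "../" drops one trailing level of root,
//   - anything else is appended to root.
func rebaseTopic(root, topic string) string {
	if strings.HasPrefix(topic, "//") {
		return strings.TrimPrefix(topic, "//")
	}
	var levels []string
	if r := strings.Trim(root, "/"); r != "" {
		levels = strings.Split(r, "/")
	}
	for strings.HasPrefix(topic, "../") {
		topic = strings.TrimPrefix(topic, "../")
		// Assumption: backtracking past the top of the tree is a no-op.
		if len(levels) > 0 {
			levels = levels[:len(levels)-1]
		}
	}
	return strings.Join(append(levels, topic), "/")
}

func main() {
	fmt.Println(rebaseTopic("home/kitchen", "temp"))
	fmt.Println(rebaseTopic("home/kitchen", "../bedroom/temp"))
	fmt.Println(rebaseTopic("home/kitchen", "//global/status"))
}
```

Under these assumptions, a publisher rooted at "home/kitchen" can still reach a sibling subtree or a global topic without holding a second Bus.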

func RebaseSub

func RebaseSub(b Bus, root string) Bus

RebaseSub rebases a Bus when subscribing or getting topics.

All retrieved Messages have the root prefix stripped from their Topic.

Messages published are unaffected.

Returns nil if root is an invalid topic or if it is a topic query.

It is possible to subscribe to a topic outside of root by starting the topic with:

  • "../" to backtrack closer to root
  • "//" to ignore the root

type Message

type Message struct {
	// Topic is the MQTT topic. It may have a prefix stripped by RebaseSub() or
	// inserted by RebasePub().
	Topic string
	// Payload is the application specific data.
	//
	// Publishing a message with no Payload deletes a retained Topic, and has no
	// effect on a non-retained topic.
	Payload []byte
	// Retained signifies that the message is permanent until explicitly changed.
	// Otherwise it is ephemeral.
	Retained bool
}

Message represents a single message to a single topic.
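The retained and empty-payload rules documented on Message can be modeled with a map. This is a sketch of the documented behavior only, assuming a hypothetical `retainedStore` type and a local `message` stand-in; it says nothing about how the package or a real MQTT broker stores retained messages.

```go
package main

import "fmt"

// message is a local stand-in for the documented Message type.
type message struct {
	Topic    string
	Payload  []byte
	Retained bool
}

// retainedStore (hypothetical) keeps the last retained payload per topic.
type retainedStore map[string][]byte

// publish applies the documented rules: an empty payload deletes a retained
// topic, a retained message replaces the stored value, and an ephemeral
// message leaves the store untouched.
func (s retainedStore) publish(m message) {
	switch {
	case len(m.Payload) == 0:
		delete(s, m.Topic)
	case m.Retained:
		s[m.Topic] = m.Payload
	}
}

func main() {
	s := retainedStore{}
	s.publish(message{Topic: "light/state", Payload: []byte("on"), Retained: true})
	s.publish(message{Topic: "light/event", Payload: []byte("click")}) // ephemeral
	fmt.Println(len(s))
	s.publish(message{Topic: "light/state"}) // empty payload deletes the topic
	fmt.Println(len(s))
}
```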

type QOS

type QOS int8

QOS defines the quality of service to use when publishing and subscribing to messages.

The normative definition is http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180912

const (
	// BestEffort means the broker/client will deliver the message at most once,
	// with no confirmation.
	BestEffort QOS = 0
	// MinOnce means the broker/client will deliver the message at least once,
	// potentially with duplicates.
	//
	// Do not use if message duplication is problematic.
	MinOnce QOS = 1
	// ExactlyOnce means the broker/client will deliver the message exactly once
	// by using a four-step handshake.
	ExactlyOnce QOS = 2
)

func (QOS) String

func (i QOS) String() string
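The documentation does not show what String returns. As a sketch only, a Stringer over the three documented values could look like the following; the local `qos` type and the choice to return the constant names are assumptions, not the package's confirmed output.

```go
package main

import "fmt"

// qos is a local stand-in mirroring the documented QOS values.
type qos int8

const (
	bestEffort  qos = 0
	minOnce     qos = 1
	exactlyOnce qos = 2
)

// String returns the constant's name. Assumption: the real method may format
// values differently.
func (q qos) String() string {
	switch q {
	case bestEffort:
		return "BestEffort"
	case minOnce:
		return "MinOnce"
	case exactlyOnce:
		return "ExactlyOnce"
	default:
		return fmt.Sprintf("qos(%d)", int8(q))
	}
}

func main() {
	fmt.Println(bestEffort, minOnce, exactlyOnce)
}
```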
