msgbus

package module
v3.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2020 License: Apache-2.0 Imports: 11 Imported by: 0

README

msgbus

A simplified interface to MQTT.

Implements rooting a topic tree, logging and retained topic retrieval.

Wraps https://github.com/eclipse/paho.mqtt.golang, greatly simplifies it and hides some surprising synchronization behavior.

PkgGoDev Coverage Status

Documentation

Overview

Package msgbus implements a generic PubSub message bus that follows MQTT guidelines.

It uses https://github.com/eclipse/paho.mqtt.golang under the hood.

Spec

The MQTT specification lives at http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html

Example
b := msgbus.New()
base := "homeassistant"
var err error
// Now all Publish() calls topics are based on "homeassistant/".
if b, err = msgbus.RebasePub(b, base); err != nil {
	log.Fatal(err)
}
// Now all Subscribe() calls topics are based on "homeassistant/".
if b, err = msgbus.RebaseSub(b, base); err != nil {
	log.Fatal(err)
}

if err := b.Close(); err != nil {
	log.Fatal(err)
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Retained

func Retained(b Bus, d time.Duration, topic ...string) (map[string][]byte, error)

Retained retrieves all matching messages for one or multiple topics.

Topic queries cannot be used.

If a topic is missing, will wait for up to d for it to become available. If all topics are available, returns as soon as they are all retrieved.

Types

type Bus

type Bus interface {
	io.Closer

	// Publish publishes a message to a topic.
	//
	// If msg.Payload is empty, the topic is deleted if it was retained.
	//
	// It is not guaranteed that messages are propagated in order, unless
	// qos ExactlyOnce is used.
	Publish(msg Message, qos QOS) error

	// Subscribe sends updates to this topic query through the provided channel.
	//
	// It blocks until the context is canceled. Returns an error if subscription
	// failed.
	//
	// Upon subscription, it sends an empty message that can be ignored, to
	// enable synchronizing with other systems.
	Subscribe(ctx context.Context, topicQuery string, qos QOS, c chan<- Message) error
}

Bus is a publisher-subscriber bus.

The topics are expected to use the MQTT definition. "Mosquitto" has good documentation about this: https://mosquitto.org/man/mqtt-7.html

For more information about retained message behavior, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

Implementation of Bus are expected to implement fmt.Stringer.

func Log

func Log(b Bus) Bus

Log returns a Bus that logs all operations done on it, via log standard package.

func New

func New() Bus

New returns a local thread safe memory backed Bus.

This Bus is thread safe. It is useful for unit tests or as a local broker.

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
b := msgbus.New()
c := make(chan msgbus.Message)
go func() {
	defer close(c)
	if err := b.Subscribe(ctx, "#", msgbus.BestEffort, c); err != nil {
		log.Fatal(err)
	}
}()
// Wait for subscription to be live.
if msg := <-c; len(msg.Topic) != 0 || len(msg.Payload) != 0 || msg.Retained {
	log.Fatal(msg)
}

if err := b.Publish(msgbus.Message{Topic: "sensor", Payload: []byte("ON"), Retained: true}, msgbus.BestEffort); err != nil {
	log.Fatal(err)
}
msg := <-c
fmt.Printf("%s: %s\n", msg.Topic, msg.Payload)

if err := b.Close(); err != nil {
	log.Fatal(err)
}
Output:

sensor: ON

func NewMQTT

func NewMQTT(server, clientID, user, password string, will Message, order bool) (Bus, error)

NewMQTT returns an initialized active MQTT connection.

The connection timeouts are fine tuned for a LAN. It will likely fail on a slower connection or when used over the internet.

will is the message to send if the connection is not closed correctly; when Close() is not called.

order determines is messages are processed in order or not. Out of order processing means that a subscription will not be blocked by another one that fails to process its queue in time.

This main purpose of this library is to create a layer that is simpler, more usable and more Go-idiomatic than paho.mqtt.golang.

See https://godoc.org/github.com/eclipse/paho.mqtt.golang#ClientOptions.AddBroker for the accepted server format.

Example
will := msgbus.Message{Topic: "alive", Payload: []byte("NO"), Retained: true}
hostname, err := os.Hostname()
if err != nil {
	log.Fatal(err)
}
b, err := msgbus.NewMQTT("tcp://localhost:1883", hostname, "user", "pass", will, false)
if err != nil {
	log.Fatal(err)
}
msg := msgbus.Message{Topic: "alive", Payload: []byte("YES"), Retained: true}
if err := b.Publish(msg, msgbus.BestEffort); err != nil {
	log.Fatal(err)
}
if err := b.Close(); err != nil {
	log.Fatal(err)
}
Output:

func RebasePub

func RebasePub(b Bus, root string) (Bus, error)

RebasePub rebases a Bus when publishing messages.

All Message published have their Topic prefixed with root.

Messages retrieved are unaffected.

Returns nil if root is an invalid topic or if it is a topic query.

func RebaseSub

func RebaseSub(b Bus, root string) (Bus, error)

RebaseSub rebases a Bus when subscribing or getting topics.

All Message retrieved have their Topic prefix root stripped.

Messages published are unaffected.

Returns nil if root is an invalid topic or if it is a topic query.

type Message

type Message struct {
	// Topic is the MQTT topic. It may have a prefix stripped by RebaseSub() or
	// inserted by RebasePub().
	Topic string
	// Payload is the application specific data.
	//
	// Publishing a message with no Payload deletes a retained Topic, and has no
	// effect on non-retained topic.
	Payload []byte
	// Retained signifies that the message is permanent until explicitly changed.
	// Otherwise it is ephemeral.
	Retained bool
}

Message represents a single message to a single topic.

type QOS

type QOS int8

QOS defines the quality of service to use when publishing and subscribing to messages.

The normative definition is http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180912

const (
	// BestEffort means the broker/client will deliver the message at most once,
	// with no confirmation.
	BestEffort QOS = 0
	// MinOnce means the broker/client will deliver the message at least once,
	// potentially duplicate.
	//
	// Do not use if message duplication is problematic.
	MinOnce QOS = 1
	// ExactlyOnce means the broker/client will deliver the message exactly once
	// by using a four step handshake.
	ExactlyOnce QOS = 2
)

func (QOS) String

func (i QOS) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL