mqtt

package module
v0.0.0-...-b33ea04 Latest Latest
Published: Jul 2, 2021 License: CC0-1.0 Imports: 16 Imported by: 0

README

Go MQTT

About

MQTT is a protocol for message queueing over a network. This project provides a client library for the Go programming language. Message-delivery guarantees are maintained at all costs, even on (protocol, network or persistence) errors. The client recovers from errors automatically. Message transfers in both directions are zero-copy.

The development was kindly sponsored by Northvolt, as a gift to the open-source community.

This is free and unencumbered software released into the public domain.

Introduction

The client supports confirmed message delivery with full progress disclosure. Message transfers without a confirmation can be as simple as the following.

err := client.Publish(ctx.Done(), []byte("20.8℃"), "bedroom")
if err != nil {
	log.Print("thermostat update lost: ", err)
	return
}

A read routine sees inbound messages from any of the subscribed topics.

for {
	message, topic, err := client.ReadSlices()
	switch {
	case err == nil:
		r, _ := utf8.DecodeLastRune(message)
		switch r {
		case 'K', '℃', '℉':
			log.Printf("%q at %q", message, topic)
		}

	case errors.Is(err, mqtt.ErrClosed):
		return // client terminated

	default:
		log.Print("broker unavailable: ", err)
		time.Sleep(time.Second) // backoff
	}
}

The examples from the package documentation provide more detail on error reporting and the delivery alternatives.

Command-Line Client

Run go install github.com/go-mqtt/mqtt/cmd/mqttc to build the binary.

NAME
	mqttc — MQTT broker access

SYNOPSIS
	mqttc [options] address

DESCRIPTION
	The command connects to the address argument, with an option to
	publish a message and/or subscribe with topic filters.

	When the address does not specify a port, then the defaults are
	applied, which is 1883 for plain connections and 8883 for TLS.

OPTIONS
  -ca file
    	Amend the trusted certificate authorities with a PEM file.
  -cert file
    	Use a client certificate from a PEM file (with a corresponding
    	-key option).
  -client identifier
    	Use a specific client identifier. (default "generated")
  -key file
    	Use a private key (matching the client certificate) from a PEM
    	file.
  -net name
    	Select the network by name. Valid alternatives include tcp4,
    	tcp6 and unix. (default "tcp")
  -pass file
    	The file content is used as a password.
  -prefix string
    	Print a string before each inbound message.
  -publish topic
    	Send a message to a topic. The payload is read from standard
    	input.
  -quiet
    	Suppress all output to standard error. Error reporting is
    	reduced to the exit code only.
  -quote
    	Print inbound topics and messages as quoted strings.
  -server name
    	Use a specific server name with TLS
  -subscribe filter
    	Listen with a topic filter. Inbound messages are printed to
    	standard output until interrupted by a signal(3). Multiple
    	-subscribe options may be applied together.
  -suffix string
    	Print a string after each inbound message. (default "\n")
  -timeout duration
    	Network operation expiry. (default 4s)
  -tls
    	Secure the connection with TLS.
  -topic
    	Print the respective topic of each inbound message.
  -user name
    	The user name may be used by the broker for authentication
    	and/or authorization purposes.
  -verbose
    	Produces more output to standard error for debug purposes.

EXIT STATUS
	(0) no error
	(1) MQTT operational error
	(2) illegal command invocation
	(5) connection refused: unacceptable protocol version
	(6) connection refused: identifier rejected
	(7) connection refused: server unavailable
	(8) connection refused: bad username or password
	(9) connection refused: not authorized
	(130) close on SIGINT
	(143) disconnect on SIGTERM

EXAMPLES
	Send a message:

		echo "hello" | mqttc -publish chat/misc localhost

	Print messages:

		mqttc -subscribe "news/#" -prefix "📥 " :1883

	Health check:

		mqttc -tls q1.example.com:8883 || echo "exit $?"

BUGS
	Report bugs at <https://github.com/go-mqtt/mqtt/issues>.

SEE ALSO
	mosquitto_pub(1)

Standard Compliance

The implementation follows version 3.1.1 of the OASIS specification in a strict manner. Support for the originating IBM specification may be added at some point in time.

There are no plans to support protocol version 5. Version 3 is lean and well suited for IoT. The additions in version 5 may be more of a fit for backend computing.

Documentation

Overview

Package mqtt provides a client for the Message Queuing Telemetry Transport protocol.

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Publish and Disconnect do fire-and-forget submission. ErrClosed, ErrDown, ErrCanceled or an IsDeny [Publish only] imply no request submission. All other errors imply that the request submission was interrupted by either a connection failure or a PauseTimeout expiry.

Ping, Subscribe and Unsubscribe await response from the broker. ErrClosed, ErrDown, ErrMax, ErrCanceled or an IsDeny [Subscribe and Unsubscribe only] imply no request submission. ErrBreak and ErrAbandoned leave with the broker response unknown. Subscribe responses may cause a SubscribeError. All other errors imply that the request submission was interrupted by either a connection failure or a PauseTimeout expiry.

PublishAtLeastOnce and PublishExactlyOnce enqueue requests to a Persistence before network submission. Errors imply that the message was dropped: either ErrClosed, ErrMax, a Save failure or an IsDeny. Further errors are reported to the respective exchange channel. None of them are fatal, including ErrClosed.

Index

Examples

Constants

const (

	// ErrProtocolLevel means that the server does not support the level of
	// the MQTT protocol requested by the Client.
	ErrProtocolLevel connectReturn

	// ErrClientID means that the client identifier is correct UTF-8 but not
	// allowed by the Server.
	ErrClientID

	// ErrUnavailable means that the network connection has been made but
	// the MQTT service is unavailable.
	ErrUnavailable

	// ErrAuthBad means that the data in the user name or password is
	// malformed.
	ErrAuthBad

	// ErrAuth means that the client is not authorized to connect.
	ErrAuth
)

Connect return errors are predefined reasons for a broker to deny a connect request. IsConnectionRefused returns true for each of these.

Variables

var ErrAbandoned = errors.New("mqtt: request abandoned after submission")

ErrAbandoned means that a quit signal got applied after the request was sent. The broker received the request, yet the result/response remains unknown.

var ErrBreak = errors.New("mqtt: connection lost while awaiting response")

ErrBreak means that the connection broke up after the request was sent. The broker received the request, yet the result/response remains unknown.

var ErrCanceled = errors.New("mqtt: request canceled before submission")

ErrCanceled means that a quit signal got applied before the request was sent. The transaction never happened, as opposed to ErrAbandoned.
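The difference between the two quit outcomes can be pictured as a two-phase select. The following is a minimal sketch with the standard library only, not the Client implementation: roundTrip, errCanceled, errAbandoned and both channels are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package errors; the names are local to this sketch.
var (
	errCanceled  = errors.New("request canceled before submission")
	errAbandoned = errors.New("request abandoned after submission")
)

// roundTrip submits a request and awaits its response. The send and
// response channels are hypothetical placeholders for the network.
func roundTrip(quit <-chan struct{}, send chan<- string, response <-chan string) (string, error) {
	// Phase 1: nothing is on the wire yet, so quit cancels cleanly.
	select {
	case send <- "request":
	case <-quit:
		return "", errCanceled
	}

	// Phase 2: the request left already, so quit leaves the outcome unknown.
	select {
	case r := <-response:
		return r, nil
	case <-quit:
		return "", errAbandoned
	}
}

func main() {
	// Quit applies while nobody accepts the request.
	early := make(chan struct{})
	close(early)
	_, err := roundTrip(early, make(chan string), nil)
	fmt.Println(err)

	// Quit applies only after the request got accepted.
	late := make(chan struct{})
	send := make(chan string)
	go func() {
		<-send
		close(late)
	}()
	_, err = roundTrip(late, send, nil)
	fmt.Println(err)
}
```

A caller may retry safely after the first outcome, while the second one calls for a check on the actual state.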

var ErrClosed = errors.New("mqtt: client closed")

ErrClosed signals use after Close. The state is permanent. Further invocation will result again in an ErrClosed error.

var ErrDown = errors.New("mqtt: connection unavailable")

ErrDown signals no-service after a failed connect attempt. The error state will clear once a connect retry succeeds.

var ErrMax = errors.New("mqtt: maximum number of pending requests reached")

ErrMax denies a request on transit capacity, which prevents the Client from blocking. Ping has a limit of 1 slot. Subscribe and Unsubscribe share a large number of slots. PublishAtLeastOnce and PublishExactlyOnce each have a limit defined by Config. A plain Publish (at most once) has no limit.
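Denial instead of blocking can be illustrated with a buffered channel used as a counting semaphore. This is a sketch of the general technique under the assumption of fixed slot counts; the slots type and errMax are hypothetical names, and the Client may track its limits differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errMax is a local stand-in for the package error.
var errMax = errors.New("maximum number of pending requests reached")

// slots is a counting semaphore with a fixed number of request slots.
type slots chan struct{}

// acquire claims a slot, or it returns errMax right away when none is free.
func (s slots) acquire() error {
	select {
	case s <- struct{}{}:
		return nil
	default:
		return errMax
	}
}

// release frees a slot previously claimed with acquire.
func (s slots) release() { <-s }

func main() {
	ping := make(slots, 1) // one slot, like the Ping limit

	fmt.Println(ping.acquire()) // <nil>
	fmt.Println(ping.acquire()) // denied without blocking
	ping.release()
	fmt.Println(ping.acquire()) // <nil>
}
```

With a capacity of zero, acquire denies every request, which matches the idea of a limit that disables an operation.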

Functions

func IsConnectionRefused

func IsConnectionRefused(err error) bool

IsConnectionRefused returns whether the broker denied a connect request from the Client.

func IsDeny

func IsDeny(err error) bool

IsDeny returns whether execution was rejected by the Client based on some validation constraint, like size limitation or an illegal UTF-8 encoding. The rejection is permanent in such case. Another invocation with the same arguments will result in the same error again.

Types

type BigMessage

type BigMessage struct {
	*Client        // source
	Topic   string // destination
	Size    int    // byte count
}

BigMessage signals reception beyond the read buffer capacity. Receivers may or may not allocate the memory with ReadAll. The next ReadSlices will acknowledge reception either way.

func (*BigMessage) Error

func (e *BigMessage) Error() string

Error implements the standard error interface.

func (*BigMessage) ReadAll

func (e *BigMessage) ReadAll() ([]byte, error)

ReadAll returns the message in a new/dedicated buffer. Messages can be read only once, after reception (from ReadSlices), and before the next ReadSlices. The invocation must occur from within the same routine.

type Client

type Client struct {
	Config // read-only
	// contains filtered or unexported fields
}

Client manages a network connection until Close or Disconnect. Clients always start in the Offline state. The (un)subscribe, publish and ping methods block until the first connect attempt (from ReadSlices) completes. When the connect attempt fails, then requests receive ErrDown until a retry succeeds. The same goes for the automatic reconnects on connection loss.

A single goroutine must invoke ReadSlices consecutively until ErrClosed. Some backoff on error reception is recommended though.

Multiple goroutines may invoke methods on a Client simultaneously, except for ReadSlices.

Example (Setup)

It is good practice to install the client from main.

package main

import (
	"errors"
	"log"
	"time"

	"github.com/go-mqtt/mqtt"
)

// Publish is a method from mqtt.Client.
var Publish func(quit <-chan struct{}, message []byte, topic string) error

func main() {
	client, err := mqtt.VolatileSession("demo-client", &mqtt.Config{
		Dialer:       mqtt.NewDialer("tcp", "localhost:1883"),
		PauseTimeout: 4 * time.Second,
	})
	if err != nil {
		log.Fatal("exit on broken setup: ", err)
	}

	// launch read-routine
	go func() {
		var big *mqtt.BigMessage
		for {
			message, topic, err := client.ReadSlices()
			switch {
			case err == nil:
				// do something with inbound message
				log.Printf("📥 %q: %q", topic, message)

			case errors.As(err, &big):
				log.Printf("📥 %q: %d byte message omitted", big.Topic, big.Size)

			case errors.Is(err, mqtt.ErrClosed):
				log.Print(err)
				return // terminated

			case mqtt.IsConnectionRefused(err):
				log.Print(err) // explains rejection
				// mqtt.ErrDown for a while
				time.Sleep(15 * time.Minute)

			default:
				log.Print("broker unavailable: ", err)
				// mqtt.ErrDown during backoff
				time.Sleep(2 * time.Second)
			}
		}
	}()

	// Install each method in use as a package variable. Such setup is
	// compatible with the tools provided from the mqtttest subpackage.
	Publish = client.Publish
}
Output:

func AdoptSession

func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal error)

AdoptSession continues with a Persistence which had an InitSession already.

func InitSession

func InitSession(clientID string, p Persistence, c *Config) (*Client, error)

InitSession configures the Persistence for first use. Brokers use clientID to uniquely identify the session. The session may be continued with AdoptSession on another Client.

func VolatileSession

func VolatileSession(clientID string, c *Config) (*Client, error)

VolatileSession operates solely in-memory. This setup is recommended for delivery with the “at most once” guarantee [Publish], and for reception without the “exactly once” guarantee [SubscribeLimitAtLeastOnce], and for testing.

Brokers use clientID to uniquely identify the session. Volatile sessions may be continued by using the same clientID again. Use CleanSession to prevent reuse of an existing state.

func (*Client) Close

func (c *Client) Close() error

Close terminates the connection establishment. The Client is closed regardless of the error return. Closing an already closed Client has no effect.

func (*Client) Disconnect

func (c *Client) Disconnect(quit <-chan struct{}) error

Disconnect tries a graceful termination, which discards the Will. The Client is closed regardless of the error return.

Quit is optional, as nil just blocks. A quit signal will strictly result in ErrCanceled.

BUG(pascaldekloe): The MQTT protocol has no confirmation for the disconnect request. As a result, a client can never know for sure whether the operation actually succeeded.

func (*Client) Offline

func (c *Client) Offline() <-chan struct{}

Offline returns a channel that's closed when the client has no connection.

func (*Client) Online

func (c *Client) Online() <-chan struct{}

Online returns a channel that's closed when the client has a connection.

func (*Client) Ping

func (c *Client) Ping(quit <-chan struct{}) error

Ping makes a roundtrip to validate the connection. Only one request is permitted at a time. Excess is denied with ErrMax.

Quit is optional, as nil just blocks. A quit signal will strictly result in either ErrCanceled or ErrAbandoned.

func (*Client) Publish

func (c *Client) Publish(quit <-chan struct{}, message []byte, topic string) error

Publish delivers the message with an “at most once” guarantee. Subscribers may or may not receive the message when subject to error. This delivery method is the most efficient option.

Quit is optional, as nil just blocks. A quit signal will strictly result in ErrCanceled.

func (*Client) PublishAtLeastOnce

func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-chan error, err error)

PublishAtLeastOnce delivers the message with an “at least once” guarantee. Subscribers may receive the message more than once when subject to error. This delivery method requires a response transmission plus persistence on both client-side and broker-side.

The exchange channel is closed upon confirmation of receipt by the broker. ErrClosed leaves the channel blocked (with no further input).

Example (Critical)

Demonstrates all error scenarios and the respective recovery options.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/go-mqtt/mqtt"
)

// PublishAtLeastOnce is a method from mqtt.Client.
var PublishAtLeastOnce func(message []byte, topic string) (ack <-chan error, err error)

func main() {
	for {
		exchange, err := PublishAtLeastOnce([]byte("🍸🆘"), "demo/alert")
		switch {
		case err == nil:
			fmt.Println("alert submitted…")
			break

		case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
			fmt.Println("🚨 alert not sent:", err)
			return

		case errors.Is(err, mqtt.ErrMax):
			fmt.Println("⚠️ alert submission hold-up:", err)
			time.Sleep(time.Second / 4)
			continue

		default:
			fmt.Println("⚠️ alert submission blocked on persistence malfunction:", err)
			time.Sleep(4 * time.Second)
			continue
		}

		for err := range exchange {
			if errors.Is(err, mqtt.ErrClosed) {
				fmt.Println("🚨 alert exchange suspended:", err)
				// An AdoptSession may continue the transaction.
				return
			}

			fmt.Println("⚠️ alert request transfer interrupted:", err)
		}
		fmt.Println("alert acknowledged ✓")
		break
	}

}
Output:

alert submitted…
alert acknowledged ✓

func (*Client) PublishAtLeastOnceRetained

func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (exchange <-chan error, err error)

PublishAtLeastOnceRetained is like PublishAtLeastOnce, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. When a new subscription is established, the last retained message, if any, on each matching topic name must be sent to the subscriber.

func (*Client) PublishExactlyOnce

func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-chan error, err error)

PublishExactlyOnce delivers the message with an “exactly once” guarantee. This delivery method eliminates the duplicate-delivery risk from PublishAtLeastOnce at the expense of an additional network roundtrip.

func (*Client) PublishExactlyOnceRetained

func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (exchange <-chan error, err error)

PublishExactlyOnceRetained is like PublishExactlyOnce, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. When a new subscription is established, the last retained message, if any, on each matching topic name must be sent to the subscriber.

func (*Client) PublishRetained

func (c *Client) PublishRetained(quit <-chan struct{}, message []byte, topic string) error

PublishRetained is like Publish, but the broker should store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. The broker may choose to discard the message at any time though. Upon reception, the broker must discard any message previously retained for the topic name.

func (*Client) ReadSlices

func (c *Client) ReadSlices() (message, topic []byte, err error)

ReadSlices should be invoked consecutively from a single goroutine until ErrClosed. An IsDeny implies permanent Config rejection.

Both message and topic are slices from a read buffer. The bytes stop being valid at the next read.

Each invocation acknowledges ownership of the previously returned message, if any. Alternatively, use either Disconnect or Close to prevent a confirmation from being sent.

BigMessage leaves the memory allocation choice to the consumer. Any other error puts the Client in an ErrDown state. Invocation should apply a backoff once down. Retries on IsConnectionRefused, if any, should probably apply a rather large backoff. See the Client example for a complete setup.
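Because message and topic alias a shared read buffer, anything kept beyond the next read needs a copy. The sketch below shows the effect with a hypothetical reader type that reuses one buffer; it is a stand-in for illustration only, not the Client.

```go
package main

import "fmt"

// reader mimics a source that reuses one buffer for every read.
type reader struct {
	buf   [64]byte
	queue []string
}

// next returns a slice of the shared buffer, like a zero-copy read does.
func (r *reader) next() []byte {
	n := copy(r.buf[:], r.queue[0])
	r.queue = r.queue[1:]
	return r.buf[:n]
}

func main() {
	r := &reader{queue: []string{"20.8℃", "21.3℃"}}

	first := r.next()
	kept := append([]byte(nil), first...) // copy before the next read

	r.next() // overwrites the bytes behind first

	fmt.Printf("alias: %s\n", first) // alias: 21.3℃
	fmt.Printf("copy:  %s\n", kept)  // copy:  20.8℃
}
```

A string conversion, as in string(topic), makes a copy too, which is convenient for map keys.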

func (*Client) Subscribe

func (c *Client) Subscribe(quit <-chan struct{}, topicFilters ...string) error

Subscribe requests subscription for all topics that match any of the filter arguments.

Quit is optional, as nil just blocks. A quit signal will strictly result in either ErrCanceled or ErrAbandoned.

Example (Sticky)

Demonstrates all error scenarios and the respective recovery options.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/go-mqtt/mqtt"
)

// Subscribe is a method from mqtt.Client.
var Subscribe func(quit <-chan struct{}, topicFilters ...string) error

// Online is a method from mqtt.Client.
var Online func() <-chan struct{}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	for {
		err := Subscribe(ctx.Done(), "demo/+")
		switch {
		case err == nil:
			fmt.Println("subscribe confirmed by broker")
			return

		case errors.As(err, new(mqtt.SubscribeError)):
			fmt.Println("subscribe failed by broker")
			return

		case mqtt.IsDeny(err): // illegal topic filter
			fmt.Println(err)
			return

		case errors.Is(err, mqtt.ErrClosed):
			fmt.Println("no subscribe due to client close")
			return

		case errors.Is(err, mqtt.ErrCanceled):
			fmt.Println("no subscribe due to timeout")
			return

		case errors.Is(err, mqtt.ErrAbandoned):
			fmt.Println("subscribe state unknown due to timeout")
			return

		case errors.Is(err, mqtt.ErrBreak):
			fmt.Println("subscribe state unknown due to connection loss")
			select {
			case <-Online():
				fmt.Println("subscribe retry with new connection")
			case <-ctx.Done():
				fmt.Println("subscribe timeout")
				return
			}

		case errors.Is(err, mqtt.ErrDown):
			fmt.Println("subscribe delay while service is down")
			select {
			case <-Online():
				fmt.Println("subscribe retry with new connection")
			case <-ctx.Done():
				fmt.Println("subscribe timeout")
				return
			}

		case errors.Is(err, mqtt.ErrMax):
			fmt.Println("subscribe hold-up due to excessive number of pending requests")
			time.Sleep(2 * time.Second) // backoff

		default:
			fmt.Println("subscribe request transfer interrupted:", err)
			time.Sleep(time.Second / 2) // backoff
		}
	}
}
Output:

subscribe confirmed by broker

func (*Client) SubscribeLimitAtLeastOnce

func (c *Client) SubscribeLimitAtLeastOnce(quit <-chan struct{}, topicFilters ...string) error

SubscribeLimitAtLeastOnce is like Subscribe, but limits the message reception to quality-of-service level 1: acknowledged transfer.

func (*Client) SubscribeLimitAtMostOnce

func (c *Client) SubscribeLimitAtMostOnce(quit <-chan struct{}, topicFilters ...string) error

SubscribeLimitAtMostOnce is like Subscribe, but limits the message reception to quality-of-service level 0: fire-and-forget.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error

Unsubscribe requests subscription cancelation for each of the filter arguments.

Quit is optional, as nil just blocks. A quit signal will strictly result in either ErrCanceled or ErrAbandoned.

type Config

type Config struct {
	Dialer // chooses the broker

	// PauseTimeout sets the minimum transfer rate as one byte per duration.
	// Zero disables timeout protection entirely, which leaves the Client
	// vulnerable to blocking on stale connections.
	//
	// Any pauses during MQTT packet submission that exceed the timeout will
	// be treated as fatal to the connection, if they are detected in time.
	// Expiry causes automated reconnects just like any other fatal network
	// error. Operations which got interrupted by a PauseTimeout receive a
	// net.Error with Timeout true.
	PauseTimeout time.Duration

	// The maximum number of transactions at a time. Excess is denied with
	// ErrMax. Zero effectively disables the respective quality-of-service
	// level. Negative values default to the Client limit of 16,384. Higher
	// values are truncated silently.
	AtLeastOnceMax, ExactlyOnceMax int

	// The user name may be used by the broker for authentication and/or
	// authorization purposes. An empty string omits the option, except
	// for when password is not nil.
	UserName string
	Password []byte // option omitted when nil

	// The Will Message is published when the connection terminates
	// without Disconnect. A nil Message disables the Will option.
	Will struct {
		Topic   string // destination
		Message []byte // payload

		Retain      bool // see PublishRetained
		AtLeastOnce bool // see PublishAtLeastOnce
		ExactlyOnce bool // overrides AtLeastOnce
	}

	KeepAlive uint16 // timeout in seconds (disabled with zero)

	// Brokers must resume communications with the client (identified by
	// ClientID) when CleanSession is false. Otherwise, brokers must create
	// a new session when either CleanSession is true or when no session is
	// associated to the client identifier.
	CleanSession bool
}

Config is a Client configuration. Dialer is the only required field.

type Dialer

type Dialer func(ctx context.Context) (net.Conn, error)

Dialer abstracts the transport layer establishment.

func NewDialer

func NewDialer(network, address string) Dialer

NewDialer provides plain network connections. See net.Dial for details on the network & address syntax.

func NewTLSDialer

func NewTLSDialer(network, address string, config *tls.Config) Dialer

NewTLSDialer provides secured network connections. See net.Dial for details on the network & address syntax.

type Persistence

type Persistence interface {
	// Load resolves the value of a key. A nil return means “not found”.
	Load(key uint) ([]byte, error)

	// Save defines the value of a key.
	Save(key uint, value net.Buffers) error

	// Delete clears the value of a key, whether it existed or not. Failures
	// will be overwritten eventually due to the limited address space.
	Delete(key uint) error

	// List enumerates all available keys in any order.
	List() (keys []uint, err error)
}

Persistence tracks the session state as a key–value store. An instance may serve only one Client at a time.

Values are addressed by a 17-bit key, mask 0x1ffff. The minimum size is 12 B. The maximum size is 256 MiB + 17 B. Clients apply integrity checks all round.

Multiple goroutines may invoke methods on a Persistence simultaneously.

func FileSystem

func FileSystem(dir string) Persistence

FileSystem stores values per file in a directory. Callers must ensure the availability, including write permission for the user.

type SubscribeError

type SubscribeError []string

SubscribeError holds one or more topic filters which were failed by the broker. The element order matches the originating request's.

func (SubscribeError) Error

func (e SubscribeError) Error() string

Error implements the standard error interface.

Notes

Bugs

  • The MQTT protocol has no confirmation for the disconnect request. As a result, a client can never know for sure whether the operation actually succeeded.

  • Save errors from a Persistence may cause duplicate reception for deliveries with an “exactly once” guarantee, if the respective Client goes down before a recovery/retry succeeds.

Directories

Path Synopsis
cmd/mqttc Package main provides a command-line utility.
mqtttest Package mqtttest provides utilities for MQTT testing.
