mqtt

v0.5.1
Published: May 31, 2023 License: MIT Imports: 11 Imported by: 4

README

natiu-mqtt

A dead-simple, extensible and correct MQTT implementation.

Natiu: Means mosquito in the Guaraní language, a language spoken primarily in Paraguay. Commonly written as ñati'û or ñati'ũ.

Highlights

  • Modular

    • Client implementation leaves allocating parts up to the Decoder interface type. Users can choose to use non-allocating or allocating implementations of the 4-method interface.
    • RxTx type lets one build an MQTT implementation from scratch for any transport. No server/client logic defined at this level.
  • No unneeded allocations: The PUBLISH application message is not handled by this library; the user receives an io.Reader with the underlying transport bytes. This prevents allocations on natiu-mqtt's side.

  • V3.1.1: Compliant with MQTT version 3.1.1 for QoS0 interactions. QoS1 and QoS2 are WIP.

  • No external dependencies: Nada. Nope.

  • Data oriented design: Minimizes abstractions or objects for the data on the wire.

  • Fuzz tested, robust: Decoding implementation fuzzed to prevent adversarial user input from crashing application (95% coverage).

  • Simplicity: A simple base package yields simple implementations for different transports. See Implementations section.

  • Runtime-what?: Unlike other MQTT implementations, there are no channels, no interface conversions and no goroutines: as little runtimey stuff as possible. You get the best of Go's concrete types when using Natiu's API. Why? Because MQTT deserialization and serialization are an embarrassingly serial and concrete problem.

Goals

This implementation will have a simple embedded-systems implementation in the package top level. This implementation will be transport agnostic and non-concurrent. This will make it far easier to modify and reason about. The transport dependent implementations will have their own subpackage, so one package for TCP transport, another for UART, PPP etc.

  • Minimal, if any, heap allocations.
  • Support for TCP transport.
  • User owns payload bytes.

Implementations

Examples

API subject to change before v1.0.0 release.

Example use of Client
	// Create new client.
	client := mqtt.NewClient(mqtt.ClientConfig{
		Decoder: mqtt.DecoderNoAlloc{UserBuffer: make([]byte, 1500)},
		OnPub: func(_ mqtt.Header, _ mqtt.VariablesPublish, r io.Reader) error {
			message, _ := io.ReadAll(r)
			log.Println("received message:", string(message))
			return nil
		},
	})

	// Get a transport for MQTT packets.
	const defaultMQTTPort = ":1883"
	conn, err := net.Dial("tcp", "127.0.0.1"+defaultMQTTPort)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Prepare for CONNECT interaction with server.
	var varConn mqtt.VariablesConnect
	varConn.SetDefaultMQTT([]byte("salamanca"))
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	err = client.Connect(ctx, conn, &varConn) // Connect to server.
	cancel()
	if err != nil {
		// Error or loop until connect success.
		log.Fatalf("connect attempt failed: %v\n", err)
	}

	// Ping forever until error.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		pingErr := client.Ping(ctx)
		cancel()
		if pingErr != nil {
			log.Fatal("ping error: ", pingErr, " with disconnect reason:", client.Err())
		}
		log.Println("ping success!")
	}
Example: Low level packet management with RxTx type
func main() {
	// Dial a TCP connection.
	const defaultMQTTPort = ":1883"
	conn, err := net.Dial("tcp", "127.0.0.1"+defaultMQTTPort)
	if err != nil {
		log.Fatal(err)
	}

	// Create the RxTx MQTT IO handler.
	rxtx, err := mqtt.NewRxTx(conn, mqtt.DecoderLowmem{UserBuffer: make([]byte, 1500)})
	if err != nil {
		log.Fatal(err)
	}
	// Add a handler on CONNACK packet.
	rxtx.OnConnack = func(rt *mqtt.RxTx, vc mqtt.VariablesConnack) error {
		log.Printf("%v received, SP=%v, rc=%v", rt.LastReceivedHeader.String(), vc.SessionPresent(), vc.ReturnCode.String())
		return nil
	}

	// Prepare to send first MQTT packet over wire.
	varConnect := mqtt.VariablesConnect{
		ClientID:      []byte("salamanca"),
		Protocol:      []byte("MQTT"),
		ProtocolLevel: 4,
		KeepAlive:     60,
		CleanSession:  true,
		WillMessage:   []byte("MQTT is okay, I guess"),
		WillTopic:     []byte("mqttnerds"),
		WillRetain:    true,
	}
	// Header set automatically for all packets that are not PUBLISH.
	err = rxtx.WriteConnect(&varConnect)
	if err != nil {
		log.Fatal(err)
	}
}

Why not just use paho?

Some issues with Eclipse's Paho implementation:

  • Inherent data races on API side. The implementation is so notoriously hard to modify that this issue has been in a frozen state.
  • Calling Client.Disconnect when client is already disconnected blocks indefinitely and can cause deadlock or spin with Paho's implementation.
  • If there is an issue with the network and Reconnect is enabled then Paho's Reconnect spins. There is no way to prevent this.
  • Interfaces used for ALL data types. This is not necessary and makes it difficult to work with since there is no in-IDE documentation on interface methods.
  • No lower level abstraction of MQTT for use in embedded systems with non-TCP transport.
  • Uses any interface for the payload, which could simply be a byte slice...

I found these issues after a 2 hour dev session. There will undoubtedly be more if I were to try to actually get it working...

Documentation

Overview

package mqtt implements MQTT v3.1.1 protocol providing users of this package with low level decoding and encoding primitives and complete documentation sufficient to grapple with the concepts of the MQTT protocol.

If you are new to MQTT start by reading definitions.go.

Constants

const (
	// Accepted protocol level as per MQTT v3.1.1. This goes in the CONNECT variable header.
	DefaultProtocolLevel = 4
	// Accepted protocol as per MQTT v3.1.1. This goes in the CONNECT variable header.
	DefaultProtocol = "MQTT"
)

Variables

var (

	// natiu-mqtt depends on user provided buffers for string and byte slice allocation.
	// If a buffer is too small for the incoming strings or for marshalling a subscription topic
	// then the implementation should return this error.
	ErrUserBufferFull = errors.New("natiu-mqtt: user buffer full")
	// ErrBadRemainingLen is passed to Rx's OnRxError after decoding a header with a
	// remaining length that does not conform to MQTT v3.1.1 packet specifications.
	ErrBadRemainingLen = errors.New("natiu-mqtt: MQTT v3.1.1 bad remaining length")
)
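
The Remaining Length field that ErrBadRemainingLen refers to uses the MQTT v3.1.1 variable-length encoding: seven data bits per byte, with the top bit signalling that another byte follows, for at most four bytes. The following is a minimal standalone sketch of that scheme. putRemainingLength, readRemainingLength and errBadRemainingLen are hypothetical names used for illustration and are not part of natiu-mqtt's API.

```go
package main

import (
	"errors"
	"fmt"
)

// maxRemainingLength is the largest value four encoded bytes can hold (268,435,455).
const maxRemainingLength = 0x0FFFFFFF

// errBadRemainingLen is a hypothetical stand-in for the package's error.
var errBadRemainingLen = errors.New("bad remaining length")

// putRemainingLength writes x into buf using 7 data bits per byte, with
// bit 7 flagging that another byte follows. It returns the bytes written.
func putRemainingLength(buf []byte, x uint32) (int, error) {
	if x > maxRemainingLength {
		return 0, errBadRemainingLen
	}
	n := 0
	for {
		if n >= len(buf) {
			return 0, errors.New("buffer too small")
		}
		b := byte(x % 128)
		x /= 128
		if x > 0 {
			b |= 0x80 // More bytes follow.
		}
		buf[n] = b
		n++
		if x == 0 {
			return n, nil
		}
	}
}

// readRemainingLength decodes a value from the start of buf and returns
// it along with the number of bytes consumed.
func readRemainingLength(buf []byte) (uint32, int, error) {
	var value uint32
	for i := 0; i < 4; i++ {
		if i >= len(buf) {
			return 0, 0, errBadRemainingLen
		}
		value |= uint32(buf[i]&0x7F) << (7 * i)
		if buf[i]&0x80 == 0 {
			return value, i + 1, nil
		}
	}
	// A fifth byte would be required, which the specification forbids.
	return 0, 0, errBadRemainingLen
}

func main() {
	var buf [4]byte
	n, _ := putRemainingLength(buf[:], 321)
	fmt.Printf("% x\n", buf[:n]) // prints "c1 02"
	v, _, _ := readRemainingLength(buf[:n])
	fmt.Println(v) // prints 321
}
```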

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an asynchronous MQTT v3.1.1 client implementation which is safe for concurrent use.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"time"

	mqtt "github.com/soypat/natiu-mqtt"
)

func main() {
	// Create new client with default settings.
	client := mqtt.NewClient(mqtt.ClientConfig{})

	// Get a transport for MQTT packets.
	const defaultMQTTPort = ":1883"
	conn, err := net.Dial("tcp", "test.mosquitto.org"+defaultMQTTPort)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Prepare for CONNECT interaction with server.
	var varConn mqtt.VariablesConnect
	varConn.SetDefaultMQTT([]byte("salamanca"))
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	err = client.Connect(ctx, conn, &varConn) // Connect to server.
	cancel()
	if err != nil {
		// Error or loop until connect success.
		log.Fatalf("connect attempt failed: %v\n", err)
	}
	fmt.Println("connection success")

	defer func() {
		err := client.Disconnect(errors.New("end of test"))
		if err != nil {
			fmt.Println("disconnect failed:", err)
		}
	}()

	// Ping once and fail on error.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	pingErr := client.Ping(ctx)
	cancel()
	if pingErr != nil {
		log.Fatal("ping error: ", pingErr, " with disconnect reason:", client.Err())
	}
	fmt.Println("ping success!")
}
Output:

connection success
ping success!
Example (Concurrent)
package main

import (
	"bytes"
	"context"
	"io"
	"log"
	"math/rand"
	"net"
	"time"

	mqtt "github.com/soypat/natiu-mqtt"
)

func main() {
	// Create new client.
	received := make(chan []byte, 10)
	client := mqtt.NewClient(mqtt.ClientConfig{
		Decoder: mqtt.DecoderNoAlloc{UserBuffer: make([]byte, 1500)},
		OnPub: func(_ mqtt.Header, _ mqtt.VariablesPublish, r io.Reader) error {
			message, _ := io.ReadAll(r)
			if len(message) > 0 {
				select {
				case received <- message:
				default:
					// If channel is full we ignore message.
				}
			}
			log.Println("received message:", string(message))
			return nil
		},
	})

	// Set the connection parameters and set the Client ID to "salamanca".
	var varConn mqtt.VariablesConnect
	varConn.SetDefaultMQTT([]byte("salamanca"))

	// Define an inline function that connects the MQTT client automatically.
	tryConnect := func() error {
		// Get a transport for MQTT packets using the local host and default MQTT port (1883).
		conn, err := net.Dial("tcp", "127.0.0.1:1883")
		if err != nil {
			return err
		}
		ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
		defer cancel()
		return client.Connect(ctx, conn, &varConn) // Connect to server.
	}
	// Attempt first connection and fail immediately if that does not work.
	err := tryConnect()
	if err != nil {
		log.Println(err)
		return
	}

	// Call read goroutine. Read goroutine will also handle reconnection
	// when client disconnects.
	go func() {
		for {
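			if !client.IsConnected() {
				time.Sleep(time.Second)
				if err := tryConnect(); err != nil {
					log.Println("reconnect failed:", err)
				}
				continue
			}
			// Use a goroutine-local err to avoid racing on the outer variable.
			err := client.HandleNext()
			if err != nil {
				log.Println("HandleNext failed:", err)
			}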
		}
	}()

	// Call Write goroutine and create a channel to serialize messages
	// that we want to send out.
	const TOPICNAME = "/mqttnerds"
	pubFlags, _ := mqtt.NewPublishFlags(mqtt.QoS0, false, false)
	varPub := mqtt.VariablesPublish{
		TopicName: []byte(TOPICNAME),
	}
	txQueue := make(chan []byte, 10)
	go func() {
		for {
			if !client.IsConnected() {
				time.Sleep(time.Second)
				continue
			}
			message := <-txQueue
			varPub.PacketIdentifier = uint16(rand.Int())
			// Loop until message is sent successfully. This guarantees
			// all messages are sent, even in events of disconnect.
			for {
				err := client.PublishPayload(pubFlags, varPub, message)
				if err == nil {
					break
				}
				time.Sleep(time.Second)
			}
		}
	}()

	// Main program logic.
	for {
		message := <-received
		// We transform the message and send it back out.
		fields := bytes.Fields(message)
		message = bytes.Join(fields, []byte(","))
		txQueue <- message
	}
}

func NewClient

func NewClient(cfg ClientConfig) *Client

NewClient creates a new MQTT client with the configuration parameters provided. If no Decoder is provided a DecoderNoAlloc will be used.

func (*Client) AwaitingPingresp added in v0.4.0

func (c *Client) AwaitingPingresp() bool

AwaitingPingresp checks if a ping sent over the wire had no response received back.

func (*Client) AwaitingSuback added in v0.4.0

func (c *Client) AwaitingSuback() bool

AwaitingSuback checks if a subscribe request sent over the wire had no suback received back. Returns false if client is disconnected.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, rwc io.ReadWriteCloser, vc *VariablesConnect) error

Connect sends a CONNECT packet over the transport and waits for a CONNACK response from the server. The client is connected if the returned error is nil.

func (*Client) ConnectedAt added in v0.4.0

func (c *Client) ConnectedAt() time.Time

ConnectedAt returns the time the client managed to successfully connect. If client is disconnected ConnectedAt returns the zero-value for time.Time.

func (*Client) Disconnect

func (c *Client) Disconnect(userErr error) error

Disconnect performs a MQTT disconnect and resets the connection. Future calls to Err will return the argument userErr.

func (*Client) Err added in v0.4.0

func (c *Client) Err() error

Err returns error indicating the cause of client disconnection.

func (*Client) HandleNext added in v0.4.0

func (c *Client) HandleNext() error

HandleNext reads from the wire and decodes MQTT packets. If bytes are read and the decoder fails to read a packet the whole client fails and disconnects. HandleNext only returns an error in the case where the OnPub callback passed in the ClientConfig returns an error or if a packet is malformed. If HandleNext returns an error the client will be in a disconnected state.

func (*Client) IsConnected added in v0.4.0

func (c *Client) IsConnected() bool

IsConnected returns true if there has been no disconnect event and no unrecoverable error has been encountered during decoding. A connected client may send and receive MQTT messages.

func (*Client) LastRx added in v0.4.1

func (c *Client) LastRx() time.Time

LastRx returns the time the last packet was received at. If Client is disconnected LastRx returns the zero value of time.Time.

func (*Client) LastTx added in v0.4.1

func (c *Client) LastTx() time.Time

LastTx returns the time the last successful packet transmission finished at. A "successful" transmission does not necessarily mean the packet was received on the other end. If Client is disconnected LastTx returns the zero value of time.Time.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping writes a ping packet over the network and blocks until it receives the ping response back. It uses an exponential backoff algorithm to time checks on the status of the ping.

func (*Client) PublishPayload

func (c *Client) PublishPayload(flags PacketFlags, varPub VariablesPublish, payload []byte) error

PublishPayload sends a PUBLISH packet over the network on the topic defined by varPub.

func (*Client) StartConnect added in v0.4.0

func (c *Client) StartConnect(rwc io.ReadWriteCloser, vc *VariablesConnect) error

StartConnect sends a CONNECT packet over the transport and does not wait for a CONNACK response. Client is not guaranteed to be connected after a call to this function.

func (*Client) StartPing added in v0.4.0

func (c *Client) StartPing() error

StartPing writes a PINGREQ packet over the network without blocking waiting for response.

func (*Client) StartSubscribe added in v0.4.0

func (c *Client) StartSubscribe(vsub VariablesSubscribe) error

StartSubscribe begins subscription to argument topics.

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, vsub VariablesSubscribe) error

Subscribe writes a SUBSCRIBE packet over the network and waits for the server to respond with a SUBACK packet or until the context ends.

func (*Client) SubscribedTopics added in v0.4.0

func (c *Client) SubscribedTopics() []string

SubscribedTopics returns list of topics the client successfully subscribed to. Returns a copy of a slice so is safe for concurrent use.

type ClientConfig added in v0.4.0

type ClientConfig struct {
	// If a Decoder is not set one will automatically be picked.
	Decoder Decoder
	// OnPub is executed on every PUBLISH message received. Do not call
	// HandleNext or other client methods from within this function.
	OnPub func(pubHead Header, varPub VariablesPublish, r io.Reader) error
}

ClientConfig is used to configure a new Client.

type ConnectReturnCode

type ConnectReturnCode uint8

ConnectReturnCode represents the CONNACK return code, which is the second byte in the variable header. It indicates if the connection was successful (0 value) or if the connection attempt failed on the server side. ConnectReturnCode also implements the error interface and can be returned on a failed connection.

const (
	ReturnCodeConnAccepted ConnectReturnCode = iota
	ReturnCodeUnnaceptableProtocol
	ReturnCodeIdentifierRejected
	ReturnCodeServerUnavailable
	ReturnCodeBadUserCredentials
	ReturnCodeUnauthorized
)

func (ConnectReturnCode) Error added in v0.5.0

func (rc ConnectReturnCode) Error() string

Error implements the error interface for a non-zero return code.

func (ConnectReturnCode) String

func (rc ConnectReturnCode) String() (s string)

String returns a pretty-string representation of rc indicating if the connection was accepted or the human-readable error if present.
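
The idea of a return code that doubles as an error can be sketched with a small standalone type. returnCode and connackError below are hypothetical names and the message strings are illustrative; only the numeric meanings (0 through 5) follow the MQTT v3.1.1 specification.

```go
package main

import "fmt"

// returnCode is a hypothetical stand-in for a CONNACK return code.
type returnCode uint8

// String gives a human-readable description of the code.
func (rc returnCode) String() string {
	switch rc {
	case 0:
		return "connection accepted"
	case 1:
		return "unacceptable protocol version"
	case 2:
		return "identifier rejected"
	case 3:
		return "server unavailable"
	case 4:
		return "bad user name or password"
	case 5:
		return "not authorized"
	}
	return "reserved return code"
}

// Error lets a non-zero code be returned directly as an error value.
func (rc returnCode) Error() string { return rc.String() }

// connackError converts the second CONNACK byte into an error.
// It returns nil when the connection was accepted.
func connackError(code uint8) error {
	if code == 0 {
		return nil
	}
	return returnCode(code)
}

func main() {
	fmt.Println(connackError(0))
	fmt.Println(connackError(5))
}
```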

type Decoder

type Decoder interface {
	DecodePublish(r io.Reader, qos QoSLevel) (VariablesPublish, int, error)
	DecodeConnect(r io.Reader) (VariablesConnect, int, error)
	DecodeSubscribe(r io.Reader, remainingLen uint32) (VariablesSubscribe, int, error)
	DecodeUnsubscribe(r io.Reader, remainingLength uint32) (VariablesUnsubscribe, int, error)
}

Decoder provides an abstraction for an MQTT variable header decoding implementation. This is because heap allocations are necessary to be able to decode any MQTT packet. Some compile targets are restrictive in terms of memory usage, so the best decoder for the situation may differ.

type DecoderNoAlloc added in v0.2.0

type DecoderNoAlloc struct {
	UserBuffer []byte
}

DecoderNoAlloc implements the Decoder interface for unmarshalling Variable headers of MQTT packets. This particular implementation avoids heap allocations to ensure minimal memory usage during decoding. The UserBuffer is used up to its length. Decode calls that receive strings invalidate strings decoded in previous calls. Needless to say, this implementation is NOT safe for concurrent use. Calls that allocate strings or bytes are contained in the Decoder interface.

func (DecoderNoAlloc) DecodeConnect added in v0.2.0

func (d DecoderNoAlloc) DecodeConnect(r io.Reader) (varConn VariablesConnect, n int, err error)

DecodeConnect implements Decoder interface.

func (DecoderNoAlloc) DecodePublish added in v0.2.0

func (d DecoderNoAlloc) DecodePublish(r io.Reader, qos QoSLevel) (_ VariablesPublish, n int, err error)

DecodePublish implements Decoder interface.

func (DecoderNoAlloc) DecodeSubscribe added in v0.2.0

func (d DecoderNoAlloc) DecodeSubscribe(r io.Reader, remainingLen uint32) (varSub VariablesSubscribe, n int, err error)

DecodeSubscribe implements Decoder interface.

func (DecoderNoAlloc) DecodeUnsubscribe added in v0.2.0

func (d DecoderNoAlloc) DecodeUnsubscribe(r io.Reader, remainingLength uint32) (varUnsub VariablesUnsubscribe, n int, err error)

DecodeUnsubscribe implements Decoder interface.

type Header

type Header struct {
	RemainingLength uint32
	// contains filtered or unexported fields
}

Header represents the bytes preceding the payload in an MQTT packet. This is commonly called the Fixed Header, although this Header type also contains PacketIdentifier, which is part of the Variable Header and may or may not be present in an MQTT packet.

func DecodeHeader

func DecodeHeader(transp io.Reader) (Header, int, error)

DecodeHeader receives transp, an io.Reader that reads from an underlying arbitrary transport protocol. transp should start returning the first byte of the MQTT packet. DecodeHeader returns the decoded header and any error that prevented it from reading the entire header as specified by the MQTT v3.1.1 protocol.

func NewHeader

func NewHeader(packetType PacketType, packetFlags PacketFlags, remainingLen uint32) (Header, error)

NewHeader creates a new Header for a packetType and returns an error if invalid arguments are passed in. It will set expected reserved flags for non-PUBLISH packets.

func (Header) Encode

func (hdr Header) Encode(w io.Writer) (n int, err error)

Encode encodes the header into the argument writer. It will encode up to a maximum of 7 bytes, which is the maximum header length in MQTT v3.1.1.

func (Header) Flags

func (h Header) Flags() PacketFlags

Flags returns the MQTT packet flags in the fixed header. Important mainly for PUBLISH packets.

func (Header) HasPacketIdentifier

func (hd Header) HasPacketIdentifier() bool

HasPacketIdentifier returns true if the MQTT packet has a 2 octet packet identifier number.

func (Header) Put added in v0.4.0

func (hdr Header) Put(buf []byte) int

func (Header) Size

func (hd Header) Size() (sz int)

Size returns the size of the header as encoded over the wire. If the remaining length is invalid Size returns 0.

func (Header) String

func (h Header) String() string

String returns a pretty-string representation of h. Allocates memory.

func (Header) Type

func (h Header) Type() PacketType

Type returns the packet type with no validation.

func (Header) Validate

func (h Header) Validate() error

Validate returns an error if the Header contains malformed data. This usually means the header has bits set that contradict "MUST" statements in MQTT's protocol specification.

type PacketFlags

type PacketFlags uint8

PacketFlags represents the 4 LSB bits in the first byte in an MQTT fixed header. PacketFlags takes on select values in range 0..15. PacketType and PacketFlags are present in all MQTT packets.

const PacketFlagsPubrelSubUnsub PacketFlags = 0b10

Reserved flags for PUBREL, SUBSCRIBE and UNSUBSCRIBE packet types. This is effectively a PUBLISH flag with QoS1 set and no DUP or RETAIN bits.

func NewPublishFlags

func NewPublishFlags(qos QoSLevel, dup, retain bool) (PacketFlags, error)

NewPublishFlags returns PUBLISH packet flags and an error if the flags were to create a malformed packet according to MQTT specification.
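
The PUBLISH flag layout defined by MQTT v3.1.1 (bit 3 DUP, bits 2-1 QoS, bit 0 RETAIN) can be sketched as below. This is a standalone illustration and not the library's implementation. publishFlags and unpackPublishFlags are hypothetical names, and the exact validation NewPublishFlags performs is an assumption based on the specification's rules.

```go
package main

import (
	"errors"
	"fmt"
)

// publishFlags packs the PUBLISH fixed-header flags:
// bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN.
func publishFlags(qos uint8, dup, retain bool) (uint8, error) {
	if qos > 2 {
		return 0, errors.New("invalid QoS: both QoS bits set")
	}
	if dup && qos == 0 {
		return 0, errors.New("DUP must be 0 for QoS0 messages")
	}
	flags := qos << 1
	if dup {
		flags |= 1 << 3
	}
	if retain {
		flags |= 1
	}
	return flags, nil
}

// unpackPublishFlags is the inverse of publishFlags.
func unpackPublishFlags(flags uint8) (qos uint8, dup, retain bool) {
	return (flags >> 1) & 0b11, flags&(1<<3) != 0, flags&1 != 0
}

func main() {
	f, _ := publishFlags(1, false, false)
	fmt.Printf("%04b\n", f) // prints "0010", the same bits as PacketFlagsPubrelSubUnsub

	f, _ = publishFlags(2, true, true)
	fmt.Println(unpackPublishFlags(f))
}
```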

func (PacketFlags) Dup

func (pf PacketFlags) Dup() bool

Dup returns true if the DUP flag bit is set. If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or Server has attempted to send this MQTT PUBLISH Packet.

func (PacketFlags) QoS

func (pf PacketFlags) QoS() QoSLevel

QoS returns the PUBLISH QoSLevel in pf which varies between 0..2. PUBREL, UNSUBSCRIBE and SUBSCRIBE packets MUST have QoS1 set by standard. Other packets will have QoS0 set.

func (PacketFlags) Retain

func (pf PacketFlags) Retain() bool

Retain returns true if the PUBLISH RETAIN bit is set. This is typically set by the client to indicate the packet must be preserved after a Session ends, which is to say retained packets do not form part of Session state.

func (PacketFlags) String

func (pf PacketFlags) String() string

String returns a pretty string representation of pf. Allocates memory.

type PacketType

type PacketType byte

PacketType represents the 4 MSB bits in the first byte in an MQTT fixed header. It takes on values 1..14. PacketType and PacketFlags are present in all MQTT packets.

const (

	// A CONNECT packet is sent from Client to Server, it is a Client request to connect to a Server.
	// After a network connection is established by a client to a server at the transport layer, the first
	// packet sent from the client to the server must be a Connect packet.
	// A Client can only send the CONNECT Packet once over a Network Connection.
	// The CONNECT packet contains a 10 byte variable header and a
	// payload determined by flags present in variable header. See [VariablesConnect]. 0x10.
	PacketConnect PacketType
	// The CONNACK Packet is the packet sent by the Server in response to a CONNECT Packet received from a Client.
	// The first packet sent from the Server to the Client MUST be a CONNACK Packet.
	// The packet contains a 2 byte variable header and no payload. 0x20.
	PacketConnack
	// A PUBLISH Control Packet is sent from a Client to a Server or from Server to a Client to transport an Application Message.
	// It contains a variable header with an MQTT encoded string for the topic name and a packet identifier.
	// The payload may or may not contain an Application Message that is being published. The length of this Message
	// can be calculated by subtracting the length of the variable header from the Remaining Length field that is in the Fixed Header. 0x3?.
	PacketPublish
	// A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1. Its Variable header contains the packet identifier. No payload. 0x40.
	PacketPuback
	// A PUBREC Packet is the response to a PUBLISH Packet with QoS 2. It is the second packet of the QoS 2 protocol exchange. Its Variable header contains the packet identifier. No payload. 0x50.
	PacketPubrec
	// A PUBREL Packet is the response to a PUBREC Packet. It is the third packet of the QoS 2 protocol exchange. Its Variable header contains the packet identifier. No payload. 0x62.
	PacketPubrel
	// The PUBCOMP Packet is the response to a PUBREL Packet. It is the fourth and final packet of the QoS 2 protocol exchange. Its Variable header contains the packet identifier. No payload. 0x70.
	PacketPubcomp
	// The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
	// Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH
	// Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.
	// The SUBSCRIBE Packet also specifies (for each Subscription) the maximum QoS with which the Server can
	// send Application Messages to the Client.
	// The variable header of a subscribe topic contains the packet identifier. The payload contains a list of topic filters, see [VariablesSubscribe]. 0x82.
	PacketSubscribe
	// A SUBACK Packet is sent by the Server to the Client to confirm receipt and processing of a SUBSCRIBE Packet.
	// The variable header contains the packet identifier. The payload contains a list of octet return codes for each subscription requested by client, see [VariablesSuback]. 0x90.
	PacketSuback
	// An UNSUBSCRIBE Packet is sent by the Client to the Server, to unsubscribe from topics.
	// The variable header contains the packet identifier. Its payload contains a list of mqtt encoded strings corresponding to unsubscribed topics, see [VariablesUnsubscribe]. 0xa2.
	PacketUnsubscribe
	// The UNSUBACK Packet is sent by the Server to the Client to confirm receipt of an UNSUBSCRIBE Packet.
	// The variable header contains the packet identifier. It has no payload. 0xb0.
	PacketUnsuback
	// The PINGREQ Packet is sent from a Client to the Server. It can be used to:
	//  - Indicate to the Server that the Client is alive in the absence of any other Control Packets being sent from the Client to the Server.
	//  - Request that the Server responds to confirm that it is alive.
	//  - Exercise the network to indicate that the Network Connection is active.
	// No payload or variable header. 0xc0.
	PacketPingreq
	// A PINGRESP Packet is sent by the Server to the Client in response to a PINGREQ Packet. It indicates that the Server is alive.
	// No payload or variable header. 0xd0.
	PacketPingresp
	// The DISCONNECT Packet is the final Control Packet sent from the Client to the Server. It indicates that the Client is disconnecting cleanly.
	// No payload or variable header. 0xe0.
	PacketDisconnect
)
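
The hexadecimal values noted in the comments above follow from how the first byte of the fixed header is built: packet type in the upper four bits, flags in the lower four. A minimal standalone sketch, where firstByte and splitFirstByte are hypothetical helper names and not part of this package:

```go
package main

import "fmt"

// firstByte packs a 4-bit packet type and 4-bit flags into the first
// byte of an MQTT fixed header.
func firstByte(packetType, flags uint8) uint8 {
	return packetType<<4 | flags&0x0F
}

// splitFirstByte is the inverse of firstByte.
func splitFirstByte(b uint8) (packetType, flags uint8) {
	return b >> 4, b & 0x0F
}

func main() {
	fmt.Printf("%#x\n", firstByte(8, 0b10)) // SUBSCRIBE with reserved flags: prints 0x82
	fmt.Printf("%#x\n", firstByte(14, 0))   // DISCONNECT: prints 0xe0

	t, f := splitFirstByte(0x3d) // a PUBLISH with DUP, QoS2 and RETAIN set
	fmt.Println(t, f)            // prints 3 13
}
```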

func (PacketType) String

func (p PacketType) String() string

String returns a string representation of the packet type, stylized with all caps i.e: "PUBREL", "CONNECT". Does not allocate memory.

type QoSLevel

type QoSLevel uint8

QoSLevel represents the Quality of Service specified by the client. The server can choose to provide or reject requested QoS. The values of QoS range from 0 to 2, each representing a different methodology for message delivery guarantees.

const (
	// QoS0 at most once delivery. Arrives either once or not at all. Depends on capabilities of underlying network.
	QoS0 QoSLevel = iota
	// QoS1 at least once delivery. Ensures message arrives at receiver at least once.
	QoS1
	// QoS2 Exactly once delivery. Highest quality service. For use when neither loss nor duplication of messages are acceptable.
	// There is an increased overhead associated with this quality of service.
	QoS2

	// QoSSubfail marks a failure in SUBACK. This value cannot be encoded into a header
	// and is only returned upon an unsuccessful subscribe to a topic in a SUBACK packet.
	QoSSubfail QoSLevel = 0x80
)

QoS indicates the level of assurance for packet delivery.

func (QoSLevel) IsValid

func (qos QoSLevel) IsValid() bool

IsValid returns true if qos is a valid Quality of Service.
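
As a rough illustration of how the three delivery levels and the SUBACK failure value relate, here is a standalone sketch. isValidQoS and grantedQoS are hypothetical helpers. The assumption that validity means "0, 1 or 2" follows the MQTT v3.1.1 specification and is not taken from this package's source.

```go
package main

import "fmt"

// subFail is the SUBACK return code that marks a failed subscription.
const subFail = 0x80

// isValidQoS reports whether q is one of the three delivery levels.
func isValidQoS(q uint8) bool { return q <= 2 }

// grantedQoS interprets one SUBACK return code. ok is false when the
// subscription failed or the code is not defined by MQTT v3.1.1.
func grantedQoS(code uint8) (qos uint8, ok bool) {
	if code == subFail || !isValidQoS(code) {
		return 0, false
	}
	return code, true
}

func main() {
	for _, code := range []uint8{0x00, 0x02, 0x80} {
		qos, ok := grantedQoS(code)
		fmt.Printf("code=%#02x qos=%d ok=%v\n", code, qos, ok)
	}
}
```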

func (QoSLevel) String

func (qos QoSLevel) String() (s string)

String returns a pretty-string representation of qos i.e: "QoS0". Does not allocate memory.

type Rx

type Rx struct {
	RxCallbacks RxCallbacks

	// ScratchBuf is lazily allocated to exhaust Publish payloads when received and no
	// OnPub callback is set.
	ScratchBuf []byte
	// LastReceivedHeader contains the last correctly read header.
	LastReceivedHeader Header
	// contains filtered or unexported fields
}

Rx implements a bare minimum MQTT v3.1.1 protocol transport layer handler. Packets are received by calling Rx.ReadNextPacket and setting the callback in Rx corresponding to the expected packet. Rx will perform basic validation of input data according to MQTT's specification. If there is an error after reading the first byte of a packet the transport is closed and a new transport must be set with Rx.SetRxTransport. If OnRxError is set the underlying transport is not automatically closed and it becomes the callback's responsibility to close the transport.

Not safe for concurrent use.

func (*Rx) CloseRx

func (rx *Rx) CloseRx() error

CloseRx closes the underlying transport.

func (*Rx) ReadNextPacket

func (rx *Rx) ReadNextPacket() (int, error)

ReadNextPacket reads the next packet in the transport. If it fails after reading a non-zero amount of bytes it closes the transport and the underlying transport must be reset.

func (*Rx) RxTransport added in v0.2.1

func (rx *Rx) RxTransport() io.ReadCloser

RxTransport returns the underlying transport handler. It may be nil.

func (*Rx) SetRxTransport

func (rx *Rx) SetRxTransport(transport io.ReadCloser)

SetRxTransport sets the rx's reader.

func (*Rx) ShallowCopy

func (rx *Rx) ShallowCopy() *Rx

ShallowCopy shallow copies rx and underlying transport and decoder. Does not copy callbacks over.

type RxCallbacks added in v0.3.1

type RxCallbacks struct {
	// Functions below can access the Header of the message via Rx.LastReceivedHeader.
	// All these functions block Rx.ReadNextPacket.
	OnConnect func(*Rx, *VariablesConnect) error // Receives pointer because of large struct!
	// OnConnack is called on a CONNACK packet receipt.
	OnConnack func(*Rx, VariablesConnack) error
	// OnPub is called on PUBLISH packet receive. The [io.Reader] points to the transport's reader
	// and is limited to read the amount of bytes in the payload as given by RemainingLength.
	// One may calculate amount of bytes in the reader like so:
	//  payloadLen := rx.LastReceivedHeader.RemainingLength - varPub.Size()
	OnPub func(rx *Rx, varPub VariablesPublish, r io.Reader) error
	// OnOther takes in the Header of received packet and a packet identifier uint16 if present.
	// OnOther receives PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK packets containing non-zero packet identifiers
	// and DISCONNECT, PINGREQ, PINGRESP packets with no packet identifier.
	OnOther  func(rx *Rx, packetIdentifier uint16) error
	OnSub    func(*Rx, VariablesSubscribe) error
	OnSuback func(*Rx, VariablesSuback) error
	OnUnsub  func(*Rx, VariablesUnsubscribe) error
	// OnRxError is called if an error is encountered during decoding of packet.
	// If it is set then it becomes the responsibility of the callback to close the transport.
	OnRxError func(*Rx, error)
}

RxCallbacks groups all functionality executed on data receipt, both successful and unsuccessful.

type SubscribeRequest

type SubscribeRequest struct {
	// utf8 encoded topic or match pattern for topic filter.
	TopicFilter []byte
	// The desired QoS level.
	QoS QoSLevel
}

SubscribeRequest is relevant only to SUBSCRIBE packets where several SubscribeRequest each encode a topic filter that is to be matched on the server side and a desired QoS for each matched topic.

type Tx

type Tx struct {
	TxCallbacks TxCallbacks
	// contains filtered or unexported fields
}

Tx implements a bare minimum MQTT v3.1.1 protocol transport layer handler for transmitting packets. If there is an error while writing a packet, the transport is closed and a new transport must be set with Tx.SetTxTransport. A Tx will not validate data before encoding; that is up to the caller. Malformed packets will be rejected and the connection will be closed immediately. If OnTxError is set then the underlying transport is not closed and it becomes the responsibility of the callback to close the transport.

func (*Tx) CloseTx

func (tx *Tx) CloseTx() error

CloseTx closes the underlying transport and returns an error if any.

func (*Tx) SetTxTransport

func (rxtx *Tx) SetTxTransport(transport io.WriteCloser)

SetTxTransport sets the Tx's underlying transport writer.

func (*Tx) ShallowCopy

func (tx *Tx) ShallowCopy() *Tx

ShallowCopy shallow copies tx and the underlying transport and encoder. It does not copy callbacks over.

func (*Tx) TxTransport added in v0.2.1

func (tx *Tx) TxTransport() io.WriteCloser

TxTransport returns the underlying transport handler. It may be nil.

func (*Tx) WriteConnack

func (tx *Tx) WriteConnack(varConnack VariablesConnack) error

WriteConnack writes a CONNACK packet over the transport.

func (*Tx) WriteConnect

func (tx *Tx) WriteConnect(varConn *VariablesConnect) error

WriteConnect writes a CONNECT packet over the transport.

func (*Tx) WriteIdentified

func (tx *Tx) WriteIdentified(packetType PacketType, packetIdentifier uint16) (err error)

WriteIdentified writes PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK packets containing non-zero packet identifiers. It automatically sets the RemainingLength field to 2.
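These packets are always 4 bytes long on the wire. The sketch below builds them by hand from the MQTT v3.1.1 layout: type and flags in the first byte, a remaining length of 2, then the big-endian packet identifier. The function `encodeIdentified` and the constants are hypothetical names used for illustration only, and rejecting a zero identifier is this sketch's choice.

```go
package main

import (
	"errors"
	"fmt"
)

// Control packet type values from the MQTT v3.1.1 specification.
const (
	typePuback   = 4
	typePubrec   = 5
	typePubrel   = 6
	typePubcomp  = 7
	typeUnsuback = 11
)

// encodeIdentified returns the 4 bytes of a packet that carries only a packet identifier.
func encodeIdentified(packetType uint8, id uint16) ([4]byte, error) {
	var pkt [4]byte
	if id == 0 {
		return pkt, errors.New("packet identifier must be non-zero")
	}
	var flags uint8
	switch packetType {
	case typePuback, typePubrec, typePubcomp, typeUnsuback:
		// Reserved flag bits are all zero for these types.
	case typePubrel:
		flags = 0b0010 // The specification fixes the PUBREL flag bits to 0010.
	default:
		return pkt, errors.New("packet type does not carry a lone packet identifier")
	}
	pkt[0] = packetType<<4 | flags
	pkt[1] = 2 // Remaining length: just the 2-byte identifier.
	pkt[2] = byte(id >> 8)
	pkt[3] = byte(id)
	return pkt, nil
}

func main() {
	pkt, err := encodeIdentified(typePubrel, 0x1234)
	fmt.Printf("% x %v\n", pkt[:], err)
}
```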

func (*Tx) WritePublishPayload

func (tx *Tx) WritePublishPayload(h Header, varPub VariablesPublish, payload []byte) error

WritePublishPayload writes a PUBLISH packet over the transport along with the Application Message in the payload. payload can be zero-length.

func (*Tx) WriteSimple

func (tx *Tx) WriteSimple(packetType PacketType) (err error)

WriteSimple facilitates easy sending of the 2 octet DISCONNECT, PINGREQ, PINGRESP packets. If the packet is not one of these then an error is returned. It also returns any error encountered during the encoding step.
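As an illustration of why these packets are 2 octets, the sketch below builds them from the MQTT v3.1.1 fixed header: the packet type in the upper nibble of the first byte and a remaining length of zero. `encodeSimple` and its constants are hypothetical names and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Control packet type values from the MQTT v3.1.1 specification.
const (
	typePingreq    = 12
	typePingresp   = 13
	typeDisconnect = 14
)

// encodeSimple returns the fixed header of a packet with no variable header or payload.
func encodeSimple(packetType uint8) ([2]byte, error) {
	switch packetType {
	case typePingreq, typePingresp, typeDisconnect:
		// Flags nibble is zero and remaining length is zero.
		return [2]byte{packetType << 4, 0}, nil
	}
	return [2]byte{}, errors.New("not a 2 octet packet type")
}

func main() {
	for _, t := range []uint8{typePingreq, typePingresp, typeDisconnect} {
		pkt, _ := encodeSimple(t)
		fmt.Printf("% x\n", pkt[:])
	}
}
```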

func (*Tx) WriteSuback

func (tx *Tx) WriteSuback(varSub VariablesSuback) error

WriteSuback writes a SUBACK packet over the transport.

func (*Tx) WriteSubscribe

func (tx *Tx) WriteSubscribe(varSub VariablesSubscribe) error

WriteSubscribe writes a SUBSCRIBE packet over the transport.

func (*Tx) WriteUnsubscribe

func (tx *Tx) WriteUnsubscribe(varUnsub VariablesUnsubscribe) error

WriteUnsubscribe writes an UNSUBSCRIBE packet over the transport.

type TxCallbacks added in v0.3.1

type TxCallbacks struct {
	// OnTxError is called if an error is encountered during encoding. If it is set
	// then it becomes the responsibility of the callback to close Tx's transport.
	OnTxError func(*Tx, error)
	// OnSuccessfulTx is called after a MQTT packet is fully written to the underlying transport.
	OnSuccessfulTx func(*Tx)
}

TxCallbacks groups functionality executed on transmission success or failure of an MQTT packet.

type VariablesConnack

type VariablesConnack struct {
	// Octet with SP (Session Present) on LSB bit0.
	AckFlags uint8
	// Octet
	ReturnCode ConnectReturnCode
}

func (VariablesConnack) SessionPresent

func (vc VariablesConnack) SessionPresent() bool

SessionPresent returns true if the SP bit is set in the CONNACK Ack flags. This bit indicates whether the ClientID already has a session on the server.

  • If server accepts a connection with CleanSession set to 1 the server MUST set SP to 0 (false).
  • If server accepts a connection with CleanSession set to 0, SP depends on whether the server already has stored a Session state for the supplied Client ID. If the server has stored a Session then SP MUST be set to 1, else it MUST be set to 0.

In both cases above this is in addition to returning a zero CONNACK return code. If the CONNACK return code is non-zero then SP MUST be set to 0.
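The rules above reduce to a small truth table. The sketch below expresses them from a server's point of view alongside the bit test a client would apply. It is standalone; `sessionPresent` and `expectedAckFlags` are hypothetical helpers and not part of the package.

```go
package main

import "fmt"

// sessionPresent reports whether bit 0 (SP) is set in the CONNACK acknowledge flags.
func sessionPresent(ackFlags uint8) bool {
	return ackFlags&1 != 0
}

// expectedAckFlags returns the acknowledge flags a server should send:
// SP is 1 only for an accepted connection (return code 0) with
// CleanSession 0 and stored session state for the client ID.
func expectedAckFlags(cleanSession, storedSession bool, returnCode uint8) uint8 {
	if returnCode != 0 || cleanSession || !storedSession {
		return 0
	}
	return 1
}

func main() {
	fmt.Println(sessionPresent(expectedAckFlags(false, true, 0))) // Resumed session.
	fmt.Println(sessionPresent(expectedAckFlags(true, true, 0)))  // Clean session requested.
	fmt.Println(sessionPresent(expectedAckFlags(false, true, 5))) // Connection refused.
}
```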

func (VariablesConnack) Size

func (vc VariablesConnack) Size() (sz int)

Size returns size-on-wire of the CONNACK variable header generated by vc.

func (VariablesConnack) String

func (vc VariablesConnack) String() string

String returns a pretty-string representation of CONNACK variable header.

type VariablesConnect

type VariablesConnect struct {
	// Must be present and unique to the server. UTF-8 encoded string
	// between 1 and 23 bytes in length although some servers may allow larger ClientIDs.
	ClientID []byte
	// By default will be set to 'MQTT' protocol if nil, which is v3.1.1 compliant.
	Protocol []byte
	Username []byte
	// For password to be used username must also be set. See [MQTT-3.1.2-22].
	Password    []byte
	WillTopic   []byte
	WillMessage []byte
	// KeepAlive is an interval measured in seconds. It is the maximum time interval that is
	// permitted to elapse between the point at which the Client finishes transmitting one
	// Control Packet and the point it starts sending the next.
	KeepAlive uint16
	// By default if set to 0 will use Protocol level 4, which is v3.1.1 compliant
	ProtocolLevel byte
	// This bit specifies if the Will Message is to be Retained when it is published.
	WillRetain   bool
	CleanSession bool
	// These two bits specify the QoS level to be used when publishing the Will Message.
	WillQoS QoSLevel
}

VariablesConnect holds the fields of a CONNECT packet. All strings must be UTF-8 encoded except the password, which may be binary data.

func (*VariablesConnect) Flags

func (cv *VariablesConnect) Flags() byte

Flags returns the Connect Flags byte, which is the eighth byte of the CONNECT variable header.
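The layout of that byte in MQTT v3.1.1 is: bit 7 User Name, bit 6 Password, bit 5 Will Retain, bits 4-3 Will QoS, bit 2 Will Flag, bit 1 Clean Session, bit 0 reserved. The sketch below assembles it from a local struct. `connectVars` is a hypothetical stand-in, and deriving the flag bits from non-empty fields is an assumption about how such a method could work, not a statement of the library's logic.

```go
package main

import "fmt"

// connectVars is a local stand-in carrying only the fields that affect the flags.
type connectVars struct {
	Username, Password     []byte
	WillTopic, WillMessage []byte
	WillRetain             bool
	CleanSession           bool
	WillQoS                uint8
}

// willFlag assumes a will is present when both topic and message are non-empty.
func (c connectVars) willFlag() bool {
	return len(c.WillTopic) > 0 && len(c.WillMessage) > 0
}

// flags packs the Connect Flags byte.
func (c connectVars) flags() byte {
	var f byte
	if len(c.Username) > 0 {
		f |= 1 << 7
		if len(c.Password) > 0 {
			f |= 1 << 6 // Password flag is only valid together with User Name.
		}
	}
	if c.willFlag() {
		f |= 1 << 2
		f |= (c.WillQoS & 0b11) << 3
		if c.WillRetain {
			f |= 1 << 5
		}
	}
	if c.CleanSession {
		f |= 1 << 1
	}
	return f
}

func main() {
	c := connectVars{Username: []byte("u"), Password: []byte("p"), CleanSession: true}
	fmt.Printf("%08b\n", c.flags())
}
```

Will QoS and Will Retain are only written when the Will Flag is set, since the specification requires them to be zero otherwise.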

func (*VariablesConnect) SetDefaultMQTT

func (vc *VariablesConnect) SetDefaultMQTT(clientID []byte)

SetDefaultMQTT sets required fields, like the ClientID, Protocol and Protocol level fields. If KeepAlive is zero, it is set to 60 (one minute). If the Protocol field is not set to "MQTT" then memory is allocated for it. Clean session is also set to true.
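The defaults described above can be restated as a short standalone sketch. `connectVars` and `setDefaults` are hypothetical names that only mirror the documented behaviour; the real method may differ in detail.

```go
package main

import "fmt"

// connectVars is a local stand-in with only the fields touched by the defaults.
type connectVars struct {
	ClientID      []byte
	Protocol      []byte
	KeepAlive     uint16
	ProtocolLevel byte
	CleanSession  bool
}

// setDefaults applies the defaults described in the documentation.
func (c *connectVars) setDefaults(clientID []byte) {
	c.ClientID = clientID
	if string(c.Protocol) != "MQTT" {
		c.Protocol = []byte("MQTT") // Allocates only when the field was not already "MQTT".
	}
	c.ProtocolLevel = 4 // Protocol level 4 identifies MQTT v3.1.1.
	if c.KeepAlive == 0 {
		c.KeepAlive = 60
	}
	c.CleanSession = true
}

func main() {
	var c connectVars
	c.setDefaults([]byte("sensor-01"))
	fmt.Printf("%s %s level=%d keepalive=%d clean=%v\n",
		c.ClientID, c.Protocol, c.ProtocolLevel, c.KeepAlive, c.CleanSession)
}
```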

func (*VariablesConnect) Size

func (vs *VariablesConnect) Size() (sz int)

Size returns size-on-wire of the CONNECT variable header generated by vs.

func (*VariablesConnect) StringsLen

func (vs *VariablesConnect) StringsLen() (n int)

StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.

func (*VariablesConnect) WillFlag

func (cv *VariablesConnect) WillFlag() bool

WillFlag returns true if CONNECT packet will have a will topic and a will message, which means setting Will Flag bit to 1.

type VariablesPublish

type VariablesPublish struct {
	// Must be present as utf-8 encoded string with NO wildcard characters.
	// The server may override the TopicName on response according to matching process [Section 4.7]
	TopicName []byte
	// Only present (non-zero) in QoS level 1 or 2.
	PacketIdentifier uint16
}

VariablesPublish represents the variable header of a PUBLISH packet. It does not include the payload with the topic data.

func (VariablesPublish) Size

func (vp VariablesPublish) Size(qos QoSLevel) int

Size returns size-on-wire of the PUBLISH variable header generated by vp. It takes the packet QoS as an argument as it decides whether there's a Packet Identifier in the header.
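A worked version of that calculation, based on the MQTT v3.1.1 layout: 2 bytes of topic length, the topic itself, and 2 more bytes for the Packet Identifier when QoS is 1 or 2. Subtracting the result from the fixed header's remaining length gives the payload size. Both function names below are hypothetical and the code does not call the package.

```go
package main

import "fmt"

// publishVarHeaderSize returns the on-wire size of a PUBLISH variable header.
func publishVarHeaderSize(topic []byte, qos uint8) int {
	sz := 2 + len(topic) // Length prefix plus topic name.
	if qos > 0 {
		sz += 2 // Packet Identifier is present only for QoS 1 and 2.
	}
	return sz
}

// publishPayloadLen returns how many application message bytes follow the
// variable header. A negative result means the remaining length was too small.
func publishPayloadLen(remainingLength uint32, topic []byte, qos uint8) int {
	return int(remainingLength) - publishVarHeaderSize(topic, qos)
}

func main() {
	topic := []byte("sensors/temp") // 12 bytes.
	fmt.Println(publishVarHeaderSize(topic, 0), publishVarHeaderSize(topic, 1))
	fmt.Println(publishPayloadLen(19, topic, 0), publishPayloadLen(21, topic, 1))
}
```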

func (VariablesPublish) StringsLen

func (vp VariablesPublish) StringsLen() int

StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.

func (VariablesPublish) Validate

func (vp VariablesPublish) Validate() error

type VariablesSuback

type VariablesSuback struct {
	// Each return code corresponds to a topic filter in the SUBSCRIBE
	// packet being acknowledged. These MUST match the order of said SUBSCRIBE packet.
	// A return code can indicate failure using QoSSubfail.
	ReturnCodes      []QoSLevel
	PacketIdentifier uint16
}

VariablesSuback represents the variable header of a SUBACK packet.

func (VariablesSuback) Size

func (vs VariablesSuback) Size() (sz int)

Size returns size-on-wire of the SUBACK variable header generated by vs.
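For reference, a SUBACK body in MQTT v3.1.1 is the 2-byte packet identifier followed by one return code byte per topic filter, where 0x00 to 0x02 grant a maximum QoS and 0x80 signals failure. The sketch below encodes and sizes that body. It assumes the size counts the identifier plus the return codes; `subackSize` and `appendSuback` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

const subFail = 0x80 // Return code indicating the subscription failed.

// subackSize returns the bytes following the fixed header of a SUBACK packet.
func subackSize(returnCodes []uint8) int {
	return 2 + len(returnCodes)
}

// appendSuback appends the packet identifier and return codes to dst.
func appendSuback(dst []byte, id uint16, returnCodes []uint8) ([]byte, error) {
	for _, rc := range returnCodes {
		if rc > 2 && rc != subFail {
			return dst, errors.New("invalid SUBACK return code")
		}
	}
	dst = append(dst, byte(id>>8), byte(id))
	return append(dst, returnCodes...), nil
}

func main() {
	codes := []uint8{0, 1, subFail}
	body, err := appendSuback(nil, 10, codes)
	fmt.Printf("% x size=%d err=%v\n", body, subackSize(codes), err)
}
```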

func (VariablesSuback) Validate

func (vs VariablesSuback) Validate() error

type VariablesSubscribe

type VariablesSubscribe struct {
	TopicFilters     []SubscribeRequest
	PacketIdentifier uint16
}

VariablesSubscribe represents the variable header of a SUBSCRIBE packet. It encodes the topic filters requested by a Client and the desired QoS for each topic.

func (*VariablesSubscribe) Copy added in v0.4.0

Copy copies the subscribe variables optimizing for memory space savings.

func (VariablesSubscribe) Size

func (vs VariablesSubscribe) Size() (sz int)

Size returns size-on-wire of the SUBSCRIBE variable header generated by vs.

func (VariablesSubscribe) StringsLen

func (vs VariablesSubscribe) StringsLen() (n int)

StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.

func (*VariablesSubscribe) Validate

func (vsub *VariablesSubscribe) Validate() error

type VariablesUnsubscribe

type VariablesUnsubscribe struct {
	Topics           [][]byte
	PacketIdentifier uint16
}

VariablesUnsubscribe represents the variable header of an UNSUBSCRIBE packet.

func (VariablesUnsubscribe) Size

func (vu VariablesUnsubscribe) Size() (sz int)

Size returns size-on-wire of the UNSUBSCRIBE variable header generated by vu.
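As a rough illustration of that size, an UNSUBSCRIBE body in MQTT v3.1.1 is the 2-byte packet identifier followed by each topic filter with its own 2-byte length prefix. The sketch below assumes the size covers both parts; `unsubscribeSize` is a hypothetical helper, not the package's method.

```go
package main

import "fmt"

// unsubscribeSize returns the bytes following the fixed header of an UNSUBSCRIBE packet.
func unsubscribeSize(topics [][]byte) int {
	sz := 2 // Packet Identifier.
	for _, t := range topics {
		sz += 2 + len(t) // Length prefix plus topic filter bytes.
	}
	return sz
}

func main() {
	topics := [][]byte{[]byte("a/b"), []byte("sensors/#")}
	fmt.Println(unsubscribeSize(topics))
}
```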

func (VariablesUnsubscribe) StringsLen

func (vu VariablesUnsubscribe) StringsLen() (n int)

StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.
