mqtt

package module
v0.0.0-...-9e81042 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2022 License: MIT Imports: 3 Imported by: 148

README

mqtt

An MQTT encoder and decoder,written in Golang.

This library was modified heavily from https://github.com/plucury/mqtt.go and is API-incompatible with it.

Currently the library's API is unstable.

@Update: CONNACK with "session present" flag to support 3.1.1 compliance @Update: Fixed header added to the Disconnect Decode for server to verify reserved bits

Documentation

Overview

Implementation of MQTT V3.1 encoding and decoding.

See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html for the MQTT protocol specification. This package does not implement the semantics of MQTT, but purely the encoding and decoding of its messages.

Decoding Messages:

Use the DecodeOneMessage function to read a Message from an io.Reader, it will return a Message value. The function can be implemented using the public API of this package if more control is required. For example:

for {
  msg, err := mqtt.DecodeOneMessage(conn, nil)
  if err != nil {
    // handle err
  }
  switch msg := msg.(type) {
  case *Connect:
    // ...
  case *Publish:
    // ...
    // etc.
  }
}

Encoding Messages:

Create a message value, and use its Encode method to write it to an io.Writer. For example:

someData := []byte{1, 2, 3}
msg := &Publish{
  Header: {
    DupFlag: false,
    QosLevel: QosAtLeastOnce,
    Retain: false,
  },
  TopicName: "a/b",
  MessageId: 10,
  Payload: BytesPayload(someData),
}
if err := msg.Encode(conn); err != nil {
  // handle err
}

Advanced PUBLISH payload handling:

The default behaviour for decoding PUBLISH payloads, and most common way to supply payloads for encoding, is the BytesPayload, which is a []byte derivative.

More complex handling is possible by implementing the Payload interface, which can be injected into DecodeOneMessage via the `config` parameter, or into an outgoing Publish message via its Payload field. Potential benefits of this include:

* Data can be (un)marshalled directly on a connection, without an unecessary round-trip via bytes.Buffer.

* Data can be streamed directly on readers/writers (e.g files, other connections, pipes) without the requirement to buffer an entire message payload in memory at once.

The limitations of these streaming features are:

* When encoding a payload, the encoded size of the payload must be known and declared upfront.

* The payload size (and PUBLISH variable header) can be no more than 256MiB minus 1 byte. This is a specified limitation of MQTT v3.1 itself.

Index

Constants

View Source
const (
	MsgConnect = MessageType(iota + 1)
	MsgConnAck
	MsgPublish
	MsgPubAck
	MsgPubRec
	MsgPubRel
	MsgPubComp
	MsgSubscribe
	MsgSubAck
	MsgUnsubscribe
	MsgUnsubAck
	MsgPingReq
	MsgPingResp
	MsgDisconnect
)

MessageType constants.

View Source
const (
	QosAtMostOnce = QosLevel(iota)
	QosAtLeastOnce
	QosExactlyOnce
)
View Source
const (
	RetCodeAccepted = ReturnCode(iota)
	RetCodeUnacceptableProtocolVersion
	RetCodeIdentifierRejected
	RetCodeServerUnavailable
	RetCodeBadUsernameOrPassword
	RetCodeNotAuthorized
)
View Source
const (
	// Maximum payload size in bytes (256MiB - 1B).
	MaxPayloadSize = (1 << (4 * 7)) - 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BytesPayload

type BytesPayload []byte

BytesPayload reads/writes a plain slice of bytes.

func (BytesPayload) ReadPayload

func (p BytesPayload) ReadPayload(r io.Reader) error

func (BytesPayload) Size

func (p BytesPayload) Size() int

func (BytesPayload) WritePayload

func (p BytesPayload) WritePayload(w io.Writer) error

type ConnAck

type ConnAck struct {
	Header
	SessionPresent bool
	ReturnCode     ReturnCode
}

ConnAck represents an MQTT CONNACK message.

func (*ConnAck) Decode

func (msg *ConnAck) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*ConnAck) Encode

func (msg *ConnAck) Encode(w io.Writer) (err error)

type Connect

type Connect struct {
	Header
	ProtocolName               string
	ProtocolVersion            uint8
	WillRetain                 bool
	WillFlag                   bool
	CleanSession               bool
	WillQos                    QosLevel
	KeepAliveTimer             uint16
	ClientId                   string
	WillTopic, WillMessage     string
	UsernameFlag, PasswordFlag bool
	Username, Password         string
}

Connect represents an MQTT CONNECT message.

func (*Connect) Decode

func (msg *Connect) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*Connect) Encode

func (msg *Connect) Encode(w io.Writer) (err error)

type DecoderConfig

type DecoderConfig interface {
	// MakePayload returns a Payload for the given Publish message. r is a Reader
	// that will read the payload data, and n is the number of bytes in the
	// payload. The Payload.ReadPayload method is called on the returned payload
	// by the decoding process.
	MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
}

DecoderConfig provides configuration for decoding messages.

type DefaultDecoderConfig

type DefaultDecoderConfig struct{}

func (DefaultDecoderConfig) MakePayload

func (c DefaultDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)

type Disconnect

type Disconnect struct {
	Header
}

Disconnect represents an MQTT DISCONNECT message.

func (*Disconnect) Decode

func (msg *Disconnect) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) error

func (*Disconnect) Encode

func (msg *Disconnect) Encode(w io.Writer) error
type Header struct {
	DupFlag, Retain bool
	QosLevel        QosLevel
}

Header contains the common attributes of all messages. Some attributes are not applicable to some message types.

func (*Header) Decode

func (hdr *Header) Decode(r io.Reader) (msgType MessageType, remainingLength int32, err error)

func (*Header) Encode

func (hdr *Header) Encode(w io.Writer, msgType MessageType, remainingLength int32) error

type Message

type Message interface {
	// Encode writes the message to w.
	Encode(w io.Writer) error

	// Decode reads the message extended headers and payload from
	// r. Typically the values for hdr and packetRemaining will
	// be returned from Header.Decode.
	Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) error
}

Message is the interface that all MQTT messages implement.

func DecodeOneMessage

func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error)

DecodeOneMessage decodes one message from r. config provides specifics on how to decode messages, nil indicates that the DefaultDecoderConfig should be used.

func NewMessage

func NewMessage(msgType MessageType) (msg Message, err error)

NewMessage creates an instance of a Message value for the given message type. An error is returned if msgType is invalid.

type MessageType

type MessageType uint8

func (MessageType) IsValid

func (mt MessageType) IsValid() bool

IsValid returns true if the MessageType value is valid.

type Payload

type Payload interface {
	// Size returns the number of bytes that WritePayload will write.
	Size() int

	// WritePayload writes the payload data to w. Implementations must write
	// Size() bytes of data, but it is *not* required to do so prior to
	// returning. Size() bytes must have been written to w prior to another
	// message being encoded to the underlying connection.
	WritePayload(w io.Writer) error

	// ReadPayload reads the payload data from r (r will EOF at the end of the
	// payload). It is *not* required for r to have been consumed prior to this
	// returning. r must have been consumed completely prior to another message
	// being decoded from the underlying connection.
	ReadPayload(r io.Reader) error
}

Payload is the interface for Publish payloads. Typically the BytesPayload implementation will be sufficient for small payloads whose full contents will exist in memory. However, other implementations can read or write payloads requiring them holding their complete contents in memory.

type PingReq

type PingReq struct {
	Header
}

PingReq represents an MQTT PINGREQ message.

func (*PingReq) Decode

func (msg *PingReq) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) error

func (*PingReq) Encode

func (msg *PingReq) Encode(w io.Writer) error

type PingResp

type PingResp struct {
	Header
}

PingResp represents an MQTT PINGRESP message.

func (*PingResp) Decode

func (msg *PingResp) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) error

func (*PingResp) Encode

func (msg *PingResp) Encode(w io.Writer) error

type PubAck

type PubAck struct {
	Header
	MessageId uint16
}

PubAck represents an MQTT PUBACK message.

func (*PubAck) Decode

func (msg *PubAck) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*PubAck) Encode

func (msg *PubAck) Encode(w io.Writer) error

type PubComp

type PubComp struct {
	Header
	MessageId uint16
}

PubComp represents an MQTT PUBCOMP message.

func (*PubComp) Decode

func (msg *PubComp) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*PubComp) Encode

func (msg *PubComp) Encode(w io.Writer) error

type PubRec

type PubRec struct {
	Header
	MessageId uint16
}

PubRec represents an MQTT PUBREC message.

func (*PubRec) Decode

func (msg *PubRec) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*PubRec) Encode

func (msg *PubRec) Encode(w io.Writer) error

type PubRel

type PubRel struct {
	Header
	MessageId uint16
}

PubRel represents an MQTT PUBREL message.

func (*PubRel) Decode

func (msg *PubRel) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*PubRel) Encode

func (msg *PubRel) Encode(w io.Writer) error

type Publish

type Publish struct {
	Header
	TopicName string
	MessageId uint16
	Payload   Payload
}

Publish represents an MQTT PUBLISH message.

func (*Publish) Decode

func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*Publish) Encode

func (msg *Publish) Encode(w io.Writer) (err error)

type QosLevel

type QosLevel uint8

func (QosLevel) HasId

func (qos QosLevel) HasId() bool

func (QosLevel) IsValid

func (qos QosLevel) IsValid() bool

type ReturnCode

type ReturnCode uint8

func (ReturnCode) IsValid

func (rc ReturnCode) IsValid() bool

type StreamedPayload

type StreamedPayload struct {
	// N indicates payload size to the encoder. This many bytes will be read from
	// the reader when encoding. The number of bytes in the payload will be
	// stored here when decoding.
	N int

	// EncodingSource is used to copy data from when encoding a Publish message
	// onto the wire. This can be
	EncodingSource io.Reader

	// DecodingSink is used to copy data to when decoding a Publish message from
	// the wire. This can be nil if the payload is only being used for encoding.
	DecodingSink io.Writer
}

StreamedPayload writes payload data from reader, or reads payload data into a writer.

func (*StreamedPayload) ReadPayload

func (p *StreamedPayload) ReadPayload(r io.Reader) error

func (*StreamedPayload) Size

func (p *StreamedPayload) Size() int

func (*StreamedPayload) WritePayload

func (p *StreamedPayload) WritePayload(w io.Writer) error

type SubAck

type SubAck struct {
	Header
	MessageId uint16
	TopicsQos []QosLevel
}

SubAck represents an MQTT SUBACK message.

func (*SubAck) Decode

func (msg *SubAck) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*SubAck) Encode

func (msg *SubAck) Encode(w io.Writer) (err error)

type Subscribe

type Subscribe struct {
	Header
	MessageId uint16
	Topics    []TopicQos
}

Subscribe represents an MQTT SUBSCRIBE message.

func (*Subscribe) Decode

func (msg *Subscribe) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*Subscribe) Encode

func (msg *Subscribe) Encode(w io.Writer) (err error)

type TopicQos

type TopicQos struct {
	Topic string
	Qos   QosLevel
}

type UnsubAck

type UnsubAck struct {
	Header
	MessageId uint16
}

UnsubAck represents an MQTT UNSUBACK message.

func (*UnsubAck) Decode

func (msg *UnsubAck) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*UnsubAck) Encode

func (msg *UnsubAck) Encode(w io.Writer) error

type Unsubscribe

type Unsubscribe struct {
	Header
	MessageId uint16
	Topics    []string
}

Unsubscribe represents an MQTT UNSUBSCRIBE message.

func (*Unsubscribe) Decode

func (msg *Unsubscribe) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)

func (*Unsubscribe) Encode

func (msg *Unsubscribe) Encode(w io.Writer) (err error)

type ValueConfig

type ValueConfig struct {
	Payload Payload
}

ValueConfig always returns the given Payload when MakePayload is called.

func (*ValueConfig) MakePayload

func (c *ValueConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL