Documentation ¶
Overview ¶
Package mqtt implements the MQTT v3.1.1 protocol, providing users of this package with low level decoding and encoding primitives and complete documentation sufficient to grapple with the concepts of the MQTT protocol.
If you are new to MQTT, start by reading definitions.go.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) AwaitingPingresp() bool
- func (c *Client) AwaitingSuback() bool
- func (c *Client) Connect(ctx context.Context, rwc io.ReadWriteCloser, vc *VariablesConnect) error
- func (c *Client) ConnectedAt() time.Time
- func (c *Client) Disconnect(userErr error) error
- func (c *Client) Err() error
- func (c *Client) HandleNext() error
- func (c *Client) IsConnected() bool
- func (c *Client) LastRx() time.Time
- func (c *Client) LastTx() time.Time
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) PublishPayload(flags PacketFlags, varPub VariablesPublish, payload []byte) error
- func (c *Client) StartConnect(rwc io.ReadWriteCloser, vc *VariablesConnect) error
- func (c *Client) StartPing() error
- func (c *Client) StartSubscribe(vsub VariablesSubscribe) error
- func (c *Client) Subscribe(ctx context.Context, vsub VariablesSubscribe) error
- func (c *Client) SubscribedTopics() []string
- type ClientConfig
- type ConnectReturnCode
- type Decoder
- type DecoderNoAlloc
- func (d DecoderNoAlloc) DecodeConnect(r io.Reader) (varConn VariablesConnect, n int, err error)
- func (d DecoderNoAlloc) DecodePublish(r io.Reader, qos QoSLevel) (_ VariablesPublish, n int, err error)
- func (d DecoderNoAlloc) DecodeSubscribe(r io.Reader, remainingLen uint32) (varSub VariablesSubscribe, n int, err error)
- func (d DecoderNoAlloc) DecodeUnsubscribe(r io.Reader, remainingLength uint32) (varUnsub VariablesUnsubscribe, n int, err error)
- type Header
- func (hdr Header) Encode(w io.Writer) (n int, err error)
- func (h Header) Flags() PacketFlags
- func (hd Header) HasPacketIdentifier() bool
- func (hdr Header) Put(buf []byte) int
- func (hd Header) Size() (sz int)
- func (h Header) String() string
- func (h Header) Type() PacketType
- func (h Header) Validate() error
- type PacketFlags
- type PacketType
- type QoSLevel
- type Rx
- type RxCallbacks
- type SubscribeRequest
- type Tx
- func (tx *Tx) CloseTx() error
- func (rxtx *Tx) SetTxTransport(transport io.WriteCloser)
- func (tx *Tx) ShallowCopy() *Tx
- func (tx *Tx) TxTransport() io.WriteCloser
- func (tx *Tx) WriteConnack(varConnack VariablesConnack) error
- func (tx *Tx) WriteConnect(varConn *VariablesConnect) error
- func (tx *Tx) WriteIdentified(packetType PacketType, packetIdentifier uint16) (err error)
- func (tx *Tx) WritePublishPayload(h Header, varPub VariablesPublish, payload []byte) error
- func (tx *Tx) WriteSimple(packetType PacketType) (err error)
- func (tx *Tx) WriteSuback(varSub VariablesSuback) error
- func (tx *Tx) WriteSubscribe(varSub VariablesSubscribe) error
- func (tx *Tx) WriteUnsubscribe(varUnsub VariablesUnsubscribe) error
- type TxCallbacks
- type VariablesConnack
- type VariablesConnect
- type VariablesPublish
- type VariablesSuback
- type VariablesSubscribe
- type VariablesUnsubscribe
Examples ¶
Constants ¶
const ( // Accepted protocol level as per MQTT v3.1.1. This goes in the CONNECT variable header. DefaultProtocolLevel = 4 // Accepted protocol as per MQTT v3.1.1. This goes in the CONNECT variable header. DefaultProtocol = "MQTT" )
Variables ¶
var ( // natiu-mqtt depends on user provided buffers for string and byte slice allocation. // If a buffer is too small for the incoming strings or for marshalling a subscription topic // then the implementation should return this error. ErrUserBufferFull = errors.New("natiu-mqtt: user buffer full") // ErrBadRemainingLen is passed to Rx's OnRxError after decoding a header with a // remaining length that does not conform to MQTT v3.1.1 packet specifications. ErrBadRemainingLen = errors.New("natiu-mqtt: MQTT v3.1.1 bad remaining length") )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an asynchronous MQTT v3.1.1 client implementation which is safe for concurrent use.
Example ¶
package main import ( "context" "errors" "fmt" "log" "net" "time" mqtt "github.com/soypat/natiu-mqtt" ) func main() { // Create new client with default settings. client := mqtt.NewClient(mqtt.ClientConfig{}) // Get a transport for MQTT packets. const defaultMQTTPort = ":1883" conn, err := net.Dial("tcp", "test.mosquitto.org"+defaultMQTTPort) if err != nil { fmt.Println(err) return } // Prepare for CONNECT interaction with server. var varConn mqtt.VariablesConnect varConn.SetDefaultMQTT([]byte("salamanca")) ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) err = client.Connect(ctx, conn, &varConn) // Connect to server. cancel() if err != nil { // Error or loop until connect success. log.Fatalf("connect attempt failed: %v\n", err) } fmt.Println("connection success") defer func() { err := client.Disconnect(errors.New("end of test")) if err != nil { fmt.Println("disconnect failed:", err) } }() // Ping forever until error. ctx, cancel = context.WithTimeout(context.Background(), time.Second) pingErr := client.Ping(ctx) cancel() if pingErr != nil { log.Fatal("ping error: ", pingErr, " with disconnect reason:", client.Err()) } fmt.Println("ping success!") }
Output: connection success ping success!
Example (Concurrent) ¶
package main import ( "bytes" "context" "io" "log" "math/rand" "net" "time" mqtt "github.com/soypat/natiu-mqtt" ) func main() { // Create new client. received := make(chan []byte, 10) client := mqtt.NewClient(mqtt.ClientConfig{ Decoder: mqtt.DecoderNoAlloc{make([]byte, 1500)}, OnPub: func(_ mqtt.Header, _ mqtt.VariablesPublish, r io.Reader) error { message, _ := io.ReadAll(r) if len(message) > 0 { select { case received <- message: default: // If channel is full we ignore message. } } log.Println("received message:", string(message)) return nil }, }) // Set the connection parameters and set the Client ID to "salamanca". var varConn mqtt.VariablesConnect varConn.SetDefaultMQTT([]byte("salamanca")) // Define an inline function that connects the MQTT client automatically. tryConnect := func() error { // Get a transport for MQTT packets using the local host and default MQTT port (1883). conn, err := net.Dial("tcp", "127.0.0.1:1883") if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) defer cancel() return client.Connect(ctx, conn, &varConn) // Connect to server. } // Attempt first connection and fail immediately if that does not work. err := tryConnect() if err != nil { log.Println(err) return } // Call read goroutine. Read goroutine will also handle reconnection // when client disconnects. go func() { for { if !client.IsConnected() { time.Sleep(time.Second) tryConnect() continue } err = client.HandleNext() if err != nil { log.Println("HandleNext failed:", err) } } }() // Call Write goroutine and create a channel to serialize messages // that we want to send out. 
const TOPICNAME = "/mqttnerds" pubFlags, _ := mqtt.NewPublishFlags(mqtt.QoS0, false, false) varPub := mqtt.VariablesPublish{ TopicName: []byte(TOPICNAME), } txQueue := make(chan []byte, 10) go func() { for { if !client.IsConnected() { time.Sleep(time.Second) continue } message := <-txQueue varPub.PacketIdentifier = uint16(rand.Int()) // Loop until message is sent successfully. This guarantees // all messages are sent, even in events of disconnect. for { err := client.PublishPayload(pubFlags, varPub, message) if err == nil { break } time.Sleep(time.Second) } } }() // Main program logic. for { message := <-received // We transform the message and send it back out. fields := bytes.Fields(message) message = bytes.Join(fields, []byte(",")) txQueue <- message } }
Output:
func NewClient ¶
func NewClient(cfg ClientConfig) *Client
NewClient creates a new MQTT client with the configuration parameters provided. If no Decoder is provided a DecoderNoAlloc will be used.
func (*Client) AwaitingPingresp ¶ added in v0.4.0
AwaitingPingresp checks if a ping sent over the wire had no response received back.
func (*Client) AwaitingSuback ¶ added in v0.4.0
AwaitingSuback checks if a subscribe request sent over the wire had no suback received back. Returns false if client is disconnected.
func (*Client) Connect ¶
func (c *Client) Connect(ctx context.Context, rwc io.ReadWriteCloser, vc *VariablesConnect) error
Connect sends a CONNECT packet over the transport and waits for a CONNACK response from the server. The client is connected if the returned error is nil.
func (*Client) ConnectedAt ¶ added in v0.4.0
ConnectedAt returns the time the client managed to successfully connect. If client is disconnected ConnectedAt returns the zero-value for time.Time.
func (*Client) Disconnect ¶
Disconnect performs a MQTT disconnect and resets the connection. Future calls to Err will return the argument userErr.
func (*Client) Err ¶ added in v0.4.0
Err returns error indicating the cause of client disconnection.
func (*Client) HandleNext ¶ added in v0.4.0
HandleNext reads from the wire and decodes MQTT packets. If bytes are read and the decoder fails to read a packet the whole client fails and disconnects. HandleNext only returns an error in the case where the OnPub callback passed in the ClientConfig returns an error or if a packet is malformed. If HandleNext returns an error the client will be in a disconnected state.
func (*Client) IsConnected ¶ added in v0.4.0
IsConnected returns true if there has been no disconnect event and no unrecoverable error has been encountered during decoding. A connected client may send and receive MQTT messages.
func (*Client) LastRx ¶ added in v0.4.1
LastRx returns the time the last packet was received at. If Client is disconnected LastRx returns the zero value of time.Time.
func (*Client) LastTx ¶ added in v0.4.1
LastTx returns the time the last successful packet transmission finished at. A "successful" transmission does not necessarily mean the packet was received on the other end. If Client is disconnected LastTx returns the zero value of time.Time.
func (*Client) Ping ¶
Ping writes a ping packet over the network and blocks until it receives the ping response back. It uses an exponential backoff algorithm to time checks on the status of the ping.
func (*Client) PublishPayload ¶
func (c *Client) PublishPayload(flags PacketFlags, varPub VariablesPublish, payload []byte) error
PublishPayload sends a PUBLISH packet over the network on the topic defined by varPub.
func (*Client) StartConnect ¶ added in v0.4.0
func (c *Client) StartConnect(rwc io.ReadWriteCloser, vc *VariablesConnect) error
StartConnect sends a CONNECT packet over the transport and does not wait for a CONNACK response. Client is not guaranteed to be connected after a call to this function.
func (*Client) StartPing ¶ added in v0.4.0
StartPing writes a PINGREQ packet over the network without blocking waiting for response.
func (*Client) StartSubscribe ¶ added in v0.4.0
func (c *Client) StartSubscribe(vsub VariablesSubscribe) error
StartSubscribe begins subscription to argument topics.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, vsub VariablesSubscribe) error
Subscribe writes a SUBSCRIBE packet over the network and waits for the server to respond with a SUBACK packet or until the context ends.
func (*Client) SubscribedTopics ¶ added in v0.4.0
SubscribedTopics returns the list of topics the client successfully subscribed to. It returns a copy of the slice, so it is safe for concurrent use.
type ClientConfig ¶ added in v0.4.0
type ClientConfig struct { // If a Decoder is not set one will automatically be picked. Decoder Decoder // OnPub is executed on every PUBLISH message received. Do not call // HandleNext or other client methods from within this function. OnPub func(pubHead Header, varPub VariablesPublish, r io.Reader) error }
ClientConfig is used to configure a new Client.
type ConnectReturnCode ¶
type ConnectReturnCode uint8
ConnectReturnCode represents the CONNACK return code, which is the second byte in the variable header. It indicates if the connection was successful (0 value) or if the connection attempt failed on the server side. ConnectReturnCode also implements the error interface and can be returned on a failed connection.
const ( ReturnCodeConnAccepted ConnectReturnCode = iota ReturnCodeUnnaceptableProtocol ReturnCodeIdentifierRejected ReturnCodeBadUserCredentials )
func (ConnectReturnCode) Error ¶ added in v0.5.0
func (rc ConnectReturnCode) Error() string
Error implements the error interface for a non-zero return code.
func (ConnectReturnCode) String ¶
func (rc ConnectReturnCode) String() (s string)
String returns a pretty-string representation of rc indicating if the connection was accepted or the human-readable error if present.
type Decoder ¶
type Decoder interface { DecodePublish(r io.Reader, qos QoSLevel) (VariablesPublish, int, error) DecodeConnect(r io.Reader) (VariablesConnect, int, error) DecodeSubscribe(r io.Reader, remainingLen uint32) (VariablesSubscribe, int, error) DecodeUnsubscribe(r io.Reader, remainingLength uint32) (VariablesUnsubscribe, int, error) }
Decoder provides an abstraction for an MQTT variable header decoding implementation. This is because heap allocations are necessary to be able to decode any MQTT packet. Some compile targets are restrictive in terms of memory usage, so the best decoder for the situation may differ.
type DecoderNoAlloc ¶ added in v0.2.0
type DecoderNoAlloc struct {
UserBuffer []byte
}
DecoderNoAlloc implements the Decoder interface for unmarshalling Variable headers of MQTT packets. This particular implementation avoids heap allocations to ensure minimal memory usage during decoding. The UserBuffer is used up to its length. Decode calls that receive strings invalidate strings decoded in previous calls. Needless to say, this implementation is NOT safe for concurrent use. Calls that allocate strings or bytes are contained in the Decoder interface.
func (DecoderNoAlloc) DecodeConnect ¶ added in v0.2.0
func (d DecoderNoAlloc) DecodeConnect(r io.Reader) (varConn VariablesConnect, n int, err error)
DecodeConnect implements Decoder interface.
func (DecoderNoAlloc) DecodePublish ¶ added in v0.2.0
func (d DecoderNoAlloc) DecodePublish(r io.Reader, qos QoSLevel) (_ VariablesPublish, n int, err error)
DecodePublish implements Decoder interface.
func (DecoderNoAlloc) DecodeSubscribe ¶ added in v0.2.0
func (d DecoderNoAlloc) DecodeSubscribe(r io.Reader, remainingLen uint32) (varSub VariablesSubscribe, n int, err error)
DecodeSubscribe implements Decoder interface.
func (DecoderNoAlloc) DecodeUnsubscribe ¶ added in v0.2.0
func (d DecoderNoAlloc) DecodeUnsubscribe(r io.Reader, remainingLength uint32) (varUnsub VariablesUnsubscribe, n int, err error)
DecodeUnsubscribe implements Decoder interface.
type Header ¶
type Header struct { RemainingLength uint32 // contains filtered or unexported fields }
Header represents the bytes preceding the payload in an MQTT packet. This is commonly called the Fixed Header, although this Header type also contains PacketIdentifier, which is part of the Variable Header and may or may not be present in an MQTT packet.
func DecodeHeader ¶
DecodeHeader receives transp, an io.ByteReader that reads from an underlying arbitrary transport protocol. transp should start returning the first byte of the MQTT packet. DecodeHeader returns the decoded header and any error that prevented it from reading the entire header as specified by the MQTT v3.1.1 protocol.
func NewHeader ¶
func NewHeader(packetType PacketType, packetFlags PacketFlags, remainingLen uint32) (Header, error)
NewHeader creates a new Header for a packetType and returns an error if invalid arguments are passed in. It will set expected reserved flags for non-PUBLISH packets.
func (Header) Encode ¶
Encode encodes the header into the argument writer. It will encode up to a maximum of 7 bytes, which is the maximum header length in MQTT v3.1.1.
func (Header) Flags ¶
func (h Header) Flags() PacketFlags
Flags returns the MQTT packet flags in the fixed header. Important mainly for PUBLISH packets.
func (Header) HasPacketIdentifier ¶
HasPacketIdentifier returns true if the MQTT packet has a 2 octet packet identifier number.
func (Header) Size ¶
Size returns the size of the header as encoded over the wire. If the remaining length is invalid Size returns 0.
func (Header) Type ¶
func (h Header) Type() PacketType
Type returns the packet type with no validation.
type PacketFlags ¶
type PacketFlags uint8
PacketFlags represents the LSB 4 bits in the first byte in an MQTT fixed header. PacketFlags takes on select values in range 1..15. PacketType and PacketFlags are present in all MQTT packets.
const PacketFlagsPubrelSubUnsub PacketFlags = 0b10
Reserved flags for PUBREL, SUBSCRIBE and UNSUBSCRIBE packet types. This is effectively a PUBLISH flag with QoS1 set and no DUP or RETAIN bits.
func NewPublishFlags ¶
func NewPublishFlags(qos QoSLevel, dup, retain bool) (PacketFlags, error)
NewPublishFlags returns PUBLISH packet flags and an error if the flags were to create a malformed packet according to MQTT specification.
func (PacketFlags) Dup ¶
func (pf PacketFlags) Dup() bool
Dup returns true if the DUP flag bit is set. If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or Server has attempted to send this MQTT PUBLISH Packet.
func (PacketFlags) QoS ¶
func (pf PacketFlags) QoS() QoSLevel
QoS returns the PUBLISH QoSLevel in pf which varies between 0..2. PUBREL, UNSUBSCRIBE and SUBSCRIBE packets MUST have QoS1 set by standard. Other non-PUBLISH packets will have QoS0 set.
func (PacketFlags) Retain ¶
func (pf PacketFlags) Retain() bool
Retain returns true if the PUBLISH Retain bit is set. This is typically set by the client to indicate the packet must be preserved after a Session ends, which is to say Retained packets do not form part of Session state.
func (PacketFlags) String ¶
func (pf PacketFlags) String() string
String returns a pretty string representation of pf. Allocates memory.
type PacketType ¶
type PacketType byte
PacketType represents the 4 MSB bits in the first byte in an MQTT fixed header. It takes on values 1..14. PacketType and PacketFlags are present in all MQTT packets.
const ( // A CONNECT packet is sent from Client to Server, it is a Client request to connect to a Server. // After a network connection is established by a client to a server at the transport layer, the first // packet sent from the client to the server must be a Connect packet. // A Client can only send the CONNECT Packet once over a Network Connection. // The CONNECT packet contains a 10 byte variable header and a // payload determined by flags present in variable header. See [VariablesConnect]. 0x10. PacketConnect PacketType // The CONNACK Packet is the packet sent by the Server in response to a CONNECT Packet received from a Client. // The first packet sent from the Server to the Client MUST be a CONNACK Packet // The payload contains a 2 byte variable header and no payload. 0x20. PacketConnack // A PUBLISH Control Packet is sent from a Client to a Server or from Server to a Client to transport an Application Message. // It's payload contains a variable header with a MQTT encoded string for the topic name and a packet identifier. // The payload may or may not contain a Application Message that is being published. The length of this Message // can be calculated by subtracting the length of the variable header from the Remaining Length field that is in the Fixed Header. 0x3?. PacketPublish // A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1. It's Variable header contains the packet identifier. No payload. 0x40. PacketPuback // A PUBREC Packet is the response to a PUBLISH Packet with QoS 2. It is the second packet of the QoS 2 protocol exchange. It's Variable header contains the packet identifier. No payload. 0x50. PacketPubrec // A PUBREL Packet is the response to a PUBREC Packet. It is the third packet of the QoS 2 protocol exchange. It's Variable header contains the packet identifier. No payload. 0x62. PacketPubrel // The PUBCOMP Packet is the response to a PUBREL Packet. It is the fourth and final packet of the QoS 2 protocol exchange. 
It's Variable header contains the packet identifier. No payload. 0x70. PacketPubcomp // The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions. // Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH // Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions. // The SUBSCRIBE Packet also specifies (for each Subscription) the maximum QoS with which the Server can // send Application Messages to the Client. // The variable header of a subscribe topic contains the packet identifier. The payload contains a list of topic filters, see [VariablesSubscribe]. 0x82. PacketSubscribe // A SUBACK Packet is sent by the Server to the Client to confirm receipt and processing of a SUBSCRIBE Packet. // The variable header contains the packet identifier. The payload contains a list of octet return codes for each subscription requested by client, see [VariablesSuback]. 0x90. PacketSuback // An UNSUBSCRIBE Packet is sent by the Client to the Server, to unsubscribe from topics. // The variable header contains the packet identifier. Its payload contains a list of mqtt encoded strings corresponding to unsubscribed topics, see [VariablesUnsubscribe]. 0xa2. PacketUnsubscribe // The UNSUBACK Packet is sent by the Server to the Client to confirm receipt of an UNSUBSCRIBE Packet. // The variable header contains the packet identifier. It has no payload. 0xb0. PacketUnsuback // The PINGREQ Packet is sent from a Client to the Server. It can be used to: // - Indicate to the Server that the Client is alive in the absence of any other Control Packets being sent from the Client to the Server. // - Request that the Server responds to confirm that it is alive. // - Exercise the network to indicate that the Network Connection is active. // No payload or variable header. 0xc0. 
PacketPingreq // A PINGRESP Packet is sent by the Server to the Client in response to a PINGREQ Packet. It indicates that the Server is alive. // No payload or variable header. 0xd0. PacketPingresp // The DISCONNECT Packet is the final Control Packet sent from the Client to the Server. It indicates that the Client is disconnecting cleanly. // No payload or variable header. 0xe0. PacketDisconnect )
func (PacketType) String ¶
func (p PacketType) String() string
String returns a string representation of the packet type, stylized with all caps i.e: "PUBREL", "CONNECT". Does not allocate memory.
type QoSLevel ¶
type QoSLevel uint8
QoSLevel represents the Quality of Service specified by the client. The server can choose to provide or reject requested QoS. The values of QoS range from 0 to 2, each representing a different methodology for message delivery guarantees.
const ( // QoS0 at most once delivery. Arrives either once or not at all. Depends on capabilities of underlying network. QoS0 QoSLevel = iota // QoS1 at least once delivery. Ensures message arrives at receiver at least once. QoS1 // QoS2 Exactly once delivery. Highest quality service. For use when neither loss nor duplication of messages are acceptable. // There is an increased overhead associated with this quality of service. QoS2 // QoSSubfail marks a failure in SUBACK. This value cannot be encoded into a header // and is only returned upon an unsuccessful subscribe to a topic in an SUBACK packet. QoSSubfail QoSLevel = 0x80 )
QoS indicates the level of assurance for packet delivery.
type Rx ¶
type Rx struct { RxCallbacks RxCallbacks // ScratchBuf is lazily allocated to exhaust Publish payloads when received and no // OnPub callback is set. ScratchBuf []byte // LastReceivedHeader contains the last correctly read header. LastReceivedHeader Header // contains filtered or unexported fields }
Rx implements a bare minimum MQTT v3.1.1 protocol transport layer handler. Packets are received by calling Rx.ReadNextPacket and setting the callback in Rx corresponding to the expected packet. Rx will perform basic validation of input data according to MQTT's specification. If there is an error after reading the first byte of a packet the transport is closed and a new transport must be set with Rx.SetRxTransport. If OnRxError is set the underlying transport is not automatically closed and it becomes the callback's responsibility to close the transport.
Not safe for concurrent use.
func (*Rx) ReadNextPacket ¶
ReadNextPacket reads the next packet in the transport. If it fails after reading a non-zero amount of bytes it closes the transport and the underlying transport must be reset.
func (*Rx) RxTransport ¶ added in v0.2.1
func (rx *Rx) RxTransport() io.ReadCloser
RxTransport returns the underlying transport handler. It may be nil.
func (*Rx) SetRxTransport ¶
func (rx *Rx) SetRxTransport(transport io.ReadCloser)
SetRxTransport sets the rx's reader.
func (*Rx) ShallowCopy ¶
ShallowCopy shallow copies rx and underlying transport and decoder. Does not copy callbacks over.
type RxCallbacks ¶ added in v0.3.1
type RxCallbacks struct { // Functions below can access the Header of the message via RxTx.LastReceivedHeader. // All these functions block RxTx.ReadNextPacket. OnConnect func(*Rx, *VariablesConnect) error // Receives pointer because of large struct! // OnConnack is called on a CONNACK packet receipt. OnConnack func(*Rx, VariablesConnack) error // OnPub is called on PUBLISH packet receive. The [io.Reader] points to the transport's reader // and is limited to read the amount of bytes in the payload as given by RemainingLength. // One may calculate amount of bytes in the reader like so: // payloadLen := rx.LastReceivedHeader.RemainingLength - varPub.Size() OnPub func(rx *Rx, varPub VariablesPublish, r io.Reader) error // OnOther takes in the Header of received packet and a packet identifier uint16 if present. // OnOther receives PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK packets containing non-zero packet identfiers // and DISCONNECT, PINGREQ, PINGRESP packets with no packet identifier. OnOther func(rx *Rx, packetIdentifier uint16) error OnSub func(*Rx, VariablesSubscribe) error OnSuback func(*Rx, VariablesSuback) error OnUnsub func(*Rx, VariablesUnsubscribe) error // OnRxError is called if an error is encountered during decoding of packet. // If it is set then it becomes the responsibility of the callback to close the transport. OnRxError func(*Rx, error) }
RxCallbacks groups all functionality executed on data receipt, both successful and unsuccessful.
type SubscribeRequest ¶
type SubscribeRequest struct { // utf8 encoded topic or match pattern for topic filter. TopicFilter []byte // The desired QoS level. QoS QoSLevel }
SubscribeRequest is relevant only to SUBSCRIBE packets where several SubscribeRequest each encode a topic filter that is to be matched on the server side and a desired QoS for each matched topic.
type Tx ¶
type Tx struct { TxCallbacks TxCallbacks // contains filtered or unexported fields }
Tx implements a bare minimum MQTT v3.1.1 protocol transport layer handler for transmitting packets. If there is an error during read/write of a packet the transport is closed and a new transport must be set with Tx.SetTxTransport. A Tx will not validate data before encoding; that is up to the caller. Malformed packets will be rejected and the connection will be closed immediately. If OnTxError is set then the underlying transport is not closed and it becomes the responsibility of the callback to close the transport.
func (*Tx) SetTxTransport ¶
func (rxtx *Tx) SetTxTransport(transport io.WriteCloser)
SetTxTransport sets the Tx's writer.
func (*Tx) ShallowCopy ¶
ShallowCopy shallow copies tx and underlying transport and encoder. Does not copy callbacks over.
func (*Tx) TxTransport ¶ added in v0.2.1
func (tx *Tx) TxTransport() io.WriteCloser
TxTransport returns the underlying transport handler. It may be nil.
func (*Tx) WriteConnack ¶
func (tx *Tx) WriteConnack(varConnack VariablesConnack) error
WriteConnack writes a CONNACK packet over the transport.
func (*Tx) WriteConnect ¶
func (tx *Tx) WriteConnect(varConn *VariablesConnect) error
WriteConnect writes a CONNECT packet over the transport.
func (*Tx) WriteIdentified ¶
func (tx *Tx) WriteIdentified(packetType PacketType, packetIdentifier uint16) (err error)
WriteIdentified writes PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK packets containing non-zero packet identifiers. It automatically sets the RemainingLength field to 2.
func (*Tx) WritePublishPayload ¶
func (tx *Tx) WritePublishPayload(h Header, varPub VariablesPublish, payload []byte) error
WritePublishPayload writes a PUBLISH packet over the transport along with the Application Message in the payload. payload can be zero-length.
func (*Tx) WriteSimple ¶
func (tx *Tx) WriteSimple(packetType PacketType) (err error)
WriteSimple facilitates easy sending of the 2 octet DISCONNECT, PINGREQ, PINGRESP packets. If the packet is not one of these then an error is returned. It also returns an error with encoding step if there was one.
func (*Tx) WriteSuback ¶
func (tx *Tx) WriteSuback(varSub VariablesSuback) error
WriteSuback writes a SUBACK packet over the transport.
func (*Tx) WriteSubscribe ¶
func (tx *Tx) WriteSubscribe(varSub VariablesSubscribe) error
WriteSubscribe writes a SUBSCRIBE packet over the transport.
func (*Tx) WriteUnsubscribe ¶
func (tx *Tx) WriteUnsubscribe(varUnsub VariablesUnsubscribe) error
WriteUnsubscribe writes an UNSUBSCRIBE packet over the transport.
type TxCallbacks ¶ added in v0.3.1
type TxCallbacks struct { // OnTxError is called if an error is encountered during encoding. If it is set // then it becomes the responsibility of the callback to close Tx's transport. OnTxError func(*Tx, error) // OnSuccessfulTx is called after a MQTT packet is fully written to the underlying transport. OnSuccessfulTx func(*Tx) }
TxCallbacks groups functionality executed on transmission success or failure of an MQTT packet.
type VariablesConnack ¶
type VariablesConnack struct { // Octet with SP (Session Present) on LSB bit0. AckFlags uint8 // Octet ReturnCode ConnectReturnCode }
func (VariablesConnack) SessionPresent ¶
func (vc VariablesConnack) SessionPresent() bool
SessionPresent returns true if the SP bit is set in the CONNACK Ack flags. This bit indicates whether the ClientID already has a session on the server.
- If server accepts a connection with CleanSession set to 1 the server MUST set SP to 0 (false).
- If server accepts a connection with CleanSession set to 0 SP depends on whether the server already has stored a Session state for the supplied Client ID. If the server has stored a Session then SP MUST be set to 1, else it MUST be set to 0.
In both cases above this is in addition to returning a zero CONNACK return code. If the CONNACK return code is non-zero then SP MUST be set to 0.
func (VariablesConnack) Size ¶
func (vc VariablesConnack) Size() (sz int)
Size returns size-on-wire of the CONNACK variable header generated by vc.
func (VariablesConnack) String ¶
func (vc VariablesConnack) String() string
String returns a pretty-string representation of CONNACK variable header.
type VariablesConnect ¶
type VariablesConnect struct { // Must be present and unique to the server. UTF-8 encoded string // between 1 and 23 bytes in length although some servers may allow larger ClientIDs. ClientID []byte // By default will be set to 'MQTT' protocol if nil, which is v3.1.1 compliant. Protocol []byte Username []byte // For password to be used username must also be set. See [MQTT-3.1.2-22]. Password []byte WillTopic []byte WillMessage []byte // KeepAlive is an interval measured in seconds. It is the maximum time interval that is // permitted to elapse between the point at which the Client finishes transmitting one // Control Packet and the point it starts sending the next. KeepAlive uint16 // By default if set to 0 will use Protocol level 4, which is v3.1.1 compliant ProtocolLevel byte // This bit specifies if the Will Message is to be Retained when it is published. WillRetain bool CleanSession bool // These two bits specify the QoS level to be used when publishing the Will Message. WillQoS QoSLevel }
VariablesConnect contains the variable data of a CONNECT packet. All strings in the variable header must be UTF-8 encoded except the password, which may be binary data.
func (*VariablesConnect) Flags ¶
func (cv *VariablesConnect) Flags() byte
Flags returns the eighth CONNECT packet byte.
func (*VariablesConnect) SetDefaultMQTT ¶
func (vc *VariablesConnect) SetDefaultMQTT(clientID []byte)
SetDefaultMQTT sets required fields, like the ClientID, Protocol and Protocol level fields. If KeepAlive is zero, it is set to 60 (one minute). If Protocol field is not set to "MQTT" then memory is allocated for it. Clean session is also set to true.
func (*VariablesConnect) Size ¶
func (vs *VariablesConnect) Size() (sz int)
Size returns size-on-wire of the CONNECT variable header generated by vs.
func (*VariablesConnect) StringsLen ¶
func (vs *VariablesConnect) StringsLen() (n int)
StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.
func (*VariablesConnect) WillFlag ¶
func (cv *VariablesConnect) WillFlag() bool
WillFlag returns true if CONNECT packet will have a will topic and a will message, which means setting Will Flag bit to 1.
type VariablesPublish ¶
type VariablesPublish struct { // Must be present as utf-8 encoded string with NO wildcard characters. // The server may override the TopicName on response according to matching process [Section 4.7] TopicName []byte // Only present (non-zero) in QoS level 1 or 2. PacketIdentifier uint16 }
VariablesPublish represents the variable header of a PUBLISH packet. It does not include the payload with the topic data.
func (VariablesPublish) Size ¶
func (vp VariablesPublish) Size(qos QoSLevel) int
Size returns size-on-wire of the PUBLISH variable header generated by vp. It takes the packet QoS as an argument as it decides whether there's a Packet Identifier in the header.
func (VariablesPublish) StringsLen ¶
func (vp VariablesPublish) StringsLen() int
StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.
func (VariablesPublish) Validate ¶
func (vp VariablesPublish) Validate() error
type VariablesSuback ¶
type VariablesSuback struct { // Each return code corresponds to a topic filter in the SUBSCRIBE // packet being acknowledged. These MUST match the order of said SUBSCRIBE packet. // A return code can indicate failure using QoSSubfail. ReturnCodes []QoSLevel PacketIdentifier uint16 }
VariablesSuback represents the variable header of a SUBACK packet.
func (VariablesSuback) Size ¶
func (vs VariablesSuback) Size() (sz int)
Size returns size-on-wire of the SUBACK variable header generated by vs.
func (VariablesSuback) Validate ¶
func (vs VariablesSuback) Validate() error
type VariablesSubscribe ¶
type VariablesSubscribe struct { TopicFilters []SubscribeRequest PacketIdentifier uint16 }
VariablesSubscribe represents the variable header of a SUBSCRIBE packet. It encodes the topic filters requested by a Client and the desired QoS for each topic.
func (*VariablesSubscribe) Copy ¶ added in v0.4.0
func (vs *VariablesSubscribe) Copy() VariablesSubscribe
Copy copies the subscribe variables optimizing for memory space savings.
func (VariablesSubscribe) Size ¶
func (vs VariablesSubscribe) Size() (sz int)
Size returns size-on-wire of the SUBSCRIBE variable header generated by vs.
func (VariablesSubscribe) StringsLen ¶
func (vs VariablesSubscribe) StringsLen() (n int)
StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.
func (*VariablesSubscribe) Validate ¶
func (vsub *VariablesSubscribe) Validate() error
type VariablesUnsubscribe ¶
VariablesUnsubscribe represents the variable header of a UNSUBSCRIBE packet.
func (VariablesUnsubscribe) Size ¶
func (vu VariablesUnsubscribe) Size() (sz int)
Size returns size-on-wire of the UNSUBSCRIBE variable header generated by vu.
func (VariablesUnsubscribe) StringsLen ¶
func (vu VariablesUnsubscribe) StringsLen() (n int)
StringsLen returns length of all strings in variable header before being encoded. StringsLen is useful to know how much of the user's buffer was consumed during decoding.