mqtt

package module
v0.0.0-...-760e6b7
Published: Apr 10, 2023 License: MIT Imports: 20 Imported by: 1

README

mqtt.go

MQTTv5 client and server library

Go implementation of MQTTv5 protocol

(server library is work in progress)

Installation

# Go client
go get github.com/srishina/mqtt.go

Run the tests

go test ./... -race -v

Note: a few tests can take a while to run, namely TestBasicWithKeepAlive, TestPublishAfterReconnectWithSession, and TestPublishAfterReconnectWithoutSession.

Try out the examples

cd ./examples

Connect to a broker (basic client):

# keep alive = 120 seconds, clean start = true
go run ./basic-client/main.go -b ws://mqtt.eclipseprojects.io:80/mqtt -k 120 -cs=true

Publish a message:

go run ./client-pub/main.go -b ws://mqtt.eclipseprojects.io:80/mqtt "TEST/GREETING" 1 "Willkommen"

Subscribe to a topic filter:

go run ./client-sub/main.go -b ws://mqtt.eclipseprojects.io:80/mqtt "TEST/GREETING/#" 1

Will message:

go run ./client-will-msg/main.go -b ws://mqtt.eclipseprojects.io:80/mqtt --will-delay-interval 5 "TEST/GREETING/WILL" 1 "The Will message" "TEST/GREETING/#" 1

Network connection - client

The client library lets the application supply the network connection. An implementation of the "Connection" interface must be passed when initializing the client.

// Connection represents a connection that the MQTT client uses.
// The implementation of Connection is responsible for initializing
// the connection (TCP, WebSocket, etc.) with the broker.
// WebsocketConn and TCPConn are provided as part of the library;
// other connection types can be supplied by the application.
type Connection interface {
	BrokerURL() string
	// Connect is called by the MQTT client when it needs an io.ReadWriter.
	// If Connect returns an error during a reconnect, the MQTT client
	// attempts to reconnect again. The reconnect interval depends on the backoff delay.
	Connect(ctx context.Context) (io.ReadWriter, error)
	// Close closes the network connection.
	Close()
}

WebsocketConn, TCPConn implementations are provided as part of the library.

u, err := url.Parse(broker)
if err != nil {
	log.Fatal(err)
}

var conn mqtt.Connection
switch u.Scheme {
case "ws", "wss":
	conn = &mqtt.WebsocketConn{Host: broker}
case "tcp":
	conn = &mqtt.TCPConn{Host: u.Host}
default:
	log.Fatal("Invalid scheme name")
}
var opts []mqtt.ClientOption
opts = append(opts, mqtt.WithCleanStart(cleanStart))
opts = append(opts, mqtt.WithKeepAlive(uint16(keepAlive)))
opts = append(opts, mqtt.WithClientID(clientID))

client := mqtt.NewClient(conn, opts...)

If the default implementations are not suitable, a more sophisticated implementation of Connection can be supplied instead.

Subscriber overview - client

To receive messages published to a topic, the client needs to subscribe to the topics it is interested in. The client can use either a push or a pull mechanism to receive messages. In the pull model the client decides when to read the messages, which are queued internally in the library; the client may run the message receiver in a separate goroutine. In the push model the library delivers messages to the client asynchronously as the PUBLISH messages are received.

Pull model

recvr := mqtt.NewMessageReceiver()
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	for {
		p, err := recvr.Recv()
		if err != nil {
			return
		}
		log.Printf("PUBLISH recvd - Topic: %s QoS: %d Payload: %v\n", p.TopicName, p.QoSLevel, string(p.Payload))
	}
}()
// subscribe
subscriptions := []*mqtt.Subscription{}
subscriptions = append(subscriptions, &mqtt.Subscription{TopicFilter: "TEST/GREETING/#", QoSLevel: 2})

suback, err := client.Subscribe(context.Background(), subscriptions, nil, recvr)
if err != nil {
	log.Fatal(err)
}
log.Printf("SUBACK: %s", suback)

Push model


// Messages are delivered asynchronously, as they arrive. The library does not
// order messages in this case. Callbacks are executed by the library in a goroutine.

s := []*mqtt.Subscription{{TopicFilter: "TEST/GREETING/#", QoSLevel: 2}}
suback, err := client.CallbackSubscribe(context.Background(), s, nil, func(p *mqtt.Publish) {
	log.Printf("PUBLISH received - Topic: %s QoS: %d Payload: %v\n", p.TopicName, p.QoSLevel, string(p.Payload))
})
if err != nil {
	log.Fatal(err)
}
log.Printf("SUBACK: %s", suback)

How is a network reconnect handled in the library?

The client library supports reconnecting, and automatically resubscribes and publishes the pending messages.

MQTTv5 lets the client choose whether the session initiated with the broker is a clean one or a continuation of the previous session. In the latter case, the session's unique identifier is used. The specification also provides an extra property through which the client or the broker can decide how long a session should be kept. The client can set a session expiry interval. However, if the broker specifies a session expiry interval, that value takes precedence. If neither the client nor the broker specifies a session expiry interval, the session state is lost when the network connection is dropped.

In summary, clean start, the session expiry interval, and the CONNACK response from the broker together determine how the client reconnects.

The library operates as follows:

If the network connection is dropped, the library tries to reconnect to the broker using the CONNECT packet set by the client. At the moment, the library does not provide a mechanism to override the CONNECT packet. Based on the broker's response, the client does one of the following:

  1. If the broker still has the session state, the pending messages are sent, which can include partially completed QoS 2 PUBLISH flows. No resubscription is needed because the broker still has the subscriptions.
  2. If the broker has no session state, the client library resubscribes to the previously subscribed topics and sends the pending messages. For QoS 1 and 2 the library restarts the publish flow. Note that in this scenario the resubscription may fail; the client is notified of the status of the resubscription.
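The two cases above can be sketched as a small decision function. This is an illustration only, not the library's code: it assumes the decision is driven by the SessionPresent flag of the CONNACK, and reconnectPlan and its step descriptions are hypothetical.

```go
package main

import "fmt"

// reconnectPlan lists, in order, what a client would do after a
// successful reconnect. The step descriptions are illustrative only.
func reconnectPlan(sessionPresent bool, subscribedFilters []string, pendingPublishes int) []string {
	var steps []string
	if !sessionPresent {
		// The broker has no session state: subscriptions must be restored
		// and pending QoS 1 and 2 publish flows start over.
		for _, f := range subscribedFilters {
			steps = append(steps, "resubscribe "+f)
		}
		if pendingPublishes > 0 {
			steps = append(steps, fmt.Sprintf("restart %d pending publish flow(s)", pendingPublishes))
		}
		return steps
	}
	// The broker kept the session: subscriptions are intact, so only the
	// pending publish flows need to be resumed.
	if pendingPublishes > 0 {
		steps = append(steps, fmt.Sprintf("resume %d pending publish flow(s)", pendingPublishes))
	}
	return steps
}

func main() {
	filters := []string{"TEST/GREETING/#"}
	fmt.Println("session present:", reconnectPlan(true, filters, 2))
	fmt.Println("session lost:   ", reconnectPlan(false, filters, 2))
}
```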

Connection retry uses exponential backoff with jitter.

var opts []mqtt.ClientOption
opts = append(opts, mqtt.WithInitialReconnectDelay(50)) // in milliseconds
// other options as needed
client := mqtt.NewClient(conn, opts...)

See WithInitialReconnectDelay, WithMaxReconnectDelay, and WithReconnectJitter for more information.

Documentation

Constants

const (
	ReconnectingEvent = "reconnecting"
	DisconnectedEvent = "disconnected"
	ReconnectedEvent  = "reconnected"
	ResubscribeEvent  = "resubscribe"
	LogEvent          = "log"
)

Variables

var (
	ErrInvalidProtocolName = errors.New("invalid protocol name")
	ErrInvalidConnectFlags = errors.New("invalid connect flags - Malformed packet")
	ErrInvalidWillQos      = errors.New("invalid QoS - Malformed packet")
	ErrInvalidQosFlags     = errors.New("invalid QoS flag- Malformed packet")
	ErrInvalidWillRetain   = errors.New("invalid retain flag - Malformed packet")
)

var (
	ErrNoTopicsPresent = errors.New("Subscription payload MUST contain atleast a topic - protocol error")
)

var (
	ErrProtocol = errors.New("protocol error")
)

var PROTOCOLVERSIONv5 = byte(0x05)

PROTOCOLVERSIONv5 is the MQTT protocol version.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Connect connects to the MQTT broker and sends the MQTT CONNECT packet.
	Connect(ctx context.Context) (*ConnAck, error)

	// Disconnect disconnects from the MQTT broker.
	Disconnect(ctx context.Context, reasonCode DisconnectReasonCode, props *DisconnectProperties) error

	// Subscribe sends an MQTT SUBSCRIBE request to the broker with the given subscriptions and their
	// properties, and a MessageReceiver through which the messages published to the subscribed
	// topics are returned.
	// The function waits for the MQTT SUBSCRIBE response, SubAck, or for the packet timeout
	// configured as part of the Client options.
	// Note: the subscriptions input may contain more than one topic; the associated
	// MessageReceiver is valid for all the topics present in the given subscriptions.
	Subscribe(ctx context.Context, subscriptions []*Subscription, props *SubscribeProperties, recvr *MessageReceiver) (*SubAck, error)

	// CallbackSubscribe sends an MQTT SUBSCRIBE request to the broker with the given subscriptions and their
	// properties, and a callback handler through which the messages published to the subscribed topics are returned.
	// The function waits for the MQTT SUBSCRIBE response, SubAck, or for the packet timeout
	// configured as part of the Client options.
	// Note: the subscriptions input may contain more than one topic; the associated
	// callback handler is valid for all the topics present in the given subscriptions.
	CallbackSubscribe(ctx context.Context, subscriptions []*Subscription, props *SubscribeProperties, cb MessageHandler) (*SubAck, error)

	// Unsubscribe sends an MQTT UNSUBSCRIBE request to the broker with the given topic filters and their properties.
	// The function waits for the MQTT UNSUBSCRIBE response, UnsubAck, or for a packet timeout.
	Unsubscribe(ctx context.Context, topicFilters []string, props *UnsubscribeProperties) (*UnsubAck, error)

	// Publish sends an MQTT PUBLISH packet to the MQTT broker. When the QoS is 1 or 2
	// the function waits for a response from the broker; for QoS 0 the function
	// completes immediately after the PUBLISH message is scheduled for sending.
	Publish(ctx context.Context, topic string, qosLevel byte, retain bool, payload []byte, props *PublishProperties) error

	// On maps the callback argument to the event name. More than one
	// callback can be added for a particular event name.
	On(eventName string, callback interface{}) error

	// Off removes the callback associated with the event name.
	Off(eventName string, callback interface{}) error
}

Client represents an MQTT client. An MQTT client can be used to perform MQTT operations such as connect, publish, subscribe, or unsubscribe.

func NewClient

func NewClient(conn Connection, opt ...ClientOption) Client

NewClient creates a new MQTT client. An MQTT client can be used to perform MQTT operations such as connect, publish, subscribe, or unsubscribe.

type ClientOption

type ClientOption func(*clientOptions) error

ClientOption contains configurable settings for a client

func WithCleanStart

func WithCleanStart(cleanStart bool) ClientOption

WithCleanStart indicates whether the client starts a new Session or continues an existing Session. Default: true.

func WithClientID

func WithClientID(clientID string) ClientOption

WithClientID sets the identifier that identifies the client to the broker. If it is not set, the broker assigns a client ID automatically.

func WithConnectProperties

func WithConnectProperties(props *ConnectProperties) ClientOption

WithConnectProperties configures the CONNECT properties.

func WithInitialReconnectDelay

func WithInitialReconnectDelay(delay int) ClientOption

WithInitialReconnectDelay sets the delay for the first reconnect attempt. The actual delay may vary depending on the configured jitter. Default: 1000ms.

func WithKeepAlive

func WithKeepAlive(keepAlive uint16) ClientOption

WithKeepAlive sets the number of seconds after which the client sends a PINGREQ to the broker if no other messages are exchanged during that time. Default: 60 seconds.

func WithMaxReconnectDelay

func WithMaxReconnectDelay(delay int) ClientOption

WithMaxReconnectDelay sets the maximum reconnect delay. Once this value is reached, the backoff time is no longer increased. Default: 32000ms.

func WithPassword

func WithPassword(password []byte) ClientOption

WithPassword can be used to carry any credential information

func WithReconnectJitter

func WithReconnectJitter(jitter float32) ClientOption

WithReconnectJitter sets the value that adds randomness to the retry delay. Default: 0.5.

func WithUserName

func WithUserName(username string) ClientOption

WithUserName sets the user name, which the broker can use to authenticate and authorize the client.

func WithWillMessage

func WithWillMessage(willMessage *WillMessage) ClientOption

WithWillMessage Configures a will message

type ConnAck

type ConnAck struct {
	SessionPresent bool
	ReasonCode     ConnAckReasonCode
	Properties     *ConnAckProperties
}

ConnAck MQTT CONNACK packet

func (*ConnAck) String

func (c *ConnAck) String() string

type ConnAckProperties

type ConnAckProperties struct {
	SessionExpiryInterval           *uint32
	ReceiveMaximum                  *uint16
	MaximumQoS                      *byte
	RetainAvailable                 *bool
	MaximumPacketSize               *uint32
	AssignedClientIdentifier        string
	TopicAliasMaximum               *uint16
	ReasonString                    string
	UserProperty                    map[string]string
	WildcardSubscriptionAvailable   *bool
	SubscriptionIdentifierAvailable *bool
	SharedSubscriptionAvailable     *bool
	ServerKeepAlive                 *uint16
	ResponseInformation             string
	ServerReference                 string
	AuthenticationMethod            string
	AuthenticationData              []byte
}

ConnAckProperties MQTT CONNACK properties

func (*ConnAckProperties) String

func (cp *ConnAckProperties) String() string

type ConnAckReasonCode

type ConnAckReasonCode byte

ConnAckReasonCode MQTT reason code that indicates the result of a CONNECT operation

const (
	ConnAckReasonCodeSuccess                ConnAckReasonCode = ConnAckReasonCode(reasoncode.Success)
	ConnAckReasonCodeUnspecifiedError       ConnAckReasonCode = ConnAckReasonCode(reasoncode.UnspecifiedError)
	ConnAckReasonCodeMalformedPacket        ConnAckReasonCode = ConnAckReasonCode(reasoncode.MalformedPacket)
	ConnAckReasonCodeProtocolError          ConnAckReasonCode = ConnAckReasonCode(reasoncode.ProtocolError)
	ConnAckReasonCodeImplSpecificError      ConnAckReasonCode = ConnAckReasonCode(reasoncode.ImplSpecificError)
	ConnAckReasonCodeUnsupportedProtocolVer ConnAckReasonCode = 0x84
	ConnAckReasonCodeClientIDNotValud       ConnAckReasonCode = 0x85
	ConnAckReasonCodeBadUsernameOrPWD       ConnAckReasonCode = 0x86
	ConnAckReasonCodeNotAuthorized          ConnAckReasonCode = ConnAckReasonCode(reasoncode.NotAuthorized)
	ConnAckReasonCodeServerUnavailable      ConnAckReasonCode = 0x88
	ConnAckReasonCodeServerBusy             ConnAckReasonCode = ConnAckReasonCode(reasoncode.ServerBusy)
	ConnAckReasonCodeBanned                 ConnAckReasonCode = 0x8A
	ConnAckReasonCodeBadAuthMethod          ConnAckReasonCode = 0x8C
	ConnAckReasonCodeTopicNameInvalid       ConnAckReasonCode = ConnAckReasonCode(reasoncode.TopicNameInvalid)
	ConnAckReasonCodePacketTooLarge         ConnAckReasonCode = ConnAckReasonCode(reasoncode.PacketTooLarge)
	ConnAckReasonCodeQuotaExceeded          ConnAckReasonCode = ConnAckReasonCode(reasoncode.QuotaExceeded)
	ConnAckReasonCodePayloadFormatInvalid   ConnAckReasonCode = ConnAckReasonCode(reasoncode.PayloadFormatInvalid)
	ConnAckReasonCodeRetainNotSupported     ConnAckReasonCode = ConnAckReasonCode(reasoncode.RetainNotSupported)
	ConnAckReasonCodeQoSNotSupported        ConnAckReasonCode = ConnAckReasonCode(reasoncode.QoSNotSupported)
	ConnAckReasonCodeUseAnotherServer       ConnAckReasonCode = ConnAckReasonCode(reasoncode.UseAnotherServer)
	ConnAckReasonCodeServerMoved            ConnAckReasonCode = ConnAckReasonCode(reasoncode.ServerMoved)
	ConnAckReasonCodeConnectionRateExceeded ConnAckReasonCode = ConnAckReasonCode(reasoncode.ConnectionRateExceeded)
)

func (ConnAckReasonCode) Desc

func (code ConnAckReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (ConnAckReasonCode) Text

func (code ConnAckReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.
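When handling a reason code, it is often enough to know whether it signals success or failure. In MQTT v5, reason code values of 0x80 and above indicate failure. The sketch below shows that check on a local stand-in type; reasonCode and isFailure are hypothetical names, and in real code the same comparison would apply to the byte-based reason code types of this package.

```go
package main

import "fmt"

// reasonCode is a local stand-in for the package's byte-based reason code types.
type reasonCode byte

// isFailure reports whether an MQTT v5 reason code signals an error.
// Values of 0x80 and above indicate failure.
func isFailure(code reasonCode) bool { return code >= 0x80 }

func main() {
	// 0x00 success, 0x02 granted QoS 2, 0x86 bad user name or password, 0x8A banned.
	for _, c := range []reasonCode{0x00, 0x02, 0x86, 0x8A} {
		fmt.Printf("0x%02X failure=%v\n", byte(c), isFailure(c))
	}
}
```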

type Connect

type Connect struct {
	CleanStart     bool
	KeepAlive      uint16
	WillFlag       bool
	WillQoS        byte
	WillRetain     bool
	WillTopic      string
	WillPayload    []byte
	WillProperties *WillProperties
	Properties     *ConnectProperties
	ClientID       string
	UserName       string
	Password       []byte
	// contains filtered or unexported fields
}

Connect MQTT connect packet

func (*Connect) String

func (c *Connect) String() string

type ConnectProperties

type ConnectProperties struct {
	SessionExpiryInterval *uint32
	ReceiveMaximum        *uint16
	MaximumPacketSize     *uint32
	TopicAliasMaximum     *uint16
	RequestProblemInfo    *bool
	RequestResponseInfo   *bool
	UserProperty          map[string]string
	AuthenticationMethod  string
	AuthenticationData    []byte
}

ConnectProperties MQTT CONNECT properties
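Several fields of the property structs are pointers, which makes it possible to tell an unset property from one that is set to zero. The sketch below shows one convenient way to fill such fields. It is a stand-alone illustration: connectProps mirrors only two fields of ConnectProperties, and the helpers ptr and describe are hypothetical, not part of the library. The generic helper requires Go 1.18 or later.

```go
package main

import "fmt"

// connectProps mirrors two fields of the library's ConnectProperties.
type connectProps struct {
	SessionExpiryInterval *uint32
	ReceiveMaximum        *uint16
}

// ptr returns a pointer to a copy of v, which is handy for optional fields.
func ptr[T any](v T) *T { return &v }

// describe renders an optional value, distinguishing "unset" from zero.
func describe(v *uint32) string {
	if v == nil {
		return "unset"
	}
	return fmt.Sprint(*v)
}

func main() {
	props := connectProps{SessionExpiryInterval: ptr(uint32(3600))}
	fmt.Println("session expiry:", describe(props.SessionExpiryInterval))
	fmt.Println("receive maximum set:", props.ReceiveMaximum != nil)
}
```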

func (*ConnectProperties) String

func (cp *ConnectProperties) String() string

type Connection

type Connection interface {
	BrokerURL() string
	// Connect is called by the MQTT client when it needs an io.ReadWriter.
	// If Connect returns an error during a reconnect, the MQTT client
	// attempts to reconnect again. The reconnect interval depends on the backoff delay.
	Connect(ctx context.Context) (io.ReadWriter, error)
	// Close closes the network connection.
	Close()
}

Connection represents a connection that the MQTT client uses. The implementation of Connection is responsible for initializing the connection (TCP, WebSocket, etc.) with the broker. WebsocketConn and TCPConn are provided as part of the library; other connection types can be supplied by the application.

type Disconnect

type Disconnect struct {
	ReasonCode DisconnectReasonCode
	Properties *DisconnectProperties
}

Disconnect MQTT DISCONNECT packet

func (*Disconnect) String

func (d *Disconnect) String() string

type DisconnectProperties

type DisconnectProperties struct {
	SessionExpiryInterval *uint32
	ReasonString          string
	UserProperty          map[string]string
	ServerReference       string
}

DisconnectProperties MQTT DISCONNECT properties

func (*DisconnectProperties) String

func (dp *DisconnectProperties) String() string

type DisconnectReasonCode

type DisconnectReasonCode byte

DisconnectReasonCode indicates DISCONNECT MQTT reason code

const (
	DisconnectReasonCodeNormal                            DisconnectReasonCode = DisconnectReasonCode(reasoncode.Success)
	DisconnectReasonCodeWithWillMessage                   DisconnectReasonCode = 0x04
	DisconnectReasonCodeUnspecifiedError                  DisconnectReasonCode = DisconnectReasonCode(reasoncode.UnspecifiedError)
	DisconnectReasonCodeMalformedPacket                   DisconnectReasonCode = DisconnectReasonCode(reasoncode.MalformedPacket)
	DisconnectReasonCodeProtocolError                     DisconnectReasonCode = DisconnectReasonCode(reasoncode.ProtocolError)
	DisconnectReasonCodeImplSpecificError                 DisconnectReasonCode = DisconnectReasonCode(reasoncode.ImplSpecificError)
	DisconnectReasonCodeNotAuthorized                     DisconnectReasonCode = DisconnectReasonCode(reasoncode.NotAuthorized)
	DisconnectReasonCodeServerBusy                        DisconnectReasonCode = DisconnectReasonCode(reasoncode.ServerBusy)
	DisconnectReasonCodeServerShuttingDown                DisconnectReasonCode = 0x8B
	DisconnectReasonCodeKeepAliveTimeout                  DisconnectReasonCode = 0x8D
	DisconnectReasonCodeSessionTakenOver                  DisconnectReasonCode = 0x8E
	DisconnectReasonCodeTopicFilterInvalid                DisconnectReasonCode = DisconnectReasonCode(reasoncode.TopicFilterInvalid)
	DisconnectReasonCodeTopicNameInvalid                  DisconnectReasonCode = DisconnectReasonCode(reasoncode.TopicNameInvalid)
	DisconnectReasonCodeReceiveMaximumExceeded            DisconnectReasonCode = 0x93
	DisconnectReasonCodeTopicAliasInvalid                 DisconnectReasonCode = 0x94
	DisconnectReasonCodePacketTooLarge                    DisconnectReasonCode = 0x95
	DisconnectReasonCodeMessageRateTooHigh                DisconnectReasonCode = 0x96
	DisconnectReasonCodeQuotaExceeded                     DisconnectReasonCode = DisconnectReasonCode(reasoncode.QuotaExceeded)
	DisconnectReasonCodeAdministrativeAction              DisconnectReasonCode = 0x98
	DisconnectReasonCodePayloadFormatInvalid              DisconnectReasonCode = DisconnectReasonCode(reasoncode.PayloadFormatInvalid)
	DisconnectReasonCodeRetainNotSupported                DisconnectReasonCode = DisconnectReasonCode(reasoncode.RetainNotSupported)
	DisconnectReasonCodeQoSNotSupported                   DisconnectReasonCode = DisconnectReasonCode(reasoncode.QoSNotSupported)
	DisconnectReasonCodeUseAnotherServer                  DisconnectReasonCode = DisconnectReasonCode(reasoncode.UseAnotherServer)
	DisconnectReasonServerMoved                           DisconnectReasonCode = DisconnectReasonCode(reasoncode.ServerMoved)
	DisconnectReasonCodeSharedSubscriptionsNotSupported   DisconnectReasonCode = DisconnectReasonCode(reasoncode.SharedSubscriptionsNotSupported)
	DisconnectReasonCodeConnectionRateExceeded            DisconnectReasonCode = DisconnectReasonCode(reasoncode.ConnectionRateExceeded)
	DisconnectReasonCodeMaximumConnectTime                DisconnectReasonCode = 0xA0
	DisconnectReasonCodeSubscriptionIdsNotSupported       DisconnectReasonCode = DisconnectReasonCode(reasoncode.SubscriptionIdsNotSupported)
	DisconnectReasonCodeWildcardSubscriptionsNotSupported DisconnectReasonCode = DisconnectReasonCode(reasoncode.WildcardSubscriptionsNotSupported)
)

type DisconnectedEventFn

type DisconnectedEventFn = func(err error)

type EventEmitter

type EventEmitter interface {
	On(eventName string, callback interface{}) error
	Off(eventName string, callback interface{}) error
}

EventEmitter listens to a named event and triggers a callback when that event occurs. Events are emitted as they occur.

type MessageHandler

type MessageHandler func(*Publish)

MessageHandler callback that is invoked when a new PUBLISH message has been received

type MessageReceiver

type MessageReceiver struct {
	// contains filtered or unexported fields
}

MessageReceiver that allows receiving subscribed messages

func NewMessageReceiver

func NewMessageReceiver() *MessageReceiver

NewMessageReceiver creates a new message receiver (subscriber channel).

func (*MessageReceiver) Recv

func (m *MessageReceiver) Recv() (*Publish, error)

type PubAck

type PubAck struct {
	ReasonCode PubAckReasonCode
	Properties *PublishResponseProperties
	// contains filtered or unexported fields
}

PubAck MQTT PUBACK packet

type PubAckReasonCode

type PubAckReasonCode byte

PubAckReasonCode MQTT reason code that indicates the result of PUBLISH operation

func (PubAckReasonCode) Desc

func (code PubAckReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (PubAckReasonCode) Text

func (code PubAckReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.

type PubComp

type PubComp struct {
	ReasonCode PubCompReasonCode
	Properties *PublishResponseProperties
	// contains filtered or unexported fields
}

PubComp MQTT PUBCOMP packet

type PubCompReasonCode

type PubCompReasonCode byte

PubCompReasonCode MQTT reason code that indicates the result of PUBLISH operation

const (
	PubCompReasonCodeSuccess        PubCompReasonCode = PubCompReasonCode(reasoncode.Success)
	PubCompPacketIdentifierNotFound PubCompReasonCode = PubCompReasonCode(reasoncode.PacketIdentifierNotFound)
)

func (PubCompReasonCode) Desc

func (code PubCompReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (PubCompReasonCode) Text

func (code PubCompReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.

type PubRec

type PubRec struct {
	ReasonCode PubRecReasonCode
	Properties *PublishResponseProperties
	// contains filtered or unexported fields
}

PubRec MQTT PUBREC packet

type PubRecReasonCode

type PubRecReasonCode byte

func (PubRecReasonCode) Desc

func (code PubRecReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (PubRecReasonCode) Text

func (code PubRecReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.

type PubRel

type PubRel struct {
	ReasonCode PubRelReasonCode
	Properties *PublishResponseProperties
	// contains filtered or unexported fields
}

PubRel MQTT PUBREL packet

type PubRelReasonCode

type PubRelReasonCode byte

PubRelReasonCode MQTT reason code that indicates the result of PUBLISH operation

const (
	PubRelReasonCodeSuccess        PubRelReasonCode = PubRelReasonCode(reasoncode.Success)
	PubRelPacketIdentifierNotFound PubRelReasonCode = PubRelReasonCode(reasoncode.PacketIdentifierNotFound)
)

func (PubRelReasonCode) Desc

func (code PubRelReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (PubRelReasonCode) Text

func (code PubRelReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.

type Publish

type Publish struct {
	QoSLevel  byte
	DUPFlag   bool
	Retain    bool
	TopicName string

	Properties *PublishProperties
	Payload    []byte
	// contains filtered or unexported fields
}

Publish MQTT PUBLISH packet

func (*Publish) String

func (p *Publish) String() string

type PublishProperties

type PublishProperties struct {
	PayloadFormatIndicator  *bool
	MessageExpiryInterval   *uint32
	TopicAlias              *uint16
	ResponseTopic           string
	CorrelationData         []byte
	UserProperty            map[string]string
	SubscriptionIdentifiers []uint32
	ContentType             string
}

PublishProperties MQTT PUBLISH properties

func (*PublishProperties) String

func (pp *PublishProperties) String() string

type PublishResponseProperties

type PublishResponseProperties struct {
	ReasonString string
	UserProperty map[string]string
}

PublishResponseProperties MQTT PUBACK, PUBREC, PUBREL, PUBCOMP properties

type ReconnectedEventFn

type ReconnectedEventFn = func(connack *ConnAck)

type ReconnectingEventFn

type ReconnectingEventFn = func(str string)

type ResubscribeEventFn

type ResubscribeEventFn = func(resubscribeResult ResubscribeResult)

type ResubscribeResult

type ResubscribeResult struct {
	Subscribe *Subscribe
	SubAck    *SubAck
	Error     error
}

ResubscribeResult is the result of a resubscription. It carries the subscription acknowledgement (SubAck) and an error. If the error is set, the subscription may have failed.

type SubAck

type SubAck struct {
	Properties  *SubAckProperties
	ReasonCodes []SubAckReasonCode
	// contains filtered or unexported fields
}

SubAck MQTT SUBACK packet

func (*SubAck) String

func (s *SubAck) String() string

type SubAckProperties

type SubAckProperties struct {
	ReasonString string
	UserProperty map[string]string
}

SubAckProperties MQTT SUBACK properties

func (*SubAckProperties) String

func (sp *SubAckProperties) String() string

type SubAckReasonCode

type SubAckReasonCode byte

SubAckReasonCode MQTT reason code that indicates the result of SUBSCRIBE operation

const (
	SubAckReasonCodeGrantedQoS0             SubAckReasonCode = SubAckReasonCode(reasoncode.Success)
	SubAckReasonCodeGrantedQoS1             SubAckReasonCode = 0x01
	SubAckReasonCodeGrantedQoS2             SubAckReasonCode = 0x02
	SubAckReasonCodeUnspecifiedError        SubAckReasonCode = SubAckReasonCode(reasoncode.UnspecifiedError)
	SubAckReasonCodeImplSpecificError       SubAckReasonCode = SubAckReasonCode(reasoncode.ImplSpecificError)
	SubAckReasonCodeNotAuthorized           SubAckReasonCode = SubAckReasonCode(reasoncode.NotAuthorized)
	SubAckReasonCodeTopicFilterInvalid      SubAckReasonCode = SubAckReasonCode(reasoncode.TopicFilterInvalid)
	SubAckPacketIdentifierInUse             SubAckReasonCode = SubAckReasonCode(reasoncode.PacketIdentifierInUse)
	SubAckQuotaExceeded                     SubAckReasonCode = SubAckReasonCode(reasoncode.QuotaExceeded)
	SubAckSharedSubscriptionsNotSupported   SubAckReasonCode = SubAckReasonCode(reasoncode.SharedSubscriptionsNotSupported)
	SubAckSubscriptionIdsNotSupported       SubAckReasonCode = SubAckReasonCode(reasoncode.SubscriptionIdsNotSupported)
	SubAckWildcardSubscriptionsNotSupported SubAckReasonCode = SubAckReasonCode(reasoncode.WildcardSubscriptionsNotSupported)
)

func (SubAckReasonCode) Desc

func (code SubAckReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (SubAckReasonCode) Text

func (code SubAckReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.

type Subscribe

type Subscribe struct {
	Subscriptions []*Subscription
	Properties    *SubscribeProperties
	// contains filtered or unexported fields
}

Subscribe MQTT SUBSCRIBE packet

func (*Subscribe) String

func (s *Subscribe) String() string

type SubscribeProperties

type SubscribeProperties struct {
	SubscriptionIdentifier *uint32
	UserProperty           map[string]string
}

SubscribeProperties MQTT SUBSCRIBE properties

func (*SubscribeProperties) String

func (sp *SubscribeProperties) String() string

type Subscription

type Subscription struct {
	TopicFilter       string
	QoSLevel          byte
	NoLocal           bool
	RetainAsPublished bool
	RetainHandling    byte
}

Subscription contains topic filter and subscription options for the MQTT subscribe
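A topic filter such as "TEST/GREETING/#" may contain the MQTT wildcards "+" (exactly one level) and "#" (the parent level and any number of child levels, last position only). Matching is normally done by the broker; the sketch below only illustrates the rules. It is written for this document, matchTopic is a hypothetical helper rather than a library function, and it assumes the filter is already well formed.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches the MQTT topic filter.
// It assumes a well-formed filter and does not validate it.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	// Topics beginning with '$' are not matched by a leading wildcard.
	if strings.HasPrefix(t[0], "$") && (f[0] == "#" || f[0] == "+") {
		return false
	}
	for i, level := range f {
		if level == "#" {
			// '#' must be the last level; it matches the parent level
			// and any number of child levels.
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	for _, topic := range []string{"TEST/GREETING", "TEST/GREETING/de/formal", "TEST/OTHER"} {
		fmt.Printf("%-26s matches TEST/GREETING/#: %v\n", topic, matchTopic("TEST/GREETING/#", topic))
	}
}
```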

func (*Subscription) String

func (s *Subscription) String() string

type TCPConn

type TCPConn struct {
	Host      string
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

TCPConn is a concrete implementation of Connection, used when the MQTT client connects to the MQTT broker over TCP.

func (*TCPConn) BrokerURL

func (t *TCPConn) BrokerURL() string

BrokerURL the broker URL

func (*TCPConn) Close

func (t *TCPConn) Close()

Close closes the connection

func (*TCPConn) Connect

func (t *TCPConn) Connect(ctx context.Context) (io.ReadWriter, error)

Connect connects to the MQTT broker.

type UnsubAck

type UnsubAck struct {
	Properties  *UnsubAckProperties
	ReasonCodes []UnsubAckReasonCode
	// contains filtered or unexported fields
}

UnsubAck MQTT UNSUBACK packet

func (*UnsubAck) String

func (us *UnsubAck) String() string

type UnsubAckProperties

type UnsubAckProperties struct {
	ReasonString string
	UserProperty map[string]string
}

UnsubAckProperties MQTT UNSUBACK properties

func (*UnsubAckProperties) String

func (usp *UnsubAckProperties) String() string

type UnsubAckReasonCode

type UnsubAckReasonCode byte

UnsubAckReasonCode MQTT reason code that indicates the result of UNSUBSCRIBE operation

const (
	UnsubAckReasonCodeSuccess            UnsubAckReasonCode = UnsubAckReasonCode(reasoncode.Success)
	UnsubAckNoSubscriptionExisted        UnsubAckReasonCode = 0x11
	UnsubAckReasonCodeUnspecifiedError   UnsubAckReasonCode = UnsubAckReasonCode(reasoncode.UnspecifiedError)
	UnsubAckReasonCodeImplSpecificError  UnsubAckReasonCode = UnsubAckReasonCode(reasoncode.ImplSpecificError)
	UnsubAckReasonCodeNotAuthorized      UnsubAckReasonCode = UnsubAckReasonCode(reasoncode.NotAuthorized)
	UnsubAckReasonCodeTopicFilterInvalid UnsubAckReasonCode = UnsubAckReasonCode(reasoncode.TopicFilterInvalid)
	UnsubAckPacketIdentifierInUse        UnsubAckReasonCode = UnsubAckReasonCode(reasoncode.PacketIdentifierInUse)
)

func (UnsubAckReasonCode) Desc

func (code UnsubAckReasonCode) Desc() string

Desc returns a description for the MQTT reason code. Returns the empty string if the reason code is unknown.

func (UnsubAckReasonCode) Text

func (code UnsubAckReasonCode) Text() string

Text returns a text for the MQTT reason code. Returns the empty string if the reason code is unknown.

type Unsubscribe

type Unsubscribe struct {
	Properties   *UnsubscribeProperties
	TopicFilters []string
	// contains filtered or unexported fields
}

Unsubscribe MQTT unsubscribe packet

func (*Unsubscribe) String

func (us *Unsubscribe) String() string

type UnsubscribeProperties

type UnsubscribeProperties struct {
	UserProperty map[string]string
}

UnsubscribeProperties MQTT UNSUBSCRIBE properties

func (*UnsubscribeProperties) String

func (usp *UnsubscribeProperties) String() string

type WebsocketConn

type WebsocketConn struct {
	Host      string
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

WebsocketConn is a concrete implementation of Connection, used when the MQTT client connects to the MQTT broker over a WebSocket.

func (*WebsocketConn) BrokerURL

func (w *WebsocketConn) BrokerURL() string

BrokerURL the broker URL

func (*WebsocketConn) Close

func (w *WebsocketConn) Close()

Close closes the connection

func (*WebsocketConn) Connect

func (w *WebsocketConn) Connect(ctx context.Context) (io.ReadWriter, error)

Connect connects to the MQTT broker.

type WillMessage

type WillMessage struct {
	QoS        byte
	Retain     bool
	Topic      string
	Payload    []byte
	Properties *WillProperties
}

WillMessage is an Application Message that the Server publishes after the Network Connection is closed. The client can set the reason code DisconnectReasonCodeWithWillMessage when disconnecting, in which case the Server still publishes the Will message.

type WillProperties

type WillProperties struct {
	WillDelayInterval      *uint32
	PayloadFormatIndicator *bool
	MessageExpiryInterval  *uint32
	ContentType            string
	ResponseTopic          string
	CorrelationData        []byte
	UserProperty           map[string]string
}

WillProperties will properties in CONNECT packet

func (*WillProperties) String

func (wp *WillProperties) String() string

Directories

Path Synopsis
examples
internal
