libmqtt

package module
v0.9.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2021 License: Apache-2.0 Imports: 23 Imported by: 1

README

libmqtt

Build Status GoDoc GoReportCard codecov

Feature rich modern MQTT library in pure Go, for Go, C/C++, Java

Notice: This is a fork of https://github.com/goiiot/libmqtt that has been roughly patched to correctly handle packets from mqtt 3.1 and mqtt 5. The parent branch does not allow 3.1 and has mostly empty mqtt 5 tests and has completely broken mqtt5. (atw)

Table of contents

Features

  1. MQTT v3.1.1/v5.0 client support (async only)
  2. High performance and less memory footprint (see Benchmark)
  3. Customizable topic routing (see Topic Routing)
  4. Multiple Builtin session persist methods (see Session Persist)
  5. C/C++ lib, Java lib, Command line client support
  6. Idiomatic Go

Usage

This package can be used as

As a Go lib
Prerequisite
  • Go 1.9+
Steps

TL;DR: You can find a full example at examples/client.go

1.Go get this project

go get github.com/goiiot/libmqtt

2.Import this package in your project file

import "github.com/goiiot/libmqtt"

3.Create a custom client

// Create a client and enable auto reconnect when connection lost
// We primarily use `RegexRouter` for client
client, err := libmqtt.NewClient(
    // enable keepalive (10s interval) with 20% tolerance
    libmqtt.WithKeepalive(10, 1.2),
    // enable auto reconnect and set backoff strategy
    libmqtt.WithAutoReconnect(true),
    libmqtt.WithBackoffStrategy(time.Second, 5*time.Second, 1.2),
    // use RegexRouter for topic routing if not specified
    // will use TextRouter, which will match full text
    libmqtt.WithRouter(libmqtt.NewRegexRouter()),
)

if err != nil {
    // handle client creation error
    panic("create mqtt client failed")
}

Notice: If you would like to explore all the options available, please refer to GoDoc#Option

4.Register the handlers and Connect, then you are ready to pub/sub with server

Optional, but we recommend to register handlers for pub, sub, unsub, net error and persist error, and you can gain more controllability of the lifecycle of the client

client.HandlePub(PubHandler) // register handler for pub success/fail (optional, but recommended)
client.HandleSub(SubHandler) // register handler for sub success/fail (optional, but recommended)
client.HandleUnSub(UnSubHandler) // register handler for unsub success/fail (optional, but recommended)
client.HandleNet(NetHandler) // register handler for net error (optional, but recommended)
client.HandlePersist(PersistHandler) // register handler for persist error (optional, but recommended)

// define your topic handlers like a golang http server client.Handle("foo", func(topic string, qos libmqtt.QosLevel, msg []byte) { // handle the topic message })

client.Handle("bar", func(topic string, qos libmqtt.QosLevel, msg []byte) { // handle the topic message })

// connect to server
client.ConnectServer("test.mosquitto.org:8883", 
	libmqtt.WithCustomTLS(nil),
	libmqtt.WithConnHandleFunc(func(server string, code byte, err error) {
		if err != nil {
			// failed
			panic(err)
		}
		
		if code != libmqtt.CodeSuccess {
			// server rejected or in error
			panic(code)
		}

		// success
		// you are now connected to the `server`
		// (the `server` is one of your provided `servers` when create the client)
		// start your business logic here or send a signal to your logic to start

		// subscribe some topic(s)
		client.Subscribe([]*libmqtt.Topic{
			{Name: "foo"},
			{Name: "bar", Qos: libmqtt.Qos1},
		}...)

		// publish some topic message(s)
		client.Publish([]*libmqtt.PublishPacket{
			{TopicName: "foo", Payload: []byte("bar"), Qos: libmqtt.Qos0},
			{TopicName: "bar", Payload: []byte("foo"), Qos: libmqtt.Qos1},
		}...)
	}),
)

5.Unsubscribe from topic(s)

client.UnSubscribe("foo", "bar")

6.Destroy the client when you would like to

// use true for a immediate disconnect to server
// use false to send a DisConn packet to server before disconnect
client.Destroy(true)
As a C/C++ lib

Please refer to c - README.md

As a Java lib

Please refer to java - README.md

As a command line client

Please refer to cmd/libmqtt - README.md

As MQTT infrastructure

This package can also be used as MQTT packet encoder and decoder

// decode one mqtt 3.1.1 packet from reader
packet, err := libmqtt.Decode(libmqtt.V311, reader)
// ...

// encode one mqtt packet to buffered writer
err := libmqtt.Encode(packet, bufferedWriter)
// ...

Topic Routing

Routing topics is one of the most important thing when it comes to business logic, we currently have built two TopicRouters which is ready to use, they are TextRouter and RegexRouter

  • TextRouter will match the exact same topic which was registered to client by Handle method. (this is the default router in a client)
  • RegexRouter will go through all the registered topic handlers, and use regular expression to test whether that is matched and should dispatch to the handler

If you would like to apply other routing strategy to the client, you can provide this option when creating the client

client, err := libmqtt.NewClient(
    // ...
    // e.g. use `RegexRouter`
    libmqtt.WithRouter(libmqtt.NewRegexRouter()),
    // ...
)

Session Persist

Per MQTT Specification, session state should be persisted and be recovered when next time connected to server without clean session flag set, currently we provide persist method as following:

  1. NonePersist - no session persist
  2. memPersist - in memory session persist
  3. filePersist - files session persist (with write barrier)
  4. redisPersist - redis session persist (available inside github.com/goiiot/libmqtt/extension package)

Note: Use RedisPersist if possible.

Benchmark

The procedure of the benchmark is:

  1. Create the client
  2. Connect to server
  3. Publish N times to topic foo
  4. Unsubscribe topic (just ensure all pub message has been sent)
  5. Destroy client (without disconnect packet)

The benchmark result listed below was taken on a MacBook Pro 13' (Early 2015, macOS 10.13.2), statistics inside which is the value of ten times average

Bench Name Pub Count ns/op B/op allocs/op
BenchmarkLibmqttClient-4 (this project) 100000 20187 176 6
BenchmarkPahoClient-4 (eclipse paho) 100000 25072 816 15

You can make the benchmark using source code from benchmark

Extensions

Helpful extensions for libmqtt (see extension)

LICENSE

GitHub license

Copyright Go-IIoT (https://github.com/goiiot)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Documentation

Index

Constants

View Source
const (
	SubOkMaxQos0 = 0    // SubOkMaxQos0 QoS 0 is used by server
	SubOkMaxQos1 = 1    // SubOkMaxQos1 QoS 1 is used by server
	SubOkMaxQos2 = 2    // SubOkMaxQos2 QoS 2 is used by server
	SubFail      = 0x80 // SubFail means that subscription is not successful
)
View Source
const (
	// MQTT 3.1.1 ConnAck code
	CodeUnacceptableVersion   = 1 // Packet: ConnAck
	CodeIdentifierRejected    = 2 // Packet: ConnAck
	CodeServerUnavailable     = 3 // Packet: ConnAck
	CodeBadUsernameOrPassword = 4 // Packet: ConnAck
	CodeUnauthorized          = 5 // Packet: ConnAck
)
View Source
const (
	CodeSuccess                             = 0   // Packet: ConnAck, PubAck, PubRecv, PubRel, PubComp, UnSubAck, Auth
	CodeNormalDisconn                       = 0   // Packet: DisConn
	CodeGrantedQos0                         = 0   // Packet: SubAck
	CodeGrantedQos1                         = 1   // Packet: SubAck
	CodeGrantedQos2                         = 2   // Packet: SubAck
	CodeDisconnWithWill                     = 4   // Packet: DisConn
	CodeNoMatchingSubscribers               = 16  // Packet: PubAck, PubRecv
	CodeNoSubscriptionExisted               = 17  // Packet: UnSubAck
	CodeContinueAuth                        = 24  // Packet: Auth
	CodeReAuth                              = 25  // Packet: Auth
	CodeUnspecifiedError                    = 128 // Packet: ConnAck, PubAck, PubRecv, SubAck, UnSubAck, DisConn
	CodeMalformedPacket                     = 129 // Packet: ConnAck, DisConn
	CodeProtoError                          = 130 // Packet: ConnAck, DisConn
	CodeImplementationSpecificError         = 131 // Packet: ConnAck, PubAck, PubRecv, SubAck, UnSubAck, DisConn
	CodeUnsupportedProtoVersion             = 132 // Packet: ConnAck
	CodeClientIdNotValid                    = 133 // Packet: ConnAck
	CodeBadUserPass                         = 134 // Packet: ConnAck
	CodeNotAuthorized                       = 135 // Packet: ConnAck, PubAck, PubRecv, SubAck, UnSubAck, DisConn
	CodeServerUnavail                       = 136 // Packet: ConnAck
	CodeServerBusy                          = 137 // Packet: ConnAck, DisConn
	CodeBanned                              = 138 // Packet: ConnAck
	CodeServerShuttingDown                  = 139 // Packet: DisConn
	CodeBadAuthenticationMethod             = 140 // Packet: ConnAck, DisConn
	CodeKeepaliveTimeout                    = 141 // Packet: DisConn
	CodeSessionTakenOver                    = 142 // Packet: DisConn
	CodeTopicFilterInvalid                  = 143 // Packet: SubAck, UnSubAck, DisConn
	CodeTopicNameInvalid                    = 144 // Packet: ConnAck, PubAck, PubRecv, DisConn
	CodePacketIdentifierInUse               = 145 // Packet: PubAck, PubRecv, PubAck, UnSubAck
	CodePacketIdentifierNotFound            = 146 // Packet: PubRel, PubComp
	CodeReceiveMaxExceeded                  = 147 // Packet: DisConn
	CodeTopicAliasInvalid                   = 148 // Packet: DisConn
	CodePacketTooLarge                      = 149 // Packet: ConnAck, DisConn
	CodeMessageRateTooHigh                  = 150 // Packet: DisConn
	CodeQuotaExceeded                       = 151 // Packet: ConnAck, PubAck, PubRec, SubAck, DisConn
	CodeAdministrativeAction                = 152 // Packet: DisConn
	CodePayloadFormatInvalid                = 153 // Packet: ConnAck, PubAck, PubRecv, DisConn
	CodeRetainNotSupported                  = 154 // Packet: ConnAck, DisConn
	CodeQosNoSupported                      = 155 // Packet: ConnAck, DisConn
	CodeUseAnotherServer                    = 156 // Packet: ConnAck, DisConn
	CodeServerMoved                         = 157 // Packet: ConnAck, DisConn
	CodeSharedSubscriptionNotSupported      = 158 // Packet: SubAck, DisConn
	CodeConnectionRateExceeded              = 159 // Packet: ConnAck, DisConn
	CodeMaxConnectTime                      = 160 // Packet: DisConn
	CodeSubscriptionIdentifiersNotSupported = 161 // Packet: SubAck, DisConn
	CodeWildcardSubscriptionNotSupported    = 162 // Packet: SubAck, DisConn
)

Variables

View Source
var (
	// ErrDecodeBadPacket is the error happened when trying to decode a none MQTT packet
	ErrDecodeBadPacket = errors.New("none MQTT packet")

	// ErrDecodeNoneV311Packet is the error happened when
	// trying to decode mqtt 3.1.1 packet but got other mqtt packet ProtoVersion
	ErrDecodeNoneV311Packet = errors.New("none MQTT v3.1.1 packet")

	// ErrDecodeNoneV5Packet is the error happened when
	// trying to decode mqtt 5 packet but got other mqtt packet ProtoVersion
	ErrDecodeNoneV5Packet = errors.New("none MQTT v5 packet")
)
View Source
var (
	// ErrUnsupportedVersion unsupported mqtt ProtoVersion
	ErrUnsupportedVersion = errors.New("trying encode/decode packet with unsupported MQTT version ")

	// ErrEncodeBadPacket happens when trying to encode none MQTT packet
	ErrEncodeBadPacket = errors.New("trying encode none MQTT packet ")

	// ErrEncodeLargePacket happens when MQTT packet is too large according to MQTT spec
	ErrEncodeLargePacket = errors.New("MQTT packet too large")
)
View Source
var (
	PingReqPacket  = &pingReqPacket{}
	PingRespPacket = &pingRespPacket{}
)
View Source
var (
	ErrNotSupportedVersion = errors.New("mqtt version not supported ")
)
View Source
var (
	// ErrPacketDroppedByStrategy used when persist store packet while strategy
	// don't allow that persist
	ErrPacketDroppedByStrategy = errors.New("packet persist dropped by strategy ")
)
View Source
var NonePersist = &nonePersist{}

NonePersist defines no persist storage

View Source
var WithBuf = WithBufSize

WithBuf is the alias of WithBufSize

Deprecated: use WithBufSize instead (will be removed in v1.0)

Functions

func Encode

func Encode(packet Packet, w BufferedWriter) error

Encode MQTT packet to bytes according to protocol ProtoVersion

Types

type AsyncClient

type AsyncClient struct {
	// contains filtered or unexported fields
}

AsyncClient is the async mqtt client implementation

func (*AsyncClient) Connect deprecated

func (c *AsyncClient) Connect(h ConnHandler)

Connect to all designated servers

Deprecated: use Client.ConnectServer instead (will be removed in v1.0)

func (*AsyncClient) ConnectServer

func (c *AsyncClient) ConnectServer(server string, connOptions ...Option) error

ConnectServer connect to server with connection specific options only return errors happened when applying options

func (*AsyncClient) Destroy

func (c *AsyncClient) Destroy(force bool)

Destroy will disconnect form all server If force is true, then close connection without sending a DisconnPacket

func (*AsyncClient) Disconnect

func (c *AsyncClient) Disconnect(server string, packet *DisconnPacket) bool

Disconnect from one server return true if DisconnPacket will be sent

func (*AsyncClient) Handle deprecated

func (c *AsyncClient) Handle(topic string, h TopicHandler)

Handle register subscription message route

Deprecated: use HandleTopic instead, will be removed in v1.0

func (*AsyncClient) HandleNet

func (c *AsyncClient) HandleNet(h NetHandler)

HandleNet register handler for net error Deprecated: use WithNetHandleFunc instead (will be removed in v1.0)

func (*AsyncClient) HandlePersist

func (c *AsyncClient) HandlePersist(h PersistHandler)

HandlePersist register handler for net error Deprecated: use WithPersistHandleFunc instead (will be removed in v1.0)

func (*AsyncClient) HandlePub

func (c *AsyncClient) HandlePub(h PubHandler)

HandlePub register handler for pub error Deprecated: use WithPubHandleFunc instead (will be removed in v1.0)

func (*AsyncClient) HandleSub

func (c *AsyncClient) HandleSub(h SubHandler)

HandleSub register handler for extra sub info Deprecated: use WithSubHandleFunc instead (will be removed in v1.0)

func (*AsyncClient) HandleTopic

func (c *AsyncClient) HandleTopic(topic string, h TopicHandleFunc)

HandleTopic add a topic routing rule

func (*AsyncClient) HandleUnSub

func (c *AsyncClient) HandleUnSub(h UnSubHandler)

HandleUnSub register handler for unsubscribe error Deprecated: use WithUnsubHandleFunc instead (will be removed in v1.0)

func (*AsyncClient) Publish

func (c *AsyncClient) Publish(msg ...*PublishPacket)

Publish message(s) to topic(s), one to one

func (*AsyncClient) Subscribe

func (c *AsyncClient) Subscribe(topics ...*Topic)

Subscribe topic(s)

func (*AsyncClient) UnSubscribe

func (c *AsyncClient) UnSubscribe(topics ...string)

UnSubscribe topic(s) Deprecated: use Unsubscribe instead, will be removed in v1.0

func (*AsyncClient) Unsubscribe

func (c *AsyncClient) Unsubscribe(topics ...string)

Unsubscribe topic(s)

func (*AsyncClient) Wait

func (c *AsyncClient) Wait()

Wait will wait for all connections to exit

type AuthPacket

type AuthPacket struct {
	BasePacket
	Code  byte       // the authentication result code
	Props *AuthProps // authentication properties
}

AuthPacket Client <-> Server as part of an extended authentication exchange, such as challenge / response authentication.

It is a Protocol Error for the Client or Server to send an AUTH packet if the ConnPacket did not contain the same Authentication Method

func (*AuthPacket) Bytes

func (a *AuthPacket) Bytes() []byte

func (*AuthPacket) Type

func (a *AuthPacket) Type() CtrlType

Type of AuthPacket is CtrlAuth

func (*AuthPacket) WriteTo

func (a *AuthPacket) WriteTo(w BufferedWriter) error

type AuthProps

type AuthProps struct {
	AuthMethod string
	AuthData   []byte
	Reason     string
	UserProps  UserProps
}

AuthProps properties of AuthPacket

type BasePacket

type BasePacket struct {
	ProtoVersion ProtoVersion
	// contains filtered or unexported fields
}

BasePacket for packet encoding and MQTT version note

func (*BasePacket) SetVersion

func (b *BasePacket) SetVersion(version ProtoVersion)

func (*BasePacket) Version

func (b *BasePacket) Version() ProtoVersion

Version is the MQTT version of this packet

type BufferedReader

type BufferedReader interface {
	io.Reader
	io.ByteReader
}

BufferedReader buffered reader, e.g. bufio.Reader, bytes.Buffer

type BufferedWriter

type BufferedWriter interface {
	io.Writer
	io.ByteWriter
}

BufferedWriter buffered writer, e.g. bufio.Writer, bytes.Buffer

type Client

type Client = *AsyncClient

Client type for *AsyncClient

func NewClient

func NewClient(options ...Option) (Client, error)

NewClient create a new mqtt client

type ConnAckPacket

type ConnAckPacket struct {
	BasePacket
	Present bool
	Code    byte
	Props   *ConnAckProps
}

ConnAckPacket is the packet sent by the Server in response to a ConnPacket received from a Client.

The first packet sent from the Server to the Client MUST be a ConnAckPacket

func (*ConnAckPacket) Bytes

func (c *ConnAckPacket) Bytes() []byte

func (*ConnAckPacket) Type

func (c *ConnAckPacket) Type() CtrlType

Type ConnAckPacket's type is CtrlConnAck

func (*ConnAckPacket) WriteTo

func (c *ConnAckPacket) WriteTo(w BufferedWriter) error

type ConnAckProps

type ConnAckProps struct {
	// If the Session Expiry Interval is absent the value in the ConnPacket used.
	// The server uses this property to inform the Client that it is using
	// a value other than that sent by the Client in the ConnAck
	SessionExpiryInterval uint32

	// The Server uses this value to limit the number of QoS 1 and QoS 2 publications
	// that it is willing to process concurrently for the Client.
	//
	// It does not provide a mechanism to limit the QoS 0 publications that
	// the Client might try to send
	MaxRecv uint16

	MaxQos QosLevel

	// Declares whether the Server supports retained messages.
	// true means that retained messages are not supported.
	// false means retained messages are supported
	RetainAvail *bool

	// Maximum Packet Size the Server is willing to accept.
	// If the Maximum Packet Size is not present, there is no limit on the
	// packet size imposed beyond the limitations in the protocol as a
	// result of the remaining length encoding and the protocol header sizes
	MaxPacketSize uint32

	// The Client Identifier which was assigned by the Server
	// because a zero length Client Identifier was found in the ConnPacket
	AssignedClientID string

	// This value indicates the highest value that the Server will accept
	// as a Topic Alias sent by the Client.
	//
	// The Server uses this value to limit the number of Topic Aliases
	// that it is willing to hold on this Connection.
	MaxTopicAlias uint16

	// Human readable string designed for diagnostics
	Reason string

	// User defines Properties
	UserProps UserProps

	// Whether the Server supports Wildcard Subscriptions.
	// false means that Wildcard Subscriptions are not supported.
	// true means Wildcard Subscriptions are supported.
	//
	// default is true
	WildcardSubAvail *bool // 40

	// Whether the Server supports Subscription Identifiers.
	// false means that Subscription Identifiers are not supported.
	// true means Subscription Identifiers are supported.
	//
	// default is true
	SubIDAvail *bool

	// Whether the Server supports Shared Subscriptions.
	// false means that Shared Subscriptions are not supported.
	// true means Shared Subscriptions are supported
	//
	// default is true
	SharedSubAvail *bool

	// Keep Alive time assigned by the Server
	ServerKeepalive uint16

	// Response Information
	RespInfo string

	// Can be used by the Client to identify another Server to use
	ServerRef string

	// The name of the authentication method
	AuthMethod string

	// The contents of this data are defined by the authentication method.
	AuthData []byte
}

ConnAckProps defines connect acknowledge properties

type ConnHandleFunc

type ConnHandleFunc func(client Client, server string, code byte, err error)

ConnHandleFunc is the handler which tend to the Connect result server is the server address provided by user in client creation call code is the ConnResult code err is the error happened when connect to server, if a error happened, the code value will max byte value (255)

type ConnHandler deprecated

type ConnHandler func(server string, code byte, err error)

Deprecated: use ConnHandleFunc instead, will be removed in v1.0

type ConnPacket

type ConnPacket struct {
	BasePacket
	ProtoName string

	CleanSession bool
	IsWill       bool
	WillQos      QosLevel
	WillRetain   bool
	WillProps    *WillProps

	// Properties
	Props *ConnProps

	// Payloads
	Username    string
	Password    string
	ClientID    string
	Keepalive   uint16
	WillTopic   string
	WillMessage []byte
	// contains filtered or unexported fields
}

ConnPacket is the first packet sent by Client to Server

func (*ConnPacket) Bytes

func (c *ConnPacket) Bytes() []byte

func (*ConnPacket) Type

func (c *ConnPacket) Type() CtrlType

Type ConnPacket's type is CtrlConn

func (*ConnPacket) WriteTo

func (c *ConnPacket) WriteTo(w BufferedWriter) error

type ConnProps

type ConnProps struct {
	// If the Session Expiry Interval is absent the value 0 is used.
	// If it is set to 0, or is absent, the Session ends when the Network Connection is closed.
	// If the Session Expiry Interval is 0xFFFFFFFF (UINT_MAX), the Session does not expire.
	SessionExpiryInterval uint32

	// The Client uses this value to limit the number of QoS 1 and QoS 2 publications
	// that it is willing to process concurrently.
	//
	// There is no mechanism to limit the QoS 0 publications that the Server might try to send.
	//
	// The value of Receive Maximum applies only to the current Network Connection.
	// If the Receive Maximum value is absent then its value defaults to 65,535
	MaxRecv uint16

	// The Maximum Packet Size the Client is willing to accept
	//
	// If the Maximum Packet Size is not present,
	// no limit on the packet size is imposed beyond the limitations in the protocol as a result of the remaining length encoding and the protocol header sizes
	MaxPacketSize uint32

	// This value indicates the highest value that the Client will accept
	// as a Topic Alias sent by the Server.
	//
	// The Client uses this value to limit the number of Topic Aliases that
	// it is willing to hold on this Connection.
	MaxTopicAlias uint16

	// The Client uses this value to request the Server to return Response
	// Information in the ConnAckPacket
	ReqRespInfo *bool

	// The Client uses this value to indicate whether the Reason String
	// or User Properties are sent in the case of failures.
	ReqProblemInfo *bool

	// User defined Properties
	UserProps UserProps

	// If Authentication Method is absent, extended authentication is not performed.
	//
	// If a Client sets an Authentication Method in the ConnPacket,
	// the Client MUST NOT send any packets other than AuthPacket or DisConn packets
	// until it has received a ConnAck packet
	AuthMethod string

	// The contents of this data are defined by the authentication method.
	AuthData []byte
	// contains filtered or unexported fields
}

ConnProps defines connect packet properties

type Connector

type Connector func(ctx context.Context, address string, timeout time.Duration, tlsConfig *tls.Config) (net.Conn, error)

type CtrlType

type CtrlType = byte

CtrlType is MQTT Control packet type

const (
	CtrlConn      CtrlType = 1  // Connect
	CtrlConnAck   CtrlType = 2  // connect ack
	CtrlPublish   CtrlType = 3  // publish
	CtrlPubAck    CtrlType = 4  // publish ack
	CtrlPubRecv   CtrlType = 5  // publish received
	CtrlPubRel    CtrlType = 6  // publish release
	CtrlPubComp   CtrlType = 7  // publish complete
	CtrlSubscribe CtrlType = 8  // subscribe
	CtrlSubAck    CtrlType = 9  // subscribe ack
	CtrlUnSub     CtrlType = 10 // unsubscribe
	CtrlUnSubAck  CtrlType = 11 // unsubscribe ack
	CtrlPingReq   CtrlType = 12 // ping request
	CtrlPingResp  CtrlType = 13 // ping response
	CtrlDisConn   CtrlType = 14 // disconnect
	CtrlAuth      CtrlType = 15 // authentication (since MQTT 5)
)

type DisConnPacket

type DisConnPacket = DisconnPacket

type DisConnProps

type DisConnProps = DisconnPacket

type DisconnPacket

type DisconnPacket struct {
	BasePacket
	Code  byte
	Props *DisconnProps
}

DisconnPacket is the final Control Packet sent from the Client to the Server. It indicates that the Client is disconnecting cleanly.

func (*DisconnPacket) Bytes

func (d *DisconnPacket) Bytes() []byte

func (*DisconnPacket) Type

func (d *DisconnPacket) Type() CtrlType

Type of DisconnPacket is CtrlDisConn

func (*DisconnPacket) WriteTo

func (d *DisconnPacket) WriteTo(w BufferedWriter) error

type DisconnProps

type DisconnProps struct {
	// Session Expiry Interval in seconds
	// If the Session Expiry Interval is absent, the Session Expiry Interval in the CONNECT packet is used
	//
	// The Session Expiry Interval MUST NOT be sent on a DISCONNECT by the Server
	SessionExpiryInterval uint32

	// Human readable, designed for diagnostics and SHOULD NOT be parsed by the receiver
	Reason string

	// User defines Properties
	UserProps UserProps

	// Used by the Client to identify another Server to use
	ServerRef string
}

DisConnProps properties for DisconnPacket

type LogLevel

type LogLevel byte

LogLevel is used to set log level in client creation

const (
	// Silent means no log
	Silent LogLevel = iota
	// Verbose log all
	Verbose
	// Debug log with debug and above
	Debug
	// Info log with info and above
	Info
	// Warning log with warning and above
	Warning
	// Error log error only
	Error
)

type NetHandleFunc

type NetHandleFunc func(client Client, server string, err error)

NetHandleFunc handles the error occurred when net broken

type NetHandler deprecated

type NetHandler func(server string, err error)

Deprecated: use NetHandleFunc instead, will be removed in v1.0

type Option

type Option func(*AsyncClient, *connectOptions) error

Option is client option for connection options

func WithAutoReconnect

func WithAutoReconnect(autoReconnect bool) Option

WithAutoReconnect set client to auto reconnect to server when connection failed

func WithBackoffStrategy

func WithBackoffStrategy(firstDelay, maxDelay time.Duration, factor float64) Option

WithBackoffStrategy will set reconnect backoff strategy firstDelay is the time to wait before retrying after the first failure maxDelay defines the upper bound of backoff delay factor is applied to the backoff after each retry.

e.g. FirstDelay = 1s and Factor = 2

then the SecondDelay is 2s, the ThirdDelay is 4s

func WithBufSize

func WithBufSize(sendBufSize, recvBufSize int) Option

WithBufSize designate the channel size of send and recv

func WithCleanSession

func WithCleanSession(f bool) Option

WithCleanSession will set clean flag in connect packet

func WithClientID

func WithClientID(clientID string) Option

WithClientID set the client id for connection

func WithConnHandleFunc

func WithConnHandleFunc(handler ConnHandleFunc) Option

func WithConnPacket

func WithConnPacket(pkt ConnPacket) Option

func WithCustomConnector

func WithCustomConnector(connector Connector) Option

func WithCustomTLS

func WithCustomTLS(config *tls.Config) Option

WithCustomTLS replaces the TLS options with a custom tls.Config

func WithDialTimeout

func WithDialTimeout(timeout uint16) Option

WithDialTimeout for connection time out (time in second)

func WithIdentity

func WithIdentity(username, password string) Option

WithIdentity for username and password

func WithKeepalive

func WithKeepalive(keepalive uint16, factor float64) Option

WithKeepalive set the keepalive interval (time in second)

func WithLog

func WithLog(l LogLevel) Option

WithLog will create basic logger for the client

func WithNetHandleFunc

func WithNetHandleFunc(handler NetHandleFunc) Option

func WithPersist

func WithPersist(method PersistMethod) Option

WithPersist defines the persist method to be used

func WithPersistHandleFunc

func WithPersistHandleFunc(handler PersistHandleFunc) Option

func WithPubHandleFunc

func WithPubHandleFunc(handler PubHandleFunc) Option

func WithRouter

func WithRouter(r TopicRouter) Option

WithRouter set the router for topic dispatch

func WithSecureServer deprecated

func WithSecureServer(servers ...string) Option

WithSecureServer use server certificate for verification won't apply `WithTLS`, `WithCustomTLS`, `WithTLSReader` options when connecting to these servers

Deprecated: use Client.ConnectServer instead (will be removed in v1.0)

func WithServer deprecated

func WithServer(servers ...string) Option

WithServer set client servers addresses should be in form of `ip:port` or `domain.name:port`, only TCP connection supported for now

Deprecated: use Client.ConnectServer instead (will be removed in v1.0)

func WithSubHandleFunc

func WithSubHandleFunc(handler SubHandleFunc) Option

func WithTCPConnector

func WithTCPConnector(handshakeTimeout time.Duration) Option

func WithTLS

func WithTLS(certFile, keyFile, caCert, serverNameOverride string, skipVerify bool) Option

WithTLS set client tls from cert, key and ca file, apply to all servers listed in `WithServer` Option

func WithTLSReader

func WithTLSReader(certReader, keyReader, caReader io.Reader, serverNameOverride string, skipVerify bool) Option

WithTLSReader set tls from client cert, key, ca reader, apply to all servers listed in `WithServer` Option

func WithUnsubHandleFunc

func WithUnsubHandleFunc(handler UnsubHandleFunc) Option

func WithVersion

func WithVersion(version ProtoVersion, compromise bool) Option

WithVersion defines the mqtt protocol ProtoVersion in use

func WithWebSocketConnector

func WithWebSocketConnector(handshakeTimeout time.Duration, headers http.Header) Option

func WithWill

func WithWill(topic string, qos QosLevel, retain bool, payload []byte) Option

WithWill mark this connection as a will teller

type Packet

type Packet interface {
	// Type return the packet type
	Type() CtrlType

	// Bytes presentation of this packet
	Bytes() []byte

	// Write bytes to the buffered writer
	WriteTo(w BufferedWriter) error

	// Version MQTT version of the packet
	Version() ProtoVersion

	SetVersion(version ProtoVersion)
}

Packet is MQTT control packet

func Decode

func Decode(version ProtoVersion, r BufferedReader) (Packet, error)

Decode will decode one mqtt packet

type PersistHandleFunc

type PersistHandleFunc func(client Client, packet Packet, err error)

PersistHandleFunc handles err happened when persist process has trouble

type PersistHandler deprecated

type PersistHandler func(err error)

Deprecated: use PersistHandleFunc instead, will be removed in v1.0

type PersistMethod

type PersistMethod interface {
	// Name of what persist strategy used
	Name() string

	// Store a packet with key
	Store(key string, p Packet) error

	// Load a packet from stored data according to the key
	Load(key string) (Packet, bool)

	// Range over data stored, return false to break the range
	Range(func(key string, p Packet) bool)

	// Delete
	Delete(key string) error

	// Destroy stored data
	Destroy() error
}

PersistMethod defines the behavior of persist methods

func NewFilePersist

func NewFilePersist(dirPath string, strategy *PersistStrategy) PersistMethod

NewFilePersist will create a file persist method with provided dirPath and strategy, if no strategy provided (nil), then the default strategy will be used

func NewMemPersist

func NewMemPersist(strategy *PersistStrategy) PersistMethod

NewMemPersist create a in memory persist method with provided strategy if no strategy provided (nil), then the default strategy will be used

type PersistStrategy

type PersistStrategy struct {
	// Interval applied to file/database persist
	// if this field is set to 0, means do persist per action
	// default value is 1s
	Interval time.Duration

	// MaxCount applied to all persist method
	// if this field set to 0, means no persist limit
	// for memory persist, means max in memory count
	// for file/database persist, means max entry in file/memory
	// default value is 0
	MaxCount uint32

	// DropOnExceed defines how to tackle with packets incoming
	// when max count is reached, default value is false
	DropOnExceed bool

	// DuplicateReplace defines whether duplicated key should
	// override previous one, default value is true
	DuplicateReplace bool
}

PersistStrategy defines the details to be complied in persist methods

type ProtoVersion

type ProtoVersion byte

ProtoVersion MQTT Protocol ProtoVersion

const (
	V311 ProtoVersion = 4 // V311 means MQTT 3.1.1
	V5   ProtoVersion = 5 // V5 means MQTT 5
)

type PubAckPacket

type PubAckPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubAckProps
}

PubAckPacket is the response to a PublishPacket with QoS level 1.

func (*PubAckPacket) Bytes

func (p *PubAckPacket) Bytes() []byte

func (*PubAckPacket) Type

func (p *PubAckPacket) Type() CtrlType

Type of PubAckPacket is CtrlPubAck

func (*PubAckPacket) WriteTo

func (p *PubAckPacket) WriteTo(w BufferedWriter) error

type PubAckProps

type PubAckProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubAckProps properties for PubAckPacket

type PubCompPacket

type PubCompPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubCompProps
}

PubCompPacket is the response to a PubRelPacket. It is the fourth and final packet of the QoS 892 2 protocol exchange. 893

func (*PubCompPacket) Bytes

func (p *PubCompPacket) Bytes() []byte

func (*PubCompPacket) Type

func (p *PubCompPacket) Type() CtrlType

Type of PubCompPacket is CtrlPubComp

func (*PubCompPacket) WriteTo

func (p *PubCompPacket) WriteTo(w BufferedWriter) error

type PubCompProps

type PubCompProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubCompProps properties for PubCompPacket

type PubHandleFunc

type PubHandleFunc func(client Client, topic string, err error)

PubHandleFunc handles the error occurred when publish some message if err is not nil, that means a error occurred when sending pub msg

type PubHandler deprecated

type PubHandler func(topic string, err error)

Deprecated: use PubHandleFunc instead, will be removed in v1.0

type PubRecvPacket

type PubRecvPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubRecvProps
}

PubRecvPacket is the response to a PublishPacket with QoS 2. It is the second packet of the QoS 2 protocol exchange.

func (*PubRecvPacket) Bytes

func (p *PubRecvPacket) Bytes() []byte

func (*PubRecvPacket) Type

func (p *PubRecvPacket) Type() CtrlType

Type of PubRecvPacket is CtrlPubRecv

func (*PubRecvPacket) WriteTo

func (p *PubRecvPacket) WriteTo(w BufferedWriter) error

type PubRecvProps

type PubRecvProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubRecvProps properties for PubRecvPacket

type PubRelPacket

type PubRelPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubRelProps
}

PubRelPacket is the response to a PubRecvPacket. It is the third packet of the QoS 2 protocol exchange.

func (*PubRelPacket) Bytes

func (p *PubRelPacket) Bytes() []byte

func (*PubRelPacket) Type

func (p *PubRelPacket) Type() CtrlType

Type of PubRelPacket is CtrlPubRel

func (*PubRelPacket) WriteTo

func (p *PubRelPacket) WriteTo(w BufferedWriter) error

type PubRelProps

type PubRelProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubRelProps properties for PubRelPacket

type PublishPacket

type PublishPacket struct {
	BasePacket
	IsDup     bool
	Qos       QosLevel
	IsRetain  bool
	TopicName string
	Payload   []byte
	PacketID  uint16
	Props     *PublishProps
}

PublishPacket is sent from a Client to a Server or from Server to a Client to transport an Application Message.

func (*PublishPacket) Bytes

func (p *PublishPacket) Bytes() []byte

Bytes calls WriteTo

func (*PublishPacket) Type

func (p *PublishPacket) Type() CtrlType

Type of PublishPacket is CtrlPublish

func (*PublishPacket) WriteTo

func (p *PublishPacket) WriteTo(w BufferedWriter) error

WriteTo serializes -- should be Write(io.Writer) error

type PublishProps

type PublishProps struct {
	// PayloadFormat Indicator
	// 0, Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator
	// 1, Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload
	PayloadFormat byte // required in server

	// MessageExpiryInterval
	// Lifetime of the Application Message in seconds
	// If absent, the Application Message does not expire
	MessageExpiryInterval uint32

	// A Topic Alias is an integer value that is used to identify the Topic
	// instead of using the Topic Name.
	//
	// This reduces the size of the PUBLISH packet, and is useful when the
	// Topic Names are long and the same Topic Names are used repetitively
	// within a Network Connection
	TopicAlias uint16

	// RespTopic Used as the Topic Name for a response message
	RespTopic string

	// CorrelationData used by the sender of the Request Message to identify which request the Response Message is for when it is received
	CorrelationData []byte

	// User defined Properties
	UserProps UserProps

	// SubIDs the identifier of the subscription (always no 0)
	//
	// Multiple Subscription Identifiers will be included if the publication
	// is the result of a match to more than one subscription, in this case
	// their order is not significant
	SubIDs []int

	// ContentType describe the content of the Application Message
	ContentType string
}

PublishProps properties for PublishPacket

type QosLevel

type QosLevel = byte

QosLevel is either 0, 1, 2

const (
	Qos0 QosLevel = 0x00 // Qos0 = 0
	Qos1 QosLevel = 0x01 // Qos1 = 1
	Qos2 QosLevel = 0x02 // Qos2 = 2
)

type RegexRouter

type RegexRouter struct {
	// contains filtered or unexported fields
}

RegexRouter use regex to match topic messages

func NewRegexRouter

func NewRegexRouter() *RegexRouter

NewRegexRouter will create a regex router

func (*RegexRouter) Dispatch

func (r *RegexRouter) Dispatch(client Client, p *PublishPacket)

Dispatch the received packet

func (*RegexRouter) Handle

func (r *RegexRouter) Handle(topicRegex string, h TopicHandleFunc)

Handle will register the topic with handler

func (*RegexRouter) Name

func (r *RegexRouter) Name() string

Name is the name of router

type StandardRouter

type StandardRouter struct {
	// contains filtered or unexported fields
}

StandardRouter implements standard MQTT routing behaviour

func NewStandardRouter

func NewStandardRouter() *StandardRouter

NewStandardRouter will create a standard mqtt router

func (*StandardRouter) Dispatch

func (s *StandardRouter) Dispatch(client Client, p *PublishPacket)

Dispatch defines the action to dispatch published packet

func (*StandardRouter) Handle

func (s *StandardRouter) Handle(topic string, h TopicHandleFunc)

Handle defines how to register topic with handler

func (*StandardRouter) Name

func (s *StandardRouter) Name() string

Name is the name of router

type SubAckPacket

type SubAckPacket struct {
	BasePacket
	PacketID uint16
	Codes    []byte
	Props    *SubAckProps
}

SubAckPacket is sent by the Server to the Client to confirm receipt and processing of a SubscribePacket.

SubAckPacket contains a list of return codes, that specify the maximum QoS level that was granted in each Subscription that was requested by the SubscribePacket.

func (*SubAckPacket) Bytes

func (s *SubAckPacket) Bytes() []byte

Bytes to serialize

func (*SubAckPacket) Type

func (s *SubAckPacket) Type() CtrlType

Type of SubAckPacket is CtrlSubAck

func (*SubAckPacket) WriteTo

func (s *SubAckPacket) WriteTo(w BufferedWriter) error

WriteTo to serialize

type SubAckProps

type SubAckProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

SubAckProps properties for SubAckPacket

type SubHandleFunc

type SubHandleFunc func(client Client, topics []*Topic, err error)

SubHandleFunc handles the error occurred when subscribe some topic if err is not nil, that means a error occurred when sending sub msg

type SubHandler deprecated

type SubHandler func(topics []*Topic, err error)

Deprecated: use SubHandleFunc instead, will be removed in v1.0

type SubscribePacket

type SubscribePacket struct {
	BasePacket
	PacketID uint16
	Topics   []*Topic
	Props    *SubscribeProps
}

SubscribePacket is sent from the Client to the Server to create one or more Subscriptions.

Each Subscription registers a Client's interest in one or more TopicNames. The Server sends PublishPackets to the Client in order to forward Application Messages that were published to TopicNames that match these Subscriptions. The SubscribePacket also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to the Client

func (*SubscribePacket) Bytes

func (s *SubscribePacket) Bytes() []byte

Bytes to call write

func (*SubscribePacket) Type

func (s *SubscribePacket) Type() CtrlType

Type of SubscribePacket is CtrlSubscribe

func (*SubscribePacket) WriteTo

func (s *SubscribePacket) WriteTo(w BufferedWriter) error

WriteTo to serialize

type SubscribeProps

type SubscribeProps struct {
	// SubID identifier of the subscription
	SubID int
	// UserProps User defined Properties
	UserProps UserProps
}

SubscribeProps properties for SubscribePacket

type TextRouter

type TextRouter struct {
	// contains filtered or unexported fields
}

TextRouter uses plain string comparison to dispatch topic message this is the default router in client

func NewTextRouter

func NewTextRouter() *TextRouter

NewTextRouter will create a text based router

func (*TextRouter) Dispatch

func (r *TextRouter) Dispatch(client Client, p *PublishPacket)

Dispatch the received packet

func (*TextRouter) Handle

func (r *TextRouter) Handle(topic string, h TopicHandleFunc)

Handle will register the topic with handler

func (*TextRouter) Name

func (r *TextRouter) Name() string

Name of TextRouter is "TextRouter"

type Topic

type Topic struct {
	Name string
	Qos  QosLevel
}

Topic for both topic name and topic qos

func (*Topic) String

func (t *Topic) String() string

type TopicHandleFunc

type TopicHandleFunc func(client Client, topic string, qos QosLevel, msg []byte)

TopicHandleFunc handles topic sub message topic is the client user provided topic code can be SubOkMaxQos0, SubOkMaxQos1, SubOkMaxQos2, SubFail

type TopicHandler deprecated

type TopicHandler func(topic string, qos QosLevel, msg []byte)

Deprecated: use TopicHandleFunc instead, will be removed in v1.0

type TopicRouter

type TopicRouter interface {
	// Name is the name of router
	Name() string
	// Handle defines how to register topic with handler
	Handle(topic string, h TopicHandleFunc)
	// Dispatch defines the action to dispatch published packet
	Dispatch(client Client, p *PublishPacket)
}

TopicRouter defines how to route the topic message to handler

type UnSubAckPacket

type UnSubAckPacket = UnsubAckPacket

UnSubAckPacket is confusing

type UnSubAckProps

type UnSubAckProps = UnsubAckProps

type UnSubHandler deprecated

type UnSubHandler func(topics []string, err error)

Deprecated: use UnsubHandleFunc instead, will be removed in v1.0

type UnSubProps

type UnSubProps = UnsubProps // why??

UnSubProps because why?

type UnsubAckPacket

type UnsubAckPacket struct {
	BasePacket
	PacketID uint16
	Props    *UnsubAckProps
}

UnsubAckPacket is sent by the Server to the Client to confirm receipt of an UnsubPacket

func (*UnsubAckPacket) Bytes

func (s *UnsubAckPacket) Bytes() []byte

Bytes is a utility

func (*UnsubAckPacket) Type

func (s *UnsubAckPacket) Type() CtrlType

Type of UnsubAckPacket is CtrlUnSubAck

func (*UnsubAckPacket) WriteTo

func (s *UnsubAckPacket) WriteTo(w BufferedWriter) error

WriteTo is

type UnsubAckProps

type UnsubAckProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

UnsubAckProps properties for UnsubAckPacket

type UnsubHandleFunc

type UnsubHandleFunc func(client Client, topics []string, err error)

UnsubHandleFunc handles the error occurred when publish some message

type UnsubPacket

type UnsubPacket struct {
	BasePacket
	PacketID   uint16
	TopicNames []string
	Props      *UnsubProps
}

UnsubPacket is sent by the Client to the Server, to unsubscribe from topics.

func (*UnsubPacket) Bytes

func (s *UnsubPacket) Bytes() []byte

Bytes calls WriteTo

func (*UnsubPacket) Type

func (s *UnsubPacket) Type() CtrlType

Type of UnsubPacket is CtrlUnSub

func (*UnsubPacket) WriteTo

func (s *UnsubPacket) WriteTo(w BufferedWriter) error

WriteTo should be just Write

type UnsubProps

type UnsubProps struct {
	// UserProps User defined Properties
	UserProps UserProps
}

UnsubProps properties for UnsubPacket

type UserProps

type UserProps map[string][]string

UserProps contains user defined properties

func (UserProps) Add

func (u UserProps) Add(key, value string)

func (UserProps) Del

func (u UserProps) Del(key string)

func (UserProps) Get

func (u UserProps) Get(key string) (string, bool)

func (UserProps) Set

func (u UserProps) Set(key string, value string)

type WillProps

type WillProps struct {
	// The Server delays publishing the Client’s Will Message until
	// the Will Delay Interval has passed or the Session ends, whichever happens first.
	//
	// If a new Network Connection to this Session is made before the Will Delay Interval has passed,
	// the Server MUST NOT send the Will Message
	WillDelayInterval uint32

	PayloadFormat uint8

	// the lifetime of the Will Message in seconds and is sent as the Publication Expiry Interval
	// when the Server publishes the Will Message.
	MessageExpiryInterval uint32

	// String describing the content of the Will Message
	ContentType string

	// String which is used as the Topic Name for a response message
	ResponseTopic string

	//  The Correlation Data is used by the sender of the Request Message to identify which request the Response Message is for when it is received.
	CorrelationData []byte

	UserProps UserProps
}

Directories

Path Synopsis
c
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL