gmqtt

package module
v0.0.0-...-4c22015 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2022 License: MIT Imports: 17 Imported by: 0

README

gmqtt

MQTT client for Go

import (
	"fmt"
	"os"
	"time"

	"github.com/sanxia/gmqtt"
)

const TOPIC = "sanxia/classroom"

// main demonstrates a complete gmqtt client session against a local broker:
// it connects, subscribes to TOPIC from the on-connect callback, publishes a
// message every two seconds from a background goroutine, unsubscribes after
// one minute, stops the publisher, and finally disconnects.
func main() {
	opts := gmqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
	opts.SetClientID("mqtt-sanxia-2")
	opts.SetKeepAlive(2 * time.Second)
	opts.SetPingTimeout(1 * time.Second)
	opts.SetAutoReconnect(true)

	// Handler for received messages that match no subscription-specific
	// callback; the Subscribe call below registers none (nil).
	opts.SetDefaultPublishHandler(func(client gmqtt.Client, msg gmqtt.Message) {
		fmt.Printf("gmqtt topic: %s, id: %d, msg: %s\n", msg.Topic(), msg.MessageId(), string(msg.Payload()))
	})

	// Connection-established callback. Subscribing from here rather than
	// after Connect returns ties the subscription to every invocation of the
	// on-connect handler.
	opts.SetOnConnectHandler(func(client gmqtt.Client) {
		fmt.Println("gmqtt connect OK")

		// Subscribe.
		fmt.Println("gmqtt Subscribe")

		if token := client.Subscribe(TOPIC, 0, nil); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			os.Exit(1)
		}
	})

	// Connection-lost callback: reports why the connection was dropped.
	opts.SetConnectionLostHandler(func(client gmqtt.Client, err error) {
		fmt.Printf("gmqtt ConnectionLost %v\n", err)
	})

	client := gmqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// stop is closed to tell the publisher goroutine to exit, so that it does
	// not keep publishing on a client that is being disconnected.
	stop := make(chan struct{})

	// Publish messages, alternating between two payloads.
	go func() {
		payloads := [...]string{"one", "two"}

		for i := 0; ; i++ {
			fmt.Println("gmqtt Publish")

			token := client.Publish(TOPIC, 1, true, payloads[i%2])

			// Publish is declared to return the Token interface; use the
			// checked assertion so an unexpected implementation does not
			// panic the goroutine.
			if pub, ok := token.(*gmqtt.PublishToken); ok {
				fmt.Printf("gmqtt Publish msgId: %d\r\n", pub.MessageId())
			}

			// Report a failed publish instead of silently dropping the error.
			if token.Wait() && token.Error() != nil {
				fmt.Println(token.Error())
			}

			// Pause between publishes, but wake up immediately on stop.
			select {
			case <-stop:
				return
			case <-time.After(2 * time.Second):
			}
		}
	}()

	time.Sleep(60 * time.Second)

	// Unsubscribe.
	fmt.Println("gmqtt.Unsubscribe")

	if token := client.Unsubscribe(TOPIC); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	time.Sleep(10 * time.Second)

	// Stop the publisher before tearing the connection down.
	close(stop)

	fmt.Println("gmqtt Disconnect")

	// NOTE(review): 250 is the quiesce argument; presumably milliseconds,
	// confirm against the Disconnect implementation.
	client.Disconnect(250)
	time.Sleep(3 * time.Second)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNotConnected = errors.New("Not Connected")

ErrNotConnected is the error returned from function calls that are made when the client is not connected to a broker

Functions

func DefaultConnectionLostHandler

func DefaultConnectionLostHandler(client Client, reason error)

DefaultConnectionLostHandler is a definition of a function that simply reports to the DEBUG log the reason for the client losing a connection.

Types

type Client

type Client interface {
	IsConnected() bool
	Connect() Token
	Disconnect(quiesce uint)
	Publish(topic string, qos byte, retained bool, payload interface{}) Token
	Subscribe(topic string, qos byte, callback MessageHandler) Token
	SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
	Unsubscribe(topics ...string) Token
	AddRoute(topic string, callback MessageHandler)
}

func NewClient

func NewClient(o *ClientOptions) Client

type ClientOptions

type ClientOptions struct {
	Servers         []*url.URL
	ClientID        string
	Username        string
	Password        string
	CleanSession    bool
	Order           bool
	WillEnabled     bool
	WillTopic       string
	WillPayload     []byte
	WillQos         byte
	WillRetained    bool
	ProtocolVersion uint

	TLSConfig             tls.Config
	KeepAlive             int64
	PingTimeout           time.Duration
	ConnectTimeout        time.Duration
	MaxReconnectInterval  time.Duration
	AutoReconnect         bool
	Store                 stores.Store
	DefaultPublishHandler MessageHandler
	OnConnect             OnConnectHandler
	OnConnectionLost      ConnectionLostHandler
	WriteTimeout          time.Duration
	MessageChannelDepth   uint
	// contains filtered or unexported fields
}

func NewClientOptions

func NewClientOptions() *ClientOptions

NewClientOptions will create a new ClientOptions type with some default values.

Port: 1883
CleanSession: True
Order: True
KeepAlive: 30 (seconds)
ConnectTimeout: 30 (seconds)
MaxReconnectInterval: 10 (minutes)
AutoReconnect: True

func (*ClientOptions) AddBroker

func (o *ClientOptions) AddBroker(server string) *ClientOptions

AddBroker adds a broker URI to the list of brokers to be used. The format should be scheme://host:port, where "scheme" is one of "tcp", "ssl", or "ws", "host" is the IP address (or hostname), and "port" is the port on which the broker is accepting connections.

An example broker URI would look like: tcp://foobar.com:1883

func (*ClientOptions) SetAutoReconnect

func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions

SetAutoReconnect sets whether the automatic reconnection logic should be used when the connection is lost. Even if it is disabled, the ConnectionLostHandler is still called.

func (*ClientOptions) SetBinaryWill

func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions

SetBinaryWill accepts a []byte will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.

func (*ClientOptions) SetCleanSession

func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions

SetCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before disconnecting previously, but were not, will not be sent upon connecting to the broker.

func (*ClientOptions) SetClientID

func (o *ClientOptions) SetClientID(id string) *ClientOptions

SetClientID will set the client id to be used by this client when connecting to the MQTT broker. According to the MQTT v3.1 specification, a client id must be no longer than 23 characters.

func (*ClientOptions) SetConnectTimeout

func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions

SetConnectTimeout limits how long the client will wait when trying to open a connection to an MQTT server before timing out and failing the attempt. A duration of 0 never times out. Default 30 seconds. Currently only operational on TCP/TLS connections.

func (*ClientOptions) SetConnectionLostHandler

func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions

SetConnectionLostHandler will set the OnConnectionLost callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.

func (*ClientOptions) SetDefaultPublishHandler

func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions

SetDefaultPublishHandler sets the MessageHandler that will be called when a message is received that does not match any known subscriptions.

func (*ClientOptions) SetKeepAlive

func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions

SetKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server.

func (*ClientOptions) SetMaxReconnectInterval

func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions

SetMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts when connection is lost

func (*ClientOptions) SetMessageChannelDepth

func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions

SetMessageChannelDepth sets the size of the internal queue that holds messages while the client is temporarily offline, allowing the application to publish when the client is reconnecting. This setting is only valid if AutoReconnect is set to true; it is otherwise ignored.

func (*ClientOptions) SetOnConnectHandler

func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions

SetOnConnectHandler sets the function to be called when the client is connected. Both at initial connection time and upon automatic reconnect.

func (*ClientOptions) SetOrderMatters

func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions

SetOrderMatters will set the message routing to guarantee order within each QoS level. By default, this value is true. If set to false, this flag indicates that messages can be delivered asynchronously from the client to the application and possibly arrive out of order.

func (*ClientOptions) SetPassword

func (o *ClientOptions) SetPassword(p string) *ClientOptions

SetPassword will set the password to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext across the wire.

func (*ClientOptions) SetPingTimeout

func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions

SetPingTimeout will set the amount of time (in seconds) that the client will wait after sending a PING request to the broker, before deciding that the connection has been lost. Default is 10 seconds.

func (*ClientOptions) SetProtocolVersion

func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions

SetProtocolVersion sets the MQTT version to be used to connect to the broker. Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1

func (*ClientOptions) SetStore

func (o *ClientOptions) SetStore(s stores.Store) *ClientOptions

SetStore will set the implementation of the Store interface used to provide message persistence in cases where QoS levels QoS_ONE or QoS_TWO are used. If no store is provided, then the client will use MemoryStore by default.

func (*ClientOptions) SetTLSConfig

func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions

SetTLSConfig will set an SSL/TLS configuration to be used when connecting to an MQTT broker. Please read the official Go documentation for more information.

func (*ClientOptions) SetUsername

func (o *ClientOptions) SetUsername(u string) *ClientOptions

SetUsername will set the username to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext across the wire.

func (*ClientOptions) SetWill

func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions

SetWill accepts a string will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.

func (*ClientOptions) SetWriteTimeout

func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions

SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a timeout error. A duration of 0 never times out. Default 30 seconds

func (*ClientOptions) UnsetWill

func (o *ClientOptions) UnsetWill() *ClientOptions

UnsetWill will cause any set will message to be disregarded.

type ClientOptionsReader

type ClientOptionsReader struct {
	// contains filtered or unexported fields
}

ClientOptionsReader provides an interface for reading ClientOptions after the client has been initialized.

func (*ClientOptionsReader) AutoReconnect

func (r *ClientOptionsReader) AutoReconnect() bool

func (*ClientOptionsReader) CleanSession

func (r *ClientOptionsReader) CleanSession() bool

CleanSession returns whether Cleansession is set

func (*ClientOptionsReader) ClientID

func (r *ClientOptionsReader) ClientID() string

ClientID returns the set client id

func (*ClientOptionsReader) ConnectTimeout

func (r *ClientOptionsReader) ConnectTimeout() time.Duration

func (*ClientOptionsReader) KeepAlive

func (r *ClientOptionsReader) KeepAlive() time.Duration

func (*ClientOptionsReader) MaxReconnectInterval

func (r *ClientOptionsReader) MaxReconnectInterval() time.Duration

func (*ClientOptionsReader) MessageChannelDepth

func (r *ClientOptionsReader) MessageChannelDepth() uint

func (*ClientOptionsReader) Order

func (r *ClientOptionsReader) Order() bool

func (*ClientOptionsReader) Password

func (r *ClientOptionsReader) Password() string

Password returns the set password

func (*ClientOptionsReader) PingTimeout

func (r *ClientOptionsReader) PingTimeout() time.Duration

func (*ClientOptionsReader) ProtocolVersion

func (r *ClientOptionsReader) ProtocolVersion() uint

func (*ClientOptionsReader) Servers

func (r *ClientOptionsReader) Servers() []*url.URL

Servers returns a slice of the servers defined in the clientoptions

func (*ClientOptionsReader) TLSConfig

func (r *ClientOptionsReader) TLSConfig() tls.Config

func (*ClientOptionsReader) Username

func (r *ClientOptionsReader) Username() string

Username returns the set username

func (*ClientOptionsReader) WillEnabled

func (r *ClientOptionsReader) WillEnabled() bool

func (*ClientOptionsReader) WillPayload

func (r *ClientOptionsReader) WillPayload() []byte

func (*ClientOptionsReader) WillQos

func (r *ClientOptionsReader) WillQos() byte

func (*ClientOptionsReader) WillRetained

func (r *ClientOptionsReader) WillRetained() bool

func (*ClientOptionsReader) WillTopic

func (r *ClientOptionsReader) WillTopic() string

func (*ClientOptionsReader) WriteTimeout

func (r *ClientOptionsReader) WriteTimeout() time.Duration

type ConnectToken

type ConnectToken struct {
	// contains filtered or unexported fields
}

ConnectToken is an extension of Token containing the extra fields required to provide information about calls to Connect()

func (*ConnectToken) Error

func (b *ConnectToken) Error() error

func (*ConnectToken) ReturnCode

func (c *ConnectToken) ReturnCode() byte

ReturnCode returns the acknowledgement code in the CONNACK sent in response to a Connect()

func (*ConnectToken) Wait

func (b *ConnectToken) Wait() bool

Wait will wait indefinitely for the Token to complete, i.e. for the Connect flow associated with the Token to finish

func (*ConnectToken) WaitTimeout

func (b *ConnectToken) WaitTimeout(d time.Duration) bool

WaitTimeout takes a time.Duration to wait for the flow associated with the Token to complete, returns true if it returned before the timeout or returns false if the timeout occurred. In the case of a timeout the Token does not have an error set in case the caller wishes to wait again

type ConnectionLostHandler

type ConnectionLostHandler func(Client, error)

type DisconnectToken

type DisconnectToken struct {
	// contains filtered or unexported fields
}

DisconnectToken is an extension of Token containing the extra fields required to provide information about calls to Disconnect()

func (*DisconnectToken) Error

func (b *DisconnectToken) Error() error

func (*DisconnectToken) Wait

func (b *DisconnectToken) Wait() bool

Wait will wait indefinitely for the Token to complete, i.e. for the Disconnect flow associated with the Token to finish

func (*DisconnectToken) WaitTimeout

func (b *DisconnectToken) WaitTimeout(d time.Duration) bool

WaitTimeout takes a time.Duration to wait for the flow associated with the Token to complete, returns true if it returned before the timeout or returns false if the timeout occurred. In the case of a timeout the Token does not have an error set in case the caller wishes to wait again

type DummyToken

type DummyToken struct {
	// contains filtered or unexported fields
}

func (*DummyToken) Error

func (d *DummyToken) Error() error

func (*DummyToken) Wait

func (d *DummyToken) Wait() bool

func (*DummyToken) WaitTimeout

func (d *DummyToken) WaitTimeout(t time.Duration) bool

type MId

type MId uint16

MId is 16 bit message id as specified by the MQTT spec. In general, these values should not be depended upon by the client application.

type Message

type Message interface {
	Duplicate() bool
	Qos() byte
	Retained() bool
	Topic() string
	MessageId() uint16
	Payload() []byte
}

type MessageHandler

type MessageHandler func(Client, Message)

type OnConnectHandler

type OnConnectHandler func(Client)

type PacketAndToken

type PacketAndToken struct {
	// contains filtered or unexported fields
}

PacketAndToken is a struct that contains both a ControlPacket and a Token. This struct is passed via channels between the client interface code and the underlying code responsible for sending and receiving MQTT messages.

type PublishToken

type PublishToken struct {
	// contains filtered or unexported fields
}

PublishToken is an extension of Token containing the extra fields required to provide information about calls to Publish()

func (*PublishToken) Error

func (b *PublishToken) Error() error

func (*PublishToken) MessageId

func (p *PublishToken) MessageId() uint16

MessageId returns the MQTT message ID that was assigned to the Publish packet when it was sent to the broker

func (*PublishToken) Wait

func (b *PublishToken) Wait() bool

Wait will wait indefinitely for the Token to complete, i.e. for the Publish to be sent and its receipt confirmed by the broker

func (*PublishToken) WaitTimeout

func (b *PublishToken) WaitTimeout(d time.Duration) bool

WaitTimeout takes a time.Duration to wait for the flow associated with the Token to complete, returns true if it returned before the timeout or returns false if the timeout occurred. In the case of a timeout the Token does not have an error set in case the caller wishes to wait again

type SubscribeToken

type SubscribeToken struct {
	// contains filtered or unexported fields
}

SubscribeToken is an extension of Token containing the extra fields required to provide information about calls to Subscribe()

func (*SubscribeToken) Error

func (b *SubscribeToken) Error() error

func (*SubscribeToken) Result

func (s *SubscribeToken) Result() map[string]byte

Result returns a map of topics that were subscribed to along with the matching return code from the broker. This is either the Qos value of the subscription or an error code.

func (*SubscribeToken) Wait

func (b *SubscribeToken) Wait() bool

Wait will wait indefinitely for the Token to complete, i.e. for the Subscribe flow associated with the Token to finish

func (*SubscribeToken) WaitTimeout

func (b *SubscribeToken) WaitTimeout(d time.Duration) bool

WaitTimeout takes a time.Duration to wait for the flow associated with the Token to complete, returns true if it returned before the timeout or returns false if the timeout occurred. In the case of a timeout the Token does not have an error set in case the caller wishes to wait again

type Token

type Token interface {
	Wait() bool
	WaitTimeout(time.Duration) bool
	Error() error
}

Token defines the interface for the tokens used to indicate when actions have completed.

type UnsubscribeToken

type UnsubscribeToken struct {
	// contains filtered or unexported fields
}

UnsubscribeToken is an extension of Token containing the extra fields required to provide information about calls to Unsubscribe()

func (*UnsubscribeToken) Error

func (b *UnsubscribeToken) Error() error

func (*UnsubscribeToken) Wait

func (b *UnsubscribeToken) Wait() bool

Wait will wait indefinitely for the Token to complete, i.e. for the Unsubscribe flow associated with the Token to finish

func (*UnsubscribeToken) WaitTimeout

func (b *UnsubscribeToken) WaitTimeout(d time.Duration) bool

WaitTimeout takes a time.Duration to wait for the flow associated with the Token to complete, returns true if it returned before the timeout or returns false if the timeout occurred. In the case of a timeout the Token does not have an error set in case the caller wishes to wait again

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL