mqtt

package module
v0.0.0-...-c2993f0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2014 License: EPL-1.0 Imports: 18 Imported by: 0

README

Eclipse Paho MQTT Go client

This repository contains the source code for the Eclipse Paho MQTT Go client library.

This code builds a library which enable applications to connect to an MQTT broker to publish messages, and to subscribe to topics and receive published messages.

This library supports a fully asynchronous mode of operation.

Installation and Build

This client is designed to work with the standard Go tools, so installation is as easy as:

go get git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git

The client depends on Google's websockets package, also easily installed with the command:

go get code.google.com/p/go.net/websocket

Usage and API

Detailed API documentation is available by using to godoc tool, or can be browsed online using the godoc.org service.

Make use of the library by importing it in your Go client source code. For example,

import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"

Samples are available in the /samples directory for reference.

Runtime tracing

Tracing is enabled by using the SetTraceLevel option when creating a ClientOptions struct. See the ClientOptions documentation for more details.

Reporting bugs

Please report bugs under the "MQTT-Go" Component in Eclipse Bugzilla for the Paho Technology project. This is a very new library as of Q1 2014, so there are sure to be bugs.

More information

Discussion of the Paho clients takes place on the Eclipse paho-dev mailing list.

General questions about the MQTT protocol are discussed in the MQTT Google Group.

There is much more information available via the MQTT community site.

Documentation

Overview

Package mqtt provides an MQTT v3.1 client library.

Index

Constants

View Source
const (
	NET component = "[net]     "
	PNG component = "[pinger]  "
	CLI component = "[client]  "
	DEC component = "[decode]  "
	MES component = "[message] "
	STR component = "[store]   "
	MID component = "[msgids]  "
	TST component = "[test]    "
	STA component = "[state]   "
	ERR component = "[error]   "
)
View Source
const (
	Off      tracelevel = 0
	Critical tracelevel = 10
	Warn     tracelevel = 20
	Verbose  tracelevel = 30
)

Variables

View Source
var ErrBadCredentials = errors.New("Bad user name or password")
View Source
var ErrInvalidClientID = errors.New("Identifier rejected")
View Source
var ErrInvalidProtocolVersion = errors.New("Unnacceptable protocol version")

* Connect Errors

View Source
var ErrInvalidQoS = errors.New("Invalid QoS")

* QoS Errors

View Source
var ErrInvalidTopicFilterEmptyString = errors.New("Invalid TopicFilter - may not be empty string")
View Source
var ErrInvalidTopicFilterMultilevel = errors.New("Invalid TopicFilter - multi-level wildcard must be last level")
View Source
var ErrInvalidTopicNameEmptyString = errors.New("Invalid TopicName - may not be empty string")

* Topic Errors

View Source
var ErrInvalidTopicNameWildcard = errors.New("Invalid TopicName - may not contain wild card")
View Source
var ErrNotAuthorized = errors.New("Not Authorized")
View Source
var ErrNotConnected = errors.New("Not Connected")
View Source
var ErrServerUnavailable = errors.New("Server Unavailable")
View Source
var ErrUnknownReason = errors.New("Unknown RC")

Functions

func DefaultErrorHandler

func DefaultErrorHandler(client *MqttClient, reason error)

Types

type ClientOptions

type ClientOptions struct {
	// contains filtered or unexported fields
}

ClientOptions contains configurable options for an MqttClient.

func NewClientOptions

func NewClientOptions() *ClientOptions

NewClientClientOptions will create a new ClientClientOptions type with some default values.

Port: 1883
CleanSession: True
Timeout: 30 (seconds)
Tracefile: os.Stdout

func (*ClientOptions) SetBinaryWill

func (opts *ClientOptions) SetBinaryWill(topic string, payload []byte, qos QoS, retained bool) *ClientOptions

SetBinaryWill accepts a []byte will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.

func (*ClientOptions) SetBroker

func (opts *ClientOptions) SetBroker(server string) *ClientOptions

SetBroker will allow you to set the URI for your broker. The format should be scheme://host:port Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname) and "port" is the port on which the broker is accepting connections. For example, one could connect to tcp://test.mosquitto.org:1883

func (*ClientOptions) SetCleanSession

func (opts *ClientOptions) SetCleanSession(clean bool) *ClientOptions

SetCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before diconnecting previously but didn't will not be sent upon connecting to the broker.

func (*ClientOptions) SetClientId

func (opts *ClientOptions) SetClientId(clientid string) *ClientOptions

SetClientId will set the client id to be used by this client when connecting to the MQTT broker. According to the MQTT v3.1 specification, a client id mus be no longer than 23 characters.

func (*ClientOptions) SetDefaultPublishHandler

func (opts *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions

SetDefaultPublishHandler

func (*ClientOptions) SetOnConnectionLost

func (opts *ClientOptions) SetOnConnectionLost(onLost OnConnectionLost) *ClientOptions

SetOnConnectionLost will set the OnConnectionLost callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.

func (*ClientOptions) SetOrderMatters

func (opts *ClientOptions) SetOrderMatters(order bool) *ClientOptions

SetOrderMatters will set the message routing to guarantee order within each QoS level. By default, this value is true. If set to false, this flag indicates that messages can be delivered asynchronously from the client to the application and possibly arrive out of order.

func (*ClientOptions) SetPassword

func (opts *ClientOptions) SetPassword(password string) *ClientOptions

SetPassword will set the password to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext accross the wire.

func (*ClientOptions) SetStandbyBroker

func (opts *ClientOptions) SetStandbyBroker(server string) *ClientOptions

SetStandbyBroker will allow you to set a second URI to which the client will attempt to connect in the event of a connection failure. This is for use only in cases where two brokers are configured as a highly available pair. (For example, two IBM MessageSight appliances configured in High Availability mode).

func (*ClientOptions) SetStore

func (opts *ClientOptions) SetStore(store Store) *ClientOptions

SetStore will set the implementation of the Store interface used to provide message persistence in cases where QoS levels QoS_ONE or QoS_TWO are used. If no store is provided, then the client will use MemoryStore by default.

func (*ClientOptions) SetTimeout

func (opts *ClientOptions) SetTimeout(timeout uint) *ClientOptions

SetTimeout will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server.

func (*ClientOptions) SetTlsConfig

func (opts *ClientOptions) SetTlsConfig(tlsconfig *tls.Config) *ClientOptions

SetTlsConfig will set an SSL/TLS configuration to be used when connecting to an MQTT broker. Please read the official Go documentation for more information.

func (*ClientOptions) SetTraceLevel

func (opts *ClientOptions) SetTraceLevel(level tracelevel) *ClientOptions

SetTraceLevel will set the trace level (verbosity) of the client. Options are:

Off
Critical
Warn
Verbose

func (*ClientOptions) SetTracefile

func (opts *ClientOptions) SetTracefile(tracefile *os.File) *ClientOptions

SetTracefile will set the output for any trace statements that are generated by the client. By default, trace statements will be directed to os.Stdout.

func (*ClientOptions) SetUsername

func (opts *ClientOptions) SetUsername(username string) *ClientOptions

SetUsername will set the username to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext accross the wire.

func (*ClientOptions) SetWill

func (opts *ClientOptions) SetWill(topic string, payload string, qos QoS, retained bool) *ClientOptions

SetWill accepts a string will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.

func (*ClientOptions) SetWriteTimeout

func (opts *ClientOptions) SetWriteTimeout(t time.Duration)

SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a timeout error. A duration of 0 never times out.

func (*ClientOptions) UnsetWill

func (opts *ClientOptions) UnsetWill() *ClientOptions

UnsetWill will cause any set will message to be disregarded.

type ConnRC

type ConnRC int8
const (
	CONN_FAILURE           ConnRC = -1
	CONN_ACCEPTED          ConnRC = 0x00
	CONN_REF_BAD_PROTO_VER ConnRC = 0x01
	CONN_REF_ID_REJ        ConnRC = 0x02
	CONN_REF_SERV_UNAVAIL  ConnRC = 0x03
	CONN_REF_BAD_USER_PASS ConnRC = 0x04
	CONN_REF_NOT_AUTH      ConnRC = 0x05
)

Connection Return Codes

type FileStore

type FileStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

FileStore implements the store interface using the filesystem to provide true persistence, even across client failure. This is designed to use a single directory per running client. If you are running multiple clients on the same filesystem, you will need to be careful to specify unique store directories for each.

func NewFileStore

func NewFileStore(directory string) *FileStore

NewFileStore will create a new FileStore which stores its messages in the directory provided.

func (*FileStore) All

func (store *FileStore) All() []string

All will provide a list of all of the keys associated with messages currenly residing in the FileStore.

func (*FileStore) Close

func (store *FileStore) Close()

Close will disallow the FileStore from being used.

func (*FileStore) Del

func (store *FileStore) Del(key string)

Del will remove the persisted message associated with the provided key from the FileStore.

func (*FileStore) Get

func (store *FileStore) Get(key string) (m *Message)

Get will retrieve a message from the store, the one associated with the provided key value.

func (*FileStore) Open

func (store *FileStore) Open()

Open will allow the FileStore to be used.

func (*FileStore) Put

func (store *FileStore) Put(key string, m *Message)

Put will put a message into the store, associated with the provided key value.

func (*FileStore) Reset

func (store *FileStore) Reset()

Reset will remove all persisted messages from the FileStore.

func (*FileStore) SetTracer

func (store *FileStore) SetTracer(trace *Tracer)

type MId

type MId uint16

MId is 16 bit message id as specified by the MQTT spec. In general, these values should not be depended upon by the client application.

const (
	MId_MAX MId = 65535
	MId_MIN MId = 1
)

type MemoryStore

type MemoryStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemoryStore implements the store interface to provide a "persistence" mechanism wholly stored in memory. This is only useful for as long as the client instance exists.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore returns a pointer to a new instance of MemoryStore, the instance is not initialized and ready to use until Open() has been called on it.

func (*MemoryStore) All

func (store *MemoryStore) All() []string

All returns a slice of strings containing all the keys currently in the MemoryStore.

func (*MemoryStore) Close

func (store *MemoryStore) Close()

Close will disallow modifications to the state of the store.

func (*MemoryStore) Del

func (store *MemoryStore) Del(key string)

Del takes a key, searches the MemoryStore and if the key is found deletes the Message pointer associated with it.

func (*MemoryStore) Get

func (store *MemoryStore) Get(key string) *Message

Get takes a key and looks in the store for a matching Message returning either the Message pointer or nil.

func (*MemoryStore) Open

func (store *MemoryStore) Open()

Open initializes a MemoryStore instance.

func (*MemoryStore) Put

func (store *MemoryStore) Put(key string, message *Message)

Put takes a key and a pointer to a Message and stores the message.

func (*MemoryStore) Reset

func (store *MemoryStore) Reset()

Reset eliminates all persisted message data in the store.

func (*MemoryStore) SetTracer

func (store *MemoryStore) SetTracer(tracer *Tracer)

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(message []byte) *Message

Create a default PUBLISH Message with the specified payload If message == nil, create a zero length message Defaults: QoS=1, Retained=False

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes operates on a Message pointer and returns a slice of bytes representing the Message ready for transmission over the network

func (*Message) DupFlag

func (m *Message) DupFlag() bool

DupFlag returns the boolean value of the duplicate message flag as encoded in the fixed header

func (*Message) MsgId

func (m *Message) MsgId() MId

MsgId returns a MId containing the message id of the Message

func (*Message) Payload

func (m *Message) Payload() []byte

Payload returns a slice of bytes containing the payload of the Message

func (*Message) QoS

func (m *Message) QoS() QoS

QoS returns the QoS value of the Message as encoded in the fixed header

func (*Message) RetainedFlag

func (m *Message) RetainedFlag() bool

RetainedFlag returns a boolean value indicating whether this message was a retained message

func (*Message) SetQoS

func (m *Message) SetQoS(qos QoS)

setQoS takes a QoS value and encodes this value in the fixed header of the Message

func (*Message) SetRetainedFlag

func (m *Message) SetRetainedFlag(isRetained bool)

SetRetainedFlag takes a boolean value indicating whether the server should retain the message and encodes it in the fixed header

func (*Message) Topic

func (m *Message) Topic() string

Topic returns the topic of the Message as encoded in the variable header

type MessageHandler

type MessageHandler func(client *MqttClient, message Message)

MessageHandler is a callback type which can be set to be executed upon the arrival of messages published to topics to which the client is subscribed.

type MqttClient

type MqttClient struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Numerous connection options may be specified by configuring a and then supplying a ClientOptions type.

func NewClient

func NewClient(ops *ClientOptions) *MqttClient

NewClient will create an MQTT v3.1 client with all of the options specified in the provided ClientOptions. The client must have the Start method called on it before it may be used. This is to make sure resources (such as a net connection) are created before the application is actually ready.

func (*MqttClient) Disconnect

func (c *MqttClient) Disconnect(quiesce uint)

Disconnect will end the connection with the server, but not before waiting the specified number of milliseconds to wait for existing work to be completed.

func (*MqttClient) EndSubscription

func (c *MqttClient) EndSubscription(topics ...string) (<-chan Receipt, error)

EndSubscription will end the subscription from each of the topics provided. Messages published to those topics from other clients will no longer be received.

func (*MqttClient) ForceDisconnect

func (c *MqttClient) ForceDisconnect()

ForceDisconnect will end the connection with the mqtt broker immediately.

func (*MqttClient) IsConnected

func (c *MqttClient) IsConnected() bool

func (*MqttClient) Publish

func (c *MqttClient) Publish(qos QoS, topic string, payload interface{}) <-chan Receipt

Publish will publish a message with the specified QoS and content to the specified topic. Returns a read only channel used to track the delivery of the message.

func (*MqttClient) PublishMessage

func (c *MqttClient) PublishMessage(topic string, message *Message) <-chan Receipt

PublishMessage will publish a Message to the specified topic. Returns a read only channel used to track the delivery of the message.

func (*MqttClient) Start

func (c *MqttClient) Start() ([]Receipt, error)

Start will create a connection to the message broker If clean session is false, then a slice will be returned containing Receipts for all messages that were in-flight at the last disconnect. If clean session is true, then any existing client state will be removed.

func (*MqttClient) StartSubscription

func (c *MqttClient) StartSubscription(callback MessageHandler, filters ...*TopicFilter) (<-chan Receipt, error)

Start a new subscription. Provide a MessageHandler to be executed when a message is published on one of the topics provided.

type MsgType

type MsgType byte
const (
	/* 0x00 is reserved */
	CONNECT     MsgType = 0x01
	CONNACK     MsgType = 0x02
	PUBLISH     MsgType = 0x03
	PUBACK      MsgType = 0x04
	PUBREC      MsgType = 0x05
	PUBREL      MsgType = 0x06
	PUBCOMP     MsgType = 0x07
	SUBSCRIBE   MsgType = 0x08
	SUBACK      MsgType = 0x09
	UNSUBSCRIBE MsgType = 0x0A
	UNSUBACK    MsgType = 0x0B
	PINGREQ     MsgType = 0x0C
	PINGRESP    MsgType = 0x0D
	DISCONNECT  MsgType = 0x0E
)

MsgType

type OnConnectionLost

type OnConnectionLost func(client *MqttClient, reason error)

OnConnectionLost is a callback type which can be set to be executed upon an unintended disconnection from the MQTT broker. Disconnects caused by calling Disconnect or ForceDisconnect will not cause an OnConnectionLost callback to execute.

type QoS

type QoS byte
const (
	QOS_ZERO QoS = 0
	QOS_ONE  QoS = 1
	QOS_TWO  QoS = 2
)

QoS LEVEL

type Receipt

type Receipt struct {
}

Receipt is a sort of token object that you will receive upon delivery of a published message.

type Store

type Store interface {
	Open()
	Put(string, *Message)
	Get(string) *Message
	All() []string
	Del(string)
	Close()
	Reset()
	SetTracer(*Tracer)
}

Store is an interface which can be used to provide implementations for message persistence. Because we may have to store distinct messages with the same message ID, we need a unique key for each message. This is possible by prepending "i." or "o." to each message id

type TopicFilter

type TopicFilter struct {
	QoS
	// contains filtered or unexported fields
}

func NewTopicFilter

func NewTopicFilter(topic string, qos byte) (*TopicFilter, error)

type TopicName

type TopicName struct {
	QoS
	// contains filtered or unexported fields
}

func NewTopicName

func NewTopicName(topic string, qos byte) (*TopicName, error)

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

func (*Tracer) Trace_C

func (t *Tracer) Trace_C(cm component, f string, v ...interface{})

func (*Tracer) Trace_E

func (t *Tracer) Trace_E(cm component, f string, v ...interface{})

func (*Tracer) Trace_V

func (t *Tracer) Trace_V(cm component, f string, v ...interface{})

func (*Tracer) Trace_W

func (t *Tracer) Trace_W(cm component, f string, v ...interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL