Documentation ¶
Overview ¶
This is version 3 of the Go API package for devices to interact with the MODE cloud.
You can connect to the cloud in three steps:
- Construct a DeviceContext: deviceContext := &mode.DeviceContext{...}
- Use the DeviceContext to construct MODE's implementation of the MqttDelegate: delegate := mode.NewModeMqttDelegate(...)
- Create a client using the delegate: client := mode.NewMqttClient(mqttHost, mqttPort, mode.WithMqttDelegate(delegate))
Customization and configuration are done through various delegate interfaces. For
convenience, there is a general delegate (MqttDelegate), as used in the example, which contains all the delegate interfaces except the error delegate. The ModeMqttDelegate is implemented to subscribe and receive the MQTT topics used by MODE, but the MqttClient can be constructed with other implementations to communicate with other MQTT servers.
Requests to the cloud through the MqttClient fall into two categories: non-blocking, or
blocking until a response is received. Refer to the documentation of each method for details. For
non-blocking calls, the MqttReceiverDelegate receives the response channels from the
client via SetReceiveChannels().
Package mode implements the MODE client MQTT API. The interface is the MqttClient struct, which supports the subset of MQTT that our devices require; configuration is done through the MqttDelegate.
Source for the MODE-specific MQTT protocol.
Index ¶
- Constants
- Variables
- func DummyMQTTD(ctx context.Context, wg *sync.WaitGroup, cmdCh chan DummyCmd) bool
- func DummyMQTTDWithConfig(ctx context.Context, wg *sync.WaitGroup, cmdCh chan DummyCmd, conf DummyConfig) bool
- func SetRESTHostPort(host string, port int, useTLS bool)
- func WithAdditionalFormatSubscription(formatTopic string, handler MqttMsgHandler) func(*ModeMqttDelegate)
- func WithAdditionalSubscription(topic string, handler MqttMsgHandler) func(*ModeMqttDelegate)
- func WithMqttAuthDelegate(authDelegate MqttAuthDelegate) func(*MqttClient)
- func WithMqttConfigDelegate(confDelegate MqttConfigDelegate) func(*MqttClient)
- func WithMqttDelegate(delegate MqttDelegate) func(*MqttClient)
- func WithMqttErrorDelegate(errorDelegate MqttErrorDelegate) func(*MqttClient)
- func WithMqttReceiverDelegate(recvDelegate MqttReceiverDelegate) func(*MqttClient)
- func WithReceiveQueueSize(qSize uint16) func(*ModeMqttDelegate)
- func WithSendQueueSize(qSize uint16) func(*ModeMqttDelegate)
- func WithUseTLS(useTLS bool) func(*ModeMqttDelegate)
- func WithUseWebSocket(b bool) func(client *MqttClient)
- type DeviceBulkData
- type DeviceCommand
- type DeviceContext
- type DeviceEvent
- type DeviceInfo
- type DummyClient
- type DummyCmd
- type DummyConfig
- type DummyContext
- type KeyValue
- type KeyValueSync
- type ModeMqttDelegate
- func (del *ModeMqttDelegate) AuthInfo() (username string, password string)
- func (del *ModeMqttDelegate) GetCommandChannel() chan *DeviceCommand
- func (del *ModeMqttDelegate) GetDeviceContext() *DeviceContext
- func (del *ModeMqttDelegate) GetKVSyncChannel() chan *KeyValueSync
- func (del *ModeMqttDelegate) GetReceiveQueueSize() uint16
- func (del *ModeMqttDelegate) GetSendQueueSize() uint16
- func (del *ModeMqttDelegate) OnClose()
- func (del *ModeMqttDelegate) SetReceiveChannels(subRecvCh <-chan MqttSubData, queueAckCh <-chan MqttResponse, ...)
- func (del *ModeMqttDelegate) StartSubscriptionListener()
- func (del *ModeMqttDelegate) Subscriptions() []string
- func (del *ModeMqttDelegate) TLSUsageAndConfiguration() (useTLS bool, tlsConfig *tls.Config)
- type ModeMqttDelegateOption
- type MqttAuthDelegate
- type MqttClient
- func (client *MqttClient) Connect(ctx context.Context) error
- func (client *MqttClient) Disconnect(ctx context.Context) error
- func (client *MqttClient) GetLastActivity() time.Time
- func (client *MqttClient) GetModeAuthDelegate() (*ModeMqttDelegate, error)
- func (client *MqttClient) IsConnected() bool
- func (client *MqttClient) Ping(ctx context.Context) error
- func (client *MqttClient) PingAndWait(ctx context.Context) error
- func (client *MqttClient) Publish(ctx context.Context, qos QOSLevel, topic string, data []byte) (uint16, error)
- func (client *MqttClient) PublishBulkData(ctx context.Context, bulkData DeviceBulkData) (uint16, error)
- func (client *MqttClient) PublishEvent(ctx context.Context, event DeviceEvent) (uint16, error)
- func (client *MqttClient) PublishKeyValueUpdate(ctx context.Context, kvData KeyValueSync) (uint16, error)
- func (client *MqttClient) Republish(ctx context.Context, qos QOSLevel, topic string, data []byte, packetID uint16) (uint16, error)
- func (client *MqttClient) Subscribe(ctx context.Context, subs []string) []error
- func (client *MqttClient) TakeRemainingErrors() []error
- func (client *MqttClient) Unsubscribe(ctx context.Context, subs []string) []error
- type MqttConfigDelegate
- type MqttDelegate
- type MqttErrorDelegate
- type MqttMsgHandler
- type MqttReceiverDelegate
- type MqttResponse
- type MqttSubData
- type NetworkStatus
- type QOSLevel
- type RESTError
Constants ¶
const (
	DefaultUseTLS    = true
	DefaultQueueSize = uint16(8)
)
const (
	KVSyncActionReload = "reload"
	KVSyncActionSet    = "set"
	KVSyncActionDelete = "delete"
)
Variables ¶
var (
	DefaultMqttHost = "localhost"
	DefaultMqttPort = 1998
	DefaultUsers    = []string{"good", "1234"}
	DefaultSubData  []byte
	DefaultItems    = []*KeyValue{
		&KeyValue{Key: "key1", Value: "value1", ModificationTime: time.Now()},
		&KeyValue{Key: "key2", Value: "value2", ModificationTime: time.Now()},
	}
	// DummyServerDelayDuration is the amount of time the server will wait before
	// responding. This can be used to simulate a slow network
	DummyServerDelayDuration = 3 * time.Second
	MqttDummyContext         *DummyContext
)
Functions ¶
func DummyMQTTD ¶
DummyMQTTD is a dummy MQTT server. It runs an MQTT server in goroutines. An optional command channel can be passed in to manipulate the server, for example to change test conditions or to shut it down. Unlike the v2 dummyMQTTD, this function starts goroutines but isn't meant to be run as a goroutine itself. It will panic if it is unable to start listening.
To end the server, either close the command channel or call cancel() on the context.
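The two shutdown paths described above can be sketched with a plain select loop. This is only an illustration of the pattern, not the package's implementation: the cmd type, runLoop, and stopBy are hypothetical stand-ins, and no MQTT traffic is involved.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cmd is a hypothetical stand-in for the package's DummyCmd type.
type cmd int

// runLoop is a hypothetical server loop. It exits when the context is
// cancelled or when the command channel is closed, and reports which
// of the two happened on the stopped channel.
func runLoop(ctx context.Context, wg *sync.WaitGroup, cmdCh <-chan cmd, stopped chan<- string) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			stopped <- "context cancelled"
			return
		case _, ok := <-cmdCh:
			if !ok {
				stopped <- "command channel closed"
				return
			}
			// A real server would act on the received command here.
		}
	}
}

// stopBy starts the loop and then stops it, either by closing the
// command channel or by cancelling the context.
func stopBy(closeChannel bool) string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	cmdCh := make(chan cmd)
	stopped := make(chan string, 1)

	wg.Add(1)
	go runLoop(ctx, &wg, cmdCh, stopped)

	if closeChannel {
		close(cmdCh)
	} else {
		cancel()
	}
	wg.Wait() // the WaitGroup lets the caller wait for a clean exit
	return <-stopped
}

func main() {
	fmt.Println(stopBy(true))
	fmt.Println(stopBy(false))
}
```

Passing a WaitGroup, as the DummyMQTTD signature does, lets the caller block until the goroutines have actually finished.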
func DummyMQTTDWithConfig ¶
func SetRESTHostPort ¶
SetRESTHostPort overrides the default REST API server host and port, and specifies whether TLS connection should be used. (TLS is used by default.)
func WithAdditionalFormatSubscription ¶
func WithAdditionalFormatSubscription(formatTopic string, handler MqttMsgHandler) func(*ModeMqttDelegate)
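WithAdditionalFormatSubscription adds a subscription whose topic is given as a format string containing a %d verb that is substituted when the subscription is built. This is a little obtuse, so the option checks that formatTopic contains %d.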
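As a rough sketch of the check and substitution described here, the helper below validates a format topic and fills in a number. The function expandTopic, the topic string, and the assumption that the substituted value is a device ID are all hypothetical; the package's actual behavior may differ.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// expandTopic is a hypothetical helper, not part of the package. It
// rejects format topics without a %d verb and otherwise substitutes
// the given ID (assumed here to be a device ID).
func expandTopic(formatTopic string, deviceID uint64) (string, error) {
	if !strings.Contains(formatTopic, "%d") {
		return "", errors.New("format topic must contain %d")
	}
	return fmt.Sprintf(formatTopic, deviceID), nil
}

func main() {
	// The topic layout below is made up for illustration.
	topic, err := expandTopic("/devices/%d/custom", 1234)
	fmt.Println(topic, err)

	_, err = expandTopic("/devices/custom", 1234)
	fmt.Println(err)
}
```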
func WithAdditionalSubscription ¶
func WithAdditionalSubscription(topic string, handler MqttMsgHandler) func(*ModeMqttDelegate)
func WithMqttAuthDelegate ¶
func WithMqttAuthDelegate(authDelegate MqttAuthDelegate) func(*MqttClient)
func WithMqttConfigDelegate ¶
func WithMqttConfigDelegate(confDelegate MqttConfigDelegate) func(*MqttClient)
func WithMqttDelegate ¶
func WithMqttDelegate(delegate MqttDelegate) func(*MqttClient)
func WithMqttErrorDelegate ¶
func WithMqttErrorDelegate(errorDelegate MqttErrorDelegate) func(*MqttClient)
func WithMqttReceiverDelegate ¶
func WithMqttReceiverDelegate(recvDelegate MqttReceiverDelegate) func(*MqttClient)
func WithReceiveQueueSize ¶
func WithReceiveQueueSize(qSize uint16) func(*ModeMqttDelegate)
func WithSendQueueSize ¶
func WithSendQueueSize(qSize uint16) func(*ModeMqttDelegate)
func WithUseTLS ¶
func WithUseTLS(useTLS bool) func(*ModeMqttDelegate)
func WithUseWebSocket ¶ added in v3.1.0
func WithUseWebSocket(b bool) func(client *MqttClient)
Types ¶
type DeviceBulkData ¶
type DeviceBulkData struct {
	StreamID string
	Blob     []byte
	Qos      QOSLevel // not exported to serializer
}
DeviceBulkData represents a batch of opaque data to be sent to the MODE cloud.
type DeviceCommand ¶
type DeviceCommand struct {
	Action string
	// contains filtered or unexported fields
}
DeviceCommand represents a command received from the MODE cloud.
func (*DeviceCommand) BindParameters ¶
func (cmd *DeviceCommand) BindParameters(v interface{}) error
BindParameters maps the command parameters from JSON to the provided struct.
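A minimal sketch of this kind of binding, assuming the parameters arrive as a JSON object and are decoded with encoding/json. The command type, its payload field, and the echoParams struct with its msg key are hypothetical stand-ins; the real DeviceCommand keeps its payload in unexported fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// command is a hypothetical stand-in for DeviceCommand.
type command struct {
	Action  string
	payload json.RawMessage // raw JSON parameters (assumed representation)
}

// bindParameters decodes the raw JSON parameters into v, which must be
// a pointer to a struct with matching json tags.
func (c *command) bindParameters(v interface{}) error {
	return json.Unmarshal(c.payload, v)
}

// echoParams is a hypothetical parameter struct for a "doEcho" command.
type echoParams struct {
	Msg string `json:"msg"`
}

// decodeEcho builds a stand-in command from raw JSON and binds it.
func decodeEcho(raw string) (echoParams, error) {
	cmd := &command{Action: "doEcho", payload: json.RawMessage(raw)}
	var p echoParams
	err := cmd.bindParameters(&p)
	return p, err
}

func main() {
	p, err := decodeEcho(`{"msg":"hello"}`)
	fmt.Println(p.Msg, err)
}
```

Binding into a typed struct keeps the command handler free of map lookups and type assertions.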
func (*DeviceCommand) String ¶
func (cmd *DeviceCommand) String() string
type DeviceContext ¶
type DeviceContext struct {
	DeviceID      uint64
	AuthToken     string
	TLSClientAuth bool
	TLSConfig     *tls.Config
}
An initialized DeviceContext is needed for most API calls. Normally, DeviceID and AuthToken are provisioned using the MODE Developer Console. If on-demand device provisioning is enabled for your MODE project, you can call ProvisionDevice to create a new DeviceContext. If you want to use a client certificate instead of an AuthToken, set TLSClientAuth to true and call the SetPKCS12ClientCertificate function.
func ProvisionDevice ¶
func ProvisionDevice(token string) (*DeviceContext, error)
ProvisionDevice is used for on-demand device provisioning. It takes a provisioning token which is obtained by the user who initiated the process. If successful, the device should store the returned DeviceContext for all future API calls.
func (*DeviceContext) DisableClaimMode ¶
func (dc *DeviceContext) DisableClaimMode() error
DisableClaimMode turns off the device's "claim mode", preventing it from being added to a different home.
func (*DeviceContext) EnableClaimMode ¶
func (dc *DeviceContext) EnableClaimMode(duration time.Duration) error
EnableClaimMode activates the device's "claim mode", i.e. allows the device to be added to a different home. The claim mode will be active for the time period specified by "duration".
func (*DeviceContext) GetInfo ¶
func (dc *DeviceContext) GetInfo() (*DeviceInfo, error)
GetInfo fetches the device's information from MODE.
func (*DeviceContext) SetPKCS12ClientCertificate ¶
func (dc *DeviceContext) SetPKCS12ClientCertificate(fileName string, password string, insecureSkipVerify bool) error
SetPKCS12ClientCertificate sets a PKCS#12 client certificate on the device context. Pass the file name and password of the certificate. If insecureSkipVerify is true, TLS accepts any certificate presented by the server and any host name in that certificate. This should be used only for testing.
type DeviceEvent ¶
type DeviceEvent struct {
	EventType string                 `json:"eventType"`
	EventData map[string]interface{} `json:"eventData,omitempty"`
	Qos       QOSLevel               // not exported to JSON
}
DeviceEvent represents an event to be sent to the MODE cloud.
type DeviceInfo ¶
type DeviceInfo struct {
	ID          uint64 `json:"id"`
	ProjectID   uint64 `json:"projectId"`
	Name        string `json:"name"`
	Tag         string `json:"tag"`
	DeviceClass string `json:"deviceClass"`
}
DeviceInfo contains the key information fetched from the MODE API.
func (*DeviceInfo) String ¶
func (d *DeviceInfo) String() string
type DummyClient ¶ added in v3.0.4
type DummyClient struct {
	Subscriptions []string
	// contains filtered or unexported fields
}
current connections
type DummyCmd ¶
type DummyCmd int
const (
	// Tells the server to publish on whatever topics a client is subscribed to
	PublishCmd DummyCmd = iota
	// Tells the server to publish some kv commands
	PublishKvSync
	PublishKvSet
	PublishKvDelete
	// Tells the server to publish a command
	PublishCommandCmd
	// Shuts the server down
	ShutdownCmd
	// Disconnects all connections
	DisconnectCmd
	// Inserts a 3 second delay after receiving the next command
	SlowdownServerCmd
	// Resets the server to normal
	ResetServerCmd
)
type DummyConfig ¶
type DummyContext ¶ added in v3.0.4
type DummyContext struct {
	Clients []*DummyClient
	// contains filtered or unexported fields
}
type KeyValue ¶
type KeyValue struct {
	Key              string      `json:"key"`
	Value            interface{} `json:"value"`
	ModificationTime time.Time   `json:"modificationTime"`
}
KeyValue represents a key-value pair stored in the Device Data Proxy.
type KeyValueSync ¶
type ModeMqttDelegate ¶
type ModeMqttDelegate struct {
	UseTLS bool
	// For receiving data from the API. Input
	SubRecvCh  <-chan MqttSubData
	QueueAckCh <-chan MqttResponse
	PingAckCh  <-chan MqttResponse
	// contains filtered or unexported fields
}
ModeMqttDelegate implements the MqttDelegate interface.
func NewModeMqttDelegate ¶
func NewModeMqttDelegate(dc *DeviceContext, opts ...ModeMqttDelegateOption) *ModeMqttDelegate
Maybe have the channel sizes as parameters.
func (*ModeMqttDelegate) AuthInfo ¶
func (del *ModeMqttDelegate) AuthInfo() (username string, password string)
func (*ModeMqttDelegate) GetCommandChannel ¶
func (del *ModeMqttDelegate) GetCommandChannel() chan *DeviceCommand
func (*ModeMqttDelegate) GetDeviceContext ¶
func (del *ModeMqttDelegate) GetDeviceContext() *DeviceContext
func (*ModeMqttDelegate) GetKVSyncChannel ¶
func (del *ModeMqttDelegate) GetKVSyncChannel() chan *KeyValueSync
func (*ModeMqttDelegate) GetReceiveQueueSize ¶
func (del *ModeMqttDelegate) GetReceiveQueueSize() uint16
func (*ModeMqttDelegate) GetSendQueueSize ¶
func (del *ModeMqttDelegate) GetSendQueueSize() uint16
func (*ModeMqttDelegate) OnClose ¶
func (del *ModeMqttDelegate) OnClose()
func (*ModeMqttDelegate) SetReceiveChannels ¶
func (del *ModeMqttDelegate) SetReceiveChannels(subRecvCh <-chan MqttSubData, queueAckCh <-chan MqttResponse, pingAckCh <-chan MqttResponse)
func (*ModeMqttDelegate) StartSubscriptionListener ¶
func (del *ModeMqttDelegate) StartSubscriptionListener()
func (*ModeMqttDelegate) Subscriptions ¶
func (del *ModeMqttDelegate) Subscriptions() []string
func (*ModeMqttDelegate) TLSUsageAndConfiguration ¶
func (del *ModeMqttDelegate) TLSUsageAndConfiguration() (useTLS bool, tlsConfig *tls.Config)
type ModeMqttDelegateOption ¶
type ModeMqttDelegateOption func(*ModeMqttDelegate)
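An option type of this shape is the functional-options pattern: the constructor starts from defaults and applies each option in order. The sketch below uses local stand-in types (delegate, option, newDelegate) rather than the package's own, and it assumes that the documented defaults, DefaultUseTLS = true and DefaultQueueSize = 8, apply to both queues.

```go
package main

import "fmt"

// delegate is a hypothetical stand-in for ModeMqttDelegate.
type delegate struct {
	useTLS        bool
	recvQueueSize uint16
	sendQueueSize uint16
}

// option mirrors the shape of ModeMqttDelegateOption.
type option func(*delegate)

// withUseTLS overrides the TLS default.
func withUseTLS(useTLS bool) option {
	return func(d *delegate) { d.useTLS = useTLS }
}

// withReceiveQueueSize overrides the receive queue size.
func withReceiveQueueSize(qSize uint16) option {
	return func(d *delegate) { d.recvQueueSize = qSize }
}

// newDelegate starts from the assumed defaults and then applies the
// options in the order they were passed.
func newDelegate(opts ...option) *delegate {
	d := &delegate{useTLS: true, recvQueueSize: 8, sendQueueSize: 8}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

func main() {
	d := newDelegate(withUseTLS(false), withReceiveQueueSize(32))
	fmt.Println(d.useTLS, d.recvQueueSize, d.sendQueueSize)
}
```

The benefit of this design is that new settings can be added later without changing the constructor's signature.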
type MqttAuthDelegate ¶
type MqttAuthDelegate interface {
	// Returns the tls usage and configuration. If useTLS is false, a nil
	// tlsConfig should be returned.
	TLSUsageAndConfiguration() (useTLS bool, tlsConfig *tls.Config)
	// Returns authentication information
	AuthInfo() (username string, password string)
}
MqttAuthDelegate methods provide the security and authentication information to start a connection to the MqttServer.
type MqttClient ¶
type MqttClient struct {
// contains filtered or unexported fields
}
MqttClient provides the public API to MQTT. We handle connect, disconnect, ping, publish, and subscribe. Connect, disconnect, and subscribe will block and wait for the response. Ping and publish will return after the packet has been sent and the response will be sent on a channel that is provided by the delegate. For ping, since MQTT does not provide a mechanism to distinguish between different ping requests, we do not provide an API to distinguish them either. For publish, the function returns a packet ID. This packet ID will be returned to the caller in the channel.
func NewMqttClient ¶
func NewMqttClient(mqttHost string, mqttPort int, dels ...func(*MqttClient)) *MqttClient
NewMqttClient will create a client and open a stream. A client is invalid if not connected, and you need to create a new client to reconnect.
func (*MqttClient) Connect ¶
func (client *MqttClient) Connect(ctx context.Context) error
Connect will initiate a connection to the server. It will block until we receive a CONNACK from the server.
func (*MqttClient) Disconnect ¶
func (client *MqttClient) Disconnect(ctx context.Context) error
Disconnect will end this connection with the server. We will block until the server closes the connection. Note: We might want to wait, but order is important here.
func (*MqttClient) GetLastActivity ¶
func (client *MqttClient) GetLastActivity() time.Time
GetLastActivity will return the time of the last send or receive.
func (*MqttClient) GetModeAuthDelegate ¶
func (client *MqttClient) GetModeAuthDelegate() (*ModeMqttDelegate, error)
GetModeAuthDelegate is a MODE extension to the MqttClient that casts the client's delegate to the concrete ModeMqttDelegate.
func (*MqttClient) IsConnected ¶
func (client *MqttClient) IsConnected() bool
IsConnected will return true if we have received a successful CONNACK response.
func (*MqttClient) Ping ¶
func (client *MqttClient) Ping(ctx context.Context) error
Ping sends an MQTT PINGREQ event to the server. This is an asynchronous call, so we will always return success if we were able to queue the message for delivery. Results will be sent on the delegate's pingAckCh.
func (*MqttClient) PingAndWait ¶
func (client *MqttClient) PingAndWait(ctx context.Context) error
PingAndWait sends an MQTT PINGREQ event to the server and waits for the response. If this method is used instead of the asynchronous Ping, the user should not be listening on the pingAckCh channel, since this function may then time out waiting for the response, in which case an error will be returned.
func (*MqttClient) Publish ¶
func (client *MqttClient) Publish(ctx context.Context, qos QOSLevel, topic string, data []byte) (uint16, error)
Publish sends an MQTT Publish event to subscribers on the specified topic. This is an asynchronous call, so we will always return a packet ID as long as the request can be queued. After queueing, any subsequent errors or results will be written to the delegate's queueAckCh. For QOSAtMostOnce, an error is returned only if the request could not be queued; we receive no ACK from the server. For QOSAtLeastOnce, we will receive an ACK if we were successful. Any other QoS level (QOSExactlyOnce) is not supported: an error is returned immediately and the request is not sent.
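The packet ID is what lets a caller match an asynchronous ACK to the request that caused it. The sketch below imitates that flow with a fake in-memory client; fakeClient, response, waitForAck, and the lower-case QoS constants are hypothetical stand-ins, and the fake ACKs immediately instead of talking to a server.

```go
package main

import (
	"errors"
	"fmt"
)

type qos int

const (
	atMostOnce qos = iota
	atLeastOnce
	exactlyOnce
)

// response is a hypothetical stand-in for MqttResponse.
type response struct {
	PacketID uint16
	Err      error
}

// fakeClient imitates the asynchronous publish flow in memory.
type fakeClient struct {
	nextID uint16
	ackCh  chan response
}

func newFakeClient(queueSize int) *fakeClient {
	return &fakeClient{nextID: 1, ackCh: make(chan response, queueSize)}
}

// publish returns a packet ID once the request is "queued". Only
// atLeastOnce produces an ACK; unsupported levels fail immediately.
func (c *fakeClient) publish(level qos, topic string, data []byte) (uint16, error) {
	if level != atMostOnce && level != atLeastOnce {
		return 0, errors.New("unsupported QoS level")
	}
	id := c.nextID
	c.nextID++
	if level == atLeastOnce {
		c.ackCh <- response{PacketID: id} // pretend the server ACKed
	}
	return id, nil
}

// waitForAck reads responses until it finds the one for the given ID.
func waitForAck(ackCh <-chan response, id uint16) error {
	for resp := range ackCh {
		if resp.PacketID == id {
			return resp.Err
		}
	}
	return errors.New("ack channel closed before ACK arrived")
}

func main() {
	c := newFakeClient(8)
	id, err := c.publish(atLeastOnce, "/example/topic", []byte("hi"))
	fmt.Println(id, err, waitForAck(c.ackCh, id))

	_, err = c.publish(exactlyOnce, "/example/topic", nil)
	fmt.Println(err)
}
```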
func (*MqttClient) PublishBulkData ¶
func (client *MqttClient) PublishBulkData(ctx context.Context, bulkData DeviceBulkData) (uint16, error)
Helper function to send DeviceBulkData instances. This replaces both the sendBulkData and writeBulkData methods in the old API (since it does less than both, covering just the intersection).
func (*MqttClient) PublishEvent ¶
func (client *MqttClient) PublishEvent(ctx context.Context, event DeviceEvent) (uint16, error)
Helper function to send DeviceEvent instances
func (*MqttClient) PublishKeyValueUpdate ¶
func (client *MqttClient) PublishKeyValueUpdate(ctx context.Context, kvData KeyValueSync) (uint16, error)
The key-value store should typically be cached. All key-value pairs are sent on subscription, so they should be handled in the client by receiving on the kvSync channel. There is no method for fetching a single key-value pair; we only update key-values. XXX: Looks like there's a reload, so I must be missing something.
func (*MqttClient) Subscribe ¶
func (client *MqttClient) Subscribe(ctx context.Context, subs []string) []error
Subscribe will subscribe to the topics in subs by sending a subscribe request. This is a synchronous call so it will block until a response is received from the server. It will return a slice of errors which will be in the same order as the subscriptions in Subscriptions().
func (*MqttClient) TakeRemainingErrors ¶
func (client *MqttClient) TakeRemainingErrors() []error
Returns the last errors, and then resets the errors. If there is no error delegate or the error delegate's error channel is full, we "queue" errors in a slice that can be fetched.
func (*MqttClient) Unsubscribe ¶ added in v3.0.1
func (client *MqttClient) Unsubscribe(ctx context.Context, subs []string) []error
Unsubscribe will send an unsubscribe request for the topics in subs. This is a synchronous call.
type MqttConfigDelegate ¶
type MqttConfigDelegate interface {
	// Buffer size of the incoming queues to the delegate. This is the
	// size of the three receive channels
	GetReceiveQueueSize() uint16
	// Buffer size of the outgoing queue to the server. This cannot be
	// changed after a connection is created
	GetSendQueueSize() uint16
}
MqttConfigDelegate methods allow the MqttClient to configure itself according to the requirements of the user
type MqttDelegate ¶
type MqttDelegate interface {
	MqttAuthDelegate
	MqttReceiverDelegate
	MqttConfigDelegate
}
MqttDelegate is the combined required interfaces that must be implemented to use the MqttClient. This is a convenience that the user can use to allow a single struct to implement all the required interfaces
type MqttErrorDelegate ¶
type MqttErrorDelegate interface {
	// The buffer size of the error channel
	GetErrorChannelSize() uint16
	// Provides the delegate the channel to receive errors
	SetErrorChannel(errCh chan error)
}
MqttErrorDelegate is an optional delegate which allows the MqttClient a method of signaling errors that are not able to be communicated through the normal channels. See handling errors in the documentation.
type MqttMsgHandler ¶
type MqttReceiverDelegate ¶
type MqttReceiverDelegate interface {
	// SetReceiveChannels will be called by the MqttClient. The MqttClient
	// will create the channels with the buffer size returned by
	// GetReceiveQueueSize(). The implementor of the delegate will use
	// these channels to receive information from the server, such as
	// queued responses and subscriptions:
	// subRecvCh: Data published from our subscriptions.
	// queueAckCh: API requests that are queued will receive MqttPublishID's
	//             which will be ACK'ed. The MqttQueueResult will have the
	//             MqttPublishID and the result
	// pingAckCh: True if our ping received an ACK or false if timeout
	// Note: These channels will be closed when the connection is closed (from
	// a Disconnect), so the user should stop listening to these channels when
	// OnClose() is called.
	SetReceiveChannels(subRecvCh <-chan MqttSubData, queueAckCh <-chan MqttResponse, pingAckCh <-chan MqttResponse)
	// Hook so we can clean up on closing of connections
	OnClose()
}
MqttReceiverDelegate methods allow the MqttClient to communicate information and events back to the user.
type MqttResponse ¶
MqttResponse is the result of an MQTT request. Not all of these members will be valid. For example, only PUBACKs will have PacketIds, and PUBLISHes will send subscription data. In some cases, there will be multiple errors, so the Errs slice will be populated rather than the single error.
type MqttSubData ¶
MqttSubData to send in the channel to the client when we receive data published for our subscription.
type NetworkStatus ¶
type NetworkStatus int
const (
	// There is currently an active connection to the server
	ConnectedNetworkStatus NetworkStatus = iota
	// We have successfully disconnected from the server
	DisconnectedNetworkStatus
	// If we have had requests time out, we set to timing out. We should
	// reconnect.
	TimingOutNetworkStatus
	// Not yet connected state
	DefaultNetworkStatus
)
type QOSLevel ¶
type QOSLevel int
QoS level of message delivery. This is used in sending events to MODE.
type RESTError ¶
type RESTError struct {
// contains filtered or unexported fields
}
RESTError represents an error returned by the MODE REST API.
func (*RESTError) StatusCode ¶
StatusCode returns the HTTP status code provided by the API server.
Directories ¶
Path | Synopsis |
---|---|
examples | |
echo | In this example, the device sends an "echo" event whenever it receives a "doEcho" command. |
mutual_tls_echo | In this example, the device sends an "echo" event whenever it receives a "doEcho" command. |
websocket_echo | In this example, the device sends an "echo" event whenever it receives a "doEcho" command. |
 | Package packet implements functionality for encoding and decoding MQTT 3.1.1 (http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/) packets. |