surgemq: github.com/influxdata/surgemq/service Index | Examples | Files

package service

import "github.com/influxdata/surgemq/service"

Package service provides the MQTT Server and Client services in a library form. See Server and Client examples below for more detailed usage.

Index

Examples

Package Files

buffer.go client.go doc.go misc.go process.go sendrecv.go server.go service.go

Constants

const (
    DefaultKeepAlive        = 300
    DefaultConnectTimeout   = 2
    DefaultAckTimeout       = 20
    DefaultTimeoutRetries   = 3
    DefaultSessionsProvider = "mem"
    DefaultAuthenticator    = "mockSuccess"
    DefaultTopicsProvider   = "mem"
)

Variables

var (
    ErrInvalidConnectionType  error = errors.New("service: Invalid connection type")
    ErrInvalidSubscriber      error = errors.New("service: Invalid subscriber")
    ErrBufferNotReady         error = errors.New("service: buffer is not ready")
    ErrBufferInsufficientData error = errors.New("service: buffer has insufficient data.")
)

type Client Uses

type Client struct {
    // The number of seconds to keep the connection live if there's no data.
    // If not set then default to 5 mins.
    KeepAlive int

    // The number of seconds to wait for the CONNACK message before disconnecting.
    // If not set then default to 2 seconds.
    ConnectTimeout int

    // The number of seconds to wait for any ACK messages before failing.
    // If not set then default to 20 seconds.
    AckTimeout int

    // The number of times to retry sending a packet if ACK is not received.
    // If no set then default to 3 retries.
    TimeoutRetries int
    // contains filtered or unexported fields
}

Client is a library implementation of the MQTT client that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.

Code:

// Instantiates a new Client
c := &Client{}

// Creates a new MQTT CONNECT message and sets the proper parameters
msg := message.NewConnectMessage()
msg.SetWillQos(1)
msg.SetVersion(4)
msg.SetCleanSession(true)
msg.SetClientId([]byte("surgemq"))
msg.SetKeepAlive(10)
msg.SetWillTopic([]byte("will"))
msg.SetWillMessage([]byte("send me home"))
msg.SetUsername([]byte("surgemq"))
msg.SetPassword([]byte("verysecret"))

// Connects to the remote server at 127.0.0.1 port 1883
c.Connect("tcp://127.0.0.1:1883", msg)

// Creates a new SUBSCRIBE message to subscribe to topic "abc"
submsg := message.NewSubscribeMessage()
submsg.AddTopic([]byte("abc"), 0)

// Subscribes to the topic by sending the message. The first nil in the function
// call is a OnCompleteFunc that should handle the SUBACK message from the server.
// Nil means we are ignoring the SUBACK messages. The second nil should be a
// OnPublishFunc that handles any messages send to the client because of this
// subscription. Nil means we are ignoring any PUBLISH messages for this topic.
c.Subscribe(submsg, nil, nil)

// Creates a new PUBLISH message with the appropriate contents for publishing
pubmsg := message.NewPublishMessage()
pubmsg.SetTopic([]byte("abc"))
pubmsg.SetPayload(make([]byte, 1024))
pubmsg.SetQoS(0)

// Publishes to the server by sending the message
c.Publish(pubmsg, nil)

// Disconnects from the server
c.Disconnect()

func (*Client) Connect Uses

func (this *Client) Connect(uri string, msg *message.ConnectMessage) (err error)

Connect is for MQTT clients to open a connection to a remote server. It needs to know the URI, e.g., "tcp://127.0.0.1:1883", so it knows where to connect to. It also needs to be supplied with the MQTT CONNECT message.

func (*Client) Disconnect Uses

func (this *Client) Disconnect()

Disconnect sends a single DISCONNECT message to the server. The client immediately terminates after the sending of the DISCONNECT message.

func (*Client) Ping Uses

func (this *Client) Ping(onComplete OnCompleteFunc) error

Ping sends a single PINGREQ message to the server. PINGREQ/PINGRESP messages are mainly used by the client to keep a heartbeat to the server so the connection won't be dropped.

func (*Client) Publish Uses

func (this *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error

Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.

func (*Client) Subscribe Uses

func (this *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, onPublish OnPublishFunc) error

Subscribe sends a single SUBSCRIBE message to the server. The SUBSCRIBE message can contain multiple topics that the client wants to subscribe to. On completion, which is when the client receives a SUBACK messsage back from the server, the supplied onComplete funciton is called.

When messages are sent to the client from the server that matches the topics the client subscribed to, the onPublish function is called to handle those messages. So in effect, the client can supply different onPublish functions for different topics.

func (*Client) Unsubscribe Uses

func (this *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error

Unsubscribe sends a single UNSUBSCRIBE message to the server. The UNSUBSCRIBE message can contain multiple topics that the client wants to unsubscribe. On completion, which is when the client receives a UNSUBACK message from the server, the supplied onComplete function is called. The client will no longer handle messages from the server for those unsubscribed topics.

type OnCompleteFunc Uses

type OnCompleteFunc func(msg, ack message.Message, err error) error

type OnPublishFunc Uses

type OnPublishFunc func(msg *message.PublishMessage) error

type Server Uses

type Server struct {
    // The number of seconds to keep the connection live if there's no data.
    // If not set then default to 5 mins.
    KeepAlive int

    // The number of seconds to wait for the CONNECT message before disconnecting.
    // If not set then default to 2 seconds.
    ConnectTimeout int

    // The number of seconds to wait for any ACK messages before failing.
    // If not set then default to 20 seconds.
    AckTimeout int

    // The number of times to retry sending a packet if ACK is not received.
    // If no set then default to 3 retries.
    TimeoutRetries int

    // Authenticator is the authenticator used to check username and password sent
    // in the CONNECT message. If not set then default to "mockSuccess".
    Authenticator string

    // SessionsProvider is the session store that keeps all the Session objects.
    // This is the store to check if CleanSession is set to 0 in the CONNECT message.
    // If not set then default to "mem".
    SessionsProvider string

    // TopicsProvider is the topic store that keeps all the subscription topics.
    // If not set then default to "mem".
    TopicsProvider string
    // contains filtered or unexported fields
}

Server is a library implementation of the MQTT server that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.

Code:

// Create a new server
svr := &Server{
    KeepAlive:        300,           // seconds
    ConnectTimeout:   2,             // seconds
    SessionsProvider: "mem",         // keeps sessions in memory
    Authenticator:    "mockSuccess", // always succeed
    TopicsProvider:   "mem",         // keeps topic subscriptions in memory
}

// Listen and serve connections at localhost:1883
svr.ListenAndServe("tcp://:1883")

func (*Server) Close Uses

func (this *Server) Close() error

Close terminates the server by shutting down all the client connections and closing the listener. It will, as best it can, clean up after itself.

func (*Server) ListenAndServe Uses

func (this *Server) ListenAndServe(uri string) error

ListenAndServe listents to connections on the URI requested, and handles any incoming MQTT client sessions. It should not return until Close() is called or if there's some critical error that stops the server from running. The URI supplied should be of the form "protocol://host:port" that can be parsed by url.Parse(). For example, an URI could be "tcp://0.0.0.0:1883".

func (*Server) Publish Uses

func (this *Server) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error

Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.

Package service imports 16 packages (graph). Updated 2017-07-05. Refresh now. Tools for package owners.