mqtt

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2020 License: MIT Imports: 7 Imported by: 1

README

mqtt

GoDoc CI Code Coverage Go Report

An mqtt client for Go that improves usability over the paho.mqtt.golang library it wraps. Made for 🧑.

installation

go get github.com/lucacasonato/mqtt
import "github.com/lucacasonato/mqtt"
// or
import (
    "github.com/lucacasonato/mqtt"
)

usage

creating a client & connecting
client, err := mqtt.NewClient(mqtt.ClientOptions{
    // required
    Servers: []string{
        "tcp://test.mosquitto.org:1883",
    },

    // optional
    ClientID: "my-mqtt-client",
    Username: "admin",
    Password: "***",
    AutoReconnect: true,
})
if err != nil {
    panic(err)
}

err = client.Connect(context.WithTimeout(2 * time.Second))
if err != nil {
    panic(err)
}

You can use any of these schemes for the broker tcp (unesecured), ssl (secured), ws (unsecured), wss (secured).

disconnecting from a client
client.Disconnect()
publishing a message
bytes
err := client.Publish(context.WithTimeout(1 * time.Second), "api/v0/main/client1", []byte(0, 1 ,2, 3), mqtt.AtLeastOnce)
if err != nil {
    panic(err)
}
string
err := client.PublishString(context.WithTimeout(1 * time.Second), "api/v0/main/client1", "hello world", mqtt.AtLeastOnce)
if err != nil {
    panic(err)
}
json
err := client.PublishJSON(context.WithTimeout(1 * time.Second), "api/v0/main/client1", []string("hello", "world"), mqtt.AtLeastOnce)
if err != nil {
    panic(err)
}
subscribing
err := client.Subscribe(context.WithTimeout(1 * time.Second), "api/v0/main/client1", mqtt.AtLeastOnce)
if err != nil {
    panic(err)
}
err := client.SubscribeMultiple(context.WithTimeout(1 * time.Second), map[string]mqtt.QOS{
    "api/v0/main/client1": mqtt.AtLeastOnce,
})
if err != nil {
    panic(err)
}
handling
route := client.Handle("api/v0/main/client1", func(message mqtt.Message) {
    v := interface{}{}
    err := message.PayloadJSON(&v)
    if err != nil {
        panic(err)
    }
    fmt.Printf("recieved a message with content %v\n", v)
})
// once you are done with the route you can stop handling it
route.Stop()
listening
messages, route := client.Listen("api/v0/main/client1")
for {
    message := <-messages
    fmt.Printf("recieved a message with content %v\n", message.PayloadString())
}
// once you are done with the route you can stop handling it
route.Stop()

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrMinimumOneServer means that at least one server should be specified in the client options
	ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	Options ClientOptions // The options that were used to create this client
	// contains filtered or unexported fields
}

Client for talking using mqtt

func NewClient

func NewClient(options ClientOptions) (*Client, error)

NewClient creates a new client with the specified options

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect tries to establish a connection with the mqtt servers

func (*Client) DisconnectImmediately

func (c *Client) DisconnectImmediately()

DisconnectImmediately will immediately close the connection with the mqtt servers

func (*Client) Handle

func (c *Client) Handle(topic string, handler MessageHandler) Route

Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic. Also returns a route that can be used to unsubsribe. Does not automatically subscribe.

func (*Client) Listen

func (c *Client) Listen(topic string) (chan Message, Route)

Listen returns a stream of messages that match the topic. Also returns a route that can be used to unsubsribe. Does not automatically subscribe.

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error

Publish a message with a byte array payload

func (*Client) PublishJSON

func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error

PublishJSON publishes a message with the payload encoded as JSON using encoding/json

func (*Client) PublishString

func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error

PublishString publishes a message with a string payload

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error

Subscribe subscribes to a certain topic and errors if this fails.

func (*Client) SubscribeMultiple added in v0.2.0

func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error

SubscribeMultiple subscribes to multiple topics and errors if this fails.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ctx context.Context, topic string) error

Unsubscribe unsubscribes from a certain topic and errors if this fails.

type ClientOptions

type ClientOptions struct {
	Servers  []string // The list of broker hostnames to connect to
	ClientID string   // If left empty a uuid will automatically be generated
	Username string   // If not set then authentication will not be used
	Password string   // Will only be used if the username is set

	AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost
}

ClientOptions is the list of options used to create a client

type Message

type Message struct {
	// contains filtered or unexported fields
}

A Message from or to the broker

func (*Message) Acknowledge

func (m *Message) Acknowledge()

Acknowledge explicitly acknowledges to a broker that the message has been recieved

func (*Message) IsDuplicate

func (m *Message) IsDuplicate() bool

IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)

func (*Message) Payload

func (m *Message) Payload() []byte

Payload returns the payload as a byte array

func (*Message) PayloadJSON

func (m *Message) PayloadJSON(v interface{}) error

PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails

func (*Message) PayloadString

func (m *Message) PayloadString() string

PayloadString returns the payload as a string

func (*Message) QOS

func (m *Message) QOS() QOS

QOS is the quality of service the message was recieved with

func (*Message) Topic

func (m *Message) Topic() string

Topic is the topic the message was recieved on

func (*Message) TopicVars

func (m *Message) TopicVars() []string

TopicVars is a list of all the message specific matches for a wildcard in a route topic. If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`

type MessageHandler

type MessageHandler func(Message)

A MessageHandler to handle incoming messages

type PublishOption

type PublishOption int

PublishOption are extra options when publishing a message

const (
	// Retain tells the broker to retain a message and send it as the first message to new subscribers.
	Retain PublishOption = iota
)

type QOS

type QOS byte

QOS describes the quality of service of an mqtt publish

const (
	// AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
	AtMostOnce QOS = iota
	// AtLeastOnce means the broker will deliver a message at least once to every subscriber
	AtLeastOnce
	// ExactlyOnce means the broker will deliver a message exactly once to every subscriber
	ExactlyOnce
)

type Route

type Route struct {
	// contains filtered or unexported fields
}

Route is a receipt for listening or handling certain topic

func (*Route) Stop

func (r *Route) Stop()

Stop removes this route from the router and stops matching it

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL