hanami

Published: Aug 2, 2019 License: Apache-2.0 Imports: 8 Imported by: 1

README

mochi-co/hanami MQTT client

What is Hanami?

Hanami is a wrapper for the Go Paho MQTT Client which provides a few convenience functions that are designed to make life a little easier. It was developed to alleviate a few pain points with the Paho client such as adding multiple callbacks per topic, and adds JWT message signing and Reply-To helpers for implementing request-response patterns.

Main Features

  • Add multiple callback functions per topic, isolated by sub-client id.
  • Inbound and outbound JWT message signing to ensure only payloads from trusted sources are processed.
  • Reply-To helpers for lazy request-response pattern implementation.

Quick Start

import "github.com/mochi-co/hanami"

Hanami wraps the standard paho.Client and takes a host address and a paho.ClientOptions. Multiple hosts can be added by setting them in the options directly, in which case the host string will be ignored in favour of the options value.

package main

import (
	"log"

	paho "github.com/eclipse/paho.mqtt.golang"
	"github.com/mochi-co/hanami"
)

func main() {

	// The hanami client takes standard paho options.
	options := paho.NewClientOptions()

	// Create the new hanami client with the broker address and the paho options.
	client := hanami.New("tcp://iot.eclipse.org:1883", options)

	// Connecting the client is the same as connecting the paho client, 
	// minus the boilerplate token code. It is non-blocking.
	err := client.Connect()
	if err != nil {
		log.Fatal(err)
	}

}

Examples

Examples can be found in the examples directory.

Using Hanami

~ The following guide assumes existing knowledge of MQTT and the Go paho client.

hanami.New(host string, o *paho.ClientOptions) *hanami.Client

Create a new Hanami client. host takes the address of the MQTT broker to connect to (e.g. "tcp://iot.eclipse.org:1883"), and o takes a paho.ClientOptions containing configuration parameters for the internal paho client. Multiple broker addresses can be assigned by adding each one with options.AddBroker(host) before calling client.Connect. A new Hanami client is returned.

Various Hanami specific parameters can be configured by directly setting values on the client:

client.Secret = []byte("my-jwt-secret") // Set the JWT signing secret.
client.JWTExpiry = 10 // Number of seconds a signed message is valid.
client.JWTPrefix = "jwt:" // An indicator string to prefix JWT payloads.
client.PubPrefix = "hanami/out" // Add a prefix to all publishing topics.
client.SubPrefix = "hanami/in" // Add a prefix to all subscribing topics.

The PubPrefix and SubPrefix values will be prepended to any provided topic values and joined with a /. For example, calling Subscribe or Publish with a topic of bar/baz will normally use the topic bar/baz as given. If a SubPrefix of foo is set, the topic actually subscribed to will be foo/bar/baz.
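The prefix rule can be illustrated with a small standalone sketch. joinTopic is a hypothetical helper written for this guide, not part of the hanami API, and its handling of stray slashes is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// joinTopic is a hypothetical helper (not part of hanami) that prepends a
// prefix to a topic and joins the two with a single "/". An empty prefix
// leaves the topic unchanged. Trimming duplicate slashes is an assumption.
func joinTopic(prefix, topic string) string {
	if prefix == "" {
		return topic
	}
	return strings.TrimSuffix(prefix, "/") + "/" + strings.TrimPrefix(topic, "/")
}

func main() {
	fmt.Println(joinTopic("", "bar/baz"))          // bar/baz
	fmt.Println(joinTopic("foo", "bar/baz"))       // foo/bar/baz
	fmt.Println(joinTopic("hanami/in", "bar/baz")) // hanami/in/bar/baz
}
```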

options := paho.NewClientOptions() // Set various standard client options...
options.SetClientID("mqtt-client-id")
options.SetUsername("user")
options.SetPassword("password")

client := hanami.New("tcp://iot.eclipse.org:1883", options)

options := paho.NewClientOptions()
options.SetClientID("mqtt-client-id")
options.AddBroker("tcp://iot.eclipse.org:1883") // Or add multiple broker addresses
options.AddBroker("tcp://test.mosquitto.org:1883")

client := hanami.New("", options) // host is unneeded when setting brokers manually.

client.Connect() error

Connect to an MQTT broker. The client will connect to the MQTT broker specified when calling hanami.New.

err := client.Connect()

client.Publish(topic string, qos byte, retain bool, m interface{}) (b []byte, err error)

Publish a message to the connected broker. Takes the same parameters as paho.Publish, and returns the byte array b which was sent to the broker if successful. m may be any value: int, struct, map, string, bool, etc. Values are marshalled to JSON, except strings, which are converted directly into a byte array to avoid unnecessarily wrapping the value in quote marks. If client.PubPrefix has been set, its value will be prepended to the topic.
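The encoding rule described above can be sketched in isolation. encodePayload is a hypothetical stand-in for the encoding step and is not hanami's actual code; it only shows strings passing through untouched while every other value is JSON-marshalled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodePayload is a hypothetical stand-in for the encoding step (not
// hanami's actual code). Strings become raw bytes; everything else is
// marshalled to JSON.
func encodePayload(m interface{}) ([]byte, error) {
	if s, ok := m.(string); ok {
		return []byte(s), nil
	}
	return json.Marshal(m)
}

func main() {
	b, _ := encodePayload(map[string]interface{}{"v": "this is my value"})
	fmt.Println(string(b)) // {"v":"this is my value"}

	b, _ = encodePayload("plain text")
	fmt.Println(string(b)) // plain text (no surrounding quote marks)

	b, _ = encodePayload(42)
	fmt.Println(string(b)) // 42
}
```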

b, err := client.Publish("hanami/example/map", 1, false, map[string]interface{}{
	"v": "this is my value",
})
// b == [123 34 118 34 58 34 116 104 105 115 32 105 115 32 109 121 32 118 97 108 117 101 34 125]
// string(b) == {"v":"this is my value"}

client.PublishSigned(topic string, qos byte, retain bool, m interface{}) (b []byte, err error)

Publish a JWT-signed message to the connected broker. client.Secret MUST be set in order to use any of the signing features. Operates exactly the same as client.Publish, except the returned byte array will always be a JWT-encoded token prefixed with the client.JWTPrefix indicator (by default this is jwt:). The prefix is used to determine whether a payload is signed, so other clients can send similarly formatted messages to the broker and they will be processed as expected.

client.Secret = []byte("hanami-test") // A Secret MUST be set.
b, err := client.PublishSigned("hanami/example/signed", 1, false, "a signed test")
// b == [106 119 116 58 101 121 74 104 98 71 99 105 79 105 74 73 85 122 73 49 78 105 73 115 73 110 82 53 99 67 73 54 73 107 112 88 86 67 74 57 46 101 121 74 107 89 88 82 104 73 106 111 105 89 83 66 122 97 87 100 117 90 87 81 103 100 71 86 122 100 67 73 115 73 109 86 52 99 67 73 54 77 84 85 50 78 68 81 51 77 122 85 120 78 105 119 105 97 87 70 48 73 106 111 120 78 84 89 48 78 68 99 122 78 84 69 120 76 67 74 112 99 51 77 105 79 105 73 105 102 81 46 80 84 73 69 98 55 65 70 69 122 72 119 80 97 82 48 103 112 66 70 84 109 82 104 97 66 78 119 106 81 56 81 121 85 104 54 104 78 51 78 85 104 85]
// string(b) == jwt:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjoiYSBzaWduZWQgdGVzdCIsImV4cCI6MTU2NDQ3MzUxNiwiaWF0IjoxNTY0NDczNTExLCJpc3MiOiIifQ.PTIEb7AFEzHwPaR0gpBFTmRhaBNwjQ8QyUh6hN3NUhU
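To make the shape of a signed payload concrete, here is a minimal sketch of HS256 signing and verification using only the standard library. It is not hanami's implementation: signPayload and verifyPayload are hypothetical names, the data, iat, and exp claim names mirror those visible in the example token above, and the error values are local to the sketch.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"
)

const jwtPrefix = "jwt:" // mirrors the documented default of client.JWTPrefix

var (
	enc        = base64.RawURLEncoding
	errInvalid = errors.New("token invalid")
	errExpired = errors.New("token expired")
)

// mac computes the HMAC-SHA256 of input with the shared secret.
func mac(secret []byte, input string) []byte {
	h := hmac.New(sha256.New, secret)
	h.Write([]byte(input))
	return h.Sum(nil)
}

// signPayload (hypothetical) builds a prefixed HS256 token that is valid for
// ttl seconds after now (a Unix timestamp).
func signPayload(secret []byte, data string, now, ttl int64) (string, error) {
	header := enc.EncodeToString([]byte(`{"alg":"HS256","typ":"JWT"}`))
	claims, err := json.Marshal(map[string]interface{}{
		"data": data, "iat": now, "exp": now + ttl,
	})
	if err != nil {
		return "", err
	}
	input := header + "." + enc.EncodeToString(claims)
	return jwtPrefix + input + "." + enc.EncodeToString(mac(secret, input)), nil
}

// verifyPayload (hypothetical) checks the prefix, signature, and expiry, and
// returns the data claim on success.
func verifyPayload(secret []byte, payload string, now int64) (string, error) {
	if !strings.HasPrefix(payload, jwtPrefix) {
		return "", errInvalid // not marked as signed
	}
	parts := strings.Split(strings.TrimPrefix(payload, jwtPrefix), ".")
	if len(parts) != 3 {
		return "", errInvalid
	}
	sig, err := enc.DecodeString(parts[2])
	if err != nil || !hmac.Equal(sig, mac(secret, parts[0]+"."+parts[1])) {
		return "", errInvalid
	}
	raw, err := enc.DecodeString(parts[1])
	if err != nil {
		return "", errInvalid
	}
	var claims struct {
		Data string `json:"data"`
		Exp  int64  `json:"exp"`
	}
	if err := json.Unmarshal(raw, &claims); err != nil {
		return "", errInvalid
	}
	if now > claims.Exp {
		return "", errExpired
	}
	return claims.Data, nil
}

func main() {
	secret := []byte("hanami-test")
	now := time.Now().Unix()

	token, _ := signPayload(secret, "a signed test", now, 5)
	fmt.Println(strings.HasPrefix(token, jwtPrefix)) // true

	data, err := verifyPayload(secret, token, now)
	fmt.Println(data, err) // a signed test <nil>

	_, err = verifyPayload(secret, token, now+60)
	fmt.Println(err) // token expired
}
```

A production verifier would also inspect the header's alg field; the sketch skips this because it only ever computes HMAC-SHA256.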

client.Reply(in *Payload, qos byte, retain bool, m interface{}) (b []byte, err error)

Reply to a message which contained a $reply field in the original payload. For a received message to be replyable, its payload must be a JSON map containing at least a $reply field. The message may also indicate that the reply should be automatically signed by specifying the $signed field. The $reply and $signed meta fields will be stripped from the final payload.

// Originating message sent to broker.
b, err := client.Publish("hanami/example/a/set", 1, false, map[string]interface{}{
	"v": "this is my value",
	"$reply": "hanami/example/a", // request that replies be sent to this topic instead.
})

// Originating message payload (`in *Payload`) as received by Hanami. 
var in *Payload = &Payload{ 
	Msg: [116 104 105 115 32 105 115 32 109 121 32 118 97 108 117 101],
	ReplyTo: "hanami/example/a",
	ReplySigned: false,
	// ... Other fields omitted for readability.
}


// Reply to the originating message using client.Reply...
b2, err := client.Reply(in, 0, false, "this is my reply value")

In the above example, a message is received by Hanami containing the special meta field $reply, requesting that replies be sent to "hanami/example/a". The received payload can then be passed directly to the client.Reply method, which will sign the reply automatically if ReplySigned is set and send it to the topic specified in ReplyTo.

! Note! Reply implements PubPrefix. Topics being sent as a reply will have PubPrefix prepended.
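The handling of the $reply and $signed meta fields can be sketched as follows. The inbound type and parseInbound function are hypothetical, trimmed-down stand-ins written for illustration, and treating $signed as a JSON boolean is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inbound is a hypothetical, trimmed-down stand-in for hanami.Payload.
type inbound struct {
	ReplyTo     string
	ReplySigned bool
	Msg         map[string]interface{}
}

// parseInbound (hypothetical) decodes a JSON map, lifts the $reply and
// $signed meta fields into the struct, and strips them from the message.
// Treating $signed as a JSON boolean is an assumption.
func parseInbound(raw []byte) (*inbound, error) {
	msg := map[string]interface{}{}
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	in := &inbound{Msg: msg}
	if v, ok := msg["$reply"].(string); ok {
		in.ReplyTo = v
	}
	if v, ok := msg["$signed"].(bool); ok {
		in.ReplySigned = v
	}
	delete(msg, "$reply")
	delete(msg, "$signed")
	return in, nil
}

func main() {
	raw := []byte(`{"v":"this is my value","$reply":"hanami/example/a","$signed":true}`)
	in, err := parseInbound(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(in.ReplyTo)     // hanami/example/a
	fmt.Println(in.ReplySigned) // true
	fmt.Println(in.Msg)         // map[v:this is my value]
}
```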


client.Subscribe(id string, filter string, qos byte, signed bool, handler hanami.Callback) error

Subscribe to a topic filter. In hanami, subscriptions are virtualized using sub-clients, so you can have multiple callbacks per topic filter by specifying unique id values. The signed parameter indicates that the filter expects the payload to be signed with a valid JWT matching client.Secret. If signed is true and the payload is not signed, or is invalid or expired, the payload will be ignored for that handler. It is possible to have two subscriptions (a and b), where a expects a signed payload and b does not. On receiving a non-signed payload, only b will process the payload.

// Create a callback handler that receives a `*hanami.Payload`
cb := func(in *hanami.Payload) {
	log.Printf("RECV: %+v\n", in)
}

// Subscribe to a topic filter and handle all matched incoming messages with our `cb` handler.
err := client.Subscribe("a", "hanami/example/+", 0, false, cb)

// Multiple callbacks can be added to the same filter by changing the unique sub-client id.
// Handlers may also be passed in directly.
err = client.Subscribe("b", "hanami/example/+", 0, false, func(in *hanami.Payload) {
	log.Printf("This is another callback that will be called also, for %s", in.Topic)
})

// Subscribe to a signed topic by setting signed to true.
err = client.Subscribe("a", "hanami/example/signed", 0, true, func(in *hanami.Payload) {
	// The payload will be automatically decoded if the incoming token is valid. 
	// Any non-JWT messages will be dropped by this sub-client.
})
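The sub-client idea can be sketched as a map of filters to handlers keyed by id. This is an illustrative model, not hanami's internals: registry, subscribe, and dispatch are hypothetical names, wildcard matching is left out, and delivering signed payloads to unsigned handlers is an assumption the guide does not confirm.

```go
package main

import (
	"fmt"
	"sort"
)

type handler func(topic, payload string)

type sub struct {
	signed bool
	fn     handler
}

// registry is a hypothetical model: filter -> sub-client id -> subscription.
type registry map[string]map[string]sub

// subscribe registers a handler for a filter under a unique sub-client id.
func (r registry) subscribe(id, filter string, signed bool, fn handler) {
	if r[filter] == nil {
		r[filter] = map[string]sub{}
	}
	r[filter][id] = sub{signed: signed, fn: fn}
}

// dispatch calls every handler registered for the filter and returns the ids
// that were invoked, sorted. Handlers expecting signed payloads are skipped
// when validSig is false. Unsigned handlers always run (an assumption).
func (r registry) dispatch(filter, topic, payload string, validSig bool) []string {
	var called []string
	for id, s := range r[filter] {
		if s.signed && !validSig {
			continue
		}
		s.fn(topic, payload)
		called = append(called, id)
	}
	sort.Strings(called)
	return called
}

func main() {
	r := registry{}
	count := 0
	r.subscribe("a", "hanami/example/+", true, func(topic, payload string) { count++ })
	r.subscribe("b", "hanami/example/+", false, func(topic, payload string) { count++ })

	// A non-signed payload only reaches sub-client b.
	fmt.Println(r.dispatch("hanami/example/+", "hanami/example/x", "hello", false)) // [b]

	// A validly signed payload reaches both sub-clients.
	fmt.Println(r.dispatch("hanami/example/+", "hanami/example/x", "hello", true)) // [a b]

	fmt.Println(count) // 3
}
```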

client.Unsubscribe(id, filter string)

Unsubscribe removes a topic filter subscription by sub-client id. hanami maintains one subscription per filter (not per sub-client), so if the sub-client is the last or only callback remaining for the filter, the filter will be unsubscribed from the paho client.

client.Unsubscribe("a", "hanami/example")

client.UnsubscribeAll(id string, isPrefix bool)

UnsubscribeAll removes all callbacks for a specific sub-client id. If isPrefix is true, the id will be treated as a prefix and will unsubscribe any matching ids.

// Assuming the following subscribed filters:
// my/filter/hello - subclients: ["a","b","c"]
// another/filters - subclients: ["a"]
// another/filter/stuff - subclients: ["b","c"]

client.UnsubscribeAll("a", false)

// Subscribed filters then becomes:
// my/filter/hello - subclients: ["b","c"]
// another/filter/stuff - subclients: ["b","c"]
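The removal logic, including the isPrefix option, might look roughly like the sketch below. The subs type and its methods are hypothetical and only model the bookkeeping; dropping a filter once its last sub-client is removed follows the behaviour shown in the example above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// subs is a hypothetical model: filter -> set of sub-client ids.
type subs map[string]map[string]bool

// unsubscribeAll removes id from every filter. With isPrefix set, any id that
// starts with the given string is removed. Filters left with no sub-clients
// are dropped entirely.
func (s subs) unsubscribeAll(id string, isPrefix bool) {
	for filter, ids := range s {
		for existing := range ids {
			if existing == id || (isPrefix && strings.HasPrefix(existing, id)) {
				delete(ids, existing)
			}
		}
		if len(ids) == 0 {
			delete(s, filter)
		}
	}
}

// filters returns the remaining filters in sorted order.
func (s subs) filters() []string {
	out := make([]string, 0, len(s))
	for f := range s {
		out = append(out, f)
	}
	sort.Strings(out)
	return out
}

func main() {
	s := subs{
		"my/filter/hello":      {"a": true, "b": true, "c": true},
		"another/filters":      {"a": true},
		"another/filter/stuff": {"b": true, "c": true},
	}
	s.unsubscribeAll("a", false)
	fmt.Println(s.filters()) // [another/filter/stuff my/filter/hello]

	p := subs{"x/y": {"svc-1": true, "svc-2": true, "other": true}}
	p.unsubscribeAll("svc-", true)
	fmt.Println(len(p["x/y"])) // 1
}
```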

Why 'Hanami'?

This project was born out of a need while developing Sakura, a lightweight Go MQTT broker for embedding in small IoT, smart home, and other non-enterprise projects. The name Sakura (桜) was chosen following the metaphor that the messages running through the broker were akin to the cherry blossoms that fall every year: each unique, but countless. Since this client was originally designed to work with Sakura, the metaphor was naturally extended to Hanami, which means to watch the cherry blossoms.

Contributions

Contributions to Hanami are both welcome and encouraged! Open an issue to report a bug or make a feature request. Participation in the project is governed by our code of conduct.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNoSecret indicates that no JWT secret was set on the client.
	ErrNoSecret = errors.New("no JWT secret set")

	// ErrNoReplyTo indicates that a message cannot be replied to if it has no ReplyTo value.
	ErrNoReplyTo = errors.New("no $reply field was set on payload")

	// ErrNoSocket indicates that the paho client wasn't initialized.
	ErrNoSocket = errors.New("paho socket not initialized")

	// ErrNotConnected indicates that the client is not connected to a broker.
	ErrNotConnected = errors.New("Not Connected") // this comes from paho mqtt somewhere and should match
)
var (

	// ErrTokenExpired indicates that the JWT token has expired.
	ErrTokenExpired = errors.New("JWT token expired")

	// ErrTokenInvalid indicates that the JWT token could not be parsed.
	ErrTokenInvalid = errors.New("JWT token invalid")

	// ErrTokenMalformed indicates that the JWT payload lacked a data field.
	ErrTokenMalformed = errors.New("no data field in JWT token")
)

Functions

func MatchTopic

func MatchTopic(filter string, topic string) (elements []string, matched bool)

MatchTopic checks if a given topic matches a filter, accounting for filter wildcards. E.g. filter a/b/+/c matches topic a/b/d/c.
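Wildcard matching of this kind can be sketched as below. matchTopicSketch is a hypothetical re-implementation for illustration only: it follows the standard MQTT meaning of + (one level) and # (all remaining levels), and what it returns in elements for # is an assumption, not the documented behaviour of MatchTopic.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopicSketch is a hypothetical illustration, not hanami's MatchTopic.
// It reports whether topic matches filter and returns the topic segments
// captured by wildcards. "+" captures one level, "#" captures the remainder.
func matchTopicSketch(filter, topic string) (elements []string, matched bool) {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, part := range f {
		switch {
		case part == "#":
			// Multi-level wildcard: capture whatever is left, if anything.
			if i < len(t) {
				elements = append(elements, strings.Join(t[i:], "/"))
			}
			return elements, true
		case i >= len(t):
			return nil, false // filter is longer than the topic
		case part == "+":
			elements = append(elements, t[i])
		case part != t[i]:
			return nil, false
		}
	}
	if len(f) != len(t) {
		return nil, false // topic has extra levels the filter does not cover
	}
	return elements, true
}

func main() {
	fmt.Println(matchTopicSketch("a/b/+/c", "a/b/d/c")) // [d] true
	fmt.Println(matchTopicSketch("a/#", "a/b/c"))       // [b/c] true
	fmt.Println(matchTopicSketch("a/b", "a/b/c"))       // [] false
}
```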

Types

type Callback

type Callback func(*Payload)

Callback is a function which will be run when a message is received.

type Client

type Client struct {

	// Socket is a paho MQTT client.
	Socket paho.Client

	// Subscriptions is a map of topic subscriptions keyed on topic, then service id.
	Subscriptions Subscriptions

	// PubPrefix is the publish topic prefix.
	PubPrefix string

	// SubPrefix is the subscribe topic prefix.
	SubPrefix string

	// Secret is a byte array used for signing and validating JWT payloads.
	Secret []byte

	// ValueKey is the key used to return a single value in a payload.
	ValueKey string

	// JWTExpiry is the number of seconds a JWT-signed message is valid.
	JWTExpiry time.Duration

	// JWTPrefix will be prefixed to any jwt signed messages.
	JWTPrefix string
}

Client provides a persistent connection to an MQTT server.

func New

func New(host string, o *paho.ClientOptions) *Client

New returns an instance of a Hanami client.

func (*Client) Connect

func (c *Client) Connect() error

Connect to the MQTT server using the provided adapter values.

func (*Client) Publish

func (c *Client) Publish(topic string, qos byte, retain bool, m interface{}) (b []byte, err error)

Publish sends a message to the MQTT broker.

func (*Client) PublishSigned

func (c *Client) PublishSigned(topic string, qos byte, retain bool, m interface{}) (b []byte, err error)

PublishSigned signs a message with a JWT signature and sends it to the MQTT broker.

func (*Client) Reply

func (c *Client) Reply(in *Payload, qos byte, retain bool, m interface{}) (b []byte, err error)

Reply sends a message to the reply topic specified in a payload.

func (*Client) Subscribe

func (c *Client) Subscribe(id string, filter string, qos byte, signed bool, handler Callback) error

Subscribe opens a new subscription to a topic filter for a sub-client id.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(id, filter string)

Unsubscribe deletes the callbacks for a filter by provided subclient id.

func (*Client) UnsubscribeAll

func (c *Client) UnsubscribeAll(id string, isPrefix bool)

UnsubscribeAll deletes all callbacks for all topics by a provided service id. If isPrefix is true, then the provided id will be matched using hasPrefix.

type ClientSubscriptions

type ClientSubscriptions map[string]*Subscription

ClientSubscriptions is a map of subscriptions keyed on client id.

type Msg

type Msg map[string]interface{}

Msg is a convenience type for a map of interfaces.

type Payload

type Payload struct {

	// ReplyTo is the topic the client wants a reply on.
	ReplyTo string

	// ReplySigned indicates that the message being sent back should be signed.
	ReplySigned bool

	// Matched is the filter the client matched, including wildcards.
	Matched string

	// Topic is the path the message was received from (or sending to).
	Topic string

	// Elements are the elements that matched any wildcard characters.
	Elements []string

	// Msg is the received data payload.
	Msg Msg

	// Retain indicates that the message should be retained.
	Retain bool

	// QoS indicates the quality of service for the message.
	QoS byte

	// Error will log a payload error.
	Error error

	// Validated indicates if the message was signed and validated.
	Validated bool
}

Payload contains the payload and metadata for a message, inbound and outbound.

type Subscription

type Subscription struct {

	// Filter is the topic filter of the subscription.
	Filter string

	// QOS is the QOS byte used in the subscription.
	QOS byte

	// Signed indicates that the subscription expects signed payloads.
	Signed bool

	// Callback is a subscription callback function called on message receipt.
	Callback Callback
}

Subscription contains the values for a topic subscription.

type Subscriptions

type Subscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriptions is a map of Subscription keyed on topic.

func NewSubscriptions

func NewSubscriptions() Subscriptions

NewSubscriptions returns a new Subscriptions map.
