mqttpubsub

package module
v0.0.0-...-7f640b2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

README

go-mqttpubsub CI godoc goreportcard license

Go module to enable the MQTT protocol for gocloud.dev/pubsub.

install

go get github.com/frantjc/go-mqttpubsub

Documentation

Index

Examples

Constants

View Source
const Scheme = "mqtt"

Scheme is the URL scheme mqttpubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(conn Subscriber, topicName string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
Example
package main

import (
	"context"
	"log"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	mqttpubsub "github.com/frantjc/go-pubsubmqtt"
)

var ctx = context.Background()

// main connects an MQTT client to the example broker and opens a
// *pubsub.Subscription on "example.mysubject", shutting the subscription
// down before returning.
func main() {
	opts := mqtt.NewClientOptions()
	opts = opts.AddBroker("mqtt://mqtt.example.com")
	opts.ClientID = "exampleClient"
	cli := mqtt.NewClient(opts)
	// Deferred before Connect so Disconnect runs on every return path below.
	defer cli.Disconnect(0)
	token := cli.Connect()
	if token.Wait() {
		if err := token.Error(); err != nil {
			log.Println(err)
			return
		}
	}

	subscription, err := mqttpubsub.OpenSubscription(
		mqttpubsub.NewSubscriber(cli, 0, 0),
		"example.mysubject",
		nil)
	if err != nil {
		// Log the OpenSubscription error itself. The connect token's error
		// was already checked above and says nothing about this failure.
		log.Println(err)
		return
	}
	defer func() {
		// Best-effort cleanup: the example has nothing useful to do with a
		// shutdown error, so it is deliberately discarded.
		_ = subscription.Shutdown(ctx)
	}()
}
Output:

func OpenTopic

func OpenTopic(conn Publisher, name string, _ *TopicOptions) (*pubsub.Topic, error)
Example
package main

import (
	"context"
	"log"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	mqttpubsub "github.com/frantjc/go-pubsubmqtt"
)

var ctx = context.Background()

// main dials the example MQTT broker, opens a *pubsub.Topic on
// "example.mysubject", and shuts the topic down before returning.
func main() {
	clientOpts := mqtt.NewClientOptions().AddBroker("mqtt://mqtt.example.com")
	clientOpts.ClientID = "exampleClient"

	client := mqtt.NewClient(clientOpts)
	// Registered before connecting so it runs on every return path.
	defer client.Disconnect(0)

	if connecting := client.Connect(); connecting.Wait() && connecting.Error() != nil {
		log.Println(connecting.Error())
		return
	}

	publisher := mqttpubsub.NewPublisher(client, 0, 0)
	topic, openErr := mqttpubsub.OpenTopic(publisher, "example.mysubject", nil)
	if openErr != nil {
		log.Println(openErr)
		return
	}
	defer func() {
		// The shutdown error is intentionally ignored in this example.
		_ = topic.Shutdown(ctx)
	}()
}
Output:

Types

type Publisher

// Publisher is the connection type OpenTopic accepts; NewPublisher builds
// one from an mqtt.Client.
type Publisher interface {
	// Publish sends payload to topic and reports any failure as an error.
	// qos is a pointer and may therefore be nil — presumably nil selects the
	// default QoS passed to NewPublisher; TODO confirm against the
	// implementation.
	Publish(topic string, payload interface{}, qos *byte) error
	// Stop returns an error on failure.
	// NOTE(review): presumably releases the publisher when the topic is shut
	// down — confirm.
	Stop() error
}

func NewPublisher

func NewPublisher(cli mqtt.Client, qos byte, timeout time.Duration) Publisher

type Subscriber

// Subscriber is the connection type OpenSubscription accepts; NewSubscriber
// builds one from an mqtt.Client.
type Subscriber interface {
	// Subscribe registers handler for messages on topic and reports any
	// failure as an error. qos is a pointer and may therefore be nil —
	// presumably nil selects the default QoS passed to NewSubscriber; TODO
	// confirm against the implementation.
	Subscribe(topic string, handler mqtt.MessageHandler, qos *byte) error
	// Unsubscribe removes the subscription for topic.
	Unsubscribe(topic string) error
	// Close returns an error on failure.
	// NOTE(review): presumably releases the subscriber when the subscription
	// is shut down — confirm.
	Close() error
}

func NewSubscriber

func NewSubscriber(cli mqtt.Client, qos byte, timeout time.Duration) Subscriber

type SubscriptionOptions

type SubscriptionOptions struct {
	// WaitTime is a duration option for the subscription.
	// NOTE(review): its exact meaning is not visible here — presumably how
	// long a receive waits for incoming messages; confirm against the
	// implementation, including what the zero value means.
	WaitTime time.Duration
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by MQTT.

type TopicOptions

type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by MQTT.

type URLOpener

type URLOpener struct {
	// Subscriber is the embedded connection used for subscriptions.
	// NOTE(review): presumably passed to OpenSubscription by
	// OpenSubscriptionURL — confirm.
	Subscriber
	// Publisher is the embedded connection used for topics.
	// NOTE(review): presumably passed to OpenTopic by OpenTopicURL — confirm.
	Publisher

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens MQTT URLs like "mqtt://mytopic" for both topics and subscriptions.

For topics, the URL's host+path is used as the name of the MQTT topic to publish to.

For subscriptions, the URL's host+path is used as the name of the MQTT topic to subscribe to.

No query parameters are supported.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(_ context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(_ context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL