mqttpubsub

package module
v0.29.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

go-cloud-pubsub-mqtt

MQTT pubsub driver for the Go Cloud Development Kit. Just put it in the go-cloud/pubsub/ package, or do whatever you want with it. :)

REFERENCES:

https://github.com/google/go-cloud/issues/1466

https://github.com/google/go-cloud/pull/2751

Documentation

Overview

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"gocloud.dev/pubsub"
	"log"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/mqttpubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Open a *pubsub.Subscription from a URL. The MQTT server to dial is the
	// one named by the MQTT_SERVER_URL environment variable, and messages are
	// received with subject "example.mysubject".
	const subscriptionURL = "mqtt://example.mysubject"
	sub, openErr := pubsub.OpenSubscription(ctx, subscriptionURL)
	if openErr != nil {
		log.Fatal(openErr)
	}

	// Shut the subscription down when main returns.
	defer sub.Shutdown(ctx)
}
Output:

Example (OpenTopicFromURL)
package main

import (
	"context"
	"gocloud.dev/pubsub"
	"log"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/mqttpubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Open a *pubsub.Topic from a URL. The MQTT server to dial is the one
	// named by the MQTT_SERVER_URL environment variable, and messages are
	// sent with subject "example.mysubject".
	const topicURL = "mqtt://example.mysubject"
	t, openErr := pubsub.OpenTopic(ctx, topicURL)
	if openErr != nil {
		log.Fatal(openErr)
	}

	// Shut the topic down when main returns.
	defer t.Shutdown(ctx)
}
Output:

Index

Examples

Constants

View Source
// Scheme is the URL scheme for this driver's URLs, as in
// "mqtt://example.mysubject".
const Scheme = "mqtt"

Scheme is the URL scheme mqttpubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(conn Subscriber, topicName string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
Example
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()

// Configure an MQTT client and connect it; the broker address and client ID
// are placeholders.
opts := mqtt.NewClientOptions()
opts = opts.AddBroker("mqtt://mqtt.example.com")
opts.ClientID = "exampleClient"
cli := mqtt.NewClient(opts)
token := cli.Connect()
if token.Wait() && token.Error() != nil {
	log.Fatal(token.Error())
}

// Disconnect when the surrounding function returns normally. log.Fatal exits
// the process without running deferred calls, so the error path below
// disconnects explicitly instead of relying on this defer.
defer cli.Disconnect(0)

// Wrap the client in a Subscriber (QoS 0, zero timeout) and open a
// subscription on "example.mysubject", passing nil options.
subscription, err := mqttpubsub.OpenSubscription(
	mqttpubsub.NewSubscriber(cli, 0, 0),
	"example.mysubject",
	nil)
if err != nil {
	cli.Disconnect(0)
	log.Fatal(err)
}
defer subscription.Shutdown(ctx)
Output:

func OpenTopic

func OpenTopic(conn Publisher, name string, _ *TopicOptions) (*pubsub.Topic, error)
Example
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()

// Configure an MQTT client and connect it; the broker address and client ID
// are placeholders.
opts := mqtt.NewClientOptions()
opts = opts.AddBroker("mqtt://mqtt.example.com")
opts.ClientID = "exampleClient"
cli := mqtt.NewClient(opts)
token := cli.Connect()
if token.Wait() && token.Error() != nil {
	log.Fatal(token.Error())
}

// Disconnect when the surrounding function returns normally. log.Fatal exits
// the process without running deferred calls, so the error path below
// disconnects explicitly instead of relying on this defer.
defer cli.Disconnect(0)

// Wrap the client in a Publisher (QoS 0, zero timeout) and open a topic
// named "example.mysubject", passing nil options.
topic, err := mqttpubsub.OpenTopic(mqttpubsub.NewPublisher(cli, 0, 0), "example.mysubject", nil)
if err != nil {
	cli.Disconnect(0)
	log.Fatal(err)
}
defer topic.Shutdown(ctx)
Output:

Types

type Publisher

// Publisher is the connection type that OpenTopic takes; NewPublisher
// returns one built from an mqtt.Client.
type Publisher interface {
	// Publish takes a topic name, a payload of any type, and a QoS pointer.
	// NOTE(review): presumably a nil qos falls back to the QoS given to
	// NewPublisher — confirm against the implementation.
	Publish(topic string, payload interface{}, qos *byte) error
	// Stop takes no arguments and reports an error.
	// NOTE(review): presumably releases the publisher's connection — confirm.
	Stop() error
}

func NewPublisher

func NewPublisher(cli mqtt.Client, qos byte, timeout time.Duration) Publisher

type Subscriber

// Subscriber is the connection type that OpenSubscription takes;
// NewSubscriber returns one built from an mqtt.Client.
type Subscriber interface {
	// Subscribe takes a topic name, an mqtt.MessageHandler, and a QoS pointer.
	// NOTE(review): presumably handler is invoked for each message on topic and
	// a nil qos falls back to the QoS given to NewSubscriber — confirm.
	Subscribe(topic string, handler mqtt.MessageHandler, qos *byte) error
	// UnSubscribe takes the topic name.
	// NOTE(review): presumably cancels a prior Subscribe for that topic — confirm.
	UnSubscribe(topic string) error
	// Close takes no arguments and reports an error.
	// NOTE(review): presumably releases the subscriber's connection — confirm.
	Close() error
}

func NewSubscriber

func NewSubscriber(cli mqtt.Client, qos byte, timeout time.Duration) Subscriber

type SubscriptionOptions

// SubscriptionOptions carries the options passed to OpenSubscription.
type SubscriptionOptions struct {
	// WaitTime is a duration option.
	// NOTE(review): presumably bounds how long the subscription waits when
	// receiving messages — confirm against the driver implementation.
	WaitTime time.Duration
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by MQTT.

type TopicOptions

// TopicOptions declares no fields; OpenTopic's signature names its
// *TopicOptions parameter "_", so the value is not used there.
type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by MQTT.

type URLOpener

// URLOpener holds the connections and options used when opening topics and
// subscriptions from URLs (see OpenTopicURL and OpenSubscriptionURL).
type URLOpener struct {
	// Connections to use for communication with the server. SubConn is a
	// Subscriber (the connection type OpenSubscription takes) and PubConn is a
	// Publisher (the connection type OpenTopic takes).
	SubConn Subscriber
	PubConn Publisher

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens MQTT URLs like "mqtt://mytopic" for both topics and subscriptions.

For topics, the URL's host+path is used as the name of the MQTT topic to publish to.

For subscriptions, the URL's host+path is used as the name of the MQTT topic to subscribe to.

No query parameters are supported.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL