mqtt

package
v0.0.0-...-81dd437
Published: Apr 19, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

README

MQTT adapter

MQTT adapter provides an MQTT API for sending messages through the platform. MQTT adapter uses mProxy for proxying traffic between client and MQTT broker.

Configuration

The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

| Variable | Description | Default |
|---|---|---|
| MF_MQTT_ADAPTER_LOG_LEVEL | mProxy log level | error |
| MF_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| MF_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 |
| MF_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| MF_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| MF_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| MF_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| MF_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker path for MQTT over WS | /mqtt |
| MF_MQTT_ADAPTER_FORWARDER_TIMEOUT | Timeout of the MQTT forwarder for multiprotocol communication | 30s |
| MF_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 |
| MF_THINGS_AUTH_GRPC_URL | Things gRPC endpoint URL | localhost:8181 |
| MF_THINGS_AUTH_GRPC_TIMEOUT | Timeout in seconds for Things service gRPC calls | 1s |
| MF_JAEGER_URL | URL of Jaeger tracing service | "" |
| MF_MQTT_ADAPTER_CLIENT_TLS | gRPC client TLS | false |
| MF_MQTT_ADAPTER_CA_CERTS | CA certs for gRPC client TLS | "" |
| MF_MQTT_ADAPTER_INSTANCE | Instance name for event sourcing | "" |
| MF_MQTT_ADAPTER_ES_URL | Event sourcing URL | localhost:6379 |
| MF_MQTT_ADAPTER_ES_PASS | Event sourcing password | "" |
| MF_MQTT_ADAPTER_ES_DB | Event sourcing database | "0" |
| MF_AUTH_CACHE_URL | Auth cache URL | localhost:6379 |
| MF_AUTH_CACHE_PASS | Auth cache password | "" |
| MF_AUTH_CACHE_DB | Auth cache database | "0" |
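
The fallback-to-default behaviour described above can be sketched in a few lines of Go. This is a minimal illustration, not the adapter's own configuration code: the `config` struct, the `env` helper, and `loadConfig` are hypothetical names, only four of the variables are read, and the sketch assumes that "unset" means the variable is absent from the environment (a variable set to an empty string is kept as-is here).

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// config holds a few of the settings from the table above.
// The struct and its field names are illustrative, not the adapter's own.
type config struct {
	logLevel         string
	mqttPort         string
	brokerURL        string
	forwarderTimeout time.Duration
}

// env returns the value of key from lookup, or def when the variable is unset.
func env(lookup func(string) (string, bool), key, def string) string {
	if v, ok := lookup(key); ok {
		return v
	}
	return def
}

// loadConfig reads the settings, falling back to the documented defaults.
func loadConfig(lookup func(string) (string, bool)) (config, error) {
	timeout, err := time.ParseDuration(env(lookup, "MF_MQTT_ADAPTER_FORWARDER_TIMEOUT", "30s"))
	if err != nil {
		return config{}, fmt.Errorf("invalid forwarder timeout: %w", err)
	}
	return config{
		logLevel:         env(lookup, "MF_MQTT_ADAPTER_LOG_LEVEL", "error"),
		mqttPort:         env(lookup, "MF_MQTT_ADAPTER_MQTT_PORT", "1883"),
		brokerURL:        env(lookup, "MF_BROKER_URL", "nats://127.0.0.1:4222"),
		forwarderTimeout: timeout,
	}, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("log level=%s mqtt port=%s broker=%s timeout=%s\n",
		cfg.logLevel, cfg.mqttPort, cfg.brokerURL, cfg.forwarderTimeout)
}
```

Passing the lookup function as a parameter keeps `loadConfig` testable without touching the process environment.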

Deployment

The service itself is distributed as a Docker container. Check the mqtt-adapter service section in docker-compose to see how the service is deployed.

To start the service outside of the container, execute the following shell script:

# download the latest version of the service
git clone https://github.com/MainfluxLabs/mainflux

cd mainflux

# compile the mqtt adapter
make mqtt

# copy binary to bin
make install

# set the environment variables and run the service
MF_MQTT_ADAPTER_LOG_LEVEL=[MQTT Adapter Log Level] \
MF_MQTT_ADAPTER_MQTT_PORT=[MQTT adapter MQTT port] \
MF_MQTT_ADAPTER_MQTT_TARGET_HOST=[MQTT broker host] \
MF_MQTT_ADAPTER_MQTT_TARGET_PORT=[MQTT broker MQTT port] \
MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=[MQTT health check URL] \
MF_MQTT_ADAPTER_WS_PORT=[MQTT adapter WS port] \
MF_MQTT_ADAPTER_WS_TARGET_HOST=[MQTT broker for MQTT over WS host] \
MF_MQTT_ADAPTER_WS_TARGET_PORT=[MQTT broker for MQTT over WS port] \
MF_MQTT_ADAPTER_WS_TARGET_PATH=[MQTT adapter WS path] \
MF_MQTT_ADAPTER_FORWARDER_TIMEOUT=[MQTT forwarder for multiprotocol support timeout] \
MF_BROKER_URL=[Message broker instance URL] \
MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \
MF_THINGS_AUTH_GRPC_TIMEOUT=[Things service Auth gRPC request timeout in seconds] \
MF_JAEGER_URL=[Jaeger service URL] \
MF_MQTT_ADAPTER_CLIENT_TLS=[gRPC client TLS] \
MF_MQTT_ADAPTER_CA_CERTS=[CA certs for gRPC client] \
MF_MQTT_ADAPTER_INSTANCE=[Instance for event sourcing] \
MF_MQTT_ADAPTER_ES_URL=[Event sourcing URL] \
MF_MQTT_ADAPTER_ES_PASS=[Event sourcing pass] \
MF_MQTT_ADAPTER_ES_DB=[Event sourcing database] \
MF_AUTH_CACHE_URL=[Auth cache URL] \
MF_AUTH_CACHE_PASS=[Auth cache pass] \
MF_AUTH_CACHE_DB=[Auth cache DB name] \
$GOBIN/mainfluxlabs-mqtt

For more information about the service's capabilities and usage, please check out the API documentation.

Documentation

Constants

const (
	LogInfoSubscribed                  = "subscribed with client_id %s to topics %s"
	LogInfoUnsubscribed                = "unsubscribed client_id %s from topics %s"
	LogInfoConnected                   = "connected with client_id %s"
	LogInfoDisconnected                = "disconnected client_id %s and username %s"
	LogInfoPublished                   = "published with client_id %s to the topic %s"
	LogErrFailedConnect                = "failed to connect: "
	LogErrFailedSubscribe              = "failed to subscribe: "
	LogErrFailedUnsubscribe            = "failed to unsubscribe: "
	LogErrFailedPublish                = "failed to publish: "
	LogErrFailedDisconnect             = "failed to disconnect: "
	LogErrFailedPublishDisconnectEvent = "failed to publish disconnect event: "

	LogErrFailedPublishConnectEvent = "failed to publish connect event: "
	LogErrFailedPublishToMsgBroker  = "failed to publish to mainflux message broker: "
)
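
The constants fall into two groups: the `LogInfo*` values are format strings with `%s` placeholders, while the `LogErr*` values are prefixes meant to be followed by an error message. The sketch below shows one plausible way to use each kind. It copies two of the constants locally so that it compiles on its own; the helper names, the sample error, and the choice to join topics with commas are assumptions rather than the handler's actual behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local copies of two constants from the package, so the sketch is self-contained.
const (
	LogInfoSubscribed   = "subscribed with client_id %s to topics %s"
	LogErrFailedPublish = "failed to publish: "
)

// errBoom is a hypothetical error used only for demonstration.
var errBoom = errors.New("connection reset")

// infoSubscribed fills the two %s placeholders of LogInfoSubscribed.
// Joining the topics with commas is an assumption made for this sketch.
func infoSubscribed(clientID string, topics []string) string {
	return fmt.Sprintf(LogInfoSubscribed, clientID, strings.Join(topics, ","))
}

// errFailedPublish appends the error text to the LogErrFailedPublish prefix.
func errFailedPublish(err error) string {
	return LogErrFailedPublish + err.Error()
}

func main() {
	fmt.Println(infoSubscribed("c1", []string{"a", "b"}))
	fmt.Println(errFailedPublish(errBoom))
}
```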

Variables

var (
	ErrMalformedSubtopic         = errors.New("malformed subtopic")
	ErrClientNotInitialized      = errors.New("client is not initialized")
	ErrMalformedTopic            = errors.New("malformed topic")
	ErrMissingClientID           = errors.New("client_id not found")
	ErrMissingTopicPub           = errors.New("failed to publish due to missing topic")
	ErrMissingTopicSub           = errors.New("failed to subscribe due to missing topic")
	ErrAuthentication            = errors.New("failed to perform authentication over the entity")
	ErrSubscriptionAlreadyExists = errors.New("subscription already exists")
)
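
Because these are sentinel values created with `errors.New`, callers can add context by wrapping them and still recognise them with `errors.Is`. The following sketch illustrates that pattern with a local copy of `ErrMissingTopicPub`; the `validatePublish` and `isMissingTopic` helpers and the empty-topic rule are hypothetical and are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of one sentinel error from the package, so the sketch is self-contained.
var ErrMissingTopicPub = errors.New("failed to publish due to missing topic")

// validatePublish is a hypothetical check: it rejects an empty topic and
// wraps the sentinel with %w so that the client ID appears in the message.
func validatePublish(clientID, topic string) error {
	if topic == "" {
		return fmt.Errorf("client %s: %w", clientID, ErrMissingTopicPub)
	}
	return nil
}

// isMissingTopic reports whether err is, or wraps, ErrMissingTopicPub.
func isMissingTopic(err error) bool {
	return errors.Is(err, ErrMissingTopicPub)
}

func main() {
	err := validatePublish("c1", "")
	fmt.Println(err)
	fmt.Println(isMissingTopic(err))
}
```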

Functions

func NewHandler

func NewHandler(publishers []messaging.Publisher, es redis.EventStore,
	logger logger.Logger, auth auth.Client, svc Service) session.Handler

NewHandler creates a new Handler entity.

Types

type Forwarder

type Forwarder interface {
	// Forward subscribes to the Subscriber and
	// publishes messages using provided Publisher.
	Forward(id string, sub messaging.Subscriber, pub messaging.Publisher) error
}

Forwarder specifies MQTT forwarder interface API.

func NewForwarder

func NewForwarder(topics []string, logger log.Logger) Forwarder

NewForwarder returns new Forwarder implementation.

type Page

type Page struct {
	PageMetadata
	Subscriptions []Subscription
}

Page represents page metadata with content.

type PageMetadata

type PageMetadata struct {
	Offset uint64
	Total  uint64
	Limit  uint64
}

PageMetadata contains page metadata that helps navigation.
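
As an illustration of how `Offset`, `Limit`, and `Total` might be used for navigation, the sketch below walks through every page of a result set. The types are trimmed local copies, and `collectAll`, `pageOf`, and `demo` are hypothetical helpers; the sketch assumes that `Total` is the number of matching items across all pages.

```go
package main

import "fmt"

// Trimmed local copies of the package types, so the sketch is self-contained.
type Subscription struct{ ClientID string }

type PageMetadata struct {
	Offset uint64
	Total  uint64
	Limit  uint64
}

type Page struct {
	PageMetadata
	Subscriptions []Subscription
}

// collectAll requests pages of size limit until Total items have been seen
// or a page comes back empty.
func collectAll(limit uint64, fetch func(PageMetadata) (Page, error)) ([]Subscription, error) {
	var out []Subscription
	pm := PageMetadata{Limit: limit}
	for {
		page, err := fetch(pm)
		if err != nil {
			return nil, err
		}
		out = append(out, page.Subscriptions...)
		pm.Offset += uint64(len(page.Subscriptions))
		if len(page.Subscriptions) == 0 || pm.Offset >= page.Total {
			return out, nil
		}
	}
}

// pageOf slices an in-memory list the way a paginated backend might.
func pageOf(all []Subscription, pm PageMetadata) Page {
	total := uint64(len(all))
	start, end := pm.Offset, pm.Offset+pm.Limit
	if start > total {
		start = total
	}
	if end > total {
		end = total
	}
	return Page{
		PageMetadata:  PageMetadata{Offset: pm.Offset, Total: total, Limit: pm.Limit},
		Subscriptions: all[start:end],
	}
}

// demo pages through five items two at a time and reports items and fetches.
func demo() (items, calls int) {
	all := make([]Subscription, 5)
	subs, _ := collectAll(2, func(pm PageMetadata) (Page, error) {
		calls++
		return pageOf(all, pm), nil
	})
	return len(subs), calls
}

func main() {
	items, calls := demo()
	fmt.Printf("collected %d items in %d fetches\n", items, calls)
}
```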

type Repository

type Repository interface {
	// RetrieveByChannelID retrieves all subscriptions that belong to the specified channel.
	RetrieveByChannelID(ctx context.Context, pm PageMetadata, chanID string) (Page, error)
	// Save will save the subscription.
	Save(ctx context.Context, sub Subscription) error
	// Remove will remove the subscription.
	Remove(ctx context.Context, sub Subscription) error
	// UpdateStatus updates the subscription status.
	UpdateStatus(ctx context.Context, sub Subscription) error
	// HasClientID indicates whether a subscription exists for the given client ID.
	HasClientID(ctx context.Context, clientID string) error
}
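
To make the contract more concrete, here is a rough in-memory implementation of the five methods. It is a sketch under several assumptions that the interface itself does not state: subscriptions are matched on client ID, channel ID, and subtopic; `HasClientID` returns nil when a match exists; and `errNotFound` is a hypothetical error. The package's real repository may behave differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Trimmed local copies of the package types, so the sketch is self-contained.
type Subscription struct{ Subtopic, ThingID, ChanID, ClientID, Status string }

type PageMetadata struct{ Offset, Total, Limit uint64 }

type Page struct {
	PageMetadata
	Subscriptions []Subscription
}

// errNotFound is hypothetical; the package does not export such an error.
var errNotFound = errors.New("subscription not found")

// memRepo keeps subscriptions in a slice guarded by a mutex.
type memRepo struct {
	mu   sync.Mutex
	subs []Subscription
}

// find returns the index of the subscription matching sub, or -1.
// Matching on client, channel, and subtopic is an assumption.
func (r *memRepo) find(sub Subscription) int {
	for i, s := range r.subs {
		if s.ClientID == sub.ClientID && s.ChanID == sub.ChanID && s.Subtopic == sub.Subtopic {
			return i
		}
	}
	return -1
}

func (r *memRepo) Save(_ context.Context, sub Subscription) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs = append(r.subs, sub)
	return nil
}

func (r *memRepo) Remove(_ context.Context, sub Subscription) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	i := r.find(sub)
	if i < 0 {
		return errNotFound
	}
	r.subs = append(r.subs[:i], r.subs[i+1:]...)
	return nil
}

func (r *memRepo) UpdateStatus(_ context.Context, sub Subscription) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	i := r.find(sub)
	if i < 0 {
		return errNotFound
	}
	r.subs[i].Status = sub.Status
	return nil
}

func (r *memRepo) HasClientID(_ context.Context, clientID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, s := range r.subs {
		if s.ClientID == clientID {
			return nil
		}
	}
	return errNotFound
}

func (r *memRepo) RetrieveByChannelID(_ context.Context, pm PageMetadata, chanID string) (Page, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var all []Subscription
	for _, s := range r.subs {
		if s.ChanID == chanID {
			all = append(all, s)
		}
	}
	total := uint64(len(all))
	start, end := pm.Offset, pm.Offset+pm.Limit
	if start > total {
		start = total
	}
	if end > total {
		end = total
	}
	pm.Total = total
	return Page{PageMetadata: pm, Subscriptions: all[start:end]}, nil
}

// demo saves three subscriptions and fetches the second one as a page of size 1.
func demo() (Page, error) {
	ctx, repo := context.Background(), &memRepo{}
	for _, id := range []string{"c1", "c2", "c3"} {
		if err := repo.Save(ctx, Subscription{ChanID: "ch1", ClientID: id}); err != nil {
			return Page{}, err
		}
	}
	return repo.RetrieveByChannelID(ctx, PageMetadata{Offset: 1, Limit: 1}, "ch1")
}

func main() {
	page, err := demo()
	fmt.Println(page.Total, len(page.Subscriptions), err)
}
```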

type Service

type Service interface {
	// ListSubscriptions lists all subscriptions that belong to the specified channel.
	ListSubscriptions(ctx context.Context, chanID, token, key string, pm PageMetadata) (Page, error)

	// CreateSubscription creates a subscription.
	CreateSubscription(ctx context.Context, sub Subscription) error

	// RemoveSubscription removes the subscription having the provided identifier.
	RemoveSubscription(ctx context.Context, sub Subscription) error

	// HasClientID indicates whether a subscription exists for the given client ID.
	HasClientID(ctx context.Context, clientID string) error

	// UpdateStatus updates the subscription status for a given client ID.
	UpdateStatus(ctx context.Context, sub Subscription) error
}

Service specifies an API that must be fulfilled by the domain service implementation, and all of its decorators (e.g. logging & metrics).
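
The decorator idea mentioned above can be sketched as a type that wraps another `Service`, logs each call, and delegates to the wrapped value. Everything in the example is illustrative: the `Service` interface is cut down to two of the five methods, and `loggingService`, `stubService`, `errNoClient`, and the log format are hypothetical names rather than parts of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Trimmed local stand-ins for the package types (two of the five methods).
type Subscription struct{ ThingID, ChanID, ClientID, Subtopic string }

type Service interface {
	CreateSubscription(ctx context.Context, sub Subscription) error
	HasClientID(ctx context.Context, clientID string) error
}

// loggingService decorates a Service: it logs each call and then delegates.
type loggingService struct {
	next Service
	logf func(format string, args ...any)
}

func (l loggingService) CreateSubscription(ctx context.Context, sub Subscription) (err error) {
	// The deferred closure runs after the wrapped call, so it sees the final err.
	defer func(start time.Time) {
		l.logf("create_subscription client_id=%s took=%s err=%v", sub.ClientID, time.Since(start), err)
	}(time.Now())
	return l.next.CreateSubscription(ctx, sub)
}

func (l loggingService) HasClientID(ctx context.Context, clientID string) (err error) {
	defer func(start time.Time) {
		l.logf("has_client_id client_id=%s took=%s err=%v", clientID, time.Since(start), err)
	}(time.Now())
	return l.next.HasClientID(ctx, clientID)
}

// errNoClient and stubService are hypothetical, used only to drive the demo.
var errNoClient = errors.New("no subscription for client")

type stubService struct{}

func (stubService) CreateSubscription(context.Context, Subscription) error { return nil }

func (stubService) HasClientID(_ context.Context, clientID string) error {
	if clientID == "" {
		return errNoClient
	}
	return nil
}

// demo wraps the stub, makes two calls, and returns the log lines and last error.
func demo() ([]string, error) {
	var lines []string
	var svc Service = loggingService{
		next: stubService{},
		logf: func(format string, args ...any) {
			lines = append(lines, fmt.Sprintf(format, args...))
		},
	}
	ctx := context.Background()
	if err := svc.CreateSubscription(ctx, Subscription{ClientID: "c1"}); err != nil {
		return lines, err
	}
	return lines, svc.HasClientID(ctx, "")
}

func main() {
	lines, err := demo()
	for _, line := range lines {
		fmt.Println(line)
	}
	fmt.Println("last error:", err)
}
```

Because the decorator satisfies the same interface as the value it wraps, a metrics decorator could be stacked around it in the same way.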

func NewMqttService

func NewMqttService(auth mainflux.AuthServiceClient, things mainflux.ThingsServiceClient, subscriptions Repository, idp mainflux.IDProvider) Service

NewMqttService instantiates the MQTT service implementation.

type Subscription

type Subscription struct {
	Subtopic  string
	ThingID   string
	ChanID    string
	ClientID  string
	Status    string
	CreatedAt float64
}

Subscription represents a user Subscription.

Directories

Path Synopsis
api
http
Package http contains implementation of kit service HTTP API.
Package redis contains cache implementations using Redis as the underlying database.
