mqtt

package module
v1.2.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2024 License: MIT Imports: 11 Imported by: 0

README

MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输) 协议是IBM 针对物联网应用定制的即时通讯协议。 该协议使用TCP/IP 提供网络连接,能够对负载内容实现消息屏蔽传输,开销小,可以有效降低网络流量。

热门在线公共 MQTT 服务器

名称 Broker 地址 TCP TLS WebSocket
EMQ X broker.emqx.io 1883 8883 8083,8084
EMQ X(国内) broker-cn.emqx.io 1883 8883 8083,8084
Eclipse mqtt.eclipseprojects.io 1883 8883 80, 443
Mosquitto test.mosquitto.org 1883 8883, 8884 80
HiveMQ broker.hivemq.com 1883 N/A 8000

Docker部署开发环境

RabbitMQ
docker pull bitnami/rabbitmq:latest

docker run -itd \
    --hostname localhost \
    --name rabbitmq-test \
    -p 15672:15672 \
    -p 5672:5672 \
    -p 1883:1883 \
    -p 15675:15675 \
    -e RABBITMQ_PLUGINS=rabbitmq_top,rabbitmq_mqtt,rabbitmq_web_mqtt,rabbitmq_prometheus,rabbitmq_stomp,rabbitmq_auth_backend_http \
    bitnami/rabbitmq:latest

# 查看插件列表
rabbitmq-plugins list
# rabbitmq_peer_discovery_consul 基于 Consul 的集群节点发现插件(离线启用)
rabbitmq-plugins --offline enable rabbitmq_peer_discovery_consul
# rabbitmq_mqtt 提供与后端服务交互使用,端口1883
rabbitmq-plugins enable rabbitmq_mqtt
# rabbitmq_web_mqtt 提供与前端交互使用,端口15675
rabbitmq-plugins enable rabbitmq_web_mqtt
# rabbitmq_auth_backend_http 通过 HTTP 接口进行认证与授权
rabbitmq-plugins enable rabbitmq_auth_backend_http

管理后台: http://localhost:15672
默认账号: user
默认密码: bitnami

mosquitto
docker pull eclipse-mosquitto:latest

# 1883 tcp
# 9001 websockets
docker run -itd \
    --name mosquitto-test \
    -p 1883:1883 \
    -p 9001:9001 \
    eclipse-mosquitto:latest
EMQX
docker pull emqx/emqx:latest

docker run -itd \
    --name emqx-test \
    --add-host=host.docker.internal:host-gateway \
    -p 18083:18083 \
    -p 1883:1883 \
    emqx/emqx:latest

管理后台: http://localhost:18083
默认账号: admin
默认密码: public

HiveMQ
docker pull hivemq/hivemq4:latest

docker run -itd \
    --name hivemq-test \
    --ulimit nofile=500000:500000 \
    -p 8080:8080 \
    -p 8000:8000 \
    -p 1883:1883 \
    hivemq/hivemq4:latest

Documentation

Index

Constants

View Source
const (
	KindMQTT transport.Kind = "mqtt"
)

Variables

This section is empty.

Functions

func LogDebug

func LogDebug(args ...interface{})

func LogDebugf

func LogDebugf(format string, args ...interface{})

func LogError

func LogError(args ...interface{})

func LogErrorf

func LogErrorf(format string, args ...interface{})

func LogFatal

func LogFatal(args ...interface{})

func LogFatalf

func LogFatalf(format string, args ...interface{})

func LogInfo

func LogInfo(args ...interface{})

func LogInfof

func LogInfof(format string, args ...interface{})

func LogWarn

func LogWarn(args ...interface{})

func LogWarnf

func LogWarnf(format string, args ...interface{})

func RegisterSubscriber

func RegisterSubscriber[T any](srv *Server, ctx context.Context, topic string, handler func(context.Context, string, broker.Headers, *T) error, opts ...broker.SubscribeOption) error

Types

type Server

type Server struct {
	broker.Broker

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) Endpoint

func (s *Server) Endpoint() (*url.URL, error)

func (*Server) Name

func (s *Server) Name() string

func (*Server) RegisterSubscriber

func (s *Server) RegisterSubscriber(ctx context.Context, topic string, handler broker.Handler, binder broker.Binder, opts ...broker.SubscribeOption) error

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

func (*Server) Stop

func (s *Server) Stop(_ context.Context) error

type ServerOption

type ServerOption func(o *Server)

func WithAddress

func WithAddress(addrs []string) ServerOption

func WithAuth

func WithAuth(username string, password string) ServerOption

func WithBrokerOptions

func WithBrokerOptions(opts ...broker.Option) ServerOption

WithBrokerOptions MQ代理配置

func WithCleanSession

func WithCleanSession(enable bool) ServerOption

func WithClientId

func WithClientId(clientId string) ServerOption

func WithCodec

func WithCodec(c string) ServerOption

func WithEnableKeepAlive

func WithEnableKeepAlive(enable bool) ServerOption

WithEnableKeepAlive enables keep-alive.

func WithTLSConfig

func WithTLSConfig(c *tls.Config) ServerOption

type SubscribeOption

type SubscribeOption struct {
	// contains filtered or unexported fields
}

type SubscribeOptionMap

type SubscribeOptionMap map[string]*SubscribeOption

type SubscriberMap

type SubscriberMap map[string]broker.Subscriber

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is an MQTT transport.

func (*Transport) Endpoint

func (tr *Transport) Endpoint() string

Endpoint returns the transport endpoint.

func (*Transport) Kind

func (tr *Transport) Kind() transport.Kind

Kind returns the transport kind.

func (*Transport) NodeFilters

func (tr *Transport) NodeFilters() []selector.NodeFilter

NodeFilters returns the client select filters.

func (*Transport) Operation

func (tr *Transport) Operation() string

Operation returns the transport operation.

func (*Transport) ReplyHeader

func (tr *Transport) ReplyHeader() transport.Header

ReplyHeader returns the reply header.

func (*Transport) RequestHeader

func (tr *Transport) RequestHeader() transport.Header

RequestHeader returns the request header.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL