mqtt

package module
v0.0.0-...-d3ff3e4 Latest Latest
Published: Jan 20, 2022 License: MIT Imports: 15 Imported by: 0

README


Snple MQTT

Note: The API of this library is still unstable and has not been sufficiently tested. Please do not use it in production environments.

Features
  • MQTT 3.1.1 compatible.
  • Full MQTT Feature-set (QoS, Retained, $SYS)
  • Trie-based Subscription model.
  • Ring Buffer packet codec.
  • TCP and Websocket listeners (including SSL/TLS).
  • Interfaces for Client Authentication and Topic access control.
  • Bolt-backed persistence and storage interfaces.
  • Server Publish (Publish, PublishToClientByID, ...).
  • Event hooks (Recv, Send, ...), see hook.go.
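
The trie-based subscription model listed above can be pictured with a small sketch. This is not the library's own topics package: node, subscribe and match are hypothetical names, and the sketch leaves out retained messages and the special handling of topics that begin with $. It only shows how topic filters with the MQTT wildcards + and # can be stored one level per trie node and matched against a published topic.

```go
package main

import (
	"fmt"
	"strings"
)

// node is one level of the topic trie; children are keyed by topic segment.
type node struct {
	children map[string]*node
	clients  map[string]byte // client id -> QoS for filters that end at this node
}

func newNode() *node {
	return &node{children: map[string]*node{}, clients: map[string]byte{}}
}

// subscribe records that client wants messages matching filter.
func (n *node) subscribe(filter, client string, qos byte) {
	cur := n
	for _, seg := range strings.Split(filter, "/") {
		next, ok := cur.children[seg]
		if !ok {
			next = newNode()
			cur.children[seg] = next
		}
		cur = next
	}
	cur.clients[client] = qos
}

// match collects every client whose filter matches topic.
func (n *node) match(topic string) map[string]byte {
	out := map[string]byte{}
	n.walk(strings.Split(topic, "/"), out)
	return out
}

func (n *node) walk(segs []string, out map[string]byte) {
	// "#" matches all remaining levels, including the parent level itself.
	if h, ok := n.children["#"]; ok {
		merge(out, h.clients)
	}
	if len(segs) == 0 {
		merge(out, n.clients)
		return
	}
	// An exact segment match and the single-level wildcard "+" both descend.
	if c, ok := n.children[segs[0]]; ok {
		c.walk(segs[1:], out)
	}
	if c, ok := n.children["+"]; ok {
		c.walk(segs[1:], out)
	}
}

// merge copies src into dst, keeping the highest QoS per client.
func merge(dst, src map[string]byte) {
	for id, qos := range src {
		if cur, ok := dst[id]; !ok || qos > cur {
			dst[id] = qos
		}
	}
}

func main() {
	root := newNode()
	root.subscribe("sensors/+/temp", "c1", 1)
	root.subscribe("sensors/#", "c2", 0)
	fmt.Println(root.match("sensors/kitchen/temp"))
}
```

Lookup cost grows with the number of topic levels rather than with the number of subscriptions, which is the usual reason for choosing a trie here.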
Roadmap
  • Improve event hooks and server publish interface
  • MQTT v5 compatibility
Quick Start
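package main

import (
    "log"

    "github.com/snple/mqtt"
    // The listener package used below must also be imported;
    // its import path is not shown in this README.
)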

func main() {
    // Create the new MQTT Server.
    server := mqtt.New()

    // Create a TCP listener on a standard port.
    tcp := listener.NewTCP("t1", ":1883", &mqtt.AuthAllow{})

    // Add the listener to the server.
    err := server.AddListener(tcp)
    if err != nil {
        log.Fatal(err)
    }

    // Start the broker. Serve() is blocking - see examples folder
    // for usage ideas.
    err = server.Serve()
    if err != nil {
        log.Fatal(err)
    }
}

Examples of running the broker with various configurations can be found in the examples folder.

Authentication and ACL

Authentication and ACL may be configured on a per-listener basis by providing an Auth Controller to the listener configuration. Custom Auth Controllers should satisfy the Auth interface found in auth.go. Two default controllers are provided, AuthAllow, which allows all traffic, and AuthDisallow, which denies all traffic.

    tcp := listener.NewTCP("t1", ":1883", &mqtt.AuthAllow{})
    err := server.AddListener(tcp)

If no auth controller is provided in the listener configuration, the server defaults to allowing all traffic.
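
A custom controller only needs the two methods of the Auth interface. The sketch below is an illustration under stated assumptions: Client and Auth are local stand-ins that mirror the documented mqtt.Client fields and mqtt.Auth interface so the example compiles on its own, and PrefixAuth with its in-memory Users map is a hypothetical controller, not part of the library.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// Client is a stand-in for mqtt.Client with only the fields used here.
type Client struct {
	ID       string
	Username []byte
	Password []byte
}

// Auth mirrors the documented mqtt.Auth interface, using the stand-in Client.
type Auth interface {
	Auth(client *Client) bool
	ACL(client *Client, topic string, write bool) bool
}

// PrefixAuth (hypothetical) checks credentials against an in-memory map and
// lets each user write only below users/<username>/.
type PrefixAuth struct {
	Users map[string]string // username -> password
}

// Auth accepts a client when the username is known and the password matches.
func (a *PrefixAuth) Auth(c *Client) bool {
	want, ok := a.Users[string(c.Username)]
	return ok && subtle.ConstantTimeCompare([]byte(want), c.Password) == 1
}

// ACL allows every read, and writes only inside the user's own prefix.
func (a *PrefixAuth) ACL(c *Client, topic string, write bool) bool {
	if !write {
		return true
	}
	return strings.HasPrefix(topic, "users/"+string(c.Username)+"/")
}

// Compile-time check that PrefixAuth satisfies the interface.
var _ Auth = (*PrefixAuth)(nil)

func main() {
	ac := &PrefixAuth{Users: map[string]string{"alice": "s3cret"}}
	alice := &Client{ID: "c1", Username: []byte("alice"), Password: []byte("s3cret")}
	fmt.Println(ac.Auth(alice))
	fmt.Println(ac.ACL(alice, "users/alice/status", true))
	fmt.Println(ac.ACL(alice, "users/bob/status", true))
}
```

With the real library, such a controller would presumably be passed to the listener in the position where the snippets above pass &mqtt.AuthAllow{}.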

SSL/TLS

SSL/TLS may be configured on both the TCP and Websocket listeners.

    cert, err := tls.X509KeyPair(publicCertificate, privateKey)
    if err != nil {
        log.Fatal(err)
    }
    cfg := &tls.Config{Certificates: []tls.Certificate{cert}}

    tcp := listener.NewTCPWithTLS("t1", ":1883", &mqtt.AuthAllow{}, cfg)
    err = server.AddListener(tcp) // err is already declared above, so assign with =

Note the mandatory inclusion of the Auth Controller!
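
The snippet above leaves publicCertificate and privateKey undefined; tls.X509KeyPair expects both as PEM-encoded bytes. As a sketch for local testing only, the following generates a throwaway self-signed pair in memory with the standard library and builds the same tls.Config. The helper names selfSignedPEM and buildTLSConfig are hypothetical, and a real deployment would load a properly issued certificate instead.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"fmt"
	"log"
	"math/big"
	"time"
)

// selfSignedPEM returns a PEM-encoded self-signed certificate and key for host.
func selfSignedPEM(host string) (certPEM, keyPEM []byte, err error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: host},
		DNSNames:     []string{host},
		NotBefore:    time.Now().Add(-time.Minute),
		NotAfter:     time.Now().Add(24 * time.Hour),
		KeyUsage:     x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
	}
	// Template and parent are the same certificate, so it is self-signed.
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, nil, err
	}
	keyDER, err := x509.MarshalECPrivateKey(key)
	if err != nil {
		return nil, nil, err
	}
	certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
	keyPEM = pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyDER})
	return certPEM, keyPEM, nil
}

// buildTLSConfig mirrors the README snippet: parse the pair, wrap it in a config.
func buildTLSConfig(certPEM, keyPEM []byte) (*tls.Config, error) {
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, err
	}
	return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
}

func main() {
	publicCertificate, privateKey, err := selfSignedPEM("localhost")
	if err != nil {
		log.Fatal(err)
	}
	cfg, err := buildTLSConfig(publicCertificate, privateKey)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("certificates loaded:", len(cfg.Certificates))
}
```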

Data Persistence

Snple MQTT provides a persistence.Store interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by Bolt and can be enabled by assigning a *bolt.Store to the server.

    // import "github.com/snple/mqtt/persistence/bolt"
    err = server.SetStore(bolt.New("mqtt.db", nil))
    if err != nil {
        log.Fatal(err)
    }

Persistence is on-demand (not flushed) and will potentially reduce throughput when compared to the standard in-memory store. Only use it if you need to maintain state through restarts.

Server publish

Snple MQTT provides methods such as Publish and PublishToClientByID for publishing messages directly from the server.


    server.Publish(
        "time", // topic
        []byte(fmt.Sprintf(`{"time": "%s"}`, time.Now().Format(time.RFC3339))), // payload
        1,     // qos
        false, // retain
    )

    server.PublishToClientByID(
        "mqtt_123456", // client id
        "time",        // topic
        []byte(fmt.Sprintf(`{"time": "%s"}`, time.Now().Format(time.RFC3339))), // payload
        1,     // qos
        false, // retain
    )

With PublishToClientByID, you can publish messages to a specific client even if that client is not subscribed to the topic. (Whether the message is processed depends on how your client handles messages for topics it has not subscribed to.)
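
The payloads above are assembled with fmt.Sprintf, which is fine for a fixed timestamp format. As an alternative sketch, the same JSON can be produced with encoding/json, which takes care of escaping if the value ever contains quotes. The names timeMessage, timePayload and fixedTime are hypothetical helpers for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// timeMessage is the JSON shape used by the README examples: {"time": "..."}.
type timeMessage struct {
	Time string `json:"time"`
}

// timePayload encodes t as an RFC 3339 timestamp inside a JSON object.
func timePayload(t time.Time) ([]byte, error) {
	return json.Marshal(timeMessage{Time: t.Format(time.RFC3339)})
}

// fixedTime returns a constant instant so the example output is reproducible.
func fixedTime() time.Time {
	return time.Date(2022, time.January, 20, 12, 0, 0, 0, time.UTC)
}

func main() {
	payload, err := timePayload(fixedTime())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload)) // {"time":"2022-01-20T12:00:00Z"}
}
```

The resulting byte slice can be used as the payload argument of Publish or PublishToClientByID.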

Server Hook interface

Snple MQTT provides a Hook interface for extending server functionality.

type Hook interface {
	// When the client connects to the server
	// If the return is false, the client will be rejected.
	Connect(*Server, *Client) bool

	// When the client disconnects
	DisConnect(*Server, *Client, error)

	// When the server receives a packet.
	// If the return is false, it will cancel the operation.
	Recv(*Server, *Client, *packets.Packet) bool

	// When the server sends a packet.
	// If the return is false, it will cancel the operation.
	Send(*Server, *Client, *packets.Packet) bool

	// When the server receives a message from the client publish.
	// If the return is false, it will cancel the operation.
	Emit(*Server, *Client, *packets.Packet) bool

	// When the server pushes a message to the client
	// If the return is false, it will cancel the operation.
	Push(*Server, *Client, *packets.Packet) bool
}

With this interface, you can debug more easily and react to client events. For example:

func (*MyHook) Emit(server *mqtt.Server, client *mqtt.Client, pk *packets.Packet) bool {
    log.Printf("Client publish: %v, topic: %v, payload:%v", client.ID, pk.TopicName, pk.Payload)

    if pk.TopicName == "time" {
        server.PublishToClientByID(
            client.ID,  // client id
            "time_ack", // topic
            []byte(fmt.Sprintf(`{"time": "%s"}`, time.Now().Format(time.RFC3339))), // payload
            1,     // qos
            false, // retain
        )
    }

    return true
}

This code demonstrates that when a client publishes a message to the topic "time", the server replies directly to that client on the topic "time_ack".
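
Because Hook has six methods, a hook that cares about only one event can embed a no-op implementation and override a single method. The sketch below shows the pattern with local stand-in types so that it compiles on its own: Server, Client, Packet, Hook and NilHook here only mirror the documented names, BlockHook is hypothetical, and the assumption that a no-op hook returns true from every method is not confirmed by this page.

```go
package main

import "fmt"

// Stand-ins for mqtt.Server, mqtt.Client and packets.Packet, reduced to the
// fields this sketch needs.
type Server struct{}
type Client struct{ ID string }
type Packet struct {
	TopicName string
	Payload   []byte
}

// Hook mirrors the documented interface, using the stand-in types.
type Hook interface {
	Connect(*Server, *Client) bool
	DisConnect(*Server, *Client, error)
	Recv(*Server, *Client, *Packet) bool
	Send(*Server, *Client, *Packet) bool
	Emit(*Server, *Client, *Packet) bool
	Push(*Server, *Client, *Packet) bool
}

// NilHook is a stand-in no-op hook. Returning true everywhere is an assumption.
type NilHook struct{}

func (*NilHook) Connect(*Server, *Client) bool       { return true }
func (*NilHook) DisConnect(*Server, *Client, error)  {}
func (*NilHook) Recv(*Server, *Client, *Packet) bool { return true }
func (*NilHook) Send(*Server, *Client, *Packet) bool { return true }
func (*NilHook) Emit(*Server, *Client, *Packet) bool { return true }
func (*NilHook) Push(*Server, *Client, *Packet) bool { return true }

// BlockHook (hypothetical) embeds NilHook and overrides only Emit, cancelling
// client publishes to any topic in its block list.
type BlockHook struct {
	NilHook
	Blocked map[string]bool
}

// Emit returns false for blocked topics, which cancels the publish.
func (h *BlockHook) Emit(_ *Server, _ *Client, pk *Packet) bool {
	return !h.Blocked[pk.TopicName]
}

// Compile-time check: the promoted NilHook methods complete the interface.
var _ Hook = (*BlockHook)(nil)

func main() {
	h := &BlockHook{Blocked: map[string]bool{"admin/reset": true}}
	c := &Client{ID: "c1"}
	fmt.Println(h.Emit(nil, c, &Packet{TopicName: "admin/reset"}))
	fmt.Println(h.Emit(nil, c, &Packet{TopicName: "time"}))
	fmt.Println(h.Connect(nil, c))
}
```

With the real library, the hook would be registered through the documented SetHook method on the server.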

Contributions

Contributions and feedback are both welcomed and encouraged! Open an issue to report a bug, ask a question, or make a feature request.

Documentation

Overview

Package mqtt provides an MQTT 3.1.1 compliant server.

Constants

const (
	Version = "1.0.0" // the server version.

)

Variables

var (
	ErrListenerIDExists     = errors.New("Listener id already exists")
	ErrReadConnectInvalid   = errors.New("Connect packet was not valid")
	ErrConnectNotAuthorized = errors.New("Connect packet was not authorized")
	ErrInvalidTopic         = errors.New("Cannot publish to $ and $SYS topics")

	// SysTopicInterval is the number of milliseconds between $SYS topic publishes.
	SysTopicInterval time.Duration = 30000
)
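
SysTopicInterval is documented as a count of milliseconds, yet it is stored in a time.Duration, whose unit is the nanosecond. The sketch below shows why the raw value has to be scaled before use. It assumes the server multiplies the value by time.Millisecond internally, which the comment implies but this page does not show; sysTopicInterval and tickerPeriod are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// sysTopicInterval mirrors the package variable: a millisecond count held in
// a time.Duration.
var sysTopicInterval time.Duration = 30000

// tickerPeriod scales a millisecond count into a real duration.
func tickerPeriod(ms time.Duration) time.Duration {
	return ms * time.Millisecond
}

func main() {
	fmt.Println(sysTopicInterval)               // 30µs: the raw value read as nanoseconds
	fmt.Println(tickerPeriod(sysTopicInterval)) // 30s: the intended interval
}
```

Under that assumption, a publish every 5 seconds would be set as 5000, not as 5 * time.Second.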
var (
	ErrConnectionClosed = errors.New("Connection not open")
)

Functions

This section is empty.

Types

type Auth

type Auth interface {

	// Auth authenticates a user on CONNECT and returns true if a user is
	// allowed to join the server.
	Auth(client *Client) bool

	// ACL returns true if a user has read or write access to a given topic.
	ACL(client *Client, topic string, write bool) bool
}

Auth is an interface for authentication controllers.

type AuthAllow

type AuthAllow struct{}

AuthAllow is an auth controller which allows access to all connections and topics.

func (*AuthAllow) ACL

func (a *AuthAllow) ACL(client *Client, topic string, write bool) bool

ACL returns true if a user has access permissions to read or write on a topic. AuthAllow always returns true.

func (*AuthAllow) Auth

func (a *AuthAllow) Auth(client *Client) bool

Auth returns true if a username and password are acceptable. AuthAllow always returns true.

type AuthDisallow

type AuthDisallow struct{}

AuthDisallow is an auth controller which disallows access to all connections and topics.

func (*AuthDisallow) ACL

func (d *AuthDisallow) ACL(client *Client, topic string, write bool) bool

ACL returns true if a user has access permissions to read or write on a topic. AuthDisallow always returns false.

func (*AuthDisallow) Auth

func (d *AuthDisallow) Auth(client *Client) bool

Auth returns true if a username and password are acceptable. AuthDisallow always returns false.

type Client

type Client struct {
	sync.RWMutex

	ID            string               // the client id.
	AC            Auth                 // an auth controller inherited from the listener.
	Subscriptions topics.Subscriptions // a map of the subscription filters a client maintains.
	Listener      string               // the id of the listener the client is connected to.
	Inflight      *Inflight            // a map of in-flight qos messages.
	Username      []byte               // the username the client authenticated with.
	Password      []byte               // the password the client authenticated with.

	LWT   LWT   // the last will and testament for the client.
	State State // the operational state of the client.
	// contains filtered or unexported fields
}

Client contains information about a client known by the broker.

func NewClient

func NewClient(c net.Conn, r *circ.Reader, w *circ.Writer, s *system.Info) *Client

NewClient returns a new instance of Client.

func NewClientStub

func NewClientStub(s *system.Info) *Client

NewClientStub returns an instance of Client with basic initializations. This method is typically called by the persistence restoration system.

func (*Client) ForgetSubscription

func (c *Client) ForgetSubscription(filter string)

ForgetSubscription forgets a subscription note for the client.

func (*Client) Identify

func (c *Client) Identify(listenerID string, pk packets.Packet, ac Auth)

Identify sets the identification values of a client instance.

func (*Client) LocalAddr

func (c *Client) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Client) NextPacketID

func (c *Client) NextPacketID() uint32

NextPacketID returns the next packet id for a client, looping back to 0 if the maximum ID has been reached.
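
A wrap-around counter of this kind can be sketched as follows. This is not the library's implementation: idGen and next are hypothetical names, the limit of 65535 comes from MQTT packet identifiers being 16 bits wide, and the sketch assumes that the first ID handed out after the wrap is 1, because MQTT 3.1.1 requires a non-zero packet identifier.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// maxPacketID is the largest value that fits in a 16-bit MQTT packet identifier.
const maxPacketID = 65535

// idGen hands out packet IDs and is safe for concurrent use.
type idGen struct {
	last uint32 // most recently issued ID; 0 means none issued yet
}

// next returns the following ID, restarting at 1 once maxPacketID was issued.
func (g *idGen) next() uint32 {
	for {
		cur := atomic.LoadUint32(&g.last)
		nxt := cur + 1
		if cur >= maxPacketID {
			nxt = 1 // wrap around, skipping the invalid ID 0
		}
		// Retry if another goroutine advanced the counter in the meantime.
		if atomic.CompareAndSwapUint32(&g.last, cur, nxt) {
			return nxt
		}
	}
}

func main() {
	g := &idGen{last: maxPacketID - 1}
	fmt.Println(g.next(), g.next(), g.next())
}
```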

func (*Client) NoteSubscription

func (c *Client) NoteSubscription(filter string, qos byte)

NoteSubscription makes a note of a subscription for the client.

func (*Client) Read

func (c *Client) Read(h func(*Client, packets.Packet) error) error

Read reads new packets from a client connection.

func (*Client) ReadFixedHeader

func (c *Client) ReadFixedHeader(fh *packets.FixedHeader) error

ReadFixedHeader reads in the values of the next packet's fixed header.

func (*Client) ReadPacket

func (c *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error)

ReadPacket reads the remaining buffer into an MQTT packet.

func (*Client) RemoteAddr

func (c *Client) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*Client) Start

func (c *Client) Start()

Start begins the client goroutines reading and writing packets.

func (*Client) Stop

func (c *Client) Stop()

Stop instructs the client to shut down all processing goroutines and disconnect.

func (*Client) WritePacket

func (c *Client) WritePacket(pk packets.Packet) (n int, err error)

WritePacket encodes and writes a packet to the client.

type Clients

type Clients struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Clients contains a map of the clients known by the broker.

func NewClients

func NewClients() *Clients

NewClients returns an instance of Clients.

func (*Clients) Add

func (c *Clients) Add(val *Client)

Add adds a new client to the clients map, keyed on client id.

func (*Clients) Delete

func (c *Clients) Delete(id string)

Delete removes a client from the internal map.

func (*Clients) Get

func (c *Clients) Get(id string) (*Client, bool)

Get returns the value of a client if it exists.

func (*Clients) GetByListener

func (c *Clients) GetByListener(id string) []*Client

GetByListener returns clients matching a listener id.

func (*Clients) Len

func (c *Clients) Len() int

Len returns the length of the clients map.

type CloseFunc

type CloseFunc func(id string)

CloseFunc is a callback function for closing all listener clients.

type EstablishFunc

type EstablishFunc func(id string, c net.Conn, auth Auth) error

EstablishFunc is a callback function for establishing new clients.

type Hook

type Hook interface {
	// When the client connects to the server
	// If the return is false, the client will be rejected.
	Connect(*Server, *Client) bool

	// When the client disconnects
	DisConnect(*Server, *Client, error)

	// When the server receives a packet.
	// If the return is false, it will cancel the operation.
	Recv(*Server, *Client, *packets.Packet) bool

	// When the server sends a packet.
	// If the return is false, it will cancel the operation.
	Send(*Server, *Client, *packets.Packet) bool

	// When the server receives a message from the client publish.
	// If the return is false, it will cancel the operation.
	Emit(*Server, *Client, *packets.Packet) bool

	// When the server pushes a message to the client
	// If the return is false, it will cancel the operation.
	Push(*Server, *Client, *packets.Packet) bool
}

Hook is the server hook interface.

type Inflight

type Inflight struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Inflight is a map of InflightMessage keyed on packet id.

func NewInflight

func NewInflight() *Inflight

func (*Inflight) Delete

func (i *Inflight) Delete(key uint16) bool

Delete removes an in-flight message from the map. Returns true if the message existed.

func (*Inflight) Get

func (i *Inflight) Get(key uint16) (InflightMessage, bool)

Get returns the value of an in-flight message if it exists.

func (*Inflight) GetAll

func (i *Inflight) GetAll() map[uint16]InflightMessage

GetAll returns all the in-flight messages.

func (*Inflight) Len

func (i *Inflight) Len() int

Len returns the size of the in-flight messages map.

func (*Inflight) Set

func (i *Inflight) Set(key uint16, in InflightMessage) bool

Set stores the packet of an Inflight message, keyed on message id. Returns true if the inflight message was new.

type InflightMessage

type InflightMessage struct {
	Packet  packets.Packet // the packet currently in-flight.
	Sent    int64          // the last time the message was sent (for retries) in unixtime.
	Resends int            // the number of times the message was attempted to be sent.
}

InflightMessage contains data about a packet which is currently in-flight.
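
The documented behaviour of Inflight (Set reports whether the message was new, Delete reports whether it existed) can be illustrated with a mutex-guarded map. This is a sketch under assumptions, not the library's code: the lowercase inflight and message types are hypothetical stand-ins, and message carries a plain payload in place of packets.Packet.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for InflightMessage.
type message struct {
	Payload []byte
	Sent    int64 // last send time as a Unix timestamp
	Resends int   // number of send attempts so far
}

// inflight is a concurrency-safe map of messages keyed on packet id.
type inflight struct {
	sync.RWMutex
	internal map[uint16]message
}

func newInflight() *inflight {
	return &inflight{internal: map[uint16]message{}}
}

// Set stores m under key and returns true if the key was not present before.
func (i *inflight) Set(key uint16, m message) bool {
	i.Lock()
	defer i.Unlock()
	_, existed := i.internal[key]
	i.internal[key] = m
	return !existed
}

// Get returns the message stored under key, if any.
func (i *inflight) Get(key uint16) (message, bool) {
	i.RLock()
	defer i.RUnlock()
	m, ok := i.internal[key]
	return m, ok
}

// Delete removes key and returns true if it was present.
func (i *inflight) Delete(key uint16) bool {
	i.Lock()
	defer i.Unlock()
	_, existed := i.internal[key]
	delete(i.internal, key)
	return existed
}

// Len returns the number of stored messages.
func (i *inflight) Len() int {
	i.RLock()
	defer i.RUnlock()
	return len(i.internal)
}

func main() {
	in := newInflight()
	fmt.Println(in.Set(7, message{Payload: []byte("a")}))             // new entry
	fmt.Println(in.Set(7, message{Payload: []byte("a"), Resends: 1})) // overwrite
	fmt.Println(in.Delete(7), in.Len())
}
```

The boolean results let a caller distinguish a first send from a resend, and an acknowledgement for a known packet id from a stray one.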

type LWT

type LWT struct {
	Topic   string // the topic the will message shall be sent to.
	Message []byte // the message that shall be sent when the client disconnects.
	Qos     byte   // the quality of service desired.
	Retain  bool   // indicates whether the will message should be retained
}

LWT contains the last will and testament details for a client connection.

type Listener

type Listener interface {
	Listen(s *system.Info) error // open the network address.
	Serve(EstablishFunc) error   // start actively listening for new connections.
	ID() string                  // return the id of the listener.
	Auth() Auth
	Close(CloseFunc) // stop and close the listener.
}

Listener is an interface for network listeners. A network listener listens for incoming client connections and adds them to the server.

type Listeners

type Listeners struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Listeners contains the network listeners for the broker.

func NewListeners

func NewListeners(s *system.Info) *Listeners

NewListeners returns a new instance of Listeners.

func (*Listeners) Add

func (l *Listeners) Add(val Listener)

Add adds a new listener to the listeners map, keyed on id.

func (*Listeners) Close

func (l *Listeners) Close(id string, closer CloseFunc)

Close stops a listener from the internal map.

func (*Listeners) CloseAll

func (l *Listeners) CloseAll(closer CloseFunc)

CloseAll iterates and closes all registered listeners.

func (*Listeners) Delete

func (l *Listeners) Delete(id string)

Delete removes a listener from the internal map.

func (*Listeners) Get

func (l *Listeners) Get(id string) (Listener, bool)

Get returns the value of a listener if it exists.

func (*Listeners) Len

func (l *Listeners) Len() int

Len returns the length of the listeners map.

func (*Listeners) Serve

func (l *Listeners) Serve(id string, establisher EstablishFunc)

Serve starts a listener serving from the internal map.

func (*Listeners) ServeAll

func (l *Listeners) ServeAll(establisher EstablishFunc)

ServeAll starts all listeners serving from the internal map.

type NilHook

type NilHook struct{}

func (*NilHook) Connect

func (*NilHook) Connect(*Server, *Client) bool

func (*NilHook) DisConnect

func (*NilHook) DisConnect(*Server, *Client, error)

func (*NilHook) Emit

func (*NilHook) Emit(*Server, *Client, *packets.Packet) bool

func (*NilHook) Push

func (*NilHook) Push(*Server, *Client, *packets.Packet) bool

func (*NilHook) Recv

func (*NilHook) Recv(*Server, *Client, *packets.Packet) bool

func (*NilHook) Send

func (*NilHook) Send(*Server, *Client, *packets.Packet) bool

type Server

type Server struct {
	Listeners *Listeners        // listeners are network interfaces which listen for new connections.
	Clients   *Clients          // clients which are known to the broker.
	Topics    *topics.Index     // an index of topic filter subscriptions and retained messages.
	System    *system.Info      // values about the server commonly found in $SYS topics.
	Store     persistence.Store // a persistent storage backend if desired.
	Hook      Hook
	// contains filtered or unexported fields
}

Server is an MQTT broker server. It should be created with mqtt.New() in order to ensure all the internal fields are correctly populated.

func New

func New() *Server

New returns a new instance of an MQTT broker.

func (*Server) AddListener

func (s *Server) AddListener(listener Listener) error

AddListener adds a new network listener to the server.

func (*Server) Close

func (s *Server) Close() error

Close attempts to gracefully shutdown the server, all listeners, clients, and stores.

func (*Server) EstablishConnection

func (s *Server) EstablishConnection(listenerID string, c net.Conn, auth Auth) error

EstablishConnection establishes a new client when a listener accepts a new connection.

func (*Server) Publish

func (s *Server) Publish(topic string, payload []byte, qos byte, retain bool)

func (*Server) PublishToClient

func (s *Server) PublishToClient(c *Client, out packets.Packet)

func (*Server) PublishToClientByID

func (s *Server) PublishToClientByID(id string, topic string, payload []byte, qos byte, retain bool) error

func (*Server) PublishToSubscribers

func (s *Server) PublishToSubscribers(pk packets.Packet)

PublishToSubscribers publishes a publish packet to all subscribers with matching topic filters.

func (*Server) ResendClientInflight

func (s *Server) ResendClientInflight(c *Client, force bool) error

ResendClientInflight attempts to resend all undelivered inflight messages to a client.

func (*Server) Serve

func (s *Server) Serve() error

Serve starts the event loops responsible for establishing client connections on all attached listeners, and publishing the system topics.

func (*Server) SetHook

func (s *Server) SetHook(h Hook)

func (*Server) SetStore

func (s *Server) SetStore(p persistence.Store) error

SetStore assigns a persistent storage backend to the server. This must be called before calling server.Serve().

type State

type State struct {
	Done int32 // atomic counter which indicates that the client has closed.
	// contains filtered or unexported fields
}

State tracks the state of the client.

Directories

Path            Synopsis
examples/tcp
examples/tls
