mqtt

package
v2.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2024 License: MIT Imports: 22 Imported by: 1

Documentation

Overview

package mqtt provides a high performance, fully compliant MQTT v5 broker server with v3.1.1 backward compatibility.

Index

Constants

View Source
const (
	InheritWayNew = iota
	InheritWayLocal
	InheritWayRemote
)

Client session inheritance way

View Source
const (
	SetOptions byte = iota
	OnSysInfoTick
	OnStarted
	OnStopped
	OnConnectAuthenticate
	OnACLCheck
	OnConnect
	OnSessionEstablish
	OnSessionEstablished
	OnDisconnect
	OnAuthPacket
	OnPacketRead
	OnPacketEncode
	OnPacketSent
	OnPacketProcessed
	OnSubscribe
	OnSubscribed
	OnSelectSubscribers
	OnUnsubscribe
	OnUnsubscribed
	OnPublish
	OnPublished
	OnPublishDropped
	OnRetainMessage
	OnRetainPublished
	OnQosPublish
	OnQosComplete
	OnQosDropped
	OnPacketIDExhausted
	OnWill
	OnWillSent
	OnClientExpired
	OnRetainedExpired
	OnPublishedWithSharedFilters
	StoredClients
	StoredSubscriptions
	StoredInflightMessages
	StoredRetainedMessages
	StoredSysInfo
	StoredClientByCid
	StoredSubscriptionsByCid
	StoredInflightMessagesByCid
	StoredRetainedMessageByTopic
)
View Source
const (
	Version = "2.4.0" // the current server version.

	LocalListener  = "local"
	InlineClientId = "inline"
)

Variables

View Source
var (
	// DefaultServerCapabilities defines the default features and capabilities provided by the server.
	DefaultServerCapabilities = &Capabilities{
		MaximumSessionExpiryInterval: math.MaxUint32,
		MaximumMessageExpiryInterval: 60 * 60 * 24,
		ReceiveMaximum:               1024,
		MaximumQos:                   2,
		RetainAvailable:              1,
		MaximumPacketSize:            0,
		TopicAliasMaximum:            math.MaxUint16,
		WildcardSubAvailable:         1,
		SubIDAvailable:               1,
		SharedSubAvailable:           1,
		MinimumProtocolVersion:       3,
		MaximumClientWritesPending:   1024 * 8,
	}

	ErrListenerIDExists       = errors.New("listener id already exists")                               // a listener with the same id already exists
	ErrConnectionClosed       = errors.New("connection not open")                                      // connection is closed
	ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default
)
View Source
var (
	SharePrefix = "$SHARE" // the prefix indicating a share topic
	SysPrefix   = "$SYS"   // the prefix indicating a system info topic
)
View Source
var (
	// ErrInvalidConfigType indicates a different Type of config value was expected to what was received.
	ErrInvalidConfigType = errors.New("invalid config type provided")
)

Functions

func AtomicItoa

func AtomicItoa(ptr *int64) string

AtomicItoa converts an int64 point to a string.

func IsSharedFilter

func IsSharedFilter(filter string) bool

IsSharedFilter returns true if the filter uses the share prefix.

func IsValidFilter

func IsValidFilter(filter string, forPublish bool) bool

IsValidFilter returns true if the filter is valid.

Types

type Capabilities

type Capabilities struct {
	MaximumMessageExpiryInterval int64  `yaml:"maximum-message-expiry-interval"`
	MaximumClientWritesPending   int32  `yaml:"maximum-client-writes-pending"`
	MaximumSessionExpiryInterval uint32 `yaml:"maximum-session-expiry-interval"`
	MaximumPacketSize            uint32 `yaml:"maximum-packet-size"`

	ReceiveMaximum         uint16 `yaml:"receive-maximum"`
	TopicAliasMaximum      uint16 `yaml:"topic-alias-maximum"`
	SharedSubAvailable     byte   `yaml:"shared-sub-available"`
	MinimumProtocolVersion byte   `yaml:"minimum-protocol-version"`
	Compatibilities        Compatibilities
	MaximumQos             byte `yaml:"maximum-qos"`
	RetainAvailable        byte `yaml:"retain-available"`
	WildcardSubAvailable   byte `yaml:"wildcard-sub-available"`
	SubIDAvailable         byte `yaml:"sub-id-available"`
	// contains filtered or unexported fields
}

Capabilities indicates the capabilities and features provided by the server.

type Client

type Client struct {
	Properties ClientProperties // client properties
	State      ClientState      // the operational state of the client.
	Net        ClientConnection // network connection state of the client
	ID         string           // the client id.

	sync.RWMutex                        // mutex
	InheritWay   int                    // session inheritance way
	Ext          map[string]interface{} // client extension.
	// contains filtered or unexported fields
}

Client contains information about a client known by the broker.

func (*Client) ClearInflights

func (cl *Client) ClearInflights(now, maximumExpiry int64) []uint16

ClearInflights deletes all inflight messages for the client, e.g. for a disconnected user with a clean session.

func (*Client) Closed

func (cl *Client) Closed() bool

Closed returns true if client connection is closed.

func (*Client) NextPacketID

func (cl *Client) NextPacketID() (i uint32, err error)

NextPacketID returns the next available (unused) packet id for the client. If no unused packet ids are available, an error is returned and the client should be disconnected.

func (*Client) ParseConnect

func (cl *Client) ParseConnect(lid string, pk packets.Packet)

ParseConnect parses the connect parameters and properties for a client.

func (*Client) Read

func (cl *Client) Read(packetHandler ReadFn) error

Read reads incoming packets from the connected client and transforms them into packets to be handled by the packetHandler.

func (*Client) ReadFixedHeader

func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error

ReadFixedHeader reads in the values of the next packet's fixed header.

func (*Client) ReadPacket

func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error)

ReadPacket reads the remaining buffer into an MQTT packet.

func (*Client) ResendInflightMessages

func (cl *Client) ResendInflightMessages(force bool) error

ResendInflightMessages attempts to resend any pending inflight messages to connected clients.

func (*Client) Stop

func (cl *Client) Stop(err error)

Stop instructs the client to shut down all processing goroutines and disconnect.

func (*Client) StopCause

func (cl *Client) StopCause() error

StopCause returns the reason the client connection was stopped, if any.

func (*Client) WriteLoop

func (cl *Client) WriteLoop()

WriteLoop ranges over pending outbound messages and writes them to the client connection.

func (*Client) WritePacket

func (cl *Client) WritePacket(pk packets.Packet) error

WritePacket encodes and writes a packet to the client.

type ClientConnection

type ClientConnection struct {
	Conn net.Conn // the net.Conn used to establish the connection

	Remote   string // the remote address of the client
	Listener string // listener id of the client
	Inline   bool   // if true, the client is the built-in 'inline' embedded client
	// contains filtered or unexported fields
}

ClientConnection contains the connection transport and metadata for the client.

type ClientProperties

type ClientProperties struct {
	Props           packets.Properties
	Will            Will
	Username        []byte
	ProtocolVersion byte
	Clean           bool
}

ClientProperties contains the properties which define the client behaviour.

type ClientState

type ClientState struct {
	TopicAliases TopicAliases // a map of topic aliases

	Inflight      *Inflight      // a map of in-flight qos messages
	Subscriptions *Subscriptions // a map of the subscription filters a client maintains

	Keepalive       uint16 // the number of seconds the connection can wait
	ServerKeepalive bool   // keepalive was set by the server
	// contains filtered or unexported fields
}

ClientState tracks the state of the client.

type ClientSubscriptions

type ClientSubscriptions map[string]packets.Subscription

ClientSubscriptions is a map of aggregated subscriptions for a client.

type Clients

type Clients struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Clients contains a map of the clients known by the broker.

func NewClients

func NewClients() *Clients

NewClients returns an instance of Clients.

func (*Clients) Add

func (cl *Clients) Add(val *Client)

Add adds a new client to the clients map, keyed on client id.

func (*Clients) Delete

func (cl *Clients) Delete(id string)

Delete removes a client from the internal map.

func (*Clients) Get

func (cl *Clients) Get(id string) (*Client, bool)

Get returns the value of a client if it exists.

func (*Clients) GetAll

func (cl *Clients) GetAll() map[string]*Client

GetAll returns all the clients.

func (*Clients) GetByListener

func (cl *Clients) GetByListener(id string) []*Client

GetByListener returns clients matching a listener id.

func (*Clients) Len

func (cl *Clients) Len() int

Len returns the length of the clients map.

type Compatibilities

type Compatibilities struct {
	ObscureNotAuthorized       bool `yaml:"obscure-not-authorized"`    // return unspecified errors instead of not authorized
	PassiveClientDisconnect    bool `yaml:"passive-client-disconnect"` // don't disconnect the client forcefully after sending disconnect packet (paho)
	AlwaysReturnResponseInfo   bool `yaml:"always-return-response"`    // always return response info (useful for testing)
	RestoreSysInfoOnRestart    bool `yaml:"restore-sys-info-restart"`  // restore system info from store as if server never stopped
	NoInheritedPropertiesOnAck bool // don't allow inherited user properties on ack (paho - spec violation)
}

Compatibilities provides flags for using compatibility modes.

type Hook

type Hook interface {
	ID() string
	Provides(b byte) bool
	Init(config any) error
	Stop() error
	SetOpts(l *slog.Logger, o *HookOptions)
	OnStarted()
	OnStopped()
	OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
	OnACLCheck(cl *Client, topic string, write bool) bool
	OnSysInfoTick(*system.Info)
	OnConnect(cl *Client, pk packets.Packet) error
	OnSessionEstablish(cl *Client, pk packets.Packet)
	OnSessionEstablished(cl *Client, pk packets.Packet)
	OnDisconnect(cl *Client, err error, expire bool)
	OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)
	OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error) // triggers when a new packet is received by a client, but before packet validation
	OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet        // modify a packet before it is byte-encoded and written to the client
	OnPacketSent(cl *Client, pk packets.Packet, b []byte)               // triggers when packet bytes have been written to the client
	OnPacketProcessed(cl *Client, pk packets.Packet, err error)         // triggers after a packet from the client been processed (handled)
	OnSubscribe(cl *Client, pk packets.Packet) packets.Packet
	OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int) // counts is an array of the number of subscribers for the same filter
	OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers
	OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet
	OnUnsubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)
	OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error)
	OnPublished(cl *Client, pk packets.Packet)
	OnPublishDropped(cl *Client, pk packets.Packet)
	OnRetainMessage(cl *Client, pk packets.Packet, r int64)
	OnRetainPublished(cl *Client, pk packets.Packet)
	OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
	OnQosComplete(cl *Client, pk packets.Packet)
	OnQosDropped(cl *Client, pk packets.Packet)
	OnPacketIDExhausted(cl *Client, pk packets.Packet)
	OnWill(cl *Client, will Will) (Will, error)
	OnWillSent(cl *Client, pk packets.Packet)
	OnClientExpired(cl *Client)
	OnRetainedExpired(filter string)
	OnPublishedWithSharedFilters(pk packets.Packet, sharedFilters map[string]bool)
	StoredClients() ([]storage.Client, error)
	StoredSubscriptions() ([]storage.Subscription, error)
	StoredInflightMessages() ([]storage.Message, error)
	StoredRetainedMessages() ([]storage.Message, error)
	StoredSysInfo() (storage.SystemInfo, error)
	StoredClientByCid(cid string) (storage.Client, error)
	StoredSubscriptionsByCid(cid string) ([]storage.Subscription, error)
	StoredInflightMessagesByCid(cid string) ([]storage.Message, error)
	StoredRetainedMessageByTopic(topic string) (storage.Message, error)
}

Hook provides an interface of handlers for different events which occur during the lifecycle of the broker.

type HookBase

type HookBase struct {
	Hook
	Log  *slog.Logger
	Opts *HookOptions
}

HookBase provides a set of default methods for each hook. It should be embedded in all hooks.

func (*HookBase) ID

func (h *HookBase) ID() string

ID returns the ID of the hook.

func (*HookBase) Init

func (h *HookBase) Init(config any) error

Init performs any pre-start initializations for the hook, such as connecting to databases or opening files.

func (*HookBase) OnACLCheck

func (h *HookBase) OnACLCheck(cl *Client, topic string, write bool) bool

OnACLCheck is called when a user attempts to subscribe or publish to a topic.

func (*HookBase) OnAuthPacket

func (h *HookBase) OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)

OnAuthPacket is called when an auth packet is received from the client.

func (*HookBase) OnClientExpired

func (h *HookBase) OnClientExpired(cl *Client)

OnClientExpired is called when a client session has expired.

func (*HookBase) OnConnect

func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) error

OnConnect is called when a new client connects.

func (*HookBase) OnConnectAuthenticate

func (h *HookBase) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool

OnConnectAuthenticate is called when a user attempts to authenticate with the server.

func (*HookBase) OnDisconnect

func (h *HookBase) OnDisconnect(cl *Client, err error, expire bool)

OnDisconnect is called when a client is disconnected for any reason.

func (*HookBase) OnPacketEncode

func (h *HookBase) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet

OnPacketEncode is called before a packet is byte-encoded and written to the client.

func (*HookBase) OnPacketIDExhausted

func (h *HookBase) OnPacketIDExhausted(cl *Client, pk packets.Packet)

OnPacketIDExhausted is called when the client runs out of unused packet ids to assign to a packet.

func (*HookBase) OnPacketProcessed

func (h *HookBase) OnPacketProcessed(cl *Client, pk packets.Packet, err error)

OnPacketProcessed is called immediately after a packet from a client is processed.

func (*HookBase) OnPacketRead

func (h *HookBase) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error)

OnPacketRead is called when a packet is received.

func (*HookBase) OnPacketSent

func (h *HookBase) OnPacketSent(cl *Client, pk packets.Packet, b []byte)

OnPacketSent is called immediately after a packet is written to a client.

func (*HookBase) OnPublish

func (h *HookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error)

OnPublish is called when a client publishes a message.

func (*HookBase) OnPublishDropped

func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet)

OnPublishDropped is called when a message to a client is dropped instead of being delivered.

func (*HookBase) OnPublished

func (h *HookBase) OnPublished(cl *Client, pk packets.Packet)

OnPublished is called when a client has published a message to subscribers.

func (*HookBase) OnPublishedWithSharedFilters added in v2.5.4

func (h *HookBase) OnPublishedWithSharedFilters(pk packets.Packet, sharedFilters map[string]bool)

OnPublishedWithSharedFilters is called when a client has published a message to cluster.

func (*HookBase) OnQosComplete

func (h *HookBase) OnQosComplete(cl *Client, pk packets.Packet)

OnQosComplete is called when the Qos flow for a message has been completed.

func (*HookBase) OnQosDropped

func (h *HookBase) OnQosDropped(cl *Client, pk packets.Packet)

OnQosDropped is called the Qos flow for a message expires.

func (*HookBase) OnQosPublish

func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)

OnQosPublish is called when a publish packet with Qos > 1 is issued to a subscriber.

func (*HookBase) OnRetainMessage

func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64)

OnRetainMessage is called then a published message is retained.

func (*HookBase) OnRetainPublished added in v2.3.7

func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet)

OnRetainPublished is called when a retained message is published.

func (*HookBase) OnRetainedExpired

func (h *HookBase) OnRetainedExpired(topic string)

OnRetainedExpired is called when a retained message for a topic has expired.

func (*HookBase) OnSelectSubscribers

func (h *HookBase) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers

OnSelectSubscribers is called when selecting subscribers to receive a message.

func (*HookBase) OnSessionEstablish added in v2.3.7

func (h *HookBase) OnSessionEstablish(cl *Client, pk packets.Packet)

OnSessionEstablish is called right after a new client connects and authenticates and right before the session is established and CONNACK is sent.

func (*HookBase) OnSessionEstablished

func (h *HookBase) OnSessionEstablished(cl *Client, pk packets.Packet)

OnSessionEstablished is called when a new client establishes a session (after OnConnect).

func (*HookBase) OnStarted

func (h *HookBase) OnStarted()

OnStarted is called when the server starts.

func (*HookBase) OnStopped

func (h *HookBase) OnStopped()

OnStopped is called when the server stops.

func (*HookBase) OnSubscribe

func (h *HookBase) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet

OnSubscribe is called when a client subscribes to one or more filters.

func (*HookBase) OnSubscribed

func (h *HookBase) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)

OnSubscribed is called when a client subscribes to one or more filters.

func (*HookBase) OnSysInfoTick

func (h *HookBase) OnSysInfoTick(*system.Info)

OnSysInfoTick is called when the server publishes system info.

func (*HookBase) OnUnsubscribe

func (h *HookBase) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet

OnUnsubscribe is called when a client unsubscribes from one or more filters.

func (*HookBase) OnUnsubscribed

func (h *HookBase) OnUnsubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)

OnUnsubscribed is called when a client unsubscribes from one or more filters.

func (*HookBase) OnWill

func (h *HookBase) OnWill(cl *Client, will Will) (Will, error)

OnWill is called when a client disconnects and publishes an LWT message.

func (*HookBase) OnWillSent

func (h *HookBase) OnWillSent(cl *Client, pk packets.Packet)

OnWillSent is called when an LWT message has been issued from a disconnecting client.

func (*HookBase) Provides

func (h *HookBase) Provides(b byte) bool

Provides indicates which methods a hook provides. The default is none - this method should be overridden by the embedding hook.

func (*HookBase) SetOpts

func (h *HookBase) SetOpts(l *slog.Logger, opts *HookOptions)

SetOpts is called by the server to propagate internal values and generally should not be called manually.

func (*HookBase) Stop

func (h *HookBase) Stop() error

Stop is called to gracefully shut down the hook.

func (*HookBase) StoredClientByCid

func (h *HookBase) StoredClientByCid(cid string) (v storage.Client, err error)

StoredClientByCid returns a client from a store.

func (*HookBase) StoredClients

func (h *HookBase) StoredClients() (v []storage.Client, err error)

StoredClients returns all clients from a store.

func (*HookBase) StoredInflightMessages

func (h *HookBase) StoredInflightMessages() (v []storage.Message, err error)

StoredInflightMessages returns all inflight messages from a store.

func (*HookBase) StoredInflightMessagesByCid

func (h *HookBase) StoredInflightMessagesByCid(cid string) (v []storage.Message, err error)

StoredInflightMessagesByCid returns all inflight messages of client from a store.

func (*HookBase) StoredRetainedMessageByTopic

func (h *HookBase) StoredRetainedMessageByTopic(topic string) (v storage.Message, err error)

StoredRetainedMessageByTopic returns a retained message of topic from a store.

func (*HookBase) StoredRetainedMessages

func (h *HookBase) StoredRetainedMessages() (v []storage.Message, err error)

StoredRetainedMessages returns all retained messages from a store.

func (*HookBase) StoredSubscriptions

func (h *HookBase) StoredSubscriptions() (v []storage.Subscription, err error)

StoredSubscriptions returns all subcriptions from a store.

func (*HookBase) StoredSubscriptionsByCid

func (h *HookBase) StoredSubscriptionsByCid(cid string) (v []storage.Subscription, err error)

StoredSubscriptionsByCid returns all subcriptions of client from a store.

func (*HookBase) StoredSysInfo

func (h *HookBase) StoredSysInfo() (v storage.SystemInfo, err error)

StoredSysInfo returns a set of system info values.

type HookOptions

type HookOptions struct {
	Capabilities *Capabilities
}

HookOptions contains values which are inherited from the server on initialisation.

type Hooks

type Hooks struct {
	Log *slog.Logger // a logger for the hook (from the server)

	sync.Mutex // a mutex for locking when adding hooks
	// contains filtered or unexported fields
}

Hooks is a slice of Hook interfaces to be called in sequence.

func (*Hooks) Add

func (h *Hooks) Add(hook Hook, config any) error

Add adds and initializes a new hook.

func (*Hooks) GetAll

func (h *Hooks) GetAll() []Hook

GetAll returns a slice of all the hooks.

func (*Hooks) Len

func (h *Hooks) Len() int64

Len returns the number of hooks added.

func (*Hooks) OnACLCheck

func (h *Hooks) OnACLCheck(cl *Client, topic string, write bool) bool

OnACLCheck is called when a user attempts to publish or subscribe to a topic filter. An implementation of this method MUST be used to allow or deny access to the (see hooks/auth/allow_all or basic). It can be used in custom hooks to check publishing and subscribing users against an existing permissions or roles database.

func (*Hooks) OnAuthPacket

func (h *Hooks) OnAuthPacket(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)

OnAuthPacket is called when an auth packet is received. It is intended to allow developers to create their own auth packet handling mechanisms.

func (*Hooks) OnClientExpired

func (h *Hooks) OnClientExpired(cl *Client)

OnClientExpired is called when a client session has expired and should be deleted.

func (*Hooks) OnConnect

func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) error

OnConnect is called when a new client connects, and may return a packets.Code as an error to halt the connection.

func (*Hooks) OnConnectAuthenticate

func (h *Hooks) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool

OnConnectAuthenticate is called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database.

func (*Hooks) OnDisconnect

func (h *Hooks) OnDisconnect(cl *Client, err error, expire bool)

OnDisconnect is called when a client is disconnected for any reason.

func (*Hooks) OnPacketEncode

func (h *Hooks) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet

OnPacketEncode is called immediately before a packet is encoded to be sent to a client.

func (*Hooks) OnPacketIDExhausted

func (h *Hooks) OnPacketIDExhausted(cl *Client, pk packets.Packet)

OnPacketIDExhausted is called when the client runs out of unused packet ids to assign to a packet.

func (*Hooks) OnPacketProcessed

func (h *Hooks) OnPacketProcessed(cl *Client, pk packets.Packet, err error)

OnPacketProcessed is called when a packet has been received and successfully handled by the broker.

func (*Hooks) OnPacketRead

func (h *Hooks) OnPacketRead(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)

OnPacketRead is called when a packet is received from a client.

func (*Hooks) OnPacketSent

func (h *Hooks) OnPacketSent(cl *Client, pk packets.Packet, b []byte)

OnPacketSent is called when a packet has been sent to a client. It takes a bytes parameter containing the bytes sent.

func (*Hooks) OnPublish

func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)

OnPublish is called when a client publishes a message. This method differs from OnPublished in that it allows you to modify you to modify the incoming packet before it is processed. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnPublishDropped

func (h *Hooks) OnPublishDropped(cl *Client, pk packets.Packet)

OnPublishDropped is called when a message to a client was dropped instead of delivered such as when a client is too slow to respond.

func (*Hooks) OnPublished

func (h *Hooks) OnPublished(cl *Client, pk packets.Packet)

OnPublished is called when a client has published a message to subscribers.

func (*Hooks) OnPublishedWithSharedFilters added in v2.5.1

func (h *Hooks) OnPublishedWithSharedFilters(pk packets.Packet, sharedFilters map[string]bool)

OnPublishedWithSharedFilters is called when a client has published a message to cluster.

func (*Hooks) OnQosComplete

func (h *Hooks) OnQosComplete(cl *Client, pk packets.Packet)

OnQosComplete is called when the Qos flow for a message has been completed. In other words, when an inflight message is resolved. It is typically used to delete an inflight message from a store.

func (*Hooks) OnQosDropped

func (h *Hooks) OnQosDropped(cl *Client, pk packets.Packet)

OnQosDropped is called the Qos flow for a message expires. In other words, when an inflight message expires or is abandoned. It is typically used to delete an inflight message from a store.

func (*Hooks) OnQosPublish

func (h *Hooks) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)

OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber. In other words, this method is called when a new inflight message is created or resent. It is typically used to store a new inflight message.

func (*Hooks) OnRetainMessage

func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64)

OnRetainMessage is called then a published message is retained.

func (*Hooks) OnRetainPublished added in v2.3.7

func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet)

OnRetainPublished is called when a retained message is published.

func (*Hooks) OnRetainedExpired

func (h *Hooks) OnRetainedExpired(filter string)

OnRetainedExpired is called when a retained message has expired and should be deleted.

func (*Hooks) OnSelectSubscribers

func (h *Hooks) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers

OnSelectSubscribers is called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. This hook can be used to programmatically remove or add clients to a publish to subscribers process, or to select the subscriber for a shared group in a custom manner (such as based on client id, ip, etc).

func (*Hooks) OnSessionEstablish added in v2.3.7

func (h *Hooks) OnSessionEstablish(cl *Client, pk packets.Packet)

OnSessionEstablish is called right after a new client connects and authenticates and right before the session is established and CONNACK is sent.

func (*Hooks) OnSessionEstablished

func (h *Hooks) OnSessionEstablished(cl *Client, pk packets.Packet)

OnSessionEstablished is called when a new client establishes a session (after OnConnect).

func (*Hooks) OnStarted

func (h *Hooks) OnStarted()

OnStarted is called when the server has successfully started.

func (*Hooks) OnStopped

func (h *Hooks) OnStopped()

OnStopped is called when the server has successfully stopped.

func (*Hooks) OnSubscribe

func (h *Hooks) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet

OnSubscribe is called when a client subscribes to one or more filters. This method differs from OnSubscribed in that it allows you to modify the subscription values before the packet is processed. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnSubscribed

func (h *Hooks) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)

OnSubscribed is called when a client subscribes to one or more filters.

func (*Hooks) OnSysInfoTick

func (h *Hooks) OnSysInfoTick(sys *system.Info)

OnSysInfoTick is called when the $SYS topic values are published out.

func (*Hooks) OnUnsubscribe

func (h *Hooks) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet

OnUnsubscribe is called when a client unsubscribes from one or more filters. This method differs from OnUnsubscribed in that it allows you to modify the unsubscription values before the packet is processed. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnUnsubscribed

func (h *Hooks) OnUnsubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)

OnUnsubscribed is called when a client unsubscribes from one or more filters.

func (*Hooks) OnWill

func (h *Hooks) OnWill(cl *Client, will Will) Will

OnWill is called when a client disconnects and publishes an LWT message. This method differs from OnWillSent in that it allows you to modify the LWT message before it is published. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnWillSent

func (h *Hooks) OnWillSent(cl *Client, pk packets.Packet)

OnWillSent is called when an LWT message has been issued from a disconnecting client.

func (*Hooks) Provides

func (h *Hooks) Provides(b ...byte) bool

Provides returns true if any one hook provides any of the requested hook methods.

func (*Hooks) Stop

func (h *Hooks) Stop()

Stop indicates all attached hooks to gracefully end.

func (*Hooks) StoredClientByCid

func (h *Hooks) StoredClientByCid(cid string) (v storage.Client, err error)

StoredClientByCid returns a clients, e.g. from a persistent store.

func (*Hooks) StoredClients

func (h *Hooks) StoredClients() (v []storage.Client, err error)

StoredClients returns all clients, e.g. from a persistent store, is used to populate the server clients list before start.

func (*Hooks) StoredInflightMessages

func (h *Hooks) StoredInflightMessages() (v []storage.Message, err error)

StoredInflightMessages returns all inflight messages, e.g. from a persistent store, and is used to populate the restored clients with inflight messages before start.

func (*Hooks) StoredInflightMessagesByCid

func (h *Hooks) StoredInflightMessagesByCid(cid string) (v []storage.Message, err error)

StoredInflightMessagesByCid returns all inflight messages, e.g. from a persistent store.

func (*Hooks) StoredRetainedMessageByTopic

func (h *Hooks) StoredRetainedMessageByTopic(topic string) (v storage.Message, err error)

StoredRetainedMessageByTopic returns a retained message, e.g. from a persistent store.

func (*Hooks) StoredRetainedMessages

func (h *Hooks) StoredRetainedMessages() (v []storage.Message, err error)

StoredRetainedMessages returns all retained messages, e.g. from a persistent store, and is used to populate the server topics with retained messages before start.

func (*Hooks) StoredSubscriptions

func (h *Hooks) StoredSubscriptions() (v []storage.Subscription, err error)

StoredSubscriptions returns all subcriptions, e.g. from a persistent store, and is used to populate the server subscriptions list before start.

func (*Hooks) StoredSubscriptionsByCid

func (h *Hooks) StoredSubscriptionsByCid(cid string) (v []storage.Subscription, err error)

StoredSubscriptionsByCid returns all subcriptions, e.g. from a persistent store.

func (*Hooks) StoredSysInfo

func (h *Hooks) StoredSysInfo() (v storage.SystemInfo, err error)

StoredSysInfo returns a set of system info values.

type InboundTopicAliases

type InboundTopicAliases struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InboundTopicAliases contains a map of topic aliases received from the client.

func NewInboundTopicAliases

func NewInboundTopicAliases(topicAliasMaximum uint16) *InboundTopicAliases

NewInboundTopicAliases returns a pointer to InboundTopicAliases.

func (*InboundTopicAliases) Set

func (a *InboundTopicAliases) Set(id uint16, topic string) string

Set sets a new alias for a specific topic.

type Inflight

type Inflight struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Inflight is a map of InflightMessage keyed on packet id.

func NewInflights

func NewInflights() *Inflight

NewInflights returns a new instance of an Inflight packets map.

func (*Inflight) Clone

func (i *Inflight) Clone() *Inflight

Clone returns a new instance of Inflight with the same message data. This is used when transferring inflights from a taken-over session.

func (*Inflight) DecreaseReceiveQuota

func (i *Inflight) DecreaseReceiveQuota()

TakeRecieveQuota reduces the receive quota by 1.

func (*Inflight) DecreaseSendQuota

func (i *Inflight) DecreaseSendQuota()

DecreaseSendQuota reduces the send quota by 1.

func (*Inflight) Delete

func (i *Inflight) Delete(id uint16) bool

Delete removes an in-flight message from the map. Returns true if the message existed.

func (*Inflight) Get

func (i *Inflight) Get(id uint16) (packets.Packet, bool)

Get returns an inflight packet by packet id.

func (*Inflight) GetAll

func (i *Inflight) GetAll(immediate bool) []packets.Packet

GetAll returns all the inflight messages.

func (*Inflight) IncreaseReceiveQuota

func (i *Inflight) IncreaseReceiveQuota()

TakeRecieveQuota increases the receive quota by 1.

func (*Inflight) IncreaseSendQuota

func (i *Inflight) IncreaseSendQuota()

IncreaseSendQuota increases the send quota by 1.

func (*Inflight) Len

func (i *Inflight) Len() int

Len returns the size of the inflight messages map.

func (*Inflight) NextImmediate

func (i *Inflight) NextImmediate() (packets.Packet, bool)

NextImmediate returns the next inflight packet which is indicated to be sent immediately. This typically occurs when the quota has been exhausted, and we need to wait until new quota is free to continue sending.

func (*Inflight) ResetReceiveQuota

func (i *Inflight) ResetReceiveQuota(n int32)

ResetReceiveQuota resets the receive quota to the maximum allowed value.

func (*Inflight) ResetSendQuota

func (i *Inflight) ResetSendQuota(n int32)

ResetSendQuota resets the send quota to the maximum allowed value.

func (*Inflight) Set

func (i *Inflight) Set(m packets.Packet) bool

Set adds or updates an inflight packet by packet id.

type InlineSubFn added in v2.5.0

type InlineSubFn func(cl *Client, sub packets.Subscription, pk packets.Packet)

InlineSubFn is the signature for a callback function which will be called when an inline client receives a message on a topic it is subscribed to. The sub argument contains information about the subscription that was matched for any filters.

type InlineSubscription added in v2.5.0

type InlineSubscription struct {
	packets.Subscription
	Handler InlineSubFn
}

type InlineSubscriptions added in v2.5.0

type InlineSubscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InlineSubscriptions represents a map of internal subscriptions keyed on client.

func NewInlineSubscriptions added in v2.5.0

func NewInlineSubscriptions() *InlineSubscriptions

NewInlineSubscriptions returns a new instance of InlineSubscriptions.

func (*InlineSubscriptions) Add added in v2.5.0

Add adds a new internal subscription for a client id.

func (*InlineSubscriptions) Delete added in v2.5.0

func (s *InlineSubscriptions) Delete(id int)

Delete removes an internal subscription by the client id.

func (*InlineSubscriptions) Get added in v2.5.0

func (s *InlineSubscriptions) Get(id int) (val InlineSubscription, ok bool)

Get returns an internal subscription for a client id.

func (*InlineSubscriptions) GetAll added in v2.5.0

func (s *InlineSubscriptions) GetAll() map[int]InlineSubscription

GetAll returns all internal subscriptions.

func (*InlineSubscriptions) Len added in v2.5.0

func (s *InlineSubscriptions) Len() int

Len returns the number of internal subscriptions.

type Options

type Options struct {
	// Capabilities defines the server features and behaviour. If you only wish to modify
	// several of these values, set them explicitly - e.g.
	// 	server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024
	Capabilities *Capabilities

	// ClientNetWriteBufferSize specifies the size of the client *bufio.Writer write buffer.
	ClientNetWriteBufferSize int `yaml:"client-write-buffer-size"`

	// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
	ClientNetReadBufferSize int `yaml:"client-read-buffer-size"`

	// Logger specifies a custom configured implementation of zerolog to override
	// the servers default logger configuration. If you wish to change the log level,
	// of the default logger, you can do so by setting
	// 	server := mqtt.New(nil)
	// level := new(slog.LevelVar)
	// server.Slog = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
	// 	Level: level,
	// }))
	// level.Set(slog.LevelDebug)
	Logger *slog.Logger

	// SysTopicResendInterval specifies the interval between $SYS topic updates in seconds.
	SysTopicResendInterval int64 `yaml:"sys-topic-resend-interval"`

	// Enable Inline client to allow direct subscribing and publishing from the parent codebase,
	// with negligible performance difference (disabled by default to prevent confusion in statistics).
	InlineClient bool `yaml:"inline-client"`
}

Options contains configurable options for the server.

type OutboundTopicAliases

type OutboundTopicAliases struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OutboundTopicAliases contains a map of topic aliases sent from the broker to the client.

func NewOutboundTopicAliases

func NewOutboundTopicAliases(topicAliasMaximum uint16) *OutboundTopicAliases

NewOutboundTopicAliases returns a pointer to OutboundTopicAliases.

func (*OutboundTopicAliases) Set

func (a *OutboundTopicAliases) Set(topic string) (uint16, bool)

Set sets a new topic alias for a topic and returns the alias value, and a boolean indicating if the alias already existed.

type ReadFn

type ReadFn func(*Client, packets.Packet) error

ReadFn is the function signature for the function used for reading and processing new packets.

type Server

type Server struct {
	Options   *Options             // configurable server options
	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections
	Clients   *Clients             // clients known to the broker
	Topics    *TopicsIndex         // an index of topic filter subscriptions and retained messages
	Info      *system.Info         // values about the server commonly known as $SYS topics

	Log *slog.Logger // minimal no-alloc logger
	// contains filtered or unexported fields
}

Server is an MQTT broker server. It should be created with server.New() in order to ensure all the internal fields are correctly populated.

func New

func New(opts *Options) *Server

New returns a new instance of comqtt broker. Optional parameters can be specified to override some default settings (see Options).

func (*Server) AddHook

func (s *Server) AddHook(hook Hook, config any) error

AddHook attaches a new Hook to the server. Ideally, this should be called before the server is started with s.Serve().

func (*Server) AddListener

func (s *Server) AddListener(l listeners.Listener) error

AddListener adds a new network listener to the server, for receiving incoming client connections.

func (*Server) Close

func (s *Server) Close() error

Close attempts to gracefully shut down the server, all listeners, clients, and stores.

func (*Server) DisconnectClient

func (s *Server) DisconnectClient(cl *Client, code packets.Code) error

DisconnectClient sends a Disconnect packet to a client and then closes the client connection.

func (*Server) EstablishConnection

func (s *Server) EstablishConnection(listener string, c net.Conn) error

EstablishConnection establishes a new client when a listener accepts a new connection.

func (*Server) InjectPacket

func (s *Server) InjectPacket(cl *Client, pk packets.Packet) error

InjectPacket injects a packet into the broker as if it were sent from the specified client. InlineClients using this method can publish packets to any topic (including $SYS) and bypass ACL checks.

func (*Server) NewClient

func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool) *Client

NewClient returns a new Client instance, populated with all the required values and references to be used with the server. If you are using this client to directly publish messages from the embedding application, set the inline flag to true to bypass ACL and topic validation checks.

func (*Server) Publish

func (s *Server) Publish(topic string, payload []byte, retain bool, qos byte) error

Publish publishes a publish packet into the broker as if it were sent from the specified client. This is a convenience function which wraps InjectPacket. As such, this method can publish packets to any topic (including $SYS) and bypass ACL checks. The qos byte is used for limiting the outbound qos (mqtt v5) rather than issuing to the broker (we assume qos 2 complete).

func (*Server) PublishToSubscribers

func (s *Server) PublishToSubscribers(pk packets.Packet, local bool)

PublishToSubscribers publishes a publish packet to all subscribers with matching topic filters. local: true indicates the current process call,false indicates external forwarding

func (*Server) SendConnack added in v2.3.7

func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error

SendConnack returns a Connack packet to a client.

func (*Server) Serve

func (s *Server) Serve() error

Serve starts the event loops responsible for establishing client connections on all attached listeners, publishing the system topics, and starting all hooks.

func (*Server) Subscribe added in v2.5.0

func (s *Server) Subscribe(filter string, subscriptionId int, handler InlineSubFn) error

Subscribe adds an inline subscription for the specified topic filter and subscription identifier with the provided handler function.

func (*Server) Unsubscribe added in v2.5.0

func (s *Server) Unsubscribe(filter string, subscriptionId int) error

Unsubscribe removes an inline subscription for the specified subscription and topic filter. It allows you to unsubscribe a specific subscription from the internal subscription associated with the given topic filter.

func (*Server) UnsubscribeClient

func (s *Server) UnsubscribeClient(cl *Client)

UnsubscribeClient unsubscribes a client from all of their subscriptions.

type SharedSubscriptions

type SharedSubscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SharedSubscriptions contains a map of subscriptions to a shared filter, keyed on share group then client id.

func NewSharedSubscriptions

func NewSharedSubscriptions() *SharedSubscriptions

NewSharedSubscriptions returns a new instance of Subscriptions.

func (*SharedSubscriptions) Add

func (s *SharedSubscriptions) Add(group, id string, val packets.Subscription)

Add creates a new shared subscription for a group and client id pair.

func (*SharedSubscriptions) Delete

func (s *SharedSubscriptions) Delete(group, id string)

Delete deletes a client id from a shared subscription group.

func (*SharedSubscriptions) Get

func (s *SharedSubscriptions) Get(group, id string) (val packets.Subscription, ok bool)

Get returns the subscription properties for a client id in a share group, if one exists.

func (*SharedSubscriptions) GetAll

GetAll returns all shared subscription groups and their subscriptions.

func (*SharedSubscriptions) GroupLen

func (s *SharedSubscriptions) GroupLen() int

GroupLen returns the number of groups subscribed to the filter.

func (*SharedSubscriptions) Len

func (s *SharedSubscriptions) Len() int

Len returns the total number of shared subscriptions to a filter across all groups.

func (*SharedSubscriptions) SubsInGroupLen added in v2.5.1

func (s *SharedSubscriptions) SubsInGroupLen(group string) int

SubsInGroupLen returns the number of subscriptions in a shared subscription group.

type Subscribers

type Subscribers struct {
	Shared              map[string]map[string]packets.Subscription
	SharedSelected      map[string]packets.Subscription
	Subscriptions       map[string]packets.Subscription
	InlineSubscriptions map[int]InlineSubscription
}

Subscribers contains the shared and non-shared subscribers matching a topic.

func (*Subscribers) MergeSharedSelected

func (s *Subscribers) MergeSharedSelected()

MergeSharedSelected merges the selected subscribers for a shared subscription group and the non-shared subscribers, to ensure that no subscriber gets multiple messages due to have both types of subscription matching the same filter.

func (*Subscribers) SelectShared

func (s *Subscribers) SelectShared()

SelectShared returns one subscriber for each shared subscription group.

type Subscriptions

type Subscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriptions is a map of subscriptions keyed on client.

func NewSubscriptions

func NewSubscriptions() *Subscriptions

NewSubscriptions returns a new instance of Subscriptions.

func (*Subscriptions) Add

func (s *Subscriptions) Add(id string, val packets.Subscription)

Add adds a new subscription for a client. ID can be a filter in the case this map is client state, or a client id if particle state.

func (*Subscriptions) Delete

func (s *Subscriptions) Delete(id string)

Delete removes a subscription by client or filter id.

func (*Subscriptions) Get

func (s *Subscriptions) Get(id string) (val packets.Subscription, ok bool)

Get returns a subscriptions for a specific client or filter id.

func (*Subscriptions) GetAll

func (s *Subscriptions) GetAll() map[string]packets.Subscription

GetAll returns all subscriptions.

func (*Subscriptions) Len

func (s *Subscriptions) Len() int

Len returns the number of subscriptions.

type TopicAliases

type TopicAliases struct {
	Inbound  *InboundTopicAliases
	Outbound *OutboundTopicAliases
}

TopicAliases contains inbound and outbound topic alias registrations.

func NewTopicAliases

func NewTopicAliases(topicAliasMaximum uint16) TopicAliases

NewTopicAliases returns an instance of TopicAliases.

type TopicsIndex

type TopicsIndex struct {
	Retained *packets.Packets
	// contains filtered or unexported fields
}

TopicsIndex is a prefix/trie tree containing topic subscribers and retained messages.

func NewTopicsIndex

func NewTopicsIndex() *TopicsIndex

NewTopicsIndex returns a pointer to a new instance of Index.

func (*TopicsIndex) InlineSubscribe added in v2.5.0

func (x *TopicsIndex) InlineSubscribe(subscription InlineSubscription) (bool, int)

InlineSubscribe adds a new internal subscription for a topic filter, returning true if the subscription was new.

func (*TopicsIndex) InlineUnsubscribe added in v2.5.0

func (x *TopicsIndex) InlineUnsubscribe(id int, filter string) (bool, int)

InlineUnsubscribe removes an internal subscription for a topic filter associated with a specific client, returning true if the subscription existed.

func (*TopicsIndex) Messages

func (x *TopicsIndex) Messages(filter string) []packets.Packet

Messages returns a slice of any retained messages which match a filter.

func (*TopicsIndex) RetainMessage

func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64

RetainMessage saves a message payload to the end of a topic address. Returns 1 if a retained message was added, and -1 if the retained message was removed. 0 is returned if sequential empty payloads are received.

func (*TopicsIndex) Subscribe

func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) (bool, int)

Subscribe adds a new subscription for a client to a topic filter, returning true if the subscription was new.

func (*TopicsIndex) Subscribers

func (x *TopicsIndex) Subscribers(topic string) *Subscribers

Subscribers returns a map of clients who are subscribed to matching filters, their subscription ids and highest qos.

func (*TopicsIndex) Unsubscribe

func (x *TopicsIndex) Unsubscribe(filter, client string) (bool, int)

Unsubscribe removes a subscription filter for a client, returning true if the subscription existed.

type Will

type Will struct {
	Payload           []byte                 // -
	User              []packets.UserProperty // -
	TopicName         string                 // -
	Flag              uint32                 // 0,1
	WillDelayInterval uint32                 // -
	Qos               byte                   // -
	Retain            bool                   // -
}

Will contains the last will and testament details for a client connection.

Directories

Path Synopsis
examples
tcp
tls
hooks
storage/bolt
Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL