broker

package module
v0.6.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2017 License: Apache-2.0 Imports: 10 Imported by: 0

README

gomqtt/broker

Build Status Coverage Status GoDoc Release Go Report Card

Package broker provides an extensible MQTT 3.1.1 broker implementation.

Installation

Get it using go's standard toolset:

$ go get github.com/gomqtt/broker

Usage

// A Backend provides effective queuing functionality to a Broker and its Clients.
type Backend interface {
	// Authenticate should authenticate the client using the user and password
	// values and return true if the client is eligible to continue or false
	// when the broker should terminate the connection.
	Authenticate(client *Client, user, password string) (bool, error)

	// Setup is called when a new client comes online and is successfully
	// authenticated. Setup should return the already stored session for the
	// supplied id or create and return a new one. If the supplied id has a zero
	// length, a new temporary session should returned that is not stored
	// further. The backend may also close any existing clients that use the
	// same client id.
	//
	// Note: In this call the Backend may also allocate other resources and
	// setup the client for further usage as the broker will acknowledge the
	// connection when the call returns.
	Setup(client *Client, id string) (Session, bool, error)

	// QueueOffline is called after the clients stored subscriptions have been
	// resubscribed. It should be used to trigger a background process that
	// forwards all missed messages.
	QueueOffline(client *Client) error

	// Subscribe should subscribe the passed client to the specified topic and
	// call Publish with any incoming messages.
	Subscribe(client *Client, topic string) error

	// Unsubscribe should unsubscribe the passed client from the specified topic.
	Unsubscribe(client *Client, topic string) error

	// StoreRetained should store the specified message.
	StoreRetained(client *Client, msg *packet.Message) error

	// ClearRetained should remove the stored messages for the given topic.
	ClearRetained(client *Client, topic string) error

	// QueueRetained is called after acknowledging a subscription and should be
	// used to trigger a background process that forwards all retained messages.
	QueueRetained(client *Client, topic string) error

	// Publish should forward the passed message to all other clients that hold
	// a subscription that matches the messages topic. It should also add the
	// message to all sessions that have a matching offline subscription.
	Publish(client *Client, msg *packet.Message) error

	// Terminate is called when the client goes offline. Terminate should
	// unsubscribe the passed client from all previously subscribed topics. The
	// backend may also convert a clients subscriptions to offline subscriptions.
	//
	// Note: The Backend may also cleanup previously allocated resources for
	// that client as the broker will close the connection when the call
	// returns.
	Terminate(client *Client) error
}

// A Session is used to persist incoming/outgoing packets, subscriptions and the
// will.
type Session interface {
	// PacketID should return the next id for outgoing packets.
	PacketID() uint16

	// SavePacket should store a packet in the session. An eventual existing
	// packet with the same id should be quietly overwritten.
	SavePacket(direction string, pkt packet.Packet) error

	// LookupPacket should retrieve a packet from the session using the packet id.
	LookupPacket(direction string, id uint16) (packet.Packet, error)

	// DeletePacket should remove a packet from the session. The method should
	// not return an error if no packet with the specified id does exists.
	DeletePacket(direction string, id uint16) error

	// AllPackets should return all packets currently saved in the session. This
	// method is used to resend stored packets when the session is resumed.
	AllPackets(direction string) ([]packet.Packet, error)

	// SaveSubscription should store the subscription in the session. An eventual
	// subscription with the same topic should be quietly overwritten.
	SaveSubscription(sub *packet.Subscription) error

	// LookupSubscription should match a topic against the stored subscriptions
	// and eventually return the first found subscription.
	LookupSubscription(topic string) (*packet.Subscription, error)

	// DeleteSubscription should remove the subscription from the session. The
	// method should not return an error if no subscription with the specified
	// topic does exist.
	DeleteSubscription(topic string) error

	// AllSubscriptions should return all subscriptions currently saved in the
	// session. This method is used to restore a clients subscriptions when the
	// session is resumed.
	AllSubscriptions() ([]*packet.Subscription, error)

	// SaveWill should store the will message.
	SaveWill(msg *packet.Message) error

	// LookupWill should retrieve the will message.
	LookupWill() (*packet.Message, error)

	// ClearWill should remove the will message from the store.
	ClearWill() error

	// Reset should completely reset the session.
	Reset() error
}

Documentation

Overview

Example
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
	panic(err)
}

engine := NewEngine()
engine.Accept(server)

c := client.New()
wait := make(chan struct{})

c.Callback = func(msg *packet.Message, err error) {
	if err != nil {
		panic(err)
	}

	fmt.Println(msg.String())
	close(wait)
}

cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
	panic(err)
}

cf.Wait()

sf, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

sf.Wait()

pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

pf.Wait()

<-wait

err = c.Disconnect()
if err != nil {
	panic(err)
}

err = server.Close()
if err != nil {
	panic(err)
}

engine.Close()
Output:

<Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrExpectedConnect = errors.New("expected a ConnectPacket as the first packet")

ErrExpectedConnect is returned when the first received packet is not a ConnectPacket.

Functions

func Run

func Run(t *testing.T, engine *Engine, protocol string) (string, chan struct{}, chan struct{})

Run runs the passed broker on a random available port and returns a channel that can be closed to shutdown the broker. This method is intended to be used in testing scenarios.

Types

type Backend

type Backend interface {
	// Authenticate should authenticate the client using the user and password
	// values and return true if the client is eligible to continue or false
	// when the broker should terminate the connection.
	Authenticate(client *Client, user, password string) (bool, error)

	// Setup is called when a new client comes online and is successfully
	// authenticated. Setup should return the already stored session for the
	// supplied id or create and return a new one. If the supplied id has a zero
	// length, a new temporary session should returned that is not stored
	// further. The backend may also close any existing clients that use the
	// same client id.
	//
	// Note: In this call the Backend may also allocate other resources and
	// setup the client for further usage as the broker will acknowledge the
	// connection when the call returns.
	Setup(client *Client, id string) (Session, bool, error)

	// QueueOffline is called after the clients stored subscriptions have been
	// resubscribed. It should be used to trigger a background process that
	// forwards all missed messages.
	QueueOffline(client *Client) error

	// Subscribe should subscribe the passed client to the specified topic and
	// call Publish with any incoming messages.
	Subscribe(client *Client, topic string) error

	// Unsubscribe should unsubscribe the passed client from the specified topic.
	Unsubscribe(client *Client, topic string) error

	// StoreRetained should store the specified message.
	StoreRetained(client *Client, msg *packet.Message) error

	// ClearRetained should remove the stored messages for the given topic.
	ClearRetained(client *Client, topic string) error

	// QueueRetained is called after acknowledging a subscription and should be
	// used to trigger a background process that forwards all retained messages.
	QueueRetained(client *Client, topic string) error

	// Publish should forward the passed message to all other clients that hold
	// a subscription that matches the messages topic. It should also add the
	// message to all sessions that have a matching offline subscription.
	Publish(client *Client, msg *packet.Message) error

	// Terminate is called when the client goes offline. Terminate should
	// unsubscribe the passed client from all previously subscribed topics. The
	// backend may also convert a clients subscriptions to offline subscriptions.
	//
	// Note: The Backend may also cleanup previously allocated resources for
	// that client as the broker will close the connection when the call
	// returns.
	Terminate(client *Client) error
}

A Backend provides effective queuing functionality to a Broker and its Clients.

type Client

type Client struct {
	Context *Context
	// contains filtered or unexported fields
}

A Client represents a remote client that is connected to the broker.

func (*Client) CleanSession

func (c *Client) CleanSession() bool

CleanSession returns whether the client requested a clean session during connect.

func (*Client) ClientID

func (c *Client) ClientID() string

ClientID returns the supplied client id during connect.

func (*Client) Close

func (c *Client) Close(clean bool)

Close will immediately close the connection. When clean=true the client will be marked as cleanly disconnected, and the will message will not get dispatched.

func (*Client) Publish

func (c *Client) Publish(msg *packet.Message) bool

Publish will send a Message to the client and initiate QOS flows.

func (*Client) RemoteAddr added in v0.6.4

func (c *Client) RemoteAddr() net.Addr

RemoteAddr returns the client's remote net address from the underlying connection.

func (*Client) Session

func (c *Client) Session() Session

Session returns the current Session used by the client.

type Context

type Context struct {
	// contains filtered or unexported fields
}

A Context is a store for custom data.

func NewContext

func NewContext() *Context

NewContext returns a new Context.

func (*Context) Get

func (c *Context) Get(key string) interface{}

Get returns the stored valued for the passed key.

func (*Context) Set

func (c *Context) Set(key string, value interface{})

Set sets the passed value for the key in the context.

type Engine

type Engine struct {
	Backend Backend
	Logger  Logger

	ConnectTimeout   time.Duration
	DefaultReadLimit int64
	// contains filtered or unexported fields
}

The Engine handles incoming connections and connects them to the backend.

func NewEngine

func NewEngine() *Engine

NewEngine returns a new Engine with a basic MemoryBackend.

func NewEngineWithBackend

func NewEngineWithBackend(backend Backend) *Engine

NewEngineWithBackend returns a new Engine with a custom Backend.

func (*Engine) Accept

func (e *Engine) Accept(server transport.Server)

Accept begins accepting connections from the passed server.

func (*Engine) Clients

func (e *Engine) Clients() []*Client

Clients returns a current list of connected clients.

func (*Engine) Close

func (e *Engine) Close()

Close will stop handling incoming connections and close all current clients. The call will block until all clients are properly closed.

Note: All passed servers to Accept must be closed before calling this method.

func (*Engine) Handle

func (e *Engine) Handle(conn transport.Conn) bool

Handle takes over responsibility and handles a transport.Conn. It returns false if the engine is closing and the connection has been closed.

func (*Engine) Wait

func (e *Engine) Wait(timeout time.Duration) bool

Wait can be called after close to wait until all clients have been closed. The method returns whether all clients have been closed (true) or the timeout has been reached (false).

type LogEvent

type LogEvent int

LogEvent are received by a Logger.

const (
	// NewConnection is emitted when a client comes online.
	NewConnection LogEvent = iota

	// PacketReceived is emitted when a packet has been received.
	PacketReceived

	// MessagePublished is emitted after a message has been published.
	MessagePublished

	// MessageForwarded is emitted after a message has been forwarded.
	MessageForwarded

	// PacketSent is emitted when a packet has been sent.
	PacketSent

	// LostConnection is emitted when the connection has been terminated.
	LostConnection

	// TransportError is emitted when an underlying transport error occurs.
	TransportError

	// SessionError is emitted when a call to the session fails.
	SessionError

	// BackendError is emitted when a call to the backend fails.
	BackendError

	// ClientError is emitted when the client violates the protocol.
	ClientError
)

type Logger

type Logger func(LogEvent, *Client, packet.Packet, *packet.Message, error)

The Logger callback handles incoming log messages.

type MemoryBackend

type MemoryBackend struct {
	Logins map[string]string
	// contains filtered or unexported fields
}

A MemoryBackend stores everything in memory.

func NewMemoryBackend

func NewMemoryBackend() *MemoryBackend

NewMemoryBackend returns a new MemoryBackend.

func (*MemoryBackend) Authenticate

func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)

Authenticate authenticates a clients credentials by matching them to the saved Logins map.

func (*MemoryBackend) ClearRetained

func (m *MemoryBackend) ClearRetained(client *Client, topic string) error

ClearRetained will remove the stored messages for the given topic.

func (*MemoryBackend) Publish

func (m *MemoryBackend) Publish(client *Client, msg *packet.Message) error

Publish will forward the passed message to all other subscribed clients. It will also add the message to all sessions that have a matching offline subscription.

func (*MemoryBackend) QueueOffline

func (m *MemoryBackend) QueueOffline(client *Client) error

QueueOffline will begin with forwarding all missed messages in a separate goroutine.

func (*MemoryBackend) QueueRetained

func (m *MemoryBackend) QueueRetained(client *Client, topic string) error

QueueRetained will queue all retained messages matching the given topic.

func (*MemoryBackend) Setup

func (m *MemoryBackend) Setup(client *Client, id string) (Session, bool, error)

Setup returns the already stored session for the supplied id or creates and returns a new one. If the supplied id has a zero length, a new session is returned that is not stored further. Furthermore, it will disconnect any client connected with the same client id.

func (*MemoryBackend) StoreRetained

func (m *MemoryBackend) StoreRetained(client *Client, msg *packet.Message) error

StoreRetained will store the specified message.

func (*MemoryBackend) Subscribe

func (m *MemoryBackend) Subscribe(client *Client, topic string) error

Subscribe will subscribe the passed client to the specified topic and begin to forward messages by calling the clients Publish method.

func (*MemoryBackend) Terminate

func (m *MemoryBackend) Terminate(client *Client) error

Terminate will unsubscribe the passed client from all previously subscribed topics. If the client connect with clean=true it will also clean the session. Otherwise it will create offline subscriptions for all QOS 1 and QOS 2 subscriptions.

func (*MemoryBackend) Unsubscribe

func (m *MemoryBackend) Unsubscribe(client *Client, topic string) error

Unsubscribe will unsubscribe the passed client from the specified topic.

type MemorySession

type MemorySession struct {
	// contains filtered or unexported fields
}

A MemorySession stores packets, subscriptions and the will in memory.

func NewMemorySession

func NewMemorySession() *MemorySession

NewMemorySession returns a new MemorySession.

func (*MemorySession) AllPackets

func (s *MemorySession) AllPackets(direction string) ([]packet.Packet, error)

AllPackets will return all packets currently saved in the session.

func (*MemorySession) AllSubscriptions

func (s *MemorySession) AllSubscriptions() ([]*packet.Subscription, error)

AllSubscriptions will return all subscriptions currently saved in the session.

func (*MemorySession) ClearWill

func (s *MemorySession) ClearWill() error

ClearWill will remove the will message from the store.

func (*MemorySession) DeletePacket

func (s *MemorySession) DeletePacket(direction string, id uint16) error

DeletePacket will remove a packet from the session. The method will not return an error if no packet with the specified id exists.

func (*MemorySession) DeleteSubscription

func (s *MemorySession) DeleteSubscription(topic string) error

DeleteSubscription will remove the subscription from the session. The method will not return an error if no subscription with the specified topic does exist.

func (*MemorySession) LookupPacket

func (s *MemorySession) LookupPacket(direction string, id uint16) (packet.Packet, error)

LookupPacket will retrieve a packet from the session using a packet id.

func (*MemorySession) LookupSubscription

func (s *MemorySession) LookupSubscription(topic string) (*packet.Subscription, error)

LookupSubscription will match a topic against the stored subscriptions and eventually return the first found subscription.

func (*MemorySession) LookupWill

func (s *MemorySession) LookupWill() (*packet.Message, error)

LookupWill will retrieve the will message.

func (*MemorySession) PacketID

func (s *MemorySession) PacketID() uint16

PacketID will return the next id for outgoing packets.

func (*MemorySession) Reset

func (s *MemorySession) Reset() error

Reset will completely reset the session.

func (*MemorySession) SavePacket

func (s *MemorySession) SavePacket(direction string, pkt packet.Packet) error

SavePacket will store a packet in the session. An eventual existing packet with the same id gets quietly overwritten.

func (*MemorySession) SaveSubscription

func (s *MemorySession) SaveSubscription(sub *packet.Subscription) error

SaveSubscription will store the subscription in the session. An eventual subscription with the same topic gets quietly overwritten.

func (*MemorySession) SaveWill

func (s *MemorySession) SaveWill(newWill *packet.Message) error

SaveWill will store the will message.

type Session

type Session interface {
	// PacketID should return the next id for outgoing packets.
	PacketID() uint16

	// SavePacket should store a packet in the session. An eventual existing
	// packet with the same id should be quietly overwritten.
	SavePacket(direction string, pkt packet.Packet) error

	// LookupPacket should retrieve a packet from the session using the packet id.
	LookupPacket(direction string, id uint16) (packet.Packet, error)

	// DeletePacket should remove a packet from the session. The method should
	// not return an error if no packet with the specified id does exists.
	DeletePacket(direction string, id uint16) error

	// AllPackets should return all packets currently saved in the session. This
	// method is used to resend stored packets when the session is resumed.
	AllPackets(direction string) ([]packet.Packet, error)

	// SaveSubscription should store the subscription in the session. An eventual
	// subscription with the same topic should be quietly overwritten.
	SaveSubscription(sub *packet.Subscription) error

	// LookupSubscription should match a topic against the stored subscriptions
	// and eventually return the first found subscription.
	LookupSubscription(topic string) (*packet.Subscription, error)

	// DeleteSubscription should remove the subscription from the session. The
	// method should not return an error if no subscription with the specified
	// topic does exist.
	DeleteSubscription(topic string) error

	// AllSubscriptions should return all subscriptions currently saved in the
	// session. This method is used to restore a clients subscriptions when the
	// session is resumed.
	AllSubscriptions() ([]*packet.Subscription, error)

	// SaveWill should store the will message.
	SaveWill(msg *packet.Message) error

	// LookupWill should retrieve the will message.
	LookupWill() (*packet.Message, error)

	// ClearWill should remove the will message from the store.
	ClearWill() error

	// Reset should completely reset the session.
	Reset() error
}

A Session is used to persist incoming/outgoing packets, subscriptions and the will.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL