magina

Version: v0.0.0-...-81f4a6c
Published: Aug 27, 2018 License: MIT Imports: 10 Imported by: 0

README

magina

An MQTT broker library for Go, using RabbitMQ as its backend.

WARNING

This project is under development and not production ready. Issues and contributions are welcome.

Features

  • runs standalone or embedded in your own application as a Go library.
  • built on RabbitMQ's stable, reliable and efficient message service.
  • supports QoS 0 and 1.
  • easy to scale out because of the shared-nothing implementation.
  • extends the topic schema to support RPC (not part of the official MQTT specification).

Features planned for the near future:

  • support QoS 2.
  • support a websocket proxy.
  • support TLS/SSL.

How to use

(Package management depends on glide; install it first, then run glide install in the project folder.)

standalone

The standalone broker is in ./standalone; you can build and run it as you like.

options:
As a Go library
example:
package main

import (
	"github.com/ruizeng/magina"
)

func authenticate(client *magina.Client, username string, password string) bool {
	return true
}
func authorizePublish(client *magina.Client, topic string) bool {
	return true
}
func authorizeSubscribe(client *magina.Client, topic string) bool {
	return true
}

func main() {
	server := &magina.Broker{
		// server address to listen
		Addr: ":1883",
		// rabbit uri
		RabbitURI: "amqp://guest:guest@localhost:5672/",
		// callbacks
		Authenticate:       authenticate,
		AuthorizePublish:   authorizePublish,
		AuthorizeSubscribe: authorizeSubscribe,
	}

	// serve MQTT connections
	server.ListenAndServe()
}


Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AuthenticateFunc

type AuthenticateFunc func(client *Client, username string, password string) bool

AuthenticateFunc is the callback type for authenticating a client by username and password.
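As an illustration of what an authentication callback might delegate to, here is a minimal sketch of a credential check. The credentials table and the checkCredentials helper are hypothetical and not part of magina; the unsalted SHA-256 digest only keeps the example short, and a real deployment would likely use a dedicated password-hashing scheme.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// credentials is a hypothetical in-memory user table mapping a username
// to the SHA-256 digest of its password (assumption: not part of magina).
var credentials = map[string][32]byte{
	"alice": sha256.Sum256([]byte("s3cret")),
}

// checkCredentials reports whether the username exists and the password
// digest matches. The digests are compared in constant time.
func checkCredentials(username, password string) bool {
	want, known := credentials[username]
	got := sha256.Sum256([]byte(password))
	// Compare even for unknown users so both paths do similar work.
	match := subtle.ConstantTimeCompare(got[:], want[:]) == 1
	return known && match
}

func main() {
	fmt.Println(checkCredentials("alice", "s3cret"))
	fmt.Println(checkCredentials("alice", "wrong"))
	fmt.Println(checkCredentials("mallory", "s3cret"))
}
```

A function with the AuthenticateFunc signature could simply return checkCredentials(username, password).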

type AuthorizePublishFunc

type AuthorizePublishFunc func(client *Client, topic string) bool

AuthorizePublishFunc is the callback type for authorizing a client to publish to a topic.

type AuthorizeSubscribeFunc

type AuthorizeSubscribeFunc func(client *Client, topic string) bool

AuthorizeSubscribeFunc is the callback type for authorizing a client to subscribe to a topic.
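Authorization callbacks receive a topic string, so one plausible approach is to check it against per-client MQTT topic filters. The sketch below implements the standard "+" and "#" wildcard rules; topicMatches, canPublish and the ACL map are hypothetical names, not part of magina, and special handling of topics beginning with "$" is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether topic matches an MQTT topic filter.
// "+" matches exactly one level; "#" matches the parent level and any
// number of child levels, and is only valid as the last filter level.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// canPublish is a hypothetical helper that an authorization callback
// could call with the client's identifier and the requested topic.
func canPublish(acl map[string][]string, clientID, topic string) bool {
	for _, filter := range acl[clientID] {
		if topicMatches(filter, topic) {
			return true
		}
	}
	return false
}

func main() {
	acl := map[string][]string{
		"sensor-1": {"devices/sensor-1/#", "status/+"},
	}
	fmt.Println(canPublish(acl, "sensor-1", "devices/sensor-1/temp"))
	fmt.Println(canPublish(acl, "sensor-1", "devices/sensor-2/temp"))
}
```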

type Broker

type Broker struct {
	// server address to listen
	Addr string
	// rabbit uri
	RabbitURI string
	// extend the broker to support RPC. (WARNING: NOT a standard MQTT feature)
	SuportRPC bool
	// rabbitmq connection
	RabbitConnection *amqp.Connection
	// if use mqtts, set this
	TLSConfig *tls.Config
	// callbacks
	Authenticate       AuthenticateFunc
	AuthorizePublish   AuthorizePublishFunc
	AuthorizeSubscribe AuthorizeSubscribeFunc
	OnClientOnline     OnClientOnlineCB
	OnClientOffline    OnClientOfflineCB
	OnClientHeartbeat  OnClientHeartbeatCB
}

Broker is the main MQTT service.

func (*Broker) InitRabbitConn

func (b *Broker) InitRabbitConn()

InitRabbitConn initializes the RabbitMQ connection.

func (*Broker) ListenAndServe

func (b *Broker) ListenAndServe()

ListenAndServe serves MQTT connections.

type Client

type Client struct {
	Conn              net.Conn
	Broker            *Broker
	Identifier        string
	LastHeartbeat     time.Time
	KeepAliveInterval int

	Channel         *amqp.Channel
	Exchangers      map[string]Exchanger
	SubscribeTopics map[string]string
	// contains filtered or unexported fields
}

Client serves a remote MQTT client connection.

func (*Client) Serve

func (c *Client) Serve()

Serve serves an MQTT client.

type ConnectToken

type ConnectToken struct {
	// contains filtered or unexported fields
}

ConnectToken is an extension of Token containing the extra fields required to provide information about calls to Connect()

func (*ConnectToken) Error

func (b *ConnectToken) Error() error

func (*ConnectToken) ReturnCode

func (c *ConnectToken) ReturnCode() byte

ReturnCode returns the acknowledgement code in the CONNACK sent in response to a Connect().
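For reference, the return codes carried in a CONNACK are defined by MQTT 3.1.1. The following sketch maps them to readable text; connackText is a hypothetical helper, not a magina function.

```go
package main

import "fmt"

// connackText describes an MQTT 3.1.1 CONNACK return code.
// Values above 0x05 are reserved by the specification.
func connackText(code byte) string {
	switch code {
	case 0x00:
		return "connection accepted"
	case 0x01:
		return "connection refused: unacceptable protocol version"
	case 0x02:
		return "connection refused: identifier rejected"
	case 0x03:
		return "connection refused: server unavailable"
	case 0x04:
		return "connection refused: bad user name or password"
	case 0x05:
		return "connection refused: not authorized"
	default:
		return "reserved"
	}
}

func main() {
	for code := byte(0); code <= 6; code++ {
		fmt.Printf("0x%02x: %s\n", code, connackText(code))
	}
}
```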

func (*ConnectToken) Wait

func (b *ConnectToken) Wait() bool

Wait waits indefinitely for the Token to complete, i.e. for the Publish to be sent and its receipt confirmed by the broker.

func (*ConnectToken) WaitTimeout

func (b *ConnectToken) WaitTimeout(d time.Duration) bool

WaitTimeout waits up to d for the flow associated with the Token to complete. It returns true if the flow completed before the timeout and false if the timeout occurred. On timeout the Token has no error set, so the caller may wait again.

type DisconnectToken

type DisconnectToken struct {
	// contains filtered or unexported fields
}

DisconnectToken is an extension of Token containing the extra fields required to provide information about calls to Disconnect()

func (*DisconnectToken) Error

func (b *DisconnectToken) Error() error

func (*DisconnectToken) Wait

func (b *DisconnectToken) Wait() bool

Wait waits indefinitely for the Token to complete, i.e. for the Publish to be sent and its receipt confirmed by the broker.

func (*DisconnectToken) WaitTimeout

func (b *DisconnectToken) WaitTimeout(d time.Duration) bool

WaitTimeout waits up to d for the flow associated with the Token to complete. It returns true if the flow completed before the timeout and false if the timeout occurred. On timeout the Token has no error set, so the caller may wait again.

type ExchangeMessage

type ExchangeMessage struct {
	Topic   string
	Payload []byte
}

ExchangeMessage is a message passed through an exchange.

type Exchanger

type Exchanger interface {
	// init the exchanger
	Init() error
	// publish a message.
	Publish(ExchangeMessage) error
	// subscribe to a topic and return a channel to receive messages.
	Subscribe(topic string) (chan ExchangeMessage, error)
	// unsubscribe from a topic
	Unsubscribe(topic string) error
}

Exchanger is an interface for handling pub/sub messages
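To make the contract of this interface concrete, here is a minimal in-memory sketch that satisfies it without RabbitMQ, which may be handy in unit tests. The ExchangeMessage and Exchanger declarations are copied from this page so the example compiles on its own; memoryExchanger, its exact-match topic routing and its buffer size of 16 are assumptions, not how magina's own exchangers work.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the types documented on this page.
type ExchangeMessage struct {
	Topic   string
	Payload []byte
}

type Exchanger interface {
	Init() error
	Publish(ExchangeMessage) error
	Subscribe(topic string) (chan ExchangeMessage, error)
	Unsubscribe(topic string) error
}

// memoryExchanger is a hypothetical Exchanger that routes messages by
// exact topic name to one buffered channel per topic.
type memoryExchanger struct {
	mu   sync.Mutex
	subs map[string]chan ExchangeMessage
}

// Compile-time check that the sketch implements the interface.
var _ Exchanger = (*memoryExchanger)(nil)

// Init must be called before any other method.
func (m *memoryExchanger) Init() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subs = make(map[string]chan ExchangeMessage)
	return nil
}

// Publish delivers msg to the topic's subscriber, dropping it if there
// is no subscriber or the subscriber's buffer is full.
func (m *memoryExchanger) Publish(msg ExchangeMessage) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.subs[msg.Topic]; ok {
		select {
		case ch <- msg:
		default:
		}
	}
	return nil
}

func (m *memoryExchanger) Subscribe(topic string) (chan ExchangeMessage, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.subs[topic]; ok {
		return nil, fmt.Errorf("already subscribed to %q", topic)
	}
	ch := make(chan ExchangeMessage, 16)
	m.subs[topic] = ch
	return ch, nil
}

// Unsubscribe removes the subscription and closes its channel.
func (m *memoryExchanger) Unsubscribe(topic string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch, ok := m.subs[topic]
	if !ok {
		return fmt.Errorf("not subscribed to %q", topic)
	}
	delete(m.subs, topic)
	close(ch)
	return nil
}

func main() {
	ex := &memoryExchanger{}
	ex.Init()
	ch, _ := ex.Subscribe("greetings")
	ex.Publish(ExchangeMessage{Topic: "greetings", Payload: []byte("hello")})
	msg := <-ch
	fmt.Printf("%s: %s\n", msg.Topic, msg.Payload)
	ex.Unsubscribe("greetings")
}
```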

type MessageIds

type MessageIds struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MessageIds holds 16-bit message IDs as specified by the MQTT spec. In general, client applications should not depend on these values.
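The unexported fields are not shown here, so the following is only a sketch of how a 16-bit message ID allocator could work. MQTT requires packet identifiers to be non-zero, which is why the counter skips 0 when it wraps. idPool and its methods are hypothetical names, not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNoFreeID = errors.New("no free message id")

// idPool hands out non-zero 16-bit message IDs and tracks which ones
// are still in flight.
type idPool struct {
	sync.Mutex
	inUse map[uint16]bool
	next  uint16
}

func newIDPool() *idPool {
	return &idPool{inUse: make(map[uint16]bool), next: 1}
}

// acquire returns the next ID that is not in flight, scanning at most
// the 65535 valid values before giving up.
func (p *idPool) acquire() (uint16, error) {
	p.Lock()
	defer p.Unlock()
	for i := 0; i < 65535; i++ {
		id := p.next
		p.next++
		if p.next == 0 {
			p.next = 1 // wrap around, skipping the invalid ID 0
		}
		if !p.inUse[id] {
			p.inUse[id] = true
			return id, nil
		}
	}
	return 0, errNoFreeID
}

// release marks an ID as free once its flow has completed.
func (p *idPool) release(id uint16) {
	p.Lock()
	defer p.Unlock()
	delete(p.inUse, id)
}

func main() {
	pool := newIDPool()
	a, _ := pool.acquire()
	b, _ := pool.acquire()
	fmt.Println(a, b)
	pool.release(a)
}
```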

type OnClientHeartbeatCB

type OnClientHeartbeatCB func(client *Client)

OnClientHeartbeatCB is the callback invoked when a client heartbeat is received.

type OnClientOfflineCB

type OnClientOfflineCB func(client *Client)

OnClientOfflineCB is the callback invoked when the heartbeat from a client is lost.
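How the broker decides that a heartbeat has been lost is not documented here. MQTT 3.1.1 lets a server drop a client when nothing arrives within one and a half times the keep-alive period, and a keep-alive of 0 disables the check. The sketch below applies that rule; heartbeatExpired is a hypothetical helper, and treating the keep-alive value as seconds is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// heartbeatExpired reports whether the time since the last packet
// exceeds 1.5x the keep-alive period. A keep-alive of 0 (or less)
// disables the check, as in MQTT 3.1.1.
func heartbeatExpired(sinceLast time.Duration, keepAliveSeconds int) bool {
	if keepAliveSeconds <= 0 {
		return false
	}
	limit := time.Duration(keepAliveSeconds) * time.Second * 3 / 2
	return sinceLast > limit
}

func main() {
	lastHeartbeat := time.Now().Add(-100 * time.Second)
	// About 100s of silence against limits of 90s and 180s.
	fmt.Println(heartbeatExpired(time.Since(lastHeartbeat), 60))
	fmt.Println(heartbeatExpired(time.Since(lastHeartbeat), 120))
}
```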

type OnClientOnlineCB

type OnClientOnlineCB func(client *Client)

OnClientOnlineCB is the callback invoked when a client connects successfully.

type PacketAndToken

type PacketAndToken struct {
	// contains filtered or unexported fields
}

PacketAndToken is a struct that contains both a ControlPacket and a Token. This struct is passed via channels between the client interface code and the underlying code responsible for sending and receiving MQTT messages.

type PubSubExchanger

type PubSubExchanger struct {
	TopicQueue map[string]string
	TopicChan  map[string]chan ExchangeMessage
	MessageIds MessageIds
	Channel    *amqp.Channel
}

PubSubExchanger is the exchange in RabbitMQ for publish & subscribe.

func NewPubSubExchanger

func NewPubSubExchanger(channel *amqp.Channel) *PubSubExchanger

NewPubSubExchanger creates a new exchange for pubsub.

func (*PubSubExchanger) Init

func (pubsub *PubSubExchanger) Init() error

Init initializes the exchange.

func (*PubSubExchanger) Publish

func (pubsub *PubSubExchanger) Publish(msg ExchangeMessage) error

Publish publishes a message.

func (*PubSubExchanger) Subscribe

func (pubsub *PubSubExchanger) Subscribe(topic string) (chan ExchangeMessage, error)

Subscribe subscribes to a topic.

func (*PubSubExchanger) Unsubscribe

func (pubsub *PubSubExchanger) Unsubscribe(topic string) error

Unsubscribe unsubscribes from a topic.

type PublishToken

type PublishToken struct {
	// contains filtered or unexported fields
}

PublishToken is an extension of Token containing the extra fields required to provide information about calls to Publish()

func (*PublishToken) Error

func (b *PublishToken) Error() error

func (*PublishToken) MessageID

func (p *PublishToken) MessageID() uint16

MessageID returns the MQTT message ID that was assigned to the Publish packet when it was sent to the broker

func (*PublishToken) Wait

func (b *PublishToken) Wait() bool

Wait waits indefinitely for the Token to complete, i.e. for the Publish to be sent and its receipt confirmed by the broker.

func (*PublishToken) WaitTimeout

func (b *PublishToken) WaitTimeout(d time.Duration) bool

WaitTimeout waits up to d for the flow associated with the Token to complete. It returns true if the flow completed before the timeout and false if the timeout occurred. On timeout the Token has no error set, so the caller may wait again.

type RPCExchanger

type RPCExchanger struct {
	Channel          *amqp.Channel
	RPCQueues        map[string]string
	RPCCorrelationID map[string]string
	MsgChan          map[string]chan ExchangeMessage
}

RPCExchanger is the exchange in RabbitMQ for RPCs.

func NewPRCExchanger

func NewPRCExchanger(channel *amqp.Channel) *RPCExchanger

NewPRCExchanger creates a new RPC exchanger.

func (*RPCExchanger) Init

func (rpc *RPCExchanger) Init() error

Init initializes the exchange.

func (*RPCExchanger) Publish

func (rpc *RPCExchanger) Publish(msg ExchangeMessage) error

Publish sends an RPC request or response.

func (*RPCExchanger) Subscribe

func (rpc *RPCExchanger) Subscribe(topic string) (chan ExchangeMessage, error)

Subscribe waits for an RPC request or response.

func (*RPCExchanger) Unsubscribe

func (rpc *RPCExchanger) Unsubscribe(topic string) error

Unsubscribe unsubscribes from an RPC topic.

type SubscribeToken

type SubscribeToken struct {
	// contains filtered or unexported fields
}

SubscribeToken is an extension of Token containing the extra fields required to provide information about calls to Subscribe()

func (*SubscribeToken) Error

func (b *SubscribeToken) Error() error

func (*SubscribeToken) Result

func (s *SubscribeToken) Result() map[string]byte

Result returns a map of topics that were subscribed to along with the matching return code from the broker. This is either the Qos value of the subscription or an error code.
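Since the map mixes granted QoS levels with failure codes, callers usually need to separate the two. In MQTT 3.1.1 a SUBACK return code of 0, 1 or 2 is the granted QoS and 0x80 signals failure. The sketch below splits such a map; splitSubackResult is a hypothetical helper and treats any other value as a failure.

```go
package main

import (
	"fmt"
	"sort"
)

// subackFailure is the MQTT 3.1.1 SUBACK return code for a refused
// subscription.
const subackFailure byte = 0x80

// splitSubackResult separates topics with a granted QoS (0, 1 or 2)
// from topics whose subscription failed. Failed topics are sorted so
// the output is deterministic.
func splitSubackResult(result map[string]byte) (granted map[string]byte, failed []string) {
	granted = make(map[string]byte)
	for topic, code := range result {
		if code <= 2 {
			granted[topic] = code
		} else {
			failed = append(failed, topic)
		}
	}
	sort.Strings(failed)
	return granted, failed
}

func main() {
	granted, failed := splitSubackResult(map[string]byte{
		"status/+":  1,
		"secrets/#": subackFailure,
	})
	fmt.Println(granted, failed)
}
```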

func (*SubscribeToken) Wait

func (b *SubscribeToken) Wait() bool

Wait waits indefinitely for the Token to complete, i.e. for the Publish to be sent and its receipt confirmed by the broker.

func (*SubscribeToken) WaitTimeout

func (b *SubscribeToken) WaitTimeout(d time.Duration) bool

WaitTimeout waits up to d for the flow associated with the Token to complete. It returns true if the flow completed before the timeout and false if the timeout occurred. On timeout the Token has no error set, so the caller may wait again.

type Token

type Token interface {
	Wait() bool
	WaitTimeout(time.Duration) bool

	Error() error
	// contains filtered or unexported methods
}

Token defines the interface for the tokens used to indicate when actions have completed.
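The exported part of this interface can be satisfied with a channel that is closed on completion. The sketch below shows one way to do that, including the property that a timed-out wait leaves the token usable for another wait. baseToken and complete are hypothetical names; the library's unexported methods are not reproduced.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// baseToken signals completion by closing done exactly once.
type baseToken struct {
	done chan struct{}
	once sync.Once
	mu   sync.RWMutex
	err  error
}

func newBaseToken() *baseToken {
	return &baseToken{done: make(chan struct{})}
}

// Wait blocks until the token completes.
func (t *baseToken) Wait() bool {
	<-t.done
	return true
}

// WaitTimeout blocks for at most d. It returns false on timeout and
// leaves the token untouched, so the caller may wait again.
func (t *baseToken) WaitTimeout(d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-t.done:
		return true
	case <-timer.C:
		return false
	}
}

// Error returns the error recorded at completion, if any.
func (t *baseToken) Error() error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.err
}

// complete records the outcome and wakes all waiters; later calls are
// ignored.
func (t *baseToken) complete(err error) {
	t.once.Do(func() {
		t.mu.Lock()
		t.err = err
		t.mu.Unlock()
		close(t.done)
	})
}

func main() {
	tok := newBaseToken()
	fmt.Println("completed in time:", tok.WaitTimeout(10*time.Millisecond))
	go tok.complete(errors.New("connection lost"))
	tok.Wait()
	fmt.Println("error:", tok.Error())
}
```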

type UnsubscribeToken

type UnsubscribeToken struct {
	// contains filtered or unexported fields
}

UnsubscribeToken is an extension of Token containing the extra fields required to provide information about calls to Unsubscribe()

func (*UnsubscribeToken) Error

func (b *UnsubscribeToken) Error() error

func (*UnsubscribeToken) Wait

func (b *UnsubscribeToken) Wait() bool

Wait waits indefinitely for the Token to complete, i.e. for the Publish to be sent and its receipt confirmed by the broker.

func (*UnsubscribeToken) WaitTimeout

func (b *UnsubscribeToken) WaitTimeout(d time.Duration) bool

WaitTimeout waits up to d for the flow associated with the Token to complete. It returns true if the flow completed before the timeout and false if the timeout occurred. On timeout the Token has no error set, so the caller may wait again.

Directories

Path Synopsis
tests
