broker

package
v0.0.0-...-ec5cd8f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
// Action names, one per MQTT client operation.
// NOTE(review): presumably these are the values carried in Elements.Action — confirm.
const (
	// ConnectAction is the action name for an MQTT connect.
	ConnectAction = "connect"
	// PublishAction is the action name for an MQTT publish.
	PublishAction = "publish"
	// SubscribeAction is the action name for an MQTT subscribe.
	SubscribeAction = "subscribe"
	// UnsubscribeAction is the action name for an MQTT unsubscribe.
	UnsubscribeAction = "unsubscribe"
	// DisconnectAction is the action name for an MQTT disconnect.
	DisconnectAction = "disconnect"
)
View Source
// Message pool sizing; both values are 1024.
// NOTE(review): how these are consumed is not visible here. The names suggest
// a number of pools and a per-pool message capacity — confirm against the
// pool implementation.
const (
	MessagePoolNum        = 1024
	MessagePoolMessageNum = 1024
)
View Source
const (
	// BrokerInfoTopic is the special publish topic used for cluster info.
	BrokerInfoTopic = "broker000100101info"
	// CLIENT is an end user.
	CLIENT = 0
	// ROUTER is another router in the cluster.
	ROUTER = 1
	// REMOTE is the router connection to another cluster.
	REMOTE  = 2
	CLUSTER = 3 // NOTE(review): undocumented upstream; presumably a cluster-level connection kind — confirm.
)
View Source
// Connection state values.
// NOTE(review): the field or variable that stores these is not visible here;
// presumably a client's connection status — confirm. Note that 0 is not a
// defined state.
const (
	Connected    = 1
	Disconnected = 2
)
View Source
const (
	// ACCEPT_MIN_SLEEP is the minimum acceptable sleep time on temporary errors.
	ACCEPT_MIN_SLEEP = 100 * time.Millisecond
	// ACCEPT_MAX_SLEEP is the maximum acceptable sleep time on temporary errors.
	ACCEPT_MAX_SLEEP = 10 * time.Second
	// DEFAULT_ROUTE_CONNECT is the route solicitation interval.
	DEFAULT_ROUTE_CONNECT = 5 * time.Second
	// DEFAULT_TLS_TIMEOUT is the default TLS timeout of five seconds.
	// NOTE(review): presumably bounds the TLS handshake — confirm at the call site.
	DEFAULT_TLS_TIMEOUT = 5 * time.Second
)
View Source
// Control packet type codes. Each constant is a uint8; the codes are numbered
// consecutively starting at 1.
const (
	// CONNECT is packet type 1.
	CONNECT uint8 = iota + 1
	// CONNACK is packet type 2.
	CONNACK
	// PUBLISH is packet type 3.
	PUBLISH
	// PUBACK is packet type 4.
	PUBACK
	// PUBREC is packet type 5.
	PUBREC
	// PUBREL is packet type 6.
	PUBREL
	// PUBCOMP is packet type 7.
	PUBCOMP
	// SUBSCRIBE is packet type 8.
	SUBSCRIBE
	// SUBACK is packet type 9.
	SUBACK
	// UNSUBSCRIBE is packet type 10.
	UNSUBSCRIBE
	// UNSUBACK is packet type 11.
	UNSUBACK
	// PINGREQ is packet type 12.
	PINGREQ
	// PINGRESP is packet type 13.
	PINGRESP
	// DISCONNECT is packet type 14.
	DISCONNECT
)
View Source
// Quality-of-service levels, plus a failure marker.
const (
	// QosAtMostOnce is QoS level 0.
	QosAtMostOnce byte = iota
	// QosAtLeastOnce is QoS level 1.
	QosAtLeastOnce
	// QosExactlyOnce is QoS level 2.
	QosExactlyOnce
	// QosFailure is 0x80.
	// NOTE(review): unlike the three levels above, this is an untyped constant
	// rather than a byte. Presumably it is the SUBACK failure return code —
	// confirm before giving it an explicit type, since that could break callers.
	QosFailure = 0x80
)

Variables

Functions

func GenUniqueId

func GenUniqueId() string

func NewInfo

func NewInfo(sid, url string) *packets.PublishPacket

func NewTLSConfig

func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error)

func ProcessMessage

func ProcessMessage(msg *Message)

Types

type Auth

type Auth interface {
	// CheckConnect receives the connecting client's ID, username and password
	// and returns a yes/no decision.
	// NOTE(review): presumably true means the connection is allowed — confirm
	// against Broker.CheckConnectAuth.
	CheckConnect(clientID, username, password string) bool
}

Auth is the interface used to authenticate connecting clients.

type Bridge

type Bridge interface {
	// Publish hands the event e to the bridge and returns a bool and an error.
	// NOTE(review): the meaning of the bool result is not visible here —
	// confirm against the bridge implementations.
	Publish(e *Elements) (bool, error)
}

Bridge is the interface implemented by a message bridge.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(config *Config, ops ...Option) (*Broker, error)

func (*Broker) BroadcastInfoMessage

func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacket)

func (*Broker) BroadcastSubOrUnsubMessage

func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket)

func (*Broker) BroadcastUnSubscribe

func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string)

func (*Broker) CheckConnectAuth

func (b *Broker) CheckConnectAuth(clientID, username, password string) bool

func (*Broker) CheckRemoteExist

func (b *Broker) CheckRemoteExist(remoteID, url string) bool

func (*Broker) ConnectToDiscovery

func (b *Broker) ConnectToDiscovery()

func (*Broker) DisConnClientByClientId

func (b *Broker) DisConnClientByClientId(clientId string)

func (*Broker) OnlineOfflineNotification

func (b *Broker) OnlineOfflineNotification(clientID string, online bool)

func (*Broker) Publish

func (b *Broker) Publish(e *Elements) bool

func (*Broker) PublishMessage

func (b *Broker) PublishMessage(packet *packets.PublishPacket)

func (*Broker) PublishMessageByClientId

func (b *Broker) PublishMessageByClientId(packet *packets.PublishPacket, clientId string) error

func (*Broker) SendLocalSubsToRouter

func (b *Broker) SendLocalSubsToRouter(c *client)

func (*Broker) Start

func (b *Broker) Start()

func (*Broker) StartClientListening

func (b *Broker) StartClientListening(Tls bool)

func (*Broker) StartWebsocketListening

func (b *Broker) StartWebsocketListening()

func (*Broker) SubmitWork

func (b *Broker) SubmitWork(clientId string, msg *Message)

type Config

// Config is the broker configuration passed to NewBroker. Every field is
// mapped to a JSON key by its struct tag; note that Worker is keyed as
// "workerNum" rather than "worker".
//
// NOTE(review): the field meanings below are inferred from names only and
// should be confirmed against the code that reads them:
//   - Worker: presumably the size of the worker pool.
//   - Host, Port: presumably the plain (non-TLS) listener address.
//   - Router: presumably the address of a cluster router.
//   - TlsHost, TlsPort, TlsInfo: presumably the TLS listener address and its
//     certificate settings.
//   - WsPath, WsPort, WsTLS: presumably the WebSocket listener settings.
//   - Debug: presumably enables debug output.
type Config struct {
	Worker  int     `json:"workerNum"`
	Host    string  `json:"host"`
	Port    string  `json:"port"`
	Router  string  `json:"router"`
	TlsHost string  `json:"tlsHost"`
	TlsPort string  `json:"tlsPort"`
	WsPath  string  `json:"wsPath"`
	WsPort  string  `json:"wsPort"`
	WsTLS   bool    `json:"wsTLS"`
	TlsInfo TLSInfo `json:"tlsInfo"`
	Debug   bool    `json:"debug"`
}
// DefaultConfig is a ready-made *Config with Host "0.0.0.0", Port "1883" and
// Worker 4096. Every other field is left at its zero value.
var DefaultConfig = &Config{
	Host:   "0.0.0.0",
	Port:   "1883",
	Worker: 4096,
}

type Elements

// Elements is the event record passed to Bridge.Publish and Broker.Publish.
// Every field is mapped to a JSON key by its struct tag; note that Timestamp
// is keyed as "ts".
//
// NOTE(review): the following are inferred from names only and should be
// confirmed against the code that fills the struct:
//   - Action presumably holds one of the *Action constants ("connect",
//     "publish", ...).
//   - Size presumably is the payload length.
//   - The unit of Timestamp (seconds vs. milliseconds) is not visible here.
type Elements struct {
	ClientID  string `json:"client_id"`
	Username  string `json:"username"`
	Topic     string `json:"topic"`
	Payload   string `json:"payload"`
	Timestamp int64  `json:"ts"`
	Size      int32  `json:"size"`
	Action    string `json:"action"`
}

type InflightStatus

// InflightStatus is a uint8-backed status code with two defined values.
type InflightStatus uint8

const (
	// Publish is InflightStatus 0.
	Publish InflightStatus = iota
	// Pubrel is InflightStatus 1.
	Pubrel
)

type Message

type Message struct {
	// contains filtered or unexported fields
}

type Option

// Option is a function that modifies the package's unexported options value.
// NewBroker accepts any number of Options; WithAuth and WithBridge return them.
type Option func(*options)

func WithAuth

func WithAuth(a Auth) Option

func WithBridge

func WithBridge(b Bridge) Option

type TLSInfo

// TLSInfo is the TLS settings value accepted by NewTLSConfig and embedded in
// Config as TlsInfo. Every field is mapped to a JSON key by its struct tag.
//
// NOTE(review): the following are inferred from names only and should be
// confirmed against NewTLSConfig:
//   - Verify presumably enables peer certificate verification.
//   - CaFile, CertFile and KeyFile presumably are file paths to the CA
//     bundle, the certificate and its private key.
type TLSInfo struct {
	Verify   bool   `json:"verify"`
	CaFile   string `json:"caFile"`
	CertFile string `json:"certFile"`
	KeyFile  string `json:"keyFile"`
}

Directories

Path Synopsis
lib

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL