broker

package
v0.0.0-...-6ea8860 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2019 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Overview

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Index

Constants

View Source
// Action codes, expressed as the strings "1" (SUB) and "2" (PUB).
// NOTE(review): presumably these are the values passed as the action
// argument of Broker.CheckTopicAuth — confirm against the callers.
const (
	SUB = "1"
	PUB = "2"
)
View Source
// Message pool sizing limits; both are 1024.
// NOTE(review): presumably MessagePoolNum is the number of pools and
// MessagePoolMessageNum the per-pool message capacity — confirm against the
// pool implementation, which is not visible here.
const (
	MessagePoolNum        = 1024
	MessagePoolMessageNum = 1024
)
View Source
const (
	// BrokerInfoTopic is the special publish topic reserved for cluster info
	// messages.
	BrokerInfoTopic = "broker000100101info"
	// CLIENT identifies an end-user client.
	CLIENT = 0
	// ROUTER identifies a client in the router.
	ROUTER = 1
)
View Source
// Connection states, numbered from 1 so that zero matches neither state.
// Both are untyped integer constants.
const (
	Connected = iota + 1
	Disconnected
)
View Source
const (
	// ACCEPT_MIN_SLEEP is the shortest sleep applied on temporary errors
	// (100ms).
	ACCEPT_MIN_SLEEP = 100 * time.Millisecond
	// ACCEPT_MAX_SLEEP is the longest sleep applied on temporary errors (10s).
	ACCEPT_MAX_SLEEP = 10 * time.Second
	// DEFAULT_ROUTE_CONNECT is the route solicitation interval (5s).
	DEFAULT_ROUTE_CONNECT = 5 * time.Second
	// DEFAULT_TLS_TIMEOUT is the default TLS timeout (5s).
	// NOTE(review): presumably the deadline applied by TlsTimeout — confirm.
	DEFAULT_TLS_TIMEOUT = 5 * time.Second
)
View Source
// Packet type codes: uint8 constants numbered consecutively from CONNECT = 1
// through DISCONNECT = 14. The numbering matches the MQTT 3.1.1 control
// packet type values.
const (
	CONNECT uint8 = iota + 1
	CONNACK
	PUBLISH
	PUBACK
	PUBREC
	PUBREL
	PUBCOMP
	SUBSCRIBE
	SUBACK
	UNSUBSCRIBE
	UNSUBACK
	PINGREQ
	PINGRESP
	DISCONNECT
)
View Source
// QoS levels as byte values 0 through 2, plus the 0x80 failure code.
// QosFailure carries no explicit type, so unlike the three levels it remains
// an untyped constant.
const (
	QosAtMostOnce  byte = 0
	QosAtLeastOnce byte = 1
	QosExactlyOnce byte = 2
	QosFailure          = 0x80
)

Variables

Functions

func GenUniqueId

func GenUniqueId() string

func InitHTTPMoniter

func InitHTTPMoniter(b *Broker)

func NewInfo

func NewInfo(sid, url string, isforword bool) *packets.PublishPacket

func NewTLSConfig

func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error)

func ProcessMessage

func ProcessMessage(msg *Message)

func StateMonitor

func StateMonitor()

func TlsTimeout

func TlsTimeout(conn *tls.Conn)

Types

type Broker

type Broker struct {
	Auth     auth.Auth
	BridgeMQ bridge.BridgeMQ
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(config *Config) (*Broker, error)

func (*Broker) CheckConnectAuth

func (b *Broker) CheckConnectAuth(clientID, username, password string) bool

func (*Broker) CheckTopicAuth

func (b *Broker) CheckTopicAuth(action, username, topic string) bool

func (*Broker) ConnectToDiscovery

func (b *Broker) ConnectToDiscovery()

func (*Broker) Handshake

func (b *Broker) Handshake(conn net.Conn) bool

func (*Broker) OnlineOfflineNotification

func (b *Broker) OnlineOfflineNotification(clientID string, online bool)

func (*Broker) ProcessRemote

func (b *Broker) ProcessRemote(packet *packets.PublishPacket, loaclShareSub []*subscription)

func (*Broker) Publish

func (b *Broker) Publish(e *bridge.Elements)

func (*Broker) PublishDeliverdMessage

func (b *Broker) PublishDeliverdMessage(packet *packets.PublishPacket, share bool)

func (*Broker) PublishMessage

func (b *Broker) PublishMessage(packet *packets.PublishPacket)

func (*Broker) QueryConnect

func (b *Broker) QueryConnect(clientID string)

func (*Broker) QuerySubscribe

func (b *Broker) QuerySubscribe(topic string, qos byte) map[string]remoteSubInfo

func (*Broker) Start

func (b *Broker) Start()

func (*Broker) StartClientListening

func (b *Broker) StartClientListening(Tls bool)

func (*Broker) StartWebsocketListening

func (b *Broker) StartWebsocketListening()

func (*Broker) SubmitWork

func (b *Broker) SubmitWork(clientId string, msg *Message)

type Config

// Config holds the broker settings. Each field's struct tag gives the JSON
// key it is stored under (for example Worker <-> "workerNum",
// RpcPort <-> "rpc", Plugin <-> "plugins").
// NOTE(review): presumably Host/Port, TlsHost/TlsPort and WsPath/WsPort are
// the listen addresses for the plain, TLS and WebSocket listeners — confirm
// against StartClientListening and StartWebsocketListening.
type Config struct {
	Worker  int     `json:"workerNum"`
	Host    string  `json:"host"`
	Port    string  `json:"port"`
	RpcPort string  `json:"rpc"`
	Router  string  `json:"router"`
	TlsHost string  `json:"tlsHost"`
	TlsPort string  `json:"tlsPort"`
	WsPath  string  `json:"wsPath"`
	WsPort  string  `json:"wsPort"`
	WsTLS   bool    `json:"wsTLS"`
	TlsInfo TLSInfo `json:"tlsInfo"`
	Debug   bool    `json:"debug"`
	Plugin  Plugins `json:"plugins"`
}
// DefaultConfig is a baseline configuration: 4096 workers listening on host
// "0.0.0.0", port "1883". Every other field keeps its zero value.
var DefaultConfig = &Config{
	Host:   "0.0.0.0",
	Port:   "1883",
	Worker: 4096,
}

func ConfigureConfig

func ConfigureConfig(args []string) (*Config, error)

func LoadConfig

func LoadConfig(filename string) (*Config, error)

type HMQ

type HMQ struct {
	// contains filtered or unexported fields
}

func (*HMQ) DeliverMessage

func (h *HMQ) DeliverMessage(ctx context.Context, in *pb.DeliverMessageRequest) (*pb.Response, error)

func (*HMQ) QueryConnect

func (h *HMQ) QueryConnect(ctx context.Context, in *pb.QueryConnectRequest) (*pb.Response, error)

func (*HMQ) QuerySubscribe

func (h *HMQ) QuerySubscribe(ctx context.Context, in *pb.QuerySubscribeRequest) (*pb.SubscribeResponse, error)

type Message

type Message struct {
	// contains filtered or unexported fields
}

type Plugins

// Plugins is the type of Config.Plugin (JSON key "plugins").
// Neither field has a json tag, so encoding/json would use the Go field
// names as keys.
// NOTE(review): presumably Auth and Bridge name the auth and bridge plugins
// to load — confirm against NewBroker.
type Plugins struct {
	Auth   string
	Bridge string
}

type TLSInfo

// TLSInfo is the type of Config.TlsInfo (JSON key "tlsInfo") and the
// argument taken by NewTLSConfig.
// NOTE(review): presumably Verify toggles peer certificate verification and
// CaFile/CertFile/KeyFile are file paths to the CA, certificate and private
// key — confirm in NewTLSConfig.
type TLSInfo struct {
	Verify   bool   `json:"verify"`
	CaFile   string `json:"caFile"`
	CertFile string `json:"certFile"`
	KeyFile  string `json:"keyFile"`
}

Directories

Path Synopsis
lib

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL