broker

package
v0.0.0-...-fba3f84 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2019 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>

Index

Constants

View Source
const (
	SUB = "1"
	PUB = "2"
)
View Source
const (
	MessagePoolNum        = 1024
	MessagePoolMessageNum = 1024
)
View Source
const (
	Connected    = 1
	Disconnected = 2
)
View Source
const (
	// ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors.
	ACCEPT_MIN_SLEEP = 100 * time.Millisecond
	// ACCEPT_MAX_SLEEP is the maximum acceptable sleep times on temporary errors
	ACCEPT_MAX_SLEEP = 10 * time.Second
	// DEFAULT_ROUTE_CONNECT Route solicitation intervals.
	DEFAULT_ROUTE_CONNECT = 5 * time.Second
	// DEFAULT_TLS_TIMEOUT
	DEFAULT_TLS_TIMEOUT = 5 * time.Second
)
View Source
const (
	CONNECT = uint8(iota + 1)
	CONNACK
	PUBLISH
	PUBACK
	PUBREC
	PUBREL
	PUBCOMP
	SUBSCRIBE
	SUBACK
	UNSUBSCRIBE
	UNSUBACK
	PINGREQ
	PINGRESP
	DISCONNECT
)
View Source
const (
	QosAtMostOnce byte = iota
	QosAtLeastOnce
	QosExactlyOnce
	QosFailure = 0x80
)
View Source
const (
	//Connect mqtt connect
	Connect = "connect"
	//Publish mqtt publish
	Publish = "publish"
	//Subscribe mqtt sub
	Subscribe = "subscribe"
	//Unsubscribe mqtt sub
	Unsubscribe = "unsubscribe"
	//Disconnect mqtt disconenct
	Disconnect = "disconnect"
)
View Source
const (
	MQTTConnectionKey = "mqtt-connection"
)

Variables

Functions

func GenUniqueId

func GenUniqueId() string

func InitHTTPMoniter

func InitHTTPMoniter(b *Broker)

func NewTLSConfig

func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error)

func ProcessMessage

func ProcessMessage(msg *Message)

func TlsTimeout

func TlsTimeout(conn *tls.Conn)

Types

type Broker

type Broker struct {
	RedisClient *redis.Client
	Auth        auth.Auth
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(config *Config) (*Broker, error)

func (*Broker) CheckConnectAuth

func (b *Broker) CheckConnectAuth(clientID, username, password string) bool

func (*Broker) CheckTopicAuth

func (b *Broker) CheckTopicAuth(action, username, topic string) bool

func (*Broker) DeleteConnect

func (b *Broker) DeleteConnect(clientID string)

func (*Broker) GetConnectionNumber

func (b *Broker) GetConnectionNumber() (int64, error)

func (*Broker) Handshake

func (b *Broker) Handshake(conn net.Conn) bool

func (*Broker) InitKafka

func (b *Broker) InitKafka()

func (*Broker) InitKafkaConsumer

func (b *Broker) InitKafkaConsumer()

Init init kafak client

func (*Broker) InitKafkaProducer

func (b *Broker) InitKafkaProducer()

Init init kafak client

func (*Broker) InitRedisClient

func (b *Broker) InitRedisClient()

func (*Broker) PublishMessage

func (b *Broker) PublishMessage(packet *packets.PublishPacket)

func (*Broker) PublishToKafka

func (b *Broker) PublishToKafka(e *Elements)

func (*Broker) RecvConnect

func (b *Broker) RecvConnect()

func (*Broker) SaveConnect

func (b *Broker) SaveConnect(clientID string)

func (*Broker) SendConnect

func (b *Broker) SendConnect(clientID string)

func (*Broker) Start

func (b *Broker) Start()

func (*Broker) StartClientListening

func (b *Broker) StartClientListening(Tls bool)

func (*Broker) StartWebsocketListening

func (b *Broker) StartWebsocketListening()

func (*Broker) SubmitWork

func (b *Broker) SubmitWork(clientId string, msg *Message)

type Config

type Config struct {
	Worker      int         `json:"workerNum"`
	Host        string      `json:"host"`
	Port        string      `json:"port"`
	TlsHost     string      `json:"tlsHost"`
	TlsPort     string      `json:"tlsPort"`
	WsPath      string      `json:"wsPath"`
	WsPort      string      `json:"wsPort"`
	WsTLS       bool        `json:"wsTLS"`
	TlsInfo     TLSInfo     `json:"tlsInfo"`
	Acl         bool        `json:"acl"`
	AclConf     string      `json:"aclConf"`
	Debug       bool        `json:"debug"`
	KafkaConfig KafkaConfig `json:"kafkaConfig"`
	RedisAddr   string      `json:"redisAddr"`
	Plugins     []string    `json:"plugins"`
}
var DefaultConfig *Config = &Config{
	Worker: 4096,
	Host:   "0.0.0.0",
	Port:   "1883",
	Acl:    false,
}

func ConfigureConfig

func ConfigureConfig(args []string) (*Config, error)

func LoadConfig

func LoadConfig(filename string) (*Config, error)

type Elements

type Elements struct {
	ClientID  string `json:"clientid"`
	Username  string `json:"username"`
	Topic     string `json:"topic"`
	Payload   string `json:"payload"`
	Timestamp int64  `json:"ts"`
	Size      int32  `json:"size"`
	Action    string `json:"action"`
}

Elements kafka publish elements

type KafkaConfig

type KafkaConfig struct {
	Addr      []string `json:"addr"`
	Partition int32
	Producer  struct {
		ConnectTopic     string            `json:"onConnect"`
		SubscribeTopic   string            `json:"onSubscribe"`
		PublishTopic     string            `json:"onPublish"`
		UnsubscribeTopic string            `json:"onUnsubscribe"`
		DisconnectTopic  string            `json:"onDisconnect"`
		RegexpMap        map[string]string `json:"regexpMap"`
	} `json:"producer"`
	Consumer struct {
		Topic []string
	} `json:"consumer"`
}

KfakConfig device kafka config

type Message

type Message struct {
	// contains filtered or unexported fields
}

type TLSInfo

type TLSInfo struct {
	Verify   bool   `json:"verify"`
	CaFile   string `json:"caFile"`
	CertFile string `json:"certFile"`
	KeyFile  string `json:"keyFile"`
}

Directories

Path Synopsis
lib
acl

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL