service

package
v0.0.0-...-f544672 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2016 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package service provides the MQTT Server and Client services in a library form. See Server and Client examples below for more detailed usage.

Index

Constants

View Source
const (
	DefaultKeepAlive        = 300
	DefaultConnectTimeout   = 2
	DefaultAckTimeout       = 20
	DefaultTimeoutRetries   = 3
	DefaultSessionsProvider = "mem"
	DefaultAuthenticator    = "mockSuccess"
	DefaultTopicsProvider   = "mx"
)

Variables

View Source
var (
	DefaultBufferSize int64

	DeviceInBufferSize  int64
	DeviceOutBufferSize int64

	MasterInBufferSize  int64
	MasterOutBufferSize int64
)
View Source
var (
	MsgPendingTime    time.Duration
	OfflineTopicRWmux sync.RWMutex
	BroadCastChannel  string
	SendChannel       string
	ApnPushChannel    string

	MessagePool    *sync.Pool
	OnGroupPublish func(msg *message.PublishMessage, this *service) (err error)

	OnlineStatusChannel     = "/fdf406fadef0ba24f3bfe8bc00b7bb350901417f"
	ApnInvalidTokensChannel = "/apn/invalid_token"

	ApnInvalidTokens = []string{}
)
View Source
var (
	Cert      tls.Certificate
	APNsTopic string

	ApnClient *apns.Client
)
View Source
var (
	PendingQueue = make([]*PendingStatus, 65536, 65536)

	OfflineTopicMap            = make(map[string]*OfflineTopicQueue)
	OfflineTopicQueueProcessor = make(chan *message.PublishMessage, 8192)
	OfflineTopicCleanProcessor = make(chan string, 128)

	ClientMap          = make(map[string]*net.Conn)
	ClientMapProcessor = make(chan ClientHash, 8192)

	PktId = uint32(1)

	OldMessagesQueue = make(chan *message.PublishMessage, 8192)

	Max_message_queue int
	MessageQueueStore string
	TempBytes         *sync.Pool

	LevelDB *leveldb.DB
)
View Source
var (
	Log      log.Logger
	LogLevel log.Level

	ErrInvalidConnectionType  error = errors.New("service: Invalid connection type")
	ErrInvalidSubscriber      error = errors.New("service: Invalid subscriber")
	ErrBufferNotReady         error = errors.New("service: buffer is not ready")
	ErrBufferInsufficientData error = errors.New("service: buffer has insufficient data.")
)
View Source
var (
	CompressLevel int
)
View Source
var (
	IsOnline func(topic string) (online bool)
)

Functions

func GetOnlineStatus

func GetOnlineStatus(key string) (online string, lasttime time.Time, conn *io.Closer)

func SetOnlineStatus

func SetOnlineStatus(key string, online bool, lasttime time.Time, conn *io.Closer)

Types

type BadgeMessage

type BadgeMessage struct {
	Data int    `json:data`
	Type string `json:type`
}

func (*BadgeMessage) MarshalJSON

func (mj *BadgeMessage) MarshalJSON() ([]byte, error)

func (*BadgeMessage) MarshalJSONBuf

func (mj *BadgeMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*BadgeMessage) UnmarshalJSON

func (uj *BadgeMessage) UnmarshalJSON(input []byte) error

func (*BadgeMessage) UnmarshalJSONFFLexer

func (uj *BadgeMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type BroadCastMessage

type BroadCastMessage struct {
	Clients []string `json:"clients"`
	Payload string   `json:"payload"`
}

func (*BroadCastMessage) MarshalJSON

func (mj *BroadCastMessage) MarshalJSON() ([]byte, error)

func (*BroadCastMessage) MarshalJSONBuf

func (mj *BroadCastMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*BroadCastMessage) UnmarshalJSON

func (uj *BroadCastMessage) UnmarshalJSON(input []byte) error

func (*BroadCastMessage) UnmarshalJSONFFLexer

func (uj *BroadCastMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type Client

type Client struct {
	// The number of seconds to keep the connection live if there's no data.
	// If not set then default to 5 mins.
	KeepAlive int

	// The number of seconds to wait for the CONNACK message before disconnecting.
	// If not set then default to 2 seconds.
	ConnectTimeout int

	// The number of seconds to wait for any ACK messages before failing.
	// If not set then default to 20 seconds.
	AckTimeout int

	// The number of times to retry sending a packet if ACK is not received.
	// If not set then default to 3 retries.
	TimeoutRetries int
	// contains filtered or unexported fields
}

Client is a library implementation of the MQTT client that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.

func (*Client) Connect

func (this *Client) Connect(uri string, msg *message.ConnectMessage) (err error)

Connect is for MQTT clients to open a connection to a remote server. It needs to know the URI, e.g., "tcp://127.0.0.1:1883", so it knows where to connect to. It also needs to be supplied with the MQTT CONNECT message.

func (*Client) Disconnect

func (this *Client) Disconnect()

Disconnect sends a single DISCONNECT message to the server. The client immediately terminates after the sending of the DISCONNECT message.

func (*Client) Ping

func (this *Client) Ping(onComplete OnCompleteFunc) error

Ping sends a single PINGREQ message to the server. PINGREQ/PINGRESP messages are mainly used by the client to keep a heartbeat to the server so the connection won't be dropped.

func (*Client) Publish

func (this *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error

Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.

func (*Client) Subscribe

func (this *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, onPublish OnPublishFunc) error

Subscribe sends a single SUBSCRIBE message to the server. The SUBSCRIBE message can contain multiple topics that the client wants to subscribe to. On completion, which is when the client receives a SUBACK message back from the server, the supplied onComplete function is called.

When messages are sent to the client from the server that matches the topics the client subscribed to, the onPublish function is called to handle those messages. So in effect, the client can supply different onPublish functions for different topics.

func (*Client) Unsubscribe

func (this *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error

Unsubscribe sends a single UNSUBSCRIBE message to the server. The UNSUBSCRIBE message can contain multiple topics that the client wants to unsubscribe. On completion, which is when the client receives a UNSUBACK message from the server, the supplied onComplete function is called. The client will no longer handle messages from the server for those unsubscribed topics.

type ClientHash

type ClientHash struct {
	Name string
	Conn *net.Conn
}

type MtBroadCastMessage

type MtBroadCastMessage struct {
	Clients []string `json:"topics"`
	Payload string   `json:"payload"`
}

func (*MtBroadCastMessage) MarshalJSON

func (mj *MtBroadCastMessage) MarshalJSON() ([]byte, error)

func (*MtBroadCastMessage) MarshalJSONBuf

func (mj *MtBroadCastMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error

func (*MtBroadCastMessage) UnmarshalJSON

func (uj *MtBroadCastMessage) UnmarshalJSON(input []byte) error

func (*MtBroadCastMessage) UnmarshalJSONFFLexer

func (uj *MtBroadCastMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error

type OfflineTopicQueue

type OfflineTopicQueue struct {
	Topic   string
	Q       [][]byte
	Pos     int
	Cleaned bool
	// contains filtered or unexported fields
}

OfflineTopicQueue defines the structure of an offline message queue; it holds a two-dimensional byte array and a position.

func NewOfflineTopicQueue

func NewOfflineTopicQueue(topic string) (mq *OfflineTopicQueue)

func (*OfflineTopicQueue) Add

func (this *OfflineTopicQueue) Add(msg_bytes []byte)

Add appends a message to the queue.

func (*OfflineTopicQueue) Clean

func (this *OfflineTopicQueue) Clean()

Clean removes the messages currently held in the queue.

func (*OfflineTopicQueue) DBKey

func (this *OfflineTopicQueue) DBKey(pos int) (key string)

func (*OfflineTopicQueue) GetAll

func (this *OfflineTopicQueue) GetAll() (msg_bytes [][]byte)

type OnCompleteFunc

type OnCompleteFunc func(msg, ack message.Message, err error) error

type OnPublishFunc

type OnPublishFunc func(msg *message.PublishMessage) error

type PendingStatus

type PendingStatus struct {
	Done  chan (bool)
	Topic string
	Msg   *message.PublishMessage
}

func NewPendingStatus

func NewPendingStatus(topic string, msg *message.PublishMessage) *PendingStatus

type Server

type Server struct {
	// The number of seconds to keep the connection live if there's no data.
	// If not set then default to 5 mins.
	KeepAlive int

	// The number of seconds to wait for the CONNECT message before disconnecting.
	// If not set then default to 2 seconds.
	ConnectTimeout int

	// The number of seconds to wait for any ACK messages before failing.
	// If not set then default to 20 seconds.
	AckTimeout int

	// The number of times to retry sending a packet if ACK is not received.
	// If no set then default to 3 retries.
	TimeoutRetries int

	// Authenticator is the authenticator used to check username and password sent
	// in the CONNECT message. If not set then default to "mockSuccess".
	Authenticator string

	// SessionsProvider is the session store that keeps all the Session objects.
	// This is the store to check if CleanSession is set to 0 in the CONNECT message.
	// If not set then default to "mem".
	SessionsProvider string

	// TopicsProvider is the topic store that keeps all the subscription topics.
	// If not set then default to "mx".
	TopicsProvider string
	// contains filtered or unexported fields
}

Server is a library implementation of the MQTT server that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.

func (*Server) Close

func (this *Server) Close() error

Close terminates the server by shutting down all the client connections and closing the listener. It will, as best it can, clean up after itself.

func (*Server) CreateAndGetBytes

func (this *Server) CreateAndGetBytes(size int64) []byte

func (*Server) DestoryBytes

func (this *Server) DestoryBytes(b []byte)

func (*Server) ListenAndServe

func (this *Server) ListenAndServe() error

ListenAndServe listens to connections on the URI requested, and handles any incoming MQTT client sessions. It should not return until Close() is called or if there's some critical error that stops the server from running. The URI supplied should be of the form "protocol://host:port" that can be parsed by url.Parse(). For example, a URI could be "tcp://0.0.0.0:1883".

func (*Server) Publish

func (this *Server) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) (err error)

Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.

type Status

type Status struct {
	// contains filtered or unexported fields
}

func (*Status) Conn

func (this *Status) Conn() *io.Closer

func (*Status) IsOnline

func (this *Status) IsOnline() bool

func (*Status) LastTime

func (this *Status) LastTime() (t time.Time)

func (*Status) OnlineStatus

func (this *Status) OnlineStatus() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL