mq

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package mq manages queues for applications and networks.

For applications, the `ApplicationMgr` manages the following kinds of queues:

  • uldata: uplink data from the broker to the application.
  • dldata: downlink data from the application to the broker.
  • dldata-resp: the response of downlink data.
  • dldata-result: the data process result from the network.

For networks, the `NetworkMgr` manages the following kinds of queues:

  • uldata: device uplink data from the network to the broker.
  • dldata: downlink data from the broker to the network.
  • dldata-result: the data process result from the network.
  • ctrl: the control messages from the broker to the network.

Index

Constants

This section is empty.

Variables

View Source
var (
	// SupportSchemes lists the supported application/network host schemes.
	SupportSchemes = []string{"amqp", "amqps", "mqtt", "mqtts"}
)

Supported application/network host schemes.

Functions

This section is empty.

Types

type AppDlData

// AppDlData is downlink data from application to broker.
type AppDlData struct {
	CorrelationID string
	DeviceID      string
	NetworkCode   string
	NetworkAddr   string
	Data          []byte
	Extension     map[string]interface{}
}

Downlink data from application to broker.

type AppDlDataResp

// AppDlDataResp is the downlink data response for `DlData`.
type AppDlDataResp struct {
	CorrelationID string `json:"correlationId"`
	DataID        string `json:"dataId"`
	Error         string `json:"error"`
	Message       string `json:"message"`
}

Downlink data response for `DlData`.

type AppDlDataResult

// AppDlDataResult is the downlink data result when processing or completing
// data transfer to the device.
type AppDlDataResult struct {
	DataID  string `json:"dataId"`
	Status  int    `json:"status"`
	Message string `json:"message"`
}

Downlink data result when processing or completing data transfer to the device.

type AppMgrEventHandler

// AppMgrEventHandler is the event handler interface for the `ApplicationMgr`.
type AppMgrEventHandler interface {
	// Fired when one of the manager's queues encounters a state change.
	OnStatusChange(mgr *ApplicationMgr, status MgrStatus)

	// Fired when a `UlData` data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnUlData(mgr *ApplicationMgr, data *AppUlData) error

	// Fired when a `DlDataResp` data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnDlDataResp(mgr *ApplicationMgr, data *AppDlDataResp) error

	// Fired when a `DlDataResult` data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnDlDataResult(mgr *ApplicationMgr, data *AppDlDataResult) error
}

Event handler interface for the `ApplicationMgr`.

type AppUlData

// AppUlData is uplink data from broker to application.
type AppUlData struct {
	DataID      string
	Time        time.Time
	Pub         time.Time
	DeviceID    string
	NetworkID   string
	NetworkCode string
	NetworkAddr string
	IsPublic    bool
	Data        []byte
	Extension   map[string]interface{}
}

Uplink data from broker to application.

type ApplicationMgr

// ApplicationMgr is the manager for application queues.
type ApplicationMgr struct {
	// contains filtered or unexported fields
}

The manager for application queues.

func NewApplicationMgr

func NewApplicationMgr(
	connPool *ConnectionPool,
	hostUri url.URL,
	opts Options,
	handler AppMgrEventHandler,
) (*ApplicationMgr, error)

func (*ApplicationMgr) Close

func (mgr *ApplicationMgr) Close() error

Closes the manager's queues. The underlying connection will be closed when no queues are using it.

func (*ApplicationMgr) ID

func (mgr *ApplicationMgr) ID() string

The application ID.

func (*ApplicationMgr) MqStatus

func (mgr *ApplicationMgr) MqStatus() DataMqStatus

Detailed status of each message queue. Please ignore `Ctrl`.

func (*ApplicationMgr) Name

func (mgr *ApplicationMgr) Name() string

The application code.

func (*ApplicationMgr) SendDlData

func (mgr *ApplicationMgr) SendDlData(data AppDlData) error

Send downlink data `DlData` to the broker.

func (*ApplicationMgr) Status

func (mgr *ApplicationMgr) Status() MgrStatus

Manager status.

func (*ApplicationMgr) UnitCode

func (mgr *ApplicationMgr) UnitCode() string

The associated unit code of the application.

func (*ApplicationMgr) UnitID

func (mgr *ApplicationMgr) UnitID() string

The associated unit ID of the application.

type ConnectionPool

// ConnectionPool is a connection pool. The key is `host` of the message broker.
type ConnectionPool struct {
	// contains filtered or unexported fields
}

Connection pool. The key is `host` of the message broker.

func NewConnectionPool

func NewConnectionPool() *ConnectionPool

func (*ConnectionPool) ForceClear

func (p *ConnectionPool) ForceClear()

type DataMqStatus

// DataMqStatus is the detailed queue connection status, one field per queue.
type DataMqStatus struct {
	// For `uldata`.
	UlData gmq.Status
	// For `dldata`.
	DlData gmq.Status
	// For `dldata-resp`. To be ignored in the value returned by
	// `NetworkMgr.MqStatus`.
	DlDataResp gmq.Status
	// For `dldata-result`.
	DlDataResult gmq.Status
	// For `ctrl`. To be ignored in the value returned by
	// `ApplicationMgr.MqStatus`.
	Ctrl gmq.Status
}

Detailed queue connection status.

type MgrStatus

// MgrStatus is the application/network manager status.
type MgrStatus int

Application/Network Manager status.

// Manager status values.
const (
	// One or more queues are not connected. This is the zero value (iota
	// starts at 0).
	NotReady MgrStatus = iota
	// All queues are connected.
	Ready
)

Manager status.

func (MgrStatus) String

func (s MgrStatus) String() string

type NetCtrlAddDevice

// NetCtrlAddDevice is the `add-device` control data.
type NetCtrlAddDevice struct {
	NetworkAddr string `json:"networkAddr"`
}

`add-device` control data.

type NetCtrlAddDeviceBulk

// NetCtrlAddDeviceBulk is the `add-device-bulk` control data.
type NetCtrlAddDeviceBulk struct {
	NetworkAddrs []string `json:"networkAddrs"`
}

`add-device-bulk` control data.

type NetCtrlAddDeviceRange

// NetCtrlAddDeviceRange is the `add-device-range` control data.
type NetCtrlAddDeviceRange struct {
	StartAddr string `json:"startAddr"`
	EndAddr   string `json:"endAddr"`
}

`add-device-range` control data.

type NetCtrlDelDevice

// NetCtrlDelDevice is the `del-device` control data.
type NetCtrlDelDevice struct {
	NetworkAddr string `json:"networkAddr"`
}

`del-device` control data.

type NetCtrlDelDeviceBulk

// NetCtrlDelDeviceBulk is the `del-device-bulk` control data.
type NetCtrlDelDeviceBulk struct {
	NetworkAddrs []string `json:"networkAddrs"`
}

`del-device-bulk` control data.

type NetCtrlDelDeviceRange

// NetCtrlDelDeviceRange is the `del-device-range` control data.
type NetCtrlDelDeviceRange struct {
	StartAddr string `json:"startAddr"`
	EndAddr   string `json:"endAddr"`
}

`del-device-range` control data.

type NetDlData

// NetDlData is downlink data from broker to network.
type NetDlData struct {
	DataID      string
	Pub         time.Time
	ExpiresIn   int64
	NetworkAddr string
	Data        []byte
	Extension   map[string]interface{}
}

Downlink data from broker to network.

type NetDlDataResult

// NetDlDataResult is the downlink data result when processing or completing
// data transfer to the device.
type NetDlDataResult struct {
	DataID  string `json:"dataId"`
	Status  int    `json:"status"`
	Message string `json:"message,omitempty"`
}

Downlink data result when processing or completing data transfer to the device.

type NetMgrEventHandler

// NetMgrEventHandler is the event handler interface for the `NetworkMgr`.
type NetMgrEventHandler interface {
	// Fired when one of the manager's queues encounters a state change.
	OnStatusChange(mgr *NetworkMgr, status MgrStatus)

	// Fired when a `DlData` data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnDlData(mgr *NetworkMgr, data *NetDlData) error

	// Fired when an `add-device` control data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnCtrlAddDevice(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDevice) error

	// Fired when an `add-device-bulk` control data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnCtrlAddDeviceBulk(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDeviceBulk) error

	// Fired when an `add-device-range` control data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnCtrlAddDeviceRange(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDeviceRange) error

	// Fired when a `del-device` control data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnCtrlDelDevice(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDevice) error

	// Fired when a `del-device-bulk` control data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnCtrlDelDeviceBulk(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDeviceBulk) error

	// Fired when a `del-device-range` control data is received.
	//
	// Returning an error will NACK the data.
	// The data may be received again depending on the protocol (such as AMQP).
	OnCtrlDelDeviceRange(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDeviceRange) error
}

Event handler interface for the `NetworkMgr`.

type NetUlData

// NetUlData is uplink data from network to broker.
type NetUlData struct {
	Time        time.Time
	NetworkAddr string
	Data        []byte
	Extension   map[string]interface{}
}

Uplink data from network to broker.

type NetworkMgr

// NetworkMgr is the manager for network queues.
type NetworkMgr struct {
	// contains filtered or unexported fields
}

The manager for network queues.

func NewNetworkMgr

func NewNetworkMgr(
	connPool *ConnectionPool,
	hostUri url.URL,
	opts Options,
	handler NetMgrEventHandler,
) (*NetworkMgr, error)

func (*NetworkMgr) Close

func (mgr *NetworkMgr) Close() error

Closes the manager's queues. The underlying connection will be closed when no queues are using it.

func (*NetworkMgr) ID

func (mgr *NetworkMgr) ID() string

The network ID.

func (*NetworkMgr) MqStatus

func (mgr *NetworkMgr) MqStatus() DataMqStatus

Detailed status of each message queue. Please ignore `DlDataResp`.

func (*NetworkMgr) Name

func (mgr *NetworkMgr) Name() string

The network code.

func (*NetworkMgr) SendDlDataResult

func (mgr *NetworkMgr) SendDlDataResult(data NetDlDataResult) error

Send the downlink data result `DlDataResult` to the broker.

func (*NetworkMgr) SendUlData

func (mgr *NetworkMgr) SendUlData(data NetUlData) error

Send uplink data `UlData` to the broker.

func (*NetworkMgr) Status

func (mgr *NetworkMgr) Status() MgrStatus

Manager status.

func (*NetworkMgr) UnitCode

func (mgr *NetworkMgr) UnitCode() string

The associated unit code of the network.

func (*NetworkMgr) UnitID

func (mgr *NetworkMgr) UnitID() string

The associated unit ID of the network.

type Options

// Options holds the options of the application/network manager.
type Options struct {
	// The associated unit ID of the application/network. Empty for public network.
	UnitID string `json:"unitId"`
	// The associated unit code of the application/network. Empty for public network.
	UnitCode string `json:"unitCode"`
	// The associated application/network ID.
	ID string `json:"id"`
	// The associated application/network code.
	Name string `json:"name"`
	// AMQP prefetch option.
	Prefetch uint16 `json:"prefetch"`
	// AMQP persistent option.
	Persistent bool `json:"persistent"`
	// MQTT shared queue prefix option.
	SharedPrefix string `json:"sharedPrefix"`
}

The options of the application/network manager.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL