Documentation ¶
Index ¶
- Constants
- Variables
- func NewQPub(queueUri string, serviceName string, exchange string, instanceId string) (*qPub, error)
- func PrepareRoute(org string, r string) string
- func PrepareRoutes(org string, route []string) []string
- func RemovePassFromConnection(connectioStr string) string
- func UpdateToAcceptFromAllOrg(key string) string
- type Config
- type Consumer
- type IMsgBus
- type MailMessage
- type MsgBusQConfig
- type MsgBusShovelProvider
- type MsgClient
- func (m *MsgClient) Close()
- func (m *MsgClient) ConnectToBroker(connectionString string)
- func (m *MsgClient) DeclareQueue(queueName string, durable bool) (*amqp.Queue, error)
- func (m *MsgClient) IsClosed() bool
- func (m *MsgClient) Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, ...) error
- func (m *MsgClient) PublishOnExchange(exchange string, routingKey string, body interface{}) error
- func (m *MsgClient) PublishOnQueue(body []byte, queueName string, initQueue bool) error
- func (m *MsgClient) Subscribe(queueName string, exchangeName string, exchangeType string, ...) error
- func (m *MsgClient) SubscribeToQueue(queueName string, consumerName string, ...) error
- func (m *MsgClient) SubscribeToServiceQueue(serviceName string, exchangeName string, routingKeys []RoutingKey, ...) error
- func (m *MsgClient) SubscribeWithArgs(queueName string, exchangeName string, exchangeType string, ...) error
- type NodeUpdateBody
- type OrgCreatedBody
- type Publisher
- type QPub
- type RPCResponse
- type RoutingKey
- type RoutingKeyBuilder
- func (r RoutingKeyBuilder) Build() (string, error)
- func (r RoutingKeyBuilder) MustBuild() string
- func (r RoutingKeyBuilder) SetAction(action string) RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetActionCreate() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetActionDelete() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetActionUpdate() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetCloudSource() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetDeviceSource() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetEventType() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetGlobalScope() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetLocalScope() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetObject(object string) RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetOrgName(orgName string) RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetRequestType() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetResponseType() RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetScope(scope string) RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetService(service string) RoutingKeyBuilder
- func (r RoutingKeyBuilder) SetSystem(system string) RoutingKeyBuilder
- type Shovel
Constants ¶
const ( TYPE_EVENT = "event" TYPE_REQUEST = "request" TYPE_RESPONSE = "response" SOURCE_NODE = "node" SOURCE_CLOUD = "cloud" SCOPE_LOCAL = "local" SCOPE_GLOBAL = "global" ACTION_CRUD_UPDATE = "update" ACTION_CRUD_CREATE = "create" ACTION_CRUD_DELETE = "delete" )
const CONNECTION_NOT_INIT_ERR_MSG = "Connection is not initialized"
Variables ¶
var DeviceQ = MsgBusQConfig{ Exchange: "DEVICE_EXCHANGE", Queue: "DEVICE_HANDLE_QUEUE", ExchangeType: "topic", ReqRountingKeys: []RoutingKey{ RequestDeviceCreate, RequestDeviceDelete, RequestDeviceReadConfig, RequestDeviceUpdateConfig, CommandControllerExecuteReloadMetric, CommandDeviceExecuteResource, RequestDeviceSetobserveConfig, RequestDeviceCancelobserveConfig, }, RespRountingKeys: []RoutingKey{ ResponseDeviceCreate, ResponseDeviceDelete, ResponseDeviceUpdateConfig, ResponseDeviceReadConfig, ResponseCommandControllerExecuteReloadMetric, ResponseDeviceExecuteResource, ResponseDeviceSetobserveConfig, ResponseDeviceCancelobserveConfig, }, }
var GNotifyQ = MsgBusQConfig{ Exchange: "DEVICE_EXCHANGE", Queue: "GNOTIFY_QUEUE", ExchangeType: "topic", ReqRountingKeys: []RoutingKey{NotificationGitServerCreate}, RespRountingKeys: []RoutingKey{}, }
var LwM2MQ = MsgBusQConfig{ Exchange: "LWM2M_EXCHANGE", Queue: "LWM2M_QUEUE", ExchangeType: "topic", ReqRountingKeys: []RoutingKey{RequestDeviceReadConfig, RequestDeviceUpdateConfig, CommandDeviceExecuteResource, RequestDeviceSetobserveConfig, RequestDeviceCancelobserveConfig}, RespRountingKeys: []RoutingKey{ ResponseDeviceUpdateConfig, ResponseDeviceReadConfig, ResponseDeviceExecuteResource, ResponseDeviceSetobserveConfig, ResponseDeviceCancelobserveConfig, }, }
Functions ¶
func PrepareRoute ¶
func PrepareRoutes ¶
func UpdateToAcceptFromAllOrg ¶
Sets the orgname in the key to "*".
Types ¶
type Consumer ¶
type Consumer interface { Subscribe(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error SubscribeToServiceQueue(serviceName string, exchangeName string, routingKeys []RoutingKey, consumerId string, handlerFunc func(amqp.Delivery, chan<- bool)) error SubscribeWithArgs(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, queueArgs map[string]interface{}, handlerFunc func(amqp.Delivery, chan<- bool)) error IsClosed() bool Close() }
func NewConsumerClient ¶
type IMsgBus ¶
type IMsgBus interface { ConnectToBroker(connectionString string) Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, exchangeType string) error PublishOnQueue(body []byte, queueName string, initQueue bool) error Subscribe(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error Close() }
Defines our interface for connecting and consuming messages. Consider using github.com/wagslane/go-rabbitmq instead. It provides similar functionality.
type MailMessage ¶
type MsgBusQConfig ¶
type MsgBusQConfig struct { Exchange string Queue string ExchangeType string ReqRountingKeys []RoutingKey RespRountingKeys []RoutingKey }
Queue Config
type MsgBusShovelProvider ¶
type MsgBusShovelProvider interface { AddShovel(name string, s *Shovel) error GetShovel(name string) (s *Shovel, err error) RemoveShovel(name string) error CreateShovel(name string, s *Shovel) error RestartShovel(name string) error }
func NewShovelProvider ¶
func NewShovelProvider(url string, debug bool, name, user, password, srcUri, destUri, srcExchange, destExchange, srcExchangeKey string) MsgBusShovelProvider
type MsgClient ¶
type MsgClient struct {
// contains filtered or unexported fields
}
Real implementation; encapsulates a pointer to an amqp.Connection. Does not reconnect if the connection is lost.
func (*MsgClient) ConnectToBroker ¶
Connect to the broker (RabbitMQ server).
func (*MsgClient) DeclareQueue ¶
func (*MsgClient) Publish ¶
func (m *MsgClient) Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, exchangeType string) error
Publish to queue through exchange
func (*MsgClient) PublishOnExchange ¶
PublishOnExchange publishes an event to an exchange. body - an object that is marshalled to JSON.
func (*MsgClient) PublishOnQueue ¶
Publish to Queue.
func (*MsgClient) Subscribe ¶
func (m *MsgClient) Subscribe(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error
Subscribe to an exchange, with the option to listen to a particular type of message.
func (*MsgClient) SubscribeToQueue ¶
func (m *MsgClient) SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error
Subscribe directly to queue
func (*MsgClient) SubscribeToServiceQueue ¶
func (m *MsgClient) SubscribeToServiceQueue(serviceName string, exchangeName string, routingKeys []RoutingKey, consumerId string, handlerFunc func(amqp.Delivery, chan<- bool)) error
SubscribeToServiceQueue creates a durable queue with the name `serviceName` and routes messages from an exchange. If the queue does not exist, it will be created with the name `serviceName`.
type NodeUpdateBody ¶
type OrgCreatedBody ¶
type Publisher ¶
type Publisher interface { Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, exchangeType string) error PublishOnQueue(msg []byte, queueName string, initQueue bool) error PublishOnExchange(exchange string, routingKey string, body interface{}) error DeclareQueue(queueName string, durable bool) (*amqp.Queue, error) IsClosed() bool Close() }
func NewPublisherClient ¶
NewPublisherClient creates a publisher and opens a connection and a channel. Use one publisher per thread, as it's common practice to use one channel per thread.
type RPCResponse ¶
type RPCResponse struct { Status bool Resp *amqp.Delivery RoutingKey RoutingKey }
type RoutingKey ¶
type RoutingKey string
const ( RequestDeviceUpdateConfig RoutingKey = "REQUEST.DEVICE.UPDATE.CONFIG" ResponseDeviceUpdateConfig RoutingKey = "RESPONSE.DEVICE.UPDATE.CONFIG" NotificationGitServerCreate RoutingKey = "NOTIFICATION.GITSERVER.CREATE.*" RequestDeviceCreate RoutingKey = "REQUEST.DEVICE.CREATE.*" ResponseDeviceCreate RoutingKey = "RESPONSE.DEVICE.CREATE.*" RequestDeviceDelete RoutingKey = "REQUEST.DEVICE.DELETE.*" ResponseDeviceDelete RoutingKey = "RESPONSE.DEVICE.DELETE.*" RequestDeviceReadConfig RoutingKey = "REQUEST.DEVICE.READ.CONFIG" ResponseDeviceReadConfig RoutingKey = "RESPONSE.DEVICE.READ.CONFIG" CommandControllerExecuteReloadMetric RoutingKey = "CMD.CONTROLLER.EXEC.RELOAD_METRIC" ResponseCommandControllerExecuteReloadMetric RoutingKey = "CMD.CONTROLLER.EXEC.RELOAD_METRIC" RequestDeviceSetobserveConfig RoutingKey = "REQUEST.DEVICE.OBSERVE.CONFIG" ResponseDeviceSetobserveConfig RoutingKey = "RESPONSE.DEVICE.OBSERVE.CONFIG" RequestDeviceCancelobserveConfig RoutingKey = "REQUEST.DEVICE.CANCEL.CONFIG" ResponseDeviceCancelobserveConfig RoutingKey = "RESPONSE.DEVICE.CANCEL.CONFIG" CommandDeviceExecuteResource RoutingKey = "CMD.DEVICE.EXEC.RESOURCE" ResponseDeviceExecuteResource RoutingKey = "RESPONSE.DEVICE.EXEC.RESOURCE" EventDeviceCreate RoutingKey = "EVENT.DEVICE.CREATE.*" EventVirtNodeUpdateStatus RoutingKey = "EVENT.VIRTNODE.UPDATE.STATUS" )
deprecated routing keys
const ( NodeConnectedRoutingKey RoutingKey = "event.device.mesh.link.connect" UserRegisteredRoutingKey RoutingKey = "event.cloud.identity.user.create" NodeFeederRequestRoutingKey RoutingKey = "request.cloud.node-feeder" OrgCreatedRoutingKey RoutingKey = "event.cloud.org.org.created" OrgDeletedRoutingKey RoutingKey = "event.cloud.org.org.deleted" NodeUpdatedRoutingKey RoutingKey = "event.cloud.node.node.updated" DefaultExchange = "amq.topic" )
actual routing keys
func Parse ¶
func Parse(s string) (RoutingKey, error)
func ParseRouteList ¶
func ParseRouteList(s []string) ([]RoutingKey, error)
func (RoutingKey) String ¶
func (k RoutingKey) String() string
func (RoutingKey) StringLowercase ¶
func (k RoutingKey) StringLowercase() string
type RoutingKeyBuilder ¶
type RoutingKeyBuilder struct {
// contains filtered or unexported fields
}
func NewRoutingKeyBuilder ¶
func NewRoutingKeyBuilder() RoutingKeyBuilder
Deprecated: Just use string constants. This one is hard to read.
func (RoutingKeyBuilder) Build ¶
func (r RoutingKeyBuilder) Build() (string, error)
Build creates a routing key.
func (RoutingKeyBuilder) MustBuild ¶
func (r RoutingKeyBuilder) MustBuild() string
Panics if one of the segments is not set.
func (RoutingKeyBuilder) SetAction ¶
func (r RoutingKeyBuilder) SetAction(action string) RoutingKeyBuilder
func (RoutingKeyBuilder) SetActionCreate ¶
func (r RoutingKeyBuilder) SetActionCreate() RoutingKeyBuilder
func (RoutingKeyBuilder) SetActionDelete ¶
func (r RoutingKeyBuilder) SetActionDelete() RoutingKeyBuilder
func (RoutingKeyBuilder) SetActionUpdate ¶
func (r RoutingKeyBuilder) SetActionUpdate() RoutingKeyBuilder
func (RoutingKeyBuilder) SetCloudSource ¶
func (r RoutingKeyBuilder) SetCloudSource() RoutingKeyBuilder
func (RoutingKeyBuilder) SetDeviceSource ¶
func (r RoutingKeyBuilder) SetDeviceSource() RoutingKeyBuilder
func (RoutingKeyBuilder) SetEventType ¶
func (r RoutingKeyBuilder) SetEventType() RoutingKeyBuilder
func (RoutingKeyBuilder) SetGlobalScope ¶
func (r RoutingKeyBuilder) SetGlobalScope() RoutingKeyBuilder
func (RoutingKeyBuilder) SetLocalScope ¶
func (r RoutingKeyBuilder) SetLocalScope() RoutingKeyBuilder
func (RoutingKeyBuilder) SetObject ¶
func (r RoutingKeyBuilder) SetObject(object string) RoutingKeyBuilder
SetObject sets the object segment that defines what object inside the container produced the message
func (RoutingKeyBuilder) SetOrgName ¶
func (r RoutingKeyBuilder) SetOrgName(orgName string) RoutingKeyBuilder
func (RoutingKeyBuilder) SetRequestType ¶
func (r RoutingKeyBuilder) SetRequestType() RoutingKeyBuilder
func (RoutingKeyBuilder) SetResponseType ¶
func (r RoutingKeyBuilder) SetResponseType() RoutingKeyBuilder
func (RoutingKeyBuilder) SetScope ¶
func (r RoutingKeyBuilder) SetScope(scope string) RoutingKeyBuilder
func (RoutingKeyBuilder) SetService ¶
func (r RoutingKeyBuilder) SetService(service string) RoutingKeyBuilder
SetService sets the service part of the routing key.
func (RoutingKeyBuilder) SetSystem ¶
func (r RoutingKeyBuilder) SetSystem(system string) RoutingKeyBuilder
type Shovel ¶
type Shovel struct { SrcProtocol string `json:"src-protocol" default:"amqp091"` DestProtocol string `default:"amqp091" json:"dest-protocol"` SrcExchange string `default:"amq.topic" json:"src-exchange"` SrcExchangeKey string `json:"src-exchange-key,omitempty"` DestExchange string `default:"amq.topic" json:"dest-exchange,omitempty"` DestExchangeKey string `json:"dest-exchange-key,omitempty"` DestQueue string `json:"dest-queue,omitempty"` SrcQueue string `json:"src-queue,omitempty"` SrcUri string `json:"src-uri"` DestUri string `json:"dest-uri"` Name string `json:"name,omitempty"` Status string `json:"status,omitempty"` Vhost string `json:"vhost,omitempty"` Type string `json:"type,omitempty"` }