msgbus

package
v0.0.0-...-2119b44 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: MPL-2.0 Imports: 14 Imported by: 22

Documentation

Index

Constants

View Source
const (
	TYPE_EVENT         = "event"
	TYPE_REQUEST       = "request"
	TYPE_RESPONSE      = "response"
	SOURCE_NODE        = "node"
	SOURCE_CLOUD       = "cloud"
	SCOPE_LOCAL        = "local"
	SCOPE_GLOBAL       = "global"
	ACTION_CRUD_UPDATE = "update"
	ACTION_CRUD_CREATE = "create"
	ACTION_CRUD_DELETE = "delete"
)
View Source
const CONNECTION_NOT_INIT_ERR_MSG = "Connection is not initialized"

Variables

View Source
var GNotifyQ = MsgBusQConfig{
	Exchange:         "DEVICE_EXCHANGE",
	Queue:            "GNOTIFY_QUEUE",
	ExchangeType:     "topic",
	ReqRountingKeys:  []RoutingKey{NotificationGitServerCreate},
	RespRountingKeys: []RoutingKey{},
}

Functions

func NewQPub

func NewQPub(queueUri string, serviceName string, exchange string, instanceId string) (*qPub, error)

func PrepareRoute

func PrepareRoute(org string, r string) string

func PrepareRoutes

func PrepareRoutes(org string, route []string) []string

func RemovePassFromConnection

func RemovePassFromConnection(connectioStr string) string

func UpdateToAcceptFromAllOrg

func UpdateToAcceptFromAllOrg(key string) string

Set the key to orgname as "*"

Types

type Config

type Config struct {
}

Servcie Config

type Consumer

type Consumer interface {
	Subscribe(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error
	SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error
	SubscribeToServiceQueue(serviceName string, exchangeName string, routingKeys []RoutingKey, consumerId string, handlerFunc func(amqp.Delivery, chan<- bool)) error
	SubscribeWithArgs(queueName string, exchangeName string, exchangeType string,
		routingKeys []RoutingKey, consumerName string, queueArgs map[string]interface{}, handlerFunc func(amqp.Delivery, chan<- bool)) error
	IsClosed() bool
	Close()
}

func NewConsumerClient

func NewConsumerClient(connectionString string) (Consumer, error)

type IMsgBus

type IMsgBus interface {
	ConnectToBroker(connectionString string)
	Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, exchangeType string) error
	PublishOnQueue(body []byte, queueName string, initQueue bool) error
	Subscribe(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error
	SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error
	Close()
}

Defines our interface for connecting and consuming messages. Consider using github.com/wagslane/go-rabbitmq instead. It provides similar functionality.

type MailMessage

type MailMessage struct {
	To           string         `json:"to"`
	TemplateName string         `json:"templateName"`
	Values       map[string]any `json:"values"`
}

type MsgBusQConfig

type MsgBusQConfig struct {
	Exchange         string
	Queue            string
	ExchangeType     string
	ReqRountingKeys  []RoutingKey
	RespRountingKeys []RoutingKey
}

Queue Config

type MsgBusShovelProvider

type MsgBusShovelProvider interface {
	AddShovel(name string, s *Shovel) error
	GetShovel(name string) (s *Shovel, err error)
	RemoveShovel(name string) error
	CreateShovel(name string, s *Shovel) error
	RestartShovel(name string) error
}

func NewShovelProvider

func NewShovelProvider(url string, debug bool, name, user, password, srcUri, destUri, srcExchange, destExchange, srcExchangeKey string) MsgBusShovelProvider

type MsgClient

type MsgClient struct {
	// contains filtered or unexported fields
}

Real implementation, encapsulates a pointer to an amqp.Connection Does not reconnect if connection is lost

func (*MsgClient) Close

func (m *MsgClient) Close()

Close connection

func (*MsgClient) ConnectToBroker

func (m *MsgClient) ConnectToBroker(connectionString string)

Connect to Broker(RabbitMq server)

func (*MsgClient) DeclareQueue

func (m *MsgClient) DeclareQueue(queueName string, durable bool) (*amqp.Queue, error)

func (*MsgClient) IsClosed

func (m *MsgClient) IsClosed() bool

func (*MsgClient) Publish

func (m *MsgClient) Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, exchangeType string) error

Publish to queue through exchange

func (*MsgClient) PublishOnExchange

func (m *MsgClient) PublishOnExchange(exchange string, routingKey string, body interface{}) error

PublishOnExchange publishes event to an exchange body - an object that is marshalled to json

func (*MsgClient) PublishOnQueue

func (m *MsgClient) PublishOnQueue(body []byte, queueName string, initQueue bool) error

Publish to Queue.

func (*MsgClient) Subscribe

func (m *MsgClient) Subscribe(queueName string, exchangeName string, exchangeType string, routingKeys []RoutingKey, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error

Subscribe to exchange with option to listen to particular type of message

func (*MsgClient) SubscribeToQueue

func (m *MsgClient) SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery, chan<- bool)) error

Subscribe directly to queue

func (*MsgClient) SubscribeToServiceQueue

func (m *MsgClient) SubscribeToServiceQueue(serviceName string, exchangeName string, routingKeys []RoutingKey, consumerId string, handlerFunc func(amqp.Delivery, chan<- bool)) error

SubscribeToServiceQueue creates a durable queue with a serviceName name and routes messages from an exchange If queue does not exist then it will be created with the `serviceName`

func (*MsgClient) SubscribeWithArgs

func (m *MsgClient) SubscribeWithArgs(queueName string, exchangeName string, exchangeType string,
	routingKeys []RoutingKey, consumerName string, queueArgs map[string]interface{}, handlerFunc func(amqp.Delivery, chan<- bool)) error

type NodeUpdateBody

type NodeUpdateBody struct {
	NodeId string `json:"nodeId"`
	State  string `json:"state"`
	Name   string `json:"name"`
}

type OrgCreatedBody

type OrgCreatedBody struct {
	Name  string `json:"name"`
	Owner string `json:"owner"`
}

type Publisher

type Publisher interface {
	Publish(body []byte, queueName string, exchangeName string, routingKey RoutingKey, exchangeType string) error
	PublishOnQueue(msg []byte, queueName string, initQueue bool) error
	PublishOnExchange(exchange string, routingKey string, body interface{}) error
	DeclareQueue(queueName string, durable bool) (*amqp.Queue, error)
	IsClosed() bool
	Close()
}

func NewPublisherClient

func NewPublisherClient(connectionString string) (Publisher, error)

NewPublisherClient creates a publisher and opens connection and channel Use one publisher per thread as it's common practice to use one channel per thread

type QPub

type QPub interface {
	Publish(payload any, routingKey string) error
	PublishProto(payload proto.Message, routingKey string) error
	PublishToQueue(queueName string, payload any) error
	Close() error
}

type RPCResponse

type RPCResponse struct {
	Status     bool
	Resp       *amqp.Delivery
	RoutingKey RoutingKey
}

type RoutingKey

type RoutingKey string
const (
	RequestDeviceUpdateConfig                    RoutingKey = "REQUEST.DEVICE.UPDATE.CONFIG"
	ResponseDeviceUpdateConfig                   RoutingKey = "RESPONSE.DEVICE.UPDATE.CONFIG"
	NotificationGitServerCreate                  RoutingKey = "NOTIFICATION.GITSERVER.CREATE.*"
	RequestDeviceCreate                          RoutingKey = "REQUEST.DEVICE.CREATE.*"
	ResponseDeviceCreate                         RoutingKey = "RESPONSE.DEVICE.CREATE.*"
	RequestDeviceDelete                          RoutingKey = "REQUEST.DEVICE.DELETE.*"
	ResponseDeviceDelete                         RoutingKey = "RESPONSE.DEVICE.DELETE.*"
	RequestDeviceReadConfig                      RoutingKey = "REQUEST.DEVICE.READ.CONFIG"
	ResponseDeviceReadConfig                     RoutingKey = "RESPONSE.DEVICE.READ.CONFIG"
	CommandControllerExecuteReloadMetric         RoutingKey = "CMD.CONTROLLER.EXEC.RELOAD_METRIC"
	ResponseCommandControllerExecuteReloadMetric RoutingKey = "CMD.CONTROLLER.EXEC.RELOAD_METRIC"
	RequestDeviceSetobserveConfig                RoutingKey = "REQUEST.DEVICE.OBSERVE.CONFIG"
	ResponseDeviceSetobserveConfig               RoutingKey = "RESPONSE.DEVICE.OBSERVE.CONFIG"
	RequestDeviceCancelobserveConfig             RoutingKey = "REQUEST.DEVICE.CANCEL.CONFIG"
	ResponseDeviceCancelobserveConfig            RoutingKey = "RESPONSE.DEVICE.CANCEL.CONFIG"
	CommandDeviceExecuteResource                 RoutingKey = "CMD.DEVICE.EXEC.RESOURCE"
	ResponseDeviceExecuteResource                RoutingKey = "RESPONSE.DEVICE.EXEC.RESOURCE"
	EventDeviceCreate                            RoutingKey = "EVENT.DEVICE.CREATE.*"
	EventVirtNodeUpdateStatus                    RoutingKey = "EVENT.VIRTNODE.UPDATE.STATUS"
)

deprecated routing keys

const (
	NodeConnectedRoutingKey     RoutingKey = "event.device.mesh.link.connect"
	UserRegisteredRoutingKey    RoutingKey = "event.cloud.identity.user.create"
	NodeFeederRequestRoutingKey RoutingKey = "request.cloud.node-feeder"
	OrgCreatedRoutingKey        RoutingKey = "event.cloud.org.org.created"
	OrgDeletedRoutingKey        RoutingKey = "event.cloud.org.org.deleted"
	NodeUpdatedRoutingKey       RoutingKey = "event.cloud.node.node.updated"

	DefaultExchange = "amq.topic"
)

actual routing keys

func Parse

func Parse(s string) (RoutingKey, error)

func ParseRouteList

func ParseRouteList(s []string) ([]RoutingKey, error)

func (RoutingKey) String

func (k RoutingKey) String() string

func (RoutingKey) StringLowercase

func (k RoutingKey) StringLowercase() string

type RoutingKeyBuilder

type RoutingKeyBuilder struct {
	// contains filtered or unexported fields
}

func NewRoutingKeyBuilder

func NewRoutingKeyBuilder() RoutingKeyBuilder

Deprecated. Just use string constants. This one is hard to read

func (RoutingKeyBuilder) Build

func (r RoutingKeyBuilder) Build() (string, error)

Build creates a routing key.

func (RoutingKeyBuilder) MustBuild

func (r RoutingKeyBuilder) MustBuild() string

Panics if one of the segments in not set

func (RoutingKeyBuilder) SetAction

func (r RoutingKeyBuilder) SetAction(action string) RoutingKeyBuilder

func (RoutingKeyBuilder) SetActionCreate

func (r RoutingKeyBuilder) SetActionCreate() RoutingKeyBuilder

func (RoutingKeyBuilder) SetActionDelete

func (r RoutingKeyBuilder) SetActionDelete() RoutingKeyBuilder

func (RoutingKeyBuilder) SetActionUpdate

func (r RoutingKeyBuilder) SetActionUpdate() RoutingKeyBuilder

func (RoutingKeyBuilder) SetCloudSource

func (r RoutingKeyBuilder) SetCloudSource() RoutingKeyBuilder

func (RoutingKeyBuilder) SetDeviceSource

func (r RoutingKeyBuilder) SetDeviceSource() RoutingKeyBuilder

func (RoutingKeyBuilder) SetEventType

func (r RoutingKeyBuilder) SetEventType() RoutingKeyBuilder

func (RoutingKeyBuilder) SetGlobalScope

func (r RoutingKeyBuilder) SetGlobalScope() RoutingKeyBuilder

func (RoutingKeyBuilder) SetLocalScope

func (r RoutingKeyBuilder) SetLocalScope() RoutingKeyBuilder

func (RoutingKeyBuilder) SetObject

func (r RoutingKeyBuilder) SetObject(object string) RoutingKeyBuilder

SetObject sets the object segment that defines what object inside the container produced the message

func (RoutingKeyBuilder) SetOrgName

func (r RoutingKeyBuilder) SetOrgName(orgName string) RoutingKeyBuilder

func (RoutingKeyBuilder) SetRequestType

func (r RoutingKeyBuilder) SetRequestType() RoutingKeyBuilder

func (RoutingKeyBuilder) SetResponseType

func (r RoutingKeyBuilder) SetResponseType() RoutingKeyBuilder

func (RoutingKeyBuilder) SetScope

func (r RoutingKeyBuilder) SetScope(scope string) RoutingKeyBuilder

func (RoutingKeyBuilder) SetService

func (r RoutingKeyBuilder) SetService(service string) RoutingKeyBuilder

Setservice sets the service part of routing key.

func (RoutingKeyBuilder) SetSystem

func (r RoutingKeyBuilder) SetSystem(system string) RoutingKeyBuilder

type Shovel

type Shovel struct {
	SrcProtocol     string `json:"src-protocol" default:"amqp091"`
	DestProtocol    string `default:"amqp091" json:"dest-protocol"`
	SrcExchange     string `default:"amq.topic" json:"src-exchange"`
	SrcExchangeKey  string `json:"src-exchange-key,omitempty"`
	DestExchange    string `default:"amq.topic" json:"dest-exchange,omitempty"`
	DestExchangeKey string `json:"dest-exchange-key,omitempty"`
	DestQueue       string `json:"dest-queue,omitempty"`
	SrcQueue        string `json:"src-queue,omitempty"`
	SrcUri          string `json:"src-uri"`
	DestUri         string `json:"dest-uri"`
	Name            string `json:"name,omitempty"`
	Status          string `json:"status,omitempty"`
	Vhost           string `json:"vhost,omitempty"`
	Type            string `json:"type,omitempty"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL