mq

package
v0.0.0-...-7a3883e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2022 License: GPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Aliases for the identically named constants in package mqenv, so that
// callers can reference them through this package.
const (
	// Driver types.
	DriverTypeAMQP  = mqenv.DriverTypeAMQP
	DriverTypeKafka = mqenv.DriverTypeKafka

	// MQ roles: consumer or publisher.
	MQTypeConsumer  = mqenv.MQTypeConsumer
	MQTypePublisher = mqenv.MQTypePublisher

	// Event result codes.
	MQEventCodeOk     = mqenv.MQEventCodeOk
	MQEventCodeFailed = mqenv.MQEventCodeFailed
)

Constants

Variables

This section is empty.

Functions

func ConsumeMQ

func ConsumeMQ(mqCategory string, consumeProxy *mqenv.MQConsumerProxy) error

ConsumeMQ consume

func FindOneCategoryNameByInstance

func FindOneCategoryNameByInstance(instanceName string) string

FindOneCategoryNameByInstance first hit category

func GetAllCategoryNamesByInstance

func GetAllCategoryNamesByInstance(instanceName string) []string

GetAllCategoryNamesByInstance returns all category names for the given instance name

func GetAllMQDriverConfigs

func GetAllMQDriverConfigs() map[string]mqenv.MQConnectorConfig

GetAllMQDriverConfigs configs

func GetKafka

func GetKafka(name string) (*kafka.KafkaWorker, error)

GetKafka get kafka instance

func GetMQRoutes

func GetMQRoutes() map[string]Config

GetMQRoutes config map

func GetPulsar

func GetPulsar(name string) (*pulsar.PulsarMQ, error)

GetPulsar get pulsar instance

func GetRabbitMQ

func GetRabbitMQ(name string) (*rabbitmq.RabbitMQ, error)

GetRabbitMQ get rabbitmq instance

func HasConnections

func HasConnections() bool

HasConnections returns true if there is any MQ connection

func Init

func Init(mqConfigFile string, mqDriverConfigs map[string]mqenv.MQConnectorConfig) error

Init initializer

func InitMQTopic

func InitMQTopic(topicCategory string, topicConfig *Config, mqDriverConfigs map[string]mqenv.MQConnectorConfig) error

InitMQTopic initializes a single MQ topic with drivers

func InitMQWithRPC

func InitMQWithRPC(topicCategory string, rpcType int, connCfg *mqenv.MQConnectorConfig, mqCfg *Config) error

InitMQWithRPC init mq with RPC

func InitMockMQTopic

func InitMockMQTopic(mqCategory string, topic string)

InitMockMQTopic for testing

func InitWithMQRoutes

func InitWithMQRoutes(mqRoutesEnv *RoutesEnv, mqDriverConfigs map[string]mqenv.MQConnectorConfig) error

InitWithMQRoutes partially initializes with RoutesEnv

func NewMQResponseMessage

func NewMQResponseMessage(body []byte, cm *mqenv.MQConsumerMessage) *mqenv.MQPublishMessage

NewMQResponseMessage creates a new MQ response publish message based on the given MQ consumer message

func PublishMQ

func PublishMQ(mqCategory string, publishMsg *mqenv.MQPublishMessage) error

PublishMQ publish

func QueryMQ

func QueryMQ(mqCategory string, pm *mqenv.MQPublishMessage) (*mqenv.MQConsumerMessage, error)

QueryMQ publishes a message and waits for the response

func QueryMQRPC

func QueryMQRPC(mqCategory string, pm *mqenv.MQPublishMessage) (*mqenv.MQConsumerMessage, error)

QueryMQRPC publishes a message and waits for the response

func SetMQConfig

func SetMQConfig(category string, cnf Config)

SetMQConfig with category

func SetupTrackerQueue

func SetupTrackerQueue(queueName string)

SetupTrackerQueue name

Types

type Config

// Config describes one MQ entry: the instance it refers to plus the
// RabbitMQ- and Kafka-specific parameters. Every field can be populated from
// YAML, and (except where noted) from JSON under the same key.
type Config struct {
	// Instance is a name; presumably it selects the MQ connector/driver
	// instance this entry uses — TODO confirm against the initializer.
	Instance string `yaml:"instance" json:"instance"`
	// RabbitMQ parameters
	Queue       string            `yaml:"queue" json:"queue"`
	Exchange    Exchange          `yaml:"exchange" json:"exchange"`
	BindingKey  string            `yaml:"bindingKey" json:"bindingKey"`
	RoutingKeys map[string]string `yaml:"routingKeys" json:"routingKeys"`
	Durable     bool              `yaml:"durable" json:"durable"`
	AutoDelete  bool              `yaml:"autoDelete" json:"autoDelete"`
	// NOTE(review): RPCEnabled is the only field without a json tag, so
	// encoding/json will emit it as "RPCEnabled" — confirm this is intended.
	RPCEnabled  bool              `yaml:"rpcEnabled"`
	// Kafka parameters
	Topic             string `yaml:"topic" json:"topic"`
	GroupID           string `yaml:"groupId" json:"groupId"`
	Partition         int    `yaml:"partition" json:"partition"`
	MaxPollIntervalMS int    `yaml:"maxPollIntervalMs" json:"maxPollIntervalMs"`
	// Message type:
	// direct: multicast. Subscribers to the same topic share the same consumer group, so each message is received by only one consumer in the group.
	// fanout: broadcast. Subscribers to the same topic each use a UUID as their consumer group, so every group receives the message.
	MessageType        string `yaml:"messageType" json:"messageType"`
	UseOriginalContent bool   `yaml:"useOriginalContent" json:"useOriginalContent"`
}

Config struct

func GetMQConfig

func GetMQConfig(category string) *Config

GetMQConfig config

type Exchange

// Exchange holds the exchange settings (type, name, durability) used by the
// Exchange field of Config, where it is listed among the RabbitMQ parameters.
type Exchange struct {
	Type    string `yaml:"type" json:"type"`
	Name    string `yaml:"name" json:"name"`
	Durable bool   `yaml:"durable" json:"durable"`
}

Exchange struct

type RoutesEnv

// RoutesEnv groups a set of MQ Config entries read from the "mq" key of a
// YAML document.
type RoutesEnv struct {
	// MQs maps a string key to its Config; presumably the key is the MQ
	// category name used by the package functions — TODO confirm.
	MQs map[string]Config `yaml:"mq"`
	// contains filtered or unexported fields
}

RoutesEnv struct

func InitMQRoutesEnv

func InitMQRoutesEnv(configFile string) (*RoutesEnv, error)

InitMQRoutesEnv initializes a RoutesEnv from a configuration file

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL