mq

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2023 License: GPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Aliases of the identically named constants declared in package mqenv.
const (
	// MQ driver types.
	DriverTypeAMQP  = mqenv.DriverTypeAMQP
	DriverTypeKafka = mqenv.DriverTypeKafka

	// MQ roles: consumer or publisher.
	MQTypeConsumer  = mqenv.MQTypeConsumer
	MQTypePublisher = mqenv.MQTypePublisher

	// MQ event result codes.
	MQEventCodeOk     = mqenv.MQEventCodeOk
	MQEventCodeFailed = mqenv.MQEventCodeFailed
)

Constants

Variables

This section is empty.

Functions

func ConsumeMQ

func ConsumeMQ(mqCategory string, consumeProxy *mqenv.MQConsumerProxy) error

ConsumeMQ consume

func FindOneCategoryNameByInstance added in v0.2.5

func FindOneCategoryNameByInstance(instanceName string) string

FindOneCategoryNameByInstance returns the first category name that matches the given instance name.

func GetAllCategoryNamesByInstance added in v0.2.5

func GetAllCategoryNamesByInstance(instanceName string) []string

GetAllCategoryNamesByInstance returns all category names for the given instance name.

func GetAllMQDriverConfigs added in v0.2.2

func GetAllMQDriverConfigs() map[string]mqenv.MQConnectorConfig

GetAllMQDriverConfigs configs

func GetKafka

func GetKafka(name string) (*kafka.KafkaWorker, error)

GetKafka get kafka instance

func GetMQRoutes

func GetMQRoutes() map[string]Config

GetMQRoutes config map

func GetPulsar added in v0.2.6

func GetPulsar(name string) (*pulsar.PulsarMQ, error)

GetPulsar get pulsar instance

func GetRabbitMQ

func GetRabbitMQ(name string) (*rabbitmq.RabbitMQ, error)

GetRabbitMQ get rabbitmq instance

func HasConnections added in v0.2.6

func HasConnections() bool

HasConnections reports whether there is any MQ connection.

func Init

func Init(mqConfigFile string, mqDriverConfigs map[string]mqenv.MQConnectorConfig) error

Init initializer

func InitMQTopic added in v0.2.0

func InitMQTopic(topicCategory string, topicConfig *Config, mqDriverConfigs map[string]mqenv.MQConnectorConfig) error

InitMQTopic initializes a single MQ topic with the given drivers.

func InitMQWithRPC

func InitMQWithRPC(topicCategory string, rpcType int, connCfg *mqenv.MQConnectorConfig, mqCfg *Config) error

InitMQWithRPC init mq with RPC

func InitMockMQTopic added in v0.2.6

func InitMockMQTopic(mqCategory string, topic string)

InitMockMQTopic for testing

func InitWithMQRoutes added in v0.2.1

func InitWithMQRoutes(mqRoutesEnv *RoutesEnv, mqDriverConfigs map[string]mqenv.MQConnectorConfig) error

InitWithMQRoutes partially initializes with the given RoutesEnv.

func NewMQResponseMessage added in v0.2.2

func NewMQResponseMessage(body []byte, cm *mqenv.MQConsumerMessage) *mqenv.MQPublishMessage

NewMQResponseMessage creates a new MQ response publish message based on the given MQ consumer message.

func PublishMQ

func PublishMQ(mqCategory string, publishMsg *mqenv.MQPublishMessage) error

PublishMQ publish

func QueryMQ added in v0.2.6

func QueryMQ(mqCategory string, pm *mqenv.MQPublishMessage) (*mqenv.MQConsumerMessage, error)

QueryMQ publishes a message and waits for the response.

func QueryMQRPC added in v0.2.1

func QueryMQRPC(mqCategory string, pm *mqenv.MQPublishMessage) (*mqenv.MQConsumerMessage, error)

QueryMQRPC publishes a message and waits for the response.

func SetMQConfig added in v0.2.6

func SetMQConfig(category string, cnf Config)

SetMQConfig with category

func SetupTrackerQueue added in v0.2.1

func SetupTrackerQueue(queueName string)

SetupTrackerQueue name

Types

type Config

// Config holds the MQ settings for one entry of the routes map (see
// RoutesEnv.MQs and GetMQRoutes). It can be decoded from YAML or JSON using
// the field tags below.
//
// NOTE(review): RPCEnabled carries only a yaml tag, unlike every other field,
// so encoding/json marshals it under the Go field name "RPCEnabled" — confirm
// whether the missing json tag is intentional.
type Config struct {
	Instance string `yaml:"instance" json:"instance"`
	// RabbitMQ parameters
	Queue       string            `yaml:"queue" json:"queue"`
	Exchange    Exchange          `yaml:"exchange" json:"exchange"`
	BindingKey  string            `yaml:"bindingKey" json:"bindingKey"`
	RoutingKeys map[string]string `yaml:"routingKeys" json:"routingKeys"`
	Durable     bool              `yaml:"durable" json:"durable"`
	AutoDelete  bool              `yaml:"autoDelete" json:"autoDelete"`
	RPCEnabled  bool              `yaml:"rpcEnabled"`
	// Kafka parameters
	Topic             string `yaml:"topic" json:"topic"`
	GroupID           string `yaml:"groupId" json:"groupId"`
	Partition         int    `yaml:"partition" json:"partition"`
	MaxPollIntervalMS int    `yaml:"maxPollIntervalMs" json:"maxPollIntervalMs"`
	// Message type:
	// direct: multicast; consumers subscribe to the same topic with the same consumer group, so each message is received by only one consumer in the group
	// fanout: broadcast; consumers subscribe to the same topic but each consumer group uses a UUID, so every group receives the message
	MessageType        string `yaml:"messageType" json:"messageType"`
	UseOriginalContent bool   `yaml:"useOriginalContent" json:"useOriginalContent"`
}

Config struct

func GetMQConfig

func GetMQConfig(category string) *Config

GetMQConfig config

type Exchange

// Exchange holds the exchange settings used by Config.Exchange (listed there
// among the RabbitMQ parameters): the exchange type, its name, and a durable
// flag. It can be decoded from YAML or JSON using the field tags below.
type Exchange struct {
	Type    string `yaml:"type" json:"type"`
	Name    string `yaml:"name" json:"name"`
	Durable bool   `yaml:"durable" json:"durable"`
}

Exchange struct

type RoutesEnv

// RoutesEnv is the routes configuration returned by InitMQRoutesEnv.
type RoutesEnv struct {
	// MQs maps a name to its Config and is read from the "mq" YAML key.
	// Presumably the keys are MQ category names — confirm against callers.
	MQs map[string]Config `yaml:"mq"`
	// contains filtered or unexported fields
}

RoutesEnv struct

func InitMQRoutesEnv

func InitMQRoutesEnv(configFile string) (*RoutesEnv, error)

InitMQRoutesEnv initialize with configure file

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL