mq

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: MIT Imports: 7 Imported by: 0

README

消息队列

常用消息队列 整合接口

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Kafka

type Kafka struct {
	Service []string
	Topic   string
}

func NewKafka

func NewKafka(service []string, topic string) *Kafka

func (*Kafka) Consumer

func (k *Kafka) Consumer(ch chan []byte)

Consumer 消费,实例如下

kf := NewKafka([]string{"127.0.0.1:9092"}, "case")
ch := make(chan []byte)
go func() {
	for {
		select {
		case data := <-ch:
			log.Println("消费数据:", data)
		}
	}
}()
kf.Consumer(ch)

func (*Kafka) Producer

func (k *Kafka) Producer(data []byte)

Producer 生产,实例如下

kf := NewKafka([]string{"127.0.0.1:9092"}, "case")
for _, v := range []string{"1", "2", "3", "4", "5"} {
	kf.Producer([]byte(v))
}

type MQKafkaService

type MQKafkaService struct {
}

MQKafkaService Kafka消息队列

func (*MQKafkaService) Consumer

func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte))

Consumer 消费者

func (*MQKafkaService) Producer

func (m *MQKafkaService) Producer(topic string, data []byte)

Producer 生产者

type MQNsqService

type MQNsqService struct {
}

MQNsqService NSQ消息队列

func (*MQNsqService) Consumer

func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte))

Consumer 消费者

func (*MQNsqService) Producer

func (m *MQNsqService) Producer(topic string, data []byte)

Producer 生产者

type MQRabbitService

type MQRabbitService struct {
}

MQRabbitService Rabbit消息队列

func (*MQRabbitService) Consumer

func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte))

Consumer 消费者

func (*MQRabbitService) Producer

func (m *MQRabbitService) Producer(topic string, data []byte)

Producer 生产者

type MQer

type MQer interface {
	Producer(topic string, data []byte)
	Consumer(topic, channel string, ch chan []byte, f func(b []byte))
}

MQer 消息队列接口

func NewMQ

func NewMQ() MQer

NewMQ 实例化消息队列对象

type MessageHandler

type MessageHandler struct {
	Channel string
	// contains filtered or unexported fields
}

MessageHandler MessageHandler

func NewMessageHandler

func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error)

NewMessageHandler return new MessageHandler

func (*MessageHandler) Registry

func (m *MessageHandler) Registry(topic string, ch chan []byte)

Registry register nsq topic

func (*MessageHandler) SetMaxInFlight

func (m *MessageHandler) SetMaxInFlight(val int)

SetMaxInFlight set nsq consumer MaxInFlight

type Producer

type Producer struct {
	P *goNsq.Producer
}

func NewProducer

func NewProducer(addr string) (producer *Producer, err error)

func (*Producer) Publish

func (m *Producer) Publish(topic string, data []byte) (err error)

type RabbitMQ

type RabbitMQ struct {

	//队列名称
	QueueName string
	//交换机
	Exchange string
	//key Simple模式 几乎用不到
	Key string
	//连接信息
	MqUrl string
	// contains filtered or unexported fields
}

func NewRabbitMQ

func NewRabbitMQ(queueName, exchange, key, amqpUrl string) (*RabbitMQ, error)

NewRabbitMQ 创建RabbitMQ结构体实例

func NewRabbitMQPubSub

func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error)

NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式)

func NewRabbitMQRouting

func NewRabbitMQRouting(exchange string, routingKey string) (*RabbitMQ, error)

NewRabbitMQRouting 路由模式 创建RabbitMQ实例

func NewRabbitMQSimple

func NewRabbitMQSimple(queueName string) (*RabbitMQ, error)

NewRabbitMQSimple 简单模式step:1。创建简单模式下RabbitMQ实例

func NewRabbitMQTopic

func NewRabbitMQTopic(exchange string, routingKey string) (*RabbitMQ, error)

NewRabbitMQTopic 话题模式 创建RabbitMQ实例

func (*RabbitMQ) Destroy

func (r *RabbitMQ) Destroy()

Destroy 断开channel和connection

func (*RabbitMQ) PublishPub

func (r *RabbitMQ) PublishPub(message []byte) (err error)

PublishPub 订阅模式生成

func (*RabbitMQ) PublishRouting

func (r *RabbitMQ) PublishRouting(message []byte) (err error)

PublishRouting 路由模式发送信息

func (*RabbitMQ) PublishSimple

func (r *RabbitMQ) PublishSimple(message []byte) (err error)

PublishSimple 简单模式Step:2、简单模式下生产代码

func (*RabbitMQ) PublishTopic

func (r *RabbitMQ) PublishTopic(message []byte) (err error)

PublishTopic 话题模式发送信息

func (*RabbitMQ) RegistryConsumeSimple

func (r *RabbitMQ) RegistryConsumeSimple() (msg <-chan amqp.Delivery)

RegistryConsumeSimple 简单模式注册消费者

func (*RabbitMQ) RegistryReceiveRouting

func (r *RabbitMQ) RegistryReceiveRouting() (msg <-chan amqp.Delivery)

RegistryReceiveRouting 路由模式接收信息

func (*RabbitMQ) RegistryReceiveSub

func (r *RabbitMQ) RegistryReceiveSub() (msg <-chan amqp.Delivery)

RegistryReceiveSub 订阅模式消费端代码

func (*RabbitMQ) RegistryReceiveTopic

func (r *RabbitMQ) RegistryReceiveTopic() (msg <-chan amqp.Delivery)

RegistryReceiveTopic 话题模式接收信息 要注意key 其中* 用于匹配一个单词,#用于匹配多个单词(可以是零个) 匹配 xx.* 表示匹配xx.hello,但是xx.hello.one需要用xx.#才能匹配到

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL