rabbitmq

package
v1.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2023 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InitialTimeout      = 5                // 默认失败重新间隔/s
	DefaultExchangeName = "default.change" // 默认交换机
	DefaultExchangeType = "direct"         // 默认交换类型
)

Variables

This section is empty.

Functions

func MqStart

func MqStart() (err error)

func ReceiveMsg

func ReceiveMsg(queueExchange QueueExchange, receiver Receiver, otherParams ...int) (err error)

ReceiveMsg 接收消息,连接失败后进行重试

func Recv2

func Recv2(mq RabbitMQ, receiver Receiver, taskQuit chan<- struct{})

Recv2 当前监听队列关闭后通知进行重试机制

func Send added in v1.0.8

func Send(queueExchange QueueExchange, msg interface{}) (err error)

Send 发送消息

Types

type MapChild

type MapChild struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

type MapTest

type MapTest struct {
	Name  string   `json:"name"`
	Age   int      `json:"age"`
	Child MapChild `json:"child"`
}

type QueueExchange

type QueueExchange struct {
	QuName string // 队列名称
	RtKey  string // key值
	ExName string // 交换机名称
	ExType string // 交换机类型
	Dns    string //链接地址
}

QueueExchange 定义队列交换机对象,外部可调用

type RabbitMQ

type RabbitMQ struct {
	Channel *amqp.Channel

	QueueName    string // 队列名称
	RoutingKey   string // key名称
	ExchangeName string // 交换机名称
	ExchangeType string // 交换机类型
	// contains filtered or unexported fields
}

RabbitMQ 定义RabbitMQ对象

var StartQueue *RabbitMQ

func NewMq

func NewMq(q QueueExchange) RabbitMQ

NewMq 创建一个新的Mq操作对象,用于延迟消息处理

func (*RabbitMQ) CloseMqChannel

func (r *RabbitMQ) CloseMqChannel() (err error)

CloseMqChannel 关闭rabbitMQ信道

func (*RabbitMQ) CloseMqConnect

func (r *RabbitMQ) CloseMqConnect() (err error)

CloseMqConnect 关闭mq链接

func (*RabbitMQ) MqConnect

func (r *RabbitMQ) MqConnect() (err error)

MqConnect 获取连接给 RabbitMQ

func (*RabbitMQ) MqOpenChannel

func (r *RabbitMQ) MqOpenChannel() (err error)

MqOpenChannel 获取信道给 RabbitMQ

func (*RabbitMQ) SendDelayMsg added in v1.0.8

func (mq *RabbitMQ) SendDelayMsg(body string, ttl int64) (err error)

SendDelayMsg 生产者发送延时消息

func (*RabbitMQ) SendMsg added in v1.0.8

func (mq *RabbitMQ) SendMsg(body interface{}) (err error)

Send 生产者发送消息

func (*RabbitMQ) SendRetryMsg added in v1.0.8

func (mq *RabbitMQ) SendRetryMsg(body string, retry_nums int32, args ...string)

SendRetryMsg 发送重试消息

type Receiver

type Receiver interface {
	Consumer([]byte) error
	FailAction(error, []byte) error
}

Receiver 定义接收者接口

type TestReceive

type TestReceive struct {
}

func (*TestReceive) Consumer

func (t *TestReceive) Consumer(byte []byte) error

func (*TestReceive) FailAction

func (t *TestReceive) FailAction(err error, byte []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL