wxamqp

package module
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README

WX AMQP

增加特性

  • 自动重连
  • 简化操作

功能列表

  • 获取包
go get -u gitee.com/DreamPublic/wx-amqp
  • 连接服务器
instance := wxamqp.NewWxAmqp("guest", "guest", "127.0.0.1", 5672)
// 或者:连接时指定 virtual host(与上一行二选一)
instance := wxamqp.NewWxAmqpWithVhost("guest", "guest", "127.0.0.1", 5672, "vhost")
  • 获取连接通道
channel := instance.GetChannel()
  • 创建消息队列
qName, err := channel.DeclareQueue("队列名称", true, false)
  • 删除消息队列
err := channel.DeleteQueue("队列名称",false,false)
  • 创建交换机
err := channel.DeclareExchange("交换机名称", "direct", false, false)
  • 删除交换机
err := channel.DeleteExchange("交换机名称",false)
  • 绑定队列至交换机
err := channel.Bind("队列名称", "routingKey", "交换机名称")
  • 解绑队列与交换机
err := channel.Unbind("队列名称", "routingKey", "交换机名称")
  • 发送消息至指定队列
err := channel.SendToQueue("队列名称", "msg content")
  • 发送消息至指定交换机
err := channel.SendToExchange("交换机名称","routingKey", "msg content")
  • 监听队列,获取数据(手动ACK)
// consumerId 必须唯一
deliveryChan := channel.AddConsumer("队列名称", "consumerId", 1)
go func() {
    for {
        data, ok := <-deliveryChan
        if !ok {
            break
        }
        body := string(data.Body)
        fmt.Println(body)
        data.Ack(false)
    }
}()
  • 取消队列监听
channel.RemoveConsumer("consumerId")
  • 关闭连接通道
channel.Close()
  • 断开服务器
instance.DisConnect()

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDisconnect = errors.New("channel 断线")

ErrDisconnect 表示 channel 断线的异常

Functions

This section is empty.

Types

type AmqpChannel

type AmqpChannel struct {
	// contains filtered or unexported fields
}

AmqpChannel channel

func NewAmqpChannel

func NewAmqpChannel(amqpConnection *AmqpConnection) *AmqpChannel

NewAmqpChannel 使用给定的 AmqpConnection 创建新的 AmqpChannel

func (*AmqpChannel) AddConsumer

func (a *AmqpChannel) AddConsumer(queueName string, consumerName string, prefetch int) chan amqp.Delivery

AddConsumer 消费队列数据 手动ack

func (*AmqpChannel) Bind

func (a *AmqpChannel) Bind(queueName string, routingKey string, exchangeName string) error

Bind 绑定queue至exchange

func (*AmqpChannel) Close

func (a *AmqpChannel) Close()

Close 关闭连接通道

func (*AmqpChannel) DeclareExchange

func (a *AmqpChannel) DeclareExchange(exchangeName, kind string, durable, autoDelete bool) error

DeclareExchange 创建 Exchange;kind 可选值为 direct、fanout、topic、headers

func (*AmqpChannel) DeclareQueue

func (a *AmqpChannel) DeclareQueue(queueName string, durable bool, autoDelete bool) (string, error)

DeclareQueue 创建队列

func (*AmqpChannel) DeleteExchange added in v0.0.5

func (a *AmqpChannel) DeleteExchange(exchangeName string, ifUnused bool) error

DeleteExchange 删除Exchange

func (*AmqpChannel) DeleteQueue added in v0.0.5

func (a *AmqpChannel) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) error

DeleteQueue 删除队列

func (*AmqpChannel) RemoveConsumer

func (a *AmqpChannel) RemoveConsumer(consumerName string)

RemoveConsumer 移除消费者

func (*AmqpChannel) SendToExchange

func (a *AmqpChannel) SendToExchange(exchangeName string, routingKey string, body string) error

SendToExchange 发送数据至指定交换机

func (*AmqpChannel) SendToExchangeWithType added in v0.0.10

func (a *AmqpChannel) SendToExchangeWithType(exchangeName string, routingKey string, body string, contentType string) error

SendToExchangeWithType 发送数据至指定交换机,并通过 contentType 参数指定内容类型

func (*AmqpChannel) SendToQueue

func (a *AmqpChannel) SendToQueue(queueName string, body string) error

SendToQueue 发送数据至指定队列

func (*AmqpChannel) SendToQueueWithType added in v0.0.10

func (a *AmqpChannel) SendToQueueWithType(queueName string, body string, contentType string) error

SendToQueueWithType 发送数据至指定队列,并通过 contentType 参数指定内容类型

func (*AmqpChannel) Unbind

func (a *AmqpChannel) Unbind(queueName string, routingKey string, exchangeName string) error

Unbind 解绑queue与exchange

type AmqpConnection

type AmqpConnection struct {
	// contains filtered or unexported fields
}

AmqpConnection connection

func NewAmqpConnection

func NewAmqpConnection(username string, password string, host string, port int, vhost string) *AmqpConnection

NewAmqpConnection 使用给定的用户名、密码、主机、端口和 vhost 创建新的 AmqpConnection

func (*AmqpConnection) Disconnect

func (a *AmqpConnection) Disconnect()

Disconnect 断开连接

type WxAmqp

type WxAmqp struct {
	// contains filtered or unexported fields
}

WxAmqp 主要功能类,用于操作消息队列

func NewWxAmqp

func NewWxAmqp(username string, password string, host string, port int) *WxAmqp

NewWxAmqp 创建一个新的连接实例

func NewWxAmqpWithVhost

func NewWxAmqpWithVhost(username string, password string, host string, port int, vhost string) *WxAmqp

NewWxAmqpWithVhost 创建一个新的连接实例,带有virtual host

func (*WxAmqp) DisConnect

func (a *WxAmqp) DisConnect()

DisConnect 断开消息队列的连接

func (*WxAmqp) GetChannel

func (a *WxAmqp) GetChannel() *AmqpChannel

GetChannel 获取连接通道

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL