rabbitmq

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2023 License: Apache-2.0 Imports: 7 Imported by: 1

README

rabbitmq

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	StateClosed    = uint8(0)
	StateOpened    = uint8(1)
	StateReopening = uint8(2)
)

*

  • 基于RabbitMQ的AMQ提供器,其初始化参数格式如下:
  • <pre>
  • {
  • "brokerURL" : "amqp://guest:guest@localhost:5672/", // RabbitMQ server的连接地址
  • "username" : "admin", // 登录RabbitMQ的账号
  • "password" : "admin", // 登录RabbitMQ的密码
  • }
  • </pre> *
  • @author Chirs Chou

Functions

This section is empty.

Types

type Binding

type Binding struct {
	RouteKey string
	Queues   []*Queue
	NoWait   bool       // default is false
	Args     amqp.Table // default is nil
}

Biding routeKey ==> queues

type Client

type Client struct {
	// contains filtered or unexported fields
}

func Engine

func Engine(conf configuration.Configuration, systemId string) *Client

func New

func New(brokerURL, username, password, vhost string) *Client

func (*Client) Channel added in v1.0.2

func (c *Client) Channel() (*amqp.Channel, error)

func (*Client) Close

func (c *Client) Close()

func (*Client) CloseConsumer

func (c *Client) CloseConsumer(name string) error

func (*Client) CloseProducer

func (c *Client) CloseProducer(name string) error

func (*Client) Consumer

func (c *Client) Consumer(name string) (*Consumer, error)

func (*Client) Open

func (c *Client) Open() (mq *Client, err error)

func (*Client) Producer

func (c *Client) Producer(name string) (*Producer, error)

func (*Client) State

func (c *Client) State() uint8

type ConsumeOption

type ConsumeOption struct {
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
}

消费者消费选项

func DefaultConsumeOption

func DefaultConsumeOption() *ConsumeOption

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

基于RabbitMQ消息中间件的客户端实现。

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) CloseChan

func (c *Consumer) CloseChan()

CloseChan 该接口仅用于测试使用, 勿手动调用

func (*Consumer) Name

func (c *Consumer) Name() string

func (*Consumer) Open

func (c *Consumer) Open() error

func (*Consumer) SetExchangeBinds

func (c *Consumer) SetExchangeBinds(eb []*ExchangeBinds) *Consumer

func (*Consumer) SetMsgCallback

func (c *Consumer) SetMsgCallback(cb chan<- Delivery) *Consumer

func (*Consumer) SetQos

func (c *Consumer) SetQos(prefetch int) *Consumer

SetQos 设置channel粒度的Qos, prefetch取值范围[0,∞), 默认为0 如果想要RoundRobin地进行消费,设置prefetch为1即可 注意:在调用Open前设置

func (*Consumer) State

func (c *Consumer) State() uint8

type Delivery

type Delivery struct {
	amqp.Delivery
}

type Exchange

type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table // default is nil
}

Exchange 基于amqp的Exchange配置

func DefaultExchange

func DefaultExchange(name string, kind string, exchangeArgs amqp.Table) *Exchange

type ExchangeBinds

type ExchangeBinds struct {
	Exch     *Exchange
	Bindings []*Binding
}

ExchangeBinds exchange ==> routeKey ==> queues

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

基于RabbitMQ的生产者封装。

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Confirm

func (p *Producer) Confirm(enable bool) *Producer

Confirm 是否开启生产者confirm功能, 默认为false, 该选项在Open()前设置. 说明: 目前仅实现串行化的confirm, 每次的等待confirm额外需要约50ms,建议上层并发调用Publish

func (*Producer) ForDirect

func (p *Producer) ForDirect(exchange, route, queue, data string) error

func (*Producer) ForFanout

func (p *Producer) ForFanout(exchange, data string) error

func (*Producer) ForQueueArgs

func (p *Producer) ForQueueArgs(exchange, route, kind string, bindings []*Binding, data []byte) error

func (*Producer) ForTopic

func (p *Producer) ForTopic(exchange, route, queue, data string) error

func (*Producer) IsOpen

func (p *Producer) IsOpen() bool

func (Producer) Name

func (p Producer) Name() string

func (*Producer) Open

func (p *Producer) Open() error

func (*Producer) Publish

func (p *Producer) Publish(exchange, route string, msg *PublishMsg) error

在同步Publish Confirm模式下, 每次Publish将额外有约50ms的等待时间.如果采用这种模式,建议上层并发publish

func (*Producer) SetExchangeBinds

func (p *Producer) SetExchangeBinds(eb []*ExchangeBinds) *Producer

func (*Producer) State

func (p *Producer) State() uint8

type PublishMsg

type PublishMsg struct {
	ContentType     string // MIME content type
	ContentEncoding string // MIME content type
	DeliveryMode    uint8  // Transient or Persistent
	Priority        uint8  // 0 to 9
	Timestamp       time.Time
	Body            []byte
}

生产者生产的数据格式

func NewPublishMsg

func NewPublishMsg(body []byte) *PublishMsg

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

Queue 基于amqp的Queue配置

func DefaultQueue

func DefaultQueue(name string, queueArgs amqp.Table) *Queue

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL