consumer

package
v0.3.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrQueueExclusiveUseCode           = amqp.AccessRefused
	ErrQueueExclusiveUseReasonTemplate = `ACCESS_REFUSED - queue '%s' in vhost '%s' in exclusive use`
	ErrChannelOrConnectionClosedCode   = amqp.ChannelError
	ErrChannelOrConnectionClosedReason = `channel/connection is not open`
)
View Source
const (
	DefaultMaxConnections      = 20
	DefaultInitConnections     = 5
	DefaultMaxIdleConnections  = 10
	DefaultMaxIdleTime         = 1800 // seconds
	DefaultMaxWaitTime         = 1    // seconds
	DefaultMaxRetryCount       = -1
	DefaultKeepAliveInterval   = 300 // seconds
	DefaultKeepAliveChunkSize  = 5
	DefaultSleepTime           = 1 // seconds
	DefaultFreeChanLengthTimes = 2

	DefaultUnlimitedWaitTime   = -1 // seconds
	DefaultUnlimitedRetryCount = -1
	DefaultDelayTime           = 5 // milliseconds
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Conn  *client.Conn
	Chan  *amqp.Channel
	Queue amqp.Queue
}

func NewConsumer

func NewConsumer(addr, user, pass, vhost, tag, exchange, queue, key string) (*Consumer, error)

NewConsumer returns a new *Consumer

func NewConsumerWithConfig

func NewConsumerWithConfig(config *rabbitmq.Config) (*Consumer, error)

NewConsumerWithConfig returns a new *Consumer with given config

func (*Consumer) Ack

func (c *Consumer) Ack(tag uint64, multiple bool) error

Ack acknowledges a delivery

func (*Consumer) Cancel

func (c *Consumer) Cancel() error

Cancel cancels the delivery of a consumer

func (*Consumer) Channel

func (c *Consumer) Channel() (*amqp.Channel, error)

Channel returns the amqp channel, if the channel of the consumer is nil or had been closed, a new channel will be opened, otherwise the existing channel will be returned

func (*Consumer) Close

func (c *Consumer) Close() error

Close disconnects the rabbitmq server

func (*Consumer) Consume

func (c *Consumer) Consume(queue string, exclusive bool) (<-chan amqp.Delivery, error)

Consume consumes messages from the queue

func (*Consumer) Disconnect

func (c *Consumer) Disconnect() error

Disconnect disconnects the rabbitmq server

func (*Consumer) ExchangeDeclare

func (c *Consumer) ExchangeDeclare(name, kind string) error

ExchangeDeclare declares an exchange

func (*Consumer) IsChannelOrConnectionClosedError

func (c *Consumer) IsChannelOrConnectionClosedError(err error) bool

IsNotFoundQueueError returns true if the error is channel or connection closed error

func (*Consumer) IsExclusiveUseError

func (c *Consumer) IsExclusiveUseError(queue string, err error) bool

IsExclusiveUseError returns true if the error is exclusive use error

func (*Consumer) Nack

func (c *Consumer) Nack(tag uint64, multiple bool, requeue bool) error

Nack negatively acknowledge a delivery

func (*Consumer) Qos

func (c *Consumer) Qos(prefetchCount int, global bool) error

Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks.

func (*Consumer) QueueBind

func (c *Consumer) QueueBind(queue, exchange, key string) error

QueueBind binds a queue to an exchange

func (*Consumer) QueueDeclare

func (c *Consumer) QueueDeclare(name string) error

QueueDeclare declares a queue

type Pool

type Pool struct {
	sync.Mutex
	*rabbitmq.PoolConfig
	// contains filtered or unexported fields
}

func NewPool

func NewPool(addr, user, host, vhost, tagPrefix, exchange, queue, key string,
	maxConnections, initConnections, maxIdleConnections, maxIdleTime, maxWaitTime, maxRetryCount, keepAliveInterval int) (*Pool, error)

NewPool returns a new *Pool

func NewPoolWithConfig

func NewPoolWithConfig(config *rabbitmq.Config, maxConnections, initConnections,
	maxIdleConnections, maxIdleTime, maxWaitTime, maxRetryCount, keepAliveInterval int) (*Pool, error)

NewPoolWithConfig returns a new *Pool with a Config object

func NewPoolWithDefault

func NewPoolWithDefault(addr, user, host, vhost, tagPrefix, exchange, queue, key string) (*Pool, error)

NewPoolWithDefault returns a new *Pool with default configuration

func NewPoolWithPoolConfig

func NewPoolWithPoolConfig(config *rabbitmq.PoolConfig) (*Pool, error)

NewPoolWithPoolConfig returns a new *Pool with a PoolConfig object

func (*Pool) Close

func (p *Pool) Close() error

Close releases each connection in the pool

func (*Pool) Get

func (p *Pool) Get() (*PoolConsumer, error)

Get is an exported alias of get() function with routine safe

func (*Pool) GetFullTag added in v0.3.19

func (p *Pool) GetFullTag() string

GetFullTag gets the full tag

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed returns if pool had been closed

func (*Pool) Release

func (p *Pool) Release(num int) error

Release is an exported alias of release() function

func (*Pool) Supply

func (p *Pool) Supply(num int) error

Supply is an exported alias of supply() function with routine safe

func (*Pool) UsedConnections

func (p *Pool) UsedConnections() int

UsedConnections returns used connection number

type PoolConsumer

type PoolConsumer struct {
	*Consumer
	Pool *Pool
}

func NewPoolConsumer

func NewPoolConsumer(pool *Pool) (*PoolConsumer, error)

NewPoolConsumer returns a new *PoolConsumer

func (*PoolConsumer) Close

func (pp *PoolConsumer) Close() error

Close closes the channel and returns the Consumer back to the pool

func (*PoolConsumer) Disconnect

func (pp *PoolConsumer) Disconnect() error

Disconnect disconnects from rabbitmq, normally when using connection pool, there is no need to disconnect manually, consider to use Close() instead.

func (*PoolConsumer) GetTag added in v0.3.19

func (pp *PoolConsumer) GetTag() string

GetTag returns the tag of the pool consumer, should use this method to get the tag instead of using the tag property of the pool config, because the tag of the pool config is the prefix of the tag of the pool consumer

func (*PoolConsumer) IsValid

func (pp *PoolConsumer) IsValid() bool

IsValid validates if connection is valid

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL