amqppool

v0.0.0-...-62d0397
Warning

This package is not in the latest version of its module.

Published: Jun 17, 2020 | License: MIT | Imports: 4 | Imported by: 0

README

AMQP Pool

A simple AMQP channel pool built on the Go client streadway/amqp.

WARNING: Immature; it has not been tested enough in production.

Install

go get github.com/gmarcial/amqp-pool

Usage

Test

Set up a RabbitMQ instance on your machine, export the connection string as an environment variable, and run the tests:

export AMQP_CONNECTION=amqp://guest:guest@127.0.0.1:5672/ 

go test ./...

Documentation

Constants

This section is empty.

Variables

var (
	ErrAllChannelsInUse  = &AllChannelsInUseError{message: "failed in try get a reusable channel, all are in use"}
	ErrUseReleaseChannel = &UseReleaseChannelError{message: "Tried to use a reusable channel that was already released"}
)

Functions

This section is empty.

Types

type AllChannelsInUseError

type AllChannelsInUseError struct {
	// contains filtered or unexported fields
}

AllChannelsInUseError is the error returned when a reusable channel is requested but the pool has already reached its maximum number of channels.

func (*AllChannelsInUseError) Error

func (err *AllChannelsInUseError) Error() string

Error implements the error interface.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a connection and manages its pool of reusable channels.

func NewPool

func NewPool(connectionString string, maxChannels int, logger *log.Logger) (*Pool, error)

NewPool creates a new Pool.

func (*Pool) Close

func (pool *Pool) Close() error

Close closes the connection to the AMQP broker.

func (*Pool) CloseReusableChannel

func (pool *Pool) CloseReusableChannel(id int) error

CloseReusableChannel closes a channel and removes it from the pool.

func (*Pool) GetReusableChannel

func (pool *Pool) GetReusableChannel() (*ReusableChannel, error)

GetReusableChannel gets a reusable channel from the pool.

type ReusableChannel

type ReusableChannel struct {
	ID int //identification of a reusable channel
	// contains filtered or unexported fields
}

ReusableChannel represents an AMQP channel that can be reused.

func (*ReusableChannel) Ack

func (reusableChannel *ReusableChannel) Ack(tag uint64, multiple bool) error

Ack wraps the channel's Ack method for use on a reusable channel.

func (*ReusableChannel) Cancel

func (reusableChannel *ReusableChannel) Cancel(consumer string, noWait bool) error

Cancel wraps the channel's Cancel method for use on a reusable channel.

func (*ReusableChannel) Confirm

func (reusableChannel *ReusableChannel) Confirm(noWait bool) error

Confirm wraps the channel's Confirm method for use on a reusable channel.

func (*ReusableChannel) Consume

func (reusableChannel *ReusableChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

Consume wraps the channel's Consume method for use on a reusable channel.

func (*ReusableChannel) ExchangeBind

func (reusableChannel *ReusableChannel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error

ExchangeBind wraps the channel's ExchangeBind method for use on a reusable channel.

func (*ReusableChannel) ExchangeDeclare

func (reusableChannel *ReusableChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

ExchangeDeclare wraps the channel's ExchangeDeclare method for use on a reusable channel.

func (*ReusableChannel) ExchangeDeclarePassive

func (reusableChannel *ReusableChannel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

ExchangeDeclarePassive wraps the channel's ExchangeDeclarePassive method for use on a reusable channel.

func (*ReusableChannel) ExchangeDelete

func (reusableChannel *ReusableChannel) ExchangeDelete(name string, ifUnused, noWait bool) error

ExchangeDelete wraps the channel's ExchangeDelete method for use on a reusable channel.

func (*ReusableChannel) ExchangeUnbind

func (reusableChannel *ReusableChannel) ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error

ExchangeUnbind wraps the channel's ExchangeUnbind method for use on a reusable channel.

func (*ReusableChannel) Flow

func (reusableChannel *ReusableChannel) Flow(active bool) error

Flow wraps the channel's Flow method for use on a reusable channel.

func (*ReusableChannel) Get

func (reusableChannel *ReusableChannel) Get(queue string, autoAck bool) (msg amqp.Delivery, ok bool, err error)

Get wraps the channel's Get method for use on a reusable channel.

func (*ReusableChannel) Nack

func (reusableChannel *ReusableChannel) Nack(tag uint64, multiple bool, requeue bool) error

Nack wraps the channel's Nack method for use on a reusable channel.

func (*ReusableChannel) NotifyCancel

func (reusableChannel *ReusableChannel) NotifyCancel(c chan string) (chan string, error)

NotifyCancel wraps the channel's NotifyCancel method for use on a reusable channel.

func (*ReusableChannel) NotifyClose

func (reusableChannel *ReusableChannel) NotifyClose(c chan *amqp.Error) (chan *amqp.Error, error)

NotifyClose wraps the channel's NotifyClose method for use on a reusable channel.

func (*ReusableChannel) NotifyConfirm

func (reusableChannel *ReusableChannel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64, error)

NotifyConfirm wraps the channel's NotifyConfirm method for use on a reusable channel.

func (*ReusableChannel) NotifyFlow

func (reusableChannel *ReusableChannel) NotifyFlow(c chan bool) (chan bool, error)

NotifyFlow wraps the channel's NotifyFlow method for use on a reusable channel.

func (*ReusableChannel) NotifyPublish

func (reusableChannel *ReusableChannel) NotifyPublish(confirm chan amqp.Confirmation) (chan amqp.Confirmation, error)

NotifyPublish wraps the channel's NotifyPublish method for use on a reusable channel.

func (*ReusableChannel) NotifyReturn

func (reusableChannel *ReusableChannel) NotifyReturn(c chan amqp.Return) (chan amqp.Return, error)

NotifyReturn wraps the channel's NotifyReturn method for use on a reusable channel.

func (*ReusableChannel) Publish

func (reusableChannel *ReusableChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

Publish wraps the channel's Publish method for use on a reusable channel.

func (*ReusableChannel) Qos

func (reusableChannel *ReusableChannel) Qos(prefetchCount, prefetchSize int, global bool) error

Qos wraps the channel's Qos method for use on a reusable channel.

func (*ReusableChannel) QueueBind

func (reusableChannel *ReusableChannel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

QueueBind wraps the channel's QueueBind method for use on a reusable channel.

func (*ReusableChannel) QueueDeclare

func (reusableChannel *ReusableChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

QueueDeclare wraps the channel's QueueDeclare method for use on a reusable channel.

func (*ReusableChannel) QueueDeclarePassive

func (reusableChannel *ReusableChannel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

QueueDeclarePassive wraps the channel's QueueDeclarePassive method for use on a reusable channel.

func (*ReusableChannel) QueueDelete

func (reusableChannel *ReusableChannel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

QueueDelete wraps the channel's QueueDelete method for use on a reusable channel.

func (*ReusableChannel) QueueInspect

func (reusableChannel *ReusableChannel) QueueInspect(name string) (amqp.Queue, error)

QueueInspect wraps the channel's QueueInspect method for use on a reusable channel.

func (*ReusableChannel) QueueUnbind

func (reusableChannel *ReusableChannel) QueueUnbind(name, key, exchange string, args amqp.Table) error

QueueUnbind wraps the channel's QueueUnbind method for use on a reusable channel.

func (*ReusableChannel) Recover

func (reusableChannel *ReusableChannel) Recover(requeue bool) error

Recover wraps the channel's Recover method for use on a reusable channel.

func (*ReusableChannel) Reject

func (reusableChannel *ReusableChannel) Reject(tag uint64, requeue bool) error

Reject wraps the channel's Reject method for use on a reusable channel.

func (*ReusableChannel) Release

func (reusableChannel *ReusableChannel) Release()

Release returns the reusable channel in use back to the pool.

func (*ReusableChannel) Tx

func (reusableChannel *ReusableChannel) Tx() error

Tx wraps the channel's Tx method for use on a reusable channel.

func (*ReusableChannel) TxCommit

func (reusableChannel *ReusableChannel) TxCommit() error

TxCommit wraps the channel's TxCommit method for use on a reusable channel.

func (*ReusableChannel) TxRollback

func (reusableChannel *ReusableChannel) TxRollback() error

TxRollback wraps the channel's TxRollback method for use on a reusable channel.

type UseReleaseChannelError

type UseReleaseChannelError struct {
	// contains filtered or unexported fields
}

UseReleaseChannelError is the error returned when a reusable channel is used after it has been released back to the pool.

func (*UseReleaseChannelError) Error

func (err *UseReleaseChannelError) Error() string

Error implements the error interface.
