rabbitmq

package
v10.0.37 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotConnected is returned when not connected to a server
	ErrNotConnected = errors.New("not connected to a server")
	// ErrAlreadyClosed is returned when the connection is already closed
	ErrAlreadyClosed = errors.New("already closed: not connected to the server")
	// ErrShutdown is returned when already shutting down
	ErrShutdown = errors.New("session is shutting down")
	// ErrNack is returned when a message publish fails with a NACK
	ErrNack = errors.New("message was not sent")
	// ErrServerBusy is returned when there is too much tcp backpressure on a channel
	ErrServerBusy = errors.New("server busy; message was not sent")
	// ErrPublishOnly is returned when a channel is publish only and you try and use a queue function
	ErrPublishOnly = errors.New("channel is publish only")
	// ErrTimedOut is returned when a message times out waiting for confirmation
	ErrTimedOut = errors.New("confirmation timed out")
)

Functions

This section is empty.

Types

type ChannelHost added in v10.0.28

type ChannelHost struct {
	Channel       *amqp.Channel
	Confirmations chan amqp.Confirmation
	Errors        chan *amqp.Error
	Backpressure  chan bool
	// contains filtered or unexported fields
}

ChannelHost is an internal representation of amqp.Connection.

func NewChannelHost added in v10.0.28

func NewChannelHost(
	logger log.Logger,
	connHost *ConnectionHost,
	ackable bool) (*ChannelHost, error)

NewChannelHost creates a simple ConnectionHost wrapper for management by end-user developer.

func (*ChannelHost) Close added in v10.0.28

func (ch *ChannelHost) Close()

Close allows for manual close of Amqp Channel kept internally.

func (*ChannelHost) MakeChannel added in v10.0.28

func (ch *ChannelHost) MakeChannel() (err error)

MakeChannel tries to create (or re-create) the channel from the ConnectionHost its attached to.

type Config

type Config struct {
	Name                    string
	ID                      string
	Exchange                string
	ConsumerConnectionPool  *ConnectionPool
	PublisherConnectionPool *ConnectionPool
	AutoAck                 bool
	DurableQueue            bool
	DeleteUnused            bool
	Exclusive               bool
	Args                    amqp.Table
	Qos                     int
	PublishOnly             bool
	RoutingKeys             []string
	Context                 context.Context
}

Config for the session

type ConnectionHost added in v10.0.28

type ConnectionHost struct {
	Connection *amqp.Connection

	Errors chan *amqp.Error
	// contains filtered or unexported fields
}

ConnectionHost is an internal representation of amqp.Connection.

func NewConnectionHost added in v10.0.28

func NewConnectionHost(
	logger log.Logger,
	uri string,
	connectionName string,
) (*ConnectionHost, error)

NewConnectionHost creates a simple ConnectionHost wrapper for management by end-user developer.

func (*ConnectionHost) CanTakeMoreChannels added in v10.0.28

func (ch *ConnectionHost) CanTakeMoreChannels(maxchannels int) bool

CanTakeMoreChannels is a helpermethod to see if a connection can take another channel

func (*ConnectionHost) Connect added in v10.0.28

func (ch *ConnectionHost) Connect() error

Connect tries to connect (or reconnect) to the provided properties of the host one time.

func (*ConnectionHost) IncrChannels added in v10.0.28

func (ch *ConnectionHost) IncrChannels(i int)

IncrChannels increments the channel count by i (use -1 for decr)

func (*ConnectionHost) VerifyConnection added in v10.0.28

func (ch *ConnectionHost) VerifyConnection()

VerifyConnection will make sure a connection is ok or try to reconnect it

type ConnectionPool added in v10.0.28

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool houses the pool of RabbitMQ connections.

func NewConnectionPool added in v10.0.28

func NewConnectionPool(logger log.Logger, baseConnectionName, uri string, connections, channelsPerConnection int) (*ConnectionPool, error)

NewConnectionPool creates hosting structure for the ConnectionPool.

func (*ConnectionPool) GetChannel added in v10.0.28

func (cp *ConnectionPool) GetChannel() (*ChannelHost, error)

GetChannel gets a ackable channel from the Pool if they exist or creates a channel.

func (*ConnectionPool) GetConnection added in v10.0.28

func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)

GetConnection gets a connection based on whats in the ConnectionPool or creates one to avoid blocking

func (*ConnectionPool) GetTransientChannel added in v10.0.28

func (cp *ConnectionPool) GetTransientChannel(ackable bool) (*amqp.Channel, error)

GetTransientChannel allows you create an unmanaged amqp Channel with the help of the ConnectionPool.

func (*ConnectionPool) Push added in v10.0.28

func (cp *ConnectionPool) Push(exchange, routingKey string, data amqp.Publishing) error

Push sends data using the publisher channel pool

func (*ConnectionPool) PushWithRetry added in v10.0.28

func (cp *ConnectionPool) PushWithRetry(exchange, routingKey string, data amqp.Publishing) error

PushWithRetry sends data using the publisher channel pool

func (*ConnectionPool) ReturnChannel added in v10.0.28

func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)

ReturnChannel returns a Channel.

func (*ConnectionPool) ReturnConnection added in v10.0.28

func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost)

ReturnConnection puts the connection back in the queue

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is the rabbitmq session

func New

func New(logger log.Logger, config Config) *Session

New creates a new consumer state instance, and automatically attempts to connect to the server.

func (*Session) Ack

func (session *Session) Ack(tag uint64) error

Ack a consumer tag

func (*Session) Close

func (session *Session) Close() error

Close will cleanly shutdown the channel and connection.

func (*Session) Nack added in v10.0.28

func (session *Session) Nack(tag uint64) error

Nack a consumer tag

func (*Session) Push

func (session *Session) Push(routingKey string, data amqp.Publishing) error

Push will publish data to channel

func (*Session) Stream

func (session *Session) Stream(consumergroup string, autoAck bool, exclusive bool) (chan amqp.Delivery, error)

Stream will continuously put queue items on the channel. It is required to call delivery.Ack when it has been successfully processed, or delivery.Nack when it fails. Ignoring this will cause data to build up on the server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL