Documentation ¶
Index ¶
- type ChannelHost
- type ChannelPool
- func (cp *ChannelPool) AckChannelCount() int64
- func (cp *ChannelPool) ChannelCount() int64
- func (cp *ChannelPool) Errors() <-chan error
- func (cp *ChannelPool) FlagChannel(channelID uint64)
- func (cp *ChannelPool) FlushErrors()
- func (cp *ChannelPool) GetAckableChannel() (*ChannelHost, error)
- func (cp *ChannelPool) GetChannel() (*ChannelHost, error)
- func (cp *ChannelPool) Initialize() error
- func (cp *ChannelPool) IsChannelFlagged(channelID uint64) bool
- func (cp *ChannelPool) ReturnChannel(chanHost *ChannelHost, flagChannel bool)
- func (cp *ChannelPool) Shutdown()
- func (cp *ChannelPool) UnflagChannel(channelID uint64)
- type ConnectionHost
- func (ch *ConnectionHost) AddAckChannel()
- func (ch *ConnectionHost) AddChannel()
- func (ch *ConnectionHost) CanAddAckChannel() bool
- func (ch *ConnectionHost) CanAddChannel() bool
- func (ch *ConnectionHost) CloseErrors() <-chan *amqp.Error
- func (ch *ConnectionHost) RemoveAckChannel() error
- func (ch *ConnectionHost) RemoveChannel() error
- type ConnectionPool
- func (cp *ConnectionPool) ConnectionCount() int64
- func (cp *ConnectionPool) Errors() <-chan error
- func (cp *ConnectionPool) FlagConnection(connectionID uint64)
- func (cp *ConnectionPool) FlushErrors()
- func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
- func (cp *ConnectionPool) Initialize() error
- func (cp *ConnectionPool) IsConnectionFlagged(connectionID uint64) bool
- func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost)
- func (cp *ConnectionPool) Shutdown()
- func (cp *ConnectionPool) UnflagConnection(connectionID uint64)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelHost ¶
type ChannelHost struct { Channel *amqp.Channel ChannelID uint64 ConnectionID uint64 ErrorMessages chan *models.ErrorMessage ReturnMessages chan *models.ReturnMessage // contains filtered or unexported fields }
ChannelHost is an internal representation of amqp.Channel.
func NewChannelHost ¶
func NewChannelHost( amqpConn *amqp.Connection, channelID uint64, connectionID uint64, ackable bool) (*ChannelHost, error)
NewChannelHost creates a simple ChannelHost wrapper for management by end-user developer.
func (*ChannelHost) CloseErrors ¶
func (ch *ChannelHost) CloseErrors() <-chan *models.ErrorMessage
CloseErrors allows you to listen for close errors, delivered as models.ErrorMessage values.
func (*ChannelHost) IsAckable ¶
func (ch *ChannelHost) IsAckable() bool
IsAckable determines if this host contains an ackable channel.
func (*ChannelHost) Returns ¶
func (ch *ChannelHost) Returns() <-chan *models.ReturnMessage
Returns allows you to listen for ReturnMessages.
type ChannelPool ¶
type ChannelPool struct { Config models.PoolConfig Initialized bool // contains filtered or unexported fields }
ChannelPool houses the pool of RabbitMQ channels.
func NewChannelPool ¶
func NewChannelPool( config *models.PoolConfig, connPool *ConnectionPool, initializeNow bool) (*ChannelPool, error)
NewChannelPool creates hosting structure for the ChannelPool.
func (*ChannelPool) AckChannelCount ¶
func (cp *ChannelPool) AckChannelCount() int64
AckChannelCount lets you know how many ackable channels you have to use.
func (*ChannelPool) ChannelCount ¶
func (cp *ChannelPool) ChannelCount() int64
ChannelCount lets you know how many non-ackable channels you have to use.
func (*ChannelPool) Errors ¶
func (cp *ChannelPool) Errors() <-chan error
Errors yields the internal error channel used for managing the ChannelPool.
func (*ChannelPool) FlagChannel ¶
func (cp *ChannelPool) FlagChannel(channelID uint64)
FlagChannel flags that channel as non-usable in the future.
func (*ChannelPool) FlushErrors ¶
func (cp *ChannelPool) FlushErrors()
FlushErrors empties all current errors in the error channel.
func (*ChannelPool) GetAckableChannel ¶
func (cp *ChannelPool) GetAckableChannel() (*ChannelHost, error)
GetAckableChannel gets an ackable channel based on what's available in the AckChannelPool queue.
func (*ChannelPool) GetChannel ¶
func (cp *ChannelPool) GetChannel() (*ChannelHost, error)
GetChannel gets a channel based on what's available in the ChannelPool queue (blocking under bad network conditions). Outages/transient network outages block until a connection succeeds. Uses the SleepOnErrorInterval to pause between retries.
func (*ChannelPool) Initialize ¶
func (cp *ChannelPool) Initialize() error
Initialize creates the ChannelPool based on the config details. Blocks on network/communication issues unless overridden by config.
func (*ChannelPool) IsChannelFlagged ¶
func (cp *ChannelPool) IsChannelFlagged(channelID uint64) bool
IsChannelFlagged checks to see if the channel has been flagged for removal.
func (*ChannelPool) ReturnChannel ¶
func (cp *ChannelPool) ReturnChannel(chanHost *ChannelHost, flagChannel bool)
ReturnChannel puts the channel back in the queue. The developer has to manually return the Channel, which helps maintain a Round Robin on Channels and their resources. The flagChannel parameter allows you to flag a Channel as dead.
func (*ChannelPool) Shutdown ¶
func (cp *ChannelPool) Shutdown()
Shutdown closes all channels and all connections.
func (*ChannelPool) UnflagChannel ¶
func (cp *ChannelPool) UnflagChannel(channelID uint64)
UnflagChannel flags that channel as usable in the future.
type ConnectionHost ¶
type ConnectionHost struct { Connection *amqp.Connection ConnectionID uint64 // contains filtered or unexported fields }
ConnectionHost is an internal representation of amqp.Connection.
func NewConnectionHost ¶
func NewConnectionHost( uri string, connectionName string, connectionID uint64, heartbeat time.Duration, connectionTimeout time.Duration, maxChannel uint64, maxAckChannelCount uint64) (*ConnectionHost, error)
NewConnectionHost creates a simple ConnectionHost wrapper for management by end-user developer.
func NewConnectionHostWithTLS ¶
func NewConnectionHostWithTLS( certServerName string, connectionName string, connectionID uint64, heartbeat time.Duration, connectionTimeout time.Duration, maxChannel uint64, maxAckChannelCount uint64, tlsConfig *tls.Config) (*ConnectionHost, error)
NewConnectionHostWithTLS creates a simple ConnectionHost wrapper for management by end-user developer.
func (*ConnectionHost) AddAckChannel ¶
func (ch *ConnectionHost) AddAckChannel()
AddAckChannel increments the count of currentChannels.
func (*ConnectionHost) AddChannel ¶
func (ch *ConnectionHost) AddChannel()
AddChannel increments the count of currentChannels.
func (*ConnectionHost) CanAddAckChannel ¶
func (ch *ConnectionHost) CanAddAckChannel() bool
CanAddAckChannel provides a true or false based on whether this connection host can handle more ackable channels on its connection (based on initialization).
func (*ConnectionHost) CanAddChannel ¶
func (ch *ConnectionHost) CanAddChannel() bool
CanAddChannel provides a true or false based on whether this connection host can handle more channels on its connection (based on initialization).
func (*ConnectionHost) CloseErrors ¶
func (ch *ConnectionHost) CloseErrors() <-chan *amqp.Error
CloseErrors allows you to listen for amqp.Error messages.
func (*ConnectionHost) RemoveAckChannel ¶
func (ch *ConnectionHost) RemoveAckChannel() error
RemoveAckChannel decrements the count of currentChannels.
func (*ConnectionHost) RemoveChannel ¶
func (ch *ConnectionHost) RemoveChannel() error
RemoveChannel decrements the count of currentChannels.
type ConnectionPool ¶
type ConnectionPool struct { Initialized bool // contains filtered or unexported fields }
ConnectionPool houses the pool of RabbitMQ connections.
func NewConnectionPool ¶
func NewConnectionPool( config *models.PoolConfig, initializeNow bool) (*ConnectionPool, error)
NewConnectionPool creates hosting structure for the ConnectionPool. Initialize() needs to be called afterwards.
func (*ConnectionPool) ConnectionCount ¶
func (cp *ConnectionPool) ConnectionCount() int64
ConnectionCount lets you know how many connections are in the ConnectionPool. Careful, locking call.
func (*ConnectionPool) Errors ¶
func (cp *ConnectionPool) Errors() <-chan error
Errors yields all the internal errors for creating connections.
func (*ConnectionPool) FlagConnection ¶
func (cp *ConnectionPool) FlagConnection(connectionID uint64)
FlagConnection flags that connection as non-usable in the future.
func (*ConnectionPool) FlushErrors ¶
func (cp *ConnectionPool) FlushErrors()
FlushErrors empties all current errors in the error channel.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
GetConnection gets a connection based on what's in the ConnectionPool (blocking under bad network conditions). Outages/transient network outages block until a connection succeeds. Uses the SleepOnErrorInterval to pause between retries.
func (*ConnectionPool) Initialize ¶
func (cp *ConnectionPool) Initialize() error
Initialize creates the ConnectionPool based on the config details. Blocks on network/communication issues unless overridden by config.
func (*ConnectionPool) IsConnectionFlagged ¶
func (cp *ConnectionPool) IsConnectionFlagged(connectionID uint64) bool
IsConnectionFlagged checks to see if the connection has been flagged for removal.
func (*ConnectionPool) ReturnConnection ¶
func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost)
ReturnConnection puts the connection back in the queue. This helps maintain a Round Robin on Connections and their resources.
func (*ConnectionPool) Shutdown ¶
func (cp *ConnectionPool) Shutdown()
Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state.
func (*ConnectionPool) UnflagConnection ¶
func (cp *ConnectionPool) UnflagConnection(connectionID uint64)
UnflagConnection flags that connection as usable in the future.