Documentation ¶
Index ¶
- Constants
- func ConvertJsonToStruct[C interface{}](fileNamePath string, c *C) error
- func ReadPayload(buffer *bytes.Buffer, compressionConf *CompressionConfig, ...) error
- func ToPayload(input interface{}, compressionConf *CompressionConfig, ...) ([]byte, error)
- type ChannelHost
- type CompressionConfig
- type ConnectionHost
- type ConnectionPool
- func (cp *ConnectionPool) ActiveConnections() int64
- func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost
- func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
- func (cp *ConnectionPool) GetTransientChannel(ackable bool) *ChannelHost
- func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)
- func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool)
- func (cp *ConnectionPool) Shutdown()
- type Consumer
- func (con *Consumer) Errors() <-chan error
- func (con *Consumer) FlushErrors()
- func (con *Consumer) FlushMessages()
- func (con *Consumer) FlushStop()
- func (con *Consumer) Get(queueName string) (*amqp.Delivery, error)
- func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage
- func (con *Consumer) StartConsuming(action func(*ReceivedMessage))
- func (con *Consumer) Started() bool
- func (con *Consumer) StopConsuming(flushMessages bool)
- type ConsumerConfig
- type EncryptionConfig
- type Exchange
- type ExchangeBinding
- type Letter
- type LetterOpts
- type LetterOptsFun
- type PoolConfig
- type PublishReceipt
- type Publisher
- func (pub *Publisher) Publish(letter *Letter, receipt bool) error
- func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt
- func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Duration) error
- func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration) error
- func (pub *Publisher) PublishWithTransient(letter *Letter) error
- func (pub *Publisher) QueueLetter(letter *Letter) bool
- func (pub *Publisher) QueueLetters(letters []*Letter) bool
- func (pub *Publisher) Shutdown(shutdownPools bool)
- func (pub *Publisher) StartAutoPublishing()
- type PublisherConfig
- type Queue
- type QueueBinding
- type RabbitSeasoning
- type RabbitService
- func NewRabbitService(config *RabbitSeasoning, passphrase string, salt string, ...) (*RabbitService, error)
- func NewRabbitServiceWithConnectionPool(connectionPool *ConnectionPool, config *RabbitSeasoning, passphrase string, ...) (*RabbitService, error)
- func NewRabbitServiceWithPublisher(publisher *Publisher, config *RabbitSeasoning, passphrase string, salt string, ...) (*RabbitService, error)
- func (rs *RabbitService) CentralErr() <-chan error
- func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error)
- func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error)
- func (rs *RabbitService) Publish(ctx context.Context, input interface{}, ...) error
- func (rs *RabbitService) PublishData(ctx context.Context, data []byte, exchangeName, routingKey string, ...) error
- func (rs *RabbitService) PublishLetter(letter *Letter) error
- func (rs *RabbitService) PublishWithConfirmation(ctx context.Context, input interface{}, ...) error
- func (rs *RabbitService) QueueLetter(letter *Letter) error
- func (rs *RabbitService) Shutdown(stopConsumers bool)
- type ReceivedMessage
- type TLSConfig
- type Topologer
- func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error
- func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error
- func (top *Topologer) BuildExchanges(exchanges []*Exchange, ignoreErrors bool) error
- func (top *Topologer) BuildQueues(queues []*Queue, ignoreErrors bool) error
- func (top *Topologer) BuildTopology(config *TopologyConfig, ignoreErrors bool) error
- func (top *Topologer) CreateExchange(exchangeName string, exchangeType string, ...) error
- func (top *Topologer) CreateExchangeFromConfig(exchange *Exchange) error
- func (top *Topologer) CreateQueue(queueName string, passiveDeclare bool, durable bool, autoDelete bool, ...) error
- func (top *Topologer) CreateQueueFromConfig(queue *Queue) error
- func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error
- func (top *Topologer) ExchangeDelete(exchangeName string, ifUnused, noWait bool) error
- func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, ...) error
- func (top *Topologer) PurgeQueue(queueName string, noWait bool) (int, error)
- func (top *Topologer) PurgeQueues(queueNames []string, noWait bool) (int, error)
- func (top *Topologer) QueueBind(queueBinding *QueueBinding) error
- func (top *Topologer) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
- func (top *Topologer) UnbindQueue(queueName, routingKey, exchangeName string, args map[string]interface{}) error
- type TopologyConfig
Constants ¶
const ( CONFIRMATION_TIMEOUT = "publish confirmation timeout after %d millisecond - recommend retry/requeue (LetterID %s)" CONFIRMATION_CANCEL = "publish confirmation not received before context expired - recommend retry/requeue (LetterID %s)" )
const ( // QueueTypeQuorum indicates a queue of type quorum. QueueTypeQuorum = "quorum" // QueueTypeClassic indicates a queue of type classic. QueueTypeClassic = "classic" )
Variables ¶
This section is empty.
Functions ¶
func ConvertJsonToStruct ¶
func ReadPayload ¶
func ReadPayload( buffer *bytes.Buffer, compressionConf *CompressionConfig, encryptionConf *EncryptionConfig, ) error
func ToPayload ¶
func ToPayload( input interface{}, compressionConf *CompressionConfig, encryptionConf *EncryptionConfig, ) ([]byte, error)
ToPayload creates a JSON marshal and optionally compresses and encrypts the bytes.
Types ¶
type ChannelHost ¶
type ChannelHost struct { Channel *amqp.Channel ID uint64 ConnectionID uint64 Ackable bool Errors chan *amqp.Error // contains filtered or unexported fields }
ChannelHost is an internal representation of amqp.Channel.
func NewChannelHost ¶
func NewChannelHost( connHost *ConnectionHost, id uint64, connectionID uint64, ackable, transient bool, ) (*ChannelHost, error)
NewChannelHost creates a simple ChannelHost wrapper for management by end-user developer.
func (*ChannelHost) Close ¶
func (ch *ChannelHost) Close()
Close allows for manual close of Amqp Channel kept internally.
func (*ChannelHost) MakeChannel ¶
func (ch *ChannelHost) MakeChannel() (err error)
MakeChannel tries to create (or re-create) the channel from the ConnectionHost it's attached to.
func (*ChannelHost) PauseForFlowControl ¶
func (ch *ChannelHost) PauseForFlowControl()
PauseForFlowControl allows you to wait and sleep while receiving flow control messages.
type CompressionConfig ¶
type CompressionConfig struct { Enabled bool `json:"Enabled" yaml:"Enabled"` Type string `json:"Type,omitempty" yaml:"Type,omitempty"` }
CompressionConfig allows you to configure payload compression based on options
type ConnectionHost ¶
type ConnectionHost struct { Connection *amqp.Connection ConnectionID uint64 CachedChannelCount uint64 Errors chan *amqp.Error Blockers chan amqp.Blocking // contains filtered or unexported fields }
ConnectionHost is an internal representation of amqp.Connection.
func NewConnectionHost ¶
func NewConnectionHost( uri string, connectionName string, connectionID uint64, heartbeatInterval time.Duration, connectionTimeout time.Duration, tlsConfig *TLSConfig) (*ConnectionHost, error)
NewConnectionHost creates a simple ConnectionHost wrapper for management by end-user developer.
func (*ConnectionHost) Connect ¶
func (ch *ConnectionHost) Connect() bool
Connect tries to connect (or reconnect) to the provided properties of the host one time.
func (*ConnectionHost) ConnectWithErrorHandler ¶
func (ch *ConnectionHost) ConnectWithErrorHandler(errorHandler func(error)) bool
ConnectWithErrorHandler tries to connect (or reconnect) to the provided properties of the host one time with an error handler.
func (*ConnectionHost) PauseOnFlowControl ¶
func (ch *ConnectionHost) PauseOnFlowControl()
PauseOnFlowControl allows you to wait and sleep while receiving flow control messages. Sleeps for one second, repeatedly until the blocking has stopped.
type ConnectionPool ¶
type ConnectionPool struct { Config PoolConfig // contains filtered or unexported fields }
ConnectionPool houses the pool of RabbitMQ connections.
func NewConnectionPool ¶
func NewConnectionPool(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error)
NewConnectionPool creates hosting structure for the ConnectionPool with an error and/or unhealthy handler.
func (*ConnectionPool) ActiveConnections ¶
func (cp *ConnectionPool) ActiveConnections() int64
func (*ConnectionPool) GetChannelFromPool ¶
func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost
GetChannelFromPool gets a cached ackable channel from the Pool if they exist or creates a channel. A non-acked channel is always a transient channel. Blocking if Ackable is true and the cache is empty. If you want a transient Ackable channel (un-managed), use CreateChannel directly.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
GetConnection gets a connection based on what's in the ConnectionPool (blocking under bad network conditions). Flow control (blocking) or transient network outages will pause here until cleared. Uses the SleepOnErrorInterval to pause between retries.
func (*ConnectionPool) GetTransientChannel ¶
func (cp *ConnectionPool) GetTransientChannel(ackable bool) *ChannelHost
GetTransientChannel allows you to create an unmanaged amqp Channel with the help of the ConnectionPool.
func (*ConnectionPool) ReturnChannel ¶
func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)
ReturnChannel returns a Channel. If the Channel is not a cached channel, it is simply closed here. If it is a cached Channel, we check whether it erred; if so, a new Channel is created instead and then returned to the cache.
func (*ConnectionPool) ReturnConnection ¶
func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool)
ReturnConnection puts the connection back in the queue and flags it for error. This helps maintain a Round Robin on Connections and their resources.
func (*ConnectionPool) Shutdown ¶
func (cp *ConnectionPool) Shutdown()
Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state.
type Consumer ¶
type Consumer struct { Config *ConsumerConfig ConnectionPool *ConnectionPool QueueName string ConsumerName string // contains filtered or unexported fields }
Consumer receives messages from a RabbitMQ location.
func NewConsumer ¶
func NewConsumer(config *ConsumerConfig, cp *ConnectionPool) *Consumer
NewConsumer creates a new Consumer to receive messages from a specific queuename.
func (*Consumer) FlushErrors ¶
func (con *Consumer) FlushErrors()
FlushErrors allows you to flush out all previous Errors.
func (*Consumer) FlushMessages ¶
func (con *Consumer) FlushMessages()
FlushMessages allows you to flush out all previous Messages. WARNING: THIS WILL RESULT IN LOST MESSAGES.
func (*Consumer) FlushStop ¶
func (con *Consumer) FlushStop()
FlushStop allows you to flush out all previous Stop signals.
func (*Consumer) ReceivedMessages ¶
func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage
ReceivedMessages yields all the internal messages ready for consuming.
func (*Consumer) StartConsuming ¶
func (con *Consumer) StartConsuming(action func(*ReceivedMessage))
StartConsuming starts the Consumer invoking a method on every ReceivedMessage.
func (*Consumer) StopConsuming ¶
StopConsuming allows you to signal stop to the consumer. Will stop on the consumer channel close or responding to signal after getting all remaining deliveries. FlushMessages empties the internal buffer of messages received by queue. Ackable messages are still in RabbitMQ queue, while noAck messages will unfortunately be lost. Use wisely.
type ConsumerConfig ¶
type ConsumerConfig struct { QueueName string `json:"QueueName" yaml:"QueueName"` ConsumerName string `json:"ConsumerName" yaml:"ConsumerName"` AutoAck bool `json:"AutoAck" yaml:"AutoAck"` Exclusive bool `json:"Exclusive" yaml:"Exclusive"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args map[string]interface{} `json:"Args" yaml:"Args"` QosCountOverride int `json:"QosCountOverride" yaml:"QosCountOverride"` // if zero ignored SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep on error }
ConsumerConfig represents settings for configuring a consumer with ease.
type EncryptionConfig ¶
type EncryptionConfig struct { Enabled bool `json:"Enabled" yaml:"Enabled"` Type string `json:"Type,omitempty" yaml:"Type,omitempty"` Hashkey []byte TimeConsideration uint32 `json:"TimeConsideration,omitempty" yaml:"TimeConsideration,omitempty"` MemoryMultiplier uint32 `json:"" yaml:""` Threads uint8 `json:"Threads,omitempty" yaml:"Threads,omitempty"` }
EncryptionConfig allows you to configure symmetric key encryption based on options
type Exchange ¶
type Exchange struct { Name string `json:"Name" yaml:"Name"` Type string `json:"Type" yaml:"Type"` // "direct", "fanout", "topic", "headers" PassiveDeclare bool `json:"PassiveDeclare" yaml:"PassiveDeclare"` Durable bool `json:"Durable" yaml:"Durable"` AutoDelete bool `json:"AutoDelete" yaml:"AutoDelete"` InternalOnly bool `json:"InternalOnly" yaml:"InternalOnly"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
Exchange allows for you to create Exchange topology.
type ExchangeBinding ¶
type ExchangeBinding struct { ExchangeName string `json:"ExchangeName" yaml:"ExchangeName"` ParentExchangeName string `json:"ParentExchangeName" yaml:"ParentExchangeName"` RoutingKey string `json:"RoutingKey" yaml:"RoutingKey"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
ExchangeBinding allows for you to create Bindings between an Exchange and Exchange.
type Letter ¶
Letter contains the message body and address of where things are going.
func NewLetter ¶
func NewLetter( exchange string, routingKey string, data interface{}, optsFuns ...LetterOptsFun, ) (*Letter, error)
func (*Letter) Options ¶
func (l *Letter) Options() LetterOpts
type LetterOpts ¶
type LetterOpts struct { Ctx context.Context Exchange string RoutingKey string ContentType string CorrelationID string MessageType string Mandatory bool Immediate bool Headers map[string]interface{} DeliveryMode uint8 Priority uint8 RetryCount uint16 EConf *EncryptionConfig CConf *CompressionConfig }
type LetterOptsFun ¶
type LetterOptsFun func(*LetterOpts)
func WithCompressionConfig ¶
func WithCompressionConfig(cConf *CompressionConfig) LetterOptsFun
func WithContext ¶
func WithContext(ctx context.Context) LetterOptsFun
func WithCorrelationID ¶
func WithCorrelationID(correlationID string) LetterOptsFun
func WithEncryptionConfig ¶
func WithEncryptionConfig(eConf *EncryptionConfig) LetterOptsFun
func WithHeaders ¶
func WithHeaders(h map[string]interface{}) LetterOptsFun
type PoolConfig ¶
type PoolConfig struct { ApplicationName string `json:"ApplicationName" yaml:"ApplicationName"` URI string `json:"URI" yaml:"URI"` Heartbeat uint32 `json:"Heartbeat" yaml:"Heartbeat"` ConnectionTimeout uint32 `json:"ConnectionTimeout" yaml:"ConnectionTimeout"` SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep length on errors MaxConnectionCount uint64 `json:"MaxConnectionCount" yaml:"MaxConnectionCount"` // number of connections to create in the pool MaxCacheChannelCount uint64 `json:"MaxCacheChannelCount" yaml:"MaxCacheChannelCount"` // number of channels to be cached in the pool TLSConfig *TLSConfig `json:"TLSConfig" yaml:"TLSConfig"` // TLS settings for connection with AMQPS. }
PoolConfig represents settings for creating/configuring pools.
type PublishReceipt ¶
PublishReceipt is a way to monitor publishing success and to initiate a retry when using async publishing.
func (*PublishReceipt) ToString ¶
func (not *PublishReceipt) ToString() string
ToString allows you to quickly log the PublishReceipt struct as a string.
type Publisher ¶
type Publisher struct { Config *RabbitSeasoning ConnectionPool *ConnectionPool PublishTimeout time.Duration // contains filtered or unexported fields }
Publisher contains everything you need to publish a message.
func NewPublisher ¶
func NewPublisher(conf *RabbitSeasoning, cp *ConnectionPool) *Publisher
NewPublisher creates and configures a new Publisher.
func (*Publisher) Publish ¶
Publish sends a single message to the address on the letter using a cached ChannelHost. Subscribe to PublishReceipts to see success and errors or read the function output.
For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation
func (*Publisher) PublishReceipts ¶
func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt
PublishReceipts yields all the success and failures during all publish events. Highly recommend subscribing to this.
func (*Publisher) PublishWithConfirmation ¶
PublishWithConfirmation sends a single message to the address on the letter with confirmation capabilities. This is an expensive and slow call, use this when delivery confirmation on publish is your highest priority.
A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically gets requeued for re-publish. A confirmation failure keeps trying to publish until a timeout failure occurs or context got canceled. If a nack occurs, it will republish the letter. It continuously tries to publish the letter until it receives a confirmation or encounters an error.
The function returns an error if it fails to publish the letter within the specified timeout.
func (*Publisher) PublishWithConfirmationTransient ¶
PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority.
A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically gets requeued for re-publish. A confirmation failure keeps trying to publish until a timeout failure occurs or context got canceled. If a nack occurs, it will republish the letter. It continuously tries to publish the letter until it receives a confirmation or encounters an error.
The function returns an error if it fails to publish the letter within the specified timeout.
func (*Publisher) PublishWithTransient ¶
PublishWithTransient sends a single message to the address on the letter using a transient (new) RabbitMQ channel.
Subscribe to PublishReceipts to see success and errors or read the function output.
For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation
func (*Publisher) QueueLetter ¶
QueueLetter queues up a letter that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.
func (*Publisher) QueueLetters ¶
QueueLetters allows you to bulk queue letters that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.
func (*Publisher) Shutdown ¶
Shutdown cleanly shuts down the publisher and resets its internal state.
func (*Publisher) StartAutoPublishing ¶
func (pub *Publisher) StartAutoPublishing()
StartAutoPublishing starts the Publisher's auto-publishing capabilities.
type PublisherConfig ¶
type PublisherConfig struct { AutoAck bool `json:"AutoAck" yaml:"AutoAck"` SleepOnIdleInterval uint32 `json:"SleepOnIdleInterval" yaml:"SleepOnIdleInterval"` SleepOnErrorInterval uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` PublishTimeOutInterval uint32 `json:"PublishTimeOutInterval" yaml:"PublishTimeOutInterval"` MaxRetryCount uint16 `json:"MaxRetryCount" yaml:"MaxRetryCount"` }
PublisherConfig represents settings for configuring global settings for all Publishers with ease.
type Queue ¶
type Queue struct { Name string `json:"Name" yaml:"Name"` PassiveDeclare bool `json:"PassiveDeclare" yaml:"PassiveDeclare"` Durable bool `json:"Durable" yaml:"Durable"` AutoDelete bool `json:"AutoDelete" yaml:"AutoDelete"` Exclusive bool `json:"Exclusive" yaml:"Exclusive"` NoWait bool `json:"NoWait" yaml:"NoWait"` Type string `json:"Type" yaml:"Type"` // classic or quorum, type of quorum disregards exclusive and enables durable properties when building from config Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
Queue allows for you to create Queue topology.
type QueueBinding ¶
type QueueBinding struct { QueueName string `json:"QueueName" yaml:"QueueName"` ExchangeName string `json:"ExchangeName" yaml:"ExchangeName"` RoutingKey string `json:"RoutingKey" yaml:"RoutingKey"` NoWait bool `json:"NoWait" yaml:"NoWait"` Args amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface() }
QueueBinding allows for you to create Bindings between a Queue and Exchange.
type RabbitSeasoning ¶
type RabbitSeasoning struct { EncryptionConfig *EncryptionConfig `json:"EncryptionConfig" yaml:"EncryptionConfig"` CompressionConfig *CompressionConfig `json:"CompressionConfig" yaml:"CompressionConfig"` PoolConfig *PoolConfig `json:"PoolConfig" yaml:"PoolConfig"` ConsumerConfigs map[string]*ConsumerConfig `json:"ConsumerConfigs" yaml:"ConsumerConfigs"` PublisherConfig *PublisherConfig `json:"PublisherConfig" yaml:"PublisherConfig"` }
RabbitSeasoning represents the configuration values.
func ConvertJSONFileToConfig ¶
func ConvertJSONFileToConfig(fileNamePath string) (*RabbitSeasoning, error)
ConvertJSONFileToConfig opens a file.json and converts to RabbitSeasoning.
type RabbitService ¶
type RabbitService struct { Config *RabbitSeasoning ConnectionPool *ConnectionPool Topologer *Topologer Publisher *Publisher // contains filtered or unexported fields }
RabbitService is the struct for containing all you need for RabbitMQ access.
func NewRabbitService ¶
func NewRabbitService( config *RabbitSeasoning, passphrase string, salt string, processPublishReceipts func(*PublishReceipt), processError func(error)) (*RabbitService, error)
NewRabbitService creates everything you need for a RabbitMQ communication service.
func NewRabbitServiceWithConnectionPool ¶
func NewRabbitServiceWithConnectionPool( connectionPool *ConnectionPool, config *RabbitSeasoning, passphrase string, salt string, processPublishReceipts func(*PublishReceipt), processError func(error)) (*RabbitService, error)
NewRabbitServiceWithConnectionPool creates everything you need for a RabbitMQ communication service from a connection pool.
func NewRabbitServiceWithPublisher ¶
func NewRabbitServiceWithPublisher( publisher *Publisher, config *RabbitSeasoning, passphrase string, salt string, processPublishReceipts func(*PublishReceipt), processError func(error)) (*RabbitService, error)
NewRabbitServiceWithPublisher creates everything you need for a RabbitMQ communication service from a publisher.
func (*RabbitService) CentralErr ¶
func (rs *RabbitService) CentralErr() <-chan error
CentralErr yields all the internal errs for sub-processes.
func (*RabbitService) GetConsumer ¶
func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error)
GetConsumer allows you to get the individual consumers stored in memory.
func (*RabbitService) GetConsumerConfig ¶
func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error)
GetConsumerConfig allows you to get the individual consumers' config stored in memory.
func (*RabbitService) Publish ¶
func (rs *RabbitService) Publish( ctx context.Context, input interface{}, exchangeName, routingKey, metadata string, headers amqp.Table, ) error
Publish tries to publish directly without retry and data optionally wrapped in a ModdedLetter.
func (*RabbitService) PublishData ¶
func (rs *RabbitService) PublishData( ctx context.Context, data []byte, exchangeName, routingKey string, headers map[string]interface{}, ) error
PublishData tries to publish.
func (*RabbitService) PublishLetter ¶
func (rs *RabbitService) PublishLetter(letter *Letter) error
PublishLetter wraps around Publisher to simply Publish.
func (*RabbitService) PublishWithConfirmation ¶
func (rs *RabbitService) PublishWithConfirmation( ctx context.Context, input interface{}, exchangeName, routingKey, metadata string, headers amqp.Table, ) error
PublishWithConfirmation tries to publish and wait for a confirmation.
func (*RabbitService) QueueLetter ¶
func (rs *RabbitService) QueueLetter(letter *Letter) error
QueueLetter wraps around AutoPublisher to simply QueueLetter. Error indicates message was not queued.
func (*RabbitService) Shutdown ¶
func (rs *RabbitService) Shutdown(stopConsumers bool)
Shutdown stops the service and shuts down the ChannelPool.
type ReceivedMessage ¶
type ReceivedMessage struct { IsAckable bool Body []byte MessageID string // LetterID ApplicationID string PublishDate string Delivery amqp.Delivery // Access everything. }
ReceivedMessage allows you to acknowledge, after processing the received payload, by its RabbitMQ tag and Channel pointer.
func NewReceivedMessage ¶
func NewReceivedMessage( isAckable bool, delivery amqp.Delivery) *ReceivedMessage
NewReceivedMessage creates a new ReceivedMessage.
func (*ReceivedMessage) Acknowledge ¶
func (msg *ReceivedMessage) Acknowledge() error
Acknowledge allows for you to acknowledge message on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server. Can't ack from a different channel.
func (*ReceivedMessage) Nack ¶
func (msg *ReceivedMessage) Nack(requeue bool) error
Nack allows for you to negative acknowledge message on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server.
func (*ReceivedMessage) Reject ¶
func (msg *ReceivedMessage) Reject(requeue bool) error
Reject allows for you to reject on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server.
type TLSConfig ¶
type TLSConfig struct { EnableTLS bool `json:"EnableTLS" yaml:"EnableTLS"` // Use TLSConfig to create connections with AMQPS uri. PEMCertLocation string `json:"PEMCertLocation" yaml:"PEMCertLocation"` LocalCertLocation string `json:"LocalCertLocation" yaml:"LocalCertLocation"` CertServerName string `json:"CertServerName" yaml:"CertServerName"` }
TLSConfig represents settings for configuring TLS.
type Topologer ¶
type Topologer struct {
ConnectionPool *ConnectionPool
}
Topologer allows you to build RabbitMQ topology backed by a ConnectionPool.
func NewTopologer ¶
func NewTopologer(cp *ConnectionPool) *Topologer
NewTopologer builds you a new Topologer.
func (*Topologer) BindExchanges ¶
func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error
BindExchanges loops through and binds Exchanges to Exchanges - stops on first error.
func (*Topologer) BindQueues ¶
func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error
BindQueues loops through and binds Queues to Exchanges - stops on first error.
func (*Topologer) BuildExchanges ¶
BuildExchanges loops through and builds Exchanges - stops on first error.
func (*Topologer) BuildQueues ¶
BuildQueues loops through and builds Queues - stops on first error.
func (*Topologer) BuildTopology ¶
func (top *Topologer) BuildTopology(config *TopologyConfig, ignoreErrors bool) error
BuildTopology builds a topology based on a TopologyConfig - stops on first error.
func (*Topologer) CreateExchange ¶
func (top *Topologer) CreateExchange( exchangeName string, exchangeType string, passiveDeclare, durable, autoDelete, internal, noWait bool, args map[string]interface{}) error
CreateExchange builds an Exchange topology.
func (*Topologer) CreateExchangeFromConfig ¶
CreateExchangeFromConfig builds an Exchange topology from a config Exchange element.
func (*Topologer) CreateQueue ¶
func (top *Topologer) CreateQueue( queueName string, passiveDeclare bool, durable bool, autoDelete bool, exclusive bool, noWait bool, args map[string]interface{}) error
CreateQueue builds a Queue topology.
func (*Topologer) CreateQueueFromConfig ¶
CreateQueueFromConfig builds a Queue topology from a config Queue element.
func (*Topologer) ExchangeBind ¶
func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error
ExchangeBind binds an exchange to an Exchange.
func (*Topologer) ExchangeDelete ¶
ExchangeDelete removes the exchange from the server.
func (*Topologer) ExchangeUnbind ¶
func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, args map[string]interface{}) error
ExchangeUnbind removes the binding of an Exchange to an Exchange.
func (*Topologer) PurgeQueue ¶
PurgeQueue removes all messages from the Queue that are not waiting to be Acknowledged and returns the count.
func (*Topologer) PurgeQueues ¶
PurgeQueues purges each Queue provided.
func (*Topologer) QueueBind ¶
func (top *Topologer) QueueBind(queueBinding *QueueBinding) error
QueueBind binds an Exchange to a Queue.
func (*Topologer) QueueDelete ¶
QueueDelete removes the queue from the server (and all bindings) and returns messages purged (count).
type TopologyConfig ¶
type TopologyConfig struct { Exchanges []*Exchange `json:"Exchanges" yaml:"Exchanges"` Queues []*Queue `json:"Queues" yaml:"Queues"` QueueBindings []*QueueBinding `json:"QueueBindings" yaml:"QueueBindings"` ExchangeBindings []*ExchangeBinding `json:"ExchangeBindings" yaml:"ExchangeBindings"` }
TopologyConfig allows you to build simple topologies from a JSON file.
func ConvertJSONFileToTopologyConfig ¶
func ConvertJSONFileToTopologyConfig(fileNamePath string) (*TopologyConfig, error)
ConvertJSONFileToTopologyConfig opens a file.json and converts to Topology.