friendlyrabbit

package module
v0.0.0-...-9b9fd61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2023 License: MIT Imports: 17 Imported by: 0

README

friendlyrabbit

A user friendly RabbitMQ library written in Golang. Fork of turbocookedrabbit.

Go Report Card

Release

Installation

go get github.com/pmatteo/friendlyrabbit
Getting Started
RabbitMQ Connection

friendlyrabbit expose a convinient struct called ConnectionPool which - as the name suggest - offers an easy way to established a pool of connections.

Configuration for ConnectionPool can be loaded from a json/yml file or manually defining a new PoolConfig.

poolConfig := &PoolConfig{}
 err := ConvertJsonToStruct("pool_config.json", config)

connectionPool, err := friendlyrabbit.NewConnectionPool(poolConfig)
Create a simple Publisher
publisher := friendlyrabbit.NewPublisher(config, connectionPool)

// Create the letter's Envelope containing all the address details of where a letter is going.
env := fr.NewEnvelope(context.Background(), exchangeName, routingKey, amqpTableHeader)

// Create and send a simple message
letter := friendlyrabbit.NewLetter(
    uuid.New(),
    env,
    []bytes("message"),
)
err := publisher.Publish(letter)

It's also possible to publish a message and wait for its publishing confirmation. This is obviously a bit slower operation so be careful.

publisher.PublishWithConfirmation(letter, time.Millisecond*500)

receipt := <-publisher.PublishReceipts():
if !receipt.Success {
// log? requeue? break WaitLoop?
}

Checkout the wiki for more information about the Publisher.

Create a simple Consumer
publisher := friendlyrabbit.NewPublisher(config, connectionPool)

// Create the letter's Envelope containing all the address details of where a letter is going.
env := fr.NewEnvelope(context.Background(), exchangeName, routingKey, amqpTableHeader)

// Create and send a simple message
letter := friendlyrabbit.NewLetter(
    uuid.New(),
    env,
    []bytes("message"),
)
err := publisher.Publish(letter)

It's also possible to publish a message and wait for its publishing confirmation. This is obviously a bit slower operation so be careful.

publisher.PublishWithConfirmation(letter, time.Millisecond*500)

receipt := <-publisher.PublishReceipts():
if !receipt.Success {
// log? requeue? break WaitLoop?
}

Checkout the wiki for more information about the Publisher.

Documentation

Index

Constants

View Source
const (
	CONFIRMATION_TIMEOUT = "publish confirmation timeout after %d millisecond - recommend retry/requeue (LetterID %s)"
	CONFIRMATION_CANCEL  = "publish confirmation not received before context expired - recommend retry/requeue (LetterID %s)"
)
View Source
const (
	// QueueTypeQuorum indicates a queue of type quorum.
	QueueTypeQuorum = "quorum"

	// QueueTypeClassic indicates a queue of type classic.
	QueueTypeClassic = "classic"
)

Variables

This section is empty.

Functions

func ConvertJsonToStruct

func ConvertJsonToStruct[C interface{}](fileNamePath string, c *C) error

func ReadPayload

func ReadPayload(
	buffer *bytes.Buffer,
	compressionConf *CompressionConfig,
	encryptionConf *EncryptionConfig,
) error

func ToPayload

func ToPayload(
	input interface{},
	compressionConf *CompressionConfig,
	encryptionConf *EncryptionConfig,
) ([]byte, error)

ToPayload creates a JSON marshal and optionally compresses and encrypts the bytes.

Types

type ChannelHost

type ChannelHost struct {
	Channel      *amqp.Channel
	ID           uint64
	ConnectionID uint64
	Ackable      bool

	Errors chan *amqp.Error
	// contains filtered or unexported fields
}

ChannelHost is an internal representation of amqp.Channel.

func NewChannelHost

func NewChannelHost(
	connHost *ConnectionHost,
	id uint64,
	connectionID uint64,
	ackable, transient bool,
) (*ChannelHost, error)

NewChannelHost creates a simple ChannelHost wrapper for management by end-user developer.

func (*ChannelHost) Close

func (ch *ChannelHost) Close()

Close allows for manual close of Amqp Channel kept internally.

func (*ChannelHost) MakeChannel

func (ch *ChannelHost) MakeChannel() (err error)

MakeChannel tries to create (or re-create) the channel from the ConnectionHost its attached to.

func (*ChannelHost) PauseForFlowControl

func (ch *ChannelHost) PauseForFlowControl()

PauseForFlowControl allows you to wait and sleep while receiving flow control messages.

type CompressionConfig

type CompressionConfig struct {
	Enabled bool   `json:"Enabled" yaml:"Enabled"`
	Type    string `json:"Type,omitempty" yaml:"Type,omitempty"`
}

CompressionConfig allows you to configuration symmetric key encryption based on options

type ConnectionHost

type ConnectionHost struct {
	Connection         *amqp.Connection
	ConnectionID       uint64
	CachedChannelCount uint64

	Errors   chan *amqp.Error
	Blockers chan amqp.Blocking
	// contains filtered or unexported fields
}

ConnectionHost is an internal representation of amqp.Connection.

func NewConnectionHost

func NewConnectionHost(
	uri string,
	connectionName string,
	connectionID uint64,
	heartbeatInterval time.Duration,
	connectionTimeout time.Duration,
	tlsConfig *TLSConfig) (*ConnectionHost, error)

NewConnectionHost creates a simple ConnectionHost wrapper for management by end-user developer.

func (*ConnectionHost) Connect

func (ch *ConnectionHost) Connect() bool

Connect tries to connect (or reconnect) to the provided properties of the host one time.

func (*ConnectionHost) ConnectWithErrorHandler

func (ch *ConnectionHost) ConnectWithErrorHandler(errorHandler func(error)) bool

ConnectWithErrorHandler tries to connect (or reconnect) to the provided properties of the host one time with an error handler.

func (*ConnectionHost) PauseOnFlowControl

func (ch *ConnectionHost) PauseOnFlowControl()

PauseOnFlowControl allows you to wait and sleep while receiving flow control messages. Sleeps for one second, repeatedly until the blocking has stopped.

type ConnectionPool

type ConnectionPool struct {
	Config PoolConfig
	// contains filtered or unexported fields
}

ConnectionPool houses the pool of RabbitMQ connections.

func NewConnectionPool

func NewConnectionPool(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error)

NewConnectionPool creates hosting structure for the ConnectionPool with an error and/or unhealthy handler.

func (*ConnectionPool) ActiveConnections

func (cp *ConnectionPool) ActiveConnections() int64

func (*ConnectionPool) GetChannelFromPool

func (cp *ConnectionPool) GetChannelFromPool() *ChannelHost

GetChannelFromPool gets a cached ackable channel from the Pool if they exist or creates a channel. A non-acked channel is always a transient channel. Blocking if Ackable is true and the cache is empty. If you want a transient Ackable channel (un-managed), use CreateChannel directly.

func (*ConnectionPool) GetConnection

func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)

GetConnection gets a connection based on whats in the ConnectionPool (blocking under bad network conditions). Flowcontrol (blocking) or transient network outages will pause here until cleared. Uses the SleepOnErrorInterval to pause between retries.

func (*ConnectionPool) GetTransientChannel

func (cp *ConnectionPool) GetTransientChannel(ackable bool) *ChannelHost

GetTransientChannel allows you create an unmanaged amqp Channel with the help of the ConnectionPool.

func (*ConnectionPool) ReturnChannel

func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)

ReturnChannel returns a Channel. If Channel is not a cached channel, it is simply closed here. If Cache Channel, we check if erred, new Channel is created instead and then returned to the cache.

func (*ConnectionPool) ReturnConnection

func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost, flag bool)

ReturnConnection puts the connection back in the queue and flag it for error. This helps maintain a Round Robin on Connections and their resources.

func (*ConnectionPool) Shutdown

func (cp *ConnectionPool) Shutdown()

Shutdown closes all connections in the ConnectionPool and resets the Pool to pre-initialized state.

type Consumer

type Consumer struct {
	Config         *ConsumerConfig
	ConnectionPool *ConnectionPool
	QueueName      string
	ConsumerName   string
	// contains filtered or unexported fields
}

Consumer receives messages from a RabbitMQ location.

func NewConsumer

func NewConsumer(config *ConsumerConfig, cp *ConnectionPool) *Consumer

NewConsumer creates a new Consumer to receive messages from a specific queuename.

func (*Consumer) Errors

func (con *Consumer) Errors() <-chan error

Errors yields all the internal errs for consuming messages.

func (*Consumer) FlushErrors

func (con *Consumer) FlushErrors()

FlushErrors allows you to flush out all previous Errors.

func (*Consumer) FlushMessages

func (con *Consumer) FlushMessages()

FlushMessages allows you to flush out all previous Messages. WARNING: THIS WILL RESULT IN LOST MESSAGES.

func (*Consumer) FlushStop

func (con *Consumer) FlushStop()

FlushStop allows you to flush out all previous Stop signals.

func (*Consumer) Get

func (con *Consumer) Get(queueName string) (*amqp.Delivery, error)

Get gets a single message from any queue. Auto-Acknowledges.

func (*Consumer) ReceivedMessages

func (con *Consumer) ReceivedMessages() <-chan *ReceivedMessage

ReceivedMessages yields all the internal messages ready for consuming.

func (*Consumer) StartConsuming

func (con *Consumer) StartConsuming(action func(*ReceivedMessage))

StartConsuming starts the Consumer invoking a method on every ReceivedMessage.

func (*Consumer) Started

func (con *Consumer) Started() bool

Started allows you to determine if a consumer has started.

func (*Consumer) StopConsuming

func (con *Consumer) StopConsuming(flushMessages bool)

StopConsuming allows you to signal stop to the consumer. Will stop on the consumer channelclose or responding to signal after getting all remaining deviveries. FlushMessages empties the internal buffer of messages received by queue. Ackable messages are still in RabbitMQ queue, while noAck messages will unfortunately be lost. Use wisely.

type ConsumerConfig

type ConsumerConfig struct {
	QueueName            string                 `json:"QueueName" yaml:"QueueName"`
	ConsumerName         string                 `json:"ConsumerName" yaml:"ConsumerName"`
	AutoAck              bool                   `json:"AutoAck" yaml:"AutoAck"`
	Exclusive            bool                   `json:"Exclusive" yaml:"Exclusive"`
	NoWait               bool                   `json:"NoWait" yaml:"NoWait"`
	Args                 map[string]interface{} `json:"Args" yaml:"Args"`
	QosCountOverride     int                    `json:"QosCountOverride" yaml:"QosCountOverride"`         // if zero ignored
	SleepOnErrorInterval uint32                 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep on error
}

ConsumerConfig represents settings for configuring a consumer with ease.

type EncryptionConfig

type EncryptionConfig struct {
	Enabled           bool   `json:"Enabled" yaml:"Enabled"`
	Type              string `json:"Type,omitempty" yaml:"Type,omitempty"`
	Hashkey           []byte
	TimeConsideration uint32 `json:"TimeConsideration,omitempty" yaml:"TimeConsideration,omitempty"`
	MemoryMultiplier  uint32 `json:"" yaml:""`
	Threads           uint8  `json:"Threads,omitempty" yaml:"Threads,omitempty"`
}

EncryptionConfig allows you to configuration symmetric key encryption based on options

type Exchange

type Exchange struct {
	Name           string     `json:"Name" yaml:"Name"`
	Type           string     `json:"Type" yaml:"Type"` // "direct", "fanout", "topic", "headers"
	PassiveDeclare bool       `json:"PassiveDeclare" yaml:"PassiveDeclare"`
	Durable        bool       `json:"Durable" yaml:"Durable"`
	AutoDelete     bool       `json:"AutoDelete" yaml:"AutoDelete"`
	InternalOnly   bool       `json:"InternalOnly" yaml:"InternalOnly"`
	NoWait         bool       `json:"NoWait" yaml:"NoWait"`
	Args           amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

Exchange allows for you to create Exchange topology.

type ExchangeBinding

type ExchangeBinding struct {
	ExchangeName       string     `json:"ExchangeName" yaml:"ExchangeName"`
	ParentExchangeName string     `json:"ParentExchangeName" yaml:"ParentExchangeName"`
	RoutingKey         string     `json:"RoutingKey" yaml:"RoutingKey"`
	NoWait             bool       `json:"NoWait" yaml:"NoWait"`
	Args               amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

ExchangeBinding allows for you to create Bindings between an Exchange and Exchange.

type Letter

type Letter struct {
	LetterID uuid.UUID
	Body     []byte
	// contains filtered or unexported fields
}

Letter contains the message body and address of where things are going.

func NewLetter

func NewLetter(
	exchange string,
	routingKey string,
	data interface{},
	optsFuns ...LetterOptsFun,
) (*Letter, error)

func (*Letter) Done

func (l *Letter) Done() <-chan struct{}

func (*Letter) Options

func (l *Letter) Options() LetterOpts

type LetterOpts

type LetterOpts struct {
	Ctx context.Context

	Exchange      string
	RoutingKey    string
	ContentType   string
	CorrelationID string
	MessageType   string
	Mandatory     bool
	Immediate     bool
	Headers       map[string]interface{}
	DeliveryMode  uint8
	Priority      uint8
	RetryCount    uint16

	EConf *EncryptionConfig
	CConf *CompressionConfig
}

type LetterOptsFun

type LetterOptsFun func(*LetterOpts)

func WithCompressionConfig

func WithCompressionConfig(cConf *CompressionConfig) LetterOptsFun

func WithContext

func WithContext(ctx context.Context) LetterOptsFun

func WithCorrelationID

func WithCorrelationID(correlationID string) LetterOptsFun

func WithEncryptionConfig

func WithEncryptionConfig(eConf *EncryptionConfig) LetterOptsFun

func WithHeaders

func WithHeaders(h map[string]interface{}) LetterOptsFun

type PoolConfig

type PoolConfig struct {
	ApplicationName      string     `json:"ApplicationName" yaml:"ApplicationName"`
	URI                  string     `json:"URI" yaml:"URI"`
	Heartbeat            uint32     `json:"Heartbeat" yaml:"Heartbeat"`
	ConnectionTimeout    uint32     `json:"ConnectionTimeout" yaml:"ConnectionTimeout"`
	SleepOnErrorInterval uint32     `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"` // sleep length on errors
	MaxConnectionCount   uint64     `json:"MaxConnectionCount" yaml:"MaxConnectionCount"`     // number of connections to create in the pool
	MaxCacheChannelCount uint64     `json:"MaxCacheChannelCount" yaml:"MaxCacheChannelCount"` // number of channels to be cached in the pool
	TLSConfig            *TLSConfig `json:"TLSConfig" yaml:"TLSConfig"`                       // TLS settings for connection with AMQPS.
}

PoolConfig represents settings for creating/configuring pools.

type PublishReceipt

type PublishReceipt struct {
	LetterID     uuid.UUID
	FailedLetter *Letter
	Success      bool
	Error        error
}

PublishReceipt is a way to monitor publishing success and to initiate a retry when using async publishing.

func (*PublishReceipt) ToString

func (not *PublishReceipt) ToString() string

ToString allows you to quickly log the PublishReceipt struct as a string.

type Publisher

type Publisher struct {
	Config         *RabbitSeasoning
	ConnectionPool *ConnectionPool

	PublishTimeout time.Duration
	// contains filtered or unexported fields
}

Publisher contains everything you need to publish a message.

func NewPublisher

func NewPublisher(conf *RabbitSeasoning, cp *ConnectionPool) *Publisher

NewPublisher creates and configures a new Publisher.

func (*Publisher) Publish

func (pub *Publisher) Publish(letter *Letter, receipt bool) error

Publish sends a single message to the address on the letter using a cached ChannelHost. Subscribe to PublishReceipts to see success and errors or read the function output.

For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation

func (*Publisher) PublishReceipts

func (pub *Publisher) PublishReceipts() <-chan *PublishReceipt

PublishReceipts yields all the success and failures during all publish events. Highly recommend susbscribing to this.

func (*Publisher) PublishWithConfirmation

func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Duration) error

PublishWithConfirmation sends a single message to the address on the letter with confirmation capabilities. This is an expensive and slow call, use this when delivery confirmation on publish is your highest priority.

A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically gets requeued for re-publish. A confirmation failure keeps trying to publish until a timeout failure occurs or context got canceled. If a nack occurs, it will republish the letter. It continuously tries to publish the letter until it receives a confirmation or encounters an error.

The function returns an error if it fails to publish the letter within the specified timeout.

func (*Publisher) PublishWithConfirmationTransient

func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration) error

PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels. This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority.

A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically gets requeued for re-publish. A confirmation failure keeps trying to publish until a timeout failure occurs or context got canceled. If a nack occurs, it will republish the letter. It continuously tries to publish the letter until it receives a confirmation or encounters an error.

The function returns an error if it fails to publish the letter within the specified timeout.

func (*Publisher) PublishWithTransient

func (pub *Publisher) PublishWithTransient(letter *Letter) error

PublishWithTransient sends a single message to the address on the letter using a transient (new) RabbitMQ channel.

Subscribe to PublishReceipts to see success and errors or read the function output.

For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation

func (*Publisher) QueueLetter

func (pub *Publisher) QueueLetter(letter *Letter) bool

QueueLetter queues up a letter that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.

func (*Publisher) QueueLetters

func (pub *Publisher) QueueLetters(letters []*Letter) bool

QueueLetters allows you to bulk queue letters that will be consumed by AutoPublish. By default, AutoPublish uses PublishWithConfirmation as the mechanism for publishing.

func (*Publisher) Shutdown

func (pub *Publisher) Shutdown(shutdownPools bool)

Shutdown cleanly shutdown the publisher and resets it's internal state.

func (*Publisher) StartAutoPublishing

func (pub *Publisher) StartAutoPublishing()

StartAutoPublishing starts the Publisher's auto-publishing capabilities.

type PublisherConfig

type PublisherConfig struct {
	AutoAck                bool   `json:"AutoAck" yaml:"AutoAck"`
	SleepOnIdleInterval    uint32 `json:"SleepOnIdleInterval" yaml:"SleepOnIdleInterval"`
	SleepOnErrorInterval   uint32 `json:"SleepOnErrorInterval" yaml:"SleepOnErrorInterval"`
	PublishTimeOutInterval uint32 `json:"PublishTimeOutInterval" yaml:"PublishTimeOutInterval"`
	MaxRetryCount          uint16 `json:"MaxRetryCount" yaml:"MaxRetryCount"`
}

PublisherConfig represents settings for configuring global settings for all Publishers with ease.

type Queue

type Queue struct {
	Name           string     `json:"Name" yaml:"Name"`
	PassiveDeclare bool       `json:"PassiveDeclare" yaml:"PassiveDeclare"`
	Durable        bool       `json:"Durable" yaml:"Durable"`
	AutoDelete     bool       `json:"AutoDelete" yaml:"AutoDelete"`
	Exclusive      bool       `json:"Exclusive" yaml:"Exclusive"`
	NoWait         bool       `json:"NoWait" yaml:"NoWait"`
	Type           string     `json:"Type" yaml:"Type"`                     // classic or quorum, type of quorum disregards exclusive and enables durable properties when building from config
	Args           amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

Queue allows for you to create Queue topology.

type QueueBinding

type QueueBinding struct {
	QueueName    string     `json:"QueueName" yaml:"QueueName"`
	ExchangeName string     `json:"ExchangeName" yaml:"ExchangeName"`
	RoutingKey   string     `json:"RoutingKey" yaml:"RoutingKey"`
	NoWait       bool       `json:"NoWait" yaml:"NoWait"`
	Args         amqp.Table `json:"Args,omitempty" yaml:"Args,omitempty"` // map[string]interface()
}

QueueBinding allows for you to create Bindings between a Queue and Exchange.

type RabbitSeasoning

type RabbitSeasoning struct {
	EncryptionConfig  *EncryptionConfig          `json:"EncryptionConfig" yaml:"EncryptionConfig"`
	CompressionConfig *CompressionConfig         `json:"CompressionConfig" yaml:"CompressionConfig"`
	PoolConfig        *PoolConfig                `json:"PoolConfig" yaml:"PoolConfig"`
	ConsumerConfigs   map[string]*ConsumerConfig `json:"ConsumerConfigs" yaml:"ConsumerConfigs"`
	PublisherConfig   *PublisherConfig           `json:"PublisherConfig" yaml:"PublisherConfig"`
}

RabbitSeasoning represents the configuration values.

func ConvertJSONFileToConfig

func ConvertJSONFileToConfig(fileNamePath string) (*RabbitSeasoning, error)

ConvertJSONFileToConfig opens a file.json and converts to RabbitSeasoning.

type RabbitService

type RabbitService struct {
	Config         *RabbitSeasoning
	ConnectionPool *ConnectionPool
	Topologer      *Topologer
	Publisher      *Publisher
	// contains filtered or unexported fields
}

RabbitService is the struct for containing all you need for RabbitMQ access.

func NewRabbitService

func NewRabbitService(
	config *RabbitSeasoning,
	passphrase string,
	salt string,
	processPublishReceipts func(*PublishReceipt),
	processError func(error)) (*RabbitService, error)

NewRabbitService creates everything you need for a RabbitMQ communication service.

func NewRabbitServiceWithConnectionPool

func NewRabbitServiceWithConnectionPool(
	connectionPool *ConnectionPool,
	config *RabbitSeasoning,
	passphrase string,
	salt string,
	processPublishReceipts func(*PublishReceipt),
	processError func(error)) (*RabbitService, error)

NewRabbitServiceWithConnectionPool creates everything you need for a RabbitMQ communication service from a connection pool.

func NewRabbitServiceWithPublisher

func NewRabbitServiceWithPublisher(
	publisher *Publisher,
	config *RabbitSeasoning,
	passphrase string,
	salt string,
	processPublishReceipts func(*PublishReceipt),
	processError func(error)) (*RabbitService, error)

NewRabbitServiceWithPublisher creates everything you need for a RabbitMQ communication service from a publisher.

func (*RabbitService) CentralErr

func (rs *RabbitService) CentralErr() <-chan error

CentralErr yields all the internal errs for sub-processes.

func (*RabbitService) GetConsumer

func (rs *RabbitService) GetConsumer(consumerName string) (*Consumer, error)

GetConsumer allows you to get the individual consumers stored in memory.

func (*RabbitService) GetConsumerConfig

func (rs *RabbitService) GetConsumerConfig(consumerName string) (*ConsumerConfig, error)

GetConsumerConfig allows you to get the individual consumers' config stored in memory.

func (*RabbitService) Publish

func (rs *RabbitService) Publish(
	ctx context.Context,
	input interface{},
	exchangeName, routingKey, metadata string,
	headers amqp.Table,
) error

Publish tries to publish directly without retry and data optionally wrapped in a ModdedLetter.

func (*RabbitService) PublishData

func (rs *RabbitService) PublishData(
	ctx context.Context,
	data []byte,
	exchangeName, routingKey string,
	headers map[string]interface{},
) error

PublishData tries to publish.

func (*RabbitService) PublishLetter

func (rs *RabbitService) PublishLetter(letter *Letter) error

PublishLetter wraps around Publisher to simply Publish.

func (*RabbitService) PublishWithConfirmation

func (rs *RabbitService) PublishWithConfirmation(
	ctx context.Context,
	input interface{},
	exchangeName, routingKey, metadata string,
	headers amqp.Table,
) error

PublishWithConfirmation tries to publish and wait for a confirmation.

func (*RabbitService) QueueLetter

func (rs *RabbitService) QueueLetter(letter *Letter) error

QueueLetter wraps around AutoPublisher to simply QueueLetter. Error indicates message was not queued.

func (*RabbitService) Shutdown

func (rs *RabbitService) Shutdown(stopConsumers bool)

Shutdown stops the service and shuts down the ChannelPool.

type ReceivedMessage

type ReceivedMessage struct {
	IsAckable     bool
	Body          []byte
	MessageID     string // LetterID
	ApplicationID string
	PublishDate   string
	Delivery      amqp.Delivery // Access everything.
}

ReceivedMessage allow for you to acknowledge, after processing the received payload, by its RabbitMQ tag and Channel pointer.

func NewReceivedMessage

func NewReceivedMessage(
	isAckable bool,
	delivery amqp.Delivery) *ReceivedMessage

NewReceivedMessage creates a new ReceivedMessage.

func (*ReceivedMessage) Acknowledge

func (msg *ReceivedMessage) Acknowledge() error

Acknowledge allows for you to acknowledge message on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server. Can't ack from a different channel.

func (*ReceivedMessage) Nack

func (msg *ReceivedMessage) Nack(requeue bool) error

Nack allows for you to negative acknowledge message on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server.

func (*ReceivedMessage) Reject

func (msg *ReceivedMessage) Reject(requeue bool) error

Reject allows for you to reject on the original channel it was received. Will fail if channel is closed and this is by design per RabbitMQ server.

type TLSConfig

type TLSConfig struct {
	EnableTLS         bool   `json:"EnableTLS" yaml:"EnableTLS"` // Use TLSConfig to create connections with AMQPS uri.
	PEMCertLocation   string `json:"PEMCertLocation" yaml:"PEMCertLocation"`
	LocalCertLocation string `json:"LocalCertLocation" yaml:"LocalCertLocation"`
	CertServerName    string `json:"CertServerName" yaml:"CertServerName"`
}

TLSConfig represents settings for configuring TLS.

type Topologer

type Topologer struct {
	ConnectionPool *ConnectionPool
}

Topologer allows you to build RabbitMQ topology backed by a ConnectionPool.

func NewTopologer

func NewTopologer(cp *ConnectionPool) *Topologer

NewTopologer builds you a new Topologer.

func (*Topologer) BindExchanges

func (top *Topologer) BindExchanges(bindings []*ExchangeBinding, ignoreErrors bool) error

BindExchanges loops thrrough and binds Exchanges to Exchanges - stops on first error.

func (*Topologer) BindQueues

func (top *Topologer) BindQueues(bindings []*QueueBinding, ignoreErrors bool) error

BindQueues loops through and binds Queues to Exchanges - stops on first error.

func (*Topologer) BuildExchanges

func (top *Topologer) BuildExchanges(exchanges []*Exchange, ignoreErrors bool) error

BuildExchanges loops through and builds Exchanges - stops on first error.

func (*Topologer) BuildQueues

func (top *Topologer) BuildQueues(queues []*Queue, ignoreErrors bool) error

BuildQueues loops through and builds Queues - stops on first error.

func (*Topologer) BuildTopology

func (top *Topologer) BuildTopology(config *TopologyConfig, ignoreErrors bool) error

BuildTopology builds a topology based on a TopologyConfig - stops on first error.

func (*Topologer) CreateExchange

func (top *Topologer) CreateExchange(
	exchangeName string,
	exchangeType string,
	passiveDeclare, durable, autoDelete, internal, noWait bool,
	args map[string]interface{}) error

CreateExchange builds an Exchange topology.

func (*Topologer) CreateExchangeFromConfig

func (top *Topologer) CreateExchangeFromConfig(exchange *Exchange) error

CreateExchangeFromConfig builds an Exchange topology from a config Exchange element.

func (*Topologer) CreateQueue

func (top *Topologer) CreateQueue(
	queueName string,
	passiveDeclare bool,
	durable bool,
	autoDelete bool,
	exclusive bool,
	noWait bool,
	args map[string]interface{}) error

CreateQueue builds a Queue topology.

func (*Topologer) CreateQueueFromConfig

func (top *Topologer) CreateQueueFromConfig(queue *Queue) error

CreateQueueFromConfig builds a Queue topology from a config Exchange element.

func (*Topologer) ExchangeBind

func (top *Topologer) ExchangeBind(exchangeBinding *ExchangeBinding) error

ExchangeBind binds an exchange to an Exchange.

func (*Topologer) ExchangeDelete

func (top *Topologer) ExchangeDelete(
	exchangeName string,
	ifUnused, noWait bool) error

ExchangeDelete removes the exchange from the server.

func (*Topologer) ExchangeUnbind

func (top *Topologer) ExchangeUnbind(exchangeName, routingKey, parentExchangeName string, noWait bool, args map[string]interface{}) error

ExchangeUnbind removes the binding of an Exchange to an Exchange.

func (*Topologer) PurgeQueue

func (top *Topologer) PurgeQueue(queueName string, noWait bool) (int, error)

PurgeQueue removes all messages from the Queue that are not waiting to be Acknowledged and returns the count.

func (*Topologer) PurgeQueues

func (top *Topologer) PurgeQueues(queueNames []string, noWait bool) (int, error)

PurgeQueues purges each Queue provided.

func (*Topologer) QueueBind

func (top *Topologer) QueueBind(queueBinding *QueueBinding) error

QueueBind binds an Exchange to a Queue.

func (*Topologer) QueueDelete

func (top *Topologer) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

QueueDelete removes the queue from the server (and all bindings) and returns messages purged (count).

func (*Topologer) UnbindQueue

func (top *Topologer) UnbindQueue(queueName, routingKey, exchangeName string, args map[string]interface{}) error

UnbindQueue removes the binding of a Queue to an Exchange.

type TopologyConfig

type TopologyConfig struct {
	Exchanges        []*Exchange        `json:"Exchanges" yaml:"Exchanges"`
	Queues           []*Queue           `json:"Queues" yaml:"Queues"`
	QueueBindings    []*QueueBinding    `json:"QueueBindings" yaml:"QueueBindings"`
	ExchangeBindings []*ExchangeBinding `json:"ExchangeBindings" yaml:"ExchangeBindings"`
}

TopologyConfig allows you to build simple toplogies from a JSON file.

func ConvertJSONFileToTopologyConfig

func ConvertJSONFileToTopologyConfig(fileNamePath string) (*TopologyConfig, error)

ConvertJSONFileToTopologyConfig opens a file.json and converts to Topology.

Directories

Path Synopsis
internal
tests

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL