grabbit

v1.1.2
Published: Oct 14, 2023 License: MIT Imports: 10 Imported by: 0

README

grabbit

Golang wrapper for RabbitMQ managed connections.

Version 1.0.0 and beyond 🚀.
Note: the API breaks compatibility with the previous β release (see CallbackProcessMessages).

Rationale

This is an alternative library providing auto-reconnection support. It's been heavily inspired by other projects (listed in credits) and my previous experiments. The reason for a new project (instead of cloning/contributing to an existing one) is that internals may start to diverge too much from the original and risk non-adoption.

Usage

Please use the wiki page for a detailed list of how to get the most out of this library.

Goals

What I'd like this library to provide is:

  • make use of the latest amqp091-go library; this is the up to date version building on streadway's original work
  • be able to share a connection between multiple channels
  • have connection and channels auto-recover (on infrastructure failure) via managers
  • replace the internal logging with an alternative. Current thought is to have some buffered channel over which detailed events are submitted (non-blocking)
  • have the topology defined by consumers and publishers: once at creation and then again during channel recovery (ephemeral queues/exchanges only)
  • provide an optional callback to the caller space during recoveries. This supplements in a synchronous (blocking) mode the logging replacement mechanism.
  • have the library handle awaiting confirmation of the published events (perhaps allowing a user-defined function if needed). ♻ I may abandon this in favor of application space handling ♻
  • consumers to accept user defined handlers for processing the received messages
  • the consumer handlers to also allow batch processing (with support for partial fulfillment of QoS expectations based on a timeout)
  • Bonus: optionally provide the users with access to the low level amqp.Channel. Unsafe initially. Note: safety might come for free when using the slightly higher level grabbit.Channel wrappers.

Non goals

  • not interested in concurrency safety of the channels. Publishers and consumers are relatively cheap; use as many as needed instead of passing them across goroutines.

Credits

  • wagslane, whose work heavily inspired the sane parameters, topology maintenance and consumer handlers. Please browse and star his repository.
  • Emir Ribic for his inspiring post that led me to think about adding a resilience layer for the RabbitMQ client. You may want to read the full post.
  • gbeletti, from whose project I might pinch a few ideas. Whether or not I draw inspiration from it, his version made for interesting reading.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RandConsumerName added in v0.2.0

func RandConsumerName() string

RandConsumerName creates a random string for the consumers. It is used internally by DefaultConsumerOptions by setting the 'ConsumerName' property of ConsumerOptions

func WithChannelOptionContext

func WithChannelOptionContext(ctx context.Context) func(options *ChannelOptions)

WithChannelOptionContext creates a function that sets the context of a ChannelOptions struct.

It takes a context.Context as a parameter and returns a function that takes a pointer to a ChannelOptions struct. The returned function sets the ctx field of the ChannelOptions struct to the provided context.
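The WithChannelOption and WithConnectionOption families all share this shape, commonly known as the functional options pattern. The following is a minimal, self-contained sketch of that pattern only; `channelOptions`, `withName`, `withContext` and `newOptions` are hypothetical stand-ins and not grabbit's actual code (the real ChannelOptions fields are unexported).

```go
package main

import (
	"context"
	"fmt"
)

// channelOptions is a hypothetical stand-in for grabbit's ChannelOptions.
type channelOptions struct {
	name string
	ctx  context.Context
}

// withName mirrors the shape of WithChannelOptionName: it captures a value
// and returns a function that applies it to the options struct.
func withName(name string) func(*channelOptions) {
	return func(o *channelOptions) { o.name = name }
}

// withContext mirrors the shape of WithChannelOptionContext.
func withContext(ctx context.Context) func(*channelOptions) {
	return func(o *channelOptions) { o.ctx = ctx }
}

// newOptions starts from defaults and applies each option in order,
// so a later option overrides an earlier one for the same field.
func newOptions(optionFuncs ...func(*channelOptions)) *channelOptions {
	opts := &channelOptions{name: "default", ctx: context.Background()}
	for _, apply := range optionFuncs {
		apply(opts)
	}
	return opts
}

func main() {
	opts := newOptions(withContext(context.Background()), withName("orders"))
	fmt.Println(opts.name)
}
```

The appeal of this design is that a constructor can grow new settings without changing its signature, and callers only mention the settings they care about.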

func WithChannelOptionDelay

func WithChannelOptionDelay(delayer DelayProvider) func(options *ChannelOptions)

WithChannelOptionDelay returns a function that sets the "delayer" field of the ChannelOptions struct to the given DelayProvider.

Parameters: - delayer: The DelayProvider that will be set as the "delayer" field of ChannelOptions.

Return type: A function that takes a pointer to a ChannelOptions struct as its parameter.

func WithChannelOptionDown

func WithChannelOptionDown(down CallbackWhenDown) func(options *ChannelOptions)

WithChannelOptionDown returns a function that sets the callback function to be called when the channel is down.

down: the callback function to be called when the channel is down. The returned function modifies the ChannelOptions passed to it.

func WithChannelOptionName

func WithChannelOptionName(name string) func(options *ChannelOptions)

WithChannelOptionName creates a function that sets the name field of the ChannelOptions struct.

It takes a string parameter 'name' and returns a function that takes a pointer to the ChannelOptions struct as a parameter.

func WithChannelOptionNotification

func WithChannelOptionNotification(ch chan Event) func(options *ChannelOptions)

WithChannelOptionNotification provides an application defined Event receiver to handle various alerts about the channel status.

func WithChannelOptionNotifyPublish

func WithChannelOptionNotifyPublish(publishNotifier CallbackNotifyPublish) func(options *ChannelOptions)

WithChannelOptionNotifyPublish returns a function that sets the callback function for notifying the publish event in the ChannelOptions.

It takes a single parameter: - publishNotifier: the callback function for notifying the publish event.

It returns a function that takes a pointer to ChannelOptions as a parameter.

func WithChannelOptionNotifyReturn

func WithChannelOptionNotifyReturn(returnNotifier CallbackNotifyReturn) func(options *ChannelOptions)

WithChannelOptionNotifyReturn generates a function that sets the returnNotifier callback for a ChannelOptions struct.

It takes a returnNotifier parameter of type CallbackNotifyReturn which represents a function that will be called when a return value is received.

The generated function takes an options parameter of type *ChannelOptions and sets the cbNotifyReturn field to the provided returnNotifier.

func WithChannelOptionProcessor added in v0.2.0

func WithChannelOptionProcessor(proc CallbackProcessMessages) func(options *ChannelOptions)

WithChannelOptionProcessor is a function that returns a function which sets the callback process messages for the ChannelOptions struct.

The parameter `proc` is a CallbackProcessMessages function that will be assigned to the `cbProcessMessages` field of the `ChannelOptions` struct.

The return type of the returned function is `func(options *ChannelOptions)`.

func WithChannelOptionRecovering

func WithChannelOptionRecovering(recover CallbackWhenRecovering) func(options *ChannelOptions)

WithChannelOptionRecovering generates a function that sets the callback function to be called when recovering from an error in the ChannelOptions.

Parameters:

  • recover: a CallbackWhenRecovering function that will be called when recovering from an error in the ChannelOptions.

Returns:

  • A function that takes a pointer to ChannelOptions and sets the cbReconnect field to the provided recover function.

func WithChannelOptionTopology

func WithChannelOptionTopology(topology []*TopologyOptions) func(options *ChannelOptions)

WithChannelOptionTopology returns a function that sets the topology options for a channel.

The function takes a slice of TopologyOptions as a parameter, which specifies the desired topology for the channel. It returns a function that takes a pointer to a ChannelOptions struct as a parameter. The function sets the topology field of the ChannelOptions struct to the provided topology slice.

func WithChannelOptionUp

func WithChannelOptionUp(up CallbackWhenUp) func(options *ChannelOptions)

WithChannelOptionUp returns a function that sets the callback function to be executed when the channel is up.

up: the callback function to be executed when the channel is up. options: the ChannelOptions to be modified.

returns: a function that modifies the ChannelOptions by setting the callback function to be executed when the channel is up.

func WithChannelOptionUsageParams

func WithChannelOptionUsageParams(params ChanUsageParameters) func(options *ChannelOptions)

WithChannelOptionUsageParams returns a function that sets the implementation parameters of the ChannelOptions struct.

It takes a parameter of type ChanUsageParameters and returns a function that takes a pointer to a ChannelOptions struct.

func WithConnectionOptionContext

func WithConnectionOptionContext(ctx context.Context) func(options *ConnectionOptions)

WithConnectionOptionContext stores the application provided context. Cancelling this context will terminate the recovery loop and also close down the connection (and indirectly its channel dependents).

func WithConnectionOptionDelay

func WithConnectionOptionDelay(delayer DelayProvider) func(options *ConnectionOptions)

WithConnectionOptionDelay provides an application space defined delay (between re-connection attempts) policy. An example of DelayProvider could be an exponential timeout routine based on the retry parameter.

func WithConnectionOptionDown

func WithConnectionOptionDown(down CallbackWhenDown) func(options *ConnectionOptions)

WithConnectionOptionDown stores the application space callback for connection down events.

func WithConnectionOptionName

func WithConnectionOptionName(name string) func(options *ConnectionOptions)

WithConnectionOptionName assigns a tag to this connection.

func WithConnectionOptionNotification

func WithConnectionOptionNotification(ch chan Event) func(options *ConnectionOptions)

WithConnectionOptionNotification provides an application defined Event receiver to handle various alerts about the connection status.

func WithConnectionOptionPassword

func WithConnectionOptionPassword(credentials SecretProvider) func(options *ConnectionOptions)

WithConnectionOptionPassword provides password refresh capabilities for dynamically protected services (future IAM)

func WithConnectionOptionRecovering

func WithConnectionOptionRecovering(recover CallbackWhenRecovering) func(options *ConnectionOptions)

WithConnectionOptionRecovering stores the application space callback for connection recovering events.

func WithConnectionOptionUp

func WithConnectionOptionUp(up CallbackWhenUp) func(options *ConnectionOptions)

WithConnectionOptionUp stores the application space callback for connection established events.

Types

type CallbackNotifyPublish

type CallbackNotifyPublish func(confirm amqp.Confirmation, ch *Channel)

CallbackNotifyPublish defines a function type for handling the publish notifications. Applications can define their own handler and pass it via WithChannelOptionNotifyPublish.

type CallbackNotifyReturn

type CallbackNotifyReturn func(confirm amqp.Return, ch *Channel)

CallbackNotifyReturn defines a function type for handling the return notifications. Applications can define their own handler and pass it via WithChannelOptionNotifyReturn.

type CallbackProcessMessages added in v0.2.0

type CallbackProcessMessages func(props *DeliveriesProperties, messages []DeliveryData, mustAck bool, ch *Channel)

CallbackProcessMessages defines a user passed function for processing the received messages. Applications can define their own handler and pass it via WithChannelOptionProcessor.

type CallbackWhenDown

type CallbackWhenDown func(name string, err OptionalError) bool

CallbackWhenDown defines a function type called when the connection is lost. Return false to abort this connection. Pass your implementations via WithChannelOptionDown and WithConnectionOptionDown.

type CallbackWhenRecovering

type CallbackWhenRecovering func(name string, retry int) bool

CallbackWhenRecovering defines a function type called prior to recovering a connection. Return false to abort this connection. Applications can define their own handler and pass it via WithChannelOptionRecovering and WithConnectionOptionRecovering.
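As an illustration, a recovering callback could cap the number of attempts. This is a sketch under assumptions: `callbackWhenRecovering` is a local copy of the documented signature, `limitRetries` is a hypothetical helper, and the documentation above does not say whether the retry counter starts at 0 or 1.

```go
package main

import "fmt"

// callbackWhenRecovering is a local copy of the documented
// CallbackWhenRecovering signature, so the sketch compiles on its own.
type callbackWhenRecovering func(name string, retry int) bool

// limitRetries is a hypothetical helper: the returned callback allows
// recovery while retry is at most max and returns false (abort) afterwards.
func limitRetries(max int) callbackWhenRecovering {
	return func(name string, retry int) bool {
		return retry <= max
	}
}

func main() {
	cb := limitRetries(3)
	for retry := 1; retry <= 5; retry++ {
		fmt.Printf("retry %d: continue=%v\n", retry, cb("conn.main", retry))
	}
}
```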

type CallbackWhenUp

type CallbackWhenUp func(name string)

CallbackWhenUp defines a function type used after a successful connection or channel recovery. Applications can define their own handler and pass it via WithConnectionOptionUp and WithChannelOptionUp.

type ChanUsageParameters

type ChanUsageParameters struct {
	PublisherUsageOptions
	ConsumerUsageOptions
}

ChanUsageParameters embeds PublisherUsageOptions and ConsumerUsageOptions. It is a private member of ChannelOptions and can be passed via WithChannelOptionUsageParams.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel wraps the base amqp channel by creating a managed channel.

func NewChannel

func NewChannel(conn *Connection, optionFuncs ...func(*ChannelOptions)) *Channel

NewChannel creates a new managed Channel with the given Connection and optional ChannelOptions. Direct access should rarely be needed; using a Consumer or Publisher instead is recommended.

The resulting channel inherits the events notifier, context and delayer from the master connection but all can be overridden by passing options. Use the 'WithChannelOption<OptionName>' for optionFuncs.

Example Usage:

  ch := NewChannel(conn,
    WithChannelOptionName("myChannel"),
    WithChannelOptionDown(Down),
    WithChannelOptionUp(Up),
    WithChannelOptionRecovering(Reattempting),
    WithChannelOptionNotification(dataStatusChan),
    WithChannelOptionContext(ctx),
  )

Parameters:

  • conn: The Connection to associate the Channel with.
  • optionFuncs: An optional list of functions to modify the ChannelOptions.

Returns: A new Channel object.

func (*Channel) Ack added in v0.2.0

func (ch *Channel) Ack(tag uint64, multiple bool) error

Ack safely wraps the base channel Ack.

func (*Channel) Cancel added in v0.2.0

func (ch *Channel) Cancel(consumer string, noWait bool) error

Cancel wraps safely the base channel cancellation.

func (*Channel) Channel

func (ch *Channel) Channel() *SafeBaseChan

Channel returns the low level library channel for further direct access to its Super() low level channel. Use sparingly and prefer using the predefined Channel wrapping methods instead. Pair usage with the provided full [Lock][UnLock] or read [RLock][RUnlock] locking/unlocking mechanisms for safety!
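The lock pairing described here can be sketched as follows. This is only an illustration: `safeBase` and `baseChan` are hypothetical stand-ins, the method names follow the ones mentioned in the text above, and the mutex-based bodies are an assumption about how such a wrapper might work rather than grabbit's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// baseChan is a hypothetical stand-in for the low level channel.
type baseChan struct{ closed bool }

// safeBase is a hypothetical stand-in for SafeBaseChan: a low level value
// guarded by a read/write mutex.
type safeBase struct {
	mu    sync.RWMutex
	super *baseChan
}

func (s *safeBase) Super() *baseChan { return s.super }
func (s *safeBase) Lock()            { s.mu.Lock() }
func (s *safeBase) UnLock()          { s.mu.Unlock() }
func (s *safeBase) RLock()           { s.mu.RLock() }
func (s *safeBase) RUnlock()         { s.mu.RUnlock() }

// isClosed only reads state, so the shared read lock is sufficient.
func isClosed(s *safeBase) bool {
	s.RLock()
	defer s.RUnlock()
	return s.Super().closed
}

// markClosed mutates state, so it takes the full (exclusive) lock.
func markClosed(s *safeBase) {
	s.Lock()
	defer s.UnLock()
	s.Super().closed = true
}

func main() {
	s := &safeBase{super: &baseChan{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = isClosed(s) // concurrent readers do not block each other
		}()
	}
	markClosed(s)
	wg.Wait()
	fmt.Println(isClosed(s))
}
```

Pairing each lock with a deferred unlock keeps the critical section short and guarantees release even if the guarded call panics.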

func (*Channel) Close

func (ch *Channel) Close() error

Close wraps the base channel Close.

func (*Channel) ExchangeDeclare added in v0.2.0

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

ExchangeDeclare safely wraps the base channel ExchangeDeclare. Prefer using [ExchangeDeclareWithTopology] instead; that also supports bindings, see TopologyOptions

func (*Channel) ExchangeDeclareWithTopology added in v0.2.0

func (ch *Channel) ExchangeDeclareWithTopology(t *TopologyOptions) error

ExchangeDeclareWithTopology safely declares a desired exchange as described in the parameter; see TopologyOptions

func (*Channel) ExchangeDelete added in v1.0.0

func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error

ExchangeDelete safely wraps the base channel ExchangeDelete.

func (*Channel) GetNextPublishSeqNo added in v1.0.0

func (ch *Channel) GetNextPublishSeqNo() uint64

GetNextPublishSeqNo safely wraps the base channel GetNextPublishSeqNo

func (*Channel) IsClosed

func (ch *Channel) IsClosed() bool

IsClosed safely wraps the base channel IsClosed.

func (*Channel) IsPaused

func (ch *Channel) IsPaused() bool

IsPaused returns a publisher's flow status of the base channel.

func (*Channel) Nack added in v0.2.0

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error

Nack safely wraps the base channel Nack.

func (*Channel) Name added in v1.1.0

func (ch *Channel) Name() string

Name returns the tag defined originally when creating this channel

func (*Channel) PublishWithContext added in v0.2.1

func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

PublishWithContext safely wraps the base channel PublishWithContext.

func (*Channel) PublishWithDeferredConfirmWithContext added in v1.0.0

func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

PublishWithDeferredConfirmWithContext safely wraps the base channel PublishWithDeferredConfirmWithContext.

func (*Channel) Queue

func (ch *Channel) Queue() string

Queue returns the active (as indicated by [IsDestination] option in topology options) queue name. Useful for finding the server assigned name.

func (*Channel) QueueDeclare added in v0.2.0

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

QueueDeclare safely wraps the base channel QueueDeclare. Prefer using the [QueueDeclareWithTopology] instead; that also supports bindings, see TopologyOptions

func (*Channel) QueueDeclarePassive added in v1.0.0

func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

QueueDeclarePassive safely wraps the base channel QueueDeclarePassive.

func (*Channel) QueueDeclareWithTopology added in v0.2.0

func (ch *Channel) QueueDeclareWithTopology(t *TopologyOptions) (amqp.Queue, error)

QueueDeclareWithTopology safely declares a desired queue as described in the parameter; see TopologyOptions

func (*Channel) QueueDelete added in v0.2.0

func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

QueueDelete safely wraps the base channel QueueDelete.

func (*Channel) QueueInspect deprecated added in v0.2.0

func (ch *Channel) QueueInspect(name string) (amqp.Queue, error)

QueueInspect safely wraps the base channel QueueInspect.

Deprecated: use QueueDeclarePassive

func (*Channel) QueuePurge added in v0.2.0

func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)

QueuePurge safely wraps the base channel QueuePurge.

func (*Channel) Reject added in v1.0.0

func (ch *Channel) Reject(tag uint64, requeue bool) error

Reject safely wraps the base channel Reject.

type ChannelOptions

type ChannelOptions struct {
	// contains filtered or unexported fields
}

ChannelOptions represents the options for configuring a channel.

type ClientType

type ClientType int

ClientType defines the class of objects that interact with the amqp functionality. Used mostly for sending alerts about specific functionality areas.

const (
	CliConnection ClientType = iota
	CliChannel
)

func (ClientType) String

func (i ClientType) String() string

type ConfirmationOutcome added in v1.1.0

type ConfirmationOutcome int
const (
	ConfirmationTimeOut  ConfirmationOutcome = iota // no timely response
	ConfirmationClosed                              // data confirmation channel is closed
	ConfirmationDisabled                            // base channel has not been put into confirm mode
	ConfirmationPrevious                            // lower sequence number than expected
	ConfirmationACK                                 // ACK (publish confirmed)
	ConfirmationNAK                                 // NAK (publish negative acknowledgement)
)

func (ConfirmationOutcome) String added in v1.1.0

func (i ConfirmationOutcome) String() string

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection wraps a SafeBaseConn with additional attributes (impl. details: rabbit URL, ConnectionOptions and a cancelling context). Applications should obtain a connection using NewConnection.

func NewConnection

func NewConnection(address string, config amqp.Config, optionFuncs ...func(*ConnectionOptions)) *Connection

NewConnection creates a new managed Connection object with the given address, configuration, and option functions.

Example Usage:

  conn := NewConnection("amqp://guest:guest@localhost:5672/", amqp.Config{},
    WithConnectionOptionContext(context.Background()),
    WithConnectionOptionName("default"),
    WithConnectionOptionDown(Down),
    WithConnectionOptionUp(Up),
    WithConnectionOptionRecovering(Reattempting),
    WithConnectionOptionNotification(connStatusChan),
  )

Parameters:

  • address: the address of the connection.
  • config: the AMQP configuration.
  • optionFuncs: variadic option functions to customize the connection options.

Returns: a new Connection object.

func (*Connection) Channel

func (conn *Connection) Channel() (*amqp.Channel, error)

Channel safely wraps the amqp connection Channel() function.

func (*Connection) Close

func (conn *Connection) Close() error

Close safely wraps the amqp connection Close and terminates the maintenance loop.

func (*Connection) Connection

func (conn *Connection) Connection() *SafeBaseConn

Connection returns the safe base connection and thus indirectly the low level library connection.

func (*Connection) IsBlocked

func (conn *Connection) IsBlocked() bool

IsBlocked returns the TCP flow status of the base connection.

func (*Connection) IsClosed

func (conn *Connection) IsClosed() bool

IsClosed safely wraps the amqp connection IsClosed

type ConnectionOptions

type ConnectionOptions struct {
	// contains filtered or unexported fields
}

ConnectionOptions defines a collection of attributes used internally by the Connection.

Attributes can be set via optionFuncs parameters of NewConnection via WithConnectionOption<Fct> family, ex: WithConnectionOptionDown, WithConnectionOptionContext, WithConnectionOptionNotification.

type Consumer added in v0.2.0

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements an object allowing calling applications to receive messages on already established connections. Create a consumer instance by calling NewConsumer.

func NewConsumer added in v0.2.0

func NewConsumer(conn *Connection, opt ConsumerOptions, optionFuncs ...func(*ChannelOptions)) *Consumer

NewConsumer creates a consumer with the desired options and then starts consuming. It creates and opens a new dedicated Channel using the passed shared connection. NOTE: It's advisable to use separate connections for Channel.Publish and Channel.Consume

func (*Consumer) Available added in v0.2.0

func (c *Consumer) Available() (bool, bool)

Available returns the status of both the underlying connection and channel.

func (*Consumer) AwaitAvailable added in v0.2.0

func (c *Consumer) AwaitAvailable(timeout, pollFreq time.Duration) bool

AwaitAvailable waits until the consumer infrastructure is ready or the timeout expires. Useful when the connections and channels are being created or are recovering. When a zero value is passed for a parameter, the defaults are 7500 ms for the timeout and 330 ms for the polling frequency.

func (*Consumer) Cancel added in v0.2.0

func (p *Consumer) Cancel() error

Cancel wraps safely the base consumer channel cancellation.

func (*Consumer) Channel added in v0.2.0

func (p *Consumer) Channel() *Channel

Channel returns the managed Channel which can be further used to extract SafeBaseChan

func (*Consumer) Close added in v0.2.0

func (c *Consumer) Close() error

Close cleanly shuts down the consumer channel.

type ConsumerOptions added in v0.2.0

type ConsumerOptions struct {
	ConsumerUsageOptions
}

func DefaultConsumerOptions added in v0.2.0

func DefaultConsumerOptions() ConsumerOptions

DefaultConsumerOptions creates some sane defaults for consuming messages.

func (*ConsumerOptions) WithArgs added in v0.2.0

func (opt *ConsumerOptions) WithArgs(args amqp.Table) *ConsumerOptions

WithArgs sets the arguments for the consumer options.

args: The arguments to be set. Returns: The updated consumer options.

func (*ConsumerOptions) WithAutoAck added in v0.2.0

func (opt *ConsumerOptions) WithAutoAck(autoAck bool) *ConsumerOptions

WithAutoAck sets the ConsumerAutoAck field of the ConsumerOptions struct to the provided boolean value.

autoAck: A boolean value indicating whether the consumer should automatically acknowledge messages.

Returns: A pointer to the updated ConsumerOptions struct.

func (*ConsumerOptions) WithExclusive added in v0.2.0

func (opt *ConsumerOptions) WithExclusive(exclusive bool) *ConsumerOptions

WithExclusive sets the exclusive flag for the ConsumerOptions.

exclusive: a boolean indicating whether the ConsumerOptions should be exclusive. Returns a pointer to the updated ConsumerOptions.

func (*ConsumerOptions) WithName added in v0.2.0

func (opt *ConsumerOptions) WithName(name string) *ConsumerOptions

WithName sets the name of the ConsumerOptions.

name: the name to set for the ConsumerOptions. return: the updated ConsumerOptions.

func (*ConsumerOptions) WithNoLocal added in v0.2.0

func (opt *ConsumerOptions) WithNoLocal(noLocal bool) *ConsumerOptions

WithNoLocal sets the ConsumerNoLocal field of the ConsumerOptions struct.

It takes a boolean parameter named noLocal. It returns a pointer to the ConsumerOptions struct.

func (*ConsumerOptions) WithNoWait added in v0.2.0

func (opt *ConsumerOptions) WithNoWait(noWait bool) *ConsumerOptions

WithNoWait sets the ConsumerNoWait field of ConsumerOptions struct and returns the modified ConsumerOptions object.

Parameters: - noWait: a boolean value indicating whether the consumer should wait or not.

Return type: - *ConsumerOptions: the modified ConsumerOptions object.

func (*ConsumerOptions) WithPrefetchCount added in v0.2.0

func (opt *ConsumerOptions) WithPrefetchCount(count int) *ConsumerOptions

WithPrefetchCount sets the prefetch count for the ConsumerOptions.

count: the number of messages to prefetch. returns: a pointer to the updated ConsumerOptions.

func (*ConsumerOptions) WithPrefetchSize added in v0.2.0

func (opt *ConsumerOptions) WithPrefetchSize(size int) *ConsumerOptions

WithPrefetchSize sets the prefetch size for the ConsumerOptions struct.

It takes an integer `size` as a parameter and sets the PrefetchSize field of the ConsumerOptions struct to that value. It returns a pointer to the modified ConsumerOptions struct.

func (*ConsumerOptions) WithPrefetchTimeout added in v0.2.0

func (opt *ConsumerOptions) WithPrefetchTimeout(timeout time.Duration) *ConsumerOptions

WithPrefetchTimeout sets the prefetch timeout for the ConsumerOptions struct.

timeout - The duration of the prefetch timeout. Returns the updated ConsumerOptions struct.

func (*ConsumerOptions) WithQosGlobal added in v0.2.0

func (opt *ConsumerOptions) WithQosGlobal(global bool) *ConsumerOptions

WithQosGlobal sets the global QoS option for the ConsumerOptions struct.

It takes a boolean value, `global`, to determine whether the QoS option should be set globally. The function returns a pointer to the updated ConsumerOptions struct.

func (*ConsumerOptions) WithQueue added in v0.2.0

func (opt *ConsumerOptions) WithQueue(queue string) *ConsumerOptions

WithQueue sets the consumer queue for the ConsumerOptions struct.

queue: the name of the queue. returns: the updated ConsumerOptions struct.

type ConsumerUsageOptions added in v0.2.0

type ConsumerUsageOptions struct {
	IsConsumer        bool          // indicates if this chan is used for consuming
	ConsumerName      string        // channel wide consumers unique identifier
	PrefetchTimeout   time.Duration // how long to wait for PrefetchCount messages to arrive
	PrefetchCount     int           // Qos count
	PrefetchSize      int           // Qos payload size
	QosGlobal         bool          // all future channels
	ConsumerQueue     string        // queue name from which to receive. Overridden by engine assigned name.
	ConsumerAutoAck   bool          // see [amqp.Consume]
	ConsumerExclusive bool          // see [amqp.Consume]
	ConsumerNoLocal   bool          // see [amqp.Consume]
	ConsumerNoWait    bool          // see [amqp.Consume]
	ConsumerArgs      amqp.Table    // core properties
}

ConsumerUsageOptions defines parameters for driving the consumers behavior and indicating to the supporting channel to start consuming.

type DefaultDelayer

type DefaultDelayer struct {
	Value time.Duration
}

DefaultDelayer allows defining a basic (constant) delay policy. The default used by new connections and channels is 7.5 seconds.

func (DefaultDelayer) Delay

func (delayer DefaultDelayer) Delay(retry int) time.Duration

Delay implements the DelayProvider interface for the DefaultDelayer.

type DeferredConfirmation added in v1.1.0

type DeferredConfirmation struct {
	*amqp.DeferredConfirmation                     // wrapped low level confirmation
	Outcome                    ConfirmationOutcome // acknowledgment received stats
	RequestSequence            uint64              // sequence of the original request (GetNextPublishSeqNo)
	ChannelName                string              // channel name of the publisher
	Queue                      string              // queue name of the publisher
}

DeferredConfirmation wraps amqp.DeferredConfirmation with additional data. It inherits (by embedding) all original fields and functionality from the amqp object.

type DelayProvider

type DelayProvider interface {
	Delay(retry int) time.Duration
}

DelayProvider allows passing a bespoke method for providing the delay policy for waiting between reconnection attempts. See WithConnectionOptionDelay, WithChannelOptionDelay. TIP: one could pass an exponential delayer derived from the 'retry' counter.

type DeliveriesProperties added in v0.2.0

type DeliveriesProperties struct {
	Headers amqp.Table // Application or header exchange table
	// Properties; assume all are common
	ContentType     string // MIME content type
	ContentEncoding string // MIME content encoding
	DeliveryMode    uint8  // queue implementation use - non-persistent (1) or persistent (2)
	Priority        uint8  // queue implementation use - 0 to 9
	ConsumerTag     string // client tag as provided during consumer registration
	Exchange        string // basic.publish exchange
	RoutingKey      string // basic.publish routing key
}

DeliveriesProperties captures the common attributes of multiple commonly grouped (i.e. received over same channel in one go) deliveries. It is an incomplete amqp.Delivery

func DeliveryPropsFrom added in v1.0.0

func DeliveryPropsFrom(d *amqp.Delivery) (prop DeliveriesProperties)

DeliveryPropsFrom generates a DeliveriesProperties struct from an amqp.Delivery.

Takes a pointer to an amqp.Delivery as the parameter and returns a DeliveriesProperties struct.

type DeliveryData added in v1.0.0

type DeliveryData struct {
	Body        DeliveryPayload // actual data payload
	DeliveryTag uint64          // sequential number of this message
	Redelivered bool            // message has been re-enqueued
	Expiration  string          // message expiration spec
	MessageId   string          // message identifier
	Timestamp   time.Time       // message timestamp
	Type        string          // message type name
	UserId      string          // user of the publishing connection
	AppId       string          // application id
}

DeliveryData isolates the data part of each specific delivered message

func DeliveryDataFrom added in v1.0.0

func DeliveryDataFrom(d *amqp.Delivery) (data DeliveryData)

DeliveryDataFrom creates a DeliveryData object from an amqp.Delivery object.

It takes a pointer to an amqp.Delivery object as its parameter and returns a DeliveryData object.

type DeliveryPayload added in v0.2.0

type DeliveryPayload []byte

DeliveryPayload subtypes the actual content of deliveries

type Event

type Event struct {
	SourceType ClientType    // origin type
	SourceName string        // origin tag
	TargetName string        // affected tag
	Kind       EventType     // type of event
	Err        OptionalError // low level error
}

Event defines a simple body structure for the alerts received via the notification channels passed in WithChannelOptionNotification and WithConnectionOptionNotification.
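The goals above mention submitting events over a buffered channel without blocking. A minimal sketch of that idea follows; `event` is a trimmed, hypothetical stand-in for Event, `trySend` is a hypothetical helper, and dropping an event when the buffer is full is an assumption about one possible policy rather than documented grabbit behavior.

```go
package main

import "fmt"

// event is a trimmed, hypothetical stand-in for grabbit's Event.
type event struct {
	SourceName string
	Kind       string
}

// trySend submits ev without blocking. It reports false when the buffer is
// full and the event had to be dropped.
func trySend(ch chan<- event, ev event) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	events := make(chan event, 2) // the application sizes the buffer
	for i := 0; i < 3; i++ {
		ok := trySend(events, event{SourceName: "conn.main", Kind: fmt.Sprintf("event-%d", i)})
		fmt.Println("sent:", ok)
	}
	close(events)
	for ev := range events {
		fmt.Println(ev.SourceName, ev.Kind)
	}
}
```

In practice the receiving side would typically drain the channel in its own goroutine, so a generously sized buffer keeps drops rare.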

type EventType

type EventType int

EventType defines the class of alerts sent to the application layer.

const (
	EventUp EventType = iota
	EventDown
	EventCannotEstablish
	EventBlocked
	EventUnBlocked
	EventClosed
	EventMessageReceived
	EventMessagePublished
	EventMessageReturned
	EventConfirm
	EventQos
	EventConsume
	EventDefineTopology
	EventDataExhausted
	EventDataPartial
)

func (EventType) String

func (i EventType) String() string

type OptionalError added in v0.2.0

type OptionalError struct {
	// contains filtered or unexported fields
}

func SomeErrFromError added in v0.2.0

func SomeErrFromError(err error, isSet bool) OptionalError

SomeErrFromError creates an OptionalError struct with the given error and isSet values.

Parameters:

  • err: the error to be assigned to the OptionalError struct.
  • isSet: a boolean value indicating whether the error is set or not.

Returns the OptionalError struct with the assigned error and isSet values.

func SomeErrFromString added in v0.2.0

func SomeErrFromString(text string) OptionalError

SomeErrFromString creates an OptionalError from the specified text.

Parameters: - text: the string to create the error from.

Return type: - OptionalError: the created OptionalError.

func (OptionalError) Error added in v0.2.0

func (e OptionalError) Error() string

Error returns the error string representation of the OptionalError.

It calls the Or method on the OptionalError to get the error value and returns the result of that value's Error method.

func (OptionalError) IsSet added in v0.2.0

func (e OptionalError) IsSet() bool

IsSet returns a boolean value indicating whether the OptionalError is set.

This function does not take any parameters. It returns a boolean value.

func (OptionalError) Or added in v0.2.0

func (e OptionalError) Or(err error) error

Or returns the optional error if it is set, otherwise it returns the provided error.

err is the fallback error, returned only when the optional error is not set.

type PersistentNotifiers

type PersistentNotifiers struct {
	Published chan amqp.Confirmation // publishing confirmation
	Returned  chan amqp.Return       // returned messages
	Flow      chan bool              // flow control
	Closed    chan *amqp.Error       // channel closed
	Cancel    chan string            // channel cancelled
	Consumer  <-chan amqp.Delivery   // message intake
}

PersistentNotifiers groups the notification channels that share the lifespan of the amqp channel. They only need refreshing when the channel is recovered.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher implements an object allowing calling applications to publish messages on already established connections. Create a publisher instance by calling NewPublisher.

func NewPublisher

func NewPublisher(conn *Connection, opt PublisherOptions, optionFuncs ...func(*ChannelOptions)) *Publisher

NewPublisher creates a publisher with the desired options. It creates and opens a new dedicated Channel using the passed shared connection.

func (*Publisher) Available

func (p *Publisher) Available() (bool, bool)

Available returns the status of both the underlying connection and channel.

func (*Publisher) AwaitAvailable

func (p *Publisher) AwaitAvailable(timeout, pollFreq time.Duration) bool

AwaitAvailable waits until the publisher infrastructure is ready or the timeout expires. It is useful while connections and channels are still being created or are recovering. Passing a zero value for either parameter selects its default: 7500 ms for the timeout and 330 ms for the polling frequency.

func (*Publisher) AwaitDeferredConfirmation added in v1.1.1

func (p *Publisher) AwaitDeferredConfirmation(d *DeferredConfirmation, tmr time.Duration) *DeferredConfirmation

AwaitDeferredConfirmation waits for the confirmation of a deferred action and updates its outcome.

It takes in a deferred confirmation object and a time duration for the timeout. It returns the updated deferred confirmation object.

func (*Publisher) Channel added in v0.2.0

func (p *Publisher) Channel() *Channel

Channel returns the managed Channel, from which the SafeBaseChan can be further extracted.

func (*Publisher) Close

func (p *Publisher) Close() error

Close cleanly shuts down the publisher channel.

func (*Publisher) Publish

func (p *Publisher) Publish(msg amqp.Publishing) error

Publish wraps amqp.Channel.PublishWithContext using the internal PublisherOptions cached when the publisher was created.

func (*Publisher) PublishDeferredConfirm added in v1.1.0

func (p *Publisher) PublishDeferredConfirm(msg amqp.Publishing) (*DeferredConfirmation, error)

PublishDeferredConfirm wraps amqp.Channel.PublishWithDeferredConfirmWithContext using the internal PublisherOptions cached when the publisher was created.

func (*Publisher) PublishDeferredConfirmWithOptions added in v1.1.0

func (p *Publisher) PublishDeferredConfirmWithOptions(opt PublisherOptions, msg amqp.Publishing) (*DeferredConfirmation, error)

PublishDeferredConfirmWithOptions wraps amqp.Channel.PublishWithDeferredConfirmWithContext using the passed options.

func (*Publisher) PublishWithOptions

func (p *Publisher) PublishWithOptions(opt PublisherOptions, msg amqp.Publishing) error

PublishWithOptions wraps amqp.Channel.PublishWithContext using the passed options.

type PublisherOptions

type PublisherOptions struct {
	PublisherUsageOptions
	Context   context.Context // controlling environment
	Exchange  string          // routing exchange
	Key       string          // routing key (usually queue name)
	Mandatory bool            // delivery is mandatory
	Immediate bool            // delivery is immediate
}

PublisherOptions defines publisher specific parameters. They are mostly used as defaults when sending messages and for configuring the inner channel functionality.

func DefaultPublisherOptions

func DefaultPublisherOptions() PublisherOptions

DefaultPublisherOptions creates some sane defaults for publishing messages. Note: The Message/payload itself must still be an amqp.Publishing object, fully under application's control.

func (*PublisherOptions) WithConfirmationNoWait

func (opt *PublisherOptions) WithConfirmationNoWait(confNoWait bool) *PublisherOptions

WithConfirmationNoWait sets the ConfirmationNoWait field of the PublisherOptions struct.

It takes a boolean parameter `confNoWait` and updates the `ConfirmationNoWait` field of the `PublisherOptions` struct to the value of `confNoWait`. It returns a pointer to the `PublisherOptions` struct.

func (*PublisherOptions) WithConfirmationsCount

func (opt *PublisherOptions) WithConfirmationsCount(count int) *PublisherOptions

WithConfirmationsCount sets the ConfirmationCount field, described in PublisherUsageOptions as the size of publishing confirmations over the amqp channel.

count: the value to store in ConfirmationCount. Returns a pointer to the updated PublisherOptions.

func (*PublisherOptions) WithContext

func (opt *PublisherOptions) WithContext(ctx context.Context) *PublisherOptions

WithContext sets the context for the PublisherOptions.

ctx: the context to be set. Returns a pointer to the updated PublisherOptions.

func (*PublisherOptions) WithExchange

func (opt *PublisherOptions) WithExchange(exchange string) *PublisherOptions

WithExchange sets the exchange for the PublisherOptions struct.

Parameters: - exchange: The exchange to set.

Returns: - *PublisherOptions: The updated PublisherOptions struct.

func (*PublisherOptions) WithImmediate

func (opt *PublisherOptions) WithImmediate(immediate bool) *PublisherOptions

WithImmediate sets the immediate flag of the PublisherOptions struct.

It takes a boolean parameter `immediate` and returns a pointer to the updated PublisherOptions.

func (*PublisherOptions) WithKey

func (opt *PublisherOptions) WithKey(key string) *PublisherOptions

WithKey sets the key for the PublisherOptions.

key: the routing key to set. Returns a pointer to the updated PublisherOptions.

func (*PublisherOptions) WithMandatory

func (opt *PublisherOptions) WithMandatory(mandatory bool) *PublisherOptions

WithMandatory sets the mandatory flag in the PublisherOptions struct.

Parameters: - mandatory: a boolean indicating whether delivery is mandatory.

Returns: - *PublisherOptions: a pointer to the PublisherOptions struct.
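Because each With* method has a pointer receiver and returns that same pointer, the setters can be chained. The standalone model below illustrates the mechanics; publisherOptions is a hypothetical, trimmed-down stand-in carrying three of the documented fields, not the library type itself.

```go
package main

import "fmt"

// publisherOptions is a hypothetical, trimmed-down stand-in for
// PublisherOptions, kept to three of the documented fields.
type publisherOptions struct {
	Exchange  string
	Key       string
	Mandatory bool
}

// Each setter mutates the receiver and returns it to allow chaining.
func (o *publisherOptions) WithExchange(exchange string) *publisherOptions {
	o.Exchange = exchange
	return o
}

func (o *publisherOptions) WithKey(key string) *publisherOptions {
	o.Key = key
	return o
}

func (o *publisherOptions) WithMandatory(mandatory bool) *publisherOptions {
	o.Mandatory = mandatory
	return o
}

func main() {
	// The options live in an addressable variable, so the pointer
	// receiver methods can be called on it directly.
	opt := publisherOptions{}
	opt.WithExchange("events").WithKey("orders.created").WithMandatory(true)

	fmt.Printf("%+v\n", opt) // {Exchange:events Key:orders.created Mandatory:true}
}
```

One consequence of the signatures above: DefaultPublisherOptions returns a value rather than a pointer, so its result has to be stored in a variable before the With* methods are chained on it.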

type PublisherUsageOptions added in v0.2.0

type PublisherUsageOptions struct {
	ConfirmationCount  int  // size of publishing confirmations over the amqp channel
	ConfirmationNoWait bool // publisher confirmation mode parameter
	IsPublisher        bool // indicates if this chan is used for publishing

}

PublisherUsageOptions defines parameters for driving the publisher's behavior and for indicating to the supporting channel that publishing operations are enabled.

type SafeBaseChan

type SafeBaseChan struct {
	// contains filtered or unexported fields
}

SafeBaseChan wraps the low level amqp.Channel in a concurrency safe way.

func (*SafeBaseChan) IsSet

func (c *SafeBaseChan) IsSet() bool

IsSet checks whether the SafeBaseChan's super field is set.

It does this by acquiring a read lock on the SafeBaseChan's mutex and then deferring its release.

Returns true if the super field is not nil, false otherwise.

func (*SafeBaseChan) Lock added in v0.2.0

func (c *SafeBaseChan) Lock()

Lock acquires the lock on the low level channel [Super] for amqp operations. Hold it sparingly and briefly, as this locks out the channel recovery!

func (*SafeBaseChan) RLock added in v1.0.0

func (c *SafeBaseChan) RLock()

RLock acquires the read lock on the low level channel [Super] for amqp operations. Hold it sparingly and briefly, as this locks out the channel recovery!

func (*SafeBaseChan) RUnlock added in v1.0.0

func (c *SafeBaseChan) RUnlock()

RUnlock releases the low level channel [Super] read lock.

func (*SafeBaseChan) Super

func (c *SafeBaseChan) Super() *amqp.Channel

Super returns the low level amqp channel for direct interactions. Use sparingly and prefer using the predefined Channel wrapping methods instead. Pair usage with the locking/unlocking routines for safety!

func (*SafeBaseChan) UnLock added in v0.2.0

func (c *SafeBaseChan) UnLock()

UnLock releases the low level channel [Super] lock.
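The advice to pair Super with the locking routines can be illustrated with a standalone model. Everything below is a sketch under assumptions: safeBase, rawChannel, swap and withChannel are hypothetical names, and the use of sync.RWMutex is inferred from the Lock/RLock method set rather than confirmed by the source.

```go
package main

import (
	"fmt"
	"sync"
)

// rawChannel is a hypothetical placeholder for *amqp.Channel.
type rawChannel struct{ id int }

// safeBase is a hypothetical stand-in for SafeBaseChan.
type safeBase struct {
	mu    sync.RWMutex
	super *rawChannel
}

func (c *safeBase) RLock()   { c.mu.RLock() }
func (c *safeBase) RUnlock() { c.mu.RUnlock() }

// Super returns the guarded handle; callers are expected to hold a lock.
func (c *safeBase) Super() *rawChannel { return c.super }

// IsSet takes the read lock itself, as the IsSet description states.
func (c *safeBase) IsSet() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.super != nil
}

// swap simulates a recovery replacing the handle under the write lock.
func (c *safeBase) swap(id int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.super = &rawChannel{id: id}
}

// withChannel runs fn against the handle while holding the read lock,
// and reports false when no handle is set.
func withChannel(c *safeBase, fn func(*rawChannel)) bool {
	c.RLock()
	defer c.RUnlock()
	if c.Super() == nil {
		return false
	}
	fn(c.Super())
	return true
}

func main() {
	c := &safeBase{}
	fmt.Println(c.IsSet()) // false

	c.swap(1)
	ok := withChannel(c, func(ch *rawChannel) { fmt.Println("using channel", ch.id) })
	fmt.Println(ok) // true
}
```

Keeping the critical section as short as fn allows matters here, since a recovery that needs the write lock waits for every reader to release.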

type SafeBaseConn

type SafeBaseConn struct {
	// contains filtered or unexported fields
}

SafeBaseConn wraps the low level amqp.Connection in a concurrency safe way.

func (*SafeBaseConn) IsSet

func (c *SafeBaseConn) IsSet() bool

IsSet tests if the low level amqp connection is set.

func (*SafeBaseConn) Lock added in v0.2.0

func (c *SafeBaseConn) Lock()

Lock acquires the lock on the low level connection [Super] for amqp operations. Hold it sparingly and briefly, as this locks out the connection recovery!

func (*SafeBaseConn) Super

func (c *SafeBaseConn) Super() *amqp.Connection

Super returns the low level amqp connection for direct interactions. Use sparingly and prefer using the predefined Connection wrapping methods instead. Pair usage with the locking/unlocking routines for safety!

func (*SafeBaseConn) UnLock added in v0.2.0

func (c *SafeBaseConn) UnLock()

UnLock releases the low level connection [Super] lock.

type SafeBool

type SafeBool struct {
	// contains filtered or unexported fields
}

SafeBool wraps a boolean in a concurrency safe way so it can be set, reset and tested from different goroutines.

type SecretProvider

type SecretProvider interface {
	Password() (string, error)
}

SecretProvider allows passing a bespoke method for providing the secret required when connecting to the Rabbit engine. See WithConnectionOptionPassword.

type TopologyBind

type TopologyBind struct {
	Enabled bool       // want this re-routing
	Peer    string     // other end of routing
	Key     string     // routing key / filter
	NoWait  bool       // re-routing confirmation required
	Args    amqp.Table // core properties
}

TopologyBind defines the possible binding relation between exchanges or queues and exchanges.

type TopologyOptions

type TopologyOptions struct {
	Name          string       // tag of exchange or queue
	IsDestination bool         // end target, i.e. if messages should be routed to it
	IsExchange    bool         // indicates if this is an exchange or a queue
	Bind          TopologyBind // complex routing
	Kind          string       // empty string for default exchange or: direct, topic, fanout, headers.
	Durable       bool         // maps the durable amqp attribute
	AutoDelete    bool         // maps the auto-delete amqp attribute
	Exclusive     bool         // if queue is exclusive
	Internal      bool         // maps the internal amqp attribute (exchanges only)
	NoWait        bool         // maps the noWait amqp attribute
	Passive       bool         // if false, it will be created on the server when missing
	Args          amqp.Table   // wraps the amqp Table parameters
	Declare       bool         // gets created on start and also during recovery if Durable is false
}

TopologyOptions defines the infrastructure topology, i.e. the exchange and queue definitions to be handled automatically, either as a one time creation or again on recovery.

func (*TopologyOptions) GetRouting added in v0.2.0

func (t *TopologyOptions) GetRouting() (source, destination string)

GetRouting returns the source and destination strings for the TopologyOptions struct.

The source and destination strings are determined based on whether IsDestination is true or false.

  • if IsDestination is true, the source string is set to t.Bind.Peer and the destination string is set to t.Name.
  • if IsDestination is false, the source string is set to t.Name and the destination string is set to t.Bind.Peer.

Returns the source and destination strings.
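The two rules above amount to a single branch on IsDestination. The standalone model below mirrors them with hypothetical lowercase types that keep only the three fields involved; it is an illustration of the documented behaviour rather than the library code.

```go
package main

import "fmt"

// topologyBind and topologyOptions are hypothetical, trimmed-down
// stand-ins holding only the fields that take part in routing.
type topologyBind struct {
	Peer string
}

type topologyOptions struct {
	Name          string
	IsDestination bool
	Bind          topologyBind
}

// getRouting applies the documented rules: a destination receives from
// its peer, while a source sends to its peer.
func (t *topologyOptions) getRouting() (source, destination string) {
	if t.IsDestination {
		return t.Bind.Peer, t.Name
	}
	return t.Name, t.Bind.Peer
}

func main() {
	queue := topologyOptions{
		Name:          "orders",
		IsDestination: true,
		Bind:          topologyBind{Peer: "events"},
	}
	src, dst := queue.getRouting()
	fmt.Println(src, "->", dst) // events -> orders

	exchange := topologyOptions{
		Name: "events",
		Bind: topologyBind{Peer: "audit"},
	}
	src, dst = exchange.getRouting()
	fmt.Println(src, "->", dst) // events -> audit
}
```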
