Documentation ¶
Overview ¶
Package amqpx provides working with RabbitMQ using AMQP 0.9.1.
Index ¶
- Constants
- Variables
- func D[T any](fn func(ctx context.Context, d *Delivery[T]) Action) *handleValue[T]
- type Acknowledger
- type Action
- type Channel
- type Client
- type ClientOption
- func ApplyURI(s string) ClientOption
- func IsTLS(v bool) ClientOption
- func SetAuth(username, password string) ClientOption
- func SetConfig(c Config) ClientOption
- func SetConnectionName(name string) ClientOption
- func SetHost(h string) ClientOption
- func SetTLSConfig(t *tls.Config) ClientOption
- func SetVHost(vhost string) ClientOption
- func UseInterceptor(i ...Interceptor) ClientOption
- func UseMarshaler(m Marshaler) ClientOption
- func UseUnmarshaler(u ...Unmarshaler) ClientOption
- func WithLog(log LogFunc) ClientOption
- type Config
- type Connection
- type ConsumeFunc
- type ConsumeInterceptor
- type ConsumerOption
- func BindQueue(q QueueBind) ConsumerOption
- func ConsumerTag(tag string) ConsumerOption
- func DeclareExchange(e ExchangeDeclare) ConsumerOption
- func DeclareQueue(q QueueDeclare) ConsumerOption
- func SetAutoAckMode() ConsumerOption
- func SetConcurrency(i int) ConsumerOption
- func SetConsumeInterceptor(i ...ConsumeInterceptor) ConsumerOption
- func SetExclusive(b bool) ConsumerOption
- func SetPrefetchCount(i int) ConsumerOption
- func SetUnmarshaler(m Unmarshaler) ConsumerOption
- type Delivery
- type DeliveryRequest
- func (d *DeliveryRequest) AppID() string
- func (d *DeliveryRequest) Body() []byte
- func (d *DeliveryRequest) ConsumerTag() string
- func (d *DeliveryRequest) ContentEncoding() string
- func (d *DeliveryRequest) ContentType() string
- func (d *DeliveryRequest) CorrelationID() string
- func (d *DeliveryRequest) DeliveryMode() uint8
- func (d *DeliveryRequest) DeliveryTag() uint64
- func (d *DeliveryRequest) Exchange() string
- func (d *DeliveryRequest) Expiration() string
- func (d *DeliveryRequest) Headers() Table
- func (d *DeliveryRequest) Log(format string, v ...any)
- func (d *DeliveryRequest) MessageID() string
- func (d *DeliveryRequest) NewFrom(req *amqp091.Delivery) *DeliveryRequest
- func (d *DeliveryRequest) Priority() uint8
- func (d *DeliveryRequest) Redelivered() bool
- func (d *DeliveryRequest) ReplyTo() string
- func (d *DeliveryRequest) RoutingKey() string
- func (d *DeliveryRequest) SetBody(b []byte)
- func (d *DeliveryRequest) Status() Action
- func (d *DeliveryRequest) Timestamp() time.Time
- func (d *DeliveryRequest) Type() string
- func (d *DeliveryRequest) UserID() string
- type ExchangeDeclare
- type HandlerValue
- type Interceptor
- type LogFunc
- type Marshaler
- type PublishFunc
- type PublishInterceptor
- type PublishOption
- type Publisher
- type PublisherOption
- type Publishing
- func (m *Publishing[T]) PersistentMode() *Publishing[T]
- func (m *Publishing[T]) SetAppID(id string) *Publishing[T]
- func (m *Publishing[T]) SetCorrelationID(id string) *Publishing[T]
- func (m *Publishing[T]) SetExpiration(expiration string) *Publishing[T]
- func (m *Publishing[T]) SetMessageID(id string) *Publishing[T]
- func (m *Publishing[T]) SetPriority(priority uint8) *Publishing[T]
- func (m *Publishing[T]) SetReplyTo(replyTo string) *Publishing[T]
- func (m *Publishing[T]) SetTimestamp(timestamp time.Time) *Publishing[T]
- func (m *Publishing[T]) SetType(typ string) *Publishing[T]
- func (m *Publishing[T]) SetUserID(id string) *Publishing[T]
- type PublishingRequest
- type QueueBind
- type QueueDeclare
- type Table
- type Unmarshaler
Constants ¶
const ( // Transient means higher throughput but messages will not be restored on broker restart. // Transient messages will not be restored to durable queues. Transient = amqp091.Transient // Persistent messages will be restored to // durable queues and lost on non-durable queues during server restart. Persistent = amqp091.Persistent )
The delivery mode of messages is unrelated to the durability of the queues they reside on.
const ( ExchangeDefault = "" ExchangeDirect = "amq.direct" ExchangeFanout = "amq.fanout" ExchangeHeaders = "amq.headers" ExchangeMatch = "amq.match" ExchangeTopic = "amq.topic" )
Default exchanges.
Variables ¶
var NoOpLogger = LogFunc(func(format string, v ...any) {})
NoOpLogger logger does nothing
Functions ¶
Types ¶
type Acknowledger ¶
type Action ¶
type Action int8
Action represents acknowledgment status the delivered message.
const ( // Ack is an acknowledgement that the client or server has finished work on a delivery. // It removes a message from the queue permanently. Ack Action = iota // Nack is a negatively acknowledging the delivery of message and need requeue. // // The server to deliver this message to a different consumer. // If it is not possible, the message will be dropped or delivered to a server configured dead-letter queue. // // This action must not be used to select or re-queue messages the client wishes // not to handle, rather it is to inform the server that the client is incapable // of handling this message at this time. Nack // Reject is explicit not acknowledged and do not requeue. Reject )
type Channel ¶
type Channel interface { QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp091.Table) (amqp091.Queue, error) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp091.Table) error Confirm(noWait bool) error Qos(prefetchCount, prefetchSize int, global bool) error NotifyClose(chan *amqp091.Error) chan *amqp091.Error NotifyCancel(chan string) chan string Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp091.Table) (<-chan amqp091.Delivery, error) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) (*amqp091.DeferredConfirmation, error) NotifyReturn(c chan amqp091.Return) chan amqp091.Return Close() error }
A Channel is an interface implemented by amqp091 client.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
A Client represents connection to rabbitmq.
func (*Client) IsConnOpen ¶ added in v0.3.3
IsConnOpen returns true if the connection is open.
func (*Client) NewConsumer ¶
func (c *Client) NewConsumer(queue string, fn HandlerValue, opts ...ConsumerOption) error
NewConsumer creates a consumer.
type ClientOption ¶
type ClientOption func(*clientOptions)
ClientOption is used to configure a client.
func SetAuth ¶
func SetAuth(username, password string) ClientOption
SetAuth sets auth username and password.
func SetConnectionName ¶
func SetConnectionName(name string) ClientOption
SetConnectionName sets client connection name.
func UseInterceptor ¶ added in v0.3.0
func UseInterceptor(i ...Interceptor) ClientOption
UseInterceptor sets interceptors.
func UseMarshaler ¶
func UseMarshaler(m Marshaler) ClientOption
UseMarshaler sets marshaler of the publisher message.
func UseUnmarshaler ¶
func UseUnmarshaler(u ...Unmarshaler) ClientOption
UseUnmarshaler sets unmarshaler of the consumer message.
func WithLog ¶ added in v0.3.0
func WithLog(log LogFunc) ClientOption
WithLog sets log. The default is stdout.
type Connection ¶
type Connection interface { IsClosed() bool Channel() (Channel, error) NotifyClose(chan *amqp091.Error) chan *amqp091.Error Close() error }
A Connection is an interface implemented by amqp091 client.
type ConsumeFunc ¶ added in v0.2.0
type ConsumeFunc func(ctx context.Context, req *DeliveryRequest) Action
type ConsumeInterceptor ¶ added in v0.3.0
type ConsumeInterceptor func(ConsumeFunc) ConsumeFunc
type ConsumerOption ¶
type ConsumerOption func(*consumerOptions)
ConsumerOption is used to configure a consumer.
func DeclareExchange ¶
func DeclareExchange(e ExchangeDeclare) ConsumerOption
DeclareExchange sets exchange declare.
func DeclareQueue ¶
func DeclareQueue(q QueueDeclare) ConsumerOption
DeclareQueue sets queue declare.
func SetAutoAckMode ¶
func SetAutoAckMode() ConsumerOption
SetAutoAckMode sets auto ack mode. The default is false.
func SetConcurrency ¶
func SetConcurrency(i int) ConsumerOption
SetConcurrency sets limit the number of goroutines for every delivered message. The default is runtime.GOMAXPROCS(0). The consumer ignores this option when prefetch count greater than zero with AutoAck=false.
func SetConsumeInterceptor ¶ added in v0.3.0
func SetConsumeInterceptor(i ...ConsumeInterceptor) ConsumerOption
SetConsumeInterceptor sets consume interceptor.
func SetExclusive ¶
func SetExclusive(b bool) ConsumerOption
SetExclusive sets exclusive. The default is false.
When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
func SetPrefetchCount ¶
func SetPrefetchCount(i int) ConsumerOption
SetPrefetchCount sets prefetch count. prefetchCount controls how many messages the server will try to keep on the network for consumers before receiving delivery acks. The intent of prefetchCount is to make sure the network buffers stay full between the server and client.
With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with AutoAck=false because no acknowledgments are expected or sent.
func SetUnmarshaler ¶
func SetUnmarshaler(m Unmarshaler) ConsumerOption
SetUnmarshaler sets unmarshaler.
type Delivery ¶
type Delivery[T any] struct { Msg *T Req *DeliveryRequest }
A Delivery represent the fields for a delivered message.
type DeliveryRequest ¶ added in v0.2.0
type DeliveryRequest struct {
// contains filtered or unexported fields
}
func (*DeliveryRequest) AppID ¶ added in v0.2.0
func (d *DeliveryRequest) AppID() string
An AppID returns the creating application id.
func (*DeliveryRequest) Body ¶ added in v0.2.0
func (d *DeliveryRequest) Body() []byte
A Body returns the body of the message.
func (*DeliveryRequest) ConsumerTag ¶ added in v0.2.0
func (d *DeliveryRequest) ConsumerTag() string
A ConsumerTag returns the consumer tag.
func (*DeliveryRequest) ContentEncoding ¶ added in v0.2.0
func (d *DeliveryRequest) ContentEncoding() string
A ContentEncoding returns the content encoding of the message.
func (*DeliveryRequest) ContentType ¶ added in v0.2.0
func (d *DeliveryRequest) ContentType() string
A ContentType returns the content type of the message.
func (*DeliveryRequest) CorrelationID ¶ added in v0.2.0
func (d *DeliveryRequest) CorrelationID() string
A CorrelationID returns the correlation identifier of the message.
func (*DeliveryRequest) DeliveryMode ¶ added in v0.2.0
func (d *DeliveryRequest) DeliveryMode() uint8
A DeliveryMode returns the delivery mode of the message.
func (*DeliveryRequest) DeliveryTag ¶ added in v0.2.0
func (d *DeliveryRequest) DeliveryTag() uint64
A DeliveryTag returns the server-assigned delivery tag.
func (*DeliveryRequest) Exchange ¶ added in v0.2.0
func (d *DeliveryRequest) Exchange() string
An Exchange returns the exchange name.
func (*DeliveryRequest) Expiration ¶ added in v0.2.0
func (d *DeliveryRequest) Expiration() string
An Expiration returns the expiration of the message.
func (*DeliveryRequest) Headers ¶ added in v0.2.0
func (d *DeliveryRequest) Headers() Table
A Headers returns the headers of the message.
func (*DeliveryRequest) Log ¶ added in v0.2.0
func (d *DeliveryRequest) Log(format string, v ...any)
func (*DeliveryRequest) MessageID ¶ added in v0.2.0
func (d *DeliveryRequest) MessageID() string
A MessageID returns the application message identifier.
func (*DeliveryRequest) NewFrom ¶ added in v0.3.0
func (d *DeliveryRequest) NewFrom(req *amqp091.Delivery) *DeliveryRequest
NewFrom using from only tests.
func (*DeliveryRequest) Priority ¶ added in v0.2.0
func (d *DeliveryRequest) Priority() uint8
A Priority returns the priority of the message.
func (*DeliveryRequest) Redelivered ¶ added in v0.2.0
func (d *DeliveryRequest) Redelivered() bool
A Redelivered returns whether this is a redelivery of a message.
func (*DeliveryRequest) ReplyTo ¶ added in v0.2.0
func (d *DeliveryRequest) ReplyTo() string
A ReplyTo returns the address to reply to (ex: RPC).
func (*DeliveryRequest) RoutingKey ¶ added in v0.2.0
func (d *DeliveryRequest) RoutingKey() string
A RoutingKey returns the routing key.
func (*DeliveryRequest) SetBody ¶ added in v0.3.0
func (d *DeliveryRequest) SetBody(b []byte)
SetBody sets the body of the message.
func (*DeliveryRequest) Status ¶ added in v0.2.0
func (d *DeliveryRequest) Status() Action
Status returns acknowledgement status.
func (*DeliveryRequest) Timestamp ¶ added in v0.2.0
func (d *DeliveryRequest) Timestamp() time.Time
A Timestamp returns the message timestamp.
func (*DeliveryRequest) Type ¶ added in v0.2.0
func (d *DeliveryRequest) Type() string
A Type returns the message type name.
func (*DeliveryRequest) UserID ¶ added in v0.2.0
func (d *DeliveryRequest) UserID() string
A UserID returns the creating user id.
type ExchangeDeclare ¶
type ExchangeDeclare struct { Name string Type string Durable bool AutoDelete bool Internal bool NoWait bool Args Table }
A ExchangeDeclare represents an exchange declaration.
type HandlerValue ¶ added in v0.2.0
type HandlerValue interface {
// contains filtered or unexported methods
}
type Interceptor ¶ added in v0.3.0
type Interceptor interface { WrapConsume(ConsumeFunc) ConsumeFunc WrapPublish(PublishFunc) PublishFunc }
type PublishFunc ¶ added in v0.3.0
type PublishFunc func(context.Context, *PublishingRequest) error
PublishFunc is func used for publish a message.
type PublishInterceptor ¶ added in v0.3.0
type PublishInterceptor func(PublishFunc) PublishFunc
type PublishOption ¶
type PublishOption func(*publishOptions)
PublishOption is used to configure the publishing message.
func SetContext ¶ added in v0.2.0
func SetContext(ctx context.Context) PublishOption
SetContext sets publish context.
func SetImmediate ¶
func SetImmediate(b bool) PublishOption
SetImmediate sets immediate. The default is false.
Message can be undeliverable when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.
func SetMandatory ¶
func SetMandatory(b bool) PublishOption
SetMandatory sets mandatory. The default is false.
Message can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key.
type Publisher ¶
type Publisher[T any] struct { // contains filtered or unexported fields }
A Publisher represents a client for sending the messages.
func NewPublisher ¶
func NewPublisher[T any](client *Client, exchange string, opts ...PublisherOption) *Publisher[T]
NewPublisher creates a publisher.
func (*Publisher[T]) Publish ¶
func (p *Publisher[T]) Publish(m *Publishing[T], opts ...PublishOption) error
Publish publishes the message.
type PublisherOption ¶
type PublisherOption func(*publisherOptions)
PublisherOption is used to configure a publisher.
func SetConfirmMode ¶
func SetConfirmMode() PublisherOption
SetConfirmMode sets confirm mode.
confirm sets channel into confirm mode so that the client can ensure all publishers have successfully been received by the server.
func SetPublishInterceptor ¶ added in v0.3.0
func SetPublishInterceptor(i ...PublishInterceptor) PublisherOption
SetPublishInterceptor sets publish interceptor.
func UseImmediate ¶
func UseImmediate(b bool) PublisherOption
UseImmediate sets immediate. The default is false.
Message can be undeliverable when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.
func UseMandatory ¶
func UseMandatory(b bool) PublisherOption
UseMandatory sets mandatory. The default is false.
Message can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key.
type Publishing ¶
type Publishing[T any] struct { // contains filtered or unexported fields }
A Publishing represents a message sending to the server.
func NewPublishing ¶
func NewPublishing[T any](v *T) *Publishing[T]
NewPublishing creates new publishing.
func (*Publishing[T]) PersistentMode ¶
func (m *Publishing[T]) PersistentMode() *Publishing[T]
PersistentMode sets delivery mode as persistent.
func (*Publishing[T]) SetAppID ¶
func (m *Publishing[T]) SetAppID(id string) *Publishing[T]
SetAppID sets application id.
func (*Publishing[T]) SetCorrelationID ¶
func (m *Publishing[T]) SetCorrelationID(id string) *Publishing[T]
SetCorrelationID sets correlation id.
func (*Publishing[T]) SetExpiration ¶
func (m *Publishing[T]) SetExpiration(expiration string) *Publishing[T]
SetExpiration sets expiration.
func (*Publishing[T]) SetMessageID ¶
func (m *Publishing[T]) SetMessageID(id string) *Publishing[T]
SetMessageID sets message id.
func (*Publishing[T]) SetPriority ¶
func (m *Publishing[T]) SetPriority(priority uint8) *Publishing[T]
SetPriority sets priority.
func (*Publishing[T]) SetReplyTo ¶
func (m *Publishing[T]) SetReplyTo(replyTo string) *Publishing[T]
SetReplyTo sets reply to.
func (*Publishing[T]) SetTimestamp ¶
func (m *Publishing[T]) SetTimestamp(timestamp time.Time) *Publishing[T]
SetTimestamp sets timestamp.
func (*Publishing[T]) SetType ¶
func (m *Publishing[T]) SetType(typ string) *Publishing[T]
SetType sets message type.
func (*Publishing[T]) SetUserID ¶
func (m *Publishing[T]) SetUserID(id string) *Publishing[T]
SetUserID sets user id.
type PublishingRequest ¶ added in v0.3.0
type PublishingRequest struct { amqp091.Publishing // contains filtered or unexported fields }
A PublishingRequest represents a request to publish a message.
func (*PublishingRequest) Exchange ¶ added in v0.3.0
func (p *PublishingRequest) Exchange() string
Exchange returns exchange name.
func (*PublishingRequest) NewFrom ¶ added in v0.3.0
func (p *PublishingRequest) NewFrom(exchange, routingKey string) *PublishingRequest
NewFrom using from only tests.
func (*PublishingRequest) RoutingKey ¶ added in v0.3.0
func (p *PublishingRequest) RoutingKey() string
RoutingKey returns routing key.
type QueueDeclare ¶
A QueueDeclare represents a queue declaration.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package amqpxgzip provides supporting gzip.
|
Package amqpxgzip provides supporting gzip. |
amqpxotel
module
|
|
amqpxproto
module
|
|
amqpxprotojson
module
|
|
amqpxzap
module
|