amqpx

package module
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2024 License: MIT Imports: 14 Imported by: 4

README

RabbitMQ Go Client

build-img pkg-img coverage-img

This is a Go AMQP 0.9.1 client wraps amqp091-go with support generics

  • Support of the encoding messages
    • defaults encoding (json, protobuf, protojson)
    • support of custom marshal/unmarshal functions
  • Middleware for easy integration

Installation

Go version 1.19+

go get github.com/itcomusic/amqpx

Usage

package main

import (
    "context" 
	"fmt"
	
	"github.com/itcomusic/amqpx"
)

func main() {
    conn, _ := amqpx.Connect()
    defer conn.Close()

    // simple publisher
    pub := amqpx.NewPublisher[[]byte](conn, amqpx.Direct, amqpx.UseRoutingKey("routing_key"))
    _ = pub.Publish(amqpx.NewPublishing([]byte("hello")).PersistentMode(), amqpx.SetRoutingKey("override_routing_key"))
	
    // simple consumer 
    _ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Msg))
        return amqpx.Ack
    }))
}
Publisher & consumer struct

Pretty using struct and avoiding boilerplate marhsal/unmarshal. It is strict compared content-type of the message and invalid body is rejected.

    conn, _ := amqpx.Connect(
        amqpx.UseUnmarshaler(amqpxproto.NewUnmarshaler()), // global unmarshalers
        amqpx.UseMarshaler(amqpxproto.NewMarshaler())), // global marshaler
    defer conn.Close()

    type Gopher struct {
        Name string
    }
	
    // override default marshaler
    pub := amqpx.NewPublisher[Gopher](conn, amqpx.Direct, amqpx.SetMarshaler(amqpxjson.Marshaler)) 
    _ = pub.Publish(amqpx.NewPublishing(Gopher{Name: "Rob"}), amqpx.SetRoutingKey("routing_key"))
	
    // override default unmarshaler
    _ = conn.NewConsumer("bar", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[Gopher]) amqpx.Action {
        fmt.Printf("user-id: %s, received message: %s\n", req.Req.UserID, req.Msg.Name)
        return amqpx.Ack
    }), amqpx.SetUnmarshaler(amqpxjson.Unmarshaler), amqpx.SetAutoAckMode())
Consumer rate limiting

The Prefetch count informs the server will deliver that many messages to consumers before acknowledgments are received. The Concurrency option limits numbers of goroutines of consumer, depends on prefetch count and auto-ack mode.

    // prefetch count
    _ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Msg))
        return amqpx.Ack
    }), amqpx.SetPrefetchCount(8))

    // limit goroutines
	_ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Body))
        return amqpx.Ack
    }), amqpx.SetAutoAckMode(), amqpx.SetConcurrency(32))
Declare queue

The declare queue, exchange and binding queue.

    _ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Msg))
        return amqpx.Ack
    }), amqpx.DeclareQueue(amqpx.QueueDeclare{AutoDelete: true}),
        amqpx.DeclareExchange(amqpx.ExchangeDeclare{Name: "exchange_name", Type: amqpx.Direct}),
        amqpx.BindQueue(amqpx.QueueBind{Exchange: "exchange_name", RoutingKey: []string{"routing_key"}}))
Middleware

Predefined support opentelemetry using interceptor.

    import (
        "github.com/itcomusic/amqpx"
        "github.com/itcomusic/amqpx/amqpxotel"
    )

    // global
    conn, _ := amqpx.Connect(amqpx.UserInterceptor(amqpxotel.NewInterceptor())
    defer conn.Close()

    // can use special interceptor for publisher
    _ = amqpx.NewPublisher[[]byte](conn, amqpx.Direct, amqpx.SetPublishInterceptor(func(next amqpx.PublisherFunc) amqpx.PublisherFunc {
        return func(ctx context.Context, m *amqpx.PublishingRequest) error {
            fmt.Printf("message: %s\n", m.Body)
            return next(m)
        }
    }))

License

MIT License

Documentation

Overview

Package amqpx provides working with RabbitMQ using AMQP 0.9.1.

Index

Constants

View Source
const (
	// Transient means higher throughput but messages will not be restored on broker restart.
	// Transient messages will not be restored to durable queues.
	Transient = amqp091.Transient

	// Persistent messages will be restored to
	// durable queues and lost on non-durable queues during server restart.
	Persistent = amqp091.Persistent
)

The delivery mode of messages is unrelated to the durability of the queues they reside on.

View Source
const (
	ExchangeDefault = ""
	ExchangeDirect  = "amq.direct"
	ExchangeFanout  = "amq.fanout"
	ExchangeHeaders = "amq.headers"
	ExchangeMatch   = "amq.match"
	ExchangeTopic   = "amq.topic"
)

Default exchanges.

Variables

View Source
var NoOpLogger = LogFunc(func(format string, v ...any) {})

NoOpLogger logger does nothing

Functions

func D

func D[T any](fn func(ctx context.Context, d *Delivery[T]) Action) *handleValue[T]

D represents handler of consume amqpx.Delivery[T].

Types

type Acknowledger

type Acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple bool, requeue bool) error
	Reject(tag uint64, requeue bool) error
}

type Action

type Action int8

Action represents acknowledgment status the delivered message.

const (
	// Ack is an acknowledgement that the client or server has finished work on a delivery.
	// It removes a message from the queue permanently.
	Ack Action = iota

	// Nack is a negatively acknowledging the delivery of message and need requeue.
	//
	// The server to deliver this message to a different consumer.
	// If it is not possible, the message will be dropped or delivered to a server configured dead-letter queue.
	//
	// This action must not be used to select or re-queue messages the client wishes
	// not to handle, rather it is to inform the server that the client is incapable
	// of handling this message at this time.
	Nack

	// Reject is explicit not acknowledged and do not requeue.
	Reject
)

func (Action) String

func (i Action) String() string

type Channel

type Channel interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp091.Table) (amqp091.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp091.Table) error
	Confirm(noWait bool) error
	Qos(prefetchCount, prefetchSize int, global bool) error
	NotifyClose(chan *amqp091.Error) chan *amqp091.Error
	NotifyCancel(chan string) chan string
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp091.Table) (<-chan amqp091.Delivery, error)
	PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) (*amqp091.DeferredConfirmation, error)
	NotifyReturn(c chan amqp091.Return) chan amqp091.Return
	Close() error
}

A Channel is an interface implemented by amqp091 client.

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client represents connection to rabbitmq.

func Connect

func Connect(opts ...ClientOption) (*Client, error)

Connect creates a connection.

func (*Client) Close

func (c *Client) Close()

Close closes Connection. Waits all consumers.

func (*Client) IsConnOpen added in v0.3.3

func (c *Client) IsConnOpen() bool

IsConnOpen returns true if the connection is open.

func (*Client) NewConsumer

func (c *Client) NewConsumer(queue string, fn HandlerValue, opts ...ConsumerOption) error

NewConsumer creates a consumer.

type ClientOption

type ClientOption func(*clientOptions)

ClientOption is used to configure a client.

func ApplyURI

func ApplyURI(s string) ClientOption

ApplyURI sets amqp URI.

func IsTLS

func IsTLS(v bool) ClientOption

IsTLS sets TLS.

func SetAuth

func SetAuth(username, password string) ClientOption

SetAuth sets auth username and password.

func SetConfig

func SetConfig(c Config) ClientOption

SetConfig sets amqp config.

func SetConnectionName

func SetConnectionName(name string) ClientOption

SetConnectionName sets client connection name.

func SetHost

func SetHost(h string) ClientOption

SetHost sets host and port.

func SetTLSConfig

func SetTLSConfig(t *tls.Config) ClientOption

SetTLSConfig sets tls config.

func SetVHost added in v0.3.2

func SetVHost(vhost string) ClientOption

SetVHost sets vhost.

func UseInterceptor added in v0.3.0

func UseInterceptor(i ...Interceptor) ClientOption

UseInterceptor sets interceptors.

func UseMarshaler

func UseMarshaler(m Marshaler) ClientOption

UseMarshaler sets marshaler of the publisher message.

func UseUnmarshaler

func UseUnmarshaler(u ...Unmarshaler) ClientOption

UseUnmarshaler sets unmarshaler of the consumer message.

func WithLog added in v0.3.0

func WithLog(log LogFunc) ClientOption

WithLog sets log. The default is stdout.

type Config

type Config = amqp091.Config

type Connection

type Connection interface {
	IsClosed() bool
	Channel() (Channel, error)
	NotifyClose(chan *amqp091.Error) chan *amqp091.Error
	Close() error
}

A Connection is an interface implemented by amqp091 client.

type ConsumeFunc added in v0.2.0

type ConsumeFunc func(ctx context.Context, req *DeliveryRequest) Action

type ConsumeInterceptor added in v0.3.0

type ConsumeInterceptor func(ConsumeFunc) ConsumeFunc

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption is used to configure a consumer.

func BindQueue

func BindQueue(q QueueBind) ConsumerOption

BindQueue sets queue bind.

func ConsumerTag

func ConsumerTag(tag string) ConsumerOption

ConsumerTag sets tag.

func DeclareExchange

func DeclareExchange(e ExchangeDeclare) ConsumerOption

DeclareExchange sets exchange declare.

func DeclareQueue

func DeclareQueue(q QueueDeclare) ConsumerOption

DeclareQueue sets queue declare.

func SetAutoAckMode

func SetAutoAckMode() ConsumerOption

SetAutoAckMode sets auto ack mode. The default is false.

func SetConcurrency

func SetConcurrency(i int) ConsumerOption

SetConcurrency sets limit the number of goroutines for every delivered message. The default is runtime.GOMAXPROCS(0). The consumer ignores this option when prefetch count greater than zero with AutoAck=false.

func SetConsumeInterceptor added in v0.3.0

func SetConsumeInterceptor(i ...ConsumeInterceptor) ConsumerOption

SetConsumeInterceptor sets consume interceptor.

func SetExclusive

func SetExclusive(b bool) ConsumerOption

SetExclusive sets exclusive. The default is false.

When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.

func SetPrefetchCount

func SetPrefetchCount(i int) ConsumerOption

SetPrefetchCount sets prefetch count. prefetchCount controls how many messages the server will try to keep on the network for consumers before receiving delivery acks. The intent of prefetchCount is to make sure the network buffers stay full between the server and client.

With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with AutoAck=false because no acknowledgments are expected or sent.

func SetUnmarshaler

func SetUnmarshaler(m Unmarshaler) ConsumerOption

SetUnmarshaler sets unmarshaler.

type Delivery

type Delivery[T any] struct {
	Msg *T
	Req *DeliveryRequest
}

A Delivery represent the fields for a delivered message.

type DeliveryRequest added in v0.2.0

type DeliveryRequest struct {
	// contains filtered or unexported fields
}

func (*DeliveryRequest) AppID added in v0.2.0

func (d *DeliveryRequest) AppID() string

An AppID returns the creating application id.

func (*DeliveryRequest) Body added in v0.2.0

func (d *DeliveryRequest) Body() []byte

A Body returns the body of the message.

func (*DeliveryRequest) ConsumerTag added in v0.2.0

func (d *DeliveryRequest) ConsumerTag() string

A ConsumerTag returns the consumer tag.

func (*DeliveryRequest) ContentEncoding added in v0.2.0

func (d *DeliveryRequest) ContentEncoding() string

A ContentEncoding returns the content encoding of the message.

func (*DeliveryRequest) ContentType added in v0.2.0

func (d *DeliveryRequest) ContentType() string

A ContentType returns the content type of the message.

func (*DeliveryRequest) CorrelationID added in v0.2.0

func (d *DeliveryRequest) CorrelationID() string

A CorrelationID returns the correlation identifier of the message.

func (*DeliveryRequest) DeliveryMode added in v0.2.0

func (d *DeliveryRequest) DeliveryMode() uint8

A DeliveryMode returns the delivery mode of the message.

func (*DeliveryRequest) DeliveryTag added in v0.2.0

func (d *DeliveryRequest) DeliveryTag() uint64

A DeliveryTag returns the server-assigned delivery tag.

func (*DeliveryRequest) Exchange added in v0.2.0

func (d *DeliveryRequest) Exchange() string

An Exchange returns the exchange name.

func (*DeliveryRequest) Expiration added in v0.2.0

func (d *DeliveryRequest) Expiration() string

An Expiration returns the expiration of the message.

func (*DeliveryRequest) Headers added in v0.2.0

func (d *DeliveryRequest) Headers() Table

A Headers returns the headers of the message.

func (*DeliveryRequest) Log added in v0.2.0

func (d *DeliveryRequest) Log(format string, v ...any)

func (*DeliveryRequest) MessageID added in v0.2.0

func (d *DeliveryRequest) MessageID() string

A MessageID returns the application message identifier.

func (*DeliveryRequest) NewFrom added in v0.3.0

func (d *DeliveryRequest) NewFrom(req *amqp091.Delivery) *DeliveryRequest

NewFrom using from only tests.

func (*DeliveryRequest) Priority added in v0.2.0

func (d *DeliveryRequest) Priority() uint8

A Priority returns the priority of the message.

func (*DeliveryRequest) Redelivered added in v0.2.0

func (d *DeliveryRequest) Redelivered() bool

A Redelivered returns whether this is a redelivery of a message.

func (*DeliveryRequest) ReplyTo added in v0.2.0

func (d *DeliveryRequest) ReplyTo() string

A ReplyTo returns the address to reply to (ex: RPC).

func (*DeliveryRequest) RoutingKey added in v0.2.0

func (d *DeliveryRequest) RoutingKey() string

A RoutingKey returns the routing key.

func (*DeliveryRequest) SetBody added in v0.3.0

func (d *DeliveryRequest) SetBody(b []byte)

SetBody sets the body of the message.

func (*DeliveryRequest) Status added in v0.2.0

func (d *DeliveryRequest) Status() Action

Status returns acknowledgement status.

func (*DeliveryRequest) Timestamp added in v0.2.0

func (d *DeliveryRequest) Timestamp() time.Time

A Timestamp returns the message timestamp.

func (*DeliveryRequest) Type added in v0.2.0

func (d *DeliveryRequest) Type() string

A Type returns the message type name.

func (*DeliveryRequest) UserID added in v0.2.0

func (d *DeliveryRequest) UserID() string

A UserID returns the creating user id.

type ExchangeDeclare

type ExchangeDeclare struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       Table
}

A ExchangeDeclare represents an exchange declaration.

type HandlerValue added in v0.2.0

type HandlerValue interface {
	// contains filtered or unexported methods
}

type Interceptor added in v0.3.0

type Interceptor interface {
	WrapConsume(ConsumeFunc) ConsumeFunc
	WrapPublish(PublishFunc) PublishFunc
}

type LogFunc

type LogFunc func(format string, v ...any)

LogFunc type is an adapter to allow the use of ordinary functions as LogFunc.

type Marshaler

type Marshaler interface {
	ContentType() string
	Marshal(any) ([]byte, error)
}

Marshaler is an interface implemented by format.

type PublishFunc added in v0.3.0

type PublishFunc func(context.Context, *PublishingRequest) error

PublishFunc is func used for publish a message.

type PublishInterceptor added in v0.3.0

type PublishInterceptor func(PublishFunc) PublishFunc

type PublishOption

type PublishOption func(*publishOptions)

PublishOption is used to configure the publishing message.

func SetContext added in v0.2.0

func SetContext(ctx context.Context) PublishOption

SetContext sets publish context.

func SetImmediate

func SetImmediate(b bool) PublishOption

SetImmediate sets immediate. The default is false.

Message can be undeliverable when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.

func SetMandatory

func SetMandatory(b bool) PublishOption

SetMandatory sets mandatory. The default is false.

Message can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key.

func SetRoutingKey

func SetRoutingKey(s string) PublishOption

SetRoutingKey sets routing key.

type Publisher

type Publisher[T any] struct {
	// contains filtered or unexported fields
}

A Publisher represents a client for sending the messages.

func NewPublisher

func NewPublisher[T any](client *Client, exchange string, opts ...PublisherOption) *Publisher[T]

NewPublisher creates a publisher.

func (*Publisher[T]) Close

func (p *Publisher[T]) Close()

Close closes publisher.

func (*Publisher[T]) Publish

func (p *Publisher[T]) Publish(m *Publishing[T], opts ...PublishOption) error

Publish publishes the message.

type PublisherOption

type PublisherOption func(*publisherOptions)

PublisherOption is used to configure a publisher.

func SetConfirmMode

func SetConfirmMode() PublisherOption

SetConfirmMode sets confirm mode.

confirm sets channel into confirm mode so that the client can ensure all publishers have successfully been received by the server.

func SetMarshaler

func SetMarshaler(m Marshaler) PublisherOption

SetMarshaler sets marshaler.

func SetPublishInterceptor added in v0.3.0

func SetPublishInterceptor(i ...PublishInterceptor) PublisherOption

SetPublishInterceptor sets publish interceptor.

func UseImmediate

func UseImmediate(b bool) PublisherOption

UseImmediate sets immediate. The default is false.

Message can be undeliverable when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.

func UseMandatory

func UseMandatory(b bool) PublisherOption

UseMandatory sets mandatory. The default is false.

Message can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key.

func UseRoutingKey

func UseRoutingKey(s string) PublisherOption

UseRoutingKey sets routing key.

type Publishing

type Publishing[T any] struct {
	// contains filtered or unexported fields
}

A Publishing represents a message sending to the server.

func NewPublishing

func NewPublishing[T any](v *T) *Publishing[T]

NewPublishing creates new publishing.

func (*Publishing[T]) PersistentMode

func (m *Publishing[T]) PersistentMode() *Publishing[T]

PersistentMode sets delivery mode as persistent.

func (*Publishing[T]) SetAppID

func (m *Publishing[T]) SetAppID(id string) *Publishing[T]

SetAppID sets application id.

func (*Publishing[T]) SetCorrelationID

func (m *Publishing[T]) SetCorrelationID(id string) *Publishing[T]

SetCorrelationID sets correlation id.

func (*Publishing[T]) SetExpiration

func (m *Publishing[T]) SetExpiration(expiration string) *Publishing[T]

SetExpiration sets expiration.

func (*Publishing[T]) SetMessageID

func (m *Publishing[T]) SetMessageID(id string) *Publishing[T]

SetMessageID sets message id.

func (*Publishing[T]) SetPriority

func (m *Publishing[T]) SetPriority(priority uint8) *Publishing[T]

SetPriority sets priority.

func (*Publishing[T]) SetReplyTo

func (m *Publishing[T]) SetReplyTo(replyTo string) *Publishing[T]

SetReplyTo sets reply to.

func (*Publishing[T]) SetTimestamp

func (m *Publishing[T]) SetTimestamp(timestamp time.Time) *Publishing[T]

SetTimestamp sets timestamp.

func (*Publishing[T]) SetType

func (m *Publishing[T]) SetType(typ string) *Publishing[T]

SetType sets message type.

func (*Publishing[T]) SetUserID

func (m *Publishing[T]) SetUserID(id string) *Publishing[T]

SetUserID sets user id.

type PublishingRequest added in v0.3.0

type PublishingRequest struct {
	amqp091.Publishing
	// contains filtered or unexported fields
}

A PublishingRequest represents a request to publish a message.

func (*PublishingRequest) Exchange added in v0.3.0

func (p *PublishingRequest) Exchange() string

Exchange returns exchange name.

func (*PublishingRequest) NewFrom added in v0.3.0

func (p *PublishingRequest) NewFrom(exchange, routingKey string) *PublishingRequest

NewFrom using from only tests.

func (*PublishingRequest) RoutingKey added in v0.3.0

func (p *PublishingRequest) RoutingKey() string

RoutingKey returns routing key.

type QueueBind

type QueueBind struct {
	Exchange   string
	RoutingKey []string
	NoWait     bool
	Args       Table
}

A QueueBind represents a binding routing key, exchange and queue declaration.

type QueueDeclare

type QueueDeclare struct {
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       Table
}

A QueueDeclare represents a queue declaration.

type Table

type Table = amqp091.Table

type Unmarshaler

type Unmarshaler interface {
	ContentType() string
	Unmarshal([]byte, any) error
}

Unmarshaler is an interface implemented by format.

Directories

Path Synopsis
Package amqpxgzip provides supporting gzip.
Package amqpxgzip provides supporting gzip.
amqpxotel module
amqpxproto module
amqpxzap module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL