Documentation ¶
Index ¶
- Variables
- func MessageName(m proto.Message) string
- func WithConsumerNaming(consumer string, prefixes ...string) func(*RegistryOptions)
- func WithMiddleware(mw ...MiddlewareFunc) func(*Options)
- func WithRegistry(r *Registry) func(*Options)
- type Channel
- type Consumer
- type Handler
- type HandlerFunc
- type Metadata
- type MiddlewareFunc
- type Options
- type Publisher
- type Registry
- type RegistryOptions
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func WithConsumerNaming ¶ added in v0.2.0
func WithConsumerNaming(consumer string, prefixes ...string) func(*RegistryOptions)
WithConsumerNaming applies a consumer prefix naming convention to queue names This ensures that each separate consumer uses a dedicated queue bound to the publisher exchange.
func WithMiddleware ¶
func WithMiddleware(mw ...MiddlewareFunc) func(*Options)
WithMiddleware configures the publisher/consumer to use the specified middleware
func WithRegistry ¶
WithRegistry configures the publisher/consumer to use the specified registry
Types ¶
type Channel ¶
type Channel interface { ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) error QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error) QueueBind(name string, key string, exchange string, noWait bool, args amqp.Table) error Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) Close() error }
Channel represents an amqp channel interface
type Consumer ¶ added in v0.2.0
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a consumer
func NewConsumer ¶ added in v0.2.0
func NewConsumer(c *amqp.Connection, optFns ...func(*Options)) *Consumer
NewConsumer returns a new consumer
type Handler ¶
type Handler interface { Message() proto.Message Handle(context.Context, proto.Message, Metadata) error }
Handler represents a message handler
type HandlerFunc ¶
HandlerFunc represents a handler func
type Metadata ¶
type Metadata struct { ID string Type string ContentType string CorrelationID string Timestamp time.Time Headers map[string]interface{} }
Metadata represents message metadata
type MiddlewareFunc ¶
type MiddlewareFunc func(HandlerFunc) HandlerFunc
MiddlewareFunc represents a middleware func
func ChainMiddleware ¶
func ChainMiddleware(m ...MiddlewareFunc) MiddlewareFunc
ChainMiddleware returns a middleware func that wraps the specified funcs
type Options ¶
type Options struct { ChannelFn func(*amqp.Connection) (Channel, error) ExchangeNameFn func(proto.Message) (string, error) QueueNameFn func(proto.Message) (string, error) RoutingKeyFn func(proto.Message) (string, error) MiddlewareFn MiddlewareFunc ErrorFn func(Metadata, error) }
Options represents a set of options
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher represents a message publisher
func NewPublisher ¶
func NewPublisher(c *amqp.Connection, optFns ...func(*Options)) (*Publisher, error)
NewPublisher returns a new publisher
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry represents an exchange/queue registry
func NewRegistry ¶
func NewRegistry(conn *amqp.Connection, optFns ...func(*RegistryOptions)) (*Registry, error)
NewRegistry returns a new registry
type RegistryOptions ¶
type RegistryOptions struct { ChannelFn func(*amqp.Connection) (Channel, error) ExchangeNameFn func(proto.Message) string QueueNameFn func(proto.Message) string DeadLetterExchangeNameFn func(proto.Message) string DeadLetterQueueNameFn func(proto.Message) string DeadLetterKeyFn func(proto.Message) string }
RegistryOptions represents a set of registry options