Documentation ¶
Index ¶
- type BindingConfig
- type Config
- type Connection
- func (c *Connection) Close() error
- func (c *Connection) Dial() error
- func (c *Connection) ExchangeDeclare(exchange *Exchange) error
- func (c *Connection) NewConsumer(e *Exchange, q *Queue, bc *BindingConfig, cc *ConsumerConfig) (*Consumer, error)
- func (c *Connection) NewProducer(pc *ProducerConfig) (*Producer, error)
- func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg *amqp.Publishing) error
- type Consumer
- func (c *Consumer) Consume(handler func(delivery *Delivery)) error
- func (c *Consumer) ConsumeRPC(handler func(delivery *Delivery)) error
- func (c *Consumer) Deliveries() <-chan amqp.Delivery
- func (c *Consumer) Get(handler func(delivery *Delivery)) error
- func (c *Consumer) QOS(messageCount int) error
- func (c *Consumer) Shutdown() error
- type ConsumerConfig
- type Delivery
- func (d *Delivery) AckOrSkip(multiple bool) error
- func (d *Delivery) Data(data []byte, contentType string)
- func (d *Delivery) Delegate(ack string, options ...bool) *amqp.Publishing
- func (d *Delivery) IsAutoAck() bool
- func (d *Delivery) PreAck(multiple bool)
- func (d *Delivery) Proto(message interface{})
- func (d *Delivery) RejectOrSkip(requeue bool) error
- type ErrorTimeout
- type Exchange
- type Producer
- func (p *Producer) NotifyPublish(confirmer func(message amqp.Confirmation))
- func (p *Producer) NotifyReturn(notifier func(message amqp.Return))
- func (p *Producer) Publish(publishing *amqp.Publishing) error
- func (p *Producer) PublishRPC(publishing *amqp.Publishing, handler func(delivery *Delivery)) error
- func (p *Producer) Shutdown() error
- type ProducerConfig
- type Publishing
- type Queue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection struct { IsConnected bool IsBlocked bool // contains filtered or unexported fields }
func NewConnection ¶
func NewConnection(c *Config) (*Connection, error)
func (*Connection) Close ¶
func (c *Connection) Close() error
func (*Connection) Dial ¶
func (c *Connection) Dial() error
func (*Connection) ExchangeDeclare ¶ added in v0.1.2
func (c *Connection) ExchangeDeclare(exchange *Exchange) error
ExchangeDeclare declares an exchange on the server.
func (*Connection) NewConsumer ¶
func (c *Connection) NewConsumer(e *Exchange, q *Queue, bc *BindingConfig, cc *ConsumerConfig) (*Consumer, error)
NewConsumer is a constructor for consumer creation Accepts Exchange, Queue, BindingOptions and ConsumerOptions
func (*Connection) NewProducer ¶
func (c *Connection) NewProducer(pc *ProducerConfig) (*Producer, error)
func (*Connection) Publish ¶
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg *amqp.Publishing) error
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) Consume ¶
Consume accepts a handler function for every message streamed from RabbitMq will be called within this handler func
func (*Consumer) ConsumeRPC ¶
ConsumeRPC accepts a handler function for every message streamed from RabbitMq will be called within this handler func. It returns the message to send and the content type.
func (*Consumer) Deliveries ¶
func (*Consumer) Get ¶
ConsumeMessage accepts a handler function and only consumes one message stream from RabbitMq
type ConsumerConfig ¶
type Delivery ¶
type Delivery struct { amqp.Delivery Delegated bool AckError error Response *amqp.Publishing // contains filtered or unexported fields }
Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Consumer.Consume or Consumer.Get.
func (*Delivery) Delegate ¶
func (d *Delivery) Delegate(ack string, options ...bool) *amqp.Publishing
Delegate delegates an acknowledgement through the amqp.Acknowledger interface. It must be called during a handler execution.
Either ack, reject or nack can be used as the acknowledger.
The order of the options must be exactly the same as it is required in the respective amqp.Delivery function.
func (*Delivery) Proto ¶ added in v0.1.2
func (d *Delivery) Proto(message interface{})
Proto takes the protocol buffer and encodes it into bytes, then writes it into the response body through Delivery.Data
func (*Delivery) RejectOrSkip ¶
RejectOrSkip calls Delivery.Reject if auto ack is not activated.
type ErrorTimeout ¶ added in v0.1.2
func (ErrorTimeout) Error ¶ added in v0.1.2
func (e ErrorTimeout) Error() string
type Exchange ¶
type Exchange struct { // Exchange name Name string // Exchange type Type string // Durable exchanges will survive server restarts Durable bool // Will remain declared when there are no remaining bindings. AutoDelete bool // Exchanges declared as `internal` do not accept accept publishings.Internal // exchanges are useful for when you wish to implement inter-exchange topologies // that should not be exposed to users of the broker. Internal bool // When noWait is true, declare without waiting for a confirmation from the server. NoWait bool // amqp.Table of arguments that are specific to the server's implementation of // the exchange can be sent for exchange types that require extra parameters. Args amqp.Table }
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) NotifyPublish ¶
func (p *Producer) NotifyPublish(confirmer func(message amqp.Confirmation))
func (*Producer) NotifyReturn ¶
NotifyReturn captures a message when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.
func (*Producer) PublishRPC ¶
func (p *Producer) PublishRPC(publishing *amqp.Publishing, handler func(delivery *Delivery)) error
PublishRPC accepts a handler function for every message streamed from RabbitMq as a reply after publishing a message.
type ProducerConfig ¶
type ProducerConfig struct { Exchange string // The key that when publishing a message to a exchange/queue will be only delivered to // given routing key listeners RoutingKey string // Publishing tagpackage Tag string // Maximum waiting time in miliseconds Timeout time.Duration // Maximum number of deliveries that will be received MaxDeliveries int // Queue should be on the server/broker Mandatory bool // Consumer should be bound to server Immediate bool }
type Publishing ¶
type Publishing struct { // Application or exchange specific fields, // the headers exchange will inspect this field. // TODO: convert to amqp.Table Headers map[string]interface{} // Properties ContentType string // MIME content type ContentEncoding string // MIME content encoding DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) Priority uint8 // 0 to 9 CorrelationId string // correlation identifier ReplyTo string // address to to reply to (ex: RPC) Expiration string // message expiration spec MessageId string // message identifier Timestamp time.Time // message timestamp Type string // message type name UserId string // creating user id - ex: "guest" AppId string // creating application id // The application specific payload of the message Body []byte }
TODO: Should we use this instead of amqp.Publishing?