rabbitgo

package module
v0.1.2
Published: May 21, 2019 License: MIT Imports: 9 Imported by: 1

README

rabbitgo

Wrapper for AMQP written in Go

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BindingConfig

type BindingConfig struct {
	RoutingKey string
	NoWait     bool
	Args       amqp.Table
}

type Config

type Config struct {
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}
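
How rabbitgo turns a Config into a connection string is not documented on this page. As a rough sketch, assuming the fields map onto a standard AMQP URI of the form amqp://user:pass@host:port/vhost, the mapping could look like the following. The local Config mirror and the amqpURI helper are illustrative names and are not part of the package.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// Config is a local mirror of the documented rabbitgo.Config fields,
// so that this sketch compiles without the package itself.
type Config struct {
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

// amqpURI is a hypothetical helper that assembles an AMQP URI from c.
// It assumes Vhost is a plain name without a leading slash.
func amqpURI(c Config) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(c.Username, c.Password),
		Host:   net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
		Path:   "/" + c.Vhost,
	}
	return u.String()
}

func main() {
	c := Config{Host: "localhost", Port: 5672, Username: "guest", Password: "secret", Vhost: "orders"}
	fmt.Println(amqpURI(c))
}
```

Building the URI through net/url rather than string concatenation keeps special characters in the username or password escaped correctly.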

type Connection

type Connection struct {
	IsConnected bool
	IsBlocked   bool
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(c *Config) (*Connection, error)

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) Dial

func (c *Connection) Dial() error

func (*Connection) ExchangeDeclare added in v0.1.2

func (c *Connection) ExchangeDeclare(exchange *Exchange) error

ExchangeDeclare declares an exchange on the server.

func (*Connection) NewConsumer

func (c *Connection) NewConsumer(e *Exchange, q *Queue, bc *BindingConfig, cc *ConsumerConfig) (*Consumer, error)

NewConsumer constructs a Consumer. It accepts an Exchange, a Queue, a BindingConfig and a ConsumerConfig.

func (*Connection) NewProducer

func (c *Connection) NewProducer(pc *ProducerConfig) (*Producer, error)

func (*Connection) Publish

func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg *amqp.Publishing) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Consume

func (c *Consumer) Consume(handler func(delivery *Delivery)) error

Consume accepts a handler function that is called for every message streamed from RabbitMQ.

func (*Consumer) ConsumeRPC

func (c *Consumer) ConsumeRPC(handler func(delivery *Delivery)) error

ConsumeRPC accepts a handler function that is called for every message streamed from RabbitMQ. The handler provides the message to send back and its content type.

func (*Consumer) Deliveries

func (c *Consumer) Deliveries() <-chan amqp.Delivery

func (*Consumer) Get

func (c *Consumer) Get(handler func(delivery *Delivery)) error

Get accepts a handler function and consumes only one message from RabbitMQ.

func (*Consumer) QOS

func (c *Consumer) QOS(messageCount int) error

QOS controls how many messages the server will try to keep on the network for consumers before receiving delivery acks. The intent of QOS is to make sure the network buffers stay full between the server and client.

func (*Consumer) Shutdown

func (c *Consumer) Shutdown() error

type ConsumerConfig

type ConsumerConfig struct {
	Tag           string
	PrefetchCount int
	PrefetchSize  int
	MaxDeliveries int
	Timeout       time.Duration
	AutoAck       bool
	Exclusive     bool
	NoLocal       bool
	NoWait        bool
	Args          amqp.Table
}

type Delivery

type Delivery struct {
	amqp.Delivery

	Delegated bool
	AckError  error
	Response  *amqp.Publishing
	// contains filtered or unexported fields
}

Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Consumer.Consume or Consumer.Get.

func (*Delivery) AckOrSkip

func (d *Delivery) AckOrSkip(multiple bool) error

AckOrSkip calls Delivery.Ack if auto ack is not activated.

func (*Delivery) Data

func (d *Delivery) Data(data []byte, contentType string)

Data writes some data into the response body

func (*Delivery) Delegate

func (d *Delivery) Delegate(ack string, options ...bool) *amqp.Publishing

Delegate delegates an acknowledgement through the amqp.Acknowledger interface. It must be called during a handler execution.

Either ack, reject or nack can be used as the acknowledger.

The order of the options must be exactly the same as it is required in the respective amqp.Delivery function.
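
The ordering rule can be pictured with a small dispatcher. This is only a sketch of the idea and not rabbitgo's implementation: the acknowledger interface mirrors the method set of amqp.Acknowledger, while recorder, opt and delegate are hypothetical names. The real Delegate returns a *amqp.Publishing and is meant to be called inside a handler, whereas this sketch acknowledges immediately and treats omitted options as false, which is an assumption.

```go
package main

import "fmt"

// acknowledger mirrors the method set of amqp.Acknowledger.
type acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple bool, requeue bool) error
	Reject(tag uint64, requeue bool) error
}

// recorder is a test double that records the last call instead of
// talking to a broker.
type recorder struct{ last string }

func (r *recorder) Ack(tag uint64, multiple bool) error {
	r.last = fmt.Sprintf("ack tag=%d multiple=%t", tag, multiple)
	return nil
}

func (r *recorder) Nack(tag uint64, multiple bool, requeue bool) error {
	r.last = fmt.Sprintf("nack tag=%d multiple=%t requeue=%t", tag, multiple, requeue)
	return nil
}

func (r *recorder) Reject(tag uint64, requeue bool) error {
	r.last = fmt.Sprintf("reject tag=%d requeue=%t", tag, requeue)
	return nil
}

// opt returns options[i], or false when the caller omitted it (assumption).
func opt(options []bool, i int) bool {
	return i < len(options) && options[i]
}

// delegate is a hypothetical dispatcher: options are forwarded in the same
// order as the parameters of the matching amqp.Delivery method.
func delegate(a acknowledger, tag uint64, ack string, options ...bool) error {
	switch ack {
	case "ack":
		return a.Ack(tag, opt(options, 0)) // Ack(multiple)
	case "reject":
		return a.Reject(tag, opt(options, 0)) // Reject(requeue)
	case "nack":
		return a.Nack(tag, opt(options, 0), opt(options, 1)) // Nack(multiple, requeue)
	default:
		return fmt.Errorf("unknown acknowledger %q", ack)
	}
}

func main() {
	r := &recorder{}
	if err := delegate(r, 7, "nack", false, true); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.last)
}
```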

func (*Delivery) IsAutoAck

func (d *Delivery) IsAutoAck() bool

func (*Delivery) PreAck

func (d *Delivery) PreAck(multiple bool)

func (*Delivery) Proto added in v0.1.2

func (d *Delivery) Proto(message interface{})

Proto takes the protocol buffer and encodes it into bytes, then writes it into the response body through Delivery.Data

func (*Delivery) RejectOrSkip

func (d *Delivery) RejectOrSkip(requeue bool) error

RejectOrSkip calls Delivery.Reject if auto ack is not activated.
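
AckOrSkip and RejectOrSkip share one guard: nothing is sent to the broker when the consumer runs with auto ack. The sketch below shows that guard on a stand-in type. The delivery type, its autoAck field and the acks/rejects counters are hypothetical and only mimic the documented behaviour.

```go
package main

import "fmt"

// delivery is a stand-in for rabbitgo.Delivery. It counts acknowledgements
// instead of sending them to a broker.
type delivery struct {
	autoAck bool
	acks    int
	rejects int
}

// ack and reject stand in for amqp.Delivery.Ack and amqp.Delivery.Reject.
func (d *delivery) ack(multiple bool) error   { d.acks++; return nil }
func (d *delivery) reject(requeue bool) error { d.rejects++; return nil }

// ackOrSkip acknowledges only when auto ack is off.
func (d *delivery) ackOrSkip(multiple bool) error {
	if d.autoAck {
		return nil // the broker already considers the message acknowledged
	}
	return d.ack(multiple)
}

// rejectOrSkip rejects only when auto ack is off.
func (d *delivery) rejectOrSkip(requeue bool) error {
	if d.autoAck {
		return nil
	}
	return d.reject(requeue)
}

func main() {
	manual := &delivery{}
	auto := &delivery{autoAck: true}

	_ = manual.ackOrSkip(false)
	_ = auto.ackOrSkip(false)
	_ = manual.rejectOrSkip(true)
	_ = auto.rejectOrSkip(true)

	fmt.Println("manual:", manual.acks, manual.rejects)
	fmt.Println("auto:", auto.acks, auto.rejects)
}
```

With this guard a handler can call the same method regardless of how the consumer was configured.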

type ErrorTimeout added in v0.1.2

type ErrorTimeout struct {
	Timeout time.Duration
	Queue   string
}

func (ErrorTimeout) Error added in v0.1.2

func (e ErrorTimeout) Error() string

type Exchange

type Exchange struct {
	// Exchange name
	Name string
	// Exchange type
	Type string
	// Durable exchanges will survive server restarts
	Durable bool
	// Auto-deleted exchanges are removed when there are no remaining bindings.
	AutoDelete bool
	// Exchanges declared as `internal` do not accept publishings. Internal
	// exchanges are useful for when you wish to implement inter-exchange topologies
	// that should not be exposed to users of the broker.
	Internal bool
	// When noWait is true, declare without waiting for a confirmation from the server.
	NoWait bool
	// amqp.Table of arguments that are specific to the server's implementation of
	// the exchange can be sent for exchange types that require extra parameters.
	Args amqp.Table
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) NotifyPublish

func (p *Producer) NotifyPublish(confirmer func(message amqp.Confirmation))

func (*Producer) NotifyReturn

func (p *Producer) NotifyReturn(notifier func(message amqp.Return))

NotifyReturn captures a message when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

func (*Producer) Publish

func (p *Producer) Publish(publishing *amqp.Publishing) error

func (*Producer) PublishRPC

func (p *Producer) PublishRPC(publishing *amqp.Publishing, handler func(delivery *Delivery)) error

PublishRPC accepts a handler function for every message streamed from RabbitMq as a reply after publishing a message.

func (*Producer) Shutdown

func (p *Producer) Shutdown() error

type ProducerConfig

type ProducerConfig struct {
	Exchange string
	// Routing key: a message published to an exchange/queue is delivered only to
	// listeners on the given routing key
	RoutingKey string
	// Publishing tag
	Tag string
	// Maximum waiting time in milliseconds
	Timeout time.Duration
	// Maximum number of deliveries that will be received
	MaxDeliveries int
	// Queue should be on the server/broker
	Mandatory bool
	// Consumer should be bound to server
	Immediate bool
}

type Publishing

type Publishing struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	// TODO: convert to amqp.Table
	Headers map[string]interface{}
	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to reply to (ex: RPC)
	Expiration      string    // message expiration spec
	MessageId       string    // message identifier
	Timestamp       time.Time // message timestamp
	Type            string    // message type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id
	// The application specific payload of the message
	Body []byte
}

TODO: Should we use this instead of amqp.Publishing?

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

Directories

Path Synopsis
Package pb is a generated protocol buffer package.
