rabbus: github.com/rafaeljesus/rabbus

package rabbus

import "github.com/rafaeljesus/rabbus"

Index

Package Files

bind_args.go consumer_message.go declare_args.go error.go options.go rabbus.go

Constants

const (
    // Transient means higher throughput but messages will not be restored on broker restart.
    Transient uint8 = 1
    // Persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
    Persistent uint8 = 2
    // ContentTypeJSON defines the JSON content type.
    ContentTypeJSON = "application/json"
    // ContentTypePlain defines the plain text content type.
    ContentTypePlain = "plain/text"
    // ExchangeDirect indicates the exchange is of direct type.
    ExchangeDirect = "direct"
    // ExchangeFanout indicates the exchange is of fanout type.
    ExchangeFanout = "fanout"
    // ExchangeTopic indicates the exchange is of topic type.
    ExchangeTopic = "topic"
)

Variables

var (
    // ErrMissingExchange is returned when exchange name is not passed as parameter.
    ErrMissingExchange = errors.New("Missing field exchange")
    // ErrMissingKind is returned when exchange type is not passed as parameter.
    ErrMissingKind = errors.New("Missing field kind")
    // ErrMissingQueue is returned when queue name is not passed as parameter.
    ErrMissingQueue = errors.New("Missing field queue")
    // ErrMissingHandler is returned when function handler is not passed as parameter.
    ErrMissingHandler = errors.New("Missing field handler")
    // ErrUnsupportedArguments is returned when more than the permitted number of arguments are passed to a function.
    ErrUnsupportedArguments = errors.New("Unsupported arguments size")
)

type AMQP

type AMQP interface {
    // Publish wraps the amqp.Publish method.
    Publish(exchange, key string, opts amqp.Publishing) error
    // CreateConsumer creates an AMQP consumer.
    CreateConsumer(exchange, key, kind, queue string, durable bool, declareArgs, bindArgs amqp.Table) (<-chan amqp.Delivery, error)
    // WithExchange creates an AMQP exchange.
    WithExchange(exchange, kind string, durable bool) error
    // WithQos wraps the amqp.Qos method.
    WithQos(count, size int, global bool) error
    // NotifyClose wraps the NotifyClose method.
    NotifyClose(c chan *amqp.Error) chan *amqp.Error
    // Close closes the running AMQP connection and channel.
    Close() error
    Close() error
}

AMQP exposes an interface for interacting with an AMQP broker.

type BindArgs

type BindArgs struct {
    // contains filtered or unexported fields
}

BindArgs wraps an AMQP Table to set common queue bind values.

func NewBindArgs

func NewBindArgs() *BindArgs

NewBindArgs creates a new builder for queue bind values.

func (*BindArgs) With

func (a *BindArgs) With(name string, value interface{}) *BindArgs

With sets the value by name

type ConsumerMessage

type ConsumerMessage struct {
    ContentType     string
    ContentEncoding string
    // DeliveryMode queue implementation use, non-persistent (1) or persistent (2)
    DeliveryMode uint8
    // Priority queue implementation use, 0 to 9
    Priority uint8
    // CorrelationId application use, correlation identifier
    CorrelationId string
    // ReplyTo application use, address to reply to (ex: RPC)
    ReplyTo string
    // Expiration implementation use, message expiration spec
    Expiration string
    // MessageId application use, message identifier
    MessageId string
    // Timestamp application use, message timestamp
    Timestamp time.Time
    // Type application use, message type name
    Type string
    // ConsumerTag valid only with Channel.Consume
    ConsumerTag string
    // MessageCount valid only with Channel.Get
    MessageCount uint32
    DeliveryTag  uint64
    Redelivered  bool
    Exchange     string
    // Headers application or header exchange table
    Headers map[string]interface{}
    // Key basic.publish routing key
    Key  string
    Body []byte
    // contains filtered or unexported fields
}

ConsumerMessage captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer.

func (ConsumerMessage) Ack

func (cm ConsumerMessage) Ack(multiple bool) error

Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery. All deliveries in AMQP must be acknowledged. If you called Channel.Consume with autoAck true, the server will automatically ack each message and this method should not be called. Otherwise, you must call Delivery.Ack after you have successfully processed this delivery. When multiple is true, this delivery and all prior unacknowledged deliveries on the same channel will be acknowledged. This is useful for batch processing of deliveries. An error indicates that the acknowledgement could not be delivered to the channel it was sent from. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

func (ConsumerMessage) Nack

func (cm ConsumerMessage) Nack(multiple, requeue bool) error

Nack negatively acknowledges the delivery of the message(s) identified by the delivery tag from either the client or server. When multiple is true, it nacks all messages delivered on the same channel up to and including the one identified by the delivery tag. When requeue is true, it requests that the server deliver this message to a different consumer. If that is not possible or requeue is false, the message will be dropped or delivered to a server-configured dead-letter queue. This method must not be used to select or requeue messages the client wishes not to handle; rather, it informs the server that the client is incapable of handling this message at this time. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

func (ConsumerMessage) Reject

func (cm ConsumerMessage) Reject(requeue bool) error

Reject delegates a negative acknowledgement through the Acknowledger interface. When requeue is true, the message is queued to be delivered to a consumer on a different channel. When requeue is false or the server is unable to queue this message, it will be dropped. If you are batch processing deliveries, and your server supports it, prefer Delivery.Nack. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

type DeclareArgs

type DeclareArgs struct {
    // contains filtered or unexported fields
}

DeclareArgs is the queue declaration values builder

func NewDeclareArgs

func NewDeclareArgs() *DeclareArgs

NewDeclareArgs creates a new builder for queue declaration values.

func (*DeclareArgs) With

func (a *DeclareArgs) With(name string, value interface{}) *DeclareArgs

With sets the value by name

func (*DeclareArgs) WithMessageTTL

func (a *DeclareArgs) WithMessageTTL(d time.Duration) *DeclareArgs

WithMessageTTL sets Queue message TTL. See details at https://www.rabbitmq.com/ttl.html#message-ttl-using-x-args

type Delivery

type Delivery struct {
    amqp.Delivery
}

Delivery wraps amqp.Delivery struct

type Emitter

type Emitter interface {
    // EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.
    EmitAsync() chan<- Message
    // EmitErr returns an error if encoding the payload fails, if the circuit breaker is open, or if the retry attempts are exhausted.
    EmitErr() <-chan error
    // EmitOk returns a channel that receives a value when the message was sent.
    EmitOk() <-chan struct{}
}

Emitter exposes an interface for publishing messages to an AMQP broker.
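The three-channel shape of Emitter suggests a send-then-select pattern: write to the channel from EmitAsync, then wait on EmitOk or EmitErr. The sketch below uses a hypothetical in-memory `fakeEmitter` and a trimmed `message` type instead of a broker, so the rule it applies (fail when the exchange is empty) is an assumption made for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a trimmed, hypothetical stand-in for rabbus.Message.
type message struct {
	Exchange string
	Key      string
	Payload  []byte
}

// fakeEmitter mimics the three-channel shape of the Emitter interface.
type fakeEmitter struct {
	in   chan message
	ok   chan struct{}
	errs chan error
}

func newFakeEmitter() *fakeEmitter {
	e := &fakeEmitter{
		in:   make(chan message),
		ok:   make(chan struct{}),
		errs: make(chan error),
	}
	go e.loop()
	return e
}

// loop reports an error for messages without an exchange and success otherwise.
func (e *fakeEmitter) loop() {
	for m := range e.in {
		if m.Exchange == "" {
			e.errs <- errors.New("missing exchange")
			continue
		}
		e.ok <- struct{}{}
	}
}

func (e *fakeEmitter) EmitAsync() chan<- message { return e.in }
func (e *fakeEmitter) EmitErr() <-chan error     { return e.errs }
func (e *fakeEmitter) EmitOk() <-chan struct{}   { return e.ok }

// send hands a message to the emitter, then waits for exactly one outcome.
func send(e *fakeEmitter, m message) error {
	e.EmitAsync() <- m
	select {
	case <-e.EmitOk():
		return nil
	case err := <-e.EmitErr():
		return err
	}
}

func main() {
	e := newFakeEmitter()
	fmt.Println(send(e, message{Exchange: "events", Key: "user.created", Payload: []byte(`{}`)}))
	fmt.Println(send(e, message{Key: "user.created"}))
}
```

Draining both result channels matters in this sketch because they are unbuffered: an unread outcome would block the emitter loop.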

type ListenConfig

type ListenConfig struct {
    // Exchange the exchange name.
    Exchange string
    // Kind the exchange type.
    Kind string
    // Key the routing key name.
    Key string
    // PassiveExchange makes the exchange declaration passive: it uses
    // amqp's ExchangeDeclarePassive instead of the default ExchangeDeclare.
    PassiveExchange bool
    // Queue the queue name.
    Queue string
    // DeclareArgs is a list of arguments accepted when declaring the queue.
    // See https://www.rabbitmq.com/queues.html#optional-arguments for more info.
    DeclareArgs *DeclareArgs
    // BindArgs is a list of arguments accepted when binding the exchange to the queue.
    BindArgs *BindArgs
}

ListenConfig carries fields for listening to messages.

type Message

type Message struct {
    // Exchange the exchange name.
    Exchange string
    // Kind the exchange type.
    Kind string
    // Key the routing key name.
    Key string
    // Payload the message payload.
    Payload []byte
    // DeliveryMode indicates whether the message is Persistent or Transient.
    DeliveryMode uint8
    // ContentType the message content-type.
    ContentType string
    // Headers the message application headers
    Headers map[string]interface{}
    // ContentEncoding the message encoding.
    ContentEncoding string
}

Message carries fields for sending messages.

type OnStateChangeFunc

type OnStateChangeFunc func(name, from, to string)

OnStateChangeFunc is the callback function when circuit breaker state changes.

type Option

type Option func(*Rabbus) error

Option represents an option you can pass to New. See the documentation for the individual options.

func AMQPProvider

func AMQPProvider(provider AMQP) Option

AMQPProvider sets the AMQP implementation used for interacting with the AMQP broker.

func Attempts

func Attempts(attempts int) Option

Attempts is the max number of retries on broker outages.

func BreakerInterval

func BreakerInterval(interval time.Duration) Option

BreakerInterval is the cyclic period of the closed state for CircuitBreaker to clear the internal counts. If Interval is 0, CircuitBreaker doesn't clear the internal counts during the closed state.

func BreakerTimeout

func BreakerTimeout(timeout time.Duration) Option

BreakerTimeout is the period of the open state, after which the state of CircuitBreaker becomes half-open. If Timeout is 0, the timeout value of CircuitBreaker is set to 60 seconds.

func Durable

func Durable(durable bool) Option

Durable indicates whether the queue will survive broker restarts. Defaults to true.

func OnStateChange

func OnStateChange(fn OnStateChangeFunc) Option

OnStateChange is called whenever the state of CircuitBreaker changes.

func PassiveExchange

func PassiveExchange(isExchangePassive bool) Option

PassiveExchange forces passive declaration of all exchanges, using amqp's ExchangeDeclarePassive instead of the default ExchangeDeclare.

func PrefetchCount

func PrefetchCount(count int) Option

PrefetchCount limits the number of unacknowledged messages.

func PrefetchSize

func PrefetchSize(size int) Option

PrefetchSize sets the prefetch size in bytes. When it is greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.

func QosGlobal

func QosGlobal(global bool) Option

QosGlobal when global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel. RabbitMQ does not implement the global flag.

func Sleep

func Sleep(sleep time.Duration) Option

Sleep is the sleep time of the retry mechanism.

func Threshold

func Threshold(threshold uint32) Option

Threshold is the number of failures after which the circuit breaker opens and further calls to the broker are not run. While in this state, the circuit breaker periodically allows a call through and, if it succeeds, resumes running calls normally. The default value is 5.

type Rabbus

type Rabbus struct {
    AMQP
    // contains filtered or unexported fields
}

Rabbus implements the Rabbus interface definition.

func New

func New(dsn string, options ...Option) (*Rabbus, error)

New returns a new Rabbus configured with the given dsn and options, or a non-nil error if creating the connection and channel fails.

func (*Rabbus) Close

func (r *Rabbus) Close() error

Close closes the internal channels and attempts to close the AMQP channel and connection.

func (*Rabbus) EmitAsync

func (r *Rabbus) EmitAsync() chan<- Message

EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.

func (*Rabbus) EmitErr

func (r *Rabbus) EmitErr() <-chan error

EmitErr returns an error if encoding the payload fails, if the circuit breaker is open, or if the retry attempts are exhausted.

func (*Rabbus) EmitOk

func (r *Rabbus) EmitOk() <-chan struct{}

EmitOk returns a channel that receives a value when the message was sent.

func (*Rabbus) Listen

func (r *Rabbus) Listen(c ListenConfig) (chan ConsumerMessage, error)

Listen listens for messages from RabbitMQ. It returns an error if the exchange, queue name or function handler is not passed, or if an error occurred while creating the amqp consumer.
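The validation described for Listen can be sketched as a check over the config fields that returns one of the sentinel errors from the Variables section. The lowercase names below are hypothetical local copies, and the order of the checks is an assumption; only the error messages are taken from the documented variables.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, declared here so the sketch is self-contained.
var (
	errMissingExchange = errors.New("Missing field exchange")
	errMissingKind     = errors.New("Missing field kind")
	errMissingQueue    = errors.New("Missing field queue")
)

// listenConfig is a trimmed, hypothetical stand-in for rabbus.ListenConfig.
type listenConfig struct {
	Exchange, Kind, Key, Queue string
}

// validate returns the first missing-field error, or nil if the config is complete.
func validate(c listenConfig) error {
	switch {
	case c.Exchange == "":
		return errMissingExchange
	case c.Kind == "":
		return errMissingKind
	case c.Queue == "":
		return errMissingQueue
	}
	return nil
}

func main() {
	fmt.Println(validate(listenConfig{Kind: "direct", Queue: "events"}))
	fmt.Println(validate(listenConfig{Exchange: "events", Kind: "direct", Key: "user.created", Queue: "events"}))
}
```

Because the errors are package-level values, callers can compare against them with errors.Is, for example to distinguish a configuration mistake from a broker failure.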

func (*Rabbus) Run

func (r *Rabbus) Run(ctx context.Context) error

Run starts the rabbus channels for emitting and listens for AMQP connection close events. It returns the ctx error, if any.

Directories

Path             Synopsis
internal/amqp

Package rabbus imports 8 packages and is imported by 2 packages. Updated 2019-08-02.