hare

package module
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2020 License: Apache-2.0 Imports: 8 Imported by: 0

README

pinpt-logo

Hare is a simple RabbitMQ set of helpers for Golang

Go GoDoc

Setup

You'll need the following to do dev:

  • Golang
  • RabbitMQ

Install

This project requires Go Modules.

go get -u github.com/pinpt/hare

Example

conn, err := hare.New(logger, "amqp://guest:guest@localhost:5672/", "test", "topic")
if err != nil {
	panic(err)
}
defer conn.Close()
ch, err := conn.Channel() // create a new channel
if err != nil {
	panic(err)
}
defer ch.Close()
q, err := ch.Bind("hi", "queue", []string{"#"})
if err != nil {
	panic(err)
}
defer q.Close()
go func() {
	for msg := range q.Messages() {
		fmt.Println("received a message", string(msg.Body))
		q.Ack(msg.DeliveryTag)
	}
}()
if err := q.Publish(amqp.Publishing{
	Body: []byte("hello"),
}); err != nil {
	panic(err)
}

License

All of this code is proprietary and Copyright © 2020 by Pinpoint Software, Inc. We ♥️ Open Source and welcome contributions and feedback.

Documentation

Overview

Package hare is a lightweight wrapper around RabbitMQ which includes batteries when using it with Go. It provides features such as automatic reconnection on disconnect from the broker, automatic confirmation on publish, etc

Example usage:

conn, err := hare.New(logger, "amqp://guest:guest@localhost:5672/", "test", "topic")
if err != nil {
	panic(err)
}
defer conn.Close()
ch, err := conn.Channel() // create a new channel
if err != nil {
	panic(err)
}
defer ch.Close()
q, err := ch.Bind("hi", "queue", []string{"#"})
if err != nil {
	panic(err)
}
defer q.Close()
go func() {
	for msg := range q.Messages() {
		fmt.Println("received a message", string(msg.Body))
		q.Ack(msg.DeliveryTag)
	}
}()
if err := q.Publish(amqp.Publishing{
	Body: []byte("hello"),
}); err != nil {
	panic(err)
}

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConfirmFailed will be returned if a publish confirm failed
	ErrConfirmFailed = errors.New("confirm: failed")
	// ErrConfirmTimeout will be returned if a publish confirm timed out
	ErrConfirmTimeout = errors.New("confirm: timeout")
)

Functions

This section is empty.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is the channel to the RabbitMQ server

func (*Channel) Ack added in v0.0.6

func (ch *Channel) Ack(deliveryTag uint64) error

Ack will ack reliably the deliveryTag

func (*Channel) Bind

func (ch *Channel) Bind(tag string, name string, bindingKeys []string, opts ...WithQueueOption) (*Queue, error)

Bind will start listening on a channel using queue name and binding keys

func (*Channel) Close

func (ch *Channel) Close() error

Close the channel and remove it from tracking by the Connection

func (*Channel) IsClosed added in v0.0.10

func (ch *Channel) IsClosed() bool

IsClosed returns true if the channel is closed

func (*Channel) Nack added in v0.0.6

func (ch *Channel) Nack(deliveryTag uint64, requeue bool) error

Nack will nack reliably the deliveryTag

func (*Channel) Publish

func (ch *Channel) Publish(key string, msg amqp.Publishing, opts ...WithOption) error

Publish will publish a message to the exchange associated with this connection

func (*Channel) SetPrefetch

func (ch *Channel) SetPrefetch(prefetch int) error

SetPrefetch will set the prefetch for the channel

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a connection to the RabbitMQ server

func New

func New(logger Logger, amqpURI string, exchangeName string, exchangeType string, opts ...WithConnectionOption) (*Connection, error)

New will return a new connetion to Rabbit MQ server

func (*Connection) Close

func (conn *Connection) Close() error

Close the connection and all associated channels. Once closed you cannot reused this connection

func (*Connection) CreateChannel

func (conn *Connection) CreateChannel() (*Channel, error)

CreateChannel will create a new Channel which will be tracked by this Connection

func (*Connection) Publish

func (conn *Connection) Publish(key string, msg amqp.Publishing, opts ...WithOption) error

Publish will publish a message to the exchange associated with this connection

func (*Connection) SetConfirmTimeout

func (conn *Connection) SetConfirmTimeout(timeout time.Duration)

SetConfirmTimeout will change the confirm timeout on publishing

type ConnectionConfig

type ConnectionConfig struct {
	Durable            bool
	DeleteWhenComplete bool
	Arguments          amqp.Table
}

ConnectionConfig is a config for the connection

type Logger

type Logger interface {
	// Debug log a message
	Debug(msg string, keys ...interface{})
	// Info log a message
	Info(msg string, keys ...interface{})
	// Error log a message
	Error(msg string, keys ...interface{})
	// Warn log a message
	Warn(msg string, keys ...interface{})
	// Fatal log a message and exit with error
	Fatal(msg string, keys ...interface{})
}

Logger is an interface for logging

func NewGoLogger

func NewGoLogger() Logger

NewGoLogger will return a Logger for the built-in go logger

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a set of queues

func (*Queue) Ack added in v0.0.5

func (q *Queue) Ack(deliveryTag uint64) error

Ack a message using the deliveryTag

func (*Queue) Close

func (q *Queue) Close() error

Close the queue

func (*Queue) Messages

func (q *Queue) Messages() <-chan amqp.Delivery

Messages is for receiving messsages as amqp.Delivery from the Queue

func (*Queue) Nack added in v0.0.5

func (q *Queue) Nack(deliveryTag uint64, requeue bool) error

Nack a message using the deliveryTag

func (*Queue) Publish

func (q *Queue) Publish(msg amqp.Publishing, opts ...WithOption) error

Publish will publish a message to the exchange associated with this connection

func (*Queue) PublishWithKey added in v0.0.5

func (q *Queue) PublishWithKey(key string, msg amqp.Publishing, opts ...WithOption) error

PublishWithKey will publish a message using the specified key to the exchange associated with this connection

type QueueConfig

type QueueConfig struct {
	Durable      bool
	DeleteUnused bool
	Exclusive    bool
	Arguments    amqp.Table
}

QueueConfig is a config for the queue

type WithConnectionOption

type WithConnectionOption func(config *ConnectionConfig) error

WithConnectionOption is a config function callback

func WithDeleteWhenCompleteExchange

func WithDeleteWhenCompleteExchange() WithConnectionOption

WithDeleteWhenCompleteExchange sets the exchange to be deleted when complete

func WithDurableExchange

func WithDurableExchange() WithConnectionOption

WithDurableExchange sets a durable exchange option

func WithExchangeArguments

func WithExchangeArguments(args amqp.Table) WithConnectionOption

WithExchangeArguments sets the exchange arguments

type WithOption

type WithOption func(conf *WithOptionConfig) error

WithOption is an option callback

func WithImmediateOption

func WithImmediateOption() WithOption

WithImmediateOption will set the immediate option when publishing

func WithMandatoryOption

func WithMandatoryOption() WithOption

WithMandatoryOption will set the mandatory option when publishing

func WithNoConfirmOption

func WithNoConfirmOption() WithOption

WithNoConfirmOption will set the no confirm option when publishing

type WithOptionConfig

type WithOptionConfig struct {
	Mandatory bool
	Immediate bool
	NoConfirm bool
}

WithOptionConfig is a set of option configs

type WithQueueOption

type WithQueueOption func(config *QueueConfig) error

WithQueueOption is a config function callback

func WithDeleteUnusedQueue

func WithDeleteUnusedQueue() WithQueueOption

WithDeleteUnusedQueue sets a queue to be deleted when no longer used

func WithDurableQueue

func WithDurableQueue() WithQueueOption

WithDurableQueue sets a durable queue option

func WithExclusiveQueue

func WithExclusiveQueue() WithQueueOption

WithExclusiveQueue sets the queue to be exclusive

func WithQueueArguments

func WithQueueArguments(args amqp.Table) WithQueueOption

WithQueueArguments sets the queue arguments

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL