amqp_safe

package module
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 1, 2020 | License: MIT | Imports: 9 | Imported by: 0

README

amqp-safe

A Go AMQP client with automatic reconnection, cluster support, and delivery guarantees.

import (
	amqp "github.com/xssnick/amqp-safe"
)

// Start connecting and open a channel asynchronously
c := amqp.NewConnector(amqp.Config{
    Hosts: []string{"amqp://admin:password@127.0.0.1"},
}).Start()

// Callback invoked when the channel is ready
c.OnReady(func() {
    if err := c.ExchangeDeclare("test-exchange", amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
        log.Panic(err)
    }
   
    if _, err := c.QueueDeclare("test-queue", true, false, false, false, nil); err != nil {
        log.Panic(err)
    }

    if err := c.QueueBind("test-queue", "", "test-exchange", false, nil); err != nil {
        log.Panic(err)
    }

    err := c.Publish("test-exchange", "", amqp.Publishing{
        Body: []byte("hey"),
    })
    if err != nil {
        log.Panic(err)
    }

    c.Consume("test-queue", "", func(bytes []byte) amqp.Result {
        log.Println("event:", string(bytes))
        return amqp.ResultOK
    })
})

Documentation

Index

Constants

View Source
const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)

Variables

View Source
var ErrNoChannel = errors.New("no channel")
View Source
var ErrNoConnection = errors.New("not connected")
View Source
var ErrServerNAck = errors.New("not ack by server")
View Source
var ErrServerReturn = errors.New("returned by server")

Functions

This section is empty.

Types

type Acker added in v1.1.0

type Acker struct {
	// contains filtered or unexported fields
}

func (*Acker) Ack added in v1.1.0

func (a *Acker) Ack() error

func (*Acker) Nack added in v1.1.0

func (a *Acker) Nack(requeue bool) error

type Config

type Config struct {
	DialTimeout    time.Duration
	HeartbeatEvery time.Duration
	RetryEvery     time.Duration
	Logger         Logger
	Hosts          []string
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector(cfg Config) *Connector

func (*Connector) Close

func (c *Connector) Close() (err error)

func (*Connector) Consume

func (c *Connector) Consume(queue, consumer string, cb func([]byte) Result)

func (*Connector) ConsumeAckLater added in v1.1.0

func (c *Connector) ConsumeAckLater(queue, consumer string, cb func([]byte, *Acker))

TODO: remove duplicated code

func (*Connector) ExchangeBind

func (c *Connector) ExchangeBind(destination, key, source string, noWait bool, args Table) error

func (*Connector) ExchangeDeclare

func (c *Connector) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

func (*Connector) ExchangeDeclarePassive

func (c *Connector) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

func (*Connector) ExchangeDelete

func (c *Connector) ExchangeDelete(name string, ifUnused, noWait bool) error

func (*Connector) ExchangeUnbind

func (c *Connector) ExchangeUnbind(destination, key, source string, noWait bool, args Table) error

func (*Connector) Flow

func (c *Connector) Flow(active bool) error

func (*Connector) OnChannel

func (c *Connector) OnChannel(f func())

func (*Connector) OnChannelFail

func (c *Connector) OnChannelFail(f func() bool)

func (*Connector) OnConnect

func (c *Connector) OnConnect(f func())

func (*Connector) OnConnectionFail

func (c *Connector) OnConnectionFail(f func() bool)

func (*Connector) OnReady

func (c *Connector) OnReady(f func())

func (*Connector) Publish

func (c *Connector) Publish(exchange, key string, publishing Publishing) error

func (*Connector) Qos

func (c *Connector) Qos(prefetchCount, prefetchSize int, global bool) error

func (*Connector) QueueBind

func (c *Connector) QueueBind(name, key, exchange string, noWait bool, args Table) error

func (*Connector) QueueDeclare

func (c *Connector) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

func (*Connector) QueueDeclarePassive

func (c *Connector) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

func (*Connector) QueueDelete

func (c *Connector) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

func (*Connector) QueueInspect

func (c *Connector) QueueInspect(name string) (Queue, error)

func (*Connector) QueuePurge

func (c *Connector) QueuePurge(name string, noWait bool) (int, error)

func (*Connector) QueueUnbind

func (c *Connector) QueueUnbind(name, key, exchange string, args Table) error

func (*Connector) Recover

func (c *Connector) Recover(requeue bool) error

func (*Connector) Start

func (c *Connector) Start() *Connector

type Delivery

type Delivery = amqp.Delivery

type Logger

type Logger interface {
	Println(v ...interface{})
}

type Publishing

type Publishing = amqp.Publishing

type Queue

type Queue = amqp.Queue

type Result

type Result int
const ResultError Result = 2
const ResultOK Result = 1
const ResultReject Result = 3

type Table

type Table = amqp.Table

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL