amqplib

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2021 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package amqplib is for encapsulating github.com/assembla/cony any operations

As a quick start for publisher:

	connectionConfig := &amqplib.AMQPConnectionConfig{
		URL:          "amqp://guest:guest@localhost/",
		ErrorHandler: nil,
}
queueConfig := &amqplib.AMQPQueueConfig{
	ExchangeName: "test-exchange",
	ExchangeType: amqplib.Fanout,
	RoutingKey: "key",
	AutoDeclareExchange: true,
	AutoDelete:   false,
}
client1 := amqplib.NewAMQPClient(connectionConfig)
defer client1.Close()
publisher, err := client1.NewPublisher(queueConfig)
if err != nil {
	panic(err)
}
defer publisher.Close()
err = publisher.Publish(amqp.Publishing{
	Body: []byte("hello"),
})
if err != nil {
	panic(err)
}

As a quick start for consumer:

connectionConfig := &amqplib.AMQPConnectionConfig{
	URL:          "amqp://guest:guest@localhost/",
	ErrorHandler: nil,
}
queueConfig := &amqplib.AMQPQueueConfig{
	ExchangeName: "test-exchange",
	ExchangeType: amqplib.Fanout,
	QueueName:    "test-queue",
	AutoDelete:   false,
}
client := amqplib.NewAMQPClient(connectionConfig)
defer client1.Close()
consumer, err := client.NewConsumer(queueConfig)
if err != nil {
	panic(err)
}
defer consumer.Close()
for delivery := range consumer.Consume() {
	fmt.Println(string(delivery.Body))
	delivery.Ack(false)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPClient

type AMQPClient struct {
	// contains filtered or unexported fields
}

func NewAMQPClient

func NewAMQPClient(connConf *AMQPConnectionConfig) *AMQPClient

func (*AMQPClient) Close

func (c *AMQPClient) Close()

func (*AMQPClient) NewConsumer

func (c *AMQPClient) NewConsumer(cfg *AMQPQueueConfig) (*AMQPConsumer, error)

func (*AMQPClient) NewPublisher

func (c *AMQPClient) NewPublisher(cfg *AMQPQueueConfig) (*AMQPPublisher, error)

type AMQPConnectionConfig

type AMQPConnectionConfig struct {
	URL          string
	ErrorHandler func(err error)
}

type AMQPConsumer

type AMQPConsumer struct {
	// contains filtered or unexported fields
}

func (*AMQPConsumer) Close

func (c *AMQPConsumer) Close()

func (*AMQPConsumer) Consume

func (c *AMQPConsumer) Consume() <-chan amqp.Delivery

type AMQPPublisher

type AMQPPublisher struct {
	Publisher *cony.Publisher
}

func (*AMQPPublisher) Close

func (p *AMQPPublisher) Close()

func (*AMQPPublisher) Publish

func (p *AMQPPublisher) Publish(data amqp.Publishing) (err error)

type AMQPQueueConfig

type AMQPQueueConfig struct {
	ExchangeName        string
	ExchangeType        ExchangeType
	AutoDeclareExchange bool
	RoutingKey          string
	QueueName           string
	ConsumerName        string
	AutoDelete          bool
}

type Consumer

type Consumer interface {
	Consume() <-chan amqp.Delivery
	Close()
}

type ExchangeType

type ExchangeType string
const (
	Fanout ExchangeType = "fanout"
	Direct ExchangeType = "direct"
	Topic  ExchangeType = "topic"
)

type Publisher

type Publisher interface {
	Publish(data amqp.Publishing) error
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL