rabbitmq

package module
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2023 License: MIT Imports: 5 Imported by: 0

README

rabbitmq-go

Golang RabbitMQ high level library

package main

import (
	"github.com/labstack/gommon/log"
	"github.com/trueifnotfalse/rabbitmq-go"
	"time"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	logger := log.New("-")
	config := rabbitmq.Config{
		Host:             "127.0.0.1",
		Port:             "5672",
		User:             "guest",
		Password:         "guest",
		ReConnect:        true,
		ReconnectTimeOut: 10*time.Second,
	}
	con := rabbitmq.NewConnector(logger, &config)
	qc := rabbitmq.NewDurableQueueConfig("hello")
	err := con.QueueDeclare(qc)
	failOnError(err, "Failed to declare a queue")

	message := struct {
		CreatedAt string `json:"created_at"`
		Text      string `json:"text"`
	}{
		CreatedAt: time.Now().Format("2006-01-02 15:04:05"),
		Text:      "Hello World!",
	}

	err = con.PublishStructToQueue("hello", message)
	failOnError(err, "Failed to publish a message")
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Host             string
	Port             string
	User             string
	Password         string
	ReConnect        bool
	ReconnectTimeOut time.Duration
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector(logger *log.Logger, c *Config) *Connector

func (*Connector) Close

func (c *Connector) Close() error

func (*Connector) Connect

func (c *Connector) Connect() *Connector

func (*Connector) Consume added in v1.2.0

func (c *Connector) Consume(cc *ConsumeConfig, handler MessageHandler) error

func (*Connector) ExchangeDeclare added in v1.4.0

func (c *Connector) ExchangeDeclare(ec *ExchangeConfig) error

func (*Connector) GetChannel

func (c *Connector) GetChannel() (*amqp.Channel, error)

func (*Connector) Publish added in v1.4.0

func (c *Connector) Publish(exchangeName, routingKey string, body []byte, deliveryMode uint8) error

func (*Connector) PublishStructToExchange added in v1.4.0

func (c *Connector) PublishStructToExchange(name string, obj interface{}, deliveryMode uint8) error

func (*Connector) PublishStructToQueue

func (c *Connector) PublishStructToQueue(name string, obj interface{}, deliveryMode uint8) error

func (*Connector) PublishToExchange added in v1.4.0

func (c *Connector) PublishToExchange(name string, body []byte, deliveryMode uint8) error

func (*Connector) PublishToQueue

func (c *Connector) PublishToQueue(name string, body []byte, deliveryMode uint8) error

func (*Connector) QueueBind added in v1.4.0

func (c *Connector) QueueBind(qbc *QueueBindConfig) error

func (*Connector) QueueDeclare

func (c *Connector) QueueDeclare(qc *QueueConfig) error

type ConsumeConfig added in v1.2.0

type ConsumeConfig struct {
	Queue         string
	Consumer      string
	AutoAck       bool
	Exclusive     bool
	NoLocal       bool
	NoWait        bool
	Args          amqp.Table
	PrefetchCount int
}

func NewAutoAckConsumeConfig added in v1.2.0

func NewAutoAckConsumeConfig(name string, consumer string) *ConsumeConfig

func NewManualAckConsumeConfig added in v1.2.0

func NewManualAckConsumeConfig(name string, consumer string) *ConsumeConfig

type ExchangeConfig added in v1.4.0

type ExchangeConfig struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

func NewDurableExchangeConfig added in v1.4.0

func NewDurableExchangeConfig(name, typeName string) *ExchangeConfig

func NewTransientExchangeConfig added in v1.4.0

func NewTransientExchangeConfig(name, typeName string) *ExchangeConfig

type MessageHandler added in v1.2.0

type MessageHandler func(<-chan amqp.Delivery)

type QueueBindConfig added in v1.4.0

type QueueBindConfig struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Args         amqp.Table
}

type QueueConfig

type QueueConfig struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

func NewDurableQueueConfig

func NewDurableQueueConfig(name string) *QueueConfig

func NewTransientQueueConfig

func NewTransientQueueConfig(name string) *QueueConfig

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL