rmqclient

package module
v0.0.0-...-ebf989c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2020 License: MIT Imports: 7 Imported by: 0

README

go-rmq-client

Simple client for rabbitmq

installation

go get github.com/zaharinea/go-rmq-client

example consumer usage

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
	rmqclient "github.com/zaharinea/go-rmq-client"
)

func handler(ctx context.Context, msg amqp.Delivery) bool {
	fmt.Printf("event: msg=%s\n", string(msg.Body))
	return true
}

func main() {
	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)
	consumer := rmqclient.NewConsumer("amqp://guest:guest@rmq:5672/", logger)

	queue1 := rmqclient.NewQueue("queue1", "queue1", amqp.Table{}).SetHandler(handler).SetRequeue(false).SetCountWorkers(4)
	companyExchange := rmqclient.NewExchange("exchange1", "fanout", amqp.Table{}, []*rmqclient.Queue{queue1})
	consumer.RegisterExchange(companyExchange)

	consumer.Start()

	quit := make(chan os.Signal)
	signal.Notify(quit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	consumer.Stop()
}

example producer usage

package main

import (
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/sirupsen/logrus"
	rmqclient "github.com/zaharinea/go-rmq-client"
)

func main() {
	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)
	producer := rmqclient.NewProducer("amqp://guest:guest@rmq:5672/", logger)

	producer.Start()

	count := 0
	go func() {
		for {
			message := strconv.Itoa(count)
			fmt.Printf("Send message: %s\n", message)
			producer.Publish("exchange1", "queue1", []byte(message), 0)
			time.Sleep(time.Second * 1)
			count++
		}
	}()

	quit := make(chan os.Signal)
	signal.Notify(quit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	defer producer.Stop()
}

Documentation

Index

Constants

View Source
const QueueNameKey contextKey = 0

QueueNameKey key in context

Variables

This section is empty.

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection struct

func (*Connection) Close

func (c *Connection) Close() error

Close close connection

type Consumer

type Consumer struct {
	Connection
	// contains filtered or unexported fields
}

Consumer struct

func NewConsumer

func NewConsumer(uri string, logger Logger) *Consumer

NewConsumer returns a new Consumer struct

func (*Consumer) RegisterExchange

func (c *Consumer) RegisterExchange(exchange *Exchange)

RegisterExchange register exchange

func (*Consumer) RegisterMiddleware

func (c *Consumer) RegisterMiddleware(m ...MiddlewareFunc)

RegisterMiddleware register middleware

func (*Consumer) RegisterQueue

func (c *Consumer) RegisterQueue(queues ...*Queue)

RegisterQueue register queue

func (*Consumer) Start

func (c *Consumer) Start()

Start start Consumer

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop stop Consumer

type Exchange

type Exchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Arguments  amqp.Table
	Queues     []*Queue
}

Exchange struct

func NewExchange

func NewExchange(name string, exchangeType string, arguments amqp.Table, queues []*Queue) *Exchange

NewExchange returns a new Exchange struct

type HandlerFunc

type HandlerFunc func(context.Context, amqp.Delivery) bool

HandlerFunc defines the handler

type Logger

type Logger interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(args ...interface{})
	Infof(format string, args ...interface{})
	Info(args ...interface{})
	Warnf(format string, args ...interface{})
	Debugf(format string, args ...interface{})
	Debug(args ...interface{})
}

Logger represent common interface for logging function

type MiddlewareFunc

type MiddlewareFunc func(handler HandlerFunc) HandlerFunc

MiddlewareFunc defines the handler

type Producer

type Producer struct {
	Connection
	// contains filtered or unexported fields
}

Producer struct

func NewProducer

func NewProducer(uri string, logger Logger) *Producer

NewProducer returns a new Producer struct

func (*Producer) Publish

func (p *Producer) Publish(exchangeName string, routingKey string, data []byte, priority uint8) error

Publish send message

func (*Producer) RegisterExchange

func (p *Producer) RegisterExchange(exchange *Exchange)

RegisterExchange register exchange

func (*Producer) Start

func (p *Producer) Start()

Start start Producer

func (*Producer) Stop

func (p *Producer) Stop() error

Stop stop Producer

type Queue

type Queue struct {
	Name       string
	RoutingKey string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Arguments  amqp.Table
	// contains filtered or unexported fields
}

Queue struct

func NewQueue

func NewQueue(name string, routingKey string, arguments amqp.Table) *Queue

NewQueue returns a new Queue struct

func (*Queue) SetCountWorkers

func (q *Queue) SetCountWorkers(value int) *Queue

SetCountWorkers set count of workers

func (*Queue) SetHandler

func (q *Queue) SetHandler(handler HandlerFunc) *Queue

SetHandler register handler in Queue

func (*Queue) SetPrefetchCount

func (q *Queue) SetPrefetchCount(value int) *Queue

SetPrefetchCount set prefetch count

func (*Queue) SetRequeue

func (q *Queue) SetRequeue(value bool) *Queue

SetRequeue set requeue param

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL