mq

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

README

Message RabbitMQ Client

This library wrap from github.com/streadway/amqp. Supporting reconnect if connection lost, easy configuration, support SIGINT, SIGTERM, SIGQUIT, SIGSTOP and closes connection

Note: I use fro production.

Config

Valid type config below, it is case-sensitive.

  • PRODUCER
  • CONSUMER
Example

This example for manual confirmation message and prefetch count 1 per consumer.

Producer
// producer.go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/muhfaris/mq"
	"github.com/streadway/amqp"
)

func main() {
	config := mq.ConfigRabbitMQArgument{
		Name:     "publisher_rule_prepare",
		Schema:   "amqp",
		Host:     "localhost",
		Username: "admin",
		Password: "admin",
		Vhost:    "/",
		Port:     5672,
		Type:     mq.ClientProducerType,

		PublisherConfig: mq.PublisherConfig{
			ContentType:  "text/plain",
			DeliveryMode: amqp.Persistent,
		},

		QueueConfig: mq.QueueConfig{
			Name:       "insight_data",
			Durable:    true,
			AutoDelete: false,
		},

		QueueBind: mq.QueueBindConfig{
			Name:       "insight_data",
			RoutingKey: "insight_data",
		},

		ExchangeQueue: mq.ExchangeQueueConfig{
			Name:       "insight_data",
			Type:       "direct",
			Durable:    true,
			AutoDelete: false,
		},
	}

	session, err := mq.NewQueue(config)
	if err != nil {
		log.Println("error new queue:", err)
		return
	}

	var index int
	for {
		index++
		message := []byte(fmt.Sprintf("HELLO, %d", index))
		time.Sleep(time.Second * 3)
		if err := session.Push(message); err != nil {
			fmt.Printf("Push failed: %s\n", err)
		} else {
			fmt.Println("Push succeeded!")
		}
	}
}
Consumer
// config consumer.go
package main

import (
	"log"

	"github.com/muhfaris/mq"
)

func main() {
	config := mq.ConfigRabbitMQArgument{
		Name:     "consumer_prepare",
		Schema:   "amqp",
		Host:     "localhost",
		Username: "admin",
		Password: "admin",
		Vhost:    "/",
		Port:     5672,
		Type:     mq.ClientConsumerType,

		QueueConfig: mq.QueueConfig{
			Name:       "insight_data",
			Durable:    true,
			AutoDelete: false,
		},

		QueueBind: mq.QueueBindConfig{
			Name:       "insight_data",
			RoutingKey: "insight_data",
		},

		ExchangeQueue: mq.ExchangeQueueConfig{
			Name:       "insight_data",
			Type:       "direct",
			Durable:    true,
			AutoDelete: false,
		},

		QosConfig: mq.QosConfig{
			PrefetchCount: 1,
			PrefetchSize:  0,
			Global:        false,
		},
	}

	session, err := mq.NewQueue(config)
	if err != nil {
		log.Println("error new queue:", err)
		return
	}

	stopChan := make(chan bool)
	go func() {
		msgs, err := session.Stream()
		if err != nil {
			log.Println("error stream data", err)
			return
		}

 	for {
			d := <-msgs
			if err := d.Ack(false); err != nil {
				log.Println("error confirm message")
				return
			}
			log.Println("message confirmed!")
		}
	}()
	<-stopChan
}

Documentation

Index

Constants

View Source
const (
	// ClientConsumerType tipe client rabbitnya sebagai consumer
	ClientConsumerType = "CONSUMER"

	// ClientProducerType tipe client rabbitnya sebagai producer
	ClientProducerType = "PRODUCER"
)

Variables

This section is empty.

Functions

func AvailableMessageQueueType

func AvailableMessageQueueType() []string

AvailableMessageQueueType is availabel type

func MessageQueueTypeValid

func MessageQueueTypeValid(ty string) bool

MessageQueueTypeValid validate type

func SchemaValid

func SchemaValid(schema string) error

SchemaValid is validate schema connection

Types

type Closer

type Closer interface {
	RegisterSignalHandler()
	Shutdown() error
}

Closer interface is for handling reconnection logic in a sane way Every reconnection supported struct should implement those methods in order to work properly

type ConfigRabbitMQArgument

type ConfigRabbitMQArgument struct {
	Name          string
	Schema        string
	Host          string
	Port          int
	Username      string
	Password      string
	Vhost         string
	AutoReconnect bool
	Type          string

	PublisherConfig PublisherConfig
	QueueConfig     QueueConfig
	QueueBind       QueueBindConfig
	QueueConsumer   QueueConsumer
	ExchangeQueue   ExchangeQueueConfig
	QosConfig       QosConfig
}

ConfigRabbitMQArgument wrap config rabbitmq

type ExchangeQueueConfig

type ExchangeQueueConfig struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
}

ExchangeQueueConfig is wrap data for exhange queue config

type PublisherConfig

type PublisherConfig struct {
	ContentType  string
	DeliveryMode uint8
}

type QosConfig

type QosConfig struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}

type QueueBindConfig

type QueueBindConfig struct {
	Name       string
	RoutingKey string
}

QueueBindConfig is wrap data for queue bind config

type QueueConfig

type QueueConfig struct {
	Name       string
	Durable    bool
	AutoDelete bool
}

QueueConfig is wrap data for queue config

type QueueConsumer

type QueueConsumer struct {
	AutoACK bool
}

QueueConsumer is wrap data for queue consumer

type RabbitMQ

type RabbitMQ struct {

	// publisher only
	PublisherConfig PublisherConfig

	// general
	QueueConfig   QueueConfig
	QueueBind     QueueBindConfig
	QueueConsumer QueueConsumer
	ExchangeQueue ExchangeQueueConfig
	QosConfig     QosConfig
	FnCallback    func([]byte)
	// contains filtered or unexported fields
}

RabbitMQ is wrap rabbitMQ

func NewQueue

func NewQueue(cfg ConfigRabbitMQArgument) (*RabbitMQ, error)

NewQueue is Connection to rabbit

func (*RabbitMQ) Close

func (session *RabbitMQ) Close() error

Close will cleanly shutdown the channel and connection.

func (*RabbitMQ) Push

func (session *RabbitMQ) Push(data []byte) error

Push will push data onto the queue, and wait for a confirm. If no confirms are received until within the resendTimeout, it continuously re-sends messages until a confirm is received. This will block until the server sends a confirm. Errors are only returned if the push action itself fails, see UnsafePush.

func (*RabbitMQ) RegisterSignalHandler

func (session *RabbitMQ) RegisterSignalHandler()

RegisterSignalHandler watchs for interrupt signals and gracefully closes connection

func (*RabbitMQ) Shutdown

func (session *RabbitMQ) Shutdown() error

Shutdown closes the RabbitMQ connection

func (*RabbitMQ) Stream

func (session *RabbitMQ) Stream() (<-chan amqp.Delivery, error)

Stream will continuously put queue items on the channel. It is required to call delivery.Ack when it has been successfully processed, or delivery.Nack when it fails. Ignoring this will cause data to build up on the server.

func (*RabbitMQ) UnsafePush

func (session *RabbitMQ) UnsafePush(data []byte) error

UnsafePush will push to the queue without checking for confirmation. It returns an error if it fails to connect. No guarantees are provided for whether the server will receive the message.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL