rabbitmqv1

package
v0.0.0-...-2c15619 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: MIT Imports: 14 Imported by: 0

README

rabbitmq Wrapper

RabbitMq Wrapper is the a client API for RabbitMQ.

  • A wrapper over amqp exchanges and queues.
  • In memory retries for consuming messages when an error occured
  • CorrelationId and MessageId structure
  • Exchange Types With Direct, Fanout, Topic, ConsistentHashing
  • Retry policy (immediately , interval)
  • Multiple consumers In a single process
  • Create goroutines and consume messages asynchronously
  • Disable consume messages asynchronously if you want
  • Retry to connect another node When RabbitMq Node is Down or Broken Connection
  • Add stack trace on the message header if the error occurred when the message is consumed
  • Some extra features while publishing message (will be added)

To connect to a RabbitMQ broker...

	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost")

To connect to a RabbitMQ broker with retry policy

  • Consumer retries two times immediately if an error occured

     var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                                               rabbit.RetryCount(2,time.Duration(0)))
    
  • Create goroutines and consume messages asynchronously using PrefetchCount Prefix. Create as number of PrefetchCount as goroutines .

    	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                    		rabbit.PrefetchCount(3))
    

To send a message

    // Added

To consume a message

    onConsumed := func(message rabbit.Message) error {
    
    		var consumeMessage PersonV1
    		var err= json.Unmarshal(message.Payload, &consumeMessage)
    		if err != nil {
    			return err
    		}
    		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
    		return nil
    	}
    

    rabbitClient.AddConsumer("In.Person").
    SubscriberExchange("RoutinKey.*",rabbit.Direct ,"Person").
    HandleConsumer(onConsumed)

To Consume multiple messages

	onConsumed := func(message rabbit.Message) error {

		var consumeMessage PersonV1
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}

	onConsumed2 := func(message rabbit.Message) error {

		var consumeMessage PersonV4
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}
	rabbitClient.AddConsumer("In.Person3").
            SubscriberExchange("",rabbit.Fanout ,"ExchangeNamePerson").
            HandleConsumer(onConsumed)
            
    rabbitClient.AddConsumer("In.Person").
             SubscriberExchange("Person.*",rabbit.Direct ,"PersonV1").
             HandleConsumer(onConsumed2)
             

	rabbitClient.RunConsumers()

To Consume multiple exchange

    rabbitClient.AddConsumer("In.Lines").
    		SubscriberExchange("1", rabbit.ConsistentHashing,"OrderLineAdded").
    		SubscriberExchange("1", rabbit.ConsistentHashing,OrderLineCancelled).
    		WithSingleGoroutine(true).
    		HandleConsumer(onConsumed2)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ERRORPREFIX     = "_error"
	CONCURRENTLIMIT = 1
	RETRYCOUNT      = 0
	PREFECTCOUNT    = 1
)

Functions

func Do

func Do(fn Func) error

func PrefetchCount

func PrefetchCount(prefetchCount int) withFunc

func RetryCount

func RetryCount(retryCount int) withFunc

func RetryCountWithWait

func RetryCountWithWait(retryCount int, retryInterval time.Duration) withFunc

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

func (*Channel) Publish

func (p *Channel) Publish(ctx context.Context, routingKey string, exchangeName string, payload interface{}) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewRabbitMqClient

func NewRabbitMqClient(nodes []string, userName string, password string, virtualHost string, logger log.Logger, withFunc ...withFunc) *Client

func (*Client) AddConsumer

func (c *Client) AddConsumer(queueName string) *Consumer

func (*Client) AddConsumerWithConsistentHashExchange

func (c *Client) AddConsumerWithConsistentHashExchange(queueName string, routingKey string, exchangeName string) *Consumer

func (*Client) AddPublisher

func (c *Client) AddPublisher(ctx context.Context, exchangeName string, exchangeType ExchangeType, payloads ...interface{})

func (*Client) Exit

func (c *Client) Exit(reason error) int

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, routingKey string, payload interface{}) error

func (*Client) RunConsumers

func (c *Client) RunConsumers(ctx context.Context) error

func (*Client) Shutdown

func (c *Client) Shutdown(reason string)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) HandleConsumer

func (c *Consumer) HandleConsumer(consumer handleConsumer) *Consumer

func (*Consumer) SubscriberExchange

func (c *Consumer) SubscriberExchange(routingKey string, exchangeType ExchangeType, exchangeName string) *Consumer

func (*Consumer) SubscriberExchangeWithArguments

func (c *Consumer) SubscriberExchangeWithArguments(routingKey string, exchangeType ExchangeType, exchangeName string, args amqp.Table) *Consumer

func (*Consumer) WithSingleGoroutine

func (c *Consumer) WithSingleGoroutine(value bool) *Consumer

type ConsumerBuilder

type ConsumerBuilder struct {
	// contains filtered or unexported fields
}

type ExchangeType

type ExchangeType int
const (
	Direct            ExchangeType = 1
	Fanout            ExchangeType = 2
	Topic             ExchangeType = 3
	ConsistentHashing ExchangeType = 4
	XDelayedMessage   ExchangeType = 5
)

type Func

type Func func(attempt int) (retry bool, err error)

type Message

type Message struct {
	Payload       []byte
	CorrelationId string
	MessageId     string
	Timestamp     time.Time
}

type MessageBroker

type MessageBroker interface {
	CreateChannel() (*Channel, error)
	CreateConnection(ctx context.Context, parameters MessageBrokerParameter) error
	SignalConnectionStatus(status bool)
	SignalConnection() chan bool
	IsConnected() bool
}

func NewMessageBroker

func NewMessageBroker(logger log.Logger) MessageBroker

type MessageBrokerParameter

type MessageBrokerParameter struct {
	Nodes           []string
	PrefetchCount   int
	RetryCount      int
	RetryInterval   time.Duration
	ConcurrentLimit int
	UserName        string
	Password        string

	VirtualHost string
	// contains filtered or unexported fields
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

type PublisherBuilder

type PublisherBuilder struct {
	// contains filtered or unexported fields
}

func (*PublisherBuilder) CreateChannel

func (p *PublisherBuilder) CreateChannel()

func (*PublisherBuilder) Publish

func (p *PublisherBuilder) Publish(ctx context.Context, routingKey string, exchangeName string, payload interface{}) error

func (*PublisherBuilder) SubscriberExchange

func (p *PublisherBuilder) SubscriberExchange() *PublisherBuilder

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL