rabbitmq

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2022 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Event-driven architecture is a software architecture and model for application design. With an event-driven system, the capture, communication, processing, and persistence of events are the core structure of the solution. This differs from a traditional request-driven model.

Features: - broadcast event via rabbitMQ - auto reconnect pattern for rabbitMQ - fault tolerance on panic

Publisher Example:

package main

import (
	"github.com/ramoozorg/event-driven/rabbitmq"
	"os"
)

type person struct {
	Name string `bson:"name" json:"name"`
	Age  int    `bson:"age" json:"age"`
}

func main() {
	chSignal := make(chan os.Signal)
	conn, err := rabbitmq.NewConnection("test", &rabbitmq.Options{
		UriAddress:      "amqp://guest:guest@localhost:5672",
		DurableExchange: true,
		AutoAck:         true,
		ExclusiveQueue:  false,
	}, chSignal)
	if err != nil {
		panic(err)
	}

	if err := conn.ExchangeDeclare("exchange1", rabbitmq.TOPIC); err != nil {
		panic(err)
	}
	if err := conn.DeclarePublisherQueue("queue1", "exchange1", "rk", "rk2"); err != nil {
		panic(err)
	}
	if err := conn.DeclarePublisherQueue("queue2", "exchange1", "rk3"); err != nil {
		panic(err)
	}
	if err := NewEventPublish(conn); err != nil {
		panic(err)
	}
}

func NewEventPublish(conn *rabbitmq.Connection) error {
	p := person{Name: "rs", Age: 22}
	q := person{Name: "reza", Age: 23}
	if err := conn.Publish("exchange1", "rk", p, rabbitmq.PublishingOptions{}); err != nil {
		return err
	}
	if err := conn.Publish("exchange1", "rk3", q, rabbitmq.PublishingOptions{}); err != nil {
		return err
	}
	return nil
}

Consumer Example:

package main

import (
	"fmt"

	"github.com/ramoozorg/event-driven/rabbitmq"
	"go.mongodb.org/mongo-driver/bson"
)

type person struct {
	Name string `bson:"name" json:"name"`
	Age  int    `bson:"age" json:"age"`
}

func main() {
	done := make(chan bool, 1)
	conn, err := rabbitmq.NewConnection("test", &rabbitmq.Options{
		UriAddress:      rabbitmq.CreateURIAddress("guest", "guest", "localhost:5672", ""),
		DurableExchange: true,
		AutoAck:         true,
		ExclusiveQueue:  false,
	}, nil)
	if err != nil {
		panic(err)
	}

	if err := conn.ExchangeDeclare("exchange1", rabbitmq.TOPIC); err != nil {
		panic(err)
	}
	if err := conn.DeclareConsumerQueue(eventHandler, "queue1", "exchange1", "rk", "rk2"); err != nil {
		panic(err)
	}
	if err := conn.DeclareConsumerQueue(eventHandler, "queue2", "exchange1", "rk3"); err != nil {
		panic(err)
	}

	if err := conn.Consume(); err != nil {
		panic(err)
	}
	<-done
}

func eventHandler(queue string, delivery rabbitmq.Delivery) {
	p := person{}
	_ = bson.Unmarshal(delivery.Body, &p)
	fmt.Printf("New Event from exchange %v queue %v routingKey %v with body %v received\n", delivery.Exchange, queue, delivery.RoutingKey, p)
}

```

Index

Constants

View Source
const (
	JSON_ENCODER = "json"
	BSON_ENCODER = "bson"
)

Variables

View Source
var (
	SERVICE_NAME_ERROR            = errors.New("service name is empty")
	URI_ADDRESS_ERROR             = errors.New("uri address is invalid, please enter amqp://guest:guest@localhost:5672 for example")
	ROUTING_KEYS_EMPTY_ERROR      = errors.New("routing keys is empty")
	CONNECTION_CLOSED_ERROR       = errors.New("rabbitMQ connection closed, try to reconnect")
	NIL_CCONECTION_ERROR          = errors.New("nil rabbitmq connection")
	EXCHANGE_ALREADY_EXISTS_ERROR = errors.New("exchange already declared")
	QUEUE_ALREADY_EXISTS_ERROR    = errors.New("queue already declared")
	EXHCNAGE_NOT_FOUND_ERROR      = errors.New("exchange not declare")
)

Functions

func CreateURIAddress

func CreateURIAddress(username, password, address, vhost string) string

CreateURIAddress create url address from input configuration

func RegisterEncoder

func RegisterEncoder(encType string, enc Encoder)

RegisterEncoder will register the encType with the given Encoder. Useful for customization.

Types

type Connection

type Connection struct {
	ServiceCallerName string
	ConnOpt           *Options
	// contains filtered or unexported fields
}

Connection is the structure of amqp event connection

func NewConnection

func NewConnection(serviceName string, options *Options, done chan os.Signal) (*Connection, error)

NewConnection create a rabbitmq connection object

func (*Connection) Close

func (c *Connection) Close() error

Close stop rabbitMQ client

func (*Connection) Consume

func (c *Connection) Consume() error

Consume consumes the events from the queues and passes it as map of chan amqp.Delivery

func (*Connection) DeclareConsumerQueue

func (c *Connection) DeclareConsumerQueue(eventHandler EventHandler, queue, exchange string, routingKey ...string) error

DeclareConsumerQueue declare new queue and bind queue and bind exchange with routing key

func (*Connection) DeclarePublisherQueue

func (c *Connection) DeclarePublisherQueue(queue, exchange string, routingKey ...string) error

DeclarePublisherQueue declare new queue and bind queue and bind exchange with routing key

func (*Connection) ExchangeDeclare

func (c *Connection) ExchangeDeclare(exchange string, kind Kind) error

ExchangeDeclare declare new exchange with specific kind (direct, topic, fanout, headers)

func (*Connection) GetExchangeList

func (c *Connection) GetExchangeList() []string

GetExchangeList return list of exchanges

func (*Connection) GetQueueList

func (c *Connection) GetQueueList() map[string]EventHandler

GetQueueList return list of queues with handlers

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

IsConnected check rabbitMQ client is connected

func (*Connection) Publish

func (c *Connection) Publish(exchange, routingKey string, body interface{}, publishOptions PublishingOptions) error

Publish publishes a request to the amqp queue

type Delivery

type Delivery amqp.Delivery // Delivery is a channel for deliver published event

func (Delivery) Ack added in v0.0.5

func (d Delivery) Ack(multiple bool) error

Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery.

func (Delivery) Nack added in v0.0.5

func (d Delivery) Nack(multiple bool, requeue bool) error

Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.

func (Delivery) Reject added in v0.0.5

func (d Delivery) Reject(requeue bool) error

Reject delegates a negatively acknowledgement through the Acknowledger interface.

type EncodedConn

type EncodedConn struct {
	Conn *Connection
	Enc  Encoder
}

func NewEncodedConn

func NewEncodedConn(c *Connection, encType string) (*EncodedConn, error)

NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder

type Encoder

type Encoder interface {
	Encode(msg interface{}) ([]byte, error)
	Decode(data []byte, vPtr interface{}) error
}

Encoder interface is for all register encoders

func EncoderForType

func EncoderForType(encType string) Encoder

EncoderForType will return the registered Encoder for the encType.

type EventHandler

type EventHandler func(queue string, delivery Delivery) // EventHandler handle event from specific queue and routingKey

type Headers

type Headers amqp.Table // Headers table for set event header when publishing

type Kind

type Kind int // Kind is exchange type
const (
	DIRECT  Kind = iota // DIRECT a event goes to the queues whose binding key exactly matches the routing key of the event.
	FANOUT              // FANOUT exchanges can be useful when the same event needs to be sent to one or more queues with consumers who may process the same event in different ways.
	TOPIC               // TOPIC exchange is similar to direct exchange, but the routing is done according to the routing pattern. Instead of using fixed routing key, it uses wildcards.
	HEADERS             // HEADERS exchange routes events based on arguments containing headers and optional values. It uses the event header attributes for routing.
)

func (Kind) String

func (k Kind) String() string

String exchange type as string

type Options

type Options struct {
	UriAddress      string // UriAddress of rabbitmq, amqp://user:password@x.x.x.x:port
	DurableExchange bool
	AutoAck         bool
	AutoDelete      bool
	NoWait          bool
	ExclusiveQueue  bool
}

Options for new connection of rabbitmq

type PublishingOptions

type PublishingOptions struct {
	Headers Headers // rabbitMQ event headers
	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to to reply to (ex: RPC)
	Expiration      string    // event expiration spec
	MessageId       string    // event identifier
	Timestamp       time.Time // event timestamp
	Type            string    // event type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id
}

PublishingOptions options for event

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL