broker

package module
v0.0.0-...-8910eb2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2023 License: MIT Imports: 9 Imported by: 1

README

Broker Client

Broker wraps Go AMQP 0.9.1 client (https://github.com/streadway/amqp) to expose an API in order to implement Event Driven Design pattern for Microservices Architecture.

Color

Install

go get github.com/mig-elgt/broker

Publisher Example

package main

import (
	"bytes"
	"log"
	"time"

	"github.com/mig-elgt/broker"
)

func main() {
	// Create Event Queue object
	eq, err := broker.NewEventQueue("localhost", "5672", "guest", "guest")
	if err != nil {
		panic(err)
	}
	defer eq.Close()
	// Open a new Channel
	ch, err := eq.OpenChannel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()
	// Publish Events to RabbitMQ
	// 
	// Publish Event Account Created Event
	if err := eq.PublishEvent("account.created", "users.account.created", bytes.NewBufferString("hi folks"), ch); err != nil { 
		log.Fatal(err)
	}
	// Publish Event Account Deleted Event
	if err := eq.PublishEvent("account.deleted", "users.account.deleted", bytes.NewBufferString("good bye"), ch); err != nil { 
		log.Fatal(err)
	}
}

Subscribers Example

Register your event handlers and create your function handler to perform a queue message.

package main

import (
	"encoding/json"
	"log"

	"github.com/mig-elgt/broker"
)

func main() {
	// Create Event Queue instance
	eq, err := broker.NewEventQueue("localhost", "5672", "guest", "guest")
	if err != nil {
		panic(err)
	}
	defer eq.Close()

	// Register Handle Events
	// Add Account Created handle event
	eq.HandleEvent("account.created", func(req *broker.Request) (*broker.Response, error) {
	    // Add your code here
		return &broker.Response{}, nil
	}, broker.WithQueue("accounts_created"), broker.WithRouteKey("users.account.created"))
	// Add Account Deleted handle event
	eq.HandleEvent("account.deleted", func(req *broker.Request) (*broker.Response, error) {
	    // Add your code here
		return &broker.Response{}, nil
	}, broker.WithQueue("accounts_deleted"), broker.WithRouteKey("users.account.deleted"))
	// Exec runners for each event
	<-eq.RunSubscribers()

	log.Fatalf("rabbitmq server connection is closed %v", err)
}

Run a Subcriber Example

package main

import (
	"encoding/json"
	"log"

	"github.com/mig-elgt/broker"
)

func main() {
	// Create Event Queue instance
	eq, err := broker.NewEventQueue("localhost", "5672", "guest", "guest")
	if err != nil {
		panic(err)
	}
	defer eq.Close()

	// Register a Handle Event and run a Subscriber
	eq.RunSubscriber("account.created", func(req *broker.Request) (*broker.Response, error) {
	    // Add your code here
		return &broker.Response{}, nil
	}, broker.WithQueue("accounts_created"), broker.WithRouteKey("users.account.created"))

	log.Fatalf("rabbitmq server connection is closed %v", err)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewEventQueue

func NewEventQueue(host, port, user, password string) (*eventQueue, error)

NewEventQueue creates new instance of eventQueue and open new rabbitmq connection.

Types

type DialOption

type DialOption func(*dialOptions)

DialOption defines a function to set up the event queue system resources, such as queue name or route keys.

func WithQueue

func WithQueue(name string) DialOption

WithQueue sets new queue name.

func WithRouteKey

func WithRouteKey(name string) DialOption

WithRouteKey sets new route key name.

type EventQueueChannel

type EventQueueChannel interface {
	ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) error
	Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error
	Close() error
}

type MessageSystem

type MessageSystem interface {
	OpenChannel() (EventQueueChannel, error)
	PublishEvent(event, routeKey string, msg io.Reader, ch EventQueueChannel) error
	Close() error
}

type Request

type Request struct {
	Body io.Reader
}

Reques holds the queue message.

type Response

type Response struct{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL