jazz

package module
v0.0.0-...-de41804 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2019 License: MIT Imports: 3 Imported by: 4

README

Jazz

Abstraction layer for quick and simple rabbitMQ connection, messaging and administration. Inspired by Jazz Jackrabbit and his eternal hatred towards slow turtles.

Jazz Jackrabbit

Usage

This library contains three major parts - exchange/queue scheme creation, publishing of messages and consuming of messages. The greatest benefit of this partitioning is that each part might be in separate application. Also due to dedicated administration part, publishing and consuming of messages is simplified to great extent.

Step 1: Connect to rabbit
import(
	"github.com/socifi/jazz"
)

var dsn = "amqp://guest:guest@localhost:5672/"

func main() {
	// ...

	c, err := jazz.Connect(dsn)
	if err != nil {
		t.Errorf("Could not connect to RabbitMQ: %v", err.Error())
		return
	}

	//...
}
Step 2: Create scheme

Scheme specification is done via structure Settings which can be easily specified in YAML. So generally you need to decode YAML and then create all queues and exchanges

It can be something really crazy like this!

var data = []byte(`
exchanges:
  exchange0:
    durable: true
    type: topic
  exchange1:
    durable: true
    type: topic
    bindings:
      - exchange: "exchange0"
        key: "key1"
      - exchange: "exchange0"
        key: "key2"
  exchange2:
    durable: true
    type: topic
    bindings:
      - exchange: "exchange0"
        key: "key3"
      - exchange: "exchange1"
        key: "key2"
  exchange3:
    durable: true
    type: topic
    bindings:
      - exchange: "exchange0"
        key: "key4"
queues:
  queue0:
    durable: true
    bindings:
      - exchange: "exchange0"
        key: "key4"
  queue1:
    durable: true
    bindings:
      - exchange: "exchange1"
        key: "key2"
  queue2:
    durable: true
    bindings:
      - exchange: "exchange1"
        key: "#"
  queue3:
    durable: true
    bindings:
      - exchange: "exchange2"
        key: "#"
  queue4:
    durable: true
    bindings:
      - exchange: "exchange3"
        key: "#"
  queue5:
    durable: true
    bindings:
      - exchange: "exchange0"
        key: "#"
`)

func main() {
	// ...

	reader := bytes.NewReader(data)
	scheme, err := DecodeYaml(reader)
	if err != nil {
		t.Errorf("Could not read YAML: %v", err.Error())
		return
	}

	err = c.CreateScheme(scheme)
	if err != nil {
		t.Errorf("Could not create scheme: %v", err.Error())
		return
	}

	//...

	// Be nice and delete scheme (Not advisable in ).
	err = c.DeleteScheme(scheme)
	if err != nil {
		t.Errorf("Could not delete scheme: %v", err.Error())
		return
	}
}
Step 3: Publish and/or consume messages

You can process each queue in separate application or everything together like this:

func main() {
	// ...

	f := func(msg []byte) {
		fmt.Println(string(msg))
	}

	go c.ProcessQueue("queue1", f)
	go c.ProcessQueue("queue2", f)
	go c.ProcessQueue("queue3", f)
	go c.ProcessQueue("queue4", f)
	go c.ProcessQueue("queue5", f)
	go c.ProcessQueue("queue6", f)
	c.SendMessage("exchange0", "key1", "Hello World!")
	c.SendMessage("exchange0", "key2", "Hello!")
	c.SendMessage("exchange0", "key3", "World!")
	c.SendMessage("exchange0", "key4", "Hi!")
	c.SendMessage("exchange0", "key5", "Again!")

	//...
}

Notes

No copyright infringement intended. The name Jazz Jackrabbit and artwork of Jazz Jackrabbit is intelectual property of Epic MegaGames and was taken over from wikipedia

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Binding

type Binding struct {
	Exchange string `yaml:"exchange"`
	Key      string `yaml:"key"`
	Nowait   bool   `yaml:"nowait"`
}

Binding specifies to which exchange should be an exchange or a queue binded

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a struct which holds all necessary data for RabbitMQ connection

func Connect

func Connect(dsn string) (*Connection, error)

Connect connects to RabbitMQ by dsn and return Connection object which uses openned connection during function calls issued later in code

func (*Connection) Close

func (c *Connection) Close() error

Close closes connection to RabbitMQ

func (*Connection) CreateScheme

func (c *Connection) CreateScheme(s Settings) error

CreateScheme creates all exchanges, queues and bindinges between them as specified in yaml string

func (*Connection) DeleteScheme

func (c *Connection) DeleteScheme(s Settings) error

DeleteScheme deletes all queues and exchanges (together with bindings) as specified in yaml string

func (*Connection) ProcessQueue

func (c *Connection) ProcessQueue(name string, f func([]byte)) error

ProcessQueue calls handler function on each message delivered to a queue

func (*Connection) SendBlob

func (c *Connection) SendBlob(ex, key string, msg []byte) error

SendBlob publishes byte blob message to an exchange with specific routing key

func (*Connection) SendMessage

func (c *Connection) SendMessage(ex, key, msg string) error

SendMessage publishes plain text message to an exchange with specific routing key

type Exchange

type Exchange struct {
	Durable    bool      `yaml:"durable"`
	Autodelete bool      `yaml:"autodelete"`
	Internal   bool      `yaml:"internal"`
	Nowait     bool      `yaml:"nowait"`
	Type       string    `yaml:"type"`
	Bindings   []Binding `yaml:"bindings"`
}

Exchange is structure with specification of properties of RabbitMQ exchange

type QueueSpec

type QueueSpec struct {
	Durable    bool      `yaml:"durable"`
	Autodelete bool      `yaml:"autodelete"`
	Nowait     bool      `yaml:"nowait"`
	Exclusive  bool      `yaml:"exclusive"`
	Bindings   []Binding `yaml:"bindings"`
}

QueueSpec is a specification of properties of RabbitMQ queue

type Settings

type Settings struct {
	Exchanges map[string]Exchange  `yaml:"exchanges"`
	Queues    map[string]QueueSpec `yaml:"queues"`
}

Settings is a specification of all queues and exchanges together with all bindings.

func DecodeYaml

func DecodeYaml(r io.Reader) (Settings, error)

DecodeYaml reads yaml with specification of all exchanges and queues from io.Reader

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL