mqi

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2020 License: MIT Imports: 6 Imported by: 0

README

MQI

Message Queue Interface(MQI) using AMQP

PkgGoDev Go Report Card GitHub tag (latest by date) GitHub


Getting Started

Prerequisite

You need an active running instance of RabbitMQ server somewhere you can access

Installing

go get it (pun intended 😸)

go get github.com/JuneKimDev/mqi

Usage

package main

import (
  "log"
  "sync"

  "github.com/JuneKimDev/mqi"
  "github.com/streadway/amqp"
)

// Consumer worker function
func mockConsumerFunc(msg amqp.Delivery) error {
  log.Println("Inside of mockConsumerFunc | Test consumer function")
  log.Println(msg.Body)
  err := mqi.Publish("test.exchangeAnother", "test.topic3", amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte("Great job")})
  if err != nil {
    return err
  }
  return nil
}

// RabbitMQ setup
func getMockChannelWithConsumer() mqi.Channel {
  return mqi.GetChannel().
    WithBroadcast(NewBroadcast("test.broadcast").
      AddQueue(NewBroadcastQueue().
        AddTopic(NewTopic("test.bctp")).
        AddConsumer(NewConsumer("test.bccsm").WithFunc(mockFunc)))).
    WithExchange(NewExchange("test.exchange").
      AddQueue(NewQueue("test.q1").
        AddTopic(NewTopic("test.topic1")).
        AddConsumer(NewConsumer("test.consumer1").WithFunc(mockFunc))).
      AddQueue(NewQueue("test.q2").
        AddTopic(NewTopic("test.topic2")).
        AddConsumer(NewConsumer("test.consumer2").WithFunc(mockFunc))))
}

func main() {
  var forever sync.WaitGroup
  forever.Add(1)
  // Connect to RabbitMQ
  getMockChannelWithConsumer().Start()
  log.Println("The app is running in forever loop")
  forever.Wait() // Prevents service from exiting
}

Documentation

Overview

Package mqi is Message Queue Interface using AMQP

Requires active running instance of RabbitMQ server

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Close

func Close()

Close closes connection and channels

func Publish

func Publish(exchangeName string, topic string, msg amqp.Publishing) error

Publish publishes a message with a topic to Exchange

Types

type Channel

type Channel interface {
	URI() string
	Prefetch() int
	Exchange() Exchange
	Broadcast() Exchange
	Conn() *amqp.Connection
	Sub() *amqp.Channel
	Pub() *amqp.Channel
	UpdateChan() chan<- Channel
	KillChan() chan bool
	ErrChan() chan *amqp.Error
	IsStarted() bool
	IsOptionalQueue() bool
	WithURI(uri string) Channel
	WithPrefetch(prefetch int) Channel
	WithExchange(ex Exchange) Channel
	WithBroadcast(ex Exchange) Channel
	WithConn(conn *amqp.Connection) Channel
	WithSub(sub *amqp.Channel) Channel
	WithPub(pub *amqp.Channel) Channel
	WithUpdateChan(c chan<- Channel) Channel
	WithKillChan(c chan bool) Channel
	WithErrChan(c chan *amqp.Error) Channel
	WithStarted(b bool) Channel
	WithOptionalQueue(b bool) Channel
	Start()
	// contains filtered or unexported methods
}

Channel interface

func GetChannel added in v0.2.0

func GetChannel() Channel

GetChannel returns current Channel

func NewChannel

func NewChannel(st Store) Channel

NewChannel constructor

type Consumer

type Consumer interface {
	Name() string
	TypeString() string
	IsAutoAck() bool
	IsExclusive() bool
	IsNoLocal() bool
	IsNoWait() bool
	Func() func(msg amqp.Delivery) error
	WithName(name string) Consumer
	WithTypeString(str string) Consumer
	WithAutoAck(b bool) Consumer
	WithExclusive(b bool) Consumer
	WithNoLocal(b bool) Consumer
	WithNoWait(b bool) Consumer
	WithFunc(fn func(msg amqp.Delivery) error) Consumer
	// contains filtered or unexported methods
}

Consumer interface

func NewConsumer

func NewConsumer(name string) Consumer

NewConsumer constructs a consumer

func NewTempConsumer added in v0.4.0

func NewTempConsumer(name string) Consumer

NewTempConsumer constructs a temporary consumer

type Exchange

type Exchange interface {
	Name() string
	TypeString() string
	IsDurable() bool
	IsAutoDeleteEnabled() bool
	IsExclusive() bool
	IsInternal() bool
	IsNoWait() bool
	Args() amqp.Table
	CountQueues() int
	QueueAt(i int) Queue
	CountAllConsumers() int
	WithName(name string) Exchange
	WithTypeString(str string) Exchange
	WithDurable(b bool) Exchange
	WithAutoDeleteEnabled(b bool) Exchange
	WithExclusive(b bool) Exchange
	WithInternal(b bool) Exchange
	WithNoWait(b bool) Exchange
	WithArgs(t amqp.Table) Exchange
	AddQueue(q Queue) Exchange
	UpdateQueue(q Queue) Exchange
	// contains filtered or unexported methods
}

Exchange interface

func NewBroadcast added in v0.4.0

func NewBroadcast(name string) Exchange

NewBroadcast constructs an exchange for broadcast

func NewExchange

func NewExchange(name string) Exchange

NewExchange constructs an exchange

type Queue

type Queue interface {
	Name() string
	TypeString() string
	IsDurable() bool
	IsAutoDeleteEnabled() bool
	IsExclusive() bool
	IsNoWait() bool
	Args() amqp.Table
	Ref() *amqp.Queue
	CountTopics() int
	TopicAt(i int) Topic
	CountConsumers() int
	ConsumerAt(i int) Consumer
	WithName(name string) Queue

	WithDurable(b bool) Queue
	WithAutoDeleteEnabled(b bool) Queue
	WithExclusive(b bool) Queue
	WithNoWait(b bool) Queue
	WithArgs(t amqp.Table) Queue
	WithRef(ref *amqp.Queue) Queue
	AddTopic(tp Topic) Queue
	AddConsumer(cs Consumer) Queue
	// contains filtered or unexported methods
}

Queue interface

func AddTempQueue added in v0.2.0

func AddTempQueue(q Queue) Queue

AddTempQueue adds a queue which gets auto-deleted after execution of consumer function

func NewBroadcastQueue added in v0.4.0

func NewBroadcastQueue() Queue

NewBroadcastQueue constructs a broadcast queue

func NewQueue

func NewQueue(name string) Queue

NewQueue constructs a queue

func NewTempQueue added in v0.4.0

func NewTempQueue(name string) Queue

NewTempQueue constructs a temporary queue

type Store

type Store interface {
	ReqChan() chan chan Channel
	UpdateChan() chan Channel
	WithReqChan(ch chan chan Channel) Store
	WithUpdateChan(ch chan Channel) Store
	// contains filtered or unexported methods
}

Store interface

type Topic

type Topic interface {
	Name() string
	WithName(name string) Topic
	// contains filtered or unexported methods
}

Topic interface

func NewTopic

func NewTopic(name string) Topic

NewTopic constructor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL