consulmq

Version: v1.0.6
Published: Apr 19, 2022 License: MIT Imports: 7 Imported by: 0

README

ConsulMQ

Docs -- https://pkg.go.dev/github.com/peterfraedrich/consulmq

ConsulMQ allows you to use HashiCorp Consul as a messaging queue. The idea is that if you're already using Consul for configuration, monitoring, key-value DB, service mesh, and a host of other functions, it would be nice to also run a simple message queue in Consul and eliminate the need for extra infrastructure (RabbitMQ, Kafka, etc.).

Features

  • Durable, distributed task/message queue
  • ConsulMQ nodes register with Consul, providing real-time visibility into how many nodes are connected
  • Simple, easy-to-use API
  • Based on well-established devops/infrastructure tools

TL;DR

package main

import (
	"fmt"

	"github.com/peterfraedrich/consulmq"
)

func main() {

	mq, err := consulmq.Connect(consulmq.Config{
		Address:    "172.17.0.2:8500",
		Datacenter: "dc1",
		Token:      "",
		MQName:     "cmq",
	})
	if err != nil {
		panic(err)
	}

	for i := 0; i <= 100; i++ {
		// Put an item on the queue
		qo, err := mq.Push([]byte("Hello, is it me you're looking for?"))
		if err != nil {
			panic(err)
		}
		fmt.Println(qo.ID)
	}
	fmt.Println("++++++++++++++++++++++++++++++++++++++++++++++++++++++")
	for x := 0; x <= 100; x++ {
		// Pop an item off the queue
		_, qo, err := mq.Pop()
		if err != nil {
			panic(err)
		}
		fmt.Println(qo.ID)
	}
}

How it works

ConsulMQ uses Consul's key/value store as a messaging queue. Each queue consists of an index and the queued messages. The _index record is a JSON list that holds the order and mapping for all of the messages in the queue. Each queued message has a unique ID and is stored under that ID as its key. When an operation is requested (Push, Pop, etc.), the index is updated with the changes and the message ID is used to locate the appropriate message.
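The index-plus-messages layout can be modelled in a few lines. This is a minimal sketch, not the library's code: a Go map stands in for Consul's key/value store, and the key names ("cmq/_index", "cmq/<id>") and helper functions are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// kv is an in-memory stand-in for Consul's key/value store (assumption).
type kv map[string][]byte

// indexKey is a hypothetical key layout; the real library may differ.
const indexKey = "cmq/_index"

// loadIndex decodes the JSON list of message IDs stored under indexKey.
func loadIndex(store kv) ([]string, error) {
	raw, ok := store[indexKey]
	if !ok {
		return nil, nil // no index written yet: empty queue
	}
	var ids []string
	if err := json.Unmarshal(raw, &ids); err != nil {
		return nil, err
	}
	return ids, nil
}

// saveIndex writes the ordered ID list back as JSON.
func saveIndex(store kv, ids []string) error {
	raw, err := json.Marshal(ids)
	if err != nil {
		return err
	}
	store[indexKey] = raw
	return nil
}

// push stores the body under its ID, then appends the ID to the index.
func push(store kv, id string, body []byte) error {
	ids, err := loadIndex(store)
	if err != nil {
		return err
	}
	store["cmq/"+id] = body
	return saveIndex(store, append(ids, id))
}

// pop takes the first ID off the index and uses it to locate the message.
func pop(store kv) (string, []byte, error) {
	ids, err := loadIndex(store)
	if err != nil {
		return "", nil, err
	}
	if len(ids) == 0 {
		return "", nil, errors.New("queue is empty")
	}
	id := ids[0]
	body := store["cmq/"+id]
	delete(store, "cmq/"+id)
	if err := saveIndex(store, ids[1:]); err != nil {
		return "", nil, err
	}
	return id, body, nil
}

func main() {
	store := kv{}
	for _, m := range []struct{ id, body string }{{"a1", "first"}, {"b2", "second"}} {
		if err := push(store, m.id, []byte(m.body)); err != nil {
			panic(err)
		}
	}
	fmt.Println(string(store[indexKey])) // prints ["a1","b2"]
	id, body, err := pop(store)
	if err != nil {
		panic(err)
	}
	fmt.Println(id, string(body)) // prints a1 first
}
```

Every operation in this sketch reads the whole index, changes it, and writes it back, which is why writes to the same index have to be coordinated.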

Pro/Con

Pros

  • Using a tool that's already in production eliminates the need for spinning up yet-another-tool
  • Consul is a well-known entity
  • Changes are distributed to all Consul nodes, which eliminates a single point of failure

Cons

  • Consul wasn't really designed to do this
  • The use of locks means that only one node can write to an index at a time
  • Will not be as performant as a dedicated message broker or stream platform (AMQP, Kafka, etc.)
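The locking trade-off listed above can be illustrated with a small simulation. This is only a sketch under stated assumptions: sync.Mutex stands in for a distributed Consul lock, goroutines stand in for ConsulMQ nodes, and the type and function names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedIndex models a queue index guarded by a single lock.
// sync.Mutex stands in for a distributed Consul lock (assumption).
type lockedIndex struct {
	mu  sync.Mutex
	ids []string
}

// appendID performs the read-modify-write cycle while holding the lock,
// so concurrent writers are serialized instead of overwriting each other.
func (ix *lockedIndex) appendID(id string) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	ix.ids = append(ix.ids, id)
}

// pushFromNodes simulates `nodes` writers, each pushing `perNode` IDs.
func pushFromNodes(ix *lockedIndex, nodes, perNode int) {
	var wg sync.WaitGroup
	for n := 0; n < nodes; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for i := 0; i < perNode; i++ {
				ix.appendID(fmt.Sprintf("node%d-msg%d", n, i))
			}
		}(n)
	}
	wg.Wait()
}

func main() {
	ix := &lockedIndex{}
	pushFromNodes(ix, 4, 25)
	fmt.Println(len(ix.ids)) // prints 100: no update was lost
}
```

The lock keeps the index consistent, but it also means the four simulated nodes take turns, which is the throughput cost the list above describes.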

Roadmap

  • Enforce TTLs
  • Consul Enterprise Namespace compatibility
  • Logging & Monitoring
  • Additional operations
    • Search
    • PushAtIndex
    • PopFromIndex
    • Drain
    • ClearQueue

License: MIT

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Address and port number of the Consul endpoint to connect to
	// EX: 172.16.0.2:8500
	// Default is "localhost:8500"
	Address string `yaml:"address"`
	// Datacenter is a Consul concept that allows for separating assets.
	// Consul and ConsulMQ's default is "dc1"
	Datacenter string `yaml:"datacenter"`
	// Consul ACL Token
	// Default is empty (no token)
	Token string `yaml:"token"`
	// Unique name of the message queue.
	// Default is "consulmq"
	MQName string `yaml:"mqname"`
	// A TTL for messages on the queue.
	// Default is 10 years (effectively no TTL).
	// TODO: Enforce TTL's
	TTL time.Duration
}

Config is for passing configuration into the Connect function

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

MQ provides methods for manipulating the message queue

func Connect

func Connect(config Config) (*MQ, error)

Connect sets up the connection to the message queue. It generates a unique machine ID that persists across restarts and uses this ID to register as a service with Consul.

func (*MQ) DeleteQueue added in v1.0.5

func (mq *MQ) DeleteQueue() error

DeleteQueue deletes an entire queue, leaving nothing intact. You must call Connect after calling DeleteQueue as the configured queue is no longer available.

func (*MQ) EmptyQueue added in v1.0.5

func (mq *MQ) EmptyQueue() error

EmptyQueue empties the contents of a queue but leaves the queue intact.

func (*MQ) Peek added in v1.0.6

func (mq *MQ) Peek() ([]byte, *QueueObject, error)

func (*MQ) Pop added in v1.0.0

func (mq *MQ) Pop() ([]byte, *QueueObject, error)

Pop removes the next object in the queue. Pop returns the message body as bytes and a QueueObject with the object ID, the object's creation time, the TTL deadline, and the message body as bytes.

func (*MQ) PopLast added in v1.0.0

func (mq *MQ) PopLast() ([]byte, *QueueObject, error)

PopLast removes the last (newest) object in the queue. PopLast returns the message body as bytes and a QueueObject with the object ID, the object's creation time, the TTL deadline, and the message body as bytes.

func (*MQ) Push added in v1.0.0

func (mq *MQ) Push(body []byte) (*QueueObject, error)

Push adds an object to the rear of the queue. Push returns a QueueObject with the object ID, the object's creation time, the TTL deadline, and the body that was passed to the function.

func (*MQ) PushFirst added in v1.0.0

func (mq *MQ) PushFirst(body []byte) (*QueueObject, error)

PushFirst pushes a new element to the front of the queue. PushFirst returns a QueueObject with the object ID, the object's creation time, the TTL deadline, and the body that was passed to the function.
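Taken together, Push, PushFirst, Pop, and PopLast behave like a double-ended queue. The in-memory sketch below shows the ordering only. It is not the library's implementation, the deque type is hypothetical, and it assumes that "the next object" returned by Pop is the one at the front.

```go
package main

import (
	"errors"
	"fmt"
)

var errEmpty = errors.New("queue is empty")

// deque is a hypothetical in-memory model of the queue ordering.
type deque struct{ items []string }

// Push adds to the rear of the queue.
func (d *deque) Push(s string) { d.items = append(d.items, s) }

// PushFirst adds to the front of the queue.
func (d *deque) PushFirst(s string) { d.items = append([]string{s}, d.items...) }

// Pop removes and returns the item at the front (assumed "next" item).
func (d *deque) Pop() (string, error) {
	if len(d.items) == 0 {
		return "", errEmpty
	}
	s := d.items[0]
	d.items = d.items[1:]
	return s, nil
}

// PopLast removes and returns the item at the rear.
func (d *deque) PopLast() (string, error) {
	if len(d.items) == 0 {
		return "", errEmpty
	}
	last := len(d.items) - 1
	s := d.items[last]
	d.items = d.items[:last]
	return s, nil
}

func main() {
	d := &deque{}
	d.Push("a")
	d.Push("b")
	d.PushFirst("urgent") // queue is now: urgent, a, b

	first, _ := d.Pop()
	last, _ := d.PopLast()
	next, _ := d.Pop()
	fmt.Println(first, last, next) // prints urgent b a
}
```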

type QueueObject added in v1.0.0

type QueueObject struct {
	// Unique ID of the object
	ID string
	// Creation time of the object
	CreatedAt time.Time
	// When the object will be deleted
	TTLDeadline time.Time
	// Any tags for the object (TBI)
	// TODO: Implement message tagging
	Tags []string
	// The actual data to be put on the queue
	Body []byte
}

QueueObject is a container around any data in the queue
