rsmq

package module
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2024 License: MIT Imports: 7 Imported by: 0

README

go-rsmq

CI workflow Codecov Go Reference

A lightweight message queue for Go that requires no dedicated queue server. Just a Redis server.

Go implementation of https://github.com/smrchy/rsmq.

$ go get github.com/semihbkgr/go-rsmq

Redis Simple Message Queue

If you run a Redis server and currently use Amazon SQS or a similar message queue you might as well use this fast little replacement. Using a shared Redis server multiple Go processes can send / receive messages.

Example


opts := &redis.Options{Addr: "localhost:6379"}
redisClient := redis.NewClient(opts)

rsmqClient := rsmq.NewRedisSMQ(redisClient, "rsmq")
defer rsmqClient.Quit()

err := rsmqClient.CreateQueue("queue", rsmq.UnsetVt, rsmq.UnsetDelay, rsmq.UnsetMaxsize)
if err != nil {
    fmt.Println(err.Error())
}

id, err := rsmqClient.SendMessage("queue", "message", rsmq.UnsetVt)
if err != nil {
    panic(err)
}
fmt.Printf("message sent, id: %s\n", id)

msg, err := rsmqClient.PopMessage("queue")
if err != nil {
    panic(err)
}
if msg == nil {
    fmt.Println("queue is empty")
} else {
    fmt.Printf("message received, id: %s, message: %s", msg.ID, msg.Message)
}

Producer/Consumer example

Implementation Notes

All details about the queue implementation are in here.

go-rsmq follows all the naming conventions of javascript implementation.

Documentation

Index

Constants

View Source
const (
	UnsetVt      = ^uint(0)
	UnsetDelay   = ^uint(0)
	UnsetMaxsize = -(int(^uint(0)>>1) - 1)
)

Unset values are the special values to refer default values of the attributes

Variables

View Source
var (
	ErrQueueNotFound   = errors.New("queue not found")
	ErrQueueExists     = errors.New("queue exists")
	ErrMessageTooLong  = errors.New("message too long")
	ErrMessageNotFound = errors.New("message not found")
)

Errors returned on rsmq operation

View Source
var (
	ErrInvalidQname   = errors.New("queue name is in wrong pattern")
	ErrInvalidVt      = errors.New("visibility timeout is out of range [0, 9999999]")
	ErrInvalidDelay   = errors.New("delay is out of range [0, 9999999]")
	ErrInvalidMaxsize = errors.New("max size is out of range [1024, 65536] and not -1")
	ErrInvalidID      = errors.New("id is in wrong pattern")
)

Validation errors

Functions

This section is empty.

Types

type QueueAttributes

type QueueAttributes struct {
	Vt         uint
	Delay      uint
	Maxsize    int
	TotalRecv  uint64
	TotalSent  uint64
	Created    uint64
	Modified   uint64
	Msgs       uint64
	HiddenMsgs uint64
}

QueueAttributes contains some attributes and stats of queue

type QueueMessage

type QueueMessage struct {
	ID      string
	Message string
	Rc      uint64
	Fr      time.Time
	Sent    time.Time
}

QueueMessage contains content and metadata of message received from queue

type RedisSMQ

type RedisSMQ struct {
	// contains filtered or unexported fields
}

RedisSMQ is the client of rsmq to execute queue and message operations

func NewRedisSMQ

func NewRedisSMQ(client *redis.Client, ns string) *RedisSMQ

NewRedisSMQ creates and returns new rsmq client

func (*RedisSMQ) ChangeMessageVisibility

func (rsmq *RedisSMQ) ChangeMessageVisibility(qname string, id string, vt uint) error

ChangeMessageVisibility changes message visibility to refer queue vt

err:=redisRsmq.ChangeMessageVisibility(qname,id,rsmq.UnsetVt)

func (*RedisSMQ) CreateQueue

func (rsmq *RedisSMQ) CreateQueue(qname string, vt uint, delay uint, maxsize int) error

CreateQueue creates a new queue with given attributes to create new queue with default attributes:

err:=redisRsmq.CreateQueue(qname,rsmq.UnsetVt,rsmq.UnsetDelay,rsmq.UnsetMaxsize)

func (*RedisSMQ) DeleteMessage

func (rsmq *RedisSMQ) DeleteMessage(qname string, id string) error

DeleteMessage deletes message in queue

func (*RedisSMQ) DeleteQueue

func (rsmq *RedisSMQ) DeleteQueue(qname string) error

DeleteQueue deletes queue

func (*RedisSMQ) GetQueueAttributes

func (rsmq *RedisSMQ) GetQueueAttributes(qname string) (*QueueAttributes, error)

GetQueueAttributes returns queue attributes

func (*RedisSMQ) ListQueues

func (rsmq *RedisSMQ) ListQueues() ([]string, error)

ListQueues returns the slice consist of the existing queues

func (*RedisSMQ) PopMessage

func (rsmq *RedisSMQ) PopMessage(qname string) (*QueueMessage, error)

PopMessage pop message from queue

func (*RedisSMQ) Quit

func (rsmq *RedisSMQ) Quit() error

Quit closes redis client

func (*RedisSMQ) ReceiveMessage

func (rsmq *RedisSMQ) ReceiveMessage(qname string, vt uint) (*QueueMessage, error)

ReceiveMessage receives message from the queue

func (*RedisSMQ) SendMessage

func (rsmq *RedisSMQ) SendMessage(qname string, message string, delay uint) (string, error)

SendMessage sends message to the queue to refer queue delay:

id,err:=redisRsmq.SendMessage(qname,message,rsmq.UnsetDelay)

func (*RedisSMQ) SetQueueAttributes

func (rsmq *RedisSMQ) SetQueueAttributes(qname string, vt uint, delay uint, maxsize int) (*QueueAttributes, error)

SetQueueAttributes sets queue attributes to not change some attributes:

queAttrib,err:=redisRsmq.CreateQueue(qname,rsmq.UnsetVt,rsmq.UnsetDelay,newMaxsize)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL