nmq

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2021 License: MIT Imports: 8 Imported by: 2

README

nmq

A lightweight, distributed and reliable message queue based on Redis

Get Started

Download
go get github.com/inuggets/nmq
Usage
import github.com/inuggets/nmq

// Broker
broker := nmq.NewRedisBroker(&nmq.RedisOptions{
    URL:      "localhost:6379",
    Password: "",
})

// Producer
producer := nmq.NewProducer(broker)

// Produce
err := producer.Produce("topic", "message")
if err != nil {
    log.Fatal(err)
}

// Consumer
consumer := nmq.NewConsumer(broker, "topic")
for {
    message, err := consumer.Consume()
    if err != nil {
        log.Fatal(err)
    }
    
    if m == nil {
	     time.Sleep(3 * time.Second) // If no message, waiting for a while
	     continue
	 } else {
	     // Do something
	     ...
	     
	     // Ack (will remove message from queue permanently)
	     err := consumer.Ack(m)
	     if err != nil {
	         log.Default().Println(err)
	     }
	     
	     // Or back to queue (will be consumed again)
	     err := consumer.Back(m)
	     if err != nil {
	         log.Default().Println(err)
	     }
	 }
}

License

nmq is under the MIT license. See the LICENSE for detail.

Documentation

Index

Constants

View Source
const (
	Prefix            = "nmq:"
	ReadyQueuePostfix = "ready"
	UnackQueuePostfix = "unack"
	HeartbeatPostfix  = "heartbeat"
)

Variables

This section is empty.

Functions

func ID

func ID() string

func NewRedisPool

func NewRedisPool(url, password string) *redis.Pool

func RedisScan

func RedisScan(conn redis.Conn, matched string) []string

Types

type Broker

type Broker interface {
	GetBindConsumerId() string
	Bind(consumerId string)
	Close()
	Stats() (*Stats, error)
	Push(topic, content string) (*Message, error)
	Pop(topic string) (*Message, error)
	Ack(message *Message) error
	Back(message *Message) error
	Ready(topic string, limit, offset int64) (*Result, error)
	Unack(topic string, limit, offset int64) (*Result, error)
	BackFailed(topic string) []*Message
}

func NewRedisBroker

func NewRedisBroker(opt *RedisOptions) Broker

func NewRedisBrokerWithPool

func NewRedisBrokerWithPool(pool *redis.Pool) Broker

type Consumer

type Consumer struct {
	Broker Broker
	Topic  string
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(broker Broker, topic string) *Consumer

func (*Consumer) Ack

func (p *Consumer) Ack(m *Message) error

func (*Consumer) Back

func (p *Consumer) Back(m *Message) error

func (*Consumer) Consume

func (p *Consumer) Consume() (*Message, error)

type Message

type Message struct {
	ID      string `json:"id"`
	Content string `json:"content"`
	Topic   string `json:"-"`
}

type Producer

type Producer struct {
	Broker Broker
}

func NewProducer

func NewProducer(broker Broker) *Producer

func (*Producer) Produce

func (p *Producer) Produce(topic, message string) error

type RedisBroker

type RedisBroker struct {
	// contains filtered or unexported fields
}

func (*RedisBroker) Ack

func (b *RedisBroker) Ack(message *Message) error

func (*RedisBroker) Back

func (b *RedisBroker) Back(message *Message) error

func (*RedisBroker) BackFailed

func (b *RedisBroker) BackFailed(topic string) []*Message

func (*RedisBroker) Bind

func (b *RedisBroker) Bind(consumerId string)

func (*RedisBroker) Close

func (b *RedisBroker) Close()

func (*RedisBroker) GetBindConsumerId

func (b *RedisBroker) GetBindConsumerId() string

func (*RedisBroker) Pop

func (b *RedisBroker) Pop(topic string) (*Message, error)

func (*RedisBroker) Push

func (b *RedisBroker) Push(topic, content string) (*Message, error)

func (*RedisBroker) Ready

func (b *RedisBroker) Ready(topic string, limit, offset int64) (*Result, error)

func (*RedisBroker) Stats

func (b *RedisBroker) Stats() (*Stats, error)

func (*RedisBroker) Unack

func (b *RedisBroker) Unack(topic string, limit, offset int64) (*Result, error)

type RedisOptions

type RedisOptions struct {
	URL      string
	Password string
}

type Result

type Result struct {
	Total    int64      `json:"total"`
	Messages []*Message `json:"message"`
}

type Stats

type Stats struct {
	Ready      map[string]int64            `json:"ready"`
	Unack      map[string]map[string]int64 `json:"unack"`
	BackFailed map[string]int64            `json:"backfailed"`
	Consumers  []string                    `json:"consumers"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL