mq

package module
v0.0.0-...-54e1ef1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2023 License: MIT Imports: 12 Imported by: 0

README

MQ

One Message Queue system based on Postgres table.

For projects with smaller traffic volume, it's usually not cost wise to use kafka, rocketmq(https://rocketmq.apache.org/), SQS or similar mq system. But projects still need Eventual consistency to enable distributed system, and decouple services.

Here's the solution to make use of postgres table.

It makes use of postgres transaction save point to have two save points within message consuming process. Due to the nature of RDS transaction, this system is easier to be implemented with postgres tx. pgx is used inside this repository.

It's up to project to use appropriate postgres library.

Design

This system is based on below table queues:

    CREATE TABLE IF NOT EXISTS queues (
        id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        consumer_name text NOT NULL,
        message jsonb NOT NULL,
        retry int DEFAULT 0 NOT NULL,
        is_dead boolean DEFAULT false NOT NULL,
        failed_reason text,
        check_at timestamp NOT NULL,

        created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
    );
	comment on column queues.consumer_name is 'which consumer should be used to consume this message';
    comment on column queues.message is 'one json string containing message detail, like {order_id: 12}';
    comment on column queues.retry is 'the number of times this message has been consumed';
    comment on column queues.is_dead is 'when this message is dead, it means this message has reached max retry times, yet still failed to be consumed';
    comment on column queues.failed_reason is 'log the failed message when consumer fails to consume this message';
    comment on column queues.check_at is 'when cron system should check this message and consume it';

Table queues will hold messages to be consumed. There will be a cron running periodically to scan this table and consume messages with related consumer.

This queue supports retry, delay, dead queue. If required, a queue dashboard can be created to demonstrate current list queue messages for system administrator to review queue health.

Scenario example

In commerce app, user puts one order but not paid yet. There are two things to be done after order is created:

  • Order service. Cancel this order if order is not paid for 5 minutes or other time duration.
  • Notify service. Send one email notification out for this order.

When this event (one new order is created) happens, we will insert two messages into table queues. Each message will have its own consumer.

More details can be found at dir example. This example needs docker env to enable postgres db.

This example uses echo to create one demo rest API server.

Steps to see real effect

  • cd example
  • make db-start
  • make app-start
  • run curl localhost:1325/api/order/product -X POST -d '{product: "closing"}'

If all goes well, golang echo server console will output something like this

2023/12/19 15:41:34 notification consumer is processed successfully!
2023/12/19 15:41:34 order consumer is processed successfully!

Both postgres and go code use UTC timezone. If your local Go env's timezone is UTC > UTC+0 such as UTC+8, you may observe above result. Otherwise it depends on your timezone.

Development

  • How to produce event order_created messages

see example/service.order/service.go

	msg := mq.NewOrderMessage(mq.EventOrderCreated).
		WithOrderID(orderID).
		Encode(ctx)
	if err := s.mq.SendMessage(ctx, tx, msg); err != nil {
		return fmt.Errorf("error sending mq message: %w", err)
	}
  • How to register the consumer to subscribe to event messages

see example/service.order/consumer.go see example/service.notify/consumer.go

  • When cron will run to enable consumer

see example/cmd/api/main.go

	// start mq consumer
	go func() {
		consume := mq.NeWConsume(pool, logger)
		consume.Consume()
	}()

Increase consume speed

If current consume speed is not satisfying, try alternative approach to rewrite consume.go Consume() to increase performance. Remember using query for update skip locked, so multiple goroutines or multiple pods (from kurbernetes) may consume messages concurrently.

func cron() {
	ticker := time.NewTicker(2 * time.Second)
	for {
		select {
		case <-ticker.C:
			go process()
		}
	}
}

func process() {
    const BatchProcessSize = 5
	capCh := make(chan struct{}, BatchProcessSize)

    // retrive limited number of messages to be consumed from table `queues`
    // query = select id from queues where is_dead = false and check_at < $1 limit 500
    // get queue ids to be consumed
    ids := []int64{}

	for _, id := range ids {
		capCh <- struct{}{}
		go func(queueID int64) {
			defer func() {
				<-capCh
			}()

            // get queue with lock from queue id, but skip locked
			c.consumeSingleMessage(queueID)
		}(id)
	}
}

Test

While this design has been used in a few production env products, this repo is primarily for demo purpose. No tests are created. If you feel like this design works for your project too, make sure tests are created for your projects.

Documentation

Index

Constants

View Source
const (
	ConsumerNotFound = "consumer not found"
	MaxRetry         = 5
)
View Source
const (
	RequestID string = "request_id"
)

message keys

Variables

View Source
var (
	// retry message consume
	RetryDelay = map[int]time.Duration{
		0: 2 * time.Second,
		1: 10 * time.Second,
		2: time.Minute,
		3: 30 * time.Minute,
		4: time.Hour,
	}
)

Functions

func NewContext

func NewContext(ctx context.Context, msg Message) context.Context

func RegisterConsumer

func RegisterConsumer(consumer Consumer)

Types

type Consume

type Consume interface {
	Consume()
}

func NeWConsume

func NeWConsume(pool *pgxpool.Pool, logger Logger) Consume

type Consumer

type Consumer interface {
	// consumer name. one recommended format `{service_name}:{internal_name}:{event_name}`
	Name() string

	// which event this consumer subscribes to
	Event() Event

	// consumer can decide whether it wants to consume its message in delayed time
	Delay() time.Duration

	// consume one message
	Consume(ctx context.Context, tx pgx.Tx, msg *MQMessage) error
}

type Event

type Event string

event

const (
	// event when order is created
	EventOrderCreated Event = "order_created"
)

func (Event) String

func (e Event) String() string

type Logger

type Logger interface {
	Errorf(format string, args ...interface{})
}

type MQMessage

type MQMessage struct {
	// event
	MQEvent Event `json:"event"`

	// request id
	MQRequestID string `json:"request_id,omitempty"`

	// order id, optional
	MQOrderID int64 `json:"order_id,omitempty"`
}

MQMessage holds message data, which will be saved at db table

func (*MQMessage) Encode

func (m *MQMessage) Encode(ctx context.Context) Message

func (*MQMessage) Event

func (m *MQMessage) Event() Event

func (*MQMessage) OrderID

func (m *MQMessage) OrderID() int64

func (*MQMessage) OrderMessage

func (m *MQMessage) OrderMessage() OrderMessage

convert struct to message interface to be used by consumer

func (*MQMessage) RequestID

func (m *MQMessage) RequestID() string

func (*MQMessage) Scan

func (m *MQMessage) Scan(value interface{}) error

func (MQMessage) Value

func (m MQMessage) Value() (driver.Value, error)

func (*MQMessage) WithOrderID

func (m *MQMessage) WithOrderID(orderID int64) OrderMessage

func (*MQMessage) WithRequestID

func (m *MQMessage) WithRequestID(requestID string) OrderMessage

type Message

type Message interface {
	// encode the message to be saved into postgres table
	Encode(ctx context.Context) Message

	// get event
	Event() Event

	// get request id
	RequestID() string
}

RocketMessage reprents one message used to encode and decode custom information.

type OrderMessage

type OrderMessage interface {
	Message

	// set order id
	WithOrderID(orderID int64) OrderMessage

	// get order id
	OrderID() int64
}

Order Message

func NewOrderMessage

func NewOrderMessage(e Event) OrderMessage

create one new order message

type Provider

type Provider interface {
	// send message with pgx tx
	SendMessage(ctx context.Context, tx pgx.Tx, message Message) error
}

func NewProvider

func NewProvider(pool *pgxpool.Pool) Provider

type Queue

type Queue struct {
	ID int64
	// consumer name
	ConsumerName string
	Message      MQMessage
	Retry        int
	IsDead       bool
	FailedReason *string
	CheckAt      time.Time
	CreatedAT    time.Time
}

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL