queue

package
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 2 Imported by: 0

README

queue

  • 将 PostgreSQL 的 LISTEN 与 NOTIFY 结合起来, 实现的轻量级消息队列。
  • 通过PostgreSQL创建通用的触发器函数来为任何表更改生成JSON通知。

示例

  • 首先在数据库中执行以下SQL
-- Create the trigger function.
-- notify_event() serializes the affected row to JSON and publishes it on the
-- hard-coded 'events' channel via pg_notify, wrapped together with the table
-- name and the operation that fired the trigger.
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$

    DECLARE 
        data json;
        notification json;
    
    BEGIN
    
        -- Convert the old or the new row to JSON, depending on the operation.
        -- Action = DELETE?             -> OLD row
        -- Action = INSERT or UPDATE?   -> NEW row
        IF (TG_OP = 'DELETE') THEN
            data = row_to_json(OLD);
        ELSE
            data = row_to_json(NEW);
        END IF;
        
        -- Build the notification as a JSON object with the keys
        -- 'table', 'action' and 'data'.
        notification = json_build_object(
                          'table',TG_TABLE_NAME,
                          'action', TG_OP,
                          'data', data);
        
                        
        -- Execute pg_notify(channel, notification); the payload is sent as text.
        PERFORM pg_notify('events',notification::text);
        
        -- The return value is ignored because this function is meant to be
        -- attached as an AFTER trigger.
        RETURN NULL;
    END;
    
$$ LANGUAGE plpgsql;

-- Create the queue table (no-op if it already exists).
-- id is an auto-incrementing SERIAL column; event_id is free-form text.
CREATE TABLE IF NOT EXISTS queues (
	id SERIAL,
	event_id TEXT
);

-- Create the trigger, skipping creation when it is already present.
-- Trigger names are only unique per table, so the existence check is scoped
-- to the queues table: matching on tgname alone would wrongly skip creation
-- whenever any other table already has a trigger with the same name.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_trigger
        WHERE tgname = 'queues_notify_event'
          AND tgrelid = 'queues'::regclass
    ) THEN
        -- Fire notify_event() once for every row inserted into queues.
        CREATE TRIGGER queues_notify_event
        AFTER INSERT ON queues
        FOR EACH ROW EXECUTE PROCEDURE notify_event();
    END IF;
END
$$;
  • 监听程序
package main

import (
	"database/sql"
	"fmt"

	"github.com/giwealth/utils/queue"
)

// main initializes the database schema, subscribes to the "events" channel
// and prints every notification it receives.
func main() {
	// PostgreSQL connection string.
	const dsn = "dbname=test_db user=postgres password=postgres host=127.0.0.1 sslmode=disable"

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// NOTE(review): initSQL is not defined in this snippet; presumably it
	// holds the SQL statements shown above (function, table and trigger).
	if _, err := db.Exec(initSQL); err != nil {
		panic(err)
	}

	// Named q rather than queue so the imported package is not shadowed.
	q, err := queue.NewQueue(dsn, "events")
	if err != nil {
		panic(err)
	}

	// Ranging over the channel terminates cleanly if it is ever closed,
	// instead of spinning on zero values.
	for msg := range q.WaitForNotification() {
		fmt.Println(msg)
		// Example output: {"table" : "queues", "action" : "INSERT", "data" : {"id":5,"event_id":"55"}}
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue 队列

func NewQueue

func NewQueue(dsn, channel string) (*Queue, error)

NewQueue 构造函数。channel: 创建 PostgreSQL 函数时使用的通道名称; dsn: PostgreSQL 连接字符串, 例: dbname=dingtalk_server user=postgres password=postgres host=127.0.0.1 sslmode=disable

func (*Queue) WaitForNotification

func (q *Queue) WaitForNotification() <-chan string

WaitForNotification 等待通知

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL