pglistener

package
v0.0.0-...-d31700d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

适用于低频修改且量小的数据,单线程操作。

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

// Handler is the callback interface passed to Listener.Listen. Every method
// receives the name of the table the callback refers to; in the package
// example the name arrives schema-qualified ("public.students2") even when
// Listen was called with the bare name "students2".
// NOTE(review): the per-method notes below are inferred from the method names
// and from the package example's output — confirm against the implementation.
type Handler interface {
	// Init is the first callback printed in the package example, before any
	// row event; presumably it signals that listening on table has started.
	Init(table string)
	// Create presumably reports an inserted row. In the package example
	// content is a JSON object built from the "columns" given to Listen.
	Create(table string, content []byte)
	// Update presumably reports a changed row, with the row's content before
	// (oldContent) and after (newContent) the change.
	Update(table string, oldContent, newContent []byte)
	// Delete presumably reports a removed row; in the package example content
	// is the row as it was before deletion.
	Delete(table string, content []byte)
	// ConnLoss presumably reports that the connection used for listening was
	// lost — TODO confirm when exactly it fires.
	ConnLoss(table string)
}

type Listener

// Listener has no exported fields; obtain one from New and drive it through
// its methods (Listen, Unlisten, UnlistenAll, ...).
type Listener struct {
	// contains filtered or unexported fields
}

Listener listens for INSERT/UPDATE/DELETE events on PostgreSQL tables and passes the events to the registered handlers.

func New

func New(dbAddr string, db *sql.DB, logger Logger) (*Listener, error)

func (*Listener) DB

func (l *Listener) DB() *sql.DB

func (*Listener) GetChannel

func (l *Listener) GetChannel(table string) string

func (*Listener) GetTable

func (l *Listener) GetTable(channel string) string

func (*Listener) Listen

func (l *Listener) Listen(table string, columns, checkColumns string, handler Handler) error

Listen listens to a table and notifies the handler, passing the selected "columns", when a row is created, updated, or deleted. When a row is updated, the handler is notified only if the value of any of the "columns" or "checkColumns" has changed.

Example
package main

import (
	"database/sql"
	"fmt"
	"os"
	"time"

	"gitee.com/go-better/dev/db/pgcache/pglistener"
	"gitee.com/go-better/dev/debug/errs"
	loggerPkg "gitee.com/go-better/dev/debug/logger"
)

// Shared fixtures for the example.
var (
	// dbUrl is the connection string used both for the listener and for the
	// plain database handle below.
	dbUrl = "postgres://postgres:postgres@localhost/postgres?sslmode=disable"
	// testDB is the handle the example uses to run its SQL statements.
	testDB = connectDB(dbUrl)
	// logger writes to standard error and is handed to pglistener.New.
	logger = loggerPkg.New(os.Stderr)
)

// testHandler is a stateless handler that prints every callback it receives
// to standard output, which is what the example's expected output checks.
type testHandler struct{}

// Init prints the table name it was called with.
func (testHandler) Init(table string) {
	fmt.Printf("Init %s\n", table)
}

// Create prints the table name followed by the received buffer.
func (testHandler) Create(table string, newBuf []byte) {
	fmt.Printf("Create %s\n  %s\n", table, newBuf)
}

// Update prints the table name followed by the old and the new buffer.
func (testHandler) Update(table string, oldBuf, newBuf []byte) {
	fmt.Printf(
		"Update %s\n  old: %s\n  new: %s\n",
		table, oldBuf, newBuf,
	)
}

// Delete prints the table name followed by the received buffer.
func (testHandler) Delete(table string, oldBuf []byte) {
	fmt.Printf("Delete %s\n  %s\n", table, oldBuf)
}

// ConnLoss prints the table name it was called with.
func (testHandler) ConnLoss(table string) {
	fmt.Printf("ConnLoss %s\n", table)
}

// main runs the same create/update/delete scenario twice: once with the bare
// table name and once with the schema-qualified name.
func main() {
	for _, table := range []string{"students2", "public.students2"} {
		testCreateUpdateDelete(table)
	}
}

// testCreateUpdateDelete recreates the students2 table, listens on table with
// a testHandler, runs an INSERT, two UPDATEs and a DELETE against students2,
// and finally stops listening.
//
// A failure to create the listener is printed and ends the function; every
// other failure panics.
func testCreateUpdateDelete(table string) {
	createStudentsTable()

	listener, err := pglistener.New(dbUrl, nil, logger)
	if err != nil {
		fmt.Println(errs.WithStack(err))
		return
	}

	// Columns reported to the handler; time is reduced to its date part.
	const columns = "$1.id, $1.name, to_char($1.time, 'YYYY-MM-DD') as time"
	if listenErr := listener.Listen(table, columns, "", testHandler{}); listenErr != nil {
		panic(errs.WithStack(listenErr))
	}

	// from now on, testHandler will be notified on INSERT / UPDATE / DELETE.
	statements := []string{
		`
    INSERT INTO students2(name, time) VALUES ('李雷', '2018-09-08 15:55:00+08')
  `,
		`
    UPDATE students2 SET name = '韩梅梅', time = '2018-09-09 15:56:00+08'
  `,
		// this one should not be notified: only the time of day changes, so
		// the date-formatted "time" column stays the same.
		`
    UPDATE students2 SET time = '2018-09-09 15:57:00+08'
  `,
		`DELETE FROM students2`,
	}
	for _, stmt := range statements {
		if _, execErr := testDB.Exec(stmt); execErr != nil {
			panic(execErr)
		}
	}

	// Presumably gives pending notifications time to reach the handler
	// before the table is unlistened.
	time.Sleep(10 * time.Millisecond)
	if unlistenErr := listener.Unlisten(table); unlistenErr != nil {
		panic(unlistenErr)
	}
}

// createStudentsTable drops any existing students2 table and creates a fresh
// one through testDB, panicking if the statements fail.
func createStudentsTable() {
	const ddl = `
	DROP TABLE IF EXISTS students2;
	CREATE TABLE IF NOT EXISTS students2 (
		id   bigserial,
		name varchar(100),
		time timestamptz,
    other text default ''
	)`

	_, err := testDB.Exec(ddl)
	if err != nil {
		panic(err)
	}
}

// connectDB opens a database handle for dbUrl using the "postgres" driver and
// returns it. It panics with the error reported by sql.Open if opening fails.
func connectDB(dbUrl string) *sql.DB {
	conn, openErr := sql.Open("postgres", dbUrl)
	if openErr == nil {
		return conn
	}
	panic(openErr)
}
Output:

Init public.students2
Create public.students2
  {"id": 1, "name": "李雷", "time": "2018-09-08"}
Update public.students2
  old: {"id": 1, "name": "李雷", "time": "2018-09-08"}
  new: {"id": 1, "name": "韩梅梅", "time": "2018-09-09"}
Delete public.students2
  {"id": 1, "name": "韩梅梅", "time": "2018-09-09"}
Init public.students2
Create public.students2
  {"id": 1, "name": "李雷", "time": "2018-09-08"}
Update public.students2
  old: {"id": 1, "name": "李雷", "time": "2018-09-08"}
  new: {"id": 1, "name": "韩梅梅", "time": "2018-09-09"}
Delete public.students2
  {"id": 1, "name": "韩梅梅", "time": "2018-09-09"}

func (*Listener) Unlisten

func (l *Listener) Unlisten(table string) error

func (*Listener) UnlistenAll

func (l *Listener) UnlistenAll() error

type Logger

// Logger is the logging interface New takes as its logger argument.
// NOTE(review): presumably the Listener uses it to report errors it cannot
// return to a caller — confirm against the implementation.
type Logger interface {
	// Error accepts a variadic list of values, like fmt.Print.
	Error(args ...interface{})
	// Errorf accepts a format string and its arguments, like fmt.Printf.
	Errorf(format string, args ...interface{})
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL