sqlmq

package
v0.0.0-...-d31700d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2022 License: MIT Imports: 13 Imported by: 0

README

sqlmq

A message queue based on "database/sql".DB.

Features

  • 支持多个节点生产消息、消费消息,多个消费节点对所有消息进行负载均衡。
  • 消费失败时支持重试(自定义重试等待时间)或放弃该消息。
  • 保证同一个消息至少被消费一次,但不保证只被消费一次。
  • 保证同一个消息的多次消费在时间上没有重叠。
  • 消费每个消息时新建一个Goroutine,不会等待上一个消息消费完成,才进行下一个消息的消费。因此保证不了先生产先消费。

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetRetryWait

func GetRetryWait(retriedCount uint16) time.Duration
Example
fmt.Println(GetRetryWait(0))
fmt.Println(GetRetryWait(1))
fmt.Println(GetRetryWait(2))
fmt.Println(GetRetryWait(3))
fmt.Println(GetRetryWait(4))
Output:

1s
1m0s
1h0m0s
24h0m0s
24h0m0s

Types

type DBOrTx

// DBOrTx is the set of context-aware query/exec methods shared by *sql.DB and
// *sql.Tx in database/sql, so either a plain connection pool or an open
// transaction can be passed wherever a DBOrTx is expected.
type DBOrTx interface {
	// QueryContext runs a query that returns rows.
	QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error)
	// QueryRowContext runs a query that is expected to return at most one row.
	QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row
	// ExecContext runs a statement that does not return rows.
	ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error)
}

type Handler

// Handler is the callback signature accepted by SqlMQ.Register for a queue.
// It receives a context, a transaction and the message to consume.
//
// Return a nil error on success; retryAfter and canCommit are then ignored.
// On failure return a non-nil error and use retryAfter to choose what happens
// next: positive retries after that duration, zero retries immediately, and
// negative gives the message up. canCommit tells whether tx may still be
// committed even though an error is returned, or must be rolled back.
type Handler func(ctx context.Context, tx *sql.Tx, msg Message) (
	retryAfter time.Duration, canCommit bool, err error,
)

On successful handling, a nil error should be returned; retryAfter and canCommit are ignored. On failed handling, a non-nil error should be returned, and retryAfter means: 1. if retryAfter is positive, try again after that time period; 2. if retryAfter is zero, try again immediately; 3. if retryAfter is negative, give up this message and don't try again. canCommit indicates, when an error is returned, whether the transaction can be committed or must be rolled back. If canCommit is false, the transaction is rolled back, and a separate statement is executed to update the retry time.

type Message

// Message is the minimal view of a queued message that the queue and its
// Table work with. StdMessage is the implementation shown in this package.
type Message interface {
	// QueueName returns the name of the queue this message belongs to.
	QueueName() string
	// ConsumeAt returns the time at which the message should be consumed
	// (either for the first time or as a retry).
	ConsumeAt() time.Time
}

type RetryWait

// RetryWait holds a list of wait durations and exposes them through its Get
// method, which takes the number of retries already made.
// NOTE(review): Get's body is not visible here; presumably Waits is indexed by
// the retried count — confirm against the implementation.
type RetryWait struct {
	// Waits is the list of wait durations consulted by Get.
	Waits []time.Duration
}

func (RetryWait) Get

func (retry RetryWait) Get(retriedCount uint16) time.Duration

type SqlMQ

// SqlMQ is the message queue itself: it produces messages into, and consumes
// messages from, a Table stored in a database/sql database.
type SqlMQ struct {
	// DB is the database handle; the Validate example shows it must not be nil.
	DB *sql.DB
	// Table is the message storage; the Validate example shows it must not be nil.
	Table Table
	// Logger is the logger used by the queue (the MarkFail example shows
	// messages being written through it).
	Logger *logger.Logger

	// The maximum number of messages to be consumed concurrently.
	// If ConsumeConcurrency <= 0, the default value 10 is used.
	ConsumeConcurrency int

	// How long to wait before fetching again when no message is available for consuming.
	// If IdleWait <= 0, the default value of one minute is used.
	IdleWait time.Duration
	// How long to wait before fetching again after an error occurred while fetching a message.
	// If ErrorWait <= 0, the default value of one minute is used.
	ErrorWait time.Duration
	// Transaction timeout for message fetching and handling.
	// If TxTimeout <= 0, the default value of one minute is used.
	TxTimeout time.Duration

	// The time interval at which successfully consumed messages are cleaned.
	// NOTE(review): no default for a non-positive value is documented here — confirm.
	CleanInterval time.Duration
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"time"

	"gitee.com/go-better/dev/debug/logger"
	_ "github.com/lib/pq"
)

// Shared fixtures for the examples: the database handle and a queue built on
// top of it. Go initialises testDB first because recreateSqlMQ depends on it.
var (
	testDB = getDB()
	testMQ = recreateSqlMQ()
)

// main turns on debug mode, registers testHandler for the "test" queue,
// produces three messages, then starts the consumer in the background and
// sleeps five seconds before exiting.
func main() {
	testMQ.Debug(true)

	err := testMQ.Register("test", testHandler)
	if err != nil {
		panic(err)
	}

	// Same three payloads, in the same order, as three explicit produce calls.
	for _, payload := range []string{"success", "retry", "given up"} {
		produce(testMQ, payload)
	}

	go testMQ.Consume()
	time.Sleep(5 * time.Second)
}

// testHandler is the example Handler for the "test" queue.
//
// It expects msg to be a *StdMessage whose Data holds the JSON encoding of a
// string as []byte; both type assertions are unchecked and panic otherwise.
// The decoded string is stored back into Data and printed together with
// TriedCount, then decides the outcome:
//   - "success": succeeds immediately.
//   - "retry":   fails on tries 0 and 1 (retry after one second; canCommit is
//     false on try 0 and true on try 1), then succeeds.
//   - anything else: fails with a negative retryAfter, i.e. gives up.
func testHandler(ctx context.Context, tx *sql.Tx, msg Message) (time.Duration, bool, error) {
	stdMsg := msg.(*StdMessage)

	var payload string
	raw := stdMsg.Data.([]byte)
	if decodeErr := json.Unmarshal(raw, &payload); decodeErr != nil {
		return 0, true, decodeErr
	}
	stdMsg.Data = payload
	fmt.Println(payload, stdMsg.TriedCount)

	if payload == "success" {
		return 0, true, nil
	}
	if payload != "retry" {
		return -1, false, errors.New("given up")
	}

	// "retry": TriedCount is unsigned, so anything past the second try is >= 2.
	if stdMsg.TriedCount >= 2 {
		return 0, true, nil
	}
	canCommit := stdMsg.TriedCount == 1
	return time.Second, canCommit, errors.New("error happened")
}

// getDB opens a handle to the local Postgres test database using the
// "postgres" driver and panics if sql.Open reports an error.
func getDB() *sql.DB {
	const dsn = "postgres://postgres:postgres@localhost/postgres?sslmode=disable"

	conn, openErr := sql.Open("postgres", dsn)
	if openErr == nil {
		return conn
	}
	panic(openErr)
}

// recreateSqlMQ drops the "sqlmq" table if it exists (panicking on failure)
// and then returns a queue built by getSqlMQ.
func recreateSqlMQ() *SqlMQ {
	_, dropErr := testDB.Exec("DROP TABLE IF EXISTS sqlmq")
	if dropErr != nil {
		panic(dropErr)
	}
	return getSqlMQ()
}

// getSqlMQ builds a SqlMQ on top of testDB: it uses a StdTable named "sqlmq"
// created with a one-hour keep duration, a logger writing to the freshly
// created file ".log.json", and a one-hour clean interval. It panics if the
// log file cannot be created.
func getSqlMQ() *SqlMQ {
	logFile, createErr := os.Create(".log.json")
	if createErr != nil {
		panic(createErr)
	}

	mq := new(SqlMQ)
	mq.DB = testDB
	mq.Table = NewStdTable(testDB, "sqlmq", time.Hour)
	mq.Logger = logger.New(logFile)
	mq.CleanInterval = time.Hour
	return mq
}

// produce enqueues data on the "test" queue of mq without a transaction and
// panics if producing fails.
func produce(mq *SqlMQ, data string) {
	msg := &StdMessage{Queue: "test", Data: data}
	err := mq.Produce(nil, msg)
	if err != nil {
		panic(err)
	}
}
Output:

success 0
retry 0
given up 0
retry 1
retry 2
Example (Consume)
var mq = recreateSqlMQ()
if err := mq.Register("test3", noopHandler); err != nil {
	panic(err)
}
mq.TxTimeout = time.Nanosecond // set smallest time to make it timeout
fmt.Println(mq.consume(2*time.Minute, 3*time.Minute))
mq.TxTimeout = 0

fmt.Println(mq.consume(2*time.Minute, 3*time.Minute))

mq.Produce(nil, &StdMessage{Queue: "test3", RetryAt: time.Now().Add(time.Hour)})
fmt.Println(mq.consume(2*time.Minute, 3*time.Minute))
Output:

3m0s
2m0s
2m0s
Example (Handle)
var mq = getSqlMQ()
tx, cancel, err := mq.beginTx()
if err != nil {
	panic(err)
}
fmt.Println(mq.handle(context.Background(), cancel, tx, &StdMessage{Queue: "test"}))
Output:

1m0s unknown queue: test
Example (MarkFail)
var mq = getSqlMQ()
var buf bytes.Buffer
mq.Logger = logger.New(&buf)
mq.markFail(mq.DB, &StdMessage{}, -1, false)
fmt.Println(bytes.Contains(buf.Bytes(), []byte(`"msg":"affected 0 rows"`)))
Output:

true
Example (Validate)
var mq SqlMQ
fmt.Println(mq.validate())
mq.DB = testMQ.DB
fmt.Println(mq.validate())
mq.Table = testMQ.Table
fmt.Println(mq.validate())
Output:

SqlMQ.DB must not be nil
SqlMQ.Table must not be nil
<nil>

func (*SqlMQ) Consume

func (mq *SqlMQ) Consume()
Example
var mq = getSqlMQ()
go mq.Consume()
time.Sleep(10 * time.Millisecond)
Output:

Example (Panic)
defer func() {
	fmt.Println(strings.HasSuffix(recover().(string), "SqlMQ.DB must not be nil"))
}()
var mq SqlMQ
mq.Consume()
Output:

true

func (*SqlMQ) Debug

func (mq *SqlMQ) Debug(debug bool)

func (*SqlMQ) NotifyConsumeAt

func (mq *SqlMQ) NotifyConsumeAt(at time.Time, event interface{})

Notify mq that there are messages to be consumed at the given time.

func (*SqlMQ) Produce

func (mq *SqlMQ) Produce(tx *sql.Tx, msg Message) error

Produce a message. tx can be nil.

Example
fmt.Println(testMQ.Produce(nil, &StdMessage{Queue: "test2"}))

if err := testMQ.Register("test2", noopHandler); err != nil {
	panic(err)
}

tx, err := testDB.Begin()
if err != nil {
	panic(err)
}
err = testMQ.Produce(tx, &StdMessage{Queue: "test2"})
if err != nil {
	tx.Rollback()
} else {
	err = tx.Commit()
}
fmt.Println(err)

fmt.Println(testMQ.Produce(tx, &StdMessage{Queue: "test2", Data: make(chan int)}))
Output:

unknown queue: test2
<nil>
json: unsupported type: chan int

func (*SqlMQ) Register

func (mq *SqlMQ) Register(queueName string, handler Handler) error
Example
fmt.Println(testMQ.Register("test3", noopHandler))
fmt.Println(testMQ.Register("test3", noopHandler))
Output:

<nil>
queue test3 already registered

type StdMessage

// StdMessage is the concrete message type used throughout the examples; its
// pointer type provides the QueueName and ConsumeAt methods.
type StdMessage struct {
	// Id is the message id; ProduceMessage is documented to set it on success.
	Id         int64
	Queue      string      // queue name
	Data       interface{} // data of any type
	// Status is the message status.
	// NOTE(review): the set of valid values is not visible here.
	Status     string
	// CreatedAt is presumably the creation time of the message — confirm.
	CreatedAt  time.Time
	TriedCount uint16    // how many times consumption has been tried already.
	RetryAt    time.Time // when the next retry should happen.
}

func (*StdMessage) ConsumeAt

func (msg *StdMessage) ConsumeAt() time.Time

func (*StdMessage) QueueName

func (msg *StdMessage) QueueName() string

type StdTable

// StdTable is a standard `sqlmq.Table` implementation.
// Its fields are unexported; create one with NewStdTable.
type StdTable struct {
	// contains filtered or unexported fields
}

StdTable is a standard `sqlmq.Table` implementation.

func NewStdTable

func NewStdTable(db *sql.DB, name string, keep time.Duration) *StdTable

NewStdTable creates a standard `sqlmq.Table` instance. db: the db used to create the table and index. name: the database table name. keep: how long to keep a successfully consumed message before deleting it.

Example
table := NewStdTable(testDB, "test_name", -1)
fmt.Println(table.Name(), table.keep)
Output:

test_name 24h0m0s

func (*StdTable) CleanMessages

func (table *StdTable) CleanMessages(db *sql.DB) (int64, error)

func (*StdTable) EarliestMessage

func (table *StdTable) EarliestMessage(tx *sql.Tx) (Message, error)

func (*StdTable) MarkGivenUp

func (table *StdTable) MarkGivenUp(db DBOrTx, message Message) error

func (*StdTable) MarkRetry

func (table *StdTable) MarkRetry(db DBOrTx, message Message, retryAfter time.Duration) error

func (*StdTable) MarkSuccess

func (table *StdTable) MarkSuccess(tx *sql.Tx, message Message) error

func (*StdTable) Name

func (table *StdTable) Name() string

func (*StdTable) ProduceMessage

func (table *StdTable) ProduceMessage(db DBOrTx, message Message) error

If ProduceMessage runs successfully, the message id is set in message (which is a *StdMessage).

func (*StdTable) SetQueues

func (table *StdTable) SetQueues(queues []string)

type Table

// Table abstracts the database table in which messages are stored. SqlMQ
// holds one in its Table field; StdTable is the standard implementation.
type Table interface {
	// SetQueues sets the queues considered by EarliestMessage.
	// This method must be concurrency safe.
	SetQueues(queues []string)

	// EarliestMessage gets the earliest message in the queues given to
	// SetQueues that has not been marked by MarkSuccess.
	// "Earliest" means the smallest ConsumeAt.
	// If there is no such message, a nil interface is returned.
	// The tx must be used to exclusively lock (SELECT FOR UPDATE) the returned message.
	// Don't commit or roll back the tx.
	EarliestMessage(tx *sql.Tx) (Message, error)

	// MarkSuccess marks a message as consumed successfully.
	// The tx must be used to update the message. Don't commit or roll back the tx.
	MarkSuccess(tx *sql.Tx, msg Message) error

	// MarkRetry marks that a message should be retried after a time period.
	MarkRetry(db DBOrTx, msg Message, retryAfter time.Duration) error

	// MarkGivenUp marks a message as given up.
	MarkGivenUp(db DBOrTx, msg Message) error

	// ProduceMessage produces a message.
	ProduceMessage(db DBOrTx, msg Message) error
	// CleanMessages cleans successfully consumed messages; an implementation
	// may keep them for a duration after consumption for debugging.
	// It returns the number of cleaned messages.
	CleanMessages(db *sql.DB) (int64, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL