squeuelite

Version: v0.2.5
Warning

This package is not in the latest version of its module.

Published: May 5, 2023 License: MIT Imports: 7 Imported by: 0

README

squeuelite - a persistent queue based on SQLite written in Go

description

SQueueLite is a persistent queue based on SQLite, a reliable, battle-tested, file-based storage engine.

usage

subscriber mode
// Fragment: place inside a function of a program that imports squeuelite and log.

// main routine
q, err := squeuelite.NewPQueue("test.db", 10)
if err != nil {
    log.Fatal(err)
}
defer q.Close()

err = q.Subscribe(func(m *squeuelite.PMessage) error {
    log.Printf("message received: %s", m.Data)
    errint := processMsg(m.Data) // your process function
    if errint != nil {
        // message will be marked as failed and not delivered again until retried
        return errint
    }
    // message will be deleted from persistent storage
    return nil
})
if err != nil {
    log.Fatal(err)
}

// put routine
payload := []byte("Payload")
err = q.Put(payload)
if err != nil {
    log.Fatal(err)
}

manual mode
// Fragment: place inside a function of a program that imports squeuelite and log.

// main routine
q, err := squeuelite.NewPQueue("test.db", 10)
if err != nil {
    log.Fatal(err)
}
defer q.Close()

payload := []byte("Payload")
err = q.Put(payload)
if err != nil {
    log.Fatal(err)
}

m, err := q.Peek()
if err != nil {
    log.Fatal(err)
}
log.Printf("message received: %s", m.Data)
err = processMsg(m.Data) // your process function
if err != nil {
    errint := q.MarkFailed(m.MessageID)
    if errint != nil {
        // handle db errors setting msg to failed
    }
} else {
    errint := q.Done(m.MessageID)
    if errint != nil {
        // handle db errors deleting completed msg
    }
}

contributors

The initial code was taken from https://github.com/litements/litequeue, which offers a persistent queue using SQLite in Python; it was translated to Go and adapted accordingly.

Documentation

Constants

const (
	READY int64 = iota
	LOCKED
	FAILED
)
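
Because the block uses iota, the three statuses take the values 0, 1 and 2 in declaration order. The sketch below repeats the constants and adds a name lookup for logging; statusName is a hypothetical helper for illustration and is not part of squeuelite.

```go
package main

import "fmt"

// Same declaration as above: iota numbers the constants 0, 1, 2.
const (
	READY int64 = iota
	LOCKED
	FAILED
)

// statusName is a hypothetical helper (not part of squeuelite) that turns a
// raw status value, such as PMessage.Status, into a readable label.
func statusName(s int64) string {
	switch s {
	case READY:
		return "READY"
	case LOCKED:
		return "LOCKED"
	case FAILED:
		return "FAILED"
	default:
		return fmt.Sprintf("UNKNOWN(%d)", s)
	}
}

func main() {
	for _, s := range []int64{READY, LOCKED, FAILED, 7} {
		fmt.Printf("%d -> %s\n", s, statusName(s))
	}
}
```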

Variables

This section is empty.

Functions

This section is empty.

Types

type PMessage added in v0.2.0

type PMessage struct {
	MessageID int64
	Status    int64
	InTime    int64
	LockTime  int64
	DoneTime  int64
	Data      []byte
}

type PQueue added in v0.2.0

type PQueue struct {
	// contains filtered or unexported fields
}

func NewPQueue added in v0.2.0

func NewPQueue(connStr string, maxsize int64) (*PQueue, error)

PQueue is a persistent queue that can be used in two different modes.

It can be used manually with Peek, Get, Done, MarkFailed, Retry functions.

Alternatively it can be used with a subscription function to get automatically notified after a message is added to the queue.

func (*PQueue) Close added in v0.2.0

func (lq *PQueue) Close() error

Close closes the internal channels and the database connection.

To use the queue again, create a new PQueue.

func (*PQueue) Done added in v0.2.0

func (lq *PQueue) Done(messageID int64) error

Done physically deletes the message from the queue. It returns sql.ErrNoRows if the element cannot be found.

func (*PQueue) Get added in v0.2.0

func (lq *PQueue) Get(messageID int64) (*PMessage, error)

Get returns the element with the given message ID. It returns sql.ErrNoRows if the element cannot be found.
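
Several methods report a missing element through the standard library's sql.ErrNoRows, so callers can separate "not in the queue" from a real database failure with errors.Is. The sketch below is a minimal illustration using only the standard library; lookup is a hypothetical stand-in for a call such as Get or Done, and isMissing is a hypothetical helper.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// lookup is a hypothetical stand-in for a queue call such as Get or Done.
// Only ID 1 exists here; every other ID yields a wrapped sql.ErrNoRows.
func lookup(id int64) error {
	if id != 1 {
		return fmt.Errorf("message %d: %w", id, sql.ErrNoRows)
	}
	return nil
}

// isMissing reports whether err means "no such message". errors.Is also
// matches when the sentinel error has been wrapped with %w.
func isMissing(err error) bool {
	return errors.Is(err, sql.ErrNoRows)
}

func main() {
	for _, id := range []int64{1, 2} {
		err := lookup(id)
		switch {
		case err == nil:
			fmt.Println(id, "found")
		case isMissing(err):
			fmt.Println(id, "not in queue")
		default:
			fmt.Println(id, "database error:", err)
		}
	}
}
```

Using errors.Is rather than a direct comparison keeps the check working if the error is wrapped on its way up the call stack.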

func (*PQueue) IsEmpty added in v0.2.0

func (lq *PQueue) IsEmpty() (bool, error)

IsEmpty returns true if the queue has 0 READY messages.

func (*PQueue) IsFull added in v0.2.0

func (lq *PQueue) IsFull() (bool, error)

IsFull returns true if the queue's size is equal to max size.

func (*PQueue) MarkFailed added in v0.2.0

func (lq *PQueue) MarkFailed(messageID int64) error

MarkFailed sets the message with the given ID to FAILED status. The message stays in the database but is not delivered again until it is actively set to retry. It returns sql.ErrNoRows if the element cannot be found.

func (*PQueue) Peek added in v0.2.0

func (lq *PQueue) Peek() (*PMessage, error)

Peek returns the oldest element in the list that is in READY state. It returns sql.ErrNoRows if no element is in READY state.

func (*PQueue) Put added in v0.2.0

func (lq *PQueue) Put(data []byte) error

Put adds a new message to the end of the queue. Put does not block.

Put returns an error if the queue is full.
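
Since Put fails instead of blocking, the caller decides what to do when the queue is full. The sketch below models that behaviour in memory; boundedQueue, errQueueFull and putAll are hypothetical names for illustration, and the text does not say which error value squeuelite actually returns.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error value; the documentation does not
// state which error squeuelite returns for a full queue.
var errQueueFull = errors.New("queue is full")

// boundedQueue is a hypothetical in-memory model of a size-limited queue.
type boundedQueue struct {
	maxSize int
	items   [][]byte
}

// Put appends data, or fails immediately when the queue is at capacity.
func (q *boundedQueue) Put(data []byte) error {
	if len(q.items) >= q.maxSize {
		return errQueueFull
	}
	q.items = append(q.items, data)
	return nil
}

// putAll stops at the first failure and reports how many payloads were accepted.
func putAll(q *boundedQueue, payloads ...string) (int, error) {
	for i, p := range payloads {
		if err := q.Put([]byte(p)); err != nil {
			return i, err
		}
	}
	return len(payloads), nil
}

func main() {
	q := &boundedQueue{maxSize: 2}
	accepted, err := putAll(q, "a", "b", "c")
	fmt.Println(accepted, err)
}
```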

func (*PQueue) Retry added in v0.2.0

func (lq *PQueue) Retry(messageID int64) error

Retry sets the message with the given ID to READY status, which causes the message to be delivered again. It returns sql.ErrNoRows if the element cannot be found.

func (*PQueue) RetryAll added in v0.2.0

func (lq *PQueue) RetryAll() error

RetryAll sets all messages from FAILED to READY state.
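
The life cycle described by Peek, MarkFailed, Done and RetryAll can be sketched with a small in-memory model. Everything below (memQueue, msg, drain, runDemo) is hypothetical and only mirrors the documented semantics; it is not squeuelite's implementation, and it assumes that Peek leaves the message status unchanged.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Status values copied from the Constants section.
const (
	READY int64 = iota
	LOCKED
	FAILED
)

type msg struct {
	id, status int64
	data       string
}

// memQueue is a hypothetical in-memory model, not squeuelite's implementation.
type memQueue struct{ msgs []*msg }

// Peek returns the oldest READY message, or sql.ErrNoRows if there is none.
func (q *memQueue) Peek() (*msg, error) {
	for _, m := range q.msgs {
		if m.status == READY {
			return m, nil
		}
	}
	return nil, sql.ErrNoRows
}

// MarkFailed parks a message so that Peek skips it.
func (q *memQueue) MarkFailed(id int64) error {
	for _, m := range q.msgs {
		if m.id == id {
			m.status = FAILED
			return nil
		}
	}
	return sql.ErrNoRows
}

// Done removes a message from the queue entirely.
func (q *memQueue) Done(id int64) error {
	for i, m := range q.msgs {
		if m.id == id {
			q.msgs = append(q.msgs[:i], q.msgs[i+1:]...)
			return nil
		}
	}
	return sql.ErrNoRows
}

// RetryAll moves every FAILED message back to READY.
func (q *memQueue) RetryAll() {
	for _, m := range q.msgs {
		if m.status == FAILED {
			m.status = READY
		}
	}
}

// drain handles READY messages until none remain; it returns how many completed.
func drain(q *memQueue, process func(string) error) int {
	completed := 0
	for {
		m, err := q.Peek()
		if errors.Is(err, sql.ErrNoRows) {
			return completed
		}
		if process(m.data) != nil {
			_ = q.MarkFailed(m.id)
			continue
		}
		_ = q.Done(m.id)
		completed++
	}
}

// runDemo lets message "a" fail once, then retries it.
func runDemo() (first, second int) {
	q := &memQueue{msgs: []*msg{{id: 1, data: "a"}, {id: 2, data: "b"}}}
	failedOnce := false
	process := func(s string) error {
		if s == "a" && !failedOnce {
			failedOnce = true
			return errors.New("transient failure")
		}
		return nil
	}
	first = drain(q, process)
	q.RetryAll()
	second = drain(q, process)
	return first, second
}

func main() { fmt.Println(runDemo()) }
```

In the first pass only "b" completes, because "a" is parked as FAILED; after RetryAll the second pass completes "a".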

func (*PQueue) Size added in v0.2.0

func (lq *PQueue) Size() (int64, error)

Size returns the number of elements in the persistent queue where the state is READY.

func (*PQueue) Subscribe added in v0.2.0

func (lq *PQueue) Subscribe(cb func(*PMessage) error) error

Subscribe registers a callback function that is called after the database is initially read and whenever a new message is put into the queue.

Only one callback function can be registered. If a second function is registered, an error is returned.
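
The single-callback rule can be pictured as a guard on registration. The sketch below is a hypothetical in-memory model (notifier and errAlreadySubscribed are illustrative names, and the real error value is not documented here); it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadySubscribed is a hypothetical error value for illustration.
var errAlreadySubscribed = errors.New("callback already registered")

// notifier is a hypothetical model of the "one callback only" rule.
type notifier struct {
	cb func(data []byte) error
}

// Subscribe stores the first callback and rejects any later one.
func (n *notifier) Subscribe(cb func([]byte) error) error {
	if n.cb != nil {
		return errAlreadySubscribed
	}
	n.cb = cb
	return nil
}

func main() {
	n := &notifier{}
	handler := func([]byte) error { return nil }
	fmt.Println("first:", n.Subscribe(handler))
	fmt.Println("second:", n.Subscribe(handler))
}
```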

type SubscribeQueue added in v0.2.0

type SubscribeQueue interface {
	Put([]byte) error
	Subscribe(func(msg *PMessage) error) error
	Close() error
}

SubscribeQueue is a small interface that works with a callback and is easier for me to comprehend. It is modelled after NATS.

type WorkQueue added in v0.2.0

type WorkQueue interface {
	Put([]byte) error
	Get(int64) (*PMessage, error)
	Peek() (*PMessage, error)
	Done(int64) error
	MarkFailed(int64) error
	IsEmpty() (bool, error)
	Size() (int64, error)
	IsFull() (bool, error)
	Retry(int64) error
	RetryAll() error
	Close() error
}

WorkQueue is not a "small" interface, so it is not very Go-like. Secondly, we have to peek and then acknowledge.
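
One common Go answer to a large interface is for the calling code to declare only the methods it needs. The sketch below assumes a hypothetical consumer-side interface named putter whose single method matches the Put signature documented above, so a *PQueue would satisfy it; recorder and enqueueAll are likewise illustrative names.

```go
package main

import "fmt"

// putter is a hypothetical consumer-side interface holding the one method
// this code needs. Any type with Put([]byte) error satisfies it.
type putter interface {
	Put([]byte) error
}

// recorder is a test double that remembers what was enqueued.
type recorder struct{ got []string }

func (r *recorder) Put(data []byte) error {
	r.got = append(r.got, string(data))
	return nil
}

// enqueueAll puts every payload and stops at the first error.
func enqueueAll(q putter, payloads ...string) error {
	for _, p := range payloads {
		if err := q.Put([]byte(p)); err != nil {
			return fmt.Errorf("enqueue %q: %w", p, err)
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	if err := enqueueAll(r, "a", "b"); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(r.got)
}
```

Code written against the narrow interface can be tested with the recorder and run in production with the real queue, without touching a database in tests.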
