pqueue

package module
v0.0.0-...-0228d8e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2021 License: MIT Imports: 14 Imported by: 0

README

Persistent queue

Persistent queue based on bbolt DB.

Supposed to be used as embeddable persistent queue to any Go application.

Features:

  • messages are not limited by RAM size, stream-oriented data
  • works good for small (automatic inlining) and for big streams
  • simple, portable storage structure
  • go-routine safe
  • supports multiple-writers and multiple-readers
  • supports ack/nack (commit with discard)

See go-docs for examples and details.

Requirements:

  • go 1.17

Motivation

I wanted to create an application for resource-constrained devices (ie: AWS Lightsail, Raspberry Pi Zero W, etc..) which should store (always) and forward (eventually) information with very unreliable network connection (ie: days without a link). Information could be small (sensors) or huge (webhooks). Devices themselves may experience a power outage.

It means:

  • stored information (number and individual records) may grow much above RAM
  • stored information should not be marked as processed before explicit commit
  • full-packed solutions like RabbitMQ/Kafka/etc can not be applied
  • due to distributed nature of the system duplicates inevitable, however, should be a cheap way to deduplicate (ie: unique ID)

Documentation

Index

Examples

Constants

View Source
const (
	DefaultBucket     = "messages"
	DefaultStorageDir = "queue-data"
	DefaultInlineSize = 8192
)

Variables

View Source
var (
	ErrEmpty = errors.New("queue is empty")
)

Functions

This section is empty.

Types

type ClosableQueue

type ClosableQueue struct {
	*Queue
}

func Default

func Default(dir string) (*ClosableQueue, error)

Default is alias to Open with all defaults. dir/data as storage dir and dir/index.db as metadata storage. Queue must be closed to avoid resource leak.

Example
package main

import (
	"bytes"
	"context"
	"fmt"
	"os"

	"github.com/reddec/pqueue"
)

func main() {
	_ = os.RemoveAll("./data") // remove old queue
	// error handling omitted for convenience
	q, _ := pqueue.Default("./data")
	id, _ := q.Put(bytes.NewBufferString("hello world"), nil)
	fmt.Println("id:", id)

	msg, _ := q.Get(context.TODO())
	defer msg.Commit(true)

	fmt.Println("got id:", msg.ID())
	data, _ := msg.Bytes() // this will read all payload into memory; for streaming use Open()
	fmt.Println(string(data))
}
Output:

id: 1
got id: 1
hello world

func Open

func Open(indexFile string, config Config) (*ClosableQueue, error)

Open queue and allocate resources. Queue must be closed to avoid resource leak.

func (*ClosableQueue) Close

func (cq *ClosableQueue) Close() error

Close internal database.

type Config

type Config struct {
	StorageDir string // directory to store files, default is DefaultStorageDir
	Bucket     string // bucket name in bbolt db, default is DefaultBucket
	InlineSize int    // payload bigger then the size will be stored outside of queue, default is DefaultInlineSize
}

type Message

type Message struct {
	// contains filtered or unexported fields
}

func (*Message) Attempt

func (m *Message) Attempt() int64

Attempt number started from 1. Increased every commit with discard=false.

func (*Message) Bytes

func (m *Message) Bytes() ([]byte, error)

Bytes of payload. Danger for big payload!

func (*Message) Commit

func (m *Message) Commit(discard bool) error

Commit message from the queue.

If discard flag set, message will be completely removed from the queue, otherwise message will be released and available for next Get operation.

func (*Message) Get

func (m *Message) Get(propertyName string) string

Get property and interpret it as string. Sugar for Properties().

func (*Message) ID

func (m *Message) ID() uint64

ID of message. Unique within queue.

func (*Message) Open

func (m *Message) Open() (io.ReadSeekCloser, error)

Open stream of payload. Can be invoked several times to get multiple parallel streams. All streams must be closed individually.

func (*Message) Properties

func (m *Message) Properties() map[string][]byte

Properties as user provided for Put. Any modifications will not be visible for next Get/Try operation (ie: not saved).

func (*Message) Size

func (m *Message) Size() int64

Size of content in bytes.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is designed to be universal and process messages with payload bigger than RAM by storing data as separated file and keeping in queue only reference to that file. Small messages will be inlined in the queue.

Important: to re-use queue it's required to define same storage location as before, otherwise links to payload files will be broken.

func New

func New(db *bbolt.DB, config Config) *Queue

New queue using pre-allocated BBoltDB.

func (*Queue) Clear

func (bq *Queue) Clear() error

Clear queue items and linked files. This may take a time in case of big queue. Running clear with opened linked files may cause platform-depended behaviour.

func (*Queue) Get

func (bq *Queue) Get(ctx context.Context) (message *Message, err error)

Get message from the queue or block till message will be available or context canceled. Returned message MUST be committed.

func (*Queue) Put

func (bq *Queue) Put(data io.Reader, properties map[string][]byte) (uint64, error)

Put item to queue. If data stream is smaller or equal to inline size it will be stored in queue metadata, otherwise it will be stored as linked file.

Properties (could be nil) always stored in queue. Do not put too much information into properties because queue item should be read in-memory before processing.

Returns unique ID of the message.

func (*Queue) Stats

func (bq *Queue) Stats() Stats

Stats snapshot. It requires view transaction to the kv database, so it is not free in terms of performance.

func (*Queue) Try

func (bq *Queue) Try() (message *Message, err error)

Try getting message from the queue or return ErrEmpty. Returned message MUST be committed.

type Stats

type Stats struct {
	Added    int64 // total amount of successfully added items to the queue (can only grow)
	Returned int64 // total amount of successfully committed items with discard=false (can only grow)
	Removed  int64 // total amount of successfully committed items with discard=true (can only grow)
	Size     int64 // size of the queue
	Locked   int64 // number of locked messages
}

Stats of queue operation.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL