pgfifo

Version: v0.2.3
Published: Oct 11, 2023 License: BSD-2-Clause Imports: 7 Imported by: 0

README

pgfifo

Barebones pub/sub message queue built on top of Postgres.

Inspired by pgq and other implementations of this idea around the internet. This is the smallest useful subset (in my opinion) of features for this type of thing.

Usage

First, get the module:

go get github.com/dburkart/pgfifo

Connect using a connection string supported by pq:

queue, err := pgfifo.New("postgres://postgres:password@localhost/postgres?sslmode=disable")
if err != nil {
    // Do something with the error
}

You can also set options for a given queue instance. For example, you may want to change the table prefix from its default ("pgfifo") to "coolapp":

queue, err := pgfifo.New(
    "postgres://postgres:password@localhost/postgres?sslmode=disable",
    pgfifo.StringOption("TablePrefix", "coolapp"),
)
if err != nil {
    // Do something with the error
}

A full list of options can be found in the documentation for pgfifo.New().

Publish an event to a topic:

queue.Publish("/some/topic", "Data")

Create a subscription:

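queue.Subscribe("/topic", func(msgs []*pgfifo.Message) error {
    for _, m := range msgs {
        var s string
        if err := m.Decode(&s); err != nil {
            // Returning an error causes the events to be reprocessed
            return err
        }
        fmt.Println(m.QueueTime.Format(time.RFC3339), m.Topic, s)
    }
    return nil
})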

Documentation

Overview

Package pgfifo implements a barebones pub/sub message queue backed by a Postgres database.

Constants

This section is empty.

Variables

var Version = 1

Database version

Functions

This section is empty.

Types

type Message

type Message struct {
	QueueTime time.Time
	Topic     string
	Payload   []byte
}

A Message represents a single item in the Queue. The Payload of the message is encoded as JSON.

func (*Message) Decode

func (m *Message) Decode(t any) error

Decode is a helper that decodes the message's Payload into t, the destination value.
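Since the Payload is documented as JSON, decoding can be pictured as a JSON unmarshal into a pointer. The following is a minimal sketch, not the package's source: Message here is a local stand-in that mirrors the documented fields, and the body of Decode is an assumption (a plain json.Unmarshal).

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Message is a local stand-in mirroring the documented pgfifo.Message
// fields, so the example runs without a database.
type Message struct {
	QueueTime time.Time
	Topic     string
	Payload   []byte
}

// Decode unmarshals the JSON payload into t, which must be a pointer.
// Assumption: the real method behaves like json.Unmarshal.
func (m *Message) Decode(t any) error {
	return json.Unmarshal(m.Payload, t)
}

func main() {
	m := &Message{
		QueueTime: time.Now(),
		Topic:     "/some/topic",
		Payload:   []byte(`"Data"`), // a JSON-encoded string
	}

	var s string
	if err := m.Decode(&s); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(m.Topic, s)
}
```

Under this assumption, decoding into a mismatched type (for example an int for a string payload) returns an error, which is why checking the result of Decode is worthwhile.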

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a message queue backed by a Postgres database.

func New

func New(connectionStr string, options ...QueueOption) (*Queue, error)

New creates and returns a new Queue in the specified database. The connection string must be in a format accepted by github.com/lib/pq.

Available options that can be passed in when creating a Queue:

  • TablePrefix (string) -- namespace to prefix on pgfifo tables. Defaults to "pgfifo"
  • SubscriptionBatchSize (uint) -- batch size for subscriptions. Defaults to 10

On success, New returns the new Queue. If the queue could not be created, it returns an error.

func (*Queue) Close added in v0.2.2

func (q *Queue) Close() error

func (*Queue) Publish

func (q *Queue) Publish(topic string, data any) error

Publish publishes a message on a particular topic. It accepts any value, serializes it, and publishes the result to the specified topic.
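To make the serialization step concrete, here is a small sketch of what a published value could look like as a payload. It assumes the encoding is done with encoding/json (the Message documentation says the Payload is JSON); OrderPlaced and encodePayload are hypothetical names used only for illustration and are not part of pgfifo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderPlaced is a hypothetical event type used only for illustration.
type OrderPlaced struct {
	ID    int    `json:"id"`
	Email string `json:"email"`
}

// encodePayload is a hypothetical helper showing the JSON form a published
// value would take, assuming encoding/json is used for serialization.
func encodePayload(data any) ([]byte, error) {
	return json.Marshal(data)
}

func main() {
	payload, err := encodePayload(OrderPlaced{ID: 42, Email: "a@example.com"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload)) // prints {"id":42,"email":"a@example.com"}
}
```

If this assumption holds, values that JSON cannot represent (channels, functions) would make Publish fail, so its returned error is worth checking.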

func (*Queue) Subscribe

func (q *Queue) Subscribe(topic string, sub SubscriptionCallback) error

Subscribe creates an asynchronous subscription to a particular topic. TODO: We need to figure out how to handle asynchronous errors.

type QueueOption added in v0.2.0

type QueueOption struct {
	Name  string
	Value any
}

QueueOption represents a configurable option that can be set for a Queue

func StringOption added in v0.2.0

func StringOption(name, value string) QueueOption

StringOption creates a new QueueOption containing a string

func UintOption added in v0.2.0

func UintOption(name string, value uint) QueueOption

UintOption creates a new QueueOption containing a uint
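The name/value option pattern can be sketched as follows. QueueOption, StringOption and UintOption mirror the documented declarations; the settings struct and applyOptions function are hypothetical, written only to show how such options might be validated and applied over the documented defaults. How pgfifo.New actually handles unknown names or mismatched types is not stated here.

```go
package main

import "fmt"

// QueueOption mirrors the documented struct.
type QueueOption struct {
	Name  string
	Value any
}

// StringOption and UintOption mirror the documented constructors.
func StringOption(name, value string) QueueOption {
	return QueueOption{Name: name, Value: value}
}

func UintOption(name string, value uint) QueueOption {
	return QueueOption{Name: name, Value: value}
}

// settings is a hypothetical holder for the two documented options.
type settings struct {
	TablePrefix           string
	SubscriptionBatchSize uint
}

// applyOptions is a hypothetical helper: it starts from the documented
// defaults and rejects unknown names or values of the wrong type.
func applyOptions(opts ...QueueOption) (settings, error) {
	s := settings{TablePrefix: "pgfifo", SubscriptionBatchSize: 10}
	for _, o := range opts {
		switch o.Name {
		case "TablePrefix":
			v, ok := o.Value.(string)
			if !ok {
				return s, fmt.Errorf("option %s: want string, got %T", o.Name, o.Value)
			}
			s.TablePrefix = v
		case "SubscriptionBatchSize":
			v, ok := o.Value.(uint)
			if !ok {
				return s, fmt.Errorf("option %s: want uint, got %T", o.Name, o.Value)
			}
			s.SubscriptionBatchSize = v
		default:
			return s, fmt.Errorf("unknown option %q", o.Name)
		}
	}
	return s, nil
}

func main() {
	s, err := applyOptions(
		StringOption("TablePrefix", "coolapp"),
		UintOption("SubscriptionBatchSize", 25),
	)
	if err != nil {
		fmt.Println("bad options:", err)
		return
	}
	fmt.Printf("%+v\n", s)
}
```

Because Value is typed as any, the typed constructors are the safer way to build options: they keep a string from ending up where a uint is expected.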

type SubscriptionCallback

type SubscriptionCallback func([]*Message) error

SubscriptionCallback is a client-provided callback. When new events are ready to be consumed, they are passed to this function. If an error is returned by this callback, all events are reprocessed.
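The batch-and-reprocess contract can be illustrated with a small in-memory loop. This is a sketch under stated assumptions, not the package's implementation: deliver, its maxAttempts limit, and errTransient are hypothetical, and the real package may retry on a different schedule (for example on its next poll). Message is a trimmed local stand-in.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a trimmed local stand-in for pgfifo.Message.
type Message struct {
	Topic   string
	Payload []byte
}

// SubscriptionCallback mirrors the documented callback type.
type SubscriptionCallback func([]*Message) error

// errTransient is a hypothetical error used to simulate a failing callback.
var errTransient = errors.New("transient failure")

// deliver is a hypothetical delivery loop. It hands pending messages to cb
// in batches of at most batchSize. A batch is dropped from the queue only
// after cb returns nil; a failed batch is passed to cb again, up to
// maxAttempts times.
func deliver(pending []*Message, batchSize, maxAttempts int, cb SubscriptionCallback) error {
	if batchSize <= 0 || maxAttempts <= 0 {
		return errors.New("batchSize and maxAttempts must be positive")
	}
	for len(pending) > 0 {
		n := batchSize
		if n > len(pending) {
			n = len(pending)
		}
		batch := pending[:n]

		var err error
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			if err = cb(batch); err == nil {
				break
			}
		}
		if err != nil {
			return fmt.Errorf("batch still failing after %d attempts: %w", maxAttempts, err)
		}
		pending = pending[n:]
	}
	return nil
}

func main() {
	msgs := []*Message{{Topic: "/topic"}, {Topic: "/topic"}, {Topic: "/topic"}}

	calls := 0
	err := deliver(msgs, 2, 3, func(batch []*Message) error {
		calls++
		if calls == 1 {
			return errTransient // the first batch will be handed over again
		}
		fmt.Printf("call %d handled %d message(s)\n", calls, len(batch))
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

Because a failed batch is delivered again in full, a callback can see the same message more than once, so callbacks are best written to be idempotent.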
