pushpop

Version: v0.0.0-...-970de76
Published: Mar 29, 2018 License: MIT Imports: 5 Imported by: 0

README

PushPop

PushPop is a simple PostgreSQL-backed message queue for Go. It's designed to perform work asynchronously and provides at-least-once delivery. PushPop requires PostgreSQL version 9.5 or higher.

How it works

  1. Push a message to a topic, queueing it up to be popped later.
  2. Pop a message from the same topic, taking ownership of it.
  3. Do some work to satisfy the purpose of the message, and:
    • Complete it when you're done, marking it as handled and safe for deletion.
    • Discard it if the message is not useful, marking it safe for deletion.
    • Defer it if you want to try processing again later.
    • Extend it if you need more than 5 minutes to process the message.
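The deadline behind step 3 can be pictured with a small in-memory model. This is only a sketch: the `inflight` type and its `pop`, `extend`, and `expired` helpers are hypothetical, the five-minute window is inferred from the wording above, and whether the real `Extend` counts from the current time or from the old deadline is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// defaultWindow is an assumption taken from the "more than 5 minutes" wording.
const defaultWindow = 5 * time.Minute

// inflight is a hypothetical in-memory stand-in for a popped message.
type inflight struct {
	deadline time.Time
}

// pop takes ownership at time now and starts the processing window.
func pop(now time.Time) *inflight {
	return &inflight{deadline: now.Add(defaultWindow)}
}

// extend moves the deadline to dur after now. This sketch counts from now;
// the real library may count differently.
func (m *inflight) extend(now time.Time, dur time.Duration) {
	m.deadline = now.Add(dur)
}

// expired reports whether the window has passed without a check-in.
func (m *inflight) expired(now time.Time) bool {
	return now.After(m.deadline)
}

// demo checks a message six minutes after it was popped, with and without
// an extension made at the four-minute mark.
func demo() (withoutExtend, withExtend bool) {
	start := time.Date(2018, 3, 29, 12, 0, 0, 0, time.UTC)
	check := start.Add(6 * time.Minute)

	a := pop(start)
	withoutExtend = a.expired(check)

	b := pop(start)
	b.extend(start.Add(4*time.Minute), 10*time.Minute)
	withExtend = b.expired(check)
	return withoutExtend, withExtend
}

func main() {
	withoutExtend, withExtend := demo()
	fmt.Println("expired without extend:", withoutExtend)
	fmt.Println("expired with extend:", withExtend)
}
```

A worker that expects to run long would call the real `Extend` before its window closes, in the same way the sketch extends at the four-minute mark.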

Features

  • Probably flexible enough:
    • Messages are queued by topic, an arbitrary string.
    • Each topic provides a separate message queue.
    • Messages can be looked up by id to retrieve the status of a pending or recently processed message.
  • Probably safe enough:
    • Messages are stored in PostgreSQL, a durable store.
    • Messages are re-queued in the event that a popped message is not acknowledged, ensuring messages are not lost due to worker restart or death.
  • Probably fast enough:
    • PushPop is simple and fast enough to handle millions of messages a day with relatively low queue depths.
    • It may not fare so well past that. Don't build Uber dispatch on this.
  • Probably scalable enough:
    • Multiple PushPop clients can be run separately without any coordination.
    • Works great for CPU-heavy or IO-heavy workloads.
  • Simple:
    • PushPop only has one dependency: PostgreSQL.
    • PushPop is easy to integrate and deploy alongside your existing code.
    • PushPop data can live alongside your other tables in the same database if you like.
  • Language agnostic:
    • PushPop involves no inter-process communication or language-specific features.
    • It's possible (and not very difficult) to provide alternate PushPop producer or consumer implementations in other languages.

Limitations

  • Centralized:
    • Data lives in a single PostgreSQL instance.
    • You can scale it up, but PushPop has no mechanisms for sharding, failover, or geo-replicating your messages.
  • Transactional:
    • Because we're using some locking features of PostgreSQL to ensure correct and orderly delivery, there will be an upper bound on the number of messages per second.
    • Good rule of thumb: if you're even considering PushPop, you probably don't need to worry about this.
  • Single Consumer Group:
    • PushPop provides no facilities to attach multiple consumer groups to a single topic.
    • This might be a feature that can be added in the future.

Message Ordering and Failure Modes

  • Messages are strictly ordered based on wall time plus an optional deferment duration.
  • Messages are guaranteed to be delivered at least once, and may be delivered multiple times during certain failure modes. You won't regret building idempotent workers.
  • No guarantees are made if the PostgreSQL database itself fails.
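Because a message may be delivered more than once, a worker can guard its side effects by message id. The following is a minimal sketch of that idea, not part of PushPop: the `dedupe` type is hypothetical, and it keeps seen ids in memory, whereas a real worker would record them in durable storage (for example, the same PostgreSQL database).

```go
package main

import (
	"fmt"
	"sync"
)

// dedupe remembers which message ids have already been handled.
// It is a hypothetical helper; an in-memory map is only enough for a demo.
type dedupe struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDedupe() *dedupe {
	return &dedupe{seen: make(map[string]bool)}
}

// do runs fn at most once per id and reports whether fn ran.
// The lock is held while fn runs, which keeps the sketch simple but
// serializes all handlers.
func (d *dedupe) do(id string, fn func()) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	fn()
	d.seen[id] = true
	return true
}

// deliverTwice simulates the same message arriving twice and returns how
// many times the side effect actually happened.
func deliverTwice() int {
	d := newDedupe()
	sent := 0
	for i := 0; i < 2; i++ {
		d.do("msg-1", func() { sent++ })
	}
	return sent
}

func main() {
	fmt.Println("emails sent:", deliverTwice())
}
```

In a real handler the id passed to `do` would be the `Id` field of the popped message.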

TODO

PushPop is still alpha-quality software. A few things aren't implemented yet:

  • Add requeueing of pending messages whose deadline has expired.
  • Add deletion of completed or discarded messages after a certain grace period.
  • Add worker manager with signal handling.
  • Add observability features (instrumentation, stats API, etc.).

Usage Example

You can find a runnable code example in the example directory. It looks like this:

package main

import (
	"log"
	"time"

	"github.com/jcoene/pushpop"
)

func main() {
	// Create a new pushpop client
	client, err := pushpop.NewClient("postgres://postgres:@127.0.0.1:5432/pushpop_example?sslmode=disable")
	if err != nil {
		log.Fatalln("unable to connect:", err)
	}
	defer client.Close() // Close any open database connections when we're done

	// Push a few messages to the "mytopic" topic.
	if err := client.NewMessage("mytopic", []byte("birds are the best")).Push(); err != nil {
		log.Fatalln("unable to push:", err)
	}
	if err := client.NewMessage("mytopic", []byte("cats are the best")).Push(); err != nil {
		log.Fatalln("unable to push:", err)
	}
	if err := client.NewMessage("mytopic", []byte("dogs are the best")).Push(); err != nil {
		log.Fatalln("unable to push:", err)
	}

	// We'll iterate over the messages in the queue until we find the one we want.
	for {
		// Pop off the next message
		msg, err := client.Pop("mytopic")
		if err != nil {
			// ErrNoMessage means that no message is available. Let's wait and try again.
			if err == pushpop.ErrNoMessage {
				time.Sleep(1 * time.Second)
				continue
			}

			// An unexpected error occurred, not good!
			log.Fatalln("unable to pop:", err)
		}

		// We got a message. Let's verify that this is the message we want.
		if string(msg.Payload) == "dogs are the best" {
			log.Println("got our message:", string(msg.Payload))
			if err := msg.Complete(); err != nil {
				log.Fatalln("unable to complete:", err)
			}
			return
		}

		// We got a different message. Let's push it back to the queue and let a cat
		// person deal with it later.
		log.Println("got other message:", string(msg.Payload))
		if err := msg.Defer(5 * time.Second); err != nil {
			log.Fatalln("unable to defer:", err)
		}
	}
}

License

MIT License, see LICENSE

Documentation

Variables

var ErrNoMessage = sql.ErrNoRows

ErrNoMessage is returned when a Message cannot be found. It will be returned when you try to find a Message by id and it does not exist, or when you try to Pop from an empty topic.
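Since ErrNoMessage is declared as `sql.ErrNoRows`, a direct `==` comparison works on the value returned by the client, and `errors.Is` also matches after a caller has wrapped the error with extra context. The sketch below uses a local `errNoMessage` variable that mirrors the declaration above and a hypothetical `popEmpty` helper, so it runs without a database.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// errNoMessage mirrors the documented declaration of ErrNoMessage.
// It is a local stand-in so the example runs without a database.
var errNoMessage = sql.ErrNoRows

// popEmpty is a hypothetical helper that simulates popping from an empty
// topic and wraps the sentinel with extra context.
func popEmpty(topic string) error {
	return fmt.Errorf("pop %q: %w", topic, errNoMessage)
}

// isEmpty reports whether err means "no message available", even when the
// sentinel has been wrapped.
func isEmpty(err error) bool {
	return errors.Is(err, errNoMessage)
}

func main() {
	err := popEmpty("mytopic")
	fmt.Println(err == errNoMessage) // false: the wrapped error is a different value
	fmt.Println(isEmpty(err))        // true: errors.Is unwraps and matches
}
```

Preferring `errors.Is` keeps the empty-queue check working if the error passes through layers that add context.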

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a PushPop client with a connection pool to a backend PostgreSQL database which will be used for message persistence.

func NewClient

func NewClient(url string) (*Client, error)

NewClient creates a new Client for the given PostgreSQL database connection url. It will automatically create or update tables or indices as necessary.

func (*Client) Close

func (c *Client) Close() error

Close closes any open database connections and renders the Client unusable.

func (*Client) FindMessage

func (c *Client) FindMessage(id string) (*Message, error)

FindMessage finds a message by its primary identifier, a string representation of a 16-byte UUID. The ErrNoMessage error will be returned if the message does not exist.

func (*Client) NewMessage

func (c *Client) NewMessage(topic string, payload ...[]byte) *Message

NewMessage creates (but does not enqueue) a new Message with the given topic and optional payload.

func (*Client) Pop

func (c *Client) Pop(topic string) (*Message, error)

Pop returns the next available message from the queue of the given topic, if one exists. The message will be transitioned to the "pending" state, and the receiver becomes responsible for transitioning the state of the message using Complete, Discard, or Defer.

func (*Client) Work

func (c *Client) Work(topic string, n int, fn HandlerFunc)

Work creates a given number of workers for the supplied topic, managing the flow of messages and ensuring orderly shutdown of workers.

type HandlerFunc

type HandlerFunc func(*Message) error

HandlerFunc is a function that takes a Message and performs some work. In case of an error, the message will be requeued.
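The relationship between Work and HandlerFunc can be sketched with an in-memory queue. This is not PushPop's implementation: `message`, `handlerFunc`, `work`, and `failOnBad` are local stand-ins, the queue is a channel rather than a PostgreSQL table, and a failed message is only counted where the real library would requeue it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// message and handlerFunc are local stand-ins for the documented types.
type message struct {
	Payload string
}

type handlerFunc func(*message) error

// work starts n workers that drain msgs through fn. It returns how many
// messages were handled without error and how many returned an error
// (the ones a real queue would requeue).
func work(msgs []*message, n int, fn handlerFunc) (completed, failed int) {
	queue := make(chan *message, len(msgs))
	for _, m := range msgs {
		queue <- m
	}
	close(queue)

	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				err := fn(m)
				mu.Lock()
				if err != nil {
					failed++
				} else {
					completed++
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // all workers have exited once the queue is drained
	return completed, failed
}

// failOnBad is a hypothetical handler that rejects one specific payload.
func failOnBad(m *message) error {
	if m.Payload == "bad" {
		return errors.New("cannot process payload")
	}
	return nil
}

func main() {
	msgs := []*message{{Payload: "a"}, {Payload: "bad"}, {Payload: "b"}}
	completed, failed := work(msgs, 2, failOnBad)
	fmt.Println("completed:", completed, "failed:", failed)
}
```

Returning an error from the handler is the only signal a worker needs to give; the decision to retry stays with the queue.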

type Message

type Message struct {
	// Id is the UUID of the message, useful for looking up by id.
	Id string

	// Topic is the name of the topic the message has or will be queued in.
	Topic string

	// State represents the state of the Message.
	State State

	// StateTime represents a timestamp to be interpreted based on the State:
	// State_READY: The earliest time at which the message can be popped.
	// State_PENDING: The deadline by which the consumer must check-in, either
	// completing, discarding, deferring, or extending.
	// State_COMPLETED: The time the message was completed.
	// State_DISCARDED: The time the message was discarded.
	StateTime time.Time

	// Payload is a byte slice payload containing the message contents.
	Payload []byte
	// contains filtered or unexported fields
}

Message represents a single message that can be pushed or popped by a Client. While in possession of a client, Messages are stateless.

func (*Message) Complete

func (m *Message) Complete() error

Complete marks the message as complete, meaning all work has been performed and the message can be safely deleted.

func (*Message) DecodePayload

func (m *Message) DecodePayload(v interface{}) error

DecodePayload decodes the JSON payload of the message into the given object.

func (*Message) Defer

func (m *Message) Defer(dur time.Duration) error

Defer re-queues the message to be retried at a later time.

func (*Message) Discard

func (m *Message) Discard() error

Discard marks the message as discarded, meaning the work was not performed but the message has no further value and can be safely deleted.

func (*Message) EncodePayload

func (m *Message) EncodePayload(v interface{}) error

EncodePayload encodes the given object as JSON and stores it in the Payload field of the Message.
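EncodePayload and DecodePayload are described as JSON helpers around the Payload byte slice. A round trip can be sketched with `encoding/json` alone; the `message` type, its lower-case methods, and the `emailJob` payload below are hypothetical stand-ins, not the library's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a local stand-in for Message with only the Payload field.
type message struct {
	Payload []byte
}

// encodePayload marshals v as JSON and stores the bytes in Payload.
func (m *message) encodePayload(v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	m.Payload = b
	return nil
}

// decodePayload unmarshals the JSON in Payload into v, which must be a pointer.
func (m *message) decodePayload(v interface{}) error {
	return json.Unmarshal(m.Payload, v)
}

// emailJob is a hypothetical payload type.
type emailJob struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// roundTrip encodes a job into a message and decodes it back out.
func roundTrip(in emailJob) (emailJob, error) {
	var m message
	var out emailJob
	if err := m.encodePayload(in); err != nil {
		return out, err
	}
	err := m.decodePayload(&out)
	return out, err
}

func main() {
	out, err := roundTrip(emailJob{To: "a@example.com", Subject: "hi"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(out.To, out.Subject)
}
```

A producer would encode before pushing, and a consumer would decode after popping, with both sides agreeing on the payload struct.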

func (*Message) Extend

func (m *Message) Extend(dur time.Duration) error

Extend extends the deadline of the pending message.

func (*Message) Push

func (m *Message) Push() error

Push pushes the message to the topic queue, making it available to be popped immediately by the next available consumer.

func (*Message) PushDelay

func (m *Message) PushDelay(dur time.Duration) error

PushDelay pushes the message to the topic queue, making it available to be popped after the given duration of time.
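The effect of a delay on ordering can be illustrated with an in-memory model in which each message carries the earliest time it may be popped. The `queued` type and the `popOrder` and `demo` functions are hypothetical; the real ordering is done by PostgreSQL.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// queued is a hypothetical in-memory record of a pushed message.
type queued struct {
	payload string
	readyAt time.Time // push time plus any delay
}

// popOrder returns the payloads that are eligible at time now, ordered by
// their ready time. Messages whose ready time is still in the future are
// left out.
func popOrder(msgs []queued, now time.Time) []string {
	sorted := append([]queued(nil), msgs...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].readyAt.Before(sorted[j].readyAt)
	})
	var out []string
	for _, m := range sorted {
		if !m.readyAt.After(now) {
			out = append(out, m.payload)
		}
	}
	return out
}

// demo pushes three messages, one of them with a ten-second delay, and
// returns what is poppable after two seconds and after one minute.
func demo() (early, late []string) {
	start := time.Date(2018, 3, 29, 12, 0, 0, 0, time.UTC)
	msgs := []queued{
		{payload: "delayed", readyAt: start.Add(10 * time.Second)}, // like PushDelay(10s)
		{payload: "first", readyAt: start},                         // like Push()
		{payload: "second", readyAt: start.Add(1 * time.Second)},   // pushed a second later
	}
	return popOrder(msgs, start.Add(2*time.Second)), popOrder(msgs, start.Add(time.Minute))
}

func main() {
	early, late := demo()
	fmt.Println(early) // [first second]
	fmt.Println(late)  // [first second delayed]
}
```

The delayed message is pushed first but becomes visible last, because its position depends on the ready time rather than the push time.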

type State

type State uint16

State represents the status of a message. A message can only be in one state at a time.

const (
	// READY: The message is enqueued and ready for processing.
	State_READY State = 0

	// PENDING: The message was popped and is in-flight with a consumer.
	State_PENDING State = 1

	// COMPLETED: The message was popped and successfully processed. At this
	// point the message is safe to delete.
	State_COMPLETED State = 2

	// DISCARDED: The message was popped and otherwise processed. At this point
	// the message is safe to delete.
	State_DISCARDED State = 3
)
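The method descriptions above imply a small state machine. The table below is a sketch derived from those descriptions rather than from the library source: the lower-case `state` type mirrors the documented values, and `transitions` and `canTransition` are hypothetical. Extend is omitted because it keeps a message in the pending state.

```go
package main

import "fmt"

// state mirrors the documented State values (0 through 3).
type state uint16

const (
	ready state = iota
	pending
	completed
	discarded
)

// transitions lists the moves implied by the documented methods.
var transitions = map[state][]state{
	ready:   {pending},                     // Pop
	pending: {ready, completed, discarded}, // Defer, Complete, Discard
}

// canTransition reports whether a message may move from one state to another.
func canTransition(from, to state) bool {
	for _, s := range transitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(ready, pending))   // true
	fmt.Println(canTransition(completed, ready)) // false
}
```

Completed and discarded have no outgoing transitions in this table, which matches their description as safe to delete.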
