pubsub

package
v2.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2024 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const BufferSize = 2048

BufferSize is the maximum number of unhandled messages we will buffer for a subscriber before dropping messages.

View Source
const (
	EventPubsubWatchdog = "pubsub_watchdog"
)

Variables

View Source
var ErrDroppedMessages = xerrors.New("dropped messages")

ErrDroppedMessages is sent to ListenerWithErr if messages are dropped or might have been dropped.

Functions

This section is empty.

Types

type Listener

type Listener func(ctx context.Context, message []byte)

Listener represents a pubsub handler.

type ListenerWithErr

type ListenerWithErr func(ctx context.Context, message []byte, err error)

ListenerWithErr represents a pubsub handler that can also receive error indications

type MemoryPubsub

type MemoryPubsub struct {
	// contains filtered or unexported fields
}

MemoryPubsub is an in-memory Pubsub implementation. It's an exported type so that our test code can do type checks.

func (*MemoryPubsub) Close

func (*MemoryPubsub) Close() error

func (*MemoryPubsub) Publish

func (m *MemoryPubsub) Publish(event string, message []byte) error

func (*MemoryPubsub) Subscribe

func (m *MemoryPubsub) Subscribe(event string, listener Listener) (cancel func(), err error)

func (*MemoryPubsub) SubscribeWithErr

func (m *MemoryPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error)

type PGPubsub added in v2.8.0

type PGPubsub struct {
	// contains filtered or unexported fields
}

PGPubsub is a pubsub implementation using PostgreSQL.

func New

func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error)

New creates a new Pubsub implementation using a PostgreSQL connection.

func (*PGPubsub) Close added in v2.8.0

func (p *PGPubsub) Close() error

Close closes the pubsub instance.

func (*PGPubsub) Collect added in v2.8.0

func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric)

Collect implements, along with Describe, the prometheus.Collector interface for metrics

func (*PGPubsub) Describe added in v2.8.0

func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc)

Describe implements, along with Collect, the prometheus.Collector interface for metrics.

func (*PGPubsub) Publish added in v2.8.0

func (p *PGPubsub) Publish(event string, message []byte) error

func (*PGPubsub) Subscribe added in v2.8.0

func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error)

Subscribe calls the listener when an event matching the name is received.

func (*PGPubsub) SubscribeWithErr added in v2.8.0

func (p *PGPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error)

type Pubsub

type Pubsub interface {
	Subscribe(event string, listener Listener) (cancel func(), err error)
	SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error)
	Publish(event string, message []byte) error
	Close() error
}

Pubsub is a generic interface for broadcasting and receiving messages. Implementors should assume high-availability with the backing implementation.

func NewInMemory

func NewInMemory() Pubsub

type Watchdog added in v2.8.0

type Watchdog struct {
	// contains filtered or unexported fields
}

func NewWatchdog added in v2.8.0

func NewWatchdog(ctx context.Context, logger slog.Logger, ps Pubsub) *Watchdog

func NewWatchdogWithClock added in v2.8.0

func NewWatchdogWithClock(ctx context.Context, logger slog.Logger, ps Pubsub, c clock.Clock) *Watchdog

NewWatchdogWithClock returns a watchdog with the given clock. Product code should always call NewWatchDog.

func (*Watchdog) Close added in v2.8.0

func (w *Watchdog) Close() error

func (*Watchdog) Timeout added in v2.8.0

func (w *Watchdog) Timeout() <-chan struct{}

Timeout returns a channel that is closed if the watchdog times out. Note that the Timeout() chan will NOT be closed if the Watchdog is Close'd or its context expires, so it is important to read from the Timeout() chan in a select e.g.

w := NewWatchDog(ctx, logger, ps) select { case <-ctx.Done(): case <-w.Timeout():

   FreakOut()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL