pubsubboot

package
v0.0.0-...-3c51467 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultDeadLetterName = "dead-letter"
	RetryDelay            = time.Minute * 2
	AckDeadline           = 10 * time.Second
	MaxAttributeLength    = 1024
)

defaultDeadLetterName is the name used to identity the dead letter channel if no other name was defined.

Variables

This section is empty.

Functions

func TrimLeftBytes

func TrimLeftBytes(str string, maxBytes int) string

TrimLeftBytes trims a string from the left until the string has max X bytes. Removes any invalid runes at the end.

func WithChannel

func WithChannel(ch *Channel) func(*PubSub)

WithChannel option adds a channel with a topic and a subscription.

The channel name is a self-chosen name separate from the topicID and subscriptionID to more easily reference the subscription in the rest of your codebase.

If you're not intending to receive any messages you can leave the subscriptionID empty. Be aware any messages sent to a topic without any subscription are essentially lost.

func WithDeadLetter

func WithDeadLetter(ch *Channel) func(*PubSub)

WithDeadLetter option adds a deadletter channel to the Pub/Sub service.

The topic and optional subscription are automatically created if they don't exist already just like a regular channel.

Without a dead letter channel messages will get NACKed on error and retried until Google pubsub automatically removes them after 7 days. This can quickly fill up your queues so you're highly advised to always add a dead letter channel.

A RichMessage will get sent to the dead-letter channel if an unrecoverable error occurred or if the max message age has expired.

Like a normal channel the subscriptionID is optional but be aware messages sent to a topic without any subscriptions are dropped immediately. When the channel name is left empty the default name "dead-letter" is used instead.

Types

type Channel

type Channel struct {
	ID             string
	TopicID        string
	SubscriptionID string

	// MaxRetryAge is the time since publishing the message within a recoverable error
	// is still NACK'ed rather than dead-lettered.
	//
	// The default MaxRetryAge is 2 minutes.
	//
	// The max age prevents messages from being requeued and retried thousands of times
	// until Google pubsub deletes them automatically after 7 days.
	//
	// When no dead letter channel is configured a message will always be NACK'ed upon a
	// recoverable error.
	MaxRetryAge time.Duration
}

Channel is a message channel containing a topic ID and optionally a subscription.

type Option

type Option func(*PubSub)

type PubSub

type PubSub struct {
	*pubsub.Client

	Channels map[string]*Channel

	// DeadLetter is the channel used for dead letter messages.
	DeadLetterChannel *Channel
	// contains filtered or unexported fields
}

PubSub adds some utility methods to the Google cloud PubSub such ensuring a topic and subscription exists and deadlettering.

It represents subscriptions and topics as a single message Channel as from an application perspective.

func NewPubSubService

func NewPubSubService(projectID string, options ...Option) *PubSub

NewPubSubService configures a new Service and connects to the pubsub server.

func (*PubSub) Channel

func (s *PubSub) Channel(channelID string) *Channel

func (*PubSub) Close

func (s *PubSub) Close() error

Close releases any resources held by the pubsub Service such as memory and goroutines.

func (*PubSub) Configure

func (s *PubSub) Configure(env *goboot.AppEnv) error

Configure implements the AppService interface and instantiates the client connection to gcloud pubsub.

func (*PubSub) CreateAll

func (s *PubSub) CreateAll() error

CreateAll ensures all topics and subscriptions exist.

func (*PubSub) DeleteAll

func (s *PubSub) DeleteAll() error

DeleteAll deletes all topics and subscriptions of all configured channels, including the dead-letter channel.

func (*PubSub) DeleteChannel

func (s *PubSub) DeleteChannel(channel string) error

DeleteChannel deletes the pubsub topic and subscription if they exist. If they don't exist nothing happens.

func (*PubSub) EnsureSubscription

func (s *PubSub) EnsureSubscription(topicID string, subID string) error

EnsureSubscription creates a subscription for specified topic. The topic must already exist.

In most cases you should use CreateAll instead.

The subscription is created with an ACK deadline of 10 seconds, meaning the message must be ACK'ed or NACK'ed within 10 seconds or else it will be re-delivered.

func (*PubSub) EnsureTopic

func (s *PubSub) EnsureTopic(topicID string) error

EnsureTopic creates a topic with specified ID if it doesn't exist already. In most cases you should use CreateAll instead.

func (*PubSub) Init

func (s *PubSub) Init() error

Init implements the AppService interface and executes the CreateAll method.

func (*PubSub) Name

func (s *PubSub) Name() string

func (*PubSub) PublishEvent

func (s *PubSub) PublishEvent(ctx context.Context, channel string, eventName string, payload any) error

PublishEvent publishes a message to the channel's topic and waits for it to be published on the server.

Google's pubsub batching is disabled by default which is only useful in very high-throughput use cases.

func (*PubSub) Receive

func (s *PubSub) Receive(ctx context.Context, channel string, f func(context.Context, *RichMessage)) error

Receive starts receiving messages on specified channel.

It is similar to a normal google pubsub subscription receiver but returns RichMessages in specified callback.

func (*PubSub) ReceiveNr

func (s *PubSub) ReceiveNr(ctx context.Context, channel string, nrOfMessages int) ([]*RichMessage, error)

ReceiveNr blocks until the specified number of messages have been retrieved.

This should only be used with caution for scripting and testing purposes.

func (*PubSub) TryPublishEvent

func (s *PubSub) TryPublishEvent(ctx context.Context, channel string, eventName string, payload any)

TryPublishEvent is the same as PublishEvent but logs any error rather than returning it.

type RichMessage

type RichMessage struct {
	*pubsub.Message
	Service *PubSub
	Channel *Channel
}

RichMessage embeds the raw gcloud pubsub message with additional details and functions.

The PubSubRichMessage primarily helps handling retryable and unrecoverable errors.

func (*RichMessage) DeadLetter

func (msg *RichMessage) DeadLetter(ctx context.Context, cause error) error

DeadLetter publishes a copy of a message to the deadletter channel and ACK's the original message.

If for some reason deadlettering the message failed an error is logged and the original message is NACK'ed.

The dead letter message adds extra attributes to the original message.

The method returns an error if neither neither ACKing or NACKing is possible.

func (*RichMessage) RetryableError

func (msg *RichMessage) RetryableError(ctx context.Context, cause error) error

RetryableError will NACK a message if it is within the max retry timespan, otherwise it will sent the message to a deadletter channel.

Returns an error if no deadlettering the message failed.

func (*RichMessage) TryDeadLetter

func (msg *RichMessage) TryDeadLetter(ctx context.Context, cause error)

TryDeadLetter is the same as DeadLetter but logs any error rather than returning it.

Messages will be redelivered automatically if not ACKed or NACKed in time.

func (*RichMessage) TryRetryableError

func (msg *RichMessage) TryRetryableError(ctx context.Context, cause error)

TryRetryableError is the same as RetryableError but logs any error rather than returning it.

Messages will be redelivered automatically if not ACKed or NACKed in time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL