mpubsub

package
v0.0.0-...-c20f884 Latest

Warning: This package is not in the latest version of its module.

Published: Jul 30, 2019 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package mpubsub connects to Google's PubSub service and simplifies a number of interactions with it.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchConsumerFunc

type BatchConsumerFunc func(context.Context, []*Message) (bool, error)

BatchConsumerFunc is similar to ConsumerFunc, except it takes in a batch of multiple messages at once. The boolean returned applies to every message in the batch.

type BatchGroupFunc

type BatchGroupFunc func(a, b *Message) bool

BatchGroupFunc is an optional param to BatchConsume which allows for sorting messages into separate groups. BatchConsume attempts to place each received message into an existing group by calling this function with the received message and a random message from that group. If the function returns true, the received message is placed into that group. If it returns false for all groups, a new group is created.

This function should be a pure function.
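The grouping rule described above can be sketched as follows. This is an illustration under stated assumptions, not the package's code: Message is a local stand-in for the real type, sameTenant, newMsg, and groupMessages are hypothetical names, and the first member of each group is used as the representative (the documentation says a random member is used) so that the result is deterministic.

```go
package main

import "fmt"

// Message is a stand-in for the package's Message type (an alias of
// pubsub.Message); only the field this sketch needs is declared.
type Message struct {
	Attributes map[string]string
}

// BatchGroupFunc has the signature documented above.
type BatchGroupFunc func(a, b *Message) bool

// sameTenant is a hypothetical grouping function. It is pure: the result
// depends only on the two messages it is given.
func sameTenant(a, b *Message) bool {
	return a.Attributes["tenant"] == b.Attributes["tenant"]
}

// groupMessages sketches the documented grouping step. It is an assumption
// about the behavior, not the package's code: the first member of each group
// is used as the representative, where the documentation says a random
// member is chosen.
func groupMessages(msgs []*Message, gfn BatchGroupFunc) [][]*Message {
	var groups [][]*Message
	for _, m := range msgs {
		placed := false
		for i := range groups {
			// Received message first, group member second.
			if gfn(m, groups[i][0]) {
				groups[i] = append(groups[i], m)
				placed = true
				break
			}
		}
		if !placed {
			// No existing group accepted the message, so start a new one.
			groups = append(groups, []*Message{m})
		}
	}
	return groups
}

// newMsg is a hypothetical helper that builds a message for one tenant.
func newMsg(tenant string) *Message {
	return &Message{Attributes: map[string]string{"tenant": tenant}}
}

func main() {
	groups := groupMessages([]*Message{newMsg("a"), newMsg("b"), newMsg("a")}, sameTenant)
	for i, g := range groups {
		fmt.Printf("group %d: %d message(s)\n", i, len(g))
	}
}
```

Because any member of a group may be picked as the representative, a grouping function that is symmetric and transitive presumably gives the most predictable groups.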

type ConsumerFunc

type ConsumerFunc func(context.Context, *Message) (bool, error)

ConsumerFunc is a function to which consumed messages are passed. The returned boolean and the returned error are independent of each other. If the bool is false the message will be returned to the queue for retrying later. If an error is returned it will be logged.

The Context will be canceled once the deadline has been reached (as set when Consume is called).
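A minimal sketch of how the independent boolean and error might be used in a ConsumerFunc. Message is a local stand-in for the real type, and handle and runExample are hypothetical names; which failures deserve a retry is an assumption made for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a stand-in for the package's Message type (an alias of
// pubsub.Message); only the field this sketch needs is declared.
type Message struct {
	Data []byte
}

// ConsumerFunc has the signature documented above.
type ConsumerFunc func(context.Context, *Message) (bool, error)

// handle is a hypothetical ConsumerFunc.
func handle(ctx context.Context, msg *Message) (bool, error) {
	if len(msg.Data) == 0 {
		// Retrying cannot fix an empty body: return true so the message is
		// not retried, and return an error so the problem is logged.
		return true, errors.New("empty message body")
	}
	select {
	case <-ctx.Done():
		// The context is already done: return false so the message goes
		// back to the queue, and report why.
		return false, ctx.Err()
	default:
	}
	return true, nil
}

// runExample calls handle in three situations and formats each result.
func runExample() []string {
	canceled, cancel := context.WithCancel(context.Background())
	cancel() // simulate a context whose deadline has already passed

	cases := []struct {
		ctx context.Context
		msg *Message
	}{
		{context.Background(), &Message{Data: []byte("ok")}},
		{context.Background(), &Message{}},
		{canceled, &Message{Data: []byte("late")}},
	}

	var out []string
	for _, c := range cases {
		done, err := handle(c.ctx, c.msg)
		out = append(out, fmt.Sprintf("done=%t err=%v", done, err))
	}
	return out
}

func main() {
	var _ ConsumerFunc = handle // handle satisfies the documented signature
	for _, line := range runExample() {
		fmt.Println(line)
	}
}
```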

type ConsumerOpts

type ConsumerOpts struct {
	// Default 30s. The timeout each message has to complete before its context
	// is cancelled and the server re-publishes it
	Timeout time.Duration

	// Default 1. Number of concurrent messages to consume at a time
	Concurrent int
}

ConsumerOpts are options which affect the behavior of a Consume method call.
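The documented defaults can be illustrated with a small sketch. ConsumerOpts is redeclared locally so the example stands alone, and withDefaults is a hypothetical helper: it assumes that a zero value means "use the default", which the documentation implies but does not state.

```go
package main

import (
	"fmt"
	"time"
)

// ConsumerOpts is a local copy of the struct documented above.
type ConsumerOpts struct {
	Timeout    time.Duration
	Concurrent int
}

// withDefaults is a hypothetical helper, not part of the package. It assumes
// zero-valued fields are replaced with the documented defaults.
func withDefaults(o ConsumerOpts) ConsumerOpts {
	if o.Timeout == 0 {
		o.Timeout = 30 * time.Second // documented default
	}
	if o.Concurrent == 0 {
		o.Concurrent = 1 // documented default
	}
	return o
}

func main() {
	opts := withDefaults(ConsumerOpts{Concurrent: 10})
	fmt.Println(opts.Timeout, opts.Concurrent)
}
```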

type Message

type Message = pubsub.Message

Message is an alias for pubsub.Message, the message type of the official driver.

type PubSub

type PubSub struct {
	*pubsub.Client
	// contains filtered or unexported fields
}

PubSub is a wrapper around a pubsub client providing more functionality.

func InstPubSub

func InstPubSub(cmp *mcmp.Component, options ...PubSubOpt) *PubSub

InstPubSub instantiates a PubSub which will be initialized when the Init event is triggered on the given Component. The PubSub instance will have Close called on it when the Shutdown event is triggered on the given Component.

func (*PubSub) Topic

func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error)

Topic returns the topic of the given name, potentially creating it first.

type PubSubOpt

type PubSubOpt func(*pubsubOpts)

PubSubOpt is a value which adjusts the behavior of InstPubSub.

func PubSubGCE

func PubSubGCE(gce *mdb.GCE) PubSubOpt

PubSubGCE indicates that InstPubSub should use the given GCE instance rather than instantiate its own.

type Subscription

type Subscription struct {
	*Topic
	Name string
	// contains filtered or unexported fields
}

Subscription provides methods around a subscription to a topic in PubSub.

func (*Subscription) BatchConsume

func (s *Subscription) BatchConsume(
	ctx context.Context,
	fn BatchConsumerFunc, gfn BatchGroupFunc,
	opts ConsumerOpts,
)

BatchConsume is like Consume, except it groups incoming messages together, allowing them to be processed in batches instead of individually.

BatchConsume first collects messages internally for half the ConsumerOpts.Timeout value. Once that time has passed it will group all messages based on the BatchGroupFunc (if nil then all collected messages form one big group). The BatchConsumerFunc is called for each group, with the context passed in having a timeout of ConsumerOpts.Timeout/2.

The ConsumerOpts.Concurrent value determines the maximum number of messages collected during the collection phase of the process (before the BatchConsumerFunc is called).
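The collection phase can be sketched as a loop that stops at whichever limit comes first: half the timeout or the Concurrent cap. This is an assumed illustration, not the package's code. Message and ConsumerOpts are local stand-ins, collect and collectFrom are hypothetical names, and messages arrive on a plain channel instead of from PubSub.

```go
package main

import (
	"fmt"
	"time"
)

// Message is a stand-in for the package's Message type.
type Message struct {
	ID string
}

// ConsumerOpts is a local copy of the struct documented above.
type ConsumerOpts struct {
	Timeout    time.Duration
	Concurrent int
}

// collect gathers messages from in until either half of opts.Timeout has
// elapsed or opts.Concurrent messages have been gathered.
func collect(in <-chan *Message, opts ConsumerOpts) []*Message {
	window := time.NewTimer(opts.Timeout / 2)
	defer window.Stop()

	var batch []*Message
	for len(batch) < opts.Concurrent {
		select {
		case m := <-in:
			batch = append(batch, m)
		case <-window.C:
			// The collection window is over; hand back what arrived.
			return batch
		}
	}
	return batch
}

// collectFrom queues n messages on a buffered channel and reports how many
// of them collect returns for the given options.
func collectFrom(n int, opts ConsumerOpts) int {
	in := make(chan *Message, n)
	for i := 0; i < n; i++ {
		in <- &Message{ID: fmt.Sprint(i)}
	}
	return len(collect(in, opts))
}

func main() {
	// Five messages are waiting, but Concurrent caps the batch at three.
	fmt.Println(collectFrom(5, ConsumerOpts{Timeout: 100 * time.Millisecond, Concurrent: 3}))
	// Only two messages arrive, so collection ends after Timeout/2.
	fmt.Println(collectFrom(2, ConsumerOpts{Timeout: 100 * time.Millisecond, Concurrent: 5}))
}
```

Under this reading, the remaining half of the timeout is what the BatchConsumerFunc receives as its context deadline, so a larger Timeout lengthens both the collection window and the processing budget.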

func (*Subscription) Consume

func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts ConsumerOpts)

Consume uses the given ConsumerFunc and ConsumerOpts to process messages off the Subscription.

type Topic

type Topic struct {
	*PubSub
	Name string
	// contains filtered or unexported fields
}

Topic provides methods around a particular topic in PubSub.

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, data []byte) error

Publish publishes a message with the given data as its body to the Topic.

func (*Topic) Subscription

func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error)

Subscription returns a Subscription instance for the Topic, potentially creating it first.
