pubsublite

package
v1.0.0
Published: Feb 21, 2024 License: Apache-2.0 Imports: 31 Imported by: 2

Documentation

Overview

Package pubsublite abstracts the production and consumption of records to and from GCP PubSub Lite.

Constants

This section is empty.

Variables

This section is empty.

Functions

func JoinTopicConsumer

func JoinTopicConsumer(topic apmqueue.Topic, consumer string) string

JoinTopicConsumer returns a Pub/Sub Lite subscription name for the given topic and consumer name.

func SplitTopicConsumer

func SplitTopicConsumer(subscriptionName string) (topic apmqueue.Topic, consumer string, err error)

SplitTopicConsumer does the opposite of JoinTopicConsumer by parsing topic and consumer out of a subscription name. Returns an error if subscription name is not in an expected format.
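The round-trip relationship between the two functions can be sketched with standard-library code only. The helpers `joinTopicConsumer` and `splitTopicConsumer` and the `+` separator below are assumptions made for illustration; the actual subscription name format used by the package is not stated here, and plain strings stand in for `apmqueue.Topic`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// sep is a hypothetical separator chosen for this sketch; the real
// subscription naming scheme may differ.
const sep = "+"

// joinTopicConsumer is an illustrative stand-in for JoinTopicConsumer.
func joinTopicConsumer(topic, consumer string) string {
	return topic + sep + consumer
}

// splitTopicConsumer is an illustrative stand-in for SplitTopicConsumer.
// It splits on the last separator and rejects names where the separator
// is missing or either part is empty.
func splitTopicConsumer(subscriptionName string) (topic, consumer string, err error) {
	i := strings.LastIndex(subscriptionName, sep)
	if i <= 0 || i == len(subscriptionName)-len(sep) {
		return "", "", errors.New("malformed subscription name: " + subscriptionName)
	}
	return subscriptionName[:i], subscriptionName[i+len(sep):], nil
}

func main() {
	name := joinTopicConsumer("traces", "indexer")
	topic, consumer, err := splitTopicConsumer(name)
	fmt.Println(name, topic, consumer, err)
}
```

Splitting should always be checked for an error, since a subscription created outside this naming convention cannot be mapped back to a topic and consumer.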

Types

type CommonConfig

type CommonConfig struct {
	// Project is the GCP project.
	//
	// NewManager, NewProducer, and NewConsumer will set Project from
	// $GOOGLE_APPLICATION_CREDENTIALS if it is not explicitly specified.
	Project string

	// Region is the GCP region.
	Region string

	// Namespace holds a namespace for Pub/Sub Lite resources.
	//
	// This is added as a prefix for reservation, topic, and
	// subscription names, and acts as a filter on resources
	// monitored or described by the manager.
	//
	// Namespace is always removed from resource names before
	// they are returned to callers. The only way Namespace
	// will surface is in telemetry (e.g. metrics), as an
	// independent dimension. This enables users to filter
	// metrics by namespace, while maintaining stable names
	// for resources (e.g. topics).
	Namespace string

	// ClientOptions holds arbitrary Google API client options.
	ClientOptions []option.ClientOption

	// Logger to use for any errors.
	Logger *zap.Logger

	// DisableTelemetry disables the OpenTelemetry hook.
	DisableTelemetry bool

	// TracerProvider allows specifying a custom otel tracer provider.
	// Defaults to the global one.
	TracerProvider trace.TracerProvider

	// MeterProvider allows specifying a custom otel meter provider.
	// Defaults to the global one.
	MeterProvider metric.MeterProvider
}

CommonConfig defines common configuration for Pub/Sub Lite consumers, producers, and managers.
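The Namespace behavior described above (prefix on the way in, strip and filter on the way out) might look roughly like the following. This is a sketch under stated assumptions: the `namespace` type, its methods, and the `-` separator are hypothetical and are not part of the package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// namespace is a hypothetical helper type used only for this sketch.
type namespace string

// prefix returns the string prepended to resource names. The "-"
// separator is an assumption made for illustration.
func (ns namespace) prefix() string {
	if ns == "" {
		return ""
	}
	return string(ns) + "-"
}

// qualify adds the namespace prefix before a name is sent to the service.
func (ns namespace) qualify(name string) string {
	return ns.prefix() + name
}

// strip removes the prefix from a name returned by the service. The
// boolean is false for resources outside the namespace, which is how the
// prefix doubles as a filter.
func (ns namespace) strip(qualified string) (string, bool) {
	if !strings.HasPrefix(qualified, ns.prefix()) {
		return "", false
	}
	return strings.TrimPrefix(qualified, ns.prefix()), true
}

func main() {
	ns := namespace("prod")
	fmt.Println(ns.qualify("traces"))
	fmt.Println(ns.strip("prod-traces"))
	fmt.Println(ns.strip("dev-traces"))
}
```

Callers only ever see the unprefixed name, so a topic keeps the same name across namespaces while telemetry can still distinguish them.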

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer receives PubSub Lite messages from one or more existing subscriptions. The underlying library processes messages concurrently per subscription and partition.

func NewConsumer

func NewConsumer(ctx context.Context, cfg ConsumerConfig) (*Consumer, error)

NewConsumer creates a new consumer instance for a single subscription.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer. Once the consumer is closed, it can't be re-used.

func (*Consumer) Healthy

func (c *Consumer) Healthy(ctx context.Context) error

Healthy returns an error if the consumer isn't healthy.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run executes the consumer in a blocking manner. It should only be called once; any subsequent calls will return an error.

type ConsumerConfig

type ConsumerConfig struct {
	CommonConfig
	// Topics holds the list of topics from which to consume.
	Topics []apmqueue.Topic
	// ConsumerName holds the consumer name. This will be combined with
	// the topic names to identify Pub/Sub Lite subscriptions, and must
	// be unique per consuming service.
	ConsumerName string
	// Processor that will be used to process each event individually.
	// Processor may be called from multiple goroutines and needs to be
	// safe for concurrent use.
	Processor apmqueue.Processor
	// Delivery mechanism to use to acknowledge the messages.
	// AtMostOnceDeliveryType and AtLeastOnceDeliveryType are supported.
	Delivery apmqueue.DeliveryType
}

ConsumerConfig defines the configuration for the PubSub Lite consumer.
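Because the Processor may be called from multiple goroutines, any state it holds has to be protected. The sketch below shows one way to do that with an atomic counter. The `record` and `processor` types are simplified stand-ins defined locally for illustration; they are assumptions and not the real `apmqueue` definitions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// record and processor are hypothetical, simplified stand-ins for the
// apmqueue types, declared here so the sketch is self-contained.
type record struct {
	Topic string
	Value []byte
}

type processor interface {
	Process(ctx context.Context, r record) error
}

// countingProcessor keeps its only mutable state in an atomic counter,
// which makes Process safe to call from many goroutines at once.
type countingProcessor struct {
	processed atomic.Int64
}

func (p *countingProcessor) Process(ctx context.Context, r record) error {
	p.processed.Add(1)
	return nil
}

// processConcurrently mimics a consumer delivering n records from
// separate goroutines and waits for all of them to finish.
func processConcurrently(p processor, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.Process(context.Background(), record{Topic: "traces", Value: []byte("event")})
		}()
	}
	wg.Wait()
}

func main() {
	p := &countingProcessor{}
	processConcurrently(p, 100)
	fmt.Println(p.processed.Load())
}
```

A mutex works equally well when the shared state is more than a single counter; the point is only that unsynchronized fields in a Processor are unsafe.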

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages GCP Pub/Sub Lite resources: reservations, topics, and subscriptions.

func NewManager

func NewManager(cfg ManagerConfig) (*Manager, error)

NewManager returns a new Manager with the given config.

func (*Manager) Close

func (m *Manager) Close() error

Close closes the manager's resources.

func (*Manager) CreateReservation

func (m *Manager) CreateReservation(ctx context.Context, name string, throughputCapacity int) error

CreateReservation creates a reservation with the given name and throughput capacity.

Reservations that already exist will be left unmodified.

func (*Manager) CreateSubscription

func (m *Manager) CreateSubscription(
	ctx context.Context, name, topic string,
	deliverImmediately bool,
) error

CreateSubscription creates a subscription with the given name for the given topic.

Subscriptions that already exist will be left unmodified.

func (*Manager) DeleteReservation

func (m *Manager) DeleteReservation(ctx context.Context, reservation string) error

DeleteReservation deletes the given reservation.

No error is returned if the reservation does not exist.

func (*Manager) DeleteSubscription

func (m *Manager) DeleteSubscription(ctx context.Context, subscription string) error

DeleteSubscription deletes the given subscription.

No error is returned if the subscription does not exist.

func (*Manager) DeleteTopic

func (m *Manager) DeleteTopic(ctx context.Context, topic string) error

DeleteTopic deletes the given topic.

No error is returned if the topic does not exist.
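The create and delete methods above are described as idempotent: creating something that exists leaves it unmodified, and deleting something that is absent is not an error. A minimal sketch of that pattern follows, assuming a hypothetical in-memory `fakeAdmin` with sentinel errors; none of these names come from the package, and a real client would report these conditions through its own error types.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errAlreadyExists = errors.New("already exists")
	errNotFound      = errors.New("not found")
)

// fakeAdmin is a hypothetical in-memory stand-in for an admin client.
// It maps topic names to partition counts.
type fakeAdmin struct {
	topics map[string]int
}

func (a *fakeAdmin) create(name string, partitions int) error {
	if _, ok := a.topics[name]; ok {
		return fmt.Errorf("topic %q: %w", name, errAlreadyExists)
	}
	a.topics[name] = partitions
	return nil
}

func (a *fakeAdmin) remove(name string) error {
	if _, ok := a.topics[name]; !ok {
		return fmt.Errorf("topic %q: %w", name, errNotFound)
	}
	delete(a.topics, name)
	return nil
}

// createIfMissing swallows the "already exists" error, so an existing
// topic is left unmodified.
func createIfMissing(a *fakeAdmin, name string, partitions int) error {
	if err := a.create(name, partitions); err != nil && !errors.Is(err, errAlreadyExists) {
		return err
	}
	return nil
}

// deleteIfPresent swallows the "not found" error, so deleting twice is safe.
func deleteIfPresent(a *fakeAdmin, name string) error {
	if err := a.remove(name); err != nil && !errors.Is(err, errNotFound) {
		return err
	}
	return nil
}

func main() {
	a := &fakeAdmin{topics: map[string]int{}}
	fmt.Println(createIfMissing(a, "traces", 2))
	fmt.Println(createIfMissing(a, "traces", 8), a.topics["traces"])
	fmt.Println(deleteIfPresent(a, "traces"))
	fmt.Println(deleteIfPresent(a, "traces"))
}
```

This makes provisioning code safe to re-run, at the cost that a second create call with different settings does not update the existing resource.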

func (*Manager) ListReservationTopics

func (m *Manager) ListReservationTopics(ctx context.Context, reservation string) ([]string, error)

ListReservationTopics lists topics in the given reservation.

func (*Manager) ListReservations

func (m *Manager) ListReservations(ctx context.Context) ([]string, error)

ListReservations lists reservations in the configured project and region.

func (*Manager) ListTopicSubscriptions

func (m *Manager) ListTopicSubscriptions(ctx context.Context, topic string) ([]string, error)

ListTopicSubscriptions lists subscriptions for the given topic.

func (*Manager) MonitorConsumerLag

func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error)

MonitorConsumerLag registers a callback with OpenTelemetry to measure consumer group lag for the given topics.

func (*Manager) NewTopicCreator

func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)

NewTopicCreator returns a new TopicCreator with the given config.

type ManagerConfig

type ManagerConfig struct {
	CommonConfig
}

ManagerConfig holds configuration for managing GCP Pub/Sub Lite resources.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer implements the apmqueue.Producer interface and sends each of the events in a batch to a PubSub Lite topic, which is determined by calling the configured TopicRouter.

func NewProducer

func NewProducer(cfg ProducerConfig) (*Producer, error)

NewProducer creates a new PubSub Lite producer for a single project.

func (*Producer) Close

func (p *Producer) Close() error

Close stops the producer.

This call is blocking and will cause all the underlying clients to stop producing. If producing is asynchronous, it'll block until all messages have been produced. After Close() is called, Producer cannot be reused.

func (*Producer) Healthy

func (p *Producer) Healthy(ctx context.Context) error

Healthy is a noop at the moment. TODO(marclop) range over the producers, call .Error(), if any returns an error, return it.

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error

Produce produces N records. If the Producer is synchronous, it waits until all records have been produced to the queue; otherwise, it returns as soon as the records are stored in the producer buffer. If the context has been enriched with metadata, each entry will be added as a record header. Produce takes ownership of the records, and any modifications after Produce is called may cause an unhandled exception.
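The difference between synchronous and asynchronous producing, and the role of Close in draining outstanding work, can be modeled with a small standard-library sketch. `sketchProducer` and its fields are hypothetical and only mirror the documented behavior; they say nothing about how the real Producer is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchProducer is a hypothetical model of a producer with a sync switch.
type sketchProducer struct {
	synchronous bool
	inflight    sync.WaitGroup // all records not yet produced
	mu          sync.Mutex
	produced    []string
}

// Produce hands each record to a background goroutine. In synchronous
// mode it waits for this call's records; otherwise it returns at once.
func (p *sketchProducer) Produce(records ...string) {
	var batch sync.WaitGroup
	for _, r := range records {
		p.inflight.Add(1)
		batch.Add(1)
		go func(r string) {
			defer p.inflight.Done()
			defer batch.Done()
			p.mu.Lock()
			p.produced = append(p.produced, r)
			p.mu.Unlock()
		}(r)
	}
	if p.synchronous {
		batch.Wait()
	}
}

// Close blocks until every outstanding record has been produced.
func (p *sketchProducer) Close() {
	p.inflight.Wait()
}

// count reports how many records have been produced so far.
func (p *sketchProducer) count() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.produced)
}

func main() {
	syncProducer := &sketchProducer{synchronous: true}
	syncProducer.Produce("a", "b", "c")
	fmt.Println(syncProducer.count())

	asyncProducer := &sketchProducer{}
	asyncProducer.Produce("a", "b", "c")
	asyncProducer.Close() // without Close, some records might still be in flight
	fmt.Println(asyncProducer.count())
}
```

In the asynchronous case the caller should not assume delivery until Close has returned, which is why Close is documented as blocking.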

type ProducerConfig

type ProducerConfig struct {
	CommonConfig
	// Sync can be used to indicate whether production should be synchronous.
	// Due to the mechanics of PubSub Lite publishing, producing synchronously
	// will yield poor performance unless a single call to produce contains
	// enough records that are large enough to cause immediate flush.
	Sync bool
}

ProducerConfig for the PubSub Lite producer.

type TopicCreator

type TopicCreator struct {
	// contains filtered or unexported fields
}

TopicCreator creates GCP Pub/Sub Lite topics.

func (*TopicCreator) CreateTopics

func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error

CreateTopics creates one or more topics.

Topics that already exist will be left unmodified.

type TopicCreatorConfig

type TopicCreatorConfig struct {
	// Reservation holds the unqualified ID of the reservation with
	// which topics will be associated. This will be combined with the
	// project ID, region ID, and namespace to form the reservation path.
	Reservation string

	// PartitionCount is the number of partitions to assign to newly
	// created topics.
	//
	// Must be greater than zero.
	PartitionCount int

	// PublishCapacityMiBPerSec defines the publish throughput capacity
	// per partition in MiB/s. Must be >= 4 and <= 16.
	PublishCapacityMiBPerSec int

	// SubscribeCapacityMiBPerSec defines the subscribe throughput capacity
	// per partition in MiB/s. Must be >= 4 and <= 32.
	SubscribeCapacityMiBPerSec int

	// PerPartitionBytes holds the provisioned storage, in bytes, per partition.
	//
	// If the number of bytes stored in any of the topic's partitions grows beyond
	// this value, older messages will be dropped to make room for newer ones,
	// regardless of the value of `RetentionDuration`. Must be >= 30 GiB.
	PerPartitionBytes int64

	// RetentionDuration indicates how long messages are retained. Must be > 0.
	RetentionDuration time.Duration
}

TopicCreatorConfig holds configuration for managing GCP Pub/Sub Lite topics.
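The bounds stated in the field comments can be checked before a config is passed to NewTopicCreator. The sketch below mirrors the numeric fields in a local `topicSettings` type and validates them; the type, the `validate` method, and `exampleSettings` are hypothetical, and whether the package performs the same checks internally is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const gib = 1 << 30

// topicSettings is a hypothetical mirror of the numeric fields in
// TopicCreatorConfig, declared locally to keep the sketch self-contained.
type topicSettings struct {
	PartitionCount             int
	PublishCapacityMiBPerSec   int
	SubscribeCapacityMiBPerSec int
	PerPartitionBytes          int64
	RetentionDuration          time.Duration
}

// validate applies the bounds given in the field comments and reports
// every violation at once.
func (s topicSettings) validate() error {
	var errs []error
	if s.PartitionCount <= 0 {
		errs = append(errs, errors.New("PartitionCount must be greater than zero"))
	}
	if s.PublishCapacityMiBPerSec < 4 || s.PublishCapacityMiBPerSec > 16 {
		errs = append(errs, errors.New("PublishCapacityMiBPerSec must be >= 4 and <= 16"))
	}
	if s.SubscribeCapacityMiBPerSec < 4 || s.SubscribeCapacityMiBPerSec > 32 {
		errs = append(errs, errors.New("SubscribeCapacityMiBPerSec must be >= 4 and <= 32"))
	}
	if s.PerPartitionBytes < 30*gib {
		errs = append(errs, errors.New("PerPartitionBytes must be >= 30 GiB"))
	}
	if s.RetentionDuration <= 0 {
		errs = append(errs, errors.New("RetentionDuration must be > 0"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

// exampleSettings returns values that sit at or inside every bound.
func exampleSettings() topicSettings {
	return topicSettings{
		PartitionCount:             1,
		PublishCapacityMiBPerSec:   4,
		SubscribeCapacityMiBPerSec: 8,
		PerPartitionBytes:          30 * gib,
		RetentionDuration:          24 * time.Hour,
	}
}

func main() {
	fmt.Println(exampleSettings().validate())

	bad := exampleSettings()
	bad.PerPartitionBytes = 1 * gib
	fmt.Println(bad.validate())
}
```

Note that PerPartitionBytes caps storage independently of RetentionDuration, so a short retention with a small byte budget can still drop messages early under heavy load.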

Directories

Path	Synopsis
internal/pubsubabs	Package pubsubabs provides an abstraction layer over the `pubsub` PublisherClient types to allow testing.
internal/telemetry	Package telemetry allows setting up telemetry for pubsublite consumers and producers.