pubsub

package
v0.43.0
Published: May 15, 2019 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package pubsub provides an easy and portable way to interact with publish/subscribe systems. See https://github.com/eliben/gocdkx/howto/pubsub/ for how-to guides.

Subpackages contain distinct implementations of pubsub for various providers, including Cloud and on-premise solutions. For example, "gcppubsub" supports Google Cloud Pub/Sub. Your application should import one of these provider-specific subpackages and use its exported functions to get a *Topic and/or *Subscription; do not use the NewTopic/NewSubscription functions in this package. For example:

topic := mempubsub.NewTopic()
err := topic.Send(context.Background(), &pubsub.Message{Body: []byte("hi")})
...

Then, write your application code using the *Topic/*Subscription types. You can easily reconfigure your initialization code to choose a different provider. You can develop your application locally using mempubsub, or deploy it to multiple Cloud providers. You may find http://github.com/google/wire useful for managing your initialization code.

Alternatively, you can construct a *Topic/*Subscription via a URL and OpenTopic/OpenSubscription. See https://github.com/eliben/gocdkx/concepts/urls/ for more information.

At-most-once and At-least-once Delivery

Some PubSub systems guarantee that messages received by subscribers but not acknowledged are delivered again. These at-least-once systems require that subscribers call an Ack function to indicate that they have fully processed a message.

In other PubSub systems, a message will be delivered only once, if it is delivered at all. These at-most-once systems do not need subscribers to Ack; the message is essentially auto-acked when it is delivered.

This package accommodates both kinds of systems. See the provider-specific package documentation to see whether it is at-most-once or at-least-once. Some providers support both modes.

Application developers should think carefully about which kind of semantics the application needs. Even though the application code may look similar, the system-level characteristics are quite different.
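
The difference between the two delivery modes can be illustrated with a small simulation. This is only a sketch: the `deliver` function and its handler are hypothetical and do not use this package. They model a handler that fails to acknowledge message "b" on its first delivery.

```go
package main

import "fmt"

// deliver is a hypothetical model of a pubsub system, not part of this package.
// It hands each message to handle, which returns true to acknowledge it.
// In at-least-once mode an unacknowledged message is delivered again;
// in at-most-once mode every message is delivered exactly one time.
func deliver(msgs []string, atLeastOnce bool, handle func(string) bool) (deliveries int) {
	for _, m := range msgs {
		for {
			deliveries++
			acked := handle(m)
			if acked || !atLeastOnce {
				break
			}
		}
	}
	return deliveries
}

func main() {
	msgs := []string{"a", "b", "c"}
	for _, atLeastOnce := range []bool{true, false} {
		seen := map[string]int{}
		var processed []string
		handle := func(m string) bool {
			seen[m]++
			if m == "b" && seen[m] == 1 {
				return false // simulate a failure before the ack
			}
			processed = append(processed, m)
			return true
		}
		n := deliver(msgs, atLeastOnce, handle)
		fmt.Printf("atLeastOnce=%v deliveries=%d processed=%v\n", atLeastOnce, n, processed)
	}
}
```

In the at-least-once run "b" is delivered twice and eventually processed; in the at-most-once run it is lost. An application has to tolerate one of these outcomes: duplicates or drops.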

After receiving a Message via Subscription.Receive:

  • If your application ever uses an at-least-once provider, it should always call Message.Ack/Nack after processing a message.
  • If your application only uses at-most-once providers, you can omit the call to Message.Ack. It should never call Message.Nack, as that operation doesn't make sense for an at-most-once system.

The Subscription constructor for at-most-once providers requires a function that is called whenever the application calls Message.Ack. This forces the application developer to be explicit about what happens when Ack is called, since the provider has no meaningful implementation. Common functions to supply are:

  • func() {}: Do nothing. Use this if your application does call Message.Ack; it makes explicit that Ack for the provider is a no-op.
  • func() { panic("ack called!") }: panic. This is appropriate if your application only uses at-most-once providers and you don't expect it to ever call Message.Ack.
  • func() { log.Info("ack called!") }: log. Softer than panicking.

Since Message.Nack never makes sense for some providers (for example, for at-most-once providers, the provider can't redeliver the message), Nack will panic if called for some providers.
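
The three choices of ack function can be compared without any provider. The sketch below uses hypothetical `atMostOnceSub` and `message` types that are not part of this package. The real constructors live in the provider-specific subpackages and their signatures may differ. It also uses the standard `log` package in place of `log.Info`.

```go
package main

import (
	"fmt"
	"log"
)

// atMostOnceSub is a hypothetical stand-in for a subscription on an
// at-most-once provider. It is not a type from the pubsub package.
type atMostOnceSub struct {
	ackFunc func() // called whenever the application acks a message
}

// message is a hypothetical stand-in for a received message.
type message struct {
	body []byte
	sub  *atMostOnceSub
}

// Ack delegates to the function supplied at construction time, because the
// provider itself has nothing meaningful to do.
func (m *message) Ack() { m.sub.ackFunc() }

// newAtMostOnceSub requires an explicit ack function, so the caller must
// decide what acking means for this provider.
func newAtMostOnceSub(ackFunc func()) *atMostOnceSub {
	return &atMostOnceSub{ackFunc: ackFunc}
}

// ackPanics reports whether calling Ack on a message from sub panics.
func ackPanics(sub *atMostOnceSub) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	(&message{body: []byte("hi"), sub: sub}).Ack()
	return false
}

func main() {
	noop := newAtMostOnceSub(func() {})
	strict := newAtMostOnceSub(func() { panic("ack called!") })
	logging := newAtMostOnceSub(func() { log.Print("ack called!") })

	fmt.Println("no-op panics:", ackPanics(noop))
	fmt.Println("strict panics:", ackPanics(strict))
	fmt.Println("logging panics:", ackPanics(logging))
}
```

The no-op variant suits code that is shared with at-least-once providers, while the panicking variant surfaces an unexpected Ack call immediately during development.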

OpenCensus Integration

OpenCensus supports tracing and metric collection for multiple languages and backend providers. See https://opencensus.io.

This API collects OpenCensus traces and metrics for the following methods:

  • Topic.Send
  • Topic.Shutdown
  • Subscription.Receive
  • Subscription.Shutdown
  • The internal driver methods SendBatch, SendAcks and ReceiveBatch.

All trace and metric names begin with the package import path. The traces add the method name. For example, "github.com/eliben/gocdkx/pubsub/Topic.Send". The metrics are "completed_calls", a count of completed method calls by provider, method and status (error code); and "latency", a distribution of method latency by provider and method. For example, "github.com/eliben/gocdkx/pubsub/latency".
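
As a small illustration of the naming scheme, the names can be assembled from the import path. The helper functions here are hypothetical and exist only for this sketch. That "completed_calls" is prefixed the same way as "latency" is an assumption based on the description above.

```go
package main

import "fmt"

// pkgName is the package import path quoted in the text above.
const pkgName = "github.com/eliben/gocdkx/pubsub"

// traceName returns the trace name for a method such as "Topic.Send".
// It is a hypothetical helper, not part of the package.
func traceName(method string) string { return pkgName + "/" + method }

// metricName returns the full name for a metric such as "latency".
// It is a hypothetical helper, not part of the package.
func metricName(metric string) string { return pkgName + "/" + metric }

func main() {
	methods := []string{
		"Topic.Send",
		"Topic.Shutdown",
		"Subscription.Receive",
		"Subscription.Shutdown",
	}
	for _, m := range methods {
		fmt.Println(traceName(m))
	}
	fmt.Println(metricName("completed_calls"))
	fmt.Println(metricName("latency"))
}
```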

To enable trace collection in your application, see "Configure Exporter" at https://opencensus.io/quickstart/go/tracing. To enable metric collection in your application, see "Exporting stats" at https://opencensus.io/quickstart/go/metrics.


Constants

This section is empty.

Variables

var NewSubscription = newSubscription

NewSubscription is for use by provider implementations.

var NewTopic = newTopic

NewTopic is for use by provider implementations.

var (

	// OpenCensusViews are predefined views for OpenCensus metrics.
	// The views include counts and latency distributions for API method calls.
	// See the example at https://godoc.org/go.opencensus.io/stats/view for usage.
	OpenCensusViews = oc.Views(pkgName, latencyMeasure)
)

Functions

This section is empty.

Types

type Message

type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Metadata has key/value metadata for the message. It will be nil if the
	// message has no associated metadata.
	Metadata map[string]string

	// BeforeSend is a callback used when sending a message. It will always be
	// set to nil for received messages.
	//
	// The callback will be called exactly once, before the message is sent.
	//
	// asFunc converts its argument to provider-specific types.
	// See https://godoc.org/github.com/eliben/gocdkx#hdr-As for background information.
	BeforeSend func(asFunc func(interface{}) bool) error
	// contains filtered or unexported fields
}

Message contains data to be published.

func (*Message) Ack

func (m *Message) Ack()

Ack acknowledges the message, telling the server that it does not need to be sent again to the associated Subscription. It returns immediately, but the actual ack is sent in the background, and is not guaranteed to succeed.

func (*Message) As

func (m *Message) As(i interface{}) bool

As converts i to provider-specific types. See https://godoc.org/github.com/eliben/gocdkx#hdr-As for background information, the "As" examples in this package for examples, and the provider-specific package documentation for the specific types supported for that provider. As panics unless it is called on a message obtained from Subscription.Receive.
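
The general shape of the As pattern can be sketched with the standard library alone. The `rawMessage` and `message` types below are hypothetical and only illustrate how a pointer-to-pointer argument lets the caller receive a provider-specific value. This is not the package's actual implementation.

```go
package main

import "fmt"

// rawMessage is a hypothetical provider-specific message type.
type rawMessage struct {
	Attributes map[string]string
}

// message is a hypothetical portable wrapper around rawMessage.
type message struct {
	raw *rawMessage
}

// As sets *i to the underlying provider value if i has a supported
// pointer type, and reports whether it did so.
func (m *message) As(i interface{}) bool {
	p, ok := i.(**rawMessage)
	if !ok {
		return false
	}
	*p = m.raw
	return true
}

func main() {
	m := &message{raw: &rawMessage{Attributes: map[string]string{"k": "v"}}}

	// Supported type: the caller gets access to the provider value.
	var rm *rawMessage
	if m.As(&rm) {
		fmt.Println("provider attributes:", rm.Attributes)
	}

	// Unsupported type: As returns false and leaves the target untouched.
	var s string
	fmt.Println("unsupported type:", m.As(&s))
}
```

Code that calls As ties itself to one provider, so it is usually worth keeping behind the `if` so the portable path still works when As returns false.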

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"

	pbapi "google.golang.org/genproto/googleapis/pubsub/v1"
)

func main() {
	// This example is specific to the gcppubsub implementation; it demonstrates
	// access to the underlying PubsubMessage type.
	// The types exposed for As by gcppubsub are documented in
	// https://godoc.org/github.com/eliben/gocdkx/pubsub/gcppubsub#hdr-As

	ctx := context.Background()
	sub, err := pubsub.OpenSubscription(ctx, "gcppubsub://project/topic")
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Shutdown(ctx)

	msg, err := sub.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}
	var pm *pbapi.PubsubMessage
	if msg.As(&pm) {
		_ = pm.GetAttributes()
	}
	msg.Ack()
}

func (*Message) Nack

func (m *Message) Nack()

Nack (short for negative acknowledgment) tells the server that this Message was not processed and should be redelivered. It returns immediately, but the actual nack is sent in the background, and is not guaranteed to succeed.

Nack is a performance optimization for retrying transient failures. Nack must not be used for message parse errors or other messages that the application will never be able to process: calling Nack will cause them to be redelivered and overload the server. Instead, an application should call Ack and log the failure in some monitored way.

Nack panics for some providers, as Nack is meaningless when messages can't be redelivered.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription receives published messages.

func OpenSubscription

func OpenSubscription(ctx context.Context, urlstr string) (*Subscription, error)

OpenSubscription opens the Subscription identified by the URL given. See the URLOpener documentation in provider-specific subpackages for details on supported URL formats, and https://github.com/eliben/gocdkx/concepts/urls for more information.

func (*Subscription) As

func (s *Subscription) As(i interface{}) bool

As converts i to provider-specific types. See https://godoc.org/github.com/eliben/gocdkx#hdr-As for background information, the "As" examples in this package for examples, and the provider-specific package documentation for the specific types supported for that provider.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"

	pbraw "cloud.google.com/go/pubsub/apiv1"
)

func main() {
	// This example is specific to the gcppubsub implementation; it demonstrates
	// access to the underlying SubscriberClient type.
	// The types exposed for As by gcppubsub are documented in
	// https://godoc.org/github.com/eliben/gocdkx/pubsub/gcppubsub#hdr-As

	ctx := context.Background()
	sub, err := pubsub.OpenSubscription(ctx, "gcppubsub://project/topic")
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Shutdown(ctx)

	var sc *pbraw.SubscriberClient
	if sub.As(&sc) {
		_ = sc.CallOptions
	}
}

func (*Subscription) ErrorAs

func (s *Subscription) ErrorAs(err error, i interface{}) bool

ErrorAs converts err to provider-specific types. ErrorAs panics if i is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"

	"google.golang.org/grpc/status"
)

func main() {
	// This example is specific to the gcppubsub implementation; it demonstrates
	// access to the underlying Status type.
	// The types exposed for As by gcppubsub are documented in
	// https://godoc.org/github.com/eliben/gocdkx/pubsub/gcppubsub#hdr-As

	ctx := context.Background()
	sub, err := pubsub.OpenSubscription(ctx, "gcppubsub://project/badtopic")
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Shutdown(ctx)

	msg, err := sub.Receive(ctx)
	if err != nil {
		var s *status.Status
		if sub.ErrorAs(err, &s) {
			_ = s.Code()
		}
		log.Fatal(err)
	}
	msg.Ack()
}

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error)

Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. It can be called concurrently from multiple goroutines.

Receive retries retryable errors from the underlying provider forever. Therefore, if Receive returns an error, either:

  1. It is a non-retryable error from the underlying provider, either from an attempt to fetch more messages or from an attempt to ack messages. Operator intervention may be required (e.g., invalid resource, quota error, etc.). Receive will return the same error from then on, so the application should log the error and either recreate the Subscription, or exit.
  2. The provided ctx is Done. Error() on the returned error will include both the ctx error and the underlying provider error, and ErrorAs on it can access the underlying provider error type if needed. Receive may be called again with a fresh ctx.

Callers can distinguish between the two by checking if the ctx they passed is Done, or via xerrors.Is(err, context.DeadlineExceeded or context.Canceled) on the returned error.

The Ack method of the returned Message must be called once the message has been processed, to prevent it from being received again (unless only at-most-once providers are being used; see the package doc for more).

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/eliben/gocdkx/pubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/

	// Variables set up elsewhere:
	ctx := context.Background()
	var subscription *pubsub.Subscription

	// Loop on received messages.
	for {
		msg, err := subscription.Receive(ctx)
		if err != nil {
			// Errors from Receive indicate that Receive will no longer succeed.
			log.Printf("Receiving message: %v", err)
			break
		}
		// Do work based on the message, for example:
		fmt.Printf("Got message: %q\n", msg.Body)
		// Messages must always be acknowledged with Ack.
		msg.Ack()
	}
}

Example (Concurrent)
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/eliben/gocdkx/pubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/

	// Variables set up elsewhere:
	ctx := context.Background()
	var subscription *pubsub.Subscription

	// Loop on received messages. We can use a channel as a semaphore to limit how
	// many goroutines we have active at a time as well as wait on the goroutines
	// to finish before exiting.
	const maxHandlers = 10
	sem := make(chan struct{}, maxHandlers)
recvLoop:
	for {
		msg, err := subscription.Receive(ctx)
		if err != nil {
			// Errors from Receive indicate that Receive will no longer succeed.
			log.Printf("Receiving message: %v", err)
			break
		}

		// Wait if there are too many active handle goroutines and acquire the
		// semaphore. If the context is canceled, stop waiting and start shutting
		// down.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			break recvLoop
		}

		// Handle the message in a new goroutine.
		go func() {
			defer func() { <-sem }() // Release the semaphore.
			defer msg.Ack()          // Messages must always be acknowledged with Ack.

			// Do work based on the message, for example:
			fmt.Printf("Got message: %q\n", msg.Body)
		}()
	}

	// We're no longer receiving messages. Wait to finish handling any
	// unacknowledged messages by totally acquiring the semaphore.
	for n := 0; n < maxHandlers; n++ {
		sem <- struct{}{}
	}
}

func (*Subscription) Shutdown

func (s *Subscription) Shutdown(ctx context.Context) (err error)

Shutdown flushes pending ack sends and disconnects the Subscription.

type SubscriptionURLOpener

type SubscriptionURLOpener interface {
	OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error)
}

SubscriptionURLOpener represents types that can open Subscriptions based on a URL. The opener must not modify the URL argument. OpenSubscriptionURL must be safe to call from multiple goroutines.

This interface is generally implemented by types in driver packages.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic publishes messages to all its subscribers.

func OpenTopic

func OpenTopic(ctx context.Context, urlstr string) (*Topic, error)

OpenTopic opens the Topic identified by the URL given. See the URLOpener documentation in provider-specific subpackages for details on supported URL formats, and https://github.com/eliben/gocdkx/concepts/urls for more information.

func (*Topic) As

func (t *Topic) As(i interface{}) bool

As converts i to provider-specific types. See https://godoc.org/github.com/eliben/gocdkx#hdr-As for background information, the "As" examples in this package for examples, and the provider-specific package documentation for the specific types supported for that provider.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"

	pbraw "cloud.google.com/go/pubsub/apiv1"
)

func main() {
	// This example is specific to the gcppubsub implementation; it demonstrates
	// access to the underlying PublisherClient type.
	// The types exposed for As by gcppubsub are documented in
	// https://godoc.org/github.com/eliben/gocdkx/pubsub/gcppubsub#hdr-As

	ctx := context.Background()
	topic, err := pubsub.OpenTopic(ctx, "gcppubsub://project/topic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)

	var pc *pbraw.PublisherClient
	if topic.As(&pc) {
		_ = pc
	}
}

func (*Topic) ErrorAs

func (t *Topic) ErrorAs(err error, i interface{}) bool

ErrorAs converts err to provider-specific types. ErrorAs panics if i is nil or not a pointer. ErrorAs returns false if err == nil. See https://godoc.org/github.com/eliben/gocdkx#hdr-As for background information.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"

	"google.golang.org/grpc/status"
)

func main() {
	// This example is specific to the gcppubsub implementation; it demonstrates
	// access to the underlying Status type.
	// The types exposed for As by gcppubsub are documented in
	// https://godoc.org/github.com/eliben/gocdkx/pubsub/gcppubsub#hdr-As

	ctx := context.Background()
	topic, err := pubsub.OpenTopic(ctx, "gcppubsub://project/topic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)

	err = topic.Send(ctx, &pubsub.Message{Body: []byte("hello")})
	if err != nil {
		var s *status.Status
		if topic.ErrorAs(err, &s) {
			_ = s.Code()
		}
		log.Fatal(err)
	}
}

func (*Topic) Send

func (t *Topic) Send(ctx context.Context, m *Message) (err error)

Send publishes a message. It only returns after the message has been sent, or failed to be sent. Send can be called from multiple goroutines at once.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/

	// Variables set up elsewhere:
	ctx := context.Background()
	var topic *pubsub.Topic

	err := topic.Send(ctx, &pubsub.Message{
		Body: []byte("Hello, World!\n"),
		Metadata: map[string]string{
			// These are examples of metadata.
			// There is nothing special about the key names.
			"language":   "en",
			"importance": "high",
		},
	})
	if err != nil {
		log.Fatal(err)
	}
}

func (*Topic) Shutdown

func (t *Topic) Shutdown(ctx context.Context) (err error)

Shutdown flushes pending message sends and disconnects the Topic. It only returns after all pending messages have been sent.

type TopicURLOpener

type TopicURLOpener interface {
	OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error)
}

TopicURLOpener represents types that can open Topics based on a URL. The opener must not modify the URL argument. OpenTopicURL must be safe to call from multiple goroutines.

This interface is generally implemented by types in driver packages.

type URLMux

type URLMux struct {
	// contains filtered or unexported fields
}

URLMux is a URL opener multiplexer. It matches the scheme of the URLs against a set of registered schemes and calls the opener that matches the URL's scheme. See https://github.com/eliben/gocdkx/concepts/urls/ for more information.

The zero value is a multiplexer with no registered schemes.

func DefaultURLMux

func DefaultURLMux() *URLMux

DefaultURLMux returns the URLMux used by OpenTopic and OpenSubscription.

Driver packages can use this to register their TopicURLOpener and/or SubscriptionURLOpener on the mux.

func (*URLMux) OpenSubscription

func (mux *URLMux) OpenSubscription(ctx context.Context, urlstr string) (*Subscription, error)

OpenSubscription calls OpenSubscriptionURL with the URL parsed from urlstr. OpenSubscription is safe to call from multiple goroutines.

func (*URLMux) OpenSubscriptionURL

func (mux *URLMux) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error)

OpenSubscriptionURL dispatches the URL to the opener that is registered with the URL's scheme. OpenSubscriptionURL is safe to call from multiple goroutines.

func (*URLMux) OpenTopic

func (mux *URLMux) OpenTopic(ctx context.Context, urlstr string) (*Topic, error)

OpenTopic calls OpenTopicURL with the URL parsed from urlstr. OpenTopic is safe to call from multiple goroutines.

func (*URLMux) OpenTopicURL

func (mux *URLMux) OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error)

OpenTopicURL dispatches the URL to the opener that is registered with the URL's scheme. OpenTopicURL is safe to call from multiple goroutines.

func (*URLMux) RegisterSubscription

func (mux *URLMux) RegisterSubscription(scheme string, opener SubscriptionURLOpener)

RegisterSubscription registers the opener with the given scheme. If an opener already exists for the scheme, RegisterSubscription panics.

func (*URLMux) RegisterTopic

func (mux *URLMux) RegisterTopic(scheme string, opener TopicURLOpener)

RegisterTopic registers the opener with the given scheme. If an opener already exists for the scheme, RegisterTopic panics.

func (*URLMux) SubscriptionSchemes

func (mux *URLMux) SubscriptionSchemes() []string

SubscriptionSchemes returns a sorted slice of the registered Subscription schemes.

func (*URLMux) TopicSchemes

func (mux *URLMux) TopicSchemes() []string

TopicSchemes returns a sorted slice of the registered Topic schemes.

func (*URLMux) ValidSubscriptionScheme

func (mux *URLMux) ValidSubscriptionScheme(scheme string) bool

ValidSubscriptionScheme returns true iff scheme has been registered for Subscriptions.

func (*URLMux) ValidTopicScheme

func (mux *URLMux) ValidTopicScheme(scheme string) bool

ValidTopicScheme returns true iff scheme has been registered for Topics.

Directories

Path          Synopsis
awssnssqs     Package awssnssqs provides an implementation of pubsub that uses AWS SNS (Simple Notification Service) and SQS (Simple Queueing Service).
azuresb       Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription.
driver        Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
drivertest    Package drivertest provides a conformance test for implementations of driver.
gcppubsub     Package gcppubsub provides a pubsub implementation that uses GCP PubSub.
kafkapubsub   Package kafkapubsub provides an implementation of pubsub for Kafka.
mempubsub     Package mempubsub provides an in-memory pubsub implementation.
natspubsub    Package natspubsub provides a pubsub implementation for NATS.io.
rabbitpubsub  Package rabbitpubsub provides a pubsub implementation for RabbitMQ.
