ezpubsub

package module
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2020 License: MIT Imports: 7 Imported by: 0

README

ezpubsub

Documentation

ezpubsub is a set of higher-level abstractions over the Go library for Google Cloud Pub/Sub. It's built for convenience and intended to cover the vast majority of use cases with minimal fuss. If your use case isn't covered, you're advised to use the official library.

Why?

The cloud.google.com/go/pubsub library is well done and complete but also fairly low level. ezpubsub makes it easier to do the ~80% of things you're most likely to do with the library, especially when using Google Cloud Pub/Sub in development and testing environments.

ezpubsub features a small API surface area and doesn't require you to ever deal with context; if your use case requires context, use the core pubsub library.

Core concepts

The ezpubsub library gives you two core constructs, publishers and subscribers:

  • Subscribers listen for messages on the specified topic and respond to each message using the provided listener function.
  • Publishers publish messages to the specified topic either singly or in batches.

You can see examples on the GoDoc page.

Subscribers

Subscribers listen on the specified topic and apply the logic specified in the Listener function to each incoming message. Here's an example:

import (
        "cloud.google.com/go/pubsub"
        "github.com/lucperkins/ezpubsub"
        "log"
)

func main() {
        subscriberConfig := &ezpubsub.SubscriberConfig{
                Project: "my-project",
                Topic: "user-events",
                Subscription: "my-sub",
                Listener: func(msg *pubsub.Message) {
                        log.Printf("Event received: (id: %s, payload: %s)\n", msg.ID, string(msg.Data))
                        msg.Ack()
                },
        }
        subscriber, err := ezpubsub.NewSubscriber(subscriberConfig)
        if err != nil {
            // handle error
        }

        subscriber.Start()
}

Publishers

Publishers publish messages on the specified topic and handle publishing results according to the logic specified in the Notifier function.

import (
        "cloud.google.com/go/pubsub"
        "context"
        "github.com/lucperkins/ezpubsub"
        "time"
)

func main() {
        publisherConfig := &ezpubsub.PublisherConfig{
                Project: "my-project",
                Topic: "user-events",
                Notifier: func(res *pubsub.PublishResult) {
                        msgId, _ := res.Get(context.Background())
                        log.Printf("Message published: (id: %s)\n", id)
                },
        }

        publisher, err := ezpubsub.NewPublisher(publisherConfig)
        if err != nil {
                // handle error
        }

        // Publish bytes
        publisher.Publish([]byte("Hello world"))

        // Publish a string
        publisher.PublishString("Hello world")
        
        // Publish a JSON-serializable item
        event := struct {
                ID        int64
                Timestamp int64
                Message   string
        }{
                123456,
                time.Now.Uniz(),
                "Something happened",
        }
        err = publisher.PublishObject(event)
        if err != nil {
                // handle error
        }
}

Admin

ezpubsub offers an Admin type that enables you to perform basic administrative tasks. At the moment, Admin supports listing all topics in a project. More functionality will be added later.

func main() {
        project := "my-project"
        admin, err := ezpubsub.NewAdmin(project)
        if err != nil {
                // handle error
        }

        // List topics
        topics, err := admin.ListTopics()
        if err != nil {
                // handle error
        }

        fmt.Println("Listing topics:")
        for _, topic := range topics {
                fmt.Println(topic)
        }
}

Documentation

Overview

The ezpubsub library is a set of higher-level abstractions over the Go library for Google Cloud Pub/Sub. It's built for convenience and intended to cover the vast majority of use cases with minimal fuss. If your use case isn'topic covered, You're advised to use official library.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrNoProjectSpecified      = errors.New("no project specified")
	ErrNoTopicSpecified        = errors.New("no topic specified")
	ErrNoSubscriptionSpecified = errors.New("no subscription specified")
)

Functions

func SimpleAckListener

func SimpleAckListener(msg *pubsub.Message)

A subscriber listener function that does nothing but ack each message. Useful in such situations where you need to "wind through" outstanding messages without processing them.

Types

type Admin

type Admin struct {
	// contains filtered or unexported fields
}

A simple administrative interface for Pub/Sub projects.

Example
admin, err := NewAdmin("my-project")
if err != nil {
	// handle error
}

topics, err := admin.ListTopics()
if err != nil {
	// handle error
}

fmt.Println("Listing topics:")
for _, topic := range topics {
	fmt.Println(topic)
}
Output:

func NewAdmin

func NewAdmin(project string) (*Admin, error)

Create a new Admin, specifying the Google Cloud project name.

func (*Admin) DeleteSubscription

func (a *Admin) DeleteSubscription(subscription string) error

Deletes a specified subscription.

func (*Admin) DeleteSubscriptions

func (a *Admin) DeleteSubscriptions(subscriptions ...string) error

Deletes multiple subscriptions.

func (*Admin) DeleteTopic

func (a *Admin) DeleteTopic(topicName string) error

Deletes the specified topic.

func (*Admin) ListSubscriptions

func (a *Admin) ListSubscriptions() ([]string, error)

Lists all current subscriptions.

func (*Admin) ListTopics

func (a *Admin) ListTopics() ([]string, error)

List all current topics under the specified project.

func (*Admin) SubscriptionExists

func (a *Admin) SubscriptionExists(subscriptionName string) (bool, error)

Checks if a subscription exists.

func (*Admin) TopicExists

func (a *Admin) TopicExists(topicName string) (bool, error)

Checks is a topic already exists.

type ErrorHandler

type ErrorHandler = func(error)

A function that determines how errors are handled.

type Listener

type Listener = func(*pubsub.Message)

A Listener function determines how each incoming Pub/Sub message is processed.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publishers publish messages on a specified Pub/Sub topic.

Example
publisherConfig := &PublisherConfig{
	Project: "...",
	Topic:   "...",
}

publisher, err := NewPublisher(publisherConfig)
if err != nil {
	log.Fatalf("Publisher creation error: %s", err)
}

msg := []byte("Hello world")
publisher.Publish(msg)
Output:

func NewPublisher

func NewPublisher(config *PublisherConfig) (*Publisher, error)

Create a new Publisher from a PublisherConfig.

func (*Publisher) Publish

func (p *Publisher) Publish(data []byte)

Publish the specified payload on the Publisher's topic.

func (*Publisher) PublishBatchSync

func (p *Publisher) PublishBatchSync(payloads [][]byte)

Synchronously publish a batch of message payloads, preserving message order.

func (*Publisher) PublishObject

func (p *Publisher) PublishObject(obj interface{}) error

Publish a JSON-serializable object on the Publisher's topic and throw an error if JSON marshalling is unsuccessful.

func (*Publisher) PublishString

func (p *Publisher) PublishString(s string)

Publish a string on the Publisher's topic.

type PublisherConfig

type PublisherConfig struct {
	Project         string
	Topic           string
	ErrorHandler    ErrorHandler
	ServerIdHandler ServerIdHandler
}

Publisher configuration. All fields except Notifier are mandatory.

Example
serverIdHandler := func(id string) {
	log.Printf("Message with ID %s published", id)
}

errHandler := func(err error) {
	log.Printf("Publisher error: %v", err)
}

publisherConfig := &PublisherConfig{
	Project:         "some-project",
	Topic:           "some-topic",
	ServerIdHandler: serverIdHandler,
	ErrorHandler:    errHandler,
}

publisher, err := NewPublisher(publisherConfig)
if err != nil {
	log.Fatalf("Publisher creation error: %s", err)
}

publisher.Publish([]byte("Hello world"))
Output:

type ServerIdHandler

type ServerIdHandler = func(string)

A handler for the server ID returned when publishing a message.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscribers subscribe to a specified Pub/Sub topic and process each incoming message in accordance with the supplied Listener function.

Example
subscriberConfig := &SubscriberConfig{
	Project:      "...",
	Topic:        "...",
	Subscription: "...",
	Listener: func(msg *pubsub.Message) {
		log.Printf("Message received (id: %s, payload: %s)", msg.Data, string(msg.Data))

		msg.Ack()
	},
	ErrorHandler: func(err error) {
		log.Printf("Publisher error: %v", err)
	},
}

subscriber, err := NewSubscriber(subscriberConfig)
if err != nil {
	log.Fatalf("Subscriber creation error: %s", err)
}

subscriber.Start()
Output:

func NewSubscriber

func NewSubscriber(config *SubscriberConfig) (*Subscriber, error)

Create a new Subscriber from a SubscriberConfig.

func (*Subscriber) Start

func (s *Subscriber) Start()

Start the Publisher. When started, the Publisher listens on its topic and applies the Listener function to each incoming message and the ErrorHandler function to errors.

type SubscriberConfig

type SubscriberConfig struct {
	Project      string
	Topic        string
	Subscription string
	PushEndpoint string
	Listener     Listener
	ErrorHandler ErrorHandler
}

Subscriber configuration. A Project, Topic, and Subscription are mandatory; errors are thrown if these are not provided. A Listener function is optional; if none is provided, a defaultListener is used that for each message received logs a simple string and acks the message. An ErrorHandler function is also optional; if none is provided, errors are logged to stderr.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL