pubsubpoc

package module
v0.0.0-...-d93812f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2020 License: MIT Imports: 6 Imported by: 0

README

pubsub-poc

Example

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"cloud.google.com/go/pubsub"
	pubsubpoc "github.com/allisson/pubsub-poc"
	gocloudpubsub "gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/gcppubsub"
)

func publish(ctx context.Context, projectID, topicID string) {
	driverURL := fmt.Sprintf("gcppubsub://projects/%s/topics/%s", projectID, topicID)
	producer, err := pubsubpoc.OpenProducer(ctx, driverURL)
	if err != nil {
		log.Fatal(err)
	}

	counter := 0
	for {
		counter++
		msg := &gocloudpubsub.Message{Body: []byte(fmt.Sprintf(`{"id": %d}`, counter))}
		producer.Send(ctx, msg)
		time.Sleep(5 * time.Second)
	}
}

func main() {
	// Use pubsub emulator
	// docker run --rm -p "8085:8085" allisson/gcloud-pubsub-emulator
	os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085")

	// Fixtures
	ctx := context.Background()
	projectID := "my-project"
	topicID := "my-topic"
	subID := "my-subscription"
	consumerHandler := func(ctx context.Context, msg *gocloudpubsub.Message) error {
		time.Sleep(10 * time.Second)
		return nil
	}
	maxGoroutines := 10

	// Create topic
	topic, err := pubsubpoc.GCPCreateTopic(ctx, projectID, topicID)
	if err != nil {
		log.Fatal(err)
	}

	// Create subscription
	subConfig := pubsub.SubscriptionConfig{Topic: topic, AckDeadline: 60 * time.Second}
	_, err = pubsubpoc.GCPCreateSubscription(ctx, projectID, subID, subConfig)
	if err != nil {
		log.Fatal(err)
	}

	// Publish messages every 5 seconds
	go publish(ctx, projectID, topicID)

	// Open consumer
	driverURL := fmt.Sprintf("gcppubsub://projects/%s/subscriptions/%s", projectID, subID)
	consumer, err := pubsubpoc.OpenConsumer(ctx, driverURL, consumerHandler, maxGoroutines)
	if err != nil {
		log.Fatal(err)
	}

	// Graceful shutdown
	idleChan := make(chan struct{})
	go func() {
		sigint := make(chan os.Signal, 1)

		// interrupt signal sent from terminal
		signal.Notify(sigint, os.Interrupt)
		// sigterm signal sent from kubernetes
		signal.Notify(sigint, syscall.SIGTERM)

		<-sigint

		// We received an interrupt signal, shut down.
		if err := consumer.Shutdown(ctx); err != nil {
			log.Printf("consumer_shutdown_error, err=%s\n", err.Error())
		}

		close(idleChan)
	}()

	// Start consumer
	if err := consumer.Start(ctx); err != nil {
		log.Printf("consumer_error, err=%s\n", err.Error())
	}

	<-idleChan

	log.Println("consumer_shutdown_completed")
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GCPCreateSubscription

func GCPCreateSubscription(ctx context.Context, projectID, subID string, subConfig pubsub.SubscriptionConfig) (*pubsub.Subscription, error)

GCPCreateSubscription creates a new subscription on google cloud pubsub and deal if subscription already exists.

func GCPCreateTopic

func GCPCreateTopic(ctx context.Context, projectID, topicID string) (*pubsub.Topic, error)

GCPCreateTopic creates a new topic on google cloud pubsub and deal if topic already exists.

func NewDevelopmentConfig

func NewDevelopmentConfig() zap.Config

NewDevelopmentConfig is a reasonable development logging configuration. Logging is enabled at DebugLevel and above.

It enables development mode (which makes DPanicLevel logs panic), uses a console encoder, writes to standard error, and disables sampling. Stacktraces are automatically included on logs of WarnLevel and above.

func NewDevelopmentEncoderConfig

func NewDevelopmentEncoderConfig() zapcore.EncoderConfig

NewDevelopmentEncoderConfig returns an opinionated EncoderConfig for development environments.

func NewLogger

func NewLogger() (logger *zap.Logger, err error)

NewLogger returns a zap logger based on LOG_CONFIG envvar if LOG_CONFIG=="production" returns zap.NewProduction() else returns zap.NewDevelopment()

func NewProductionConfig

func NewProductionConfig() zap.Config

NewProductionConfig is a reasonable production logging configuration. Logging is enabled at InfoLevel and above.

It uses a JSON encoder, writes to standard error, and enables sampling. Stacktraces are automatically included on logs of ErrorLevel and above.

func NewProductionEncoderConfig

func NewProductionEncoderConfig() zapcore.EncoderConfig

NewProductionEncoderConfig returns an opinionated EncoderConfig for production environments.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer allows you to consume message from a specific subscription.

func OpenConsumer

func OpenConsumer(ctx context.Context, driverURL string, fn Handler, maxGoroutines int) (Consumer, error)

OpenConsumer returns a new consumer.

func (*Consumer) Shutdown

func (c *Consumer) Shutdown(ctx context.Context) error

Shutdown subscription.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start message consumption.

type Handler

type Handler func(ctx context.Context, msg *gocloudpubsub.Message) error

Handler represents a function to be passed to consumer.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer allows you to send messages to a specific topic.

func OpenProducer

func OpenProducer(ctx context.Context, driverURL string) (Producer, error)

OpenProducer returns a new producer.

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msg *gocloudpubsub.Message) error

Send message to the topic.

func (*Producer) Shutdown

func (p *Producer) Shutdown(ctx context.Context) error

Shutdown topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL