pm

package module
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2024 License: MIT Imports: 12 Imported by: 3

README

pm

Test Workflow codecov Go Report Card

pm is a thin Cloud Pub/Sub client wrapper which lets you manage publishing / subscribing with pluggable middleware.

Installation

go get -u github.com/k-yomo/pm

Example

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/k-yomo/pm"
	"github.com/k-yomo/pm/middleware/logging/pm_zap"
	"github.com/k-yomo/pm/middleware/pm_attributes"
	"github.com/k-yomo/pm/middleware/pm_autoack"
	"github.com/k-yomo/pm/middleware/pm_recovery"
	"go.uber.org/zap"
)

// main wires a pm Publisher and Subscriber around a single Pub/Sub client,
// registers one plain and one batch subscription handler, starts the
// subscriber, publishes a test message, and then blocks until the process
// receives SIGINT or SIGTERM.
func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		// No logger is available yet, so report directly on stderr.
		fmt.Fprintf(os.Stderr, "initialize logger failed: %v\n", err)
		os.Exit(1)
	}

	ctx := context.Background()
	pubsubClient, err := pubsub.NewClient(ctx, "pm-example")
	if err != nil {
		logger.Fatal("initialize pubsub client failed", zap.Error(err))
	}
	defer pubsubClient.Close()

	// Publisher with an interceptor that attaches fixed attributes to every
	// outgoing message.
	pubsubPublisher := pm.NewPublisher(
		pubsubClient,
		pm.WithPublishInterceptor(
			pm_attributes.PublishInterceptor(map[string]string{"key": "value"}),
		),
	)

	// Subscriber with logging, auto-ack and panic-recovery interceptors.
	pubsubSubscriber := pm.NewSubscriber(
		pubsubClient,
		pm.WithSubscriptionInterceptor(
			pm_zap.SubscriptionInterceptor(logger),
			pm_autoack.SubscriptionInterceptor(),
			pm_recovery.SubscriptionInterceptor(pm_recovery.WithDebugRecoveryHandler()),
		),
	)
	// Registered exactly once; a second deferred Close would only repeat the
	// shutdown on exit.
	defer pubsubSubscriber.Close()

	sub := pubsubClient.Subscription("example-topic-sub")
	batchSub := pubsubClient.Subscription("example-topic-batch-sub")
	err = pubsubSubscriber.HandleSubscriptionFuncMap(map[*pubsub.Subscription]pm.MessageHandler{
		sub: exampleSubscriptionHandler,
		batchSub: pm.NewBatchMessageHandler(exampleSubscriptionBatchHandler, pm.BatchMessageHandlerConfig{
			DelayThreshold:    100 * time.Millisecond,
			CountThreshold:    1000,
			ByteThreshold:     1e6,
			BufferedByteLimit: 1e8,
		}),
	})
	if err != nil {
		logger.Fatal("register subscription failed", zap.Error(err))
	}

	pubsubSubscriber.Run(ctx)

	result := pubsubPublisher.Publish(
		ctx,
		pubsubPublisher.Topic("example-topic"),
		&pubsub.Message{
			Data: []byte("test"),
		},
	)
	// Publish is asynchronous: the outcome is only known once the result is
	// resolved, so wait for it and surface any failure.
	if _, err := result.Get(ctx); err != nil {
		logger.Error("publish message failed", zap.Error(err))
	}

	// Block until the process is asked to stop.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c
}

// exampleSubscriptionHandler handles a single message based on its payload:
// "panic" triggers a panic, "error" yields an error, and any other payload is
// printed to stdout and reported as success.
func exampleSubscriptionHandler(ctx context.Context, m *pubsub.Message) error {
	switch payload := string(m.Data); payload {
	case "panic":
		panic("panic")
	case "error":
		return errors.New("error")
	default:
		fmt.Println(payload)
		return nil
	}
}

// exampleSubscriptionBatchHandler processes a batch of messages. A message
// whose payload is "error" is recorded as failed in a pm.BatchError keyed by
// its message ID; every other payload is printed to stdout.
//
// It returns nil when no message failed, so callers that test err != nil do
// not mistake an empty BatchError for a failure. Otherwise it returns the
// BatchError holding the per-message errors.
func exampleSubscriptionBatchHandler(messages []*pubsub.Message) error {
	batchErr := make(pm.BatchError)
	for _, m := range messages {
		dataStr := string(m.Data)
		if dataStr == "error" {
			batchErr[m.ID] = errors.New("error")
			continue
		}
		fmt.Println(dataStr)
	}

	// An empty map stored in an error interface is still a non-nil error.
	if len(batchErr) == 0 {
		return nil
	}
	return batchErr
}

Middlewares

Core Middleware

pm comes equipped with optional middleware packages named pm_*.

Publish interceptor
interceptor description
Attributes Set custom attributes on all outgoing messages when publishing
Subscription interceptor
interceptor description
Auto Ack Ack automatically depending on whether an error is returned when subscribing
Effectively Once De-duplicate messages with the same de-duplicate key
Logging - Zap Emit an informative zap log when subscription processing finishes
Logging - Logrus Emit an informative logrus log when subscription processing finishes
Recovery Gracefully recover from panics and print the stack trace when subscribing
Custom Middleware

pm middleware simply wraps the publishing / subscribing process, which means you can define your own custom middleware as well.

  • publish interceptor
// MyPublishInterceptor is a template for a custom publish interceptor. The
// returned interceptor wraps the next MessagePublisher, leaving room for
// custom logic before and after the message is handed on.
func MyPublishInterceptor(attrs map[string]string) pm.PublishInterceptor {
	return func(next pm.MessagePublisher) pm.MessagePublisher {
		publish := func(ctx context.Context, topic *pubsub.Topic, m *pubsub.Message) *pubsub.PublishResult {
			// Pre-publish hook: do something before publishing.
			res := next(ctx, topic, m)
			// Post-publish hook: do something after publishing.
			return res
		}
		return publish
	}
}
  • subscription interceptor
// MySubscriptionInterceptor is a template for a custom subscription
// interceptor. The returned interceptor wraps the next MessageHandler, leaving
// room for custom logic before and after the message is handled.
func MySubscriptionInterceptor() pm.SubscriptionInterceptor {
	return func(_ *pm.SubscriptionInfo, next pm.MessageHandler) pm.MessageHandler {
		handle := func(ctx context.Context, m *pubsub.Message) error {
			// Pre-handle hook: do something before subscribing.
			handleErr := next(ctx, m)
			// Post-handle hook: do something after subscribing.
			return handleErr
		}
		return handle
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultMessageBatchHandlerConfig holds the fallback values that the
// BatchMessageHandlerConfig field documentation refers to as its defaults.
var DefaultMessageBatchHandlerConfig = &BatchMessageHandlerConfig{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	NumGoroutines:  10,

	// Ten times the Pub/Sub client's MaxPublishRequestBytes constant.
	BufferedByteLimit: 10 * pubsub.MaxPublishRequestBytes,
}

Functions

This section is empty.

Types

type BatchError added in v0.3.0

type BatchError map[string]error

BatchError is used to handle errors for each message. The key is the message ID.

func (BatchError) Error added in v0.3.0

func (b BatchError) Error() string

type BatchMessageHandlerConfig added in v0.3.0

// BatchMessageHandlerConfig configures the handler created by
// NewBatchMessageHandler. Each field documents the
// DefaultMessageBatchHandlerConfig value it defaults to.
type BatchMessageHandlerConfig struct {
	// Process a non-empty batch after this delay has passed.
	// Defaults to DefaultMessageBatchHandlerConfig.DelayThreshold.
	DelayThreshold time.Duration

	// Process a batch when it has this many messages.
	// Defaults to DefaultMessageBatchHandlerConfig.CountThreshold.
	CountThreshold int

	// Process a batch when its size in bytes reaches this value.
	// Defaults to DefaultMessageBatchHandlerConfig.ByteThreshold.
	ByteThreshold int

	// The number of goroutines.
	// Defaults to DefaultMessageBatchHandlerConfig.NumGoroutines.
	NumGoroutines int

	// NOTE(review): presumably the maximum number of bytes that may be
	// buffered while waiting to be processed — confirm against the
	// implementation.
	// Defaults to DefaultMessageBatchHandlerConfig.BufferedByteLimit.
	BufferedByteLimit int
}

type MessageBatchHandler added in v0.3.0

type MessageBatchHandler func(messages []*pubsub.Message) error

MessageBatchHandler defines the batch message handler. By default, when a non-nil error is returned, all messages are processed as errors in MessageHandler. To handle the error for each message individually, use BatchError.

type MessageHandler

type MessageHandler = func(ctx context.Context, m *pubsub.Message) error

MessageHandler defines the message handler invoked by SubscriptionInterceptor to complete the normal message handling.

func NewBatchMessageHandler added in v0.3.0

func NewBatchMessageHandler(handler MessageBatchHandler, config BatchMessageHandlerConfig) MessageHandler

NewBatchMessageHandler initializes MessageHandler for batch message processing with config

type MessagePublisher

type MessagePublisher = func(ctx context.Context, topic *pubsub.Topic, m *pubsub.Message) *pubsub.PublishResult

MessagePublisher defines the message publisher invoked by PublishInterceptor to complete the normal message publishment.

type PublishInterceptor

type PublishInterceptor = func(next MessagePublisher) MessagePublisher

PublishInterceptor provides a hook to intercept the execution of a publishment.

type Publisher

type Publisher struct {
	*pubsub.Client
	// contains filtered or unexported fields
}

Publisher represents a wrapper of Pub/Sub client focusing on publishment.

func NewPublisher

func NewPublisher(pubsubClient *pubsub.Client, opt ...PublisherOption) *Publisher

NewPublisher initializes new Publisher.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic *pubsub.Topic, m *pubsub.Message) *pubsub.PublishResult

Publish publishes Pub/Sub message with applying middlewares

type PublisherOption

type PublisherOption interface {
	// contains filtered or unexported methods
}

PublisherOption is an option to change publisher configuration.

func WithPublishInterceptor

func WithPublishInterceptor(interceptors ...PublishInterceptor) PublisherOption

WithPublishInterceptor sets publish interceptors.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents a wrapper of Pub/Sub client mainly focusing on pull subscription.

func NewSubscriber

func NewSubscriber(pubsubClient *pubsub.Client, opt ...SubscriberOption) *Subscriber

NewSubscriber initializes new Subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close()

Close closes running subscriptions gracefully.

func (*Subscriber) HandleSubscriptionFunc

func (s *Subscriber) HandleSubscriptionFunc(subscription *pubsub.Subscription, f MessageHandler) error

HandleSubscriptionFunc registers a subscription handler for the given subscription. If the subscription does not exist, it returns an error.

func (*Subscriber) HandleSubscriptionFuncMap added in v0.1.2

func (s *Subscriber) HandleSubscriptionFuncMap(funcMap map[*pubsub.Subscription]MessageHandler) error

HandleSubscriptionFuncMap registers multiple subscription handlers at once. This function takes a map of key[subscription]: value[corresponding message handler] pairs.

func (*Subscriber) Run

func (s *Subscriber) Run(ctx context.Context)

Run starts running registered pull subscriptions.

type SubscriberOption

type SubscriberOption interface {
	// contains filtered or unexported methods
}

SubscriberOption is an option to change subscriber configuration.

func WithSubscriptionInterceptor

func WithSubscriptionInterceptor(interceptors ...SubscriptionInterceptor) SubscriberOption

WithSubscriptionInterceptor sets subscription interceptors.

type SubscriptionInfo added in v0.1.1

// SubscriptionInfo contains various info about the subscription. It is passed
// to each SubscriptionInterceptor alongside the next MessageHandler.
type SubscriptionInfo struct {
	// NOTE(review): presumably the ID of the topic the subscription is
	// attached to — confirm.
	TopicID string
	// NOTE(review): presumably the ID of the subscription being handled —
	// confirm.
	SubscriptionID string
}

SubscriptionInfo contains various info about the subscription.

type SubscriptionInterceptor

type SubscriptionInterceptor = func(info *SubscriptionInfo, next MessageHandler) MessageHandler

SubscriptionInterceptor provides a hook to intercept the execution of a message handling.

Directories

Path Synopsis
middleware

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL