pubsub

Version: v2.6.1
Published: May 3, 2022 License: MIT Imports: 12 Imported by: 8

README

PubSub provides a simple helper library for doing publish and subscribe style asynchronous tasks in Go, usually in a web service or microservice. PubSub allows you to write publishers and subscribers, fully typed, and swap out providers (Google Cloud PubSub, AWS SQS, etc.) as required.

PubSub also abstracts away the creation of the queues and their subscribers, so you shouldn't have to write any cloud-specific code, but it still gives you options to set concurrency, deadlines, error handling, etc.

Middleware is also included, covering logging, tracing and error handling.

Example

Here's a basic example using Nats Streaming Server and a basic subscriber function that prints hello.

To publish messages, call Publish. You can publish a Protobuf or JSON-serializable object (i.e. most Go objects).

Publish uses Protobuf by default.

Publisher

pubsub.Publish(ctx, "topic-name", &User{Id: "usr_0001"})

To publish a JSON object, use PublishJSON:

pubsub.PublishJSON(ctx, "topic-name", &User{Id: "usr_0001"})

This can be useful if the subscribing application doesn't handle Protobuf well or is external to your company, for example. However, Protobuf is recommended for speed, type safety and forwards compatibility.
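As a rough illustration of what "JSON serializable" means in practice, the sketch below marshals a struct with the standard library's encoding/json. The User type and the encodeJSON helper are assumptions made up for this example; how the library wraps the payload internally is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// User is a hypothetical message type; any struct that encoding/json
// can marshal would work the same way.
type User struct {
	Id string `json:"id"`
}

// encodeJSON is an illustrative helper, not part of the pubsub package.
// It returns the JSON text that would form the message body.
func encodeJSON(obj interface{}) (string, error) {
	b, err := json.Marshal(obj)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := encodeJSON(&User{Id: "usr_0001"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload) // {"id":"usr_0001"}
}
```

Values that encoding/json cannot marshal (channels, functions) would fail at this step, which is why "most" rather than "all" Go objects qualify.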

Subscriber

Subscribing to a topic is done with a single function. You'll receive a context, the object that was in the queue, and the pubsub message, which includes some metadata and timing information should you need it.

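func PrintHello(ctx context.Context, msg *HelloMsg, m *pubsub.Msg) error {
	// m carries the message metadata and timing information.
	fmt.Printf("Message received %+v\n\n", m)

	// Println avoids treating the payload as a format string.
	fmt.Println(msg.Greeting + " " + msg.Name)

	return nil
}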

First though, you need to "Setup" your subscribers:

type Subscriber struct{}

func (s *Subscriber) Setup(c *pubsub.Client) {
	c.On(pubsub.HandlerOptions{
		Topic:   HelloTopic,
		Name:    "print-hello",
		Handler: PrintHello,
		AutoAck: true,
		JSON:    true,
	})
}

pubsub.Subscribe(&Subscriber{})

Full Example

You can see a full example in the example folder.

Middleware

Default

PubSub provides a helper to set up the default middleware.

At the time of writing, this includes Logrus, Opentracing, Prometheus, Recovery (handles panics) and Audit Logging.

To use this, simply include it when initialising PubSub:

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    provider,
	Middleware:  defaults.Middleware,
})

You can optionally provide a recovery handler too.

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    provider,
	Middleware:  defaults.MiddlewareWithRecovery(func(p interface{}) (err error) {
		// log p or report to an error reporter
		return err
	}),
})
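To make the idea of a recovery handler concrete, here is a minimal standard-library sketch of a wrapper that turns a panic in a handler into an error. The Msg and MsgHandler types are trimmed local stand-ins for the ones listed in the Documentation section, and withRecovery, reportPanic and panicky are hypothetical names; this is not the middleware's actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Msg and MsgHandler are trimmed local stand-ins for the package types.
type Msg struct {
	ID   string
	Data []byte
}

type MsgHandler func(ctx context.Context, m Msg) error

// withRecovery (hypothetical) runs next and, if next panics, hands the
// panic value to onPanic and returns whatever error onPanic produces.
func withRecovery(onPanic func(p interface{}) error, next MsgHandler) MsgHandler {
	return func(ctx context.Context, m Msg) (err error) {
		defer func() {
			if p := recover(); p != nil {
				err = onPanic(p)
			}
		}()
		return next(ctx, m)
	}
}

// reportPanic converts the recovered value into an error.
func reportPanic(p interface{}) error {
	return fmt.Errorf("recovered: %v", p)
}

// panicky is a handler that always panics.
func panicky(ctx context.Context, m Msg) error {
	panic("boom")
}

// runDemo invokes the wrapped handler and returns the resulting error text.
func runDemo() string {
	err := withRecovery(reportPanic, panicky)(context.Background(), Msg{ID: "1"})
	if err == nil {
		return ""
	}
	return err.Error()
}

func main() {
	fmt.Println(runDemo()) // recovered: boom
}
```

The named return value is what lets the deferred function replace the result after the panic has been recovered.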

Logrus

When enabled, the Logrus middleware will output something similar to below. Note that the level is DEBUG by default. To see the logs, you'll need to set logrus.SetLevel(logrus.DebugLevel) or use something like github.com/lileio/Logr which can set it from ENV variables.

time="2019-09-23T12:46:13Z" level=debug msg="Google Pubsub: Publishing"
time="2019-09-23T12:46:13Z" level=debug msg="Google Pubsub: Publish confirmed"
time="2019-09-23T12:46:13Z" level=debug msg="Published PubSub Msg" component=pubsub duration=143.545203ms metadata="map[x-b3-parentspanid:622cff2be9102141 x-b3-sampled:1 x-b3-flags:0 x-audit-user:xxxx@example.co.uk x-b3-traceid:4275176f2f7f729257887d1e4853498d x-b3-spanid:017e28147c6c3704]" topic=hello.world
time="2019-09-23T12:46:16Z" level=debug msg="Processed PubSub Msg" component=pubsub duration=1.702259988s handler=function_name id=734207593944188 metadata="map[x-b3-traceid:4275176f2f7f729257887d1e4853498d x-b3-spanid:017e28147c6c3704 x-audit-user:xxxx@example.co.uk x-b3-parentspanid:622cff2be9102141 x-b3-flags:0 x-b3-sampled:1]" topic=hello.work

Opentracing

The Opentracing middleware adds tags and logs to spans, which will later be sent to something like Zipkin or Jaeger when set up in the application. Note that the Opentracing middleware only adds things to the context; it isn't responsible for setting up Opentracing and its reporting. For that, see here.

Prometheus

The Prometheus middleware includes some counters and histograms to help with monitoring. You can see them here, but they include:

pubsub_message_published_total{topic,service}
pubsub_outgoing_bytes{topic,service}
pubsub_publish_durations_histogram_seconds
pubsub_server_handled_total{topic,service,success}
pubsub_incoming_bytes{topic,service}
pubsub_subscribe_durations_histogram_seconds{topic,service}

Here's an example query to get messages handled (by a subscriber) every minute. Make sure your Prometheus step is also 1m.

sum(increase(pubsub_server_handled_total[1m])) by (topic, success)

Providers

Google Cloud PubSub

To set up a Google Cloud client, you can do the following:

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    google.NewGoogleCloud("projectid"),
	Middleware:  defaults.Middleware,
})

If you're on Google Cloud VMs, your environment likely already has credentials set up. Locally, you can set them up with default credentials. If you're on Kubernetes, I recommend setting up a service account, then making a secret file and setting GOOGLE_APPLICATION_CREDENTIALS to the file path of that JSON secret key.

The Google PubSub provider is tested heavily in production by Echo and works well. We have, however, noticed some strange behaviour from Google subscribers, as they try to be clever about balancing traffic. For example, if you want to process only 2 messages at a time and don't process the two you're given, this can often result in a pause before more messages are sent to you. This can be hard to debug as a queue builds up, but it often fixes itself.

Nats Streaming Server

To set up a Nats Streaming client, you can do the following, optionally passing options for the original client:

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    nats.NewNats("clustername", opts),
	Middleware:  defaults.Middleware,
})

Note this driver is for Nats Streaming, and not for plain Nats.

AWS SQS/SNS

Currently there is no provider for AWS SNS and SQS. Please feel free to make a pull request!

Kafka

There's an experimental provider for Kafka available here, but it's limited in the options you can override. I'd love to see someone take this on and help it become more bulletproof, but things like retries are hard.

Documentation

Overview

Package pubsub implements publish-subscribe patterns for use in Go

Functions

func AddPublisherClient added in v2.3.1

func AddPublisherClient(cli *Client)

AddPublisherClient allows another client to be set, for publishing only

func SetClient

func SetClient(cli *Client)

SetClient sets the global pubsub client, useful in tests

func Shutdown

func Shutdown()

func Subscribe

func Subscribe(s Subscriber)

Subscribe starts a run loop with a Subscriber that listens to topics and waits for a syscall.SIGINT or syscall.SIGTERM
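The "wait for a signal" part of such a run loop can be sketched with the standard os/signal package. The helper names below are hypothetical and this is only an assumption about the general shape, not the package's own code. The example feeds a signal into the channel by hand so that it terminates on its own.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// waitForShutdown blocks until a signal arrives on sigs, runs shutdown,
// and returns the signal that triggered it.
func waitForShutdown(sigs <-chan os.Signal, shutdown func()) os.Signal {
	sig := <-sigs
	shutdown()
	return sig
}

// simulate registers for SIGINT and SIGTERM, then pushes a SIGTERM into
// the channel itself (a stand-in for a real signal from the OS) and
// reports whether the shutdown hook ran.
func simulate() bool {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	sigs <- syscall.SIGTERM

	stopped := false
	waitForShutdown(sigs, func() { stopped = true })
	return stopped
}

func main() {
	fmt.Println("shutdown hook ran:", simulate()) // shutdown hook ran: true
}
```

In a real service the manual send would be dropped and the process would simply block until the operating system delivers a signal.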

func WaitForAllPublishing added in v2.3.1

func WaitForAllPublishing()

WaitForAllPublishing waits for all in-flight published messages to be sent before returning
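One common way to wait for in-flight work is a sync.WaitGroup. The sketch below shows that pattern under the assumption that each publish runs in its own goroutine; the tracker type and its methods are hypothetical and do not describe how the package tracks publishes internally.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker is a hypothetical helper that counts in-flight publishes.
type tracker struct {
	wg sync.WaitGroup
}

// goPublish runs send in its own goroutine and tracks it until it returns.
func (t *tracker) goPublish(send func()) {
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		send()
	}()
}

// waitAll blocks until every tracked publish has finished.
func (t *tracker) waitAll() {
	t.wg.Wait()
}

// publishMany starts n fake publishes, waits for all of them, and
// returns how many completed.
func publishMany(n int) int64 {
	var t tracker
	var sent int64
	for i := 0; i < n; i++ {
		t.goPublish(func() { atomic.AddInt64(&sent, 1) })
	}
	t.waitAll()
	return atomic.LoadInt64(&sent)
}

func main() {
	fmt.Println(publishMany(5)) // 5
}
```

Calling a wait like this before process exit is what keeps asynchronous publishes from being lost during shutdown.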

Types

type Client

type Client struct {
	ServiceName string
	Provider    Provider
	Middleware  []Middleware
}

Client holds a reference to a Provider

func GetClient added in v2.6.0

func GetClient() *Client

GetClient gets the global pubsub client, useful in tests

func (Client) On

func (c Client) On(opts HandlerOptions)

On takes HandlerOptions and subscribes to a topic, waiting for a protobuf message and calling the function when a message is received

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, topic string, msg interface{}, isJSON bool) error

Publish publishes a message on the client

type Handler

type Handler interface{}

Handler is a specific callback used for Subscribe, in the format:

func(ctx context.Context, obj proto.Message, msg *Msg) error

For example, you can unmarshal a custom type:

func(ctx context.Context, accounts accounts.Account, msg *Msg) error

You can also unmarshal a JSON object by supplying any type of interface{}:

func(ctx context.Context, accounts models.SomeJSONAccount, msg *Msg) error
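Because Handler is declared as interface{}, something has to inspect the function at runtime to find out which type to decode into. The following sketch shows one way that can be done with reflect and encoding/json. It is an assumption about the general technique, not the package's implementation: callTyped and HelloMsg are hypothetical, Msg is a trimmed stand-in, only JSON is handled, and the second parameter is required to be a pointer.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// Msg is a trimmed local stand-in for the package's Msg type.
type Msg struct {
	ID   string
	Data []byte
}

// HelloMsg is a hypothetical payload type.
type HelloMsg struct {
	Greeting string `json:"greeting"`
	Name     string `json:"name"`
}

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	msgType = reflect.TypeOf((*Msg)(nil))
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// callTyped (hypothetical) checks that handler has the shape
// func(context.Context, *T, *Msg) error, decodes m.Data as JSON into a
// new T, and calls the handler with it. ctx must be non-nil.
func callTyped(ctx context.Context, handler interface{}, m *Msg) error {
	ht := reflect.TypeOf(handler)
	if ht == nil || ht.Kind() != reflect.Func ||
		ht.NumIn() != 3 || ht.NumOut() != 1 ||
		ht.In(0) != ctxType || ht.In(1).Kind() != reflect.Ptr ||
		ht.In(2) != msgType || ht.Out(0) != errType {
		return errors.New("handler must be func(context.Context, *T, *Msg) error")
	}
	obj := reflect.New(ht.In(1).Elem()) // a fresh *T
	if err := json.Unmarshal(m.Data, obj.Interface()); err != nil {
		return err
	}
	out := reflect.ValueOf(handler).Call([]reflect.Value{
		reflect.ValueOf(ctx), obj, reflect.ValueOf(m),
	})
	if err, ok := out[0].Interface().(error); ok {
		return err
	}
	return nil
}

// demo dispatches one JSON message to a typed handler and returns what
// the handler saw.
func demo() string {
	got := ""
	h := func(ctx context.Context, msg *HelloMsg, m *Msg) error {
		got = msg.Greeting + " " + msg.Name
		return nil
	}
	m := &Msg{ID: "1", Data: []byte(`{"greeting":"Hello","name":"World"}`)}
	if err := callTyped(context.Background(), h, m); err != nil {
		return "error: " + err.Error()
	}
	return got
}

func main() {
	fmt.Println(demo()) // Hello World
}
```

Validating the function shape up front matters here, since reflect's Call panics on mismatched argument types rather than returning an error.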

type HandlerOptions

type HandlerOptions struct {
	// The topic to subscribe to
	Topic string
	// The name of this subscriber/function
	Name string
	// The name of this subscriber/function's service
	ServiceName string
	// The function to invoke
	Handler Handler
	// A message deadline/timeout
	Deadline time.Duration
	// Concurrency sets the maximum number of msgs to be run concurrently
	// default: 20
	Concurrency int
	// Auto Ack the message automatically if return err == nil
	AutoAck bool
	// Decode JSON objects from pubsub instead of protobuf
	JSON bool
	// StartFromBeginning starts a new subscriber from
	// the beginning of messages available, if supported
	StartFromBeginning bool
	// Unique subscriber means that all subscribers will receive all messages
	Unique bool
}

HandlerOptions defines the options for a subscriber handler
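The Concurrency option caps how many messages run at once. A buffered channel used as a semaphore is a common way to get that behaviour; the sketch below is an illustration of the technique under that assumption, with hypothetical names (runLimited, demoPeak), and does not show how any particular provider enforces the limit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs every job, never more than limit at the same time,
// and returns the highest number of jobs observed running at once.
func runLimited(limit int, jobs []func()) int {
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, job := range jobs {
		sem <- struct{}{} // blocks while limit jobs are already running
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the job is done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			job()

			mu.Lock()
			running--
			mu.Unlock()
		}(job)
	}
	wg.Wait()
	return peak
}

// demoPeak runs ten short jobs with a limit of two.
func demoPeak() int {
	jobs := make([]func(), 10)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	return runLimited(2, jobs)
}

func main() {
	fmt.Println("peak concurrency:", demoPeak())
}
```

With a limit of two, the reported peak never exceeds two, however many jobs are queued.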

type MessageWrapper

type MessageWrapper struct {
	Data                 []byte               `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	Metadata             map[string]string    `` /* 157-byte string literal not displayed */
	PublishTime          *timestamp.Timestamp `protobuf:"bytes,4,opt,name=publish_time,json=publishTime,proto3" json:"publish_time,omitempty"`
	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
	XXX_unrecognized     []byte               `json:"-"`
	XXX_sizecache        int32                `json:"-"`
}

MessageWrapper is a wrapper message that allows us to keep metadata and other useful information across all providers

func (*MessageWrapper) Descriptor

func (*MessageWrapper) Descriptor() ([]byte, []int)

func (*MessageWrapper) GetData

func (m *MessageWrapper) GetData() []byte

func (*MessageWrapper) GetMetadata

func (m *MessageWrapper) GetMetadata() map[string]string

func (*MessageWrapper) GetPublishTime

func (m *MessageWrapper) GetPublishTime() *timestamp.Timestamp

func (*MessageWrapper) ProtoMessage

func (*MessageWrapper) ProtoMessage()

func (*MessageWrapper) Reset

func (m *MessageWrapper) Reset()

func (*MessageWrapper) String

func (m *MessageWrapper) String() string

func (*MessageWrapper) XXX_DiscardUnknown

func (m *MessageWrapper) XXX_DiscardUnknown()

func (*MessageWrapper) XXX_Marshal

func (m *MessageWrapper) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MessageWrapper) XXX_Merge

func (m *MessageWrapper) XXX_Merge(src proto.Message)

func (*MessageWrapper) XXX_Size

func (m *MessageWrapper) XXX_Size() int

func (*MessageWrapper) XXX_Unmarshal

func (m *MessageWrapper) XXX_Unmarshal(b []byte) error

type Middleware

type Middleware interface {
	SubscribeInterceptor(opts HandlerOptions, next MsgHandler) MsgHandler
	PublisherMsgInterceptor(serviceName string, next PublishHandler) PublishHandler
}

Middleware is an interface to provide subscriber and publisher interceptors
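A custom middleware only needs to satisfy the two-method interface above. As a minimal sketch, the example below defines a counting middleware against local copies of the signatures from this section (trimmed to the fields the example uses). The counter type is hypothetical, and it is not safe for concurrent use as written.

```go
package main

import (
	"context"
	"fmt"
)

// Local, trimmed copies of the types documented in this section.
type HandlerOptions struct {
	Topic string
	Name  string
}

type Msg struct {
	ID   string
	Data []byte
}

type MsgHandler func(ctx context.Context, m Msg) error

type PublishHandler func(ctx context.Context, topic string, m *Msg) error

type Middleware interface {
	SubscribeInterceptor(opts HandlerOptions, next MsgHandler) MsgHandler
	PublisherMsgInterceptor(serviceName string, next PublishHandler) PublishHandler
}

// counter is a hypothetical middleware that counts messages passing
// through in each direction. Not safe for concurrent use.
type counter struct {
	handled   int
	published int
}

// Compile-time check that counter satisfies the interface.
var _ Middleware = (*counter)(nil)

func (c *counter) SubscribeInterceptor(opts HandlerOptions, next MsgHandler) MsgHandler {
	return func(ctx context.Context, m Msg) error {
		c.handled++
		return next(ctx, m)
	}
}

func (c *counter) PublisherMsgInterceptor(serviceName string, next PublishHandler) PublishHandler {
	return func(ctx context.Context, topic string, m *Msg) error {
		c.published++
		return next(ctx, topic, m)
	}
}

// demo wraps no-op handlers with the middleware and calls each once.
func demo() (handled, published int) {
	c := &counter{}
	sub := c.SubscribeInterceptor(HandlerOptions{Topic: "hello.world"},
		func(ctx context.Context, m Msg) error { return nil })
	pub := c.PublisherMsgInterceptor("my-service-name",
		func(ctx context.Context, topic string, m *Msg) error { return nil })

	_ = sub(context.Background(), Msg{ID: "1"})
	_ = pub(context.Background(), "hello.world", &Msg{ID: "2"})
	return c.handled, c.published
}

func main() {
	h, p := demo()
	fmt.Println(h, p) // 1 1
}
```

Each interceptor receives the next handler in the chain and returns a new one, so work can be done before and after the wrapped call.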

type Msg

type Msg struct {
	ID          string
	Metadata    map[string]string
	Data        []byte
	PublishTime *time.Time

	Ack  func()
	Nack func()
}

Msg is a lile representation of a pub sub message
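The Ack and Nack fields are plain functions, which makes behaviour like the AutoAck option easy to picture as a wrapper around a handler. The sketch below is an assumption about how such a wrapper could look: autoAck, outcome and errFailed are hypothetical names, and calling Nack on failure is this example's choice rather than documented behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Msg and MsgHandler are trimmed local stand-ins for the package types.
type Msg struct {
	ID   string
	Ack  func()
	Nack func()
}

type MsgHandler func(ctx context.Context, m Msg) error

// errFailed is a sample handler error used by the demo.
var errFailed = errors.New("handler failed")

// autoAck (hypothetical) acks the message when next returns nil.
// Nacking on error is an assumption made for this sketch.
func autoAck(next MsgHandler) MsgHandler {
	return func(ctx context.Context, m Msg) error {
		if err := next(ctx, m); err != nil {
			if m.Nack != nil {
				m.Nack()
			}
			return err
		}
		if m.Ack != nil {
			m.Ack()
		}
		return nil
	}
}

// outcome runs a handler that returns handlerErr and reports which
// callback was invoked.
func outcome(handlerErr error) string {
	result := "none"
	m := Msg{
		ID:   "1",
		Ack:  func() { result = "ack" },
		Nack: func() { result = "nack" },
	}
	h := autoAck(func(ctx context.Context, m Msg) error { return handlerErr })
	_ = h(context.Background(), m)
	return result
}

func main() {
	fmt.Println(outcome(nil))       // ack
	fmt.Println(outcome(errFailed)) // nack
}
```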

type MsgHandler

type MsgHandler func(ctx context.Context, m Msg) error

MsgHandler is the internal or raw message handler

type NoopProvider

type NoopProvider struct{}

NoopProvider is a simple provider that does nothing, for use in tests and as a default

func (NoopProvider) Publish

func (np NoopProvider) Publish(ctx context.Context, topic string, m *Msg) error

Publish does nothing

func (NoopProvider) Shutdown

func (np NoopProvider) Shutdown()

Shutdown shuts down immediately

func (NoopProvider) Subscribe

func (np NoopProvider) Subscribe(opts HandlerOptions, h MsgHandler)

Subscribe does nothing

type Provider

type Provider interface {
	Publish(ctx context.Context, topic string, m *Msg) error
	Subscribe(opts HandlerOptions, handler MsgHandler)
	Shutdown()
}

Provider is a generic interface for a pub sub provider
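To show what implementing this interface involves, here is a minimal in-memory provider written against local copies of the signatures above. It delivers messages synchronously to handlers registered on the same topic. The memProvider type and roundTrip helper are hypothetical, and the sketch ignores concerns such as retries, acknowledgement and persistence.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local, trimmed copies of the types documented in this section.
type HandlerOptions struct {
	Topic string
	Name  string
}

type Msg struct {
	ID   string
	Data []byte
}

type MsgHandler func(ctx context.Context, m Msg) error

type Provider interface {
	Publish(ctx context.Context, topic string, m *Msg) error
	Subscribe(opts HandlerOptions, handler MsgHandler)
	Shutdown()
}

// memProvider is a hypothetical in-memory provider for illustration.
type memProvider struct {
	mu       sync.Mutex
	handlers map[string][]MsgHandler
}

// Compile-time check that memProvider satisfies Provider.
var _ Provider = (*memProvider)(nil)

// Publish calls every handler subscribed to topic, stopping at the
// first error.
func (p *memProvider) Publish(ctx context.Context, topic string, m *Msg) error {
	p.mu.Lock()
	hs := append([]MsgHandler(nil), p.handlers[topic]...)
	p.mu.Unlock()
	for _, h := range hs {
		if err := h(ctx, *m); err != nil {
			return err
		}
	}
	return nil
}

// Subscribe registers handler for opts.Topic.
func (p *memProvider) Subscribe(opts HandlerOptions, handler MsgHandler) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.handlers == nil {
		p.handlers = make(map[string][]MsgHandler)
	}
	p.handlers[opts.Topic] = append(p.handlers[opts.Topic], handler)
}

// Shutdown has nothing to release in this sketch.
func (p *memProvider) Shutdown() {}

// roundTrip publishes payload and returns what the subscriber received.
func roundTrip(payload string) string {
	var p Provider = &memProvider{}
	received := ""
	p.Subscribe(HandlerOptions{Topic: "hello.world", Name: "print-hello"},
		func(ctx context.Context, m Msg) error {
			received = string(m.Data)
			return nil
		})
	msg := &Msg{ID: "1", Data: []byte(payload)}
	if err := p.Publish(context.Background(), "hello.world", msg); err != nil {
		return "error: " + err.Error()
	}
	p.Shutdown()
	return received
}

func main() {
	fmt.Println(roundTrip("hello")) // hello
}
```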

type PublishHandler

type PublishHandler func(ctx context.Context, topic string, m *Msg) error

PublishHandler wraps a call to publish, for interception

type PublishResult

type PublishResult struct {
	Ready chan struct{}
	Err   error
}

A PublishResult holds the result from a call to Publish.
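The struct suggests an asynchronous pattern: wait on Ready, then read Err. The sketch below assumes Ready is closed once the publish has finished, which is not stated in this documentation, and uses a local copy of the struct. publishAsync, wait and errSend are hypothetical names for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// PublishResult is a local copy of the struct documented above.
type PublishResult struct {
	Ready chan struct{}
	Err   error
}

// errSend is a sample failure used by the demo.
var errSend = errors.New("send failed")

// publishAsync (hypothetical) runs send in the background, records its
// error, and closes Ready to signal completion. Closing Ready when done
// is an assumption of this sketch.
func publishAsync(send func() error) *PublishResult {
	r := &PublishResult{Ready: make(chan struct{})}
	go func() {
		r.Err = send()
		close(r.Ready)
	}()
	return r
}

// wait blocks until the result is ready, then returns its error.
func wait(r *PublishResult) error {
	<-r.Ready
	return r.Err
}

func main() {
	ok := publishAsync(func() error { return nil })
	bad := publishAsync(func() error { return errSend })

	fmt.Println(wait(ok))  // <nil>
	fmt.Println(wait(bad)) // send failed
}
```

Err is written before Ready is closed, so reading it after the receive is safe without extra locking.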

func Publish

func Publish(ctx context.Context, topic string, msg proto.Message) *PublishResult

Publish is a convenience function which publishes to the current (global) publisher as protobuf

func PublishJSON

func PublishJSON(ctx context.Context, topic string, obj interface{}) *PublishResult

PublishJSON is a convenience function which publishes to the current (global) publisher as JSON

type Subscriber

type Subscriber interface {
	// Setup is a required method that allows the subscriber service to add handlers
	// and perform any setup if required, this is usually called by pubsub upon start
	Setup(*Client)
}

Subscriber is a service that listens to events and registers handlers for those events

Directories

Path Synopsis
example
middleware
providers
kafka
THIS IS AN EXPERIMENTAL DRIVER, PLEASE USE WITH CAUTION
Package test is a generated protocol buffer package.
