event

package module
v0.0.4 Latest
Published: Jul 22, 2023 License: MIT Imports: 8 Imported by: 0

README

Event

Our event module provides common publish/subscribe functionality for events as used at Birdie. It specializes pubsub to events, instead of general-purpose messages, and adds a basic schema to the events.

Metrics

The library also provides metrics for both publishers and subscribers. We use Prometheus for metrics. The metrics are always sampled but are not registered anywhere by default, so if you want them you need to opt in by calling event.MustRegisterMetrics.

The generated metrics are documented below.

Publisher

event_publish_duration_seconds : histogram

Measures the publish duration of each event. Useful when debugging connectivity issues between a service and the message broker.

Labels:

  • status : "ok" or "error".
  • name : name of the event.

event_publish_total : counter

Total number of published messages.

Labels:

  • status : "ok" or "error".
  • name : name of the event.

Subscription

event_process_duration_seconds : histogram

Measures how long it took to process each event.

Labels:

  • status : "ok" or "error".
  • name : name of the event.

event_process_total : counter

Total number of messages processed by a subscription.

Labels:

  • status : "ok" or "error".
  • name : name of the event.

Documentation

Overview

Package event provides functionality for publishing and subscribing to events.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustRegisterMetrics added in v0.0.3

func MustRegisterMetrics(registry *prometheus.Registry)

MustRegisterMetrics registers all event-related metrics on the given registry. If metrics with the same name already exist on the registry, this function panics.

Types

type Envelope added in v0.0.2

type Envelope[T any] struct {
	TraceID string `json:"trace_id"`
	OrgID   string `json:"organization_id"`
	Name    string `json:"name"`
	Event   T      `json:"event"`
}

Envelope is the structure that wraps every event.
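To make the wire format concrete, the following is a minimal sketch of how an envelope serializes to JSON. The Envelope type is copied locally so the example is self-contained; the UserCreated payload, the "user_created" name, and the encode helper are hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope mirrors the documented type so this example compiles on its own.
type Envelope[T any] struct {
	TraceID string `json:"trace_id"`
	OrgID   string `json:"organization_id"`
	Name    string `json:"name"`
	Event   T      `json:"event"`
}

// UserCreated is a hypothetical event payload used only for illustration.
type UserCreated struct {
	ID string `json:"id"`
}

// encode wraps the payload in an envelope and serializes it to JSON.
// It is an illustrative helper, not a function of the event package.
func encode(traceID, orgID string, ev UserCreated) (string, error) {
	env := Envelope[UserCreated]{
		TraceID: traceID,
		OrgID:   orgID,
		Name:    "user_created",
		Event:   ev,
	}
	b, err := json.Marshal(env)
	return string(b), err
}

func main() {
	s, err := encode("trace-1", "org-1", UserCreated{ID: "u-1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

The payload type T only needs to be JSON-serializable; the envelope fields around it stay the same for every event.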

type Handler added in v0.0.4

type Handler[T any] func(context.Context, T) error

Handler is responsible for handling events from a Subscription. The context passed to the handler carries all metadata relevant to that event, such as the org and trace IDs. It also contains a logger that can be retrieved by using slog.FromCtx.

type Message added in v0.0.2

type Message struct {
	// contains filtered or unexported fields
}

Message represents a raw message received on a subscription.

func (Message) Body added in v0.0.2

func (m Message) Body() []byte

Body of the message.

func (Message) String added in v0.0.2

func (m Message) String() string

String representation of the message.

type MessageHandler added in v0.0.2

type MessageHandler func(Message) error

MessageHandler is responsible for handling messages from a MessageSubscription.

func SampledMessageHandler added in v0.0.3

func SampledMessageHandler(handler MessageHandler, eventName string) MessageHandler

SampledMessageHandler instruments the given MessageHandler, returning a new one that samples metrics. These are the `event_process_*` metrics, with the given eventName used as the `name` label.

type MessageSubscription added in v0.0.4

type MessageSubscription struct {
	// contains filtered or unexported fields
}

MessageSubscription represents a subscription that delivers messages as is. No assumptions are made about the message contents. It should rarely be needed; prefer Subscription.

func NewRawSubscription added in v0.0.2

func NewRawSubscription(url string, maxConcurrency int) (*MessageSubscription, error)

NewRawSubscription creates a new raw subscription. It provides messages in a service-like manner (serve) and manages concurrent execution: each message is processed in its own goroutine, respecting the given maxConcurrency.

func (*MessageSubscription) Serve added in v0.0.4

func (r *MessageSubscription) Serve(handler MessageHandler) error

Serve starts serving all messages from the subscription, calling handler for each message. It runs until MessageSubscription.Shutdown is called. If the handler returns nil, an Ack is sent; if it returns a non-nil error, a Nack is sent. Serve may be called multiple times; each call starts a new serving loop that runs up to "maxConcurrency" goroutines.

func (*MessageSubscription) Shutdown added in v0.0.4

func (r *MessageSubscription) Shutdown(ctx context.Context) error

Shutdown shuts down the subscriber, stopping any calls to MessageSubscription.Serve. The subscription should not be used after this method is called.

type Publisher

type Publisher[T any] struct {
	// contains filtered or unexported fields
}

Publisher represents a publisher of events of type T. The publisher guarantees that the events conform to our basic schema for events.

func NewPublisher

func NewPublisher[T any](name string, t *pubsub.Topic) *Publisher[T]

NewPublisher creates a new event publisher for the given event name and topic.

func (*Publisher[T]) Publish

func (p *Publisher[T]) Publish(ctx context.Context, event T) error

Publish will publish the given event.

type Subscription added in v0.0.2

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

Subscription is a subscription that receives only the specific type of event defined by T.

func NewSubscription added in v0.0.2

func NewSubscription[T any](name, url string, maxConcurrency int) (*Subscription[T], error)

NewSubscription creates a subscription that accepts only events of the given type and name.

func (*Subscription[T]) Serve added in v0.0.2

func (s *Subscription[T]) Serve(handler Handler[T]) error

Serve starts serving all events from the subscription, calling handler for each event. It runs until Subscription.Shutdown is called. If the handler returns nil, an Ack is sent; if it returns a non-nil error, a Nack is sent. If a received event is not valid JSON or has the wrong name, it is discarded as malformed and a Nack is sent automatically. Serve may be called multiple times; each call starts a new serving loop that runs up to "maxConcurrency" goroutines.

func (*Subscription[T]) Shutdown added in v0.0.2

func (s *Subscription[T]) Shutdown(ctx context.Context) error

Shutdown shuts down the subscriber, stopping any calls to Subscription.Serve. The subscription should not be used after this method is called.
