whisper

package module
v0.0.0-...-741163d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2022 License: MIT Imports: 7 Imported by: 0

README

whisper

Simple implementation of an Event Bus using the Publish/Subscribe pattern. Whisper comes with default implementations for Google Pub/Sub and Redis Pub/Sub. It also provides a simple interface for implementing your own Pub/Sub for example using Kafka.

Qucik Overview

Whisper's event bus allows publish/subscribe-style communication between your microservices without requiring the components to explicitly be aware of each other, as shown in the following diagram:

event-pubsub

Source: .NET Microservices

A trimmed down version of the above diagram is shown below:

pubsub-basic

Source: .NET Microservices

Installation

go get github.com/10hourlabs/whisper

Usage

Google Pub/Sub
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"

	"github.com/10hourlabs/whisper"
)

type HelloWorldPayload struct {
	CreatedAt int64  `json:"created_at"`
	UpdatedAt int64  `json:"updated_at"`
	Message   string `json:"message"`
}

type HelloWorldEvent struct{}

func (*HelloWorldEvent) GetEventName() whisper.Event {
	return "hello-world"
}

func (*HelloWorldEvent) GetSubscriptionID() string {
	return "tentn-example-topic-dev-sub"
}

func (*HelloWorldEvent) GetContext() context.Context {
	return context.Background()
}

func (*HelloWorldEvent) ValidatePayload(payload []byte) error {
	var p HelloWorldPayload
	if err := json.Unmarshal(payload, &p); err != nil {
		return whisper.ErrInvalidPayload
	}
	return nil
}

func (*HelloWorldEvent) Handle(ctx context.Context, body []byte) error {
	var data HelloWorldPayload
	json.Unmarshal(body, &data) // gauranteed to not error
	fmt.Printf("%s\n", data.Message)
	return nil
}

func main() {
	bus := whisper.NewEventBus(context.Background(), "connection-string")
	bus.RegisterEvents(&HelloWorldEvent{})
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		if err := whisper.Listen(bus, whisper.NewGooglePubSub()); err != nil {
			log.Fatalf("failed to subscribe: %v\n", err)
		}
		wg.Done()
	}()
	wg.Wait()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEventNotFound = errors.New("event not found")
View Source
var ErrInvalidPayload = errors.New("invalid payload")

Functions

func Listen

func Listen(e *EventBus, c Client) error

func RegisterClient

func RegisterClient(name string, c Client)

func Retry

func Retry(fn func() error, times int, delay *time.Duration) error

func Wait

func Wait(delay time.Duration)

Types

type Client

type Client interface {
	Connect(ctx context.Context, conn string) error
	Close() error
	Publish(ctx context.Context, topic string, msg []byte) error
	Subscribe(dispatch EventDispatcher, handlers ...EventHandler) error
}

func GetClient

func GetClient(conn string) (Client, error)

type Event

type Event string

Event is a string that represents an event

type EventBus

type EventBus struct {
	// Context is the context for the event client
	Context context.Context

	// Connection is the connection string to the event bus
	Connection string

	// EventHandlers is a list of event handlers
	EventHandlers []EventHandler
	// contains filtered or unexported fields
}

func NewEventBus

func NewEventBus(ctx context.Context, conn string) *EventBus

func (*EventBus) Dispatch

func (c *EventBus) Dispatch(ctx context.Context, e Event, payload []byte) error

func (*EventBus) RegisterEvents

func (c *EventBus) RegisterEvents(events ...EventHandler)

type EventDispatcher

type EventDispatcher func(ctx context.Context, event Event, payload []byte) error

EventDispatcher is a function that dispatches an event

type EventHandler

type EventHandler interface {
	// GetEventName returns the name of the event
	GetEventName() Event

	// GetSubscriptionID returns the subscription id
	GetSubscriptionID() string

	// GetContext returns the context
	GetContext() context.Context

	// ValidatePayload validates the event payload
	ValidatePayload(payload []byte) error

	// Handle handles the event
	Handle(ctx context.Context, body []byte) error
}

type EventPayload

type EventPayload struct {
	Event   Event `json:"event"`
	Payload struct {
		Version   string          `json:"version"`
		Timestamp int64           `json:"timestamp"`
		Data      json.RawMessage `json:"data"`
	} `json:"payload"`
}

EventPayload is the payload that is sent to the event handler

type GooglePubSub

type GooglePubSub struct {
	*pubsub.Client
}

func NewGooglePubSub

func NewGooglePubSub() *GooglePubSub

func (*GooglePubSub) Connect

func (g *GooglePubSub) Connect(ctx context.Context, conn string) error

func (*GooglePubSub) Publish

func (c *GooglePubSub) Publish(ctx context.Context, topic string, msg []byte) error

func (*GooglePubSub) Subscribe

func (c *GooglePubSub) Subscribe(dispatch EventDispatcher, events ...EventHandler) error

type RedisPubSub

type RedisPubSub struct {
}

func NewRedisPubSub

func NewRedisPubSub() *RedisPubSub

func (*RedisPubSub) Close

func (c *RedisPubSub) Close() error

func (*RedisPubSub) Connect

func (*RedisPubSub) Connect(ctx context.Context, conn string) error

func (*RedisPubSub) Publish

func (c *RedisPubSub) Publish(ctx context.Context, topic string, msg []byte) error

func (*RedisPubSub) Subscribe

func (c *RedisPubSub) Subscribe(dispatch EventDispatcher, events ...EventHandler) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL