pubsub

Version: v0.0.1
Published: Nov 10, 2020 License: GPL-3.0 Imports: 3 Imported by: 0

README

pubsub

This package aims to simplify pubsub management and implementation in Go programs. It lets you wrap any pubsub client behind a single interface, lessening the burden of switching providers.

Usage

  • go get github.com/elmagician/pubsub

Implementations

  • Mock: testify mock implementation for unit testing
  • GCP: google pubsub implementation

Documentation

Constants

This section is empty.

Variables

var (
	ErrNotFound = errors.New("not found")
)
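ErrNotFound is a sentinel error, so callers would normally test for it with errors.Is rather than comparing strings. The page does not say which calls return it, so the following is only a minimal sketch: lookupTopic and isNotFound are hypothetical helpers, and ErrNotFound is redeclared locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the package-level sentinel so this sketch is self-contained.
var ErrNotFound = errors.New("not found")

// lookupTopic is a hypothetical helper (not part of the package): it wraps
// ErrNotFound with context when the key is unknown.
func lookupTopic(known map[string]bool, key string) error {
	if !known[key] {
		return fmt.Errorf("topic %q: %w", key, ErrNotFound)
	}
	return nil
}

// isNotFound reports whether err is, or wraps, ErrNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrNotFound)
}

func main() {
	known := map[string]bool{"orders": true}

	// errors.Is sees through the wrapping added by %w.
	fmt.Println(isNotFound(lookupTopic(known, "invoices")))
	fmt.Println(isNotFound(lookupTopic(known, "orders")))
}
```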

Functions

This section is empty.

Types

type Envelop

type Envelop interface {
	// ToPubsubMessage converts the envelop to a Message carrying its JSON byte representation.
	// Used before emitting the message to the queue.
	ToPubsubMessage() (Message, error)

	// FromPubsubMessage sets the envelop data from the message JSON payload.
	FromPubsubMessage(msg Message) error

	// Filter returns a map of filter values. Each key has to match the expected pubsub metadata key,
	// while the value represents the expected value to filter on.
	Filter() MessageFilter

	// New generates a new empty envelop.
	// Used for message reception.
	New() Envelop
}

Envelop is an interface that links a Go object representing your relevant data to a Message. It acts as a DTO and uses the Filter method to expose the relevant filtering data.
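To make the Envelop contract concrete, here is a minimal sketch of a DTO that implements it. Everything named OrderCreated or memMessage is hypothetical, as are the "type" and "version" metadata keys. The interfaces are copied locally so the example runs without the module.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the package types so the sketch compiles on its own.
type MessageFilter map[string]string

type Message interface {
	ID() interface{}
	Data() []byte
	Metadata() map[string]string
	Ack()
	Nack()
}

type Envelop interface {
	ToPubsubMessage() (Message, error)
	FromPubsubMessage(msg Message) error
	Filter() MessageFilter
	New() Envelop
}

// memMessage is a hypothetical in-memory Message used only for this example.
type memMessage struct {
	data []byte
	meta map[string]string
}

func (m memMessage) ID() interface{}             { return nil }
func (m memMessage) Data() []byte                { return m.data }
func (m memMessage) Metadata() map[string]string { return m.meta }
func (m memMessage) Ack()                        {}
func (m memMessage) Nack()                       {}

// OrderCreated is a hypothetical DTO implementing Envelop.
type OrderCreated struct {
	OrderID string `json:"order_id"`
}

// Filter exposes the metadata this envelop expects (assumed keys).
func (o *OrderCreated) Filter() MessageFilter {
	return MessageFilter{"type": "order.created", "version": "v1"}
}

// ToPubsubMessage encodes the DTO as JSON and attaches the filter as metadata.
func (o *OrderCreated) ToPubsubMessage() (Message, error) {
	data, err := json.Marshal(o)
	if err != nil {
		return nil, err
	}
	return memMessage{data: data, meta: o.Filter()}, nil
}

// FromPubsubMessage decodes the JSON payload back into the DTO.
func (o *OrderCreated) FromPubsubMessage(msg Message) error {
	return json.Unmarshal(msg.Data(), o)
}

// New returns a fresh, empty envelop of the same kind.
func (o *OrderCreated) New() Envelop { return &OrderCreated{} }

func main() {
	msg, err := (&OrderCreated{OrderID: "42"}).ToPubsubMessage()
	if err != nil {
		panic(err)
	}
	decoded := &OrderCreated{}
	if err := decoded.FromPubsubMessage(msg); err != nil {
		panic(err)
	}
	fmt.Println(decoded.OrderID)
}
```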

type Listener

type Listener interface {
	// OnMessage initializes a channel to listen to messages of the provided Type/Version couple.
	// The returned channel uses the Message interface type, but you can
	// safely assert it to the provided message type, as every message emitted in the channel
	// is guaranteed to match the Type/Version couple.
	// newMessage has to be a function which returns a new Message object. It will be called upon
	// receiving a message to ensure a different instance is used for each received message.
	OnMessage(envelop Envelop, newMessage func() Message) chan Message

	// OnUnmatched provides a channel to retrieve all messages that could not be matched against provided types/versions.
	OnUnmatched() chan Message

	// OnError initializes a channel to manage errors.
	OnError() chan error

	// Listen starts the listening process in the background.
	Listen(ctx context.Context)

	// Stop listening.
	Stop()
}

Listener provides methods to set up a listening process on a subscription. Received messages are transformed to a Message and sent through a channel.
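Since a Listener hands back channels, consuming code typically ends up as a select loop over the message channel, the error channel and the context. The sketch below shows one way such a loop could look. It does not use a real Listener: the channels are created by hand, Message is a reduced local stand-in, and consume and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a reduced local stand-in for the package's Message interface.
type Message interface {
	Data() []byte
	Ack()
}

// textMessage is a hypothetical Message used only for this example.
type textMessage string

func (t textMessage) Data() []byte { return []byte(t) }
func (t textMessage) Ack()         {}

// consume acknowledges messages until the message channel is closed or ctx is
// cancelled. It returns the number of handled messages and the first error seen.
func consume(ctx context.Context, messages <-chan Message, errs <-chan error) (int, error) {
	handled := 0
	var firstErr error
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				return handled, firstErr
			}
			msg.Ack()
			handled++
		case err, ok := <-errs:
			if !ok {
				errs = nil // a nil channel disables this case
				continue
			}
			if firstErr == nil {
				firstErr = err
			}
		case <-ctx.Done():
			return handled, firstErr
		}
	}
}

// demo feeds two messages through hand-made channels.
func demo() (int, error) {
	messages := make(chan Message, 2)
	messages <- textMessage("first")
	messages <- textMessage("second")
	close(messages)
	return consume(context.Background(), messages, make(chan error))
}

func main() {
	handled, err := demo()
	fmt.Println(handled, err)
}
```

With a real Listener, the two channels would presumably come from OnMessage and OnError, after which Listen would be called with the same context.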

type Message

type Message interface {
	// ID of the message in the service provider.
	ID() interface{}

	// Data payload.
	Data() []byte

	// Metadata holds all the tags which can identify the message and group it with others but are not
	// relevant as information.
	Metadata() map[string]string

	// Ack acknowledges message.
	Ack()

	// Nack refuses to acknowledge message.
	Nack()
}

Message is an interface to manage the relevant data of pubsub messages. It has to represent the message payload and can include relevant information from message attributes. It is used to abstract message types and organisation, allowing any struct to be converted to a pubsub message while masking some pubsub logic.

type MessageCallback

type MessageCallback func(ctx context.Context, msg Message)

MessageCallback enforces callback function type on reception.

type MessageFilter

type MessageFilter map[string]string

MessageFilter represents filtering data for message reception.
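The page does not describe how a filter is compared against a message. One plausible reading, given that Filter keys are meant to match metadata keys, is that a message matches when every filter entry is present in its metadata with the same value. The sketch below implements only that assumption; matches is a hypothetical helper, not a package function.

```go
package main

import "fmt"

// MessageFilter mirrors the package type so the sketch is self-contained.
type MessageFilter map[string]string

// matches is a hypothetical helper: it reports whether every filter entry
// appears in metadata with an identical value. Extra metadata keys are ignored.
func matches(filter MessageFilter, metadata map[string]string) bool {
	for key, want := range filter {
		if got, ok := metadata[key]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	filter := MessageFilter{"type": "order.created", "version": "v1"}

	full := map[string]string{"type": "order.created", "version": "v1", "source": "shop"}
	partial := map[string]string{"type": "order.created"}

	fmt.Println(matches(filter, full))
	fmt.Println(matches(filter, partial))
}
```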

type Publisher

type Publisher interface {
	// To indicates the topics to which we would like to send the message.
	// If a topic is used for the first time, a connection will be created
	// and kept alive for this topic.
	// Call the Clean method to clear all saved topics.
	To(topics ...string) Publisher

	// WithOption allows configuring a single send call locally.
	WithOption(opt interface{}) Publisher

	// Send sends the message to the topics listed in the Send instance. It returns a SendResults interface
	// which you can safely discard if you don't need to check that your message
	// was correctly sent.
	Send(ctx context.Context, msg Envelop) (SendResults, error)

	// Destroy has to be called at the end of life of the publisher instance to ensure all messages are correctly
	// sent. Destroy only returns after ensuring messages were sent or errored, then it
	// destroys the connection to the pubsub instance definitively.
	// The Publisher cannot be used any more after Destroy.
	Destroy()
}

Publisher interface allows building messages to be sent through a pubsub instance.

type Pubsub

type Pubsub interface {
	// Publish prepares pubsub to emit a message.
	Publish() Publisher

	// Registry allows adding a Topic or Subscription to the Pubsub instance.
	Registry() Registry

	// Listen initializes a listener instance for the provided subscription.
	Listen(subscription string) Listener

	// Receive initializes a receiver instance for the provided subscription.
	Receive(subscription string) Receiver
}

Pubsub provides methods to set up and use a pubsub client.

type Receiver

type Receiver interface {
	// OnMessage applies the provided callback to messages matching the expected Type and Version.
	OnMessage(envelop Envelop, callback MessageCallback)

	// OnUnmatched applies the provided callback to messages of unexpected Type or Version.
	OnUnmatched(callback MessageCallback)

	// OnError applies a callback to all received errors.
	OnError(callback func(ctx context.Context))

	// Start receiving as a separate process.
	// Errors are managed through the OnError callback.
	Start(ctx context.Context)

	// Receive messages in current process.
	// Process will stop at the first error received and return it.
	// No errors are returned when Stop is used.
	Receive(ctx context.Context) error

	// Stop receiving.
	Stop()
}

Receiver provides methods to set up a reception process on a subscription. Received messages are transformed to a Message, then processed through the provided callbacks.
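The callback style of Receiver can be pictured as a small router: each incoming message goes to the first registered callback whose expectations it meets, and to the unmatched callback otherwise. The sketch below is an in-memory illustration of that idea only, not the package's implementation. The dispatcher type and its dispatch method are hypothetical, and OnMessage takes a plain metadata map here instead of an Envelop to keep the example short.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a reduced local stand-in for the package's Message interface.
type Message interface {
	Metadata() map[string]string
}

// MessageCallback mirrors the package's callback type.
type MessageCallback func(ctx context.Context, msg Message)

// metaMessage is a hypothetical Message carrying only metadata.
type metaMessage map[string]string

func (m metaMessage) Metadata() map[string]string { return m }

type route struct {
	filter   map[string]string
	callback MessageCallback
}

// dispatcher is a hypothetical in-memory router used for illustration.
type dispatcher struct {
	routes    []route
	unmatched MessageCallback
}

func (d *dispatcher) OnMessage(filter map[string]string, cb MessageCallback) {
	d.routes = append(d.routes, route{filter: filter, callback: cb})
}

func (d *dispatcher) OnUnmatched(cb MessageCallback) { d.unmatched = cb }

// dispatch calls the first route whose filter entries all appear in the
// message metadata, falling back to the unmatched callback.
func (d *dispatcher) dispatch(ctx context.Context, msg Message) {
	for _, r := range d.routes {
		ok := true
		for key, want := range r.filter {
			if msg.Metadata()[key] != want {
				ok = false
				break
			}
		}
		if ok {
			r.callback(ctx, msg)
			return
		}
	}
	if d.unmatched != nil {
		d.unmatched(ctx, msg)
	}
}

// demo records which callback handled each of two messages.
func demo() []string {
	var calls []string
	d := &dispatcher{}
	d.OnMessage(map[string]string{"type": "order.created"}, func(ctx context.Context, msg Message) {
		calls = append(calls, "matched")
	})
	d.OnUnmatched(func(ctx context.Context, msg Message) {
		calls = append(calls, "unmatched")
	})
	ctx := context.Background()
	d.dispatch(ctx, metaMessage{"type": "order.created"})
	d.dispatch(ctx, metaMessage{"type": "user.deleted"})
	return calls
}

func main() {
	fmt.Println(demo())
}
```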

type Registry

type Registry interface {
	// AddTopic registers a new topic using the provided publication settings.
	AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error

	// MustAddTopic registers a new topic using the provided publication settings or panics.
	MustAddTopic(key string, publishSettings *googlePubSub.PublishSettings) Registry

	// AddSubscription registers a subscription using the provided receive settings.
	AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error

	// MustAddSubscription registers a subscription using the provided receive settings or panics.
	MustAddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) Registry

	// StopTopics has to be called to kill the connection to a topic instance. Passing no arguments
	// will stop all known topics.
	StopTopics(topics ...string)

	// Clear empties the registry of all known topics and subscriptions. Clear stops every topic it removes.
	Clear()
}

Registry manages known topics and subscriptions.

type Result

type Result struct {
	ID    string
	Error error
}

type Results

type Results map[string]Result

type SendResults

type SendResults interface {
	// Results recovers the send responses and returns them as a Results structure.
	// This is a blocking call: Results awaits the server response before returning.
	Results(ctx context.Context) Results

	// OnResults applies the callback function when the server responds.
	OnResults(ctx context.Context, callback func(topic string, result Result))
}

SendResults allows managing publish results.
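Once Results has returned, the map can be inspected entry by entry. The sketch below assumes the map is keyed by topic name, which the topic parameter of the OnResults callback suggests but the page does not state. failedTopics and demoResults are hypothetical helpers, and the types are redeclared locally.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Local copies of the package types so the sketch compiles on its own.
type Result struct {
	ID    string
	Error error
}

type Results map[string]Result

// failedTopics is a hypothetical helper: it returns, in sorted order, the keys
// (assumed to be topic names) whose Result carries a non-nil Error.
func failedTopics(results Results) []string {
	var failed []string
	for topic, res := range results {
		if res.Error != nil {
			failed = append(failed, topic)
		}
	}
	sort.Strings(failed)
	return failed
}

// demoResults builds sample data: one successful publish and one failure.
func demoResults() Results {
	return Results{
		"orders":   {ID: "msg-1"},
		"invoices": {Error: errors.New("publish failed")},
	}
}

func main() {
	fmt.Println(failedTopics(demoResults()))
}
```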

Directories

Path Synopsis
Package google implements elMagician pubsub interfaces for the GCP Pubsub provider.
