msg

v0.1.4
Published: Nov 6, 2023 License: BSD-3-Clause-Clear Imports: 5 Imported by: 5

README

go-msg

Pub/Sub Message Primitives for Go

This library contains the basic primitives for developing pub-sub systems.

Messages are published to Topics. Servers subscribe to Messages.

These primitives specify the abstract behavior of pub-sub; they do not specify an implementation. A Message could exist in an in-memory array, a file, a key/value store, a message broker like RabbitMQ, or even something like Amazon SNS/SQS or Google Pub/Sub. To tap into a given backend, a concrete implementation must be written for it.

Here's a list of backends that are currently supported:

Backend          Link
Channels         https://github.com/zerofox-oss/go-msg/backends/mem
AWS (SNS, SQS)   https://github.com/zerofox-oss/go-aws-msg
Google PubSub    https://github.com/paultyng/go-msg-pubsub

How it works

Backend

A backend simply represents the infrastructure behind a pub-sub system. This is where Messages live.

Examples could include a key/value store, Google Pub/Sub, or Amazon SNS + SQS.

Message

A Message represents a discrete unit of data. It contains a body and a list of attributes. Attributes can be used to distinguish unique properties of a Message, including how to read the body. More on that in decorator patterns.

Publish

A Message is published to a Topic. A Topic writes the body and attributes of a Message to a backend using a MessageWriter. A MessageWriter may only be used for one Message, much like a net/http ResponseWriter.

When the MessageWriter is closed, the data written to it is published to the backend, and the writer can no longer be used.

Subscribe

A Server subscribes to Messages from a backend. A Server must know how to convert raw data into a Message, and this conversion is unique to each backend. For example, the way you read message attributes from a file is very different from how you read them from SQS. A Server is always live: it blocks while waiting for messages until it is shut down.

When a Message is created, the Server passes it to a Receiver for processing. This is similar to how a net/http Handler works. A Receiver may return an error if it was unable to process the Message, which indicates to the Server that the Message must be retried. The details of the retry logic are specific to each backend.

Benefits

This library was originally conceived because we needed a way to reduce copy-pasted code across our pub-sub systems and we wanted to try out other infrastructures.

These primitives allow us to achieve both of those goals. Want to try out Kafka instead of AWS? No problem! Just write a library that utilizes these primitives and the Kafka SDK.

What these primitives or any implementation of these primitives DO NOT DO is mask or replace all of the functionality of all infrastructures. If you want to use a particular feature of AWS that does not fit with these primitives, that's OK. It might make sense to add that feature to the primitives, it might not. We encourage you to open an issue to discuss such additions.

Aside from the code reuse benefits, there are a number of other features which we believe are useful, including:

  • Concrete implementations can be written once and distributed as libraries.

  • Decorator Patterns.

  • Concurrency controls built into Server.

  • Context deadlines and cancellations. This allows for clean shutdowns to prevent data loss.

  • Transaction-based Receivers.

Documentation

Constants

This section is empty.

Variables

var ErrClosedMessageWriter = errors.New("msg: MessageWriter closed")

ErrClosedMessageWriter is the error used for write or close operations on a closed MessageWriter.

var ErrServerClosed = errors.New("msg: server closed")

ErrServerClosed represents a completed Shutdown.

Functions

func CloneBody

func CloneBody(m *Message) (io.Reader, error)

CloneBody returns a reader with the same contents as m.Body. m.Body is reset, allowing it to be read from again later.

func DumpBody

func DumpBody(m *Message) ([]byte, error)

DumpBody returns the contents of m.Body and resets m.Body, allowing it to be read from again later.

Types

type Attributes

type Attributes map[string][]string

Attributes represent the key-value metadata for a Message.

func (Attributes) Get

func (a Attributes) Get(key string) string

Get returns the first value associated with the given key. It is case insensitive; CanonicalMIME is used to canonicalize the provided key. If there are no values associated with the key, Get returns "". To access multiple values of a key, or to use non-canonical keys, access the map directly.

func (Attributes) Set

func (a Attributes) Set(key, value string)

Set sets the entries associated with key to the single element value. It replaces any existing values associated with key.

Note: keys are canonicalized in MIMEHeader style, which capitalizes the first letter of the key and any letter following a hyphen.
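The effect of key canonicalization is easiest to see in a short example. The sketch below assumes that Get and Set delegate to net/textproto.MIMEHeader, which is what the note about MIMEHeader suggests; the local Attributes type here is a stand-in written under that assumption.

```go
package main

import (
	"fmt"
	"net/textproto"
)

// Attributes is a local stand-in for the documented type.
type Attributes map[string][]string

// Get and Set delegate to textproto.MIMEHeader (an assumption about the
// implementation), so keys are canonicalized before lookup and storage.
func (a Attributes) Get(key string) string { return textproto.MIMEHeader(a).Get(key) }
func (a Attributes) Set(key, value string) { textproto.MIMEHeader(a).Set(key, value) }

func main() {
	a := Attributes{}

	// Set stores the value under the canonical form of the key.
	a.Set("content-type", "application/json")
	for k := range a {
		fmt.Println("stored key:", k)
	}

	// Get canonicalizes too, so lookups are case insensitive.
	fmt.Println(a.Get("CONTENT-TYPE"))

	// A key written directly to the map in non-canonical form is invisible
	// to Get and must be read from the map directly.
	a["x-raw"] = []string{"1", "2"}
	fmt.Println(a.Get("x-raw") == "", a["x-raw"])
}
```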

type Message

type Message struct {
	Attributes Attributes
	Body       io.Reader
}

A Message represents a discrete message in a messaging system.

func WithBody

func WithBody(parent *Message, r io.Reader) *Message

WithBody creates a new Message that has the given io.Reader as its Body and carries the parent's Attributes.

p := &Message{
	Attributes: Attributes{},
	Body: strings.NewReader("hello world"),
}
p.Attributes.Set("hello", "world")
m := WithBody(p, strings.NewReader("world hello"))

type MessageWriter

type MessageWriter interface {
	io.Writer
	// Close should be called to signify the completion of a Write. Attributes
	// that represent a transform applied to a message should also be written
	// at this time.
	//
	// Close should forward a message to another MessageWriter or persist
	// to the messaging system.
	//
	// Once Close has been called, all subsequent Write and Close calls will result
	// in an ErrClosedMessageWriter error.
	io.Closer
	Attributes() *Attributes
}

A MessageWriter interface is used to write a message to an underlying data stream.

type Receiver

type Receiver interface {
	Receive(context.Context, *Message) error
}

A Receiver processes a Message.

Receive should process the message and then return. Returning signals that the message has been processed. It is not valid to read from the Message.Body after or concurrently with the completion of the Receive call.

If Receive returns an error, the server (the caller of Receive) assumes the message has not been processed and, depending on the underlying pub/sub system, the message should be put back on the message queue.

type ReceiverFunc

type ReceiverFunc func(context.Context, *Message) error

The ReceiverFunc is an adapter to allow the use of ordinary functions as a Receiver. ReceiverFunc(f) is a Receiver that calls f.

func (ReceiverFunc) Receive

func (f ReceiverFunc) Receive(ctx context.Context, m *Message) error

Receive calls f(ctx, m).

type Server

type Server interface {
	// Serve is a blocking function that gets data from an input stream,
	// creates a message, and calls Receive() on the provided receiver
	// with the Message and a Context derived from context.Background().
	// For example:
	//
	// 		parentctx, cancel := context.WithCancel(context.Background())
	// 		defer cancel()
	// 		err := r.Receive(parentctx, m)
	//
	// Serve will return ErrServerClosed after Shutdown completes. Additional
	// error types should be considered to represent error conditions unique
	// to the implementation of a specific technology.
	//
	// Serve() should continue to listen until Shutdown is called on
	// the Server.
	Serve(Receiver) error

	// Shutdown gracefully shuts down the Server by letting any messages in
	// flight finish processing.  If the provided context cancels before
	// shutdown is complete, the Context's error is returned.
	Shutdown(context.Context) error
}

A Server serves messages to a receiver.

type Topic

type Topic interface {
	// NewWriter returns a new MessageWriter
	NewWriter(context.Context) MessageWriter
}

Topic is a generic interface where messages are sent in a messaging system.

Multiple goroutines may invoke methods on a Topic simultaneously.

type TopicFunc

type TopicFunc func(context.Context) MessageWriter

The TopicFunc is an adapter to allow the use of ordinary functions as a Topic. TopicFunc(f) is a Topic that calls f.

func (TopicFunc) NewWriter

func (t TopicFunc) NewWriter(ctx context.Context) MessageWriter

NewWriter calls f(ctx).

Directories

Path              Synopsis
backends
  mem
decorators
  lz4
  otel/tracing    Tracing provides decorators which enable distributed tracing
  tracing         Tracing provides decorators which enable distributed tracing
