remit

package module
v0.0.0-...-1ddd196 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2019 License: MIT Imports: 11 Imported by: 0

README

go-remit

Latest Version GoDoc Reference Go Report Card

A small set of functionality used to create microservices that don't need to be aware of one-another's existence. It uses AMQP at its core to manage service discovery-like behaviour without the need to explicitly connect one service to another.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Name string
	Url  string
}

Config represents a collection of options used to connect to the RabbitMQ server, as well as some Remit-specific options such as the service name.

type ConnectionOptions

type ConnectionOptions struct {
	Url  string
	Name string
}

ConnectionOptions is the options used to connect to RabbitMQ and any connection-wide settings needed for Remit.

type Emit

type Emit struct {
	Channel chan interface{}

	RoutingKey string
	// contains filtered or unexported fields
}

Emit represents an emission to emit data to the system.

Most commonly, this is used to notify other services of changes, update configuration or the like.

For examples of Emit usage, see `Session.Emit` and `Session.LazyEmit`.

type EmitOptions

type EmitOptions struct {
	RoutingKey string
}

EmitOptions is a list of options that can be passed when setting up an emission.

type Endpoint

type Endpoint struct {
	// given properties
	RoutingKey string
	Queue      string

	// generated properties
	Data  chan Event
	Ready chan bool
	// contains filtered or unexported fields
}

Endpoint manages the RPC-style consumption and publishing of messages.

Most commonly, this is used to set up an endpoint that can be requested using `Session.Request` or `Session.LazyRequest`.

For examples of Endpoint usage, see `Session.Endpoint` and `Session.LazyEndpoint`.

func (Endpoint) Close

func (endpoint Endpoint) Close()

Close closes the endpoint, stopping message consumption and closing the endpoint's receiving channel.

It will cancel consumption, but wait for all unacked messages to be handled before closing the channel, meaning no loss should occur.

The endpoint can be reopened using `Endpoint.Open`.

func (*Endpoint) OnData

func (endpoint *Endpoint) OnData(handlers ...EndpointDataHandler)

OnData is used to register a data handler for a particular endpoint. A data handler here is a function (or set of functions) to run whenever a new message is received.

If multiple handlers are provided in sequence, earlier functions will act as "middleware", used primarily for the mutation of the `Event` or the like.

An example of this might be:

endpoint := remitSession.Endpoint("math.sum")
endpoint.OnData(logMessage, parseArguments, handle)

In the above example, `logMessage`, `parseArguments` and `handle` will be run in sequence. If any of the functions push data to either `Event.Success` or `Event.Failure`, the chain is broken and the message immediately replied to. Otherwise, sending `true` to `Event.Next` should be performed to indicate that it's safe to move to the next step.

If `Event.Next` is pushed to on the final handler, the message will be treated as successful but the reply will contain no data.

func (*Endpoint) Open

func (endpoint *Endpoint) Open()

Open the endpoint to messages, starting consumption and pushing `true` to `Endpoint.Ready` upon completion.

The recommendation here is to ensure any and all data handlers are registered before opening the endpoint up.

type EndpointDataHandler

type EndpointDataHandler func(Event)

EndpointDataHandler is the function spec needed for listening to endpoint data.

type EndpointOptions

type EndpointOptions struct {
	RoutingKey string
	Queue      string
	// contains filtered or unexported fields
}

EndpointOptions is a list of options that can be passed when setting up an endpoint.

type Event

type Event struct {
	// Data given in the message.
	// All fields are always passed back with the exception
	// of `Data` and `Error`, which are mutually exclusive.
	EventId   string      // the ULID of the event
	EventType string      // the routing key used to route this message
	Resource  string      // the service that send this message
	Data      EventData   // the data this message contains (as `EventData`)
	Error     interface{} // the error this message contains

	// Channels that can be used to respond to or acknowledge this message.
	Success chan interface{} // send data back if the handling was successful
	Failure chan interface{} // send an error back if the handling failed
	Next    chan bool        // skip to the next piece of middleware/function
	// contains filtered or unexported fields
}

Event represents a single message being received. This could be data from an endpoint, a reply following a request or an emission.

type EventData

type EventData map[string]interface{}

EventData - for ease of use - sets `Data` within an `Event` to be a `map[string]interface{}`. This enables us to access basic properties via indexing, but deeper handling is recommended if more control is needed.

A common practice is to use something like `gopkg.in/mgo.v2/bson` to marshal the data through a struct:

type IncomingData struct {
	GivenId string `bson:"id"`
	TargetId string `bson:"target"`
}

data := new(IncomingData)
b, _ := bson.Marshal(event.Data)
bson.Unmarshal(b, &data)

type J

type J map[string]interface{}

J is a convenient aliaas for a `map[string]interface{}`, useful for dealing with JSON is a more native manner.

remit.J{
	"foo": "bar",
	"baz": true,
	"qux": remit.J{
		"big": false,
		"small": true,
	},
}

type Request

type Request struct {
	RoutingKey string
	// contains filtered or unexported fields
}

Request represents an RPC request for data.

Most commonly, this is used to contact another service to retrieve data, utilising a `Session.Endpoint`.

For examples of Request usage, see `Session.Request` and `Session.LazyRequest`.

func (*Request) Send

func (request *Request) Send(data interface{}) chan Event

Send sends some data to a previously-set-up `Request` using `Session.Request`. It returns a channel on which a single reply `Event` will be passed upon RPC completion.

type RequestOptions

type RequestOptions struct {
	RoutingKey string
}

RequestOptions is a list of options that can be passed when setting up a request.

type Session

type Session struct {
	// the config given for this connection
	Config Config
	// contains filtered or unexported fields
}

Session represents a communication session with RabbitMQ.

Most commonly, a single Remit session is used for an entire service, as separate channels are generated for each endpoint.

func Connect

func Connect(options ConnectionOptions) Session

Connect connects to a RabbitMQ instance using the `Url` provided in `ConnectionOptions` and setting the _service name_ to `Name`.

The `Url` should be valid as defined by the AMQP URI scheme. Currently, this is:

amqp_URI       = "amqp://" amqp_authority [ "/" vhost ] [ "?" query ]
amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
amqp_userinfo  = username [ ":" password ]
username       = *( unreserved / pct-encoded / sub-delims )
password       = *( unreserved / pct-encoded / sub-delims )
vhost          = segment

More info can be found here:

https://www.rabbitmq.com/uri-spec.html

Example:

remitSession := remit.Connect(remit.ConnectionOptions{
	Name: "my-service",
	Url: "amqp://localhost"
})

func (*Session) Close

func (session *Session) Close() chan bool

Close closes the Remit session by waiting for all unacknowledged messages to be handled before closing the RabbitMQ connection and pushing `true` to the returned channel.

Example:

remitSession := remit.Connect(...)
...
<-remitSession.Close()

func (*Session) CloseOnSignal

func (session *Session) CloseOnSignal() chan bool

CloseOnSignal returns a channel that will receive either `true` or `false` depending on whether Remit closed its connections safely as a result of an interruption signal.

The receipt of a signal causes the session to close via `Session.Close()`. A second signal being sent whilst the close is in progress will perform a "cold" shutdown, dismissing any unacknowledged messages and returning `false` to the channel straight away.

The signals currently supported are `1 SIGHUP`, `2 SIGINT`, `3 SIGQUIT` and `15 SIGTERM`.

Example:

remitSession := remit.Connect(...)
...
<-remitSession.CloseOnSignal()

func (*Session) Emit

func (session *Session) Emit(key string) chan interface{}

Emit sets up an emitter that messages can be pushed to. For a one-liner emission, see `Session.LazyEmit`.

Especially useful for emitting system events or firing off requests if you don't care about the response.

`key` will be used as a routing key and emissions are always published to the `"remit"` exchange.

Example:

remitSession := remit.Connect(...)

emitter := remitSession.Emit("service.connected")
emitter <- "my-service-id"

// is synonymous with
remitSession.LazyEmit("service.connected", "my-service-id")

func (*Session) Endpoint

func (session *Session) Endpoint(key string) Endpoint

Endpoint creates an endpoint for `key` but does not start consuming. For a one-liner endpoint, see `Session.LazyEndpoint`.

Using this, the usual set-up is to also add a data handler and then open the endpoint for messages:

endpoint := remitSession.Endpoint("math.sum")
endpoint.OnData(sumHandler)
endpoint.Open()

This would be synonymous with `Session.LazyEndpoint`'s:

endpoint := remitSession.LazyEndpoint("math.sum", sumHandler)

When this endpoint is created, both the `RoutingKey` and `Queue` will be set to the provided `key`. If you'd like to specify them as separate entities, see `Session.EndpointWithOptions`.

Example:

remitSession := remit.Connect(...)

endpoint := remitSession.Endpoint("math.sum")
endpoint.OnData(sumHandler)
endpoint.Open()

func (*Session) EndpointWithOptions

func (session *Session) EndpointWithOptions(options EndpointOptions) Endpoint

EndpointWithOptions allows you to create an endpoint with very particular options, described in the `EndpointOptions` type.

Failure to provide either a `Queue` or a `Routing` key results in a panic. If one is given and not the other, the value will be duplicated.

Example:

remitSession := remit.Connect(...)

endpoint := EndpointWithOptions(remit.EndpointOptions{
	RoutingKey: "maths",
	Queue: "math.sum",
})

func (*Session) LazyEmit

func (session *Session) LazyEmit(key string, data interface{})

LazyEmit immediately publishes a message using `Session.Emit` using the given routing key and data.

Example:

remitSession := remit.Connect(...)

remitSession.LazyEmit("service.connceted", "my-service-id")

func (*Session) LazyEndpoint

func (session *Session) LazyEndpoint(key string, handlers ...EndpointDataHandler) Endpoint

LazyEndpoint is a lazy, one-liner version of `Session.Endpoint`.

It creates an endpoint via `Session.Endpoint`, adds the ordered data handlers given in its arguments via `Endpoint.OnData` and then opens the endpoint via `Endpoint.Open()`.

Like `Endpoint.OnData`, `Session.LazyEndpoint` is a variadic method, meaning it handles multiple data handlers, having them act as ordered middleware.

Example:

remitSession := remit.Connect(...)

endpoint := remitSession.LazyEndpoint("math.sum", sumHandler)

func (*Session) LazyListener

func (session *Session) LazyListener(key string, handlers ...EndpointDataHandler) Endpoint

LazyListener is a lazy, one-liner version of `Listener.

It creates a listener via `Session.Listener`, adds the ordered data handlers given in its arguments via `Endpoint.OnData` and then opens the endpoint via `Endpoint.Open()`.

Like `Endpoint.OnData`, `Session.LazyListener` is a variadic method, meaning it handles multiple data handlers, having them act as ordered middlware.

Example:

remitSession := remit.Connect(...)

listener := remitSession.LazyListener("user.created", logUserDetails)

func (*Session) LazyRequest

func (session *Session) LazyRequest(key string, data interface{}) chan Event

LazyRequest is a lazy, one-liner version of `Session.Request`.

It creates a requset via `Session.Request` and immediately sends the data provided via `Request.Send`, returning the channel on which the reply will appear.

Example

remitSession := remit.Connect(...)

event := <-remitSession.LazyRequest("math.sum", remit.J{"numbers": []int{1, 5, 7}})

func (*Session) Listener

func (session *Session) Listener(key string) Endpoint

Listener creates a listener for `key` but does not start consuming. For a one-liner listener, see `Session.LazyListener`.

In terms of the API, this is essentially the same as a `Request` but it will never reply to messages received.

In terms of its place within Remit, listeners are used to "listen" for events and react. Listeners with the same `key` and `Remit.Name` value will have requests round-robin'd between them so multiple services can handle listeners as a single unit.

A usual practice is to have listeners listen for events such as `"user.created"` or `"message.deleted"`, though they can also listen directly on existing endpoint keys without disturbing any other flows.

Example:

remitSession := remit.Connect(...)

listener := remitSession.Listener("user.created")
listener.OnData(logUserDetails)
listener.Open()

func (*Session) Request

func (session *Session) Request(key string) Request

Request creates a request with the routing key of `key` but does not immediately send. For a one-liner request, see `Session.LazyRequest`.

When requesting data, a channel is returned that you can pull data out of. Only a single event will be returned for each request made. A synchronous pattern for this would be:

request := remitSession.Request("math.sum")
event := <-request.Send(remit.J{"numbers": []int{1, 5, 7}})

This would be synonymous with `Session.LazyRequest`'s:

event := <-remitSession.LazyRequest("math.sum", remit.J{"numbers": []int{1, 5, 7}})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL