Documentation ¶
Index ¶
- type Config
- type ConnectionOptions
- type Emit
- type EmitOptions
- type Endpoint
- type EndpointDataHandler
- type EndpointOptions
- type Event
- type EventData
- type J
- type Request
- type RequestOptions
- type Session
- func (session *Session) Close() chan bool
- func (session *Session) CloseOnSignal() chan bool
- func (session *Session) Emit(key string) chan interface{}
- func (session *Session) Endpoint(key string) Endpoint
- func (session *Session) EndpointWithOptions(options EndpointOptions) Endpoint
- func (session *Session) LazyEmit(key string, data interface{})
- func (session *Session) LazyEndpoint(key string, handlers ...EndpointDataHandler) Endpoint
- func (session *Session) LazyListener(key string, handlers ...EndpointDataHandler) Endpoint
- func (session *Session) LazyRequest(key string, data interface{}) chan Event
- func (session *Session) Listener(key string) Endpoint
- func (session *Session) Request(key string) Request
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
Config represents a collection of options used to connect to the RabbitMQ server, as well as some Remit-specific options such as the service name.
type ConnectionOptions ¶
ConnectionOptions is the options used to connect to RabbitMQ and any connection-wide settings needed for Remit.
type Emit ¶
type Emit struct { Channel chan interface{} RoutingKey string // contains filtered or unexported fields }
Emit represents an emission to emit data to the system.
Most commonly, this is used to notify other services of changes, update configuration or the like.
For examples of Emit usage, see `Session.Emit` and `Session.LazyEmit`.
type EmitOptions ¶
type EmitOptions struct {
RoutingKey string
}
EmitOptions is a list of options that can be passed when setting up an emission.
type Endpoint ¶
type Endpoint struct { // given properties RoutingKey string Queue string // generated properties Data chan Event Ready chan bool // contains filtered or unexported fields }
Endpoint manages the RPC-style consumption and publishing of messages.
Most commonly, this is used to set up an endpoint that can be requested using `Session.Request` or `Session.LazyRequest`.
For examples of Endpoint usage, see `Session.Endpoint` and `Session.LazyEndpoint`.
func (Endpoint) Close ¶
func (endpoint Endpoint) Close()
Close closes the endpoint, stopping message consumption and closing the endpoint's receiving channel.
It will cancel consumption, but wait for all unacked messages to be handled before closing the channel, meaning no loss should occur.
The endpoint can be reopened using `Endpoint.Open`.
func (*Endpoint) OnData ¶
func (endpoint *Endpoint) OnData(handlers ...EndpointDataHandler)
OnData is used to register a data handler for a particular endpoint. A data handler here is a function (or set of functions) to run whenever a new message is received.
If multiple handlers are provided in sequence, earlier functions will act as "middleware", used primarily for the mutation of the `Event` or the like.
An example of this might be:
endpoint := remitSession.Endpoint("math.sum") endpoint.OnData(logMessage, parseArguments, handle)
In the above example, `logMessage`, `parseArguments` and `handle` will be run in sequence. If any of the functions push data to either `Event.Success` or `Event.Failure`, the chain is broken and the message immediately replied to. Otherwise, sending `true` to `Event.Next` should be performed to indicate that it's safe to move to the next step.
If `Event.Next` is pushed to on the final handler, the message will be treated as successful but the reply will contain no data.
type EndpointDataHandler ¶
type EndpointDataHandler func(Event)
EndpointDataHandler is the function spec needed for listening to endpoint data.
type EndpointOptions ¶
type EndpointOptions struct { RoutingKey string Queue string // contains filtered or unexported fields }
EndpointOptions is a list of options that can be passed when setting up an endpoint.
type Event ¶
type Event struct { // Data given in the message. // All fields are always passed back with the exception // of `Data` and `Error`, which are mutually exclusive. EventId string // the ULID of the event EventType string // the routing key used to route this message Resource string // the service that send this message Data EventData // the data this message contains (as `EventData`) Error interface{} // the error this message contains // Channels that can be used to respond to or acknowledge this message. Success chan interface{} // send data back if the handling was successful Failure chan interface{} // send an error back if the handling failed Next chan bool // skip to the next piece of middleware/function // contains filtered or unexported fields }
Event represents a single message being received. This could be data from an endpoint, a reply following a request or an emission.
type EventData ¶
type EventData map[string]interface{}
EventData - for ease of use - sets `Data` within an `Event` to be a `map[string]interface{}`. This enables us to access basic properties via indexing, but deeper handling is recommended if more control is needed.
A common practice is to use something like `gopkg.in/mgo.v2/bson` to marshal the data through a struct:
type IncomingData struct { GivenId string `bson:"id"` TargetId string `bson:"target"` } data := new(IncomingData) b, _ := bson.Marshal(event.Data) bson.Unmarshal(b, &data)
type J ¶
type J map[string]interface{}
J is a convenient aliaas for a `map[string]interface{}`, useful for dealing with JSON is a more native manner.
remit.J{ "foo": "bar", "baz": true, "qux": remit.J{ "big": false, "small": true, }, }
type Request ¶
type Request struct { RoutingKey string // contains filtered or unexported fields }
Request represents an RPC request for data.
Most commonly, this is used to contact another service to retrieve data, utilising a `Session.Endpoint`.
For examples of Request usage, see `Session.Request` and `Session.LazyRequest`.
type RequestOptions ¶
type RequestOptions struct {
RoutingKey string
}
RequestOptions is a list of options that can be passed when setting up a request.
type Session ¶
type Session struct { // the config given for this connection Config Config // contains filtered or unexported fields }
Session represents a communication session with RabbitMQ.
Most commonly, a single Remit session is used for an entire service, as separate channels are generated for each endpoint.
func Connect ¶
func Connect(options ConnectionOptions) Session
Connect connects to a RabbitMQ instance using the `Url` provided in `ConnectionOptions` and setting the _service name_ to `Name`.
The `Url` should be valid as defined by the AMQP URI scheme. Currently, this is:
amqp_URI = "amqp://" amqp_authority [ "/" vhost ] [ "?" query ] amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ] amqp_userinfo = username [ ":" password ] username = *( unreserved / pct-encoded / sub-delims ) password = *( unreserved / pct-encoded / sub-delims ) vhost = segment
More info can be found here:
https://www.rabbitmq.com/uri-spec.html
Example:
remitSession := remit.Connect(remit.ConnectionOptions{ Name: "my-service", Url: "amqp://localhost" })
func (*Session) Close ¶
Close closes the Remit session by waiting for all unacknowledged messages to be handled before closing the RabbitMQ connection and pushing `true` to the returned channel.
Example:
remitSession := remit.Connect(...) ... <-remitSession.Close()
func (*Session) CloseOnSignal ¶
CloseOnSignal returns a channel that will receive either `true` or `false` depending on whether Remit closed its connections safely as a result of an interruption signal.
The receipt of a signal causes the session to close via `Session.Close()`. A second signal being sent whilst the close is in progress will perform a "cold" shutdown, dismissing any unacknowledged messages and returning `false` to the channel straight away.
The signals currently supported are `1 SIGHUP`, `2 SIGINT`, `3 SIGQUIT` and `15 SIGTERM`.
Example:
remitSession := remit.Connect(...) ... <-remitSession.CloseOnSignal()
func (*Session) Emit ¶
Emit sets up an emitter that messages can be pushed to. For a one-liner emission, see `Session.LazyEmit`.
Especially useful for emitting system events or firing off requests if you don't care about the response.
`key` will be used as a routing key and emissions are always published to the `"remit"` exchange.
Example:
remitSession := remit.Connect(...) emitter := remitSession.Emit("service.connected") emitter <- "my-service-id" // is synonymous with remitSession.LazyEmit("service.connected", "my-service-id")
func (*Session) Endpoint ¶
Endpoint creates an endpoint for `key` but does not start consuming. For a one-liner endpoint, see `Session.LazyEndpoint`.
Using this, the usual set-up is to also add a data handler and then open the endpoint for messages:
endpoint := remitSession.Endpoint("math.sum") endpoint.OnData(sumHandler) endpoint.Open()
This would be synonymous with `Session.LazyEndpoint`'s:
endpoint := remitSession.LazyEndpoint("math.sum", sumHandler)
When this endpoint is created, both the `RoutingKey` and `Queue` will be set to the provided `key`. If you'd like to specify them as separate entities, see `Session.EndpointWithOptions`.
Example:
remitSession := remit.Connect(...) endpoint := remitSession.Endpoint("math.sum") endpoint.OnData(sumHandler) endpoint.Open()
func (*Session) EndpointWithOptions ¶
func (session *Session) EndpointWithOptions(options EndpointOptions) Endpoint
EndpointWithOptions allows you to create an endpoint with very particular options, described in the `EndpointOptions` type.
Failure to provide either a `Queue` or a `Routing` key results in a panic. If one is given and not the other, the value will be duplicated.
Example:
remitSession := remit.Connect(...) endpoint := EndpointWithOptions(remit.EndpointOptions{ RoutingKey: "maths", Queue: "math.sum", })
func (*Session) LazyEmit ¶
LazyEmit immediately publishes a message using `Session.Emit` using the given routing key and data.
Example:
remitSession := remit.Connect(...) remitSession.LazyEmit("service.connceted", "my-service-id")
func (*Session) LazyEndpoint ¶
func (session *Session) LazyEndpoint(key string, handlers ...EndpointDataHandler) Endpoint
LazyEndpoint is a lazy, one-liner version of `Session.Endpoint`.
It creates an endpoint via `Session.Endpoint`, adds the ordered data handlers given in its arguments via `Endpoint.OnData` and then opens the endpoint via `Endpoint.Open()`.
Like `Endpoint.OnData`, `Session.LazyEndpoint` is a variadic method, meaning it handles multiple data handlers, having them act as ordered middleware.
Example:
remitSession := remit.Connect(...) endpoint := remitSession.LazyEndpoint("math.sum", sumHandler)
func (*Session) LazyListener ¶
func (session *Session) LazyListener(key string, handlers ...EndpointDataHandler) Endpoint
LazyListener is a lazy, one-liner version of `Listener.
It creates a listener via `Session.Listener`, adds the ordered data handlers given in its arguments via `Endpoint.OnData` and then opens the endpoint via `Endpoint.Open()`.
Like `Endpoint.OnData`, `Session.LazyListener` is a variadic method, meaning it handles multiple data handlers, having them act as ordered middlware.
Example:
remitSession := remit.Connect(...) listener := remitSession.LazyListener("user.created", logUserDetails)
func (*Session) LazyRequest ¶
LazyRequest is a lazy, one-liner version of `Session.Request`.
It creates a requset via `Session.Request` and immediately sends the data provided via `Request.Send`, returning the channel on which the reply will appear.
Example
remitSession := remit.Connect(...) event := <-remitSession.LazyRequest("math.sum", remit.J{"numbers": []int{1, 5, 7}})
func (*Session) Listener ¶
Listener creates a listener for `key` but does not start consuming. For a one-liner listener, see `Session.LazyListener`.
In terms of the API, this is essentially the same as a `Request` but it will never reply to messages received.
In terms of its place within Remit, listeners are used to "listen" for events and react. Listeners with the same `key` and `Remit.Name` value will have requests round-robin'd between them so multiple services can handle listeners as a single unit.
A usual practice is to have listeners listen for events such as `"user.created"` or `"message.deleted"`, though they can also listen directly on existing endpoint keys without disturbing any other flows.
Example:
remitSession := remit.Connect(...) listener := remitSession.Listener("user.created") listener.OnData(logUserDetails) listener.Open()
func (*Session) Request ¶
Request creates a request with the routing key of `key` but does not immediately send. For a one-liner request, see `Session.LazyRequest`.
When requesting data, a channel is returned that you can pull data out of. Only a single event will be returned for each request made. A synchronous pattern for this would be:
request := remitSession.Request("math.sum") event := <-request.Send(remit.J{"numbers": []int{1, 5, 7}})
This would be synonymous with `Session.LazyRequest`'s:
event := <-remitSession.LazyRequest("math.sum", remit.J{"numbers": []int{1, 5, 7}})