Documentation ¶
Overview ¶
Package porthos is an RPC library for the Go programming language that operates over AMQP.
Client ¶
The client is very simple. NewClient takes a broker, a service name and a timeout value (the message TTL). The service name serves only as the request routing key, so every service (or microservice) has its own queue. Each client declares a single response queue, which avoids wasting broker resources.
// first of all you need a broker
b, _ := porthos.NewBroker(os.Getenv("AMQP_URL"))
defer b.Close()

// then you create a new client (you can have as many clients as you want using the same broker)
calculatorService, _ := porthos.NewClient(b, "CalculatorService", 120)
defer calculatorService.Close()

// finally the remote call. It returns a response that contains the output channel.
ret, _ := calculatorService.Call("addOne").WithMap(map[string]interface{}{"value": 20}).Async()
defer ret.Dispose()

select {
case response := <-ret.ResponseChannel():
	jsonResponse, _ := response.UnmarshalJSON()

	fmt.Printf("Original: %f, sum: %f\n", jsonResponse["original_value"], jsonResponse["value_plus_one"])
case <-time.After(2 * time.Second):
	fmt.Println("Timed out :(")
}
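Because each client declares a single response queue, responses for different calls have to be matched back to their callers. The following is a minimal sketch of one way to do that with correlation IDs. It is not the porthos implementation; the dispatcher and response types and all their methods are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for a message read from the shared response queue.
type response struct {
	correlationID string
	body          string
}

// dispatcher routes each response to the caller waiting on its correlation ID.
type dispatcher struct {
	mu    sync.Mutex
	slots map[string]chan response
}

func newDispatcher() *dispatcher {
	return &dispatcher{slots: make(map[string]chan response)}
}

// register reserves a slot for one in-flight call.
func (d *dispatcher) register(id string) <-chan response {
	ch := make(chan response, 1) // buffered so a response can be stored before the caller reads it
	d.mu.Lock()
	d.slots[id] = ch
	d.mu.Unlock()
	return ch
}

// dispose releases the slot once the caller is done with it.
func (d *dispatcher) dispose(id string) {
	d.mu.Lock()
	delete(d.slots, id)
	d.mu.Unlock()
}

// deliver hands a response to its slot and reports whether it was accepted.
func (d *dispatcher) deliver(r response) bool {
	d.mu.Lock()
	ch, ok := d.slots[r.correlationID]
	d.mu.Unlock()
	if !ok {
		return false // nobody is waiting (for example, the slot was disposed)
	}
	select {
	case ch <- r:
		return true
	default:
		return false // the slot already holds a response
	}
}

func main() {
	d := newDispatcher()
	first := d.register("id-1")
	second := d.register("id-2")
	defer d.dispose("id-1")
	defer d.dispose("id-2")

	// Responses may arrive in any order on the single queue.
	d.deliver(response{correlationID: "id-2", body: "21"})
	d.deliver(response{correlationID: "id-1", body: "42"})

	fmt.Println((<-first).body, (<-second).body)
}
```

Disposing a slot removes it from the map, so a late response for that call is dropped instead of accumulating.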
Server ¶
The server also takes a broker and a service name. After that, you Register all your handlers and finally ListenAndServe.
b, _ := porthos.NewBroker(os.Getenv("AMQP_URL"))
defer b.Close()

calculatorService, _ := porthos.NewServer(b, "CalculatorService", 10, false)
defer calculatorService.Close()

// Response is an interface, so the handler receives it by value.
calculatorService.Register("addOne", func(req porthos.Request, res porthos.Response) {
	type input struct {
		Value int `json:"value"`
	}

	type output struct {
		Original int `json:"original_value"`
		Sum      int `json:"value_plus_one"`
	}

	var i input

	_ = req.Bind(&i)

	res.JSON(porthos.StatusOK, output{i.Value, i.Value + 1})
})

calculatorService.Register("subtract", func(req porthos.Request, res porthos.Response) {
	// subtraction logic here...
})

calculatorService.ListenAndServe()
Extensions ¶
Extensions can be used to add custom actions to the RPC Server. The available "events" are incoming and outgoing.
func NewLoggingExtension() *Extension {
	ext := porthos.NewExtension()

	go func() {
		for {
			select {
			case in := <-ext.Incoming():
				log.Printf("Before executing method: %s", in.Request.MethodName)
			case out := <-ext.Outgoing():
				log.Printf("After executing method: %s", out.Request.MethodName)
			}
		}
	}()

	return ext
}
Then you just have to add the extension to the server:
userService.AddExtension(NewLoggingExtension())
Built-in extensions ¶
Metrics Shipper Extension ¶
This extension ships metrics to the AMQP broker, where any application can consume and display them as needed.
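// NewMetricsShipperExtension returns (Extension, error), so unpack the result first.
ext, _ := porthos.NewMetricsShipperExtension(broker, porthos.MetricsShipperConfig{
	BufferSize: 150,
})
userService.AddExtension(ext)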
Access Log Extension
userService.AddExtension(porthos.NewAccessLogExtension())
Index ¶
- Constants
- Variables
- func NewSlot() *slot
- func NewUUIDv4() (string, error)
- type AccessLogExtension
- type Argument
- type BodySpecMap
- type Broker
- type Client
- type ClientResponse
- type Config
- type ContentSpec
- type Extension
- type FieldSpec
- type Form
- type Headers
- type Map
- type MethodHandler
- type MetricsShipperConfig
- type MetricsShipperExtension
- type Options
- type Request
- type Response
- type ResponseWriter
- type Server
- type Slot
- type Spec
- type SpecShipperExtension
Constants ¶
const (
	StatusOK                          int32 = 200
	StatusCreated                     int32 = 201
	StatusAccepted                    int32 = 202
	StatusNonAuthoritativeInfo        int32 = 203
	StatusNoContent                   int32 = 204
	StatusResetContent                int32 = 205
	StatusPartialContent              int32 = 206
	StatusMovedPermanently            int32 = 301
	StatusFound                       int32 = 302
	StatusNotModified                 int32 = 304
	StatusBadRequest                  int32 = 400
	StatusForbidden                   int32 = 403
	StatusNotFound                    int32 = 404
	StatusMethodNotAllowed            int32 = 405
	StatusNotAcceptable               int32 = 406
	StatusConflict                    int32 = 409
	StatusGone                        int32 = 410
	StatusLocked                      int32 = 423
	StatusFailedDependency            int32 = 424
	StatusPreconditionRequired        int32 = 428
	StatusTooManyRequests             int32 = 429
	StatusRequestHeaderFieldsTooLarge int32 = 431
	StatusInternalServerError         int32 = 500
	StatusNotImplemented              int32 = 501
	StatusInsufficientStorage         int32 = 507
)
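The status codes follow the familiar HTTP numbering, so a caller can branch on the range a code falls in. The sketch below assumes that convention; statusClass is a hypothetical helper, not part of porthos, and the constants are local copies so the example compiles on its own.

```go
package main

import "fmt"

// Local copies of a few porthos status constants.
const (
	StatusOK                  int32 = 200
	StatusNotFound            int32 = 404
	StatusInternalServerError int32 = 500
)

// statusClass is a hypothetical helper that names the range a status code belongs to.
func statusClass(code int32) string {
	switch {
	case code >= 200 && code < 300:
		return "success"
	case code >= 300 && code < 400:
		return "redirection"
	case code >= 400 && code < 500:
		return "client error"
	case code >= 500 && code < 600:
		return "server error"
	default:
		return "unknown"
	}
}

func main() {
	for _, code := range []int32{StatusOK, StatusNotFound, StatusInternalServerError} {
		fmt.Printf("%d: %s\n", code, statusClass(code))
	}
}
```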
Variables ¶
var (
	ErrTimedOut          = errors.New("timed out")
	ErrNilPublishChannel = errors.New("No AMQP channel to publish the response to.")
	ErrNotAcked          = errors.New("Request was no acked.")
)
var (
	ErrBrokerNotConnected = errors.New("Broker not connected to server.")
)
var (
	// ErrTypeCast returned when a type cast fails.
	ErrTypeCast = errors.New("Error reading string argument")
)
Functions ¶
Types ¶
type AccessLogExtension ¶
type AccessLogExtension struct { }
AccessLogExtension logs incoming requests and outgoing responses.
func (*AccessLogExtension) IncomingRequest ¶
func (a *AccessLogExtension) IncomingRequest(req Request)
IncomingRequest logs rpc request method and arguments.
func (*AccessLogExtension) OutgoingResponse ¶
func (a *AccessLogExtension) OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)
OutgoingResponse logs rpc response details.
func (*AccessLogExtension) ServerListening ¶
func (a *AccessLogExtension) ServerListening(server Server) error
ServerListening is not implemented in this extension.
type Argument ¶
type Argument interface {
	AsString() (string, error)
	AsInt() (int, error)
	AsInt8() (int8, error)
	AsInt16() (int16, error)
	AsInt32() (int32, error)
	AsInt64() (int64, error)
	AsByte() (byte, error)
	AsBool() (bool, error)
	AsFloat32() (float32, error)
	AsFloat64() (float64, error)
	// Raw returns the argument value as an interface{}.
	Raw() interface{}
}
Argument represents an RPC method argument.
type BodySpecMap ¶
BodySpecMap represents a body spec.
func BodySpecFromStruct ¶
func BodySpecFromStruct(structValue interface{}) BodySpecMap
BodySpecFromStruct creates a body spec from a struct value. You just have to pass an "instance" of your struct.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker holds an implementation-specific connection.
func NewBrokerConfig ¶
NewBrokerConfig returns an AMQP Connection.
func (*Broker) IsConnected ¶
func (*Broker) NotifyConnectionClose ¶
NotifyConnectionClose writes to the returned channel when the connection with the broker closes.
func (*Broker) NotifyReestablish ¶
NotifyReestablish returns a channel to notify when the connection is reestablished.
func (*Broker) WaitUntilConnectionCloses ¶
func (b *Broker) WaitUntilConnectionCloses()
WaitUntilConnectionCloses holds the execution until the connection closes.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an entry point for making remote calls.
type ClientResponse ¶
ClientResponse represents the response object of an RPC call.
func (*ClientResponse) UnmarshalJSON ¶
func (r *ClientResponse) UnmarshalJSON() (map[string]interface{}, error)
UnmarshalJSON decodes the response content as JSON and returns it as a map[string]interface{}.
func (*ClientResponse) UnmarshalJSONTo ¶
func (r *ClientResponse) UnmarshalJSONTo(v interface{}) error
UnmarshalJSONTo decodes the JSON response content into the value pointed to by v.
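The practical difference between the two methods is the type of the decoded values. The sketch below reproduces both shapes with encoding/json alone; decodeToMap and decodeTo are hypothetical helpers that mirror the two signatures, and addOneResult matches the JSON produced by the addOne handler in the Server example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeToMap mirrors the shape of UnmarshalJSON: generic decoding into a map.
func decodeToMap(content []byte) (map[string]interface{}, error) {
	var m map[string]interface{}
	err := json.Unmarshal(content, &m)
	return m, err
}

// decodeTo mirrors the shape of UnmarshalJSONTo: decoding into a caller-supplied pointer.
func decodeTo(content []byte, v interface{}) error {
	return json.Unmarshal(content, v)
}

// addOneResult matches the JSON produced by the addOne handler example.
type addOneResult struct {
	Original int `json:"original_value"`
	Sum      int `json:"value_plus_one"`
}

func main() {
	content := []byte(`{"original_value": 20, "value_plus_one": 21}`)

	m, err := decodeToMap(content)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", m["value_plus_one"], m["value_plus_one"]) // float64 21

	var out addOneResult
	if err := decodeTo(content, &out); err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", out.Sum, out.Sum) // int 21
}
```

Decoding into a map turns every JSON number into a float64, which is why the client example prints with %f. Decoding into a struct keeps the declared field types.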
type ContentSpec ¶
type ContentSpec struct {
	ContentType string      `json:"contentType"`
	Body        interface{} `json:"body"`
}
ContentSpec describes the content type and body of a remote procedure's request or response.
type Extension ¶
type Extension interface {
	ServerListening(server Server) error
	IncomingRequest(req Request)
	OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)
}
func NewAccessLogExtension ¶
func NewAccessLogExtension() Extension
NewAccessLogExtension creates a new extension that logs everything to stdout.
func NewMetricsShipperExtension ¶
func NewMetricsShipperExtension(b *Broker, config MetricsShipperConfig) (Extension, error)
NewMetricsShipperExtension creates a new extension that ships metrics to the broker.
func NewSpecShipperExtension ¶
NewSpecShipperExtension creates a new extension that ships method specs to the broker.
type FieldSpec ¶
type FieldSpec struct {
	Type        string      `json:"type"`
	Description string      `json:"description"`
	Body        BodySpecMap `json:"body,omitempty"`
}
FieldSpec represents a spec of a body field.
func BodySpecFromArray ¶
func BodySpecFromArray(structOfTheList interface{}) []FieldSpec
BodySpecFromArray creates a body spec from an array value. You just have to pass an "instance" of your array.
type Headers ¶
type Headers struct {
// contains filtered or unexported fields
}
Headers represents RPC headers (request and response).
func NewHeaders ¶
func NewHeaders() *Headers
NewHeaders creates a new Headers object initializing the map.
func NewHeadersFromMap ¶
NewHeadersFromMap creates a new Headers from a map.
type Map ¶
type Map map[string]interface{}
Map is an abstraction for map[string]interface{} to be used with WithMap.
type MethodHandler ¶
MethodHandler represents an RPC method handler.
type MetricsShipperConfig ¶
type MetricsShipperConfig struct {
BufferSize int
}
MetricsShipperConfig defines config params for the NewMetricsShipperExtension.
type MetricsShipperExtension ¶
type MetricsShipperExtension struct {
// contains filtered or unexported fields
}
MetricsShipperExtension ships metrics about outgoing responses to the broker.
func (*MetricsShipperExtension) IncomingRequest ¶
func (a *MetricsShipperExtension) IncomingRequest(req Request)
IncomingRequest is not implemented in this extension.
func (*MetricsShipperExtension) OutgoingResponse ¶
func (a *MetricsShipperExtension) OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)
OutgoingResponse ships metrics based on responses to the broker.
func (*MetricsShipperExtension) ServerListening ¶
func (a *MetricsShipperExtension) ServerListening(server Server) error
ServerListening is not implemented in this extension.
type Options ¶
type Options struct {
AutoAck bool
}
Options represents all the options supported by the server.
type Request ¶
type Request interface {
	// GetServiceName returns the service name.
	GetServiceName() string
	// GetMethodName returns the method name.
	GetMethodName() string
	// GetBody returns the request body.
	GetBody() []byte
	// Form returns an index-based form.
	Form() (Form, error)
	// Bind binds the body to an interface.
	Bind(i interface{}) error
	// WithContext returns a shallow copy of the request with its context changed to context.
	// The provided context must be non-nil.
	WithContext(context.Context) Request
	// Context returns the request's context.
	// The returned context is always non-nil; it defaults to the background context.
	// To change the context, use WithContext.
	Context() context.Context
}
Request represents an RPC request.
type Response ¶
type Response interface {
	// JSON sets the content of the response as JSON data.
	JSON(int32, interface{})
	// Raw sets the content of the response as an array of bytes.
	Raw(int32, string, []byte)
	// Empty leaves the content of the response as empty.
	Empty(int32)
	// GetHeaders returns the response headers.
	GetHeaders() *Headers
	// GetStatusCode returns the response status.
	GetStatusCode() int32
	GetBody() []byte
	GetContentType() string
}
Response represents an RPC response.
type ResponseWriter ¶
ResponseWriter is responsible for sending back the response to the replyTo queue.
type Server ¶
type Server interface {
	// Register a method and its handler.
	Register(method string, handler MethodHandler)
	// Register a method, its handler and its specification.
	RegisterWithSpec(method string, handler MethodHandler, spec Spec)
	// AddExtension adds extensions to the server instance.
	// Extensions can be used to add custom actions to incoming and outgoing RPC calls.
	AddExtension(ext Extension)
	// ListenAndServe starts serving RPC requests.
	ListenAndServe()
	// GetServiceName returns the name of this service.
	GetServiceName() string
	// GetSpecs returns all registered specs.
	GetSpecs() map[string]Spec
	// Close closes the server and AMQP channel.
	// This method returns right after the AMQP channel is closed.
	// To give the current request (if any) time to finish,
	// it's up to you to wait using NotifyClose.
	Close()
	// Shutdown shuts down the server and AMQP channel.
	// It provides graceful shutdown, since it waits for the result
	// of <-s.NotifyClose().
	Shutdown()
	// NotifyClose returns a channel to be notified when this server closes.
	NotifyClose() <-chan bool
}
Server is used to register procedures to be invoked remotely.
type Slot ¶
type Slot interface {
	// ResponseChannel returns the response channel.
	ResponseChannel() <-chan ClientResponse
	// Dispose response resources.
	Dispose()
	// Correlation ID
	GetCorrelationID() (string, error)
}
Slot holds the response channel and correlation ID of an RPC call.
type Spec ¶
type Spec struct {
	Description string      `json:"description"`
	Request     ContentSpec `json:"request"`
	Response    ContentSpec `json:"response"`
}
Spec describes a remote procedure.
type SpecShipperExtension ¶
type SpecShipperExtension struct {
// contains filtered or unexported fields
}
SpecShipperExtension ships the specs of registered methods to the broker.
func (*SpecShipperExtension) IncomingRequest ¶
func (s *SpecShipperExtension) IncomingRequest(req Request)
IncomingRequest is not implemented in this extension.
func (*SpecShipperExtension) OutgoingResponse ¶
func (s *SpecShipperExtension) OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)
OutgoingResponse is not implemented in this extension.
func (*SpecShipperExtension) ServerListening ¶
func (s *SpecShipperExtension) ServerListening(srv Server) error
ServerListening takes all registered method specs and ships to the broker.