porthos

package module
v0.0.0-...-75abb26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2019 License: BSD-3-Clause Imports: 15 Imported by: 1

README

Porthos GoDoc Build Status Go Report Card License

A RPC library for the Go programming language that operates over AMQP.

Status

Beta. Server and Client API may change a bit.

Goal

Provide a language-agnostic RPC library to write distributed systems.

Client

The client is very simple. NewClient takes a broker, a service name and a timeout value (message TTL). The service name is only intended to serve as the request routing key (meaning every service name (or microservice) has its own queue). Each client declares only one response queue, in order to prevent broker's resources wastage.

Creating a new client
// first of all you need a broker
b, _ := porthos.NewBroker(os.Getenv("AMQP_URL"))
defer b.Close()

// then you create a new client (you can have as many clients as you want using the same broker)
calculatorService, _ := porthos.NewClient(b, "CalculatorService", 120)
defer calculatorService.Close()
The Call builder
.Call(methodName string)

Creates a call builder.

.WithTimeout(d duration)

Defines a timeout for the current call. Example:

calculatorService.Call("addOne").WithTimeout(2*time.Second)...
.WithMap(m map[string]interface{})

Sets the given map as the request body of the current call. The content type used is application/json. Example:

calculatorService.Call("addOne").WithMap(map[string]interface{}{"value": 20})...
.WithStruct(s interface{})

Sets the given struct as the request body of the current call. The content type used is application/json. Example:

calculatorService.Call("addOne").WithStruct(myStruct)...
.WithArgs(args ...interface{})

Sets the given args as the request body of the current call. The content type used is application/json. Example:

calculatorService.Call("add").WithArgs(1, 2)...
.WithBody(body []byte)

Sets the given byte array as the request body of the current call. The content type is application/octet-stream. Example:

calculatorService.Call("addOne").WithBody(byteArray)...
.WithBodyContentType(body []byte, contentType string)

Sets the given byte array as the request body of the current call. Also takes a contentType. Example:

calculatorService.Call("addOne").WithBodyContentType(jsonByteArrayJ, "application/json")...
.Async() (Slot, error)

Performs the remote call and returns a slot that contains the response channel. Example:

s, err := calculatorService.Call("addOne").WithArgs(1).Async()
s.Dispose()

r := <-s.ResponseChannel()
json, err := r.UnmarshalJSON()

You can easily handle timeout with a select:

select {
case r := <-s.ResponseChannel():
    json, err := r.UnmarshalJSON()
case <-time.After(2 * time.Second):
    ...
}
.Sync() (*ClientResponse, error)

Performs the remote call and returns the response. Example:

r, err := calculatorService.Call("addOne").WithMap(map[string]interface{}{"value": 20}).Sync()
json, err := r.UnmarshalJSON()
.Void() error

Performs the remote call that doesn't return anything. Example:

err := loggingService.Call("log").WithArgs("INFO", "some log message").Void()

You can find a full client example at _examples/client/example_client.go.

Server

The server also takes a broker and a service name. After that, you Register all your handlers and finally ServeForever.

Creating a new server
b, _ := porthos.NewBroker(os.Getenv("AMQP_URL"))
defer b.Close()

calculatorService, _ := porthos.NewServer(b, "CalculatorService", 10, false)
defer calculatorService.Close()
.Register(methodName string, handler MethodHandler)

Register a method with the given handler. Example:

calculatorService.Register("addOne", func(req porthos.Request, res *porthos.Response) {
    type input struct {
        Value int `json:"value"`
    }

    type output struct {
        Original int `json:"original_value"`
        Sum      int `json:"value_plus_one"`
    }

    var i input

    _ = req.Bind(&i)

    res.JSON(porthos.OK, output{i.Value, i.Value + 1})
})
.RegisterWithSpec(method string, handler MethodHandler, spec Spec)

Register a method with the given handler and a Spec. Example:

calculatorService.RegisterWithSpec("addOne", addOneHandler, porthos.Spec{
    Description: "Adds one to the given int argument",
    Request: porthos.ContentSpec{
        ContentType: "application/json",
        Body:        porthos.BodySpecFromStruct(input{}),
    },
    Response: porthos.ContentSpec{
        ContentType: "application/json",
        Body:        porthos.BodySpecFromArray(output{}),
    },
})

Through the Specs Shipper Extension the specs are shipped to a queue call porthos.specs and can be displayed in the Porthos Playground.

.AddExtension(ext Extension)

Adds the given extension to the server.

.ListenAndServe()

Starts serving RPC requests.

calculatorService.ListenAndServe()
.Close()

Close the server and AMQP channel. This method returns right after the AMQP channel is closed. In order to give time to the current request to finish (if there's one) it's up to you to wait using the NotifyClose.

.Shutdown()

Shutdown shuts down the server and AMQP channel. It provider graceful shutdown, since it will wait the result of <-s.NotifyClose().

You can find a full server example at _examples/server/example_server.go.

Extensions

Extensions can be used to add custom actions to the RPC Server. The available "events" are incoming and outgoing.

func NewLoggingExtension() *Extension {
    ext := porthos.NewExtension()

    go func() {
        for {
            select {
            case in := <-ext.Incoming():
                log.Printf("Before executing method: %s", in.Request.MethodName)
            case out := <-ext.Outgoing():
                log.Printf("After executing method: %s", out.Request.MethodName)
            }
        }
    }()

    return ext
}

Then you just have to add the extension to the server:

userService.AddExtension(NewLoggingExtension())
Built-in extensions
Metrics Shipper Extension

This extension will ship metrics to the AMQP broker, any application can consume and display them as needed.

userService.AddExtension(porthos.NewMetricsShipperExtension(broker, porthos.MetricsShipperConfig{
    BufferSize: 150,
}))
Access Log Extension
userService.AddExtension(NewAccessLogExtension())
Specs Shipper Extension
userService.AddExtension(porthos.NewSpecShipperExtension(broker))

Contributing

Please read the contributing guide

Pull requests are very much welcomed. Make sure a test or example is included that covers your change.

Docker is being used for the local environment. To build/run/test your code you can bash into the server container:

$ docker-compose run server bash
root@porthos:/go/src/github.com/porthos-rpc/porthos-go# go run example_client.go

Documentation

Overview

Package porthos is a RPC library for the Go programming language that operates over AMQP.

Client

The client is very simple. NewClient takes a broker, a service name and a timeout value (message TTL). The service name is only intended to serve as the request routing key (meaning every service name (or microservice) has its own queue). Each client declares only one response queue, in order to prevent broker's resources wastage.

	// first of all you need a broker
 	b, _ := porthos.NewBroker(os.Getenv("AMQP_URL"))
 	defer b.Close()

 	// then you create a new client (you can have as many clients as you want using the same broker)
 	calculatorService, _ := porthos.NewClient(b, "CalculatorService", 120)
 	defer calculatorService.Close()

 	// finally the remote call. It returns a response that contains the output channel.
 	ret, _ := calculatorService.Call("addOne").WithMap(map[string]interface{}{"value": 20}).Async()
 	defer ret.Dispose()

 	select {
 	case response := <-ret.ResponseChannel():
 	    jsonResponse, _ := response.UnmarshalJSON()

    fmt.Printf("Original: %f, sum: %f\n", jsonResponse["original_value"], jsonResponse["value_plus_one"])
	case <-time.After(2 * time.Second):
    	fmt.Println("Timed out :(")
	}

Server

The server also takes a broker and a service name. After that, you Register all your handlers and finally ListenAndServe.

b, _ := porthos.NewBroker(os.Getenv("AMQP_URL"))
defer b.Close()

calculatorService, _ := porthos.NewServer(b, "CalculatorService", 10, false)
defer calculatorService.Close()

calculatorService.Register("addOne", func(req porthos.Request, res *porthos.Response) {
    type input struct {
        Value int `json:"value"`
    }

    type output struct {
        Original int `json:"original_value"`
        Sum      int `json:"value_plus_one"`
    }

    var i input

    _ = req.Bind(&i)

    res.JSON(porthos.OK, output{i.Value, i.Value + 1})
})

calculatorService.Register("subtract", func(req porthos.Request, res *porthos.Response) {
    // subtraction logic here...
})

calculatorService.ListenAndServe()

Extensions

Extensions can be used to add custom actions to the RPC Server. The available "events" are incoming and outgoing.

func NewLoggingExtension() *Extension {
    ext := porthos.NewExtension()

    go func() {
        for {
            select {
            case in := <-ext.Incoming():
                log.Printf("Before executing method: %s", in.Request.MethodName)
            case out := <-ext.Outgoing():
                log.Printf("After executing method: %s", out.Request.MethodName)
            }
        }
    }()

    return ext
}

Then you just have to add the extension to the server:

userService.AddExtension(NewLoggingExtension())

Built-in extensions

Metrics Shipper Extension

This extension will ship metrics to the AMQP broker, any application can consume and display them as needed.

userService.AddExtension(porthos.NewMetricsShipperExtension(broker, porthos.MetricsShipperConfig{
    BufferSize: 150,
}))

Access Log Extension

userService.AddExtension(NewAccessLogExtension())

Index

Constants

View Source
const (
	StatusOK                          int32 = 200
	StatusCreated                     int32 = 201
	StatusAccepted                    int32 = 202
	StatusNonAuthoritativeInfo        int32 = 203
	StatusNoContent                   int32 = 204
	StatusResetContent                int32 = 205
	StatusPartialContent              int32 = 206
	StatusMovedPermanently            int32 = 301
	StatusFound                       int32 = 302
	StatusNotModified                 int32 = 304
	StatusBadRequest                  int32 = 400
	StatusUnauthorized                int32 = 401
	StatusForbidden                   int32 = 403
	StatusNotFound                    int32 = 404
	StatusMethodNotAllowed            int32 = 405
	StatusNotAcceptable               int32 = 406
	StatusConflict                    int32 = 409
	StatusGone                        int32 = 410
	StatusLocked                      int32 = 423
	StatusFailedDependency            int32 = 424
	StatusPreconditionRequired        int32 = 428
	StatusTooManyRequests             int32 = 429
	StatusRequestHeaderFieldsTooLarge int32 = 431
	StatusUnavailableForLegalReasons  int32 = 451
	StatusInternalServerError         int32 = 500
	StatusNotImplemented              int32 = 501
	StatusServiceUnavailable          int32 = 503
	StatusInsufficientStorage         int32 = 507
)

Variables

View Source
var (
	ErrTimedOut          = errors.New("timed out")
	ErrNilPublishChannel = errors.New("No AMQP channel to publish the response to.")
	ErrNotAcked          = errors.New("Request was no acked.")
)
View Source
var (
	ErrBrokerNotConnected = errors.New("Broker not connected to server.")
)
View Source
var (
	// ErrTypeCast returned when a type cast fails.
	ErrTypeCast = errors.New("Error reading string argument")
)

Functions

func NewSlot

func NewSlot() *slot

func NewUUIDv4

func NewUUIDv4() (string, error)

Types

type AccessLogExtension

type AccessLogExtension struct {
}

AccessLogExtension logs incoming requests and outgoing responses.

func (*AccessLogExtension) IncomingRequest

func (a *AccessLogExtension) IncomingRequest(req Request)

IncomingRequest logs rpc request method and arguments.

func (*AccessLogExtension) OutgoingResponse

func (a *AccessLogExtension) OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)

OutgoingResponse logs rpc response details.

func (*AccessLogExtension) ServerListening

func (a *AccessLogExtension) ServerListening(server Server) error

ServerListening this is not implemented in this extension.

type Argument

type Argument interface {
	AsString() (string, error)
	AsInt() (int, error)
	AsInt8() (int8, error)
	AsInt16() (int16, error)
	AsInt32() (int32, error)
	AsInt64() (int64, error)
	AsByte() (byte, error)
	AsBool() (bool, error)
	AsFloat32() (float32, error)
	AsFloat64() (float64, error)
	// Raw returns the argument value as a interface{}.
	Raw() interface{}
}

Argument represents an RPC method arument.

func NewArgument

func NewArgument(value interface{}) Argument

NewArgument creates a new Argument.

type BodySpecMap

type BodySpecMap map[string]FieldSpec

BodySpecMap represents a body spec.

func BodySpecFromStruct

func BodySpecFromStruct(structValue interface{}) BodySpecMap

BodySpecFromStruct creates a body spec from a struct value. You just have to pass an "instance" of your struct.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker holds an implementation-specific connection.

func NewBroker

func NewBroker(amqpURL string) (*Broker, error)

NewBroker creates a new instance of AMQP connection.

func NewBrokerConfig

func NewBrokerConfig(amqpURL string, config Config) (*Broker, error)

NewBrokerConfig returns an AMQP Connection.

func (*Broker) Close

func (b *Broker) Close()

Close the broker connection.

func (*Broker) IsConnected

func (b *Broker) IsConnected() bool

func (*Broker) NotifyConnectionClose

func (b *Broker) NotifyConnectionClose() <-chan error

NotifyConnectionClose writes in the returned channel when the connection with the broker closes.

func (*Broker) NotifyReestablish

func (b *Broker) NotifyReestablish() <-chan bool

NotifyReestablish returns a channel to notify when the connection is restablished.

func (*Broker) WaitUntilConnectionCloses

func (b *Broker) WaitUntilConnectionCloses()

WaitUntilConnectionCloses holds the execution until the connection closes.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an entry point for making remote calls.

func NewClient

func NewClient(b *Broker, serviceName string, defaultTTL time.Duration) (*Client, error)

NewClient creates a new instance of Client, responsible for making remote calls.

func (*Client) Call

func (c *Client) Call(method string) *call

Call prepares a remote call.

func (*Client) Close

func (c *Client) Close()

Close the client and AMQP chanel. Client only will die if broker was closed.

type ClientResponse

type ClientResponse struct {
	StatusCode  int32
	Headers     Headers
	Content     []byte
	ContentType string
}

ClientResponse represents the response object of a RPC call.

func (*ClientResponse) UnmarshalJSON

func (r *ClientResponse) UnmarshalJSON() (map[string]interface{}, error)

UnmarshalJSON outputs the response content to the argument pointer.

func (*ClientResponse) UnmarshalJSONTo

func (r *ClientResponse) UnmarshalJSONTo(v interface{}) error

UnmarshalJSONTo outputs the response content to the argument pointer.

type Config

type Config struct {
	ReconnectInterval time.Duration
	DialTimeout       time.Duration
}

Config to be used when creating a new connection.

type ContentSpec

type ContentSpec struct {
	ContentType string      `json:"contentType"`
	Body        interface{} `json:"body"`
}

ContentSpec to a remote procedure.

type Extension

type Extension interface {
	ServerListening(server Server) error
	IncomingRequest(req Request)
	OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)
}

func NewAccessLogExtension

func NewAccessLogExtension() Extension

NewAccessLogExtension creates a new extension that logs everything to stdout.

func NewMetricsShipperExtension

func NewMetricsShipperExtension(b *Broker, config MetricsShipperConfig) (Extension, error)

NewMetricsShipperExtension creates a new extension that logs everything to stdout.

func NewSpecShipperExtension

func NewSpecShipperExtension(b *Broker) Extension

NewSpecShipperExtension creates a new extension that ship method specs to the broker.

type FieldSpec

type FieldSpec struct {
	Type        string      `json:"type"`
	Description string      `json:"description"`
	Body        BodySpecMap `json:"body,omitempty"`
}

FieldSpec represents a spec of a body field.

func BodySpecFromArray

func BodySpecFromArray(structOfTheList interface{}) []FieldSpec

BodySpecFromArray creates a body spec from a array value. You just have to pass an "instance" of your array.

type Form

type Form interface {
	// GetArg returns an argument giving the index.
	GetArg(index int) Argument
}

Form represents a request form where data are retrieved through indexes.

func NewForm

func NewForm(contentType string, body []byte) (Form, error)

NewForm creates a new form to retrieve values from its index.

type Headers

type Headers struct {
	// contains filtered or unexported fields
}

Headers represents RPC headers (request and response).

func NewHeaders

func NewHeaders() *Headers

NewHeaders creates a new Headers object initializing the map.

func NewHeadersFromMap

func NewHeadersFromMap(m map[string]interface{}) *Headers

NewHeadersFromMap creates a new Headers from a map.

func (*Headers) Delete

func (h *Headers) Delete(key string)

Delete a header.

func (*Headers) Get

func (h *Headers) Get(key string) interface{}

Get a header.

func (*Headers) Set

func (h *Headers) Set(key string, value interface{})

Set a header.

type Map

type Map map[string]interface{}

Map is an abstraction for map[string]interface{} to be used with WithMap.

type MethodHandler

type MethodHandler func(req Request, res Response)

MethodHandler represents a rpc method handler.

type MetricsShipperConfig

type MetricsShipperConfig struct {
	BufferSize int
}

MetricsShipperConfig defines config params for the NewMetricsShipperExtension.

type MetricsShipperExtension

type MetricsShipperExtension struct {
	// contains filtered or unexported fields
}

MetricsShipperExtension logs incoming requests and outgoing responses.

func (*MetricsShipperExtension) IncomingRequest

func (a *MetricsShipperExtension) IncomingRequest(req Request)

IncomingRequest this is not implemented in this extension.

func (*MetricsShipperExtension) OutgoingResponse

func (a *MetricsShipperExtension) OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)

OutgoingResponse ships metrics based on responses to the broker.

func (*MetricsShipperExtension) ServerListening

func (a *MetricsShipperExtension) ServerListening(server Server) error

ServerListening this is not implemented in this extension.

type Options

type Options struct {
	AutoAck bool
}

Options represent all the options supported by the server.

type Request

type Request interface {
	// GetServiceName returns the service name.
	GetServiceName() string
	// GetMethodName returns the method name.
	GetMethodName() string
	// GetBody returns the request body.
	GetBody() []byte
	// Form returns a index-based form.
	Form() (Form, error)
	// Bind binds the body to an interface.
	Bind(i interface{}) error
	// WithContext returns a shallow copy of Event with its context changed to context.
	// The provided context must be non-nil.
	WithContext(context.Context) Request
	// The returned context is always non-nil; it defaults to the background context.
	// To change the context, use WithContext.
	Context() context.Context
}

Request represents a rpc request.

type Response

type Response interface {
	// JSON sets the content of the response as JSON data.
	JSON(int32, interface{})
	// Raw sets the content of the response as an array of bytes.
	Raw(int32, string, []byte)
	// Empty leaves the content of the response as empty.
	Empty(int32)
	// GetHeaders returns the response headers.
	GetHeaders() *Headers
	// GetStatusCode returns the response status.
	GetStatusCode() int32
	GetBody() []byte
	GetContentType() string
}

Response represents a rpc response.

type ResponseWriter

type ResponseWriter interface {
	Write(res Response) error
}

ResponseWriter is responsible for sending back the response to the replyTo queue.

type Server

type Server interface {
	// Register a method and its handler.
	Register(method string, handler MethodHandler)
	// Register a method, it's handler and it's specification.
	RegisterWithSpec(method string, handler MethodHandler, spec Spec)
	// AddExtension adds extensions to the server instance.
	// Extensions can be used to add custom actions to incoming and outgoing RPC calls.
	AddExtension(ext Extension)
	// ListenAndServe start serving RPC requests.
	ListenAndServe()
	// GetServiceName returns the name of this service.
	GetServiceName() string
	// GetSpecs returns all registered specs.
	GetSpecs() map[string]Spec
	// Close closes the client and AMQP channel.
	// This method returns right after the AMQP channel is closed.
	// In order to give time to the current request to finish (if there's any)
	// it's up to you to wait using the NotifyClose.
	Close()
	// Shutdown shuts down the client and AMQP channel.
	// It provider graceful shutdown, since it will wait the result
	// of <-s.NotifyClose().
	Shutdown()
	// NotifyClose returns a channel to be notified when this server closes.
	NotifyClose() <-chan bool
}

Server is used to register procedures to be invoked remotely.

func NewServer

func NewServer(b *Broker, serviceName string, options Options) (Server, error)

NewServer creates a new instance of Server, responsible for executing remote calls.

type Slot

type Slot interface {
	// ResponseChannel returns the response channel.
	ResponseChannel() <-chan ClientResponse
	// Dispose response resources.
	Dispose()
	// Correlation ID
	GetCorrelationID() (string, error)
}

Slot of a RPC call.

type Spec

type Spec struct {
	Description string      `json:"description"`
	Request     ContentSpec `json:"request"`
	Response    ContentSpec `json:"response"`
}

Spec to a remote procedure.

type SpecShipperExtension

type SpecShipperExtension struct {
	// contains filtered or unexported fields
}

SpecShipperExtension logs incoming requests and outgoing responses.

func (*SpecShipperExtension) IncomingRequest

func (s *SpecShipperExtension) IncomingRequest(req Request)

IncomingRequest this is not implemented in this extension.

func (*SpecShipperExtension) OutgoingResponse

func (s *SpecShipperExtension) OutgoingResponse(req Request, res Response, resTime time.Duration, statusCode int32)

OutgoingResponse this is not implemented in this extension.

func (*SpecShipperExtension) ServerListening

func (s *SpecShipperExtension) ServerListening(srv Server) error

ServerListening takes all registered method specs and ships to the broker.

Directories

Path Synopsis
_examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL