usrv

package module
v0.0.0-...-7ffd1ea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2015 License: MIT Imports: 8 Imported by: 0

README

usrv

Build Status codecov.io

A microservice framework for go

Dependencies

To use usrv you need the following dependencies:

go get golang.org/x/net/context
go get github.com/achilleasa/usrv-service-adapters

The goal of this framework

The goal of this framework is to provide a low-level api for creating microservices. It is not meant to be used stand-alone. In fact, it doesn't even support any sort of serialization mechanism (all messages work with byte slice payloads).

It should be used as part of a microservice platform library that builds on top of usrv to provide message serialization and service discovery mechanisms.

Transports

The core of this framework is the notion of a transport. The transport is responsible for binding to a service endpoint (either as a server or a client) and handle the flow of messages from and to the endpoint.

Usrv ships with an amqp transport implementation as well as an in-memory transport that can be used for running tests.

The framework provides a common abstraction to the actual low-level message types used by each transport making it easier to add new transports and use them as drop-in replacements for existing transports.

AMQP transport

The AMQP transport uses rabbitmq as the transport backend. Given a service endpoint name, all servers bind to the same queue name making it easy to horizontally scale your service by launching more instances. Each client allocates a private queue which is then used as a reply channel for outgoing messages.

The server

Server options

When invoking the NewServer method to create a new server you may specify zero or more server options. The following options are supported:

Option name Description
WithLogger Attach a specific log.Logger instance to the server. By default a NULL logger is used
EventListener Register a channel for receiving server events.

Server events

The server will emit various events while running. You can register a listener for those events using the EventListener server option. The table below summarizes the list of events that the server reports. EvtRegistered EventType = iota // Registered endpoint EvtStarted = iota // Dialed transport EvtServing = iota // Serving endpoint EvtStopping = iota // Stopping server EvtStopped = iota // Server stopped and requests drained EvtTransportReset = iota // Transport connection reset

Event name Event includes endpoint name? Description
EvtRegistered yes Registered endpoint
EvtStarted no Dialed transport
EvtServing yes Serving endpoint
EvtStopping no Stopping server
EvtStopped no Server stopped and requests drained
EvtTransportReset no Transport connection reset

Request handlers

usrv request handlers are modeled after the handlers from the built-in go http package. The framework defines the Handler interface that should be implemented by request handlers and the HandlerFunc helper that allows you to wrap ordinary functions with a Handler-compatible signature.

The request handlers receive as arguments:

  • a context object which may be used for handling cancellations and timeouts.
  • a ResponseWriter implementation for the transport in use. The handler should respond to the incoming request (with a message or with an error) using the response writer.
  • a Message instance that wraps the received low-level transport message.

To register a request handler you need to use the Handle method of the Server object. The method receives as arguments:

  • the endpoint to serve.
  • the handler for incoming requests.
  • zero or more middleware to be applied.
Middleware

Request middleware are built using the functional argument pattern. The serve as generators, returning a method to be invoked with a pointer to an Endpoint. The following middleware are available:

Middleware name Description
Throttle Limit the max number of concurrent requests (maxConcurrent param). In addition, an execution timeout (as a time.Duration object) may be specified. If the request cannot be served within the specified deadline, it will be automatically aborted with a ErrTimeout error.

Listening and processing requests

After defining the endpoint handlers for your services, you can instruct the server to bind to them and begin processing requests by invoking the ListenAndServe method. This method blocks until the server is shutdown so its best to spawn a go-routine and invoke ListenAndServe inside it.

Shutting down the server

You can trigger a graceful server shutdown by invoking the Close method. When this method is invoked, any pending requests will be aborted with a ErrTimeout error.

The server will unbind the service endpoint so no further requests are received and then the method will block until all pending requests have been flushed.

The client

The client provides a Future-based API for performing asynchronous RPC calls.

When a new request is made, the client will assign a unique correlation id to it, add it to a list of pending requests and return a read-only channel where the caller can block waiting for the response to arrive.

The client uses a background worker to match incoming server responses to pending requests and emits the response to the channel allocated by the matched request.

The client provides two methods for executing requests Request and RequestWithTimeout. Both methods receive a context as their first argument. The RequestWithTimeout method also allows you to specify a timeout for the request. If the timeout expires before a response is received, the client will automatically expire the request with an ErrTimeout error.

Both request methods return a <- chan ServerReponse to the caller. The ServerResponse struct contains a Message and an Error. If an error occurs, Message will be nil and the Error will contain the error that occured.

Examples

Coming soon...

Testing

Coming soon...

License

usrv is distributed under the MIT license.

Documentation

Index

Constants

View Source
const (
	EvtRegistered     EventType = iota // Registered endpoint
	EvtStarted                  = iota // Dialed transport
	EvtServing                  = iota // Serving endpoint
	EvtStopping                 = iota // Stopping server
	EvtStopped                  = iota // Server stopped and requests drained
	EvtTransportReset           = iota // Transport connection reset
)
View Source
const (
	CtxCurEndpoint = "cur_endpoint"
)

Context keys.

Variables

View Source
var (
	ErrCancelled          = context.Canceled
	ErrTimeout            = context.DeadlineExceeded
	ErrResponseSent       = errors.New("Response already sent")
	ErrClosed             = errors.New("Connection closed")
	ErrDialFailed         = errors.New("Failed to connect")
	ErrServiceUnavailable = errors.New("Service not available")
)

Errors introduced by the server and the client.

Functions

func InjectCtxFieldToClients

func InjectCtxFieldToClients(ctxField string)

Add fieldName to the list of ctx fields that will be injected by clients as headers into outgoing messages if present in the request ctx.

Types

type Binding

type Binding struct {
	// The type of the binding.
	Type BindingType

	// The name of the binding. If this is a server binding then Name should match the endpoint name.
	Name string

	// A channel for incoming messages.
	Messages chan TransportMessage
}

type BindingType

type BindingType int
const (
	ServerBinding BindingType = iota
	ClientBinding
)

The supported types of endpoint bindings.

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(transport Transport) *Client

Create a new client for the given endpoint.

func (*Client) Close

func (client *Client) Close()

Shutdown the client and abort any pending requests with ErrCancelled. Invoking any client method after invoking Close() will result in an ErrClientClosed

func (*Client) Dial

func (client *Client) Dial() error

Dial the transport, bind the endpoint listener and spawn a worker for multiplexing requests.

func (*Client) Request

func (client *Client) Request(ctx context.Context, msg *Message, endpoint string) (<-chan ServerResponse, string)

Create a new request to the specified endpoint. Returns a read-only channel that will emit a ServerResponse once it is received by the server and the request correlationId.

If ctx is cancelled while the request is in progress, the client will fail the request with ErrTimeout

func (*Client) RequestWithTimeout

func (client *Client) RequestWithTimeout(ctx context.Context, msg *Message, timeout time.Duration, endpoint string) (<-chan ServerResponse, string)

Create a new request to the specified endpoint with a client timeout. Returns a read-only channel that will emit a ServerResponse once it is received by the server and the request correlationId.

If the timeout expires or ctx is cancelled while the request is in progress, the client will fail the request with ErrTimeout

type Endpoint

type Endpoint struct {
	// The name of this endpoint. To avoid name clashes between similarly named
	// endpoints in different packages you should supply the fully qualified name
	// of this endpoint (e.g. com.foo.bar.svc1)
	Name string

	// The handler that will service incoming requests to this endpoint.
	Handler Handler
}

A service endpoint.

type EndpointOption

type EndpointOption func(ep *Endpoint) error

Endpoint functional option type.

type EventType

type EventType int

type Handler

type Handler interface {
	Serve(context.Context, ResponseWriter, *Message)
}

Objects implementing the Handler interface can be registered as RPC request handlers.

type HandlerFunc

type HandlerFunc func(context.Context, ResponseWriter, *Message)

The HandlerFunc type is an adapter to allow the use of ordinary functions as RPC handlers. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler object that calls f.

func (HandlerFunc) Serve

func (f HandlerFunc) Serve(ctx context.Context, writer ResponseWriter, req *Message)

Serve implementation delegates the call to the wrapped function.

type Header map[string]interface{}

func (Header) Get

func (h Header) Get(key string) interface{}

Get a key from the header map. Returns nil if key does not exist

func (Header) Set

func (h Header) Set(key string, value interface{})

Set a key to the header map

type Message

type Message struct {
	// A map of headers.
	Headers Header

	// The sender endpoint
	From string

	// The recipient endpoint
	To string

	// A unique identifier used by clients to match requests to incoming
	// responses when concurrent requests are in progress.
	CorrelationId string

	// A reply address for routing responses.
	ReplyTo string

	// The message generation timestamp.
	Timestamp time.Time

	// The message content.
	Payload []byte
}

The Message object provides a wrapper for the actual message implementation used by each transport provider.

type PipelineHandler

type PipelineHandler struct {
	Decoder   func([]byte) (interface{}, error)
	Processor func(context.Context, interface{}) (interface{}, error)
	Encoder   func(interface{}) ([]byte, error)
}

This handler provides an adapter for using a message serialization framework (e.g protobuf, msgpack, json) with usrv. It applies a 3-stage pipeline to the incoming raw request:

raw payload -> decoder -> process -> encode -> write to response writer

func (PipelineHandler) Serve

func (f PipelineHandler) Serve(ctx context.Context, writer ResponseWriter, reqMsg *Message)

Implementation of the Handler interface

type ResponseWriter

type ResponseWriter interface {

	// Get back the headers that will be sent with the response payload.
	// Modifying headers after invoking Write (or WriteError) has no effect.
	Header() Header

	// Write an error to the response.
	WriteError(err error) error

	// Write the data to the response and return the number of bytes that were actually written.
	Write(data []byte) (int, error)

	// Flush the written data and headers and close the ResponseWriter. Repeated invocations
	// of Close() should fail with ErrResponseSent.
	Close() error
}

Objects implementing the ResponseWriter interface can be used for responding to incoming RPC messages.

Since the actual reply serialization is transport-specific, each transport should define its own implementation.

type Server

type Server struct {

	// The logger for server messages.
	Logger *log.Logger
	// contains filtered or unexported fields
}

An RPC server implementation.

func NewServer

func NewServer(transport Transport, options ...ServerOption) (*Server, error)

Create a new server with default settings. One or more ServerOption functional arguments may be specified to override defaults.

func (*Server) Close

func (srv *Server) Close()

Shutdown the server. This function will unbind any bound endpoints and block until any pending requests have been drained.

func (*Server) Handle

func (srv *Server) Handle(path string, handler Handler, options ...EndpointOption) error

Register a handler for requests to RPC endpoint identified by path. One or more EndpointOption functional arguments may be specified to further customize the endpoint by applying for example middleware.

func (*Server) ListenAndServe

func (srv *Server) ListenAndServe() error

Dial the registered transport provider and then call Serve() to start processing incoming RPC requests.

func (*Server) SetOption

func (srv *Server) SetOption(options ...ServerOption) error

Set one or more ServerOptions.

type ServerEvent

type ServerEvent struct {
	Type     EventType
	Endpoint string
}

type ServerOption

type ServerOption func(srv *Server) error

Server functional option type.

func EventListener

func EventListener(listener chan ServerEvent) ServerOption

Attach an event listener for server-generated events

func WithLogger

func WithLogger(logger *log.Logger) ServerOption

Use a particular logger instance for logging server events.

type ServerResponse

type ServerResponse struct {
	// The server response message.
	Message *Message

	// An error reported by the remote server. It will be nil if no error was reported.
	Error error
}

This structure models a server response to an outgoing client request.

type Transport

type Transport interface {

	// Connect to the transport.
	Dial() error

	// Disconnect.
	Close()

	// Set logger.
	SetLogger(logger *log.Logger)

	// Bind an endpoint to the transport. The implementation should monitor the passed
	// context and terminate the binding once the context is cancelled.
	Bind(bindingType BindingType, endpoint string) (*Binding, error)

	// Send a message.
	Send(msg *Message) error

	// Register a listener for receiving close notifications. The transport will emit an error and
	// close the channel if the transport is cleanly shut down or close the channel if the connection is reset.
	NotifyClose(c chan error)
}

Objects implementing the Transport interface can be used by both the Server and Client as message transports.

type TransportMessage

type TransportMessage struct {
	// A transport-specific ResponseWriter implementation for replying to the incoming message.
	ResponseWriter ResponseWriter

	// The incoming message payload and its metadata.
	Message *Message
}

Transports generate TransportMessage objects for incoming messages at each bound endpoint.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL