Documentation ¶
Index ¶
- Constants
- Variables
- func InjectCtxFieldToClients(ctxField string)
- type Binding
- type BindingType
- type Client
- func (client *Client) Close()
- func (client *Client) Dial() error
- func (client *Client) Request(ctx context.Context, msg *Message, endpoint string) (<-chan ServerResponse, string)
- func (client *Client) RequestWithTimeout(ctx context.Context, msg *Message, timeout time.Duration, endpoint string) (<-chan ServerResponse, string)
- type Endpoint
- type EndpointOption
- type EventType
- type Handler
- type HandlerFunc
- type Header
- type Message
- type PipelineHandler
- type ResponseWriter
- type Server
- type ServerEvent
- type ServerOption
- type ServerResponse
- type Transport
- type TransportMessage
Constants ¶
const (
    EvtRegistered EventType = iota // Registered endpoint
    EvtStarted = iota // Dialed transport
    EvtServing = iota // Serving endpoint
    EvtStopping = iota // Stopping server
    EvtStopped = iota // Server stopped and requests drained
    EvtTransportReset = iota // Transport connection reset
)
const (
CtxCurEndpoint = "cur_endpoint"
)
Context keys.
Variables ¶
var (
    ErrCancelled = context.Canceled
    ErrTimeout = context.DeadlineExceeded
    ErrResponseSent = errors.New("Response already sent")
    ErrClosed = errors.New("Connection closed")
    ErrDialFailed = errors.New("Failed to connect")
)
Errors introduced by the server and the client.
Functions ¶
func InjectCtxFieldToClients ¶
func InjectCtxFieldToClients(ctxField string)
Add ctxField to the list of ctx fields that clients inject as headers into outgoing messages when the field is present in the request ctx.
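The behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the package's actual implementation: the registry variable ctxFields and the helpers registerCtxField, injectCtxFields and demo are hypothetical names, and Header is a local copy of the documented type.

```go
package main

import (
	"context"
	"fmt"
)

// Header is a local copy of the Header type documented on this page.
type Header map[string]interface{}

// ctxFields is a hypothetical registry of context field names.
var ctxFields []string

// registerCtxField is a hypothetical stand-in for InjectCtxFieldToClients.
func registerCtxField(ctxField string) {
	ctxFields = append(ctxFields, ctxField)
}

// injectCtxFields copies every registered field that is present in ctx
// into the outgoing headers. Fields missing from ctx are skipped.
func injectCtxFields(ctx context.Context, headers Header) {
	for _, field := range ctxFields {
		if val := ctx.Value(field); val != nil {
			headers[field] = val
		}
	}
}

// demo registers two fields but only sets one of them on the context.
func demo() Header {
	registerCtxField("request_id")
	registerCtxField("user_id")

	ctx := context.WithValue(context.Background(), "request_id", "abc-123")
	headers := Header{}
	injectCtxFields(ctx, headers)
	return headers
}

func main() {
	// Only request_id is present in ctx, so only it becomes a header.
	fmt.Println(demo()) // map[request_id:abc-123]
}
```

String context keys are used here only because the documented CtxCurEndpoint constant is itself a plain string.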
Types ¶
type Binding ¶
type Binding struct {
    // The type of the binding.
    Type BindingType
    // The name of the binding. If this is a server binding then Name should match the endpoint name.
    Name string
    // A channel for incoming messages.
    Messages chan TransportMessage
}
type BindingType ¶
type BindingType int
const (
    ServerBinding BindingType = iota
    ClientBinding
)
The supported types of endpoint bindings.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) Close ¶
func (client *Client) Close()
Shut down the client and abort any pending requests with ErrCancelled. Invoking any client method after Close() results in an ErrClientClosed error.
func (*Client) Dial ¶
func (client *Client) Dial() error
Dial the transport, bind the endpoint listener, and spawn a worker for multiplexing requests.
func (*Client) Request ¶
func (client *Client) Request(ctx context.Context, msg *Message, endpoint string) (<-chan ServerResponse, string)
Create a new request to the specified endpoint. Returns the request correlationId and a read-only channel that emits a ServerResponse once the response is received from the server.
If ctx is cancelled while the request is in progress, the client will fail the request with ErrTimeout.
func (*Client) RequestWithTimeout ¶
func (client *Client) RequestWithTimeout(ctx context.Context, msg *Message, timeout time.Duration, endpoint string) (<-chan ServerResponse, string)
Create a new request to the specified endpoint with a client timeout. Returns the request correlationId and a read-only channel that emits a ServerResponse once the response is received from the server.
If the timeout expires or ctx is cancelled while the request is in progress, the client will fail the request with ErrTimeout.
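A caller typically reads one value from the returned channel and checks its Error field. The sketch below illustrates that pattern without a real transport: fakeRequest is a hypothetical stand-in for RequestWithTimeout that replies after a delay, call, fast and slow are illustrative names, and Message and ServerResponse are trimmed local copies of the documented types.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message and ServerResponse are trimmed copies of the documented types.
type Message struct{ Payload []byte }

type ServerResponse struct {
	Message *Message
	Error   error
}

// ErrTimeout mirrors the documented variable.
var ErrTimeout = context.DeadlineExceeded

const (
	fast = 10 * time.Millisecond
	slow = time.Second
)

// fakeRequest is a hypothetical stand-in for Client.RequestWithTimeout.
// It replies with "pong" after replyAfter, or fails the request with
// ErrTimeout if the timeout expires (or ctx is cancelled) first.
func fakeRequest(ctx context.Context, replyAfter, timeout time.Duration) (<-chan ServerResponse, string) {
	resChan := make(chan ServerResponse, 1)
	go func() {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		select {
		case <-time.After(replyAfter):
			resChan <- ServerResponse{Message: &Message{Payload: []byte("pong")}}
		case <-ctx.Done():
			resChan <- ServerResponse{Error: ErrTimeout}
		}
	}()
	return resChan, "req-1" // the second value plays the role of the correlationId
}

// call waits for the single response and unwraps it.
func call(replyAfter, timeout time.Duration) (string, error) {
	resChan, _ := fakeRequest(context.Background(), replyAfter, timeout)
	res := <-resChan
	if res.Error != nil {
		return "", res.Error
	}
	return string(res.Message.Payload), nil
}

func main() {
	fmt.Println(call(fast, slow)) // the reply arrives before the timeout
	fmt.Println(call(slow, fast)) // the timeout fires first
}
```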
type Endpoint ¶
type Endpoint struct {
    // The name of this endpoint. To avoid name clashes between similarly named
    // endpoints in different packages you should supply the fully qualified name
    // of this endpoint (e.g. com.foo.bar.svc1)
    Name string
    // The handler that will service incoming requests to this endpoint.
    Handler Handler
}
A service endpoint.
type Handler ¶
type Handler interface {
Serve(context.Context, ResponseWriter, *Message)
}
Objects implementing the Handler interface can be registered as RPC request handlers.
type HandlerFunc ¶
type HandlerFunc func(context.Context, ResponseWriter, *Message)
The HandlerFunc type is an adapter to allow the use of ordinary functions as RPC handlers. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler object that calls f.
func (HandlerFunc) Serve ¶
func (f HandlerFunc) Serve(ctx context.Context, writer ResponseWriter, req *Message)
Serve implementation delegates the call to the wrapped function.
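The adapter pattern can be illustrated with local copies of the documented types. In this sketch ResponseWriter is trimmed to the single method the example needs, and bufWriter, echo and serveOnce are hypothetical helpers that exist only for the illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a trimmed copy of the documented type.
type Message struct{ Payload []byte }

// ResponseWriter is trimmed to the one method this sketch needs.
type ResponseWriter interface {
	Write(data []byte) (int, error)
}

// Handler and HandlerFunc follow the declarations on this page.
type Handler interface {
	Serve(context.Context, ResponseWriter, *Message)
}

type HandlerFunc func(context.Context, ResponseWriter, *Message)

// Serve delegates the call to the wrapped function.
func (f HandlerFunc) Serve(ctx context.Context, writer ResponseWriter, req *Message) {
	f(ctx, writer, req)
}

// bufWriter is a hypothetical in-memory writer used to capture the reply.
type bufWriter struct{ data []byte }

func (w *bufWriter) Write(data []byte) (int, error) {
	w.data = append(w.data, data...)
	return len(data), nil
}

// echo is an ordinary function with the handler signature.
func echo(ctx context.Context, writer ResponseWriter, req *Message) {
	writer.Write(append([]byte("echo: "), req.Payload...))
}

// serveOnce runs a handler against an in-memory writer and returns the reply.
func serveOnce(h Handler, payload string) string {
	w := &bufWriter{}
	h.Serve(context.Background(), w, &Message{Payload: []byte(payload)})
	return string(w.data)
}

func main() {
	// HandlerFunc(echo) converts the plain function into a Handler.
	var h Handler = HandlerFunc(echo)
	fmt.Println(serveOnce(h, "hi")) // echo: hi
}
```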
type Header ¶
type Header map[string]interface{}
type Message ¶
type Message struct {
    // A map of headers.
    Headers Header
    // The sender endpoint
    From string
    // The recipient endpoint
    To string
    // A unique identifier used by clients to match requests to incoming
    // responses when concurrent requests are in progress.
    CorrelationId string
    // A reply address for routing responses.
    ReplyTo string
    // The message generation timestamp.
    Timestamp time.Time
    // The message content.
    Payload []byte
}
The Message object provides a wrapper for the actual message implementation used by each transport provider.
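To illustrate how a CorrelationId lets a client match responses to concurrent requests, here is a minimal sketch of a pending-request table. It is an assumption about one possible design, not the client's actual code: pendingRequests, register and dispatch are hypothetical names, and Message is trimmed to the two fields the example uses.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is trimmed to the fields this sketch needs.
type Message struct {
	CorrelationId string
	Payload       []byte
}

// pendingRequests is a hypothetical table of in-flight requests keyed by
// correlation id.
type pendingRequests struct {
	mu      sync.Mutex
	waiting map[string]chan *Message
}

func newPendingRequests() *pendingRequests {
	return &pendingRequests{waiting: make(map[string]chan *Message)}
}

// register records an in-flight request and returns the channel on which
// its response will be delivered.
func (p *pendingRequests) register(id string) <-chan *Message {
	ch := make(chan *Message, 1)
	p.mu.Lock()
	p.waiting[id] = ch
	p.mu.Unlock()
	return ch
}

// dispatch routes a response to the request with the same correlation id.
// It reports false when no such request is pending.
func (p *pendingRequests) dispatch(res *Message) bool {
	p.mu.Lock()
	ch, found := p.waiting[res.CorrelationId]
	delete(p.waiting, res.CorrelationId)
	p.mu.Unlock()
	if found {
		ch <- res
	}
	return found
}

func main() {
	pending := newPendingRequests()
	first := pending.register("id-1")
	second := pending.register("id-2")

	// Responses arrive out of order but still reach the right caller.
	pending.dispatch(&Message{CorrelationId: "id-2", Payload: []byte("second")})
	pending.dispatch(&Message{CorrelationId: "id-1", Payload: []byte("first")})

	fmt.Println(string((<-first).Payload), string((<-second).Payload)) // first second
}
```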
type PipelineHandler ¶
type PipelineHandler struct {
    Decoder func([]byte) (interface{}, error)
    Processor func(context.Context, interface{}) (interface{}, error)
    Encoder func(interface{}) ([]byte, error)
}
This handler provides an adapter for using a message serialization framework (e.g. protobuf, msgpack, json) with usrv. It applies a 3-stage pipeline to the incoming raw request:
raw payload -> decode -> process -> encode -> write to response writer
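The three stages can be sketched with encoding/json as the serialization framework. This is an illustration under stated assumptions: pipeline is a local type with the same three fields as PipelineHandler, its run method returns the encoded bytes instead of writing to a ResponseWriter, stopping at the first error is an assumed policy, and greetRequest, greetResponse, newGreeter and greet are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// pipeline has the same three fields as the documented PipelineHandler.
type pipeline struct {
	Decoder   func([]byte) (interface{}, error)
	Processor func(context.Context, interface{}) (interface{}, error)
	Encoder   func(interface{}) ([]byte, error)
}

// run applies decode -> process -> encode and stops at the first error.
func (p pipeline) run(ctx context.Context, raw []byte) ([]byte, error) {
	req, err := p.Decoder(raw)
	if err != nil {
		return nil, err
	}
	res, err := p.Processor(ctx, req)
	if err != nil {
		return nil, err
	}
	return p.Encoder(res)
}

// Hypothetical request and response payloads.
type greetRequest struct {
	Name string `json:"name"`
}

type greetResponse struct {
	Greeting string `json:"greeting"`
}

// newGreeter wires JSON decoding and encoding around a typed processor.
func newGreeter() pipeline {
	return pipeline{
		Decoder: func(raw []byte) (interface{}, error) {
			var req greetRequest
			err := json.Unmarshal(raw, &req)
			return req, err
		},
		Processor: func(_ context.Context, v interface{}) (interface{}, error) {
			req, ok := v.(greetRequest)
			if !ok {
				return nil, errors.New("unexpected request type")
			}
			return greetResponse{Greeting: "hello " + req.Name}, nil
		},
		Encoder: json.Marshal,
	}
}

// greet pushes one raw payload through the pipeline.
func greet(raw string) (string, error) {
	out, err := newGreeter().run(context.Background(), []byte(raw))
	return string(out), err
}

func main() {
	fmt.Println(greet(`{"name":"ada"}`)) // {"greeting":"hello ada"} <nil>
}
```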
func (PipelineHandler) Serve ¶
func (f PipelineHandler) Serve(ctx context.Context, writer ResponseWriter, reqMsg *Message)
Implementation of the Handler interface.
type ResponseWriter ¶
type ResponseWriter interface {
    // Get back the headers that will be sent with the response payload.
    // Modifying headers after invoking Write (or WriteError) has no effect.
    Header() Header
    // Write an error to the response.
    WriteError(err error) error
    // Write the data to the response and return the number of bytes that were actually written.
    Write(data []byte) (int, error)
    // Flush the written data and headers and close the ResponseWriter. Repeated invocations
    // of Close() should fail with ErrResponseSent.
    Close() error
}
Objects implementing the ResponseWriter interface can be used for responding to incoming RPC messages.
Since the actual reply serialization is transport-specific, each transport should define its own implementation.
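As a rough idea of what a transport-specific implementation involves, the sketch below implements the interface in memory. memWriter and newMemWriter are hypothetical names. Rejecting writes after Close() with ErrResponseSent is an assumption of this sketch, and the rule about header changes after Write is not enforced here.

```go
package main

import (
	"errors"
	"fmt"
)

// Header, ErrResponseSent and ResponseWriter are local copies of the
// declarations on this page.
type Header map[string]interface{}

var ErrResponseSent = errors.New("Response already sent")

type ResponseWriter interface {
	Header() Header
	WriteError(err error) error
	Write(data []byte) (int, error)
	Close() error
}

// memWriter is a hypothetical in-memory ResponseWriter.
type memWriter struct {
	headers Header
	body    []byte
	err     error
	closed  bool
}

// Compile-time check that memWriter satisfies the interface.
var _ ResponseWriter = (*memWriter)(nil)

func newMemWriter() *memWriter { return &memWriter{headers: Header{}} }

func (w *memWriter) Header() Header { return w.headers }

// WriteError records the error to report (assumed to fail once closed).
func (w *memWriter) WriteError(err error) error {
	if w.closed {
		return ErrResponseSent
	}
	w.err = err
	return nil
}

// Write buffers the data (assumed to fail once closed).
func (w *memWriter) Write(data []byte) (int, error) {
	if w.closed {
		return 0, ErrResponseSent
	}
	w.body = append(w.body, data...)
	return len(data), nil
}

// Close marks the response as sent. Repeated calls fail with ErrResponseSent.
func (w *memWriter) Close() error {
	if w.closed {
		return ErrResponseSent
	}
	w.closed = true
	return nil
}

func main() {
	w := newMemWriter()
	w.Header()["content-type"] = "text/plain"
	n, _ := w.Write([]byte("ok"))
	fmt.Println(n, w.Close(), w.Close()) // 2 <nil> Response already sent
}
```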
type Server ¶
type Server struct {
    // The logger for server messages.
    Logger *log.Logger
    // contains filtered or unexported fields
}
An RPC server implementation.
func NewServer ¶
func NewServer(transport Transport, options ...ServerOption) (*Server, error)
Create a new server with default settings. One or more ServerOption functional arguments may be specified to override defaults.
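The functional-option pattern behind this constructor can be sketched as follows. The underlying type of ServerOption is not shown on this page, so the func(*server) error shape used here is an assumption, as are the lowercase names server, option, withLogger and newServer and the choice of a discarding default logger.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"log"
	"os"
)

// server is a hypothetical stand-in for the documented Server type.
type server struct {
	logger *log.Logger
}

// option is an assumed shape for a functional option. Returning an error
// lets the constructor reject invalid settings.
type option func(*server) error

// withLogger mirrors the role of the documented WithLogger option.
func withLogger(logger *log.Logger) option {
	return func(s *server) error {
		if logger == nil {
			return errors.New("logger must not be nil")
		}
		s.logger = logger
		return nil
	}
}

// newServer starts from defaults and then applies each option in order.
func newServer(options ...option) (*server, error) {
	s := &server{logger: log.New(io.Discard, "", 0)}
	for _, opt := range options {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	custom := log.New(os.Stdout, "[usrv] ", 0)
	srv, err := newServer(withLogger(custom))
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	srv.logger.Println("server configured")
}
```

The same option values could be applied after construction, which is the role SetOption plays for the real Server.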
func (*Server) Close ¶
func (srv *Server) Close()
Shut down the server. This function unbinds any bound endpoints and blocks until all pending requests have been drained.
func (*Server) Handle ¶
func (srv *Server) Handle(path string, handler Handler, options ...EndpointOption) error
Register a handler for requests to the RPC endpoint identified by path. One or more EndpointOption functional arguments may be specified to further customize the endpoint, for example by applying middleware.
func (*Server) ListenAndServe ¶
Dial the registered transport provider and then call Serve() to start processing incoming RPC requests.
func (*Server) SetOption ¶
func (srv *Server) SetOption(options ...ServerOption) error
Set one or more ServerOptions.
type ServerEvent ¶
type ServerOption ¶
Server functional option type.
func EventListener ¶
func EventListener(listener chan ServerEvent) ServerOption
Attach an event listener for server-generated events.
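A listener is simply a channel that a consumer drains. The sketch below simulates the producing side, since the fields of ServerEvent are not shown on this page: serverEvent with its Type and Endpoint fields is a hypothetical stand-in, and eventNames, collect and simulate are illustrative helpers. The EventType constants take the same values as the documented ones.

```go
package main

import "fmt"

// EventType and its constants take the same values as the documented ones.
type EventType int

const (
	EvtRegistered EventType = iota
	EvtStarted
	EvtServing
	EvtStopping
	EvtStopped
	EvtTransportReset
)

// eventNames is a hypothetical lookup table for readable output.
var eventNames = map[EventType]string{
	EvtRegistered:     "registered",
	EvtStarted:        "started",
	EvtServing:        "serving",
	EvtStopping:       "stopping",
	EvtStopped:        "stopped",
	EvtTransportReset: "transport reset",
}

// serverEvent is a hypothetical stand-in for ServerEvent. The real
// struct's fields are not shown on this page.
type serverEvent struct {
	Type     EventType
	Endpoint string
}

// collect drains the listener channel until it is closed.
func collect(events <-chan serverEvent) []string {
	var seen []string
	for evt := range events {
		seen = append(seen, eventNames[evt.Type])
	}
	return seen
}

// simulate plays the role of a server emitting events on the listener.
func simulate() []string {
	events := make(chan serverEvent, 3)
	events <- serverEvent{Type: EvtRegistered, Endpoint: "com.foo.bar.svc1"}
	events <- serverEvent{Type: EvtStarted}
	events <- serverEvent{Type: EvtServing, Endpoint: "com.foo.bar.svc1"}
	close(events)
	return collect(events)
}

func main() {
	fmt.Println(simulate()) // [registered started serving]
}
```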
func WithLogger ¶
func WithLogger(logger *log.Logger) ServerOption
Use a particular logger instance for logging server events.
type ServerResponse ¶
type ServerResponse struct {
    // The server response message.
    Message *Message
    // An error reported by the remote server. It will be nil if no error was reported.
    Error error
}
This structure models a server response to an outgoing client request.
type Transport ¶
type Transport interface {
    // Connect to the transport.
    Dial() error
    // Disconnect.
    Close()
    // Set logger.
    SetLogger(logger *log.Logger)
    // Bind an endpoint to the transport. The implementation should monitor the passed
    // context and terminate the binding once the context is cancelled.
    Bind(bindingType BindingType, endpoint string) (*Binding, error)
    // Send a message.
    Send(msg *Message) error
    // Register a listener for receiving close notifications. The transport will emit an error and
    // close the channel if the transport is cleanly shut down or close the channel if the connection is reset.
    NotifyClose(c chan error)
}
Objects implementing the Transport interface can be used by both the Server and Client as message transports.
type TransportMessage ¶
type TransportMessage struct {
    // A transport-specific ResponseWriter implementation for replying to the incoming message.
    ResponseWriter ResponseWriter
    // The incoming message payload and its metadata.
    Message *Message
}
Transports generate TransportMessage objects for incoming messages at each bound endpoint.