gorpc

package module
v0.0.0-...-908281b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2016 License: MIT Imports: 16 Imported by: 46

README

gorpc

Simple, fast and scalable golang RPC library for high load and microservices.

Gorpc provides the following features useful for highly loaded projects with RPC:

  • It minimizes the number of connect() syscalls by pipelining request and response messages over a single connection.

  • It minimizes the number of send() syscalls by packing as much as possible pending requests and responses into a single compressed buffer before passing it into send() syscall.

  • It minimizes the number of recv() syscalls by reading and buffering as much as possible data from the network.

  • It supports RPC batching, which allows preparing multiple requests and sending them to the server in a single batch.

These features help the OS minimizing overhead (CPU load, the number of TCP connections in TIME_WAIT and CLOSE_WAIT states, the number of network packets and the amount of network bandwidth) required for RPC processing under high load.

Additionally gorpc provides the following features missing in net/rpc:

  • Client automatically manages connections and automatically reconnects to the server on connection errors.
  • Client supports response timeouts.
  • Client supports RPC batching.
  • Client supports async requests' canceling.
  • Client prioritizes new requests over old pending requests if server fails to handle the given load.
  • Client detects stuck servers and immediately returns error to the caller.
  • Client supports fast message passing to the Server, i.e. requests without responses.
  • Both Client and Server provide network stats and RPC stats out of the box.
  • Commonly used RPC transports such as TCP, TLS and unix socket are available out of the box.
  • RPC transport compression is provided out of the box.
  • Server provides graceful shutdown out of the box.
  • Server supports RPC handlers' councurrency throttling out of the box.
  • Server may pass client address to RPC handlers.
  • Server gracefully handles panic in RPC handlers.
  • Dispatcher accepts functions as RPC handlers.
  • Dispatcher supports registering multiple receiver objects of the same type under distinct names.
  • Dispatcher supports RPC handlers with zero, one (request) or two (client address and request) arguments and zero, one (either response or error) or two (response, error) return values.

Dispatcher API provided by gorpc allows easily converting usual functions and/or struct methods into RPC versions on both client and server sides. See Dispatcher examples for more details.

By default TCP connections are used as underlying gorpc transport. But it is possible using arbitrary underlying transport - just provide custom implementations for Client.Dial and Server.Listener. RPC authentication, authorization and encryption can be easily implemented via custom underlying transport and/or via OnConnect callbacks. Currently gorpc provides TCP, TLS and unix socket transport out of the box.

Currently gorpc with default settings is successfully used in highly loaded production environment serving up to 40K qps. Switching from http-based rpc to gorpc reduced required network bandwidth from 300 Mbit/s to 24 Mbit/s.

Docs

See http://godoc.org/github.com/valyala/gorpc .

Usage

Server:

s := &gorpc.Server{
	// Accept clients on this TCP address.
	Addr: ":12345",

	// Echo handler - just return back the message we received from the client
	Handler: func(clientAddr string, request interface{}) interface{} {
		log.Printf("Obtained request %+v from the client %s\n", request, clientAddr)
		return request
	},
}
if err := s.Serve(); err != nil {
	log.Fatalf("Cannot start rpc server: %s", err)
}

Client:

c := &gorpc.Client{
	// TCP address of the server.
	Addr: "rpc.server.addr:12345",
}
c.Start()

// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
resp, err := c.Call("foobar")
if err != nil {
	log.Fatalf("Error when sending request to server: %s", err)
}
if resp.(string) != "foobar" {
	log.Fatalf("Unexpected response from the server: %+v", resp)
}

Both client and server collect connection stats - the number of bytes read / written and the number of calls / errors to send(), recv(), connect() and accept(). This stats is available at Client.Stats and Server.Stats.

See tests for more usage examples.

Documentation

Overview

Package gorpc provides simple RPC API for highload projects.

Gorpc has the following features:

  • Easy-to-use API.
  • Optimized for high load (>10K qps).
  • Uses as low network bandwidth as possible.
  • Minimizes the number of TCP connections in TIME_WAIT and WAIT_CLOSE states.
  • Minimizes the number of send() and recv() syscalls.
  • Provides ability to use arbitrary underlying transport. By default TCP is used, but TLS and UNIX sockets are already available.

Index

Examples

Constants

View Source
const (
	// DefaultConcurrency is the default number of concurrent rpc calls
	// the server can process.
	DefaultConcurrency = 8 * 1024

	// DefaultRequestTimeout is the default timeout for client request.
	DefaultRequestTimeout = 20 * time.Second

	// DefaultPendingMessages is the default number of pending messages
	// handled by Client and Server.
	DefaultPendingMessages = 32 * 1024

	// DefaultFlushDelay is the default delay between message flushes
	// on Client and Server.
	DefaultFlushDelay = -1

	// DefaultBufferSize is the default size for Client and Server buffers.
	DefaultBufferSize = 64 * 1024
)

Variables

View Source
var ErrCanceled = &ClientError{
	Canceled: true,
	err:      fmt.Errorf("the call has been canceled"),
}

ErrCanceled may be returned from rpc call if AsyncResult.Cancel has been called.

Functions

func NilErrorLogger

func NilErrorLogger(format string, args ...interface{})

NilErrorLogger discards all error messages.

Pass NilErrorLogger to SetErrorLogger() in order to suppress error log generated by gorpc.

func RegisterType

func RegisterType(x interface{})

RegisterType registers the given type to send via rpc.

The client must register all the response types the server may send. The server must register all the request types the client may send.

There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

There is no need in registering argument and return value types for functions and methods registered via Dispatcher.

func SetErrorLogger

func SetErrorLogger(f LoggerFunc)

SetErrorLogger sets the given error logger to use in gorpc.

By default log.Printf is used for error logging.

Types

type AsyncResult

type AsyncResult struct {
	// The response can be read only after <-Done unblocks.
	Response interface{}

	// The error can be read only after <-Done unblocks.
	// The error can be casted to ClientError.
	Error error

	// Response and Error become available after <-Done unblocks.
	Done <-chan struct{}
	// contains filtered or unexported fields
}

AsyncResult is a result returned from Client.CallAsync().

func (*AsyncResult) Cancel

func (m *AsyncResult) Cancel()

Cancel cancels async call.

Canceled call isn't sent to the server unless it is already sent there. Canceled call may successfully complete if it has been already sent to the server before Cancel call.

It is safe calling this function multiple times from concurrently running goroutines.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch allows grouping and executing multiple RPCs in a single batch.

Batch may be created via Client.NewBatch().

func (*Batch) Add

func (b *Batch) Add(request interface{}) *BatchResult

Add ads new request to the RPC batch.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when Batch.Call*() is called.

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*Batch) AddSkipResponse

func (b *Batch) AddSkipResponse(request interface{})

AddSkipResponse adds new request to the RPC batch and doesn't care about the response.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when Batch.Call*() is called.

All the request types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*Batch) Call

func (b *Batch) Call() error

Call calls all the RPCs added via Batch.Add().

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from Batch.Add() after the Call returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the Call returns.

func (*Batch) CallTimeout

func (b *Batch) CallTimeout(timeout time.Duration) error

CallTimeout calls all the RPCs added via Batch.Add() and waits for all the RPC responses during the given timeout.

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from Batch.Add() after the CallTimeout returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the CallTimeout returns.

type BatchResult

type BatchResult struct {
	// The response can be read only after Batch.Call*() returns.
	Response interface{}

	// The error can be read only after Batch.Call*() returns.
	// The error can be casted to ClientError.
	Error error

	// <-Done unblocks after Batch.Call*() returns.
	// Response and Error become available after <-Done unblocks.
	Done <-chan struct{}
	// contains filtered or unexported fields
}

BatchResult is a result returned from Batch.Add*().

type Client

type Client struct {
	// Server address to connect to.
	//
	// The address format depends on the underlying transport provided
	// by Client.Dial. The following transports are provided out of the box:
	//   * TCP - see NewTCPClient() and NewTCPServer().
	//   * TLS - see NewTLSClient() and NewTLSServer().
	//   * Unix sockets - see NewUnixClient() and NewUnixServer().
	//
	// By default TCP transport is used.
	Addr string

	// The number of concurrent connections the client should establish
	// to the sever.
	// By default only one connection is established.
	Conns int

	// The maximum number of pending requests in the queue.
	//
	// The number of pending requsts should exceed the expected number
	// of concurrent goroutines calling client's methods.
	// Otherwise a lot of ClientError.Overflow errors may appear.
	//
	// Default is DefaultPendingMessages.
	PendingRequests int

	// Delay between request flushes.
	//
	// Negative values lead to immediate requests' sending to the server
	// without their buffering. This minimizes rpc latency at the cost
	// of higher CPU and network usage.
	//
	// Default value is DefaultFlushDelay.
	FlushDelay time.Duration

	// Maximum request time.
	// Default value is DefaultRequestTimeout.
	RequestTimeout time.Duration

	// Disable data compression.
	// By default data compression is enabled.
	DisableCompression bool

	// Size of send buffer per each underlying connection in bytes.
	// Default value is DefaultBufferSize.
	SendBufferSize int

	// Size of recv buffer per each underlying connection in bytes.
	// Default value is DefaultBufferSize.
	RecvBufferSize int

	// OnConnect is called whenever connection to server is established.
	// The callback can be used for authentication/authorization/encryption
	// and/or for custom transport wrapping.
	//
	// See also Dial callback, which can be used for sophisticated
	// transport implementation.
	OnConnect OnConnectFunc

	// The client calls this callback when it needs new connection
	// to the server.
	// The client passes Client.Addr into Dial().
	//
	// Override this callback if you want custom underlying transport
	// and/or authentication/authorization.
	// Don't forget overriding Server.Listener accordingly.
	//
	// See also OnConnect for authentication/authorization purposes.
	//
	// * NewTLSClient() and NewTLSServer() can be used for encrypted rpc.
	// * NewUnixClient() and NewUnixServer() can be used for fast local
	//   inter-process rpc.
	//
	// By default it returns TCP connections established to the Client.Addr.
	Dial DialFunc

	// LogError is used for error logging.
	//
	// By default the function set via SetErrorLogger() is used.
	LogError LoggerFunc

	// Connection statistics.
	//
	// The stats doesn't reset automatically. Feel free resetting it
	// any time you wish.
	Stats ConnStats
	// contains filtered or unexported fields
}

Client implements RPC client.

The client must be started with Client.Start() before use.

It is absolutely safe and encouraged using a single client across arbitrary number of concurrently running goroutines.

Default client settings are optimized for high load, so don't override them without valid reason.

func NewTCPClient

func NewTCPClient(addr string) *Client

NewTCPClient creates a client connecting over TCP to the server listening to the given addr.

The returned client must be started after optional settings' adjustment.

The corresponding server must be created with NewTCPServer().

func NewTLSClient

func NewTLSClient(addr string, cfg *tls.Config) *Client

NewTLSClient creates a client connecting over TLS (aka SSL) to the server listening to the given addr using the given TLS config.

The returned client must be started after optional settings' adjustment.

The corresponding server must be created with NewTLSServer().

func NewUnixClient

func NewUnixClient(addr string) *Client

NewUnixClient creates a client connecting over unix socket to the server listening to the given addr.

The returned client must be started after optional settings' adjustment.

The corresponding server must be created with NewUnixServer().

func (*Client) Call

func (c *Client) Call(request interface{}) (response interface{}, err error)

Call sends the given request to the server and obtains response from the server. Returns non-nil error if the response cannot be obtained during Client.RequestTimeout or server connection problems occur. The returned error can be casted to ClientError.

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Hint: use Dispatcher for distinct calls' construction.

Don't forget starting the client with Client.Start() before calling Client.Call().

func (*Client) CallAsync

func (c *Client) CallAsync(request interface{}) (*AsyncResult, error)

CallAsync starts async rpc call.

Rpc call is complete after <-AsyncResult.Done unblocks. If you want canceling the request, just throw away the returned AsyncResult.

CallAsync doesn't respect Client.RequestTimeout - response timeout may be controlled by the caller via something like:

r := c.CallAsync("foobar")
select {
case <-time.After(c.RequestTimeout):
   log.Printf("rpc timeout!")
case <-r.Done:
   processResponse(r.Response, r.Error)
}

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Don't forget starting the client with Client.Start() before calling Client.CallAsync().

func (*Client) CallTimeout

func (c *Client) CallTimeout(request interface{}, timeout time.Duration) (response interface{}, err error)

CallTimeout sends the given request to the server and obtains response from the server. Returns non-nil error if the response cannot be obtained during the given timeout or server connection problems occur. The returned error can be casted to ClientError.

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Hint: use Dispatcher for distinct calls' construction.

Don't forget starting the client with Client.Start() before calling Client.Call().

func (*Client) NewBatch

func (c *Client) NewBatch() *Batch

NewBatch creates new RPC batch.

It is safe creating multiple concurrent batches from a single client.

Don't forget starting the client with Client.Start() before working with batched RPC.

func (*Client) PendingRequestsCount

func (c *Client) PendingRequestsCount() int

PendingRequestsCount returns the instant number of pending requests.

The main purpose of this function is to use in load-balancing schemes where load should be balanced between multiple rpc clients.

Don't forget starting the client with Client.Start() before calling this function.

func (*Client) Send

func (c *Client) Send(request interface{}) error

Send sends the given request to the server and doesn't wait for response.

Since this is 'fire and forget' function, which never waits for response, it cannot guarantee that the server receives and successfully processes the given request. Though in most cases under normal conditions requests should reach the server and it should successfully process them. Send semantics is similar to UDP messages' semantics.

The server may return arbitrary response on Send() request, but the response is totally ignored.

All the request types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Don't forget starting the client with Client.Start() before calling Client.Send().

func (*Client) Start

func (c *Client) Start()

Start starts rpc client. Establishes connection to the server on Client.Addr.

All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

func (*Client) Stop

func (c *Client) Stop()

Stop stops rpc client. Stopped client can be started again.

type ClientError

type ClientError struct {
	// Set if the error is timeout-related.
	Timeout bool

	// Set if the error is connection-related.
	Connection bool

	// Set if the error is server-related.
	Server bool

	// Set if the error is related to internal resources' overflow.
	// Increase PendingRequests if you see a lot of such errors.
	Overflow bool

	// May be set if AsyncResult.Cancel is called.
	Canceled bool
	// contains filtered or unexported fields
}

ClientError is an error Client methods can return.

func (*ClientError) Error

func (e *ClientError) Error() string

type ConnStats

type ConnStats struct {
	// The number of rpc calls performed.
	RPCCalls uint64

	// The total aggregate time for all rpc calls in milliseconds.
	//
	// This time can be used for calculating the average response time
	// per RPC:
	//     avgRPCTtime = RPCTime / RPCCalls
	RPCTime uint64

	// The number of bytes written to the underlying connections.
	BytesWritten uint64

	// The number of bytes read from the underlying connections.
	BytesRead uint64

	// The number of Read() calls.
	ReadCalls uint64

	// The number of Read() errors.
	ReadErrors uint64

	// The number of Write() calls.
	WriteCalls uint64

	// The number of Write() errors.
	WriteErrors uint64

	// The number of Dial() calls.
	DialCalls uint64

	// The number of Dial() errors.
	DialErrors uint64

	// The number of Accept() calls.
	AcceptCalls uint64

	// The number of Accept() errors.
	AcceptErrors uint64
	// contains filtered or unexported fields
}

ConnStats provides connection statistics. Applied to both gorpc.Client and gorpc.Server.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) AvgRPCBytes

func (cs *ConnStats) AvgRPCBytes() (send float64, recv float64)

AvgRPCBytes returns the average bytes sent / received per RPC.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) AvgRPCCalls

func (cs *ConnStats) AvgRPCCalls() (write float64, read float64)

AvgRPCCalls returns the average number of write() / read() syscalls per PRC.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) AvgRPCTime

func (cs *ConnStats) AvgRPCTime() time.Duration

AvgRPCTime returns the average RPC execution time.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) Reset

func (cs *ConnStats) Reset()

Reset resets all the stats counters.

func (*ConnStats) Snapshot

func (cs *ConnStats) Snapshot() *ConnStats

Snapshot returns connection statistics' snapshot.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

type DialFunc

type DialFunc func(addr string) (conn io.ReadWriteCloser, err error)

DialFunc is a function intended for setting to Client.Dial.

It is expected that the returned conn immediately sends all the data passed via Write() to the server. Otherwise gorpc may hang. The conn implementation must call Flush() on underlying buffered streams before returning from Write().

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher helps constructing HandlerFunc for dispatching across multiple functions and/or services.

Dispatcher automatically registers all the request and response types for all functions and/or methods registered via AddFunc() and AddService(), so there is no need in calling RegisterType() for these types on server side. Client-side code must call RegisterType() for non-internal request and response types before issuing RPCs via DispatcherClient.

See examples for details.

Example (FuncCalls)
d := NewDispatcher()

// Function without args and return values
incCalls := 0
d.AddFunc("Inc", func() { incCalls++ })

// Function without args
d.AddFunc("Func42", func() int { return 42 })

// Echo function for string
d.AddFunc("Echo", func(s string) string { return s })

// Function with struct arg and return value
type ExampleRequestStruct struct {
	Foo int
	Bar string
}
type ExampleResponseStruct struct {
	Baz    string
	BarLen int
}
d.AddFunc("Struct", func(s *ExampleRequestStruct) *ExampleResponseStruct {
	return &ExampleResponseStruct{
		Baz:    fmt.Sprintf("foo=%d, bar=%s", s.Foo, s.Bar),
		BarLen: len(s.Bar),
	}
})

// Echo function for map
d.AddFunc("Map", func(m map[string]int) map[string]int { return m })

// Echo function for slice
d.AddFunc("Slice", func(s []string) []string { return s })

// Function returning errors
d.AddFunc("Error", func() error { return errors.New("error") })

// Echo function, which may return error if arg is 42
d.AddFunc("Error42", func(x int) (int, error) {
	if x == 42 {
		return 0, errors.New("error42")
	}
	return x, nil
})

// Echo function with client address' validation
d.AddFunc("ClientAddr", func(clientAddr string, x int) (int, error) {
	clientHost := strings.SplitN(clientAddr, ":", 2)[0]
	if clientHost != "allowed.client.host" {
		return 0, fmt.Errorf("invalid rpc client host: [%s]", clientHost)
	}
	return x, nil
})

// Start the server serving all the registered functions above
s := NewTCPServer("127.0.0.1:12345", d.NewHandlerFunc())
if err := s.Start(); err != nil {
	log.Fatalf("Cannot start rpc server: [%s]", err)
}
defer s.Stop()

// Start the client and connect it to the server
c := NewTCPClient("127.0.0.1:12345")
c.Start()
defer c.Stop()

// Create a client wrapper for calling server functions.
dc := d.NewFuncClient(c)

// Call functions defined above
res, err := dc.Call("Inc", nil)
fmt.Printf("Inc=%+v, %+v, %d\n", res, err, incCalls)

res, err = dc.Call("Func42", nil)
fmt.Printf("Func42=%+v, %+v\n", res, err)

res, err = dc.Call("Echo", "foobar")
fmt.Printf("Echo=%+v, %+v\n", res, err)

reqst := &ExampleRequestStruct{
	Foo: 42,
	Bar: "bar",
}
res, err = dc.Call("Struct", reqst)
fmt.Printf("Struct=%+v, %+v\n", res, err)

res, err = dc.Call("Map", map[string]int{"foo": 1, "bar": 2})
resm := res.(map[string]int)
fmt.Printf("Map=foo:%d, bar:%d, %+v\n", resm["foo"], resm["bar"], err)

res, err = dc.Call("Slice", []string{"foo", "bar"})
fmt.Printf("Slice=%+v, %+v\n", res, err)

res, err = dc.Call("Error", nil)
fmt.Printf("Error=%+v, %+v\n", res, err)

res, err = dc.Call("Error42", 123)
fmt.Printf("Error42(123)=%+v, %+v\n", res, err)

res, err = dc.Call("Error42", 42)
fmt.Printf("Error42(42)=%+v, %+v\n", res, err)
Output:

Inc=<nil>, <nil>, 1
Func42=42, <nil>
Echo=foobar, <nil>
Struct=&{Baz:foo=42, bar=bar BarLen:3}, <nil>
Map=foo:1, bar:2, <nil>
Slice=[foo bar], <nil>
Error=<nil>, error
Error42(123)=123, <nil>
Error42(42)=<nil>, error42
Example (ServiceCalls)
package main

import (
	"errors"
	"fmt"
	"log"
)

type ExampleDispatcherService struct {
	state int
}

func (s *ExampleDispatcherService) Get() int { return s.state }

func (s *ExampleDispatcherService) Set(x int) { s.state = x }

func (s *ExampleDispatcherService) GetError42() (int, error) {
	if s.state == 42 {
		return 0, errors.New("error42")
	}
	return s.state, nil
}

func (s *ExampleDispatcherService) privateFunc(string) { s.state = 0 }

func main() {
	d := NewDispatcher()

	service := &ExampleDispatcherService{
		state: 123,
	}

	// Register exported service functions
	d.AddService("MyService", service)

	// Start rpc server serving registered service.
	addr := "127.0.0.1:7892"
	s := NewTCPServer(addr, d.NewHandlerFunc())
	if err := s.Start(); err != nil {
		log.Fatalf("Cannot start rpc server: [%s]", err)
	}
	defer s.Stop()

	// Start rpc client connected to the server.
	c := NewTCPClient(addr)
	c.Start()
	defer c.Stop()

	// Create client wrapper for calling service functions.
	dc := d.NewServiceClient("MyService", c)

	res, err := dc.Call("Get", nil)
	fmt.Printf("Get=%+v, %+v\n", res, err)

	service.state = 456
	res, err = dc.Call("Get", nil)
	fmt.Printf("Get=%+v, %+v\n", res, err)

	res, err = dc.Call("Set", 78)
	fmt.Printf("Set=%+v, %+v, %+v\n", res, err, service.state)

	res, err = dc.Call("GetError42", nil)
	fmt.Printf("GetError42=%+v, %+v\n", res, err)

	service.state = 42
	res, err = dc.Call("GetError42", nil)
	fmt.Printf("GetError42=%+v, %+v\n", res, err)

}
Output:

Get=123, <nil>
Get=456, <nil>
Set=<nil>, <nil>, 78
GetError42=78, <nil>
GetError42=<nil>, error42

func NewDispatcher

func NewDispatcher() *Dispatcher

NewDispatcher returns new dispatcher.

func (*Dispatcher) AddFunc

func (d *Dispatcher) AddFunc(funcName string, f interface{})

AddFunc registers the given function f under the name funcName.

The function must accept zero, one or two input arguments. If the function has two arguments, then the first argument must have string type - the server will pass client address in this parameter.

The function must return zero, one or two values.

  • If the function has two return values, then the second value must have error type - the server will propagate this error to the client.

  • If the function returns only error value, then the server treats it as error, not return value, when sending to the client.

Arbitrary number of functions can be registered in the dispatcher.

See examples for details.

Example
d := NewDispatcher()

// Function without arguments and return values
d.AddFunc("NoArgsNoRets", func() {})

// Function with one argument and no return values
d.AddFunc("OneArgNoRets", func(request string) {})

// Function without arguments and one return value
d.AddFunc("NoArgsOneRet", func() int { return 42 })

// Function with two arguments and no return values.
// The first argument must have string type - the server passes
// client address in it.
d.AddFunc("TwoArgsNoRets", func(clientAddr string, requests []byte) {})

// Function with one argument and two return values.
// The second return value must have error type.
d.AddFunc("OneArgTwoRets", func(request []string) ([]string, error) {
	if len(request) == 42 {
		return nil, errors.New("need 42 strings")
	}
	return request, nil
})
Output:

func (*Dispatcher) AddService

func (d *Dispatcher) AddService(serviceName string, service interface{})

AddService registers public methods of the given service under the given name serviceName.

Since only public methods are registered, the service must have at least one public method.

All public methods must conform requirements described in AddFunc().

func (*Dispatcher) NewFuncClient

func (d *Dispatcher) NewFuncClient(c *Client) *DispatcherClient

NewFuncClient returns a client suitable for calling functions registered via AddFunc().

func (*Dispatcher) NewHandlerFunc

func (d *Dispatcher) NewHandlerFunc() HandlerFunc

NewHandlerFunc returns HandlerFunc serving all the functions and/or services registered via AddFunc() and AddService().

The returned HandlerFunc must be assigned to Server.Handler or passed to New*Server().

func (*Dispatcher) NewServiceClient

func (d *Dispatcher) NewServiceClient(serviceName string, c *Client) *DispatcherClient

NewServiceClient returns a client suitable for calling methods of the service with name serviceName registered via AddService().

It is safe creating multiple service clients over a single underlying client.

type DispatcherBatch

type DispatcherBatch struct {
	// contains filtered or unexported fields
}

DispatcherBatch allows grouping and executing multiple RPCs in a single batch.

DispatcherBatch may be created via DispatcherClient.NewBatch().

func (*DispatcherBatch) Add

func (b *DispatcherBatch) Add(funcName string, request interface{}) *BatchResult

Add ads new request to the RPC batch.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when DispatcherBatch.Call*() is called.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*DispatcherBatch) AddSkipResponse

func (b *DispatcherBatch) AddSkipResponse(funcName string, request interface{})

AddSkipResponse adds new request to the RPC batch and doesn't care about the response.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when DispatcherBatch.Call*() is called.

All the non-internal request types must be registered via RegisterType() before the first call to this function.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*DispatcherBatch) Call

func (b *DispatcherBatch) Call() error

Call calls all the RPCs added via DispatcherBatch.Add().

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from DispatcherBatch.Add() after the Call returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the Call returns.

func (*DispatcherBatch) CallTimeout

func (b *DispatcherBatch) CallTimeout(timeout time.Duration) error

CallTimeout calls all the RPCs added via DispatcherBatch.Add() and waits for all the RPC responses during the given timeout.

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from DispatcherBatch.Add() after the CallTimeout returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the CallTimeout returns.

type DispatcherClient

type DispatcherClient struct {
	// contains filtered or unexported fields
}

DispatcherClient is a Client wrapper suitable for calling registered functions and/or for calling methods of the registered services.

func (*DispatcherClient) Call

func (dc *DispatcherClient) Call(funcName string, request interface{}) (response interface{}, err error)

Call calls the given function with the given request.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

func (*DispatcherClient) CallAsync

func (dc *DispatcherClient) CallAsync(funcName string, request interface{}) (*AsyncResult, error)

CallAsync calls the given function asynchronously.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

func (*DispatcherClient) CallTimeout

func (dc *DispatcherClient) CallTimeout(funcName string, request interface{}, timeout time.Duration) (response interface{}, err error)

CallTimeout calls the given function and waits for response during the given timeout.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

func (*DispatcherClient) NewBatch

func (dc *DispatcherClient) NewBatch() *DispatcherBatch

NewBatch creates new RPC batch for the given DispatcherClient.

It is safe creating multiple concurrent batches from a single client.

Example
// Create new dispatcher.
d := NewDispatcher()

// Register echo function in the dispatcher.
d.AddFunc("Echo", func(x int) int { return x })

// Start the server serving all the registered functions above
s := NewTCPServer("127.0.0.1:12445", d.NewHandlerFunc())
if err := s.Start(); err != nil {
	log.Fatalf("Cannot start rpc server: [%s]", err)
}
defer s.Stop()

// Start the client and connect it to the server
c := NewTCPClient("127.0.0.1:12445")
c.Start()
defer c.Stop()

// Create a client wrapper for calling server functions.
dc := d.NewFuncClient(c)

// Create new batch for calling server functions.
b := dc.NewBatch()
result := make([]*BatchResult, 3)

// Add RPC messages to the batch.
for i := 0; i < 3; i++ {
	result[i] = b.Add("Echo", i)
}

// Invoke all the RPCs added to the batch.
if err := b.Call(); err != nil {
	log.Fatalf("error when calling RPC batch: [%s]", err)
}

for i := 0; i < 3; i++ {
	r := result[i]
	fmt.Printf("response[%d]=%+v, %+v\n", i, r.Response, r.Error)
}
Output:

response[0]=0, <nil>
response[1]=1, <nil>
response[2]=2, <nil>

func (*DispatcherClient) Send

func (dc *DispatcherClient) Send(funcName string, request interface{}) error

Send sends the given request to the given function and doesn't wait for response.

All the non-internal request types must be registered via RegisterType() before the first call to this function.

type HandlerFunc

type HandlerFunc func(clientAddr string, request interface{}) (response interface{})

HandlerFunc is a server handler function.

clientAddr contains client address returned by Listener.Accept(). Request and response types may be arbitrary. All the request and response types the HandlerFunc may use must be registered with RegisterType() before starting the server. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Hint: use Dispatcher for HandlerFunc construction.

type Listener

type Listener interface {
	// Init is called on server start.
	//
	// addr contains the address set at Server.Addr.
	Init(addr string) error

	// Accept must return incoming connections from clients.
	// clientAddr must contain client's address in user-readable view.
	//
	// It is expected that the returned conn immediately
	// sends all the data passed via Write() to the client.
	// Otherwise gorpc may hang.
	// The conn implementation must call Flush() on underlying buffered
	// streams before returning from Write().
	Accept() (conn io.ReadWriteCloser, clientAddr string, err error)

	// Close closes the listener.
	// All pending calls to Accept() must immediately return errors after
	// Close is called.
	// All subsequent calls to Accept() must immediately return error.
	Close() error

	// Addr returns the listener's network address.
	ListenAddr() net.Addr
}

Listener is an interface for custom listeners intended for the Server.

type LoggerFunc

type LoggerFunc func(format string, args ...interface{})

LoggerFunc is an error logging function to pass to gorpc.SetErrorLogger().

type OnConnectFunc

type OnConnectFunc func(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error)

OnConnectFunc is a callback, which may be called by both Client and Server on every connection creation if assigned to Client.OnConnect / Server.OnConnect.

remoteAddr is the address of the remote end for the established connection rwc.

The callback must return either rwc itself or a rwc wrapper. The returned connection wrapper MUST send all the data to the underlying rwc on every Write() call, otherwise the connection will hang forever.

The callback may be used for authentication/authorization and/or custom transport wrapping.

type Server

type Server struct {
	// Address to listen to for incoming connections.
	//
	// The address format depends on the underlying transport provided
	// by Server.Listener. The following transports are provided
	// out of the box:
	//   * TCP - see NewTCPServer() and NewTCPClient().
	//   * TLS (aka SSL) - see NewTLSServer() and NewTLSClient().
	//   * Unix sockets - see NewUnixServer() and NewUnixClient().
	//
	// By default TCP transport is used.
	Addr string

	// Handler function for incoming requests.
	//
	// Server calls this function for each incoming request.
	// The function must process the request and return the corresponding response.
	//
	// Hint: use Dispatcher for HandlerFunc construction.
	Handler HandlerFunc

	// The maximum number of concurrent rpc calls the server may perform.
	// Default is DefaultConcurrency.
	Concurrency int

	// The maximum delay between response flushes to clients.
	//
	// Negative values lead to immediate requests' sending to the client
	// without their buffering. This minimizes rpc latency at the cost
	// of higher CPU and network usage.
	//
	// Default is DefaultFlushDelay.
	FlushDelay time.Duration

	// The maximum number of pending responses in the queue.
	// Default is DefaultPendingMessages.
	PendingResponses int

	// Size of send buffer per each underlying connection in bytes.
	// Default is DefaultBufferSize.
	SendBufferSize int

	// Size of recv buffer per each underlying connection in bytes.
	// Default is DefaultBufferSize.
	RecvBufferSize int

	// OnConnect is called whenever connection from client is accepted.
	// The callback can be used for authentication/authorization/encryption
	// and/or for custom transport wrapping.
	//
	// See also Listener, which can be used for sophisticated transport
	// implementation.
	OnConnect OnConnectFunc

	// The server obtains new client connections via Listener.Accept().
	//
	// Override the listener if you want custom underlying transport
	// and/or client authentication/authorization.
	// Don't forget overriding Client.Dial() callback accordingly.
	//
	// See also OnConnect for authentication/authorization purposes.
	//
	// * NewTLSClient() and NewTLSServer() can be used for encrypted rpc.
	// * NewUnixClient() and NewUnixServer() can be used for fast local
	//   inter-process rpc.
	//
	// By default it returns TCP connections accepted from Server.Addr.
	Listener Listener

	// LogError is used for error logging.
	//
	// By default the function set via SetErrorLogger() is used.
	LogError LoggerFunc

	// Connection statistics.
	//
	// The stats doesn't reset automatically. Feel free resetting it
	// any time you wish.
	Stats ConnStats
	// contains filtered or unexported fields
}

Server implements RPC server.

Default server settings are optimized for high load, so don't override them without valid reason.

Example
// Register the given struct for passing as rpc request and/or response.
// All structs intended for passing between client and server
// must be registered via RegisterType().
//
// The struct may contain arbitrary fields, but only public (exported)
// fields are passed between client and server.
type ExampleStruct struct {
	Foo int

	// This feild won't be passed over the wire,
	// since it is private (unexported)
	bar string

	Baz string
}
RegisterType(&ExampleStruct{})

// Start echo server
handlerFunc := func(clientAddr string, request interface{}) interface{} {
	return request
}
s := NewTCPServer("127.0.0.1:43216", handlerFunc)
if err := s.Start(); err != nil {
	log.Fatalf("Cannot start server: [%s]", err)
}
defer s.Stop()

// Connect client to the echo server
c := NewTCPClient("127.0.0.1:43216")
c.Start()
defer c.Stop()

// Echo string
res, err := c.Call("foobar")
fmt.Printf("%+v, %+v\n", res, err)

// Echo int
res, err = c.Call(1234)
fmt.Printf("%+v, %+v\n", res, err)

// Echo string slice
res, err = c.Call([]string{"foo", "bar"})
fmt.Printf("%+v, %+v\n", res, err)

// Echo struct
res, err = c.Call(&ExampleStruct{
	Foo: 123,
	bar: "324",
	Baz: "mmm",
})
fmt.Printf("%+v, %+v\n", res, err)
Output:

foobar, <nil>
1234, <nil>
[foo bar], <nil>
&{Foo:123 bar: Baz:mmm}, <nil>

func NewTCPServer

func NewTCPServer(addr string, handler HandlerFunc) *Server

NewTCPServer creates a server listening for TCP connections on the given addr and processing incoming requests with the given HandlerFunc.

The returned server must be started after optional settings' adjustment.

The corresponding client must be created with NewTCPClient().

func NewTLSServer

func NewTLSServer(addr string, handler HandlerFunc, cfg *tls.Config) *Server

NewTLSServer creates a server listening for TLS (aka SSL) connections on the given addr and processing incoming requests with the given HandlerFunc. cfg must contain TLS settings for the server.

The returned server must be started after optional settings' adjustment.

The corresponding client must be created with NewTLSClient().

func NewUnixServer

func NewUnixServer(addr string, handler HandlerFunc) *Server

NewUnixServer creates a server listening for unix connections on the given addr and processing incoming requests with the given HandlerFunc.

The returned server must be started after optional settings' adjustment.

The corresponding client must be created with NewUnixClient().

func (*Server) Serve

func (s *Server) Serve() error

Serve starts rpc server and blocks until it is stopped.

func (*Server) Start

func (s *Server) Start() error

Start starts rpc server.

All the request and response types the Handler may use must be registered with RegisterType() before starting the server. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

func (*Server) Stop

func (s *Server) Stop()

Stop stops rpc server. Stopped server can be started again.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL