gorpc: github.com/valyala/gorpc Index | Examples | Files

package gorpc

import "github.com/valyala/gorpc"

Package gorpc provides simple RPC API for highload projects.

Gorpc has the following features:

* Easy-to-use API.
* Optimized for high load (>10K qps).
* Uses as low network bandwidth as possible.
* Minimizes the number of TCP connections in TIME_WAIT and WAIT_CLOSE states.
* Minimizes the number of send() and recv() syscalls.
* Provides ability to use arbitrary underlying transport.
  By default TCP is used, but TLS and UNIX sockets are already available.

Index

Examples

Package Files

client.go common.go conn_stats.go conn_stats_generic.go dispatcher.go doc.go encoding.go server.go transport.go

Constants

const (
    // DefaultConcurrency is the default number of concurrent rpc calls
    // the server can process.
    DefaultConcurrency = 8 * 1024

    // DefaultRequestTimeout is the default timeout for client request.
    DefaultRequestTimeout = 20 * time.Second

    // DefaultPendingMessages is the default number of pending messages
    // handled by Client and Server.
    DefaultPendingMessages = 32 * 1024

    // DefaultFlushDelay is the default delay between message flushes
    // on Client and Server.
    DefaultFlushDelay = -1

    // DefaultBufferSize is the default size for Client and Server buffers.
    DefaultBufferSize = 64 * 1024
)

Variables

var ErrCanceled = &ClientError{
    Canceled: true,
    err:      fmt.Errorf("the call has been canceled"),
}

ErrCanceled may be returned from rpc call if AsyncResult.Cancel has been called.

func NilErrorLogger Uses

func NilErrorLogger(format string, args ...interface{})

NilErrorLogger discards all error messages.

Pass NilErrorLogger to SetErrorLogger() in order to suppress error log generated by gorpc.

func RegisterType Uses

func RegisterType(x interface{})

RegisterType registers the given type to send via rpc.

The client must register all the response types the server may send. The server must register all the request types the client may send.

There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

There is no need in registering argument and return value types for functions and methods registered via Dispatcher.

func SetErrorLogger Uses

func SetErrorLogger(f LoggerFunc)

SetErrorLogger sets the given error logger to use in gorpc.

By default log.Printf is used for error logging.

type AsyncResult Uses

type AsyncResult struct {
    // The response can be read only after <-Done unblocks.
    Response interface{}

    // The error can be read only after <-Done unblocks.
    // The error can be casted to ClientError.
    Error error

    // Response and Error become available after <-Done unblocks.
    Done <-chan struct{}
    // contains filtered or unexported fields
}

AsyncResult is a result returned from Client.CallAsync().

func (*AsyncResult) Cancel Uses

func (m *AsyncResult) Cancel()

Cancel cancels async call.

Canceled call isn't sent to the server unless it is already sent there. Canceled call may successfully complete if it has been already sent to the server before Cancel call.

It is safe calling this function multiple times from concurrently running goroutines.

type Batch Uses

type Batch struct {
    // contains filtered or unexported fields
}

Batch allows grouping and executing multiple RPCs in a single batch.

Batch may be created via Client.NewBatch().

func (*Batch) Add Uses

func (b *Batch) Add(request interface{}) *BatchResult

Add ads new request to the RPC batch.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when Batch.Call*() is called.

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*Batch) AddSkipResponse Uses

func (b *Batch) AddSkipResponse(request interface{})

AddSkipResponse adds new request to the RPC batch and doesn't care about the response.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when Batch.Call*() is called.

All the request types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*Batch) Call Uses

func (b *Batch) Call() error

Call calls all the RPCs added via Batch.Add().

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from Batch.Add() after the Call returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the Call returns.

func (*Batch) CallTimeout Uses

func (b *Batch) CallTimeout(timeout time.Duration) error

CallTimeout calls all the RPCs added via Batch.Add() and waits for all the RPC responses during the given timeout.

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from Batch.Add() after the CallTimeout returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the CallTimeout returns.

type BatchResult Uses

type BatchResult struct {
    // The response can be read only after Batch.Call*() returns.
    Response interface{}

    // The error can be read only after Batch.Call*() returns.
    // The error can be casted to ClientError.
    Error error

    // <-Done unblocks after Batch.Call*() returns.
    // Response and Error become available after <-Done unblocks.
    Done <-chan struct{}
    // contains filtered or unexported fields
}

BatchResult is a result returned from Batch.Add*().

type Client Uses

type Client struct {
    // Server address to connect to.
    //
    // The address format depends on the underlying transport provided
    // by Client.Dial. The following transports are provided out of the box:
    //   * TCP - see NewTCPClient() and NewTCPServer().
    //   * TLS - see NewTLSClient() and NewTLSServer().
    //   * Unix sockets - see NewUnixClient() and NewUnixServer().
    //
    // By default TCP transport is used.
    Addr string

    // The number of concurrent connections the client should establish
    // to the sever.
    // By default only one connection is established.
    Conns int

    // The maximum number of pending requests in the queue.
    //
    // The number of pending requsts should exceed the expected number
    // of concurrent goroutines calling client's methods.
    // Otherwise a lot of ClientError.Overflow errors may appear.
    //
    // Default is DefaultPendingMessages.
    PendingRequests int

    // Delay between request flushes.
    //
    // Negative values lead to immediate requests' sending to the server
    // without their buffering. This minimizes rpc latency at the cost
    // of higher CPU and network usage.
    //
    // Default value is DefaultFlushDelay.
    FlushDelay time.Duration

    // Maximum request time.
    // Default value is DefaultRequestTimeout.
    RequestTimeout time.Duration

    // Disable data compression.
    // By default data compression is enabled.
    DisableCompression bool

    // Size of send buffer per each underlying connection in bytes.
    // Default value is DefaultBufferSize.
    SendBufferSize int

    // Size of recv buffer per each underlying connection in bytes.
    // Default value is DefaultBufferSize.
    RecvBufferSize int

    // OnConnect is called whenever connection to server is established.
    // The callback can be used for authentication/authorization/encryption
    // and/or for custom transport wrapping.
    //
    // See also Dial callback, which can be used for sophisticated
    // transport implementation.
    OnConnect OnConnectFunc

    // The client calls this callback when it needs new connection
    // to the server.
    // The client passes Client.Addr into Dial().
    //
    // Override this callback if you want custom underlying transport
    // and/or authentication/authorization.
    // Don't forget overriding Server.Listener accordingly.
    //
    // See also OnConnect for authentication/authorization purposes.
    //
    // * NewTLSClient() and NewTLSServer() can be used for encrypted rpc.
    // * NewUnixClient() and NewUnixServer() can be used for fast local
    //   inter-process rpc.
    //
    // By default it returns TCP connections established to the Client.Addr.
    Dial DialFunc

    // LogError is used for error logging.
    //
    // By default the function set via SetErrorLogger() is used.
    LogError LoggerFunc

    // Connection statistics.
    //
    // The stats doesn't reset automatically. Feel free resetting it
    // any time you wish.
    Stats ConnStats
    // contains filtered or unexported fields
}

Client implements RPC client.

The client must be started with Client.Start() before use.

It is absolutely safe and encouraged using a single client across arbitrary number of concurrently running goroutines.

Default client settings are optimized for high load, so don't override them without valid reason.

func NewTCPClient Uses

func NewTCPClient(addr string) *Client

NewTCPClient creates a client connecting over TCP to the server listening to the given addr.

The returned client must be started after optional settings' adjustment.

The corresponding server must be created with NewTCPServer().

func NewTLSClient Uses

func NewTLSClient(addr string, cfg *tls.Config) *Client

NewTLSClient creates a client connecting over TLS (aka SSL) to the server listening to the given addr using the given TLS config.

The returned client must be started after optional settings' adjustment.

The corresponding server must be created with NewTLSServer().

func NewUnixClient Uses

func NewUnixClient(addr string) *Client

NewUnixClient creates a client connecting over unix socket to the server listening to the given addr.

The returned client must be started after optional settings' adjustment.

The corresponding server must be created with NewUnixServer().

func (*Client) Call Uses

func (c *Client) Call(request interface{}) (response interface{}, err error)

Call sends the given request to the server and obtains response from the server. Returns non-nil error if the response cannot be obtained during Client.RequestTimeout or server connection problems occur. The returned error can be casted to ClientError.

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Hint: use Dispatcher for distinct calls' construction.

Don't forget starting the client with Client.Start() before calling Client.Call().

func (*Client) CallAsync Uses

func (c *Client) CallAsync(request interface{}) (*AsyncResult, error)

CallAsync starts async rpc call.

Rpc call is complete after <-AsyncResult.Done unblocks. If you want canceling the request, just throw away the returned AsyncResult.

CallAsync doesn't respect Client.RequestTimeout - response timeout may be controlled by the caller via something like:

r := c.CallAsync("foobar")
select {
case <-time.After(c.RequestTimeout):
   log.Printf("rpc timeout!")
case <-r.Done:
   processResponse(r.Response, r.Error)
}

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Don't forget starting the client with Client.Start() before calling Client.CallAsync().

func (*Client) CallTimeout Uses

func (c *Client) CallTimeout(request interface{}, timeout time.Duration) (response interface{}, err error)

CallTimeout sends the given request to the server and obtains response from the server. Returns non-nil error if the response cannot be obtained during the given timeout or server connection problems occur. The returned error can be casted to ClientError.

Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Hint: use Dispatcher for distinct calls' construction.

Don't forget starting the client with Client.Start() before calling Client.Call().

func (*Client) NewBatch Uses

func (c *Client) NewBatch() *Batch

NewBatch creates new RPC batch.

It is safe creating multiple concurrent batches from a single client.

Don't forget starting the client with Client.Start() before working with batched RPC.

func (*Client) PendingRequestsCount Uses

func (c *Client) PendingRequestsCount() int

PendingRequestsCount returns the instant number of pending requests.

The main purpose of this function is to use in load-balancing schemes where load should be balanced between multiple rpc clients.

Don't forget starting the client with Client.Start() before calling this function.

func (*Client) Send Uses

func (c *Client) Send(request interface{}) error

Send sends the given request to the server and doesn't wait for response.

Since this is 'fire and forget' function, which never waits for response, it cannot guarantee that the server receives and successfully processes the given request. Though in most cases under normal conditions requests should reach the server and it should successfully process them. Send semantics is similar to UDP messages' semantics.

The server may return arbitrary response on Send() request, but the response is totally ignored.

All the request types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Don't forget starting the client with Client.Start() before calling Client.Send().

func (*Client) Start Uses

func (c *Client) Start()

Start starts rpc client. Establishes connection to the server on Client.Addr.

All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

func (*Client) Stop Uses

func (c *Client) Stop()

Stop stops rpc client. Stopped client can be started again.

type ClientError Uses

type ClientError struct {
    // Set if the error is timeout-related.
    Timeout bool

    // Set if the error is connection-related.
    Connection bool

    // Set if the error is server-related.
    Server bool

    // Set if the error is related to internal resources' overflow.
    // Increase PendingRequests if you see a lot of such errors.
    Overflow bool

    // May be set if AsyncResult.Cancel is called.
    Canceled bool
    // contains filtered or unexported fields
}

ClientError is an error Client methods can return.

func (*ClientError) Error Uses

func (e *ClientError) Error() string

type ConnStats Uses

type ConnStats struct {
    // The number of rpc calls performed.
    RPCCalls uint64

    // The total aggregate time for all rpc calls in milliseconds.
    //
    // This time can be used for calculating the average response time
    // per RPC:
    //     avgRPCTtime = RPCTime / RPCCalls
    RPCTime uint64

    // The number of bytes written to the underlying connections.
    BytesWritten uint64

    // The number of bytes read from the underlying connections.
    BytesRead uint64

    // The number of Read() calls.
    ReadCalls uint64

    // The number of Read() errors.
    ReadErrors uint64

    // The number of Write() calls.
    WriteCalls uint64

    // The number of Write() errors.
    WriteErrors uint64

    // The number of Dial() calls.
    DialCalls uint64

    // The number of Dial() errors.
    DialErrors uint64

    // The number of Accept() calls.
    AcceptCalls uint64

    // The number of Accept() errors.
    AcceptErrors uint64
    // contains filtered or unexported fields
}

ConnStats provides connection statistics. Applied to both gorpc.Client and gorpc.Server.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) AvgRPCBytes Uses

func (cs *ConnStats) AvgRPCBytes() (send float64, recv float64)

AvgRPCBytes returns the average bytes sent / received per RPC.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) AvgRPCCalls Uses

func (cs *ConnStats) AvgRPCCalls() (write float64, read float64)

AvgRPCCalls returns the average number of write() / read() syscalls per PRC.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) AvgRPCTime Uses

func (cs *ConnStats) AvgRPCTime() time.Duration

AvgRPCTime returns the average RPC execution time.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

func (*ConnStats) Reset Uses

func (cs *ConnStats) Reset()

Reset resets all the stats counters.

func (*ConnStats) Snapshot Uses

func (cs *ConnStats) Snapshot() *ConnStats

Snapshot returns connection statistics' snapshot.

Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.

type DialFunc Uses

type DialFunc func(addr string) (conn io.ReadWriteCloser, err error)

DialFunc is a function intended for setting to Client.Dial.

It is expected that the returned conn immediately sends all the data passed via Write() to the server. Otherwise gorpc may hang. The conn implementation must call Flush() on underlying buffered streams before returning from Write().

type Dispatcher Uses

type Dispatcher struct {
    // contains filtered or unexported fields
}

Dispatcher helps constructing HandlerFunc for dispatching across multiple functions and/or services.

Dispatcher automatically registers all the request and response types for all functions and/or methods registered via AddFunc() and AddService(), so there is no need in calling RegisterType() for these types on server side. Client-side code must call RegisterType() for non-internal request and response types before issuing RPCs via DispatcherClient.

See examples for details.

Code:

d := NewDispatcher()

// Function without args and return values
incCalls := 0
d.AddFunc("Inc", func() { incCalls++ })

// Function without args
d.AddFunc("Func42", func() int { return 42 })

// Echo function for string
d.AddFunc("Echo", func(s string) string { return s })

// Function with struct arg and return value
type ExampleRequestStruct struct {
    Foo int
    Bar string
}
type ExampleResponseStruct struct {
    Baz    string
    BarLen int
}
d.AddFunc("Struct", func(s *ExampleRequestStruct) *ExampleResponseStruct {
    return &ExampleResponseStruct{
        Baz:    fmt.Sprintf("foo=%d, bar=%s", s.Foo, s.Bar),
        BarLen: len(s.Bar),
    }
})

// Echo function for map
d.AddFunc("Map", func(m map[string]int) map[string]int { return m })

// Echo function for slice
d.AddFunc("Slice", func(s []string) []string { return s })

// Function returning errors
d.AddFunc("Error", func() error { return errors.New("error") })

// Echo function, which may return error if arg is 42
d.AddFunc("Error42", func(x int) (int, error) {
    if x == 42 {
        return 0, errors.New("error42")
    }
    return x, nil
})

// Echo function with client address' validation
d.AddFunc("ClientAddr", func(clientAddr string, x int) (int, error) {
    clientHost := strings.SplitN(clientAddr, ":", 2)[0]
    if clientHost != "allowed.client.host" {
        return 0, fmt.Errorf("invalid rpc client host: [%s]", clientHost)
    }
    return x, nil
})

// Start the server serving all the registered functions above
s := NewTCPServer("127.0.0.1:12345", d.NewHandlerFunc())
if err := s.Start(); err != nil {
    log.Fatalf("Cannot start rpc server: [%s]", err)
}
defer s.Stop()

// Start the client and connect it to the server
c := NewTCPClient("127.0.0.1:12345")
c.Start()
defer c.Stop()

// Create a client wrapper for calling server functions.
dc := d.NewFuncClient(c)

// Call functions defined above
res, err := dc.Call("Inc", nil)
fmt.Printf("Inc=%+v, %+v, %d\n", res, err, incCalls)

res, err = dc.Call("Func42", nil)
fmt.Printf("Func42=%+v, %+v\n", res, err)

res, err = dc.Call("Echo", "foobar")
fmt.Printf("Echo=%+v, %+v\n", res, err)

reqst := &ExampleRequestStruct{
    Foo: 42,
    Bar: "bar",
}
res, err = dc.Call("Struct", reqst)
fmt.Printf("Struct=%+v, %+v\n", res, err)

res, err = dc.Call("Map", map[string]int{"foo": 1, "bar": 2})
resm := res.(map[string]int)
fmt.Printf("Map=foo:%d, bar:%d, %+v\n", resm["foo"], resm["bar"], err)

res, err = dc.Call("Slice", []string{"foo", "bar"})
fmt.Printf("Slice=%+v, %+v\n", res, err)

res, err = dc.Call("Error", nil)
fmt.Printf("Error=%+v, %+v\n", res, err)

res, err = dc.Call("Error42", 123)
fmt.Printf("Error42(123)=%+v, %+v\n", res, err)

res, err = dc.Call("Error42", 42)
fmt.Printf("Error42(42)=%+v, %+v\n", res, err)

Output:

Inc=<nil>, <nil>, 1
Func42=42, <nil>
Echo=foobar, <nil>
Struct=&{Baz:foo=42, bar=bar BarLen:3}, <nil>
Map=foo:1, bar:2, <nil>
Slice=[foo bar], <nil>
Error=<nil>, error
Error42(123)=123, <nil>
Error42(42)=<nil>, error42

Code:

package main

import (
    "errors"
    "fmt"
    "log"
)

type ExampleDispatcherService struct {
    state int
}

func (s *ExampleDispatcherService) Get() int { return s.state }

func (s *ExampleDispatcherService) Set(x int) { s.state = x }

func (s *ExampleDispatcherService) GetError42() (int, error) {
    if s.state == 42 {
        return 0, errors.New("error42")
    }
    return s.state, nil
}

func (s *ExampleDispatcherService) privateFunc(string) { s.state = 0 }

func main() {
    d := NewDispatcher()

    service := &ExampleDispatcherService{
        state: 123,
    }

    // Register exported service functions
    d.AddService("MyService", service)

    // Start rpc server serving registered service.
    addr := "127.0.0.1:7892"
    s := NewTCPServer(addr, d.NewHandlerFunc())
    if err := s.Start(); err != nil {
        log.Fatalf("Cannot start rpc server: [%s]", err)
    }
    defer s.Stop()

    // Start rpc client connected to the server.
    c := NewTCPClient(addr)
    c.Start()
    defer c.Stop()

    // Create client wrapper for calling service functions.
    dc := d.NewServiceClient("MyService", c)

    res, err := dc.Call("Get", nil)
    fmt.Printf("Get=%+v, %+v\n", res, err)

    service.state = 456
    res, err = dc.Call("Get", nil)
    fmt.Printf("Get=%+v, %+v\n", res, err)

    res, err = dc.Call("Set", 78)
    fmt.Printf("Set=%+v, %+v, %+v\n", res, err, service.state)

    res, err = dc.Call("GetError42", nil)
    fmt.Printf("GetError42=%+v, %+v\n", res, err)

    service.state = 42
    res, err = dc.Call("GetError42", nil)
    fmt.Printf("GetError42=%+v, %+v\n", res, err)

}

func NewDispatcher Uses

func NewDispatcher() *Dispatcher

NewDispatcher returns new dispatcher.

func (*Dispatcher) AddFunc Uses

func (d *Dispatcher) AddFunc(funcName string, f interface{})

AddFunc registers the given function f under the name funcName.

The function must accept zero, one or two input arguments. If the function has two arguments, then the first argument must have string type - the server will pass client address in this parameter.

The function must return zero, one or two values.

* If the function has two return values, then the second value must have
  error type - the server will propagate this error to the client.

* If the function returns only error value, then the server treats it
  as error, not return value, when sending to the client.

Arbitrary number of functions can be registered in the dispatcher.

See examples for details.

Code:

d := NewDispatcher()

// Function without arguments and return values
d.AddFunc("NoArgsNoRets", func() {})

// Function with one argument and no return values
d.AddFunc("OneArgNoRets", func(request string) {})

// Function without arguments and one return value
d.AddFunc("NoArgsOneRet", func() int { return 42 })

// Function with two arguments and no return values.
// The first argument must have string type - the server passes
// client address in it.
d.AddFunc("TwoArgsNoRets", func(clientAddr string, requests []byte) {})

// Function with one argument and two return values.
// The second return value must have error type.
d.AddFunc("OneArgTwoRets", func(request []string) ([]string, error) {
    if len(request) == 42 {
        return nil, errors.New("need 42 strings")
    }
    return request, nil
})

func (*Dispatcher) AddService Uses

func (d *Dispatcher) AddService(serviceName string, service interface{})

AddService registers public methods of the given service under the given name serviceName.

Since only public methods are registered, the service must have at least one public method.

All public methods must conform requirements described in AddFunc().

func (*Dispatcher) NewFuncClient Uses

func (d *Dispatcher) NewFuncClient(c *Client) *DispatcherClient

NewFuncClient returns a client suitable for calling functions registered via AddFunc().

func (*Dispatcher) NewHandlerFunc Uses

func (d *Dispatcher) NewHandlerFunc() HandlerFunc

NewHandlerFunc returns HandlerFunc serving all the functions and/or services registered via AddFunc() and AddService().

The returned HandlerFunc must be assigned to Server.Handler or passed to New*Server().

func (*Dispatcher) NewServiceClient Uses

func (d *Dispatcher) NewServiceClient(serviceName string, c *Client) *DispatcherClient

NewServiceClient returns a client suitable for calling methods of the service with name serviceName registered via AddService().

It is safe creating multiple service clients over a single underlying client.

type DispatcherBatch Uses

type DispatcherBatch struct {
    // contains filtered or unexported fields
}

DispatcherBatch allows grouping and executing multiple RPCs in a single batch.

DispatcherBatch may be created via DispatcherClient.NewBatch().

func (*DispatcherBatch) Add Uses

func (b *DispatcherBatch) Add(funcName string, request interface{}) *BatchResult

Add ads new request to the RPC batch.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when DispatcherBatch.Call*() is called.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*DispatcherBatch) AddSkipResponse Uses

func (b *DispatcherBatch) AddSkipResponse(funcName string, request interface{})

AddSkipResponse adds new request to the RPC batch and doesn't care about the response.

The order of batched RPCs execution on the server is unspecified.

All the requests added to the batch are sent to the server at once when DispatcherBatch.Call*() is called.

All the non-internal request types must be registered via RegisterType() before the first call to this function.

It is safe adding multiple requests to the same batch from concurrently running goroutines.

func (*DispatcherBatch) Call Uses

func (b *DispatcherBatch) Call() error

Call calls all the RPCs added via DispatcherBatch.Add().

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from DispatcherBatch.Add() after the Call returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the Call returns.

func (*DispatcherBatch) CallTimeout Uses

func (b *DispatcherBatch) CallTimeout(timeout time.Duration) error

CallTimeout calls all the RPCs added via DispatcherBatch.Add() and waits for all the RPC responses during the given timeout.

The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.

The caller may read all BatchResult contents returned from DispatcherBatch.Add() after the CallTimeout returns.

It is guaranteed that all <-BatchResult.Done channels are unblocked after the CallTimeout returns.

type DispatcherClient Uses

type DispatcherClient struct {
    // contains filtered or unexported fields
}

DispatcherClient is a Client wrapper suitable for calling registered functions and/or for calling methods of the registered services.

func (*DispatcherClient) Call Uses

func (dc *DispatcherClient) Call(funcName string, request interface{}) (response interface{}, err error)

Call calls the given function with the given request.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

func (*DispatcherClient) CallAsync Uses

func (dc *DispatcherClient) CallAsync(funcName string, request interface{}) (*AsyncResult, error)

CallAsync calls the given function asynchronously.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

func (*DispatcherClient) CallTimeout Uses

func (dc *DispatcherClient) CallTimeout(funcName string, request interface{}, timeout time.Duration) (response interface{}, err error)

CallTimeout calls the given function and waits for response during the given timeout.

All the non-internal request and response types must be registered via RegisterType() before the first call to this function.

func (*DispatcherClient) NewBatch Uses

func (dc *DispatcherClient) NewBatch() *DispatcherBatch

NewBatch creates new RPC batch for the given DispatcherClient.

It is safe creating multiple concurrent batches from a single client.

Code:

// Create new dispatcher.
d := NewDispatcher()

// Register echo function in the dispatcher.
d.AddFunc("Echo", func(x int) int { return x })

// Start the server serving all the registered functions above
s := NewTCPServer("127.0.0.1:12445", d.NewHandlerFunc())
if err := s.Start(); err != nil {
    log.Fatalf("Cannot start rpc server: [%s]", err)
}
defer s.Stop()

// Start the client and connect it to the server
c := NewTCPClient("127.0.0.1:12445")
c.Start()
defer c.Stop()

// Create a client wrapper for calling server functions.
dc := d.NewFuncClient(c)

// Create new batch for calling server functions.
b := dc.NewBatch()
result := make([]*BatchResult, 3)

// Add RPC messages to the batch.
for i := 0; i < 3; i++ {
    result[i] = b.Add("Echo", i)
}

// Invoke all the RPCs added to the batch.
if err := b.Call(); err != nil {
    log.Fatalf("error when calling RPC batch: [%s]", err)
}

for i := 0; i < 3; i++ {
    r := result[i]
    fmt.Printf("response[%d]=%+v, %+v\n", i, r.Response, r.Error)
}

Output:

response[0]=0, <nil>
response[1]=1, <nil>
response[2]=2, <nil>

func (*DispatcherClient) Send Uses

func (dc *DispatcherClient) Send(funcName string, request interface{}) error

Send sends the given request to the given function and doesn't wait for response.

All the non-internal request types must be registered via RegisterType() before the first call to this function.

type HandlerFunc Uses

type HandlerFunc func(clientAddr string, request interface{}) (response interface{})

HandlerFunc is a server handler function.

clientAddr contains client address returned by Listener.Accept(). Request and response types may be arbitrary. All the request and response types the HandlerFunc may use must be registered with RegisterType() before starting the server. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

Hint: use Dispatcher for HandlerFunc construction.

type Listener Uses

type Listener interface {
    // Init is called on server start.
    //
    // addr contains the address set at Server.Addr.
    Init(addr string) error

    // Accept must return incoming connections from clients.
    // clientAddr must contain client's address in user-readable view.
    //
    // It is expected that the returned conn immediately
    // sends all the data passed via Write() to the client.
    // Otherwise gorpc may hang.
    // The conn implementation must call Flush() on underlying buffered
    // streams before returning from Write().
    Accept() (conn io.ReadWriteCloser, clientAddr string, err error)

    // Close closes the listener.
    // All pending calls to Accept() must immediately return errors after
    // Close is called.
    // All subsequent calls to Accept() must immediately return error.
    Close() error

    // Addr returns the listener's network address.
    ListenAddr() net.Addr
}

Listener is an interface for custom listeners intended for the Server.

type LoggerFunc Uses

type LoggerFunc func(format string, args ...interface{})

LoggerFunc is an error logging function to pass to gorpc.SetErrorLogger().

type OnConnectFunc Uses

type OnConnectFunc func(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error)

OnConnectFunc is a callback, which may be called by both Client and Server on every connection creation if assigned to Client.OnConnect / Server.OnConnect.

remoteAddr is the address of the remote end for the established connection rwc.

The callback must return either rwc itself or a rwc wrapper. The returned connection wrapper MUST send all the data to the underlying rwc on every Write() call, otherwise the connection will hang forever.

The callback may be used for authentication/authorization and/or custom transport wrapping.

type Server Uses

type Server struct {
    // Address to listen to for incoming connections.
    //
    // The address format depends on the underlying transport provided
    // by Server.Listener. The following transports are provided
    // out of the box:
    //   * TCP - see NewTCPServer() and NewTCPClient().
    //   * TLS (aka SSL) - see NewTLSServer() and NewTLSClient().
    //   * Unix sockets - see NewUnixServer() and NewUnixClient().
    //
    // By default TCP transport is used.
    Addr string

    // Handler function for incoming requests.
    //
    // Server calls this function for each incoming request.
    // The function must process the request and return the corresponding response.
    //
    // Hint: use Dispatcher for HandlerFunc construction.
    Handler HandlerFunc

    // The maximum number of concurrent rpc calls the server may perform.
    // Default is DefaultConcurrency.
    Concurrency int

    // The maximum delay between response flushes to clients.
    //
    // Negative values lead to immediate requests' sending to the client
    // without their buffering. This minimizes rpc latency at the cost
    // of higher CPU and network usage.
    //
    // Default is DefaultFlushDelay.
    FlushDelay time.Duration

    // The maximum number of pending responses in the queue.
    // Default is DefaultPendingMessages.
    PendingResponses int

    // Size of send buffer per each underlying connection in bytes.
    // Default is DefaultBufferSize.
    SendBufferSize int

    // Size of recv buffer per each underlying connection in bytes.
    // Default is DefaultBufferSize.
    RecvBufferSize int

    // OnConnect is called whenever connection from client is accepted.
    // The callback can be used for authentication/authorization/encryption
    // and/or for custom transport wrapping.
    //
    // See also Listener, which can be used for sophisticated transport
    // implementation.
    OnConnect OnConnectFunc

    // The server obtains new client connections via Listener.Accept().
    //
    // Override the listener if you want custom underlying transport
    // and/or client authentication/authorization.
    // Don't forget overriding Client.Dial() callback accordingly.
    //
    // See also OnConnect for authentication/authorization purposes.
    //
    // * NewTLSClient() and NewTLSServer() can be used for encrypted rpc.
    // * NewUnixClient() and NewUnixServer() can be used for fast local
    //   inter-process rpc.
    //
    // By default it returns TCP connections accepted from Server.Addr.
    Listener Listener

    // LogError is used for error logging.
    //
    // By default the function set via SetErrorLogger() is used.
    LogError LoggerFunc

    // Connection statistics.
    //
    // The stats doesn't reset automatically. Feel free resetting it
    // any time you wish.
    Stats ConnStats
    // contains filtered or unexported fields
}

Server implements RPC server.

Default server settings are optimized for high load, so don't override them without valid reason.

Code:

// Register the given struct for passing as rpc request and/or response.
// All structs intended for passing between client and server
// must be registered via RegisterType().
//
// The struct may contain arbitrary fields, but only public (exported)
// fields are passed between client and server.
type ExampleStruct struct {
    Foo int

    // This feild won't be passed over the wire,
    // since it is private (unexported)
    bar string

    Baz string
}
RegisterType(&ExampleStruct{})

// Start echo server
handlerFunc := func(clientAddr string, request interface{}) interface{} {
    return request
}
s := NewTCPServer("127.0.0.1:43216", handlerFunc)
if err := s.Start(); err != nil {
    log.Fatalf("Cannot start server: [%s]", err)
}
defer s.Stop()

// Connect client to the echo server
c := NewTCPClient("127.0.0.1:43216")
c.Start()
defer c.Stop()

// Echo string
res, err := c.Call("foobar")
fmt.Printf("%+v, %+v\n", res, err)

// Echo int
res, err = c.Call(1234)
fmt.Printf("%+v, %+v\n", res, err)

// Echo string slice
res, err = c.Call([]string{"foo", "bar"})
fmt.Printf("%+v, %+v\n", res, err)

// Echo struct
res, err = c.Call(&ExampleStruct{
    Foo: 123,
    bar: "324",
    Baz: "mmm",
})
fmt.Printf("%+v, %+v\n", res, err)

Output:

foobar, <nil>
1234, <nil>
[foo bar], <nil>
&{Foo:123 bar: Baz:mmm}, <nil>

func NewTCPServer Uses

func NewTCPServer(addr string, handler HandlerFunc) *Server

NewTCPServer creates a server listening for TCP connections on the given addr and processing incoming requests with the given HandlerFunc.

The returned server must be started after optional settings' adjustment.

The corresponding client must be created with NewTCPClient().

func NewTLSServer Uses

func NewTLSServer(addr string, handler HandlerFunc, cfg *tls.Config) *Server

NewTLSServer creates a server listening for TLS (aka SSL) connections on the given addr and processing incoming requests with the given HandlerFunc. cfg must contain TLS settings for the server.

The returned server must be started after optional settings' adjustment.

The corresponding client must be created with NewTLSClient().

func NewUnixServer Uses

func NewUnixServer(addr string, handler HandlerFunc) *Server

NewUnixServer creates a server listening for unix connections on the given addr and processing incoming requests with the given HandlerFunc.

The returned server must be started after optional settings' adjustment.

The corresponding client must be created with NewUnixClient().

func (*Server) Serve Uses

func (s *Server) Serve() error

Serve starts rpc server and blocks until it is stopped.

func (*Server) Start Uses

func (s *Server) Start() error

Start starts rpc server.

All the request and response types the Handler may use must be registered with RegisterType() before starting the server. There is no need in registering base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.

func (*Server) Stop Uses

func (s *Server) Stop()

Stop stops rpc server. Stopped server can be started again.

Package gorpc imports 16 packages (graph) and is imported by 23 packages. Updated 2019-06-11. Refresh now. Tools for package owners.