server: Index | Files | Directories

package rpc

import ""

Package rpc provides goma specific rpc features on gRPC.

Load balancing RPC client

gRPC doesn't provide good loadbalancing client (for kubernetes, yet). Typical gRPC client could be created as follows:

conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
	log.Fatalf("did not connect: %v", err)
defer conn.Close()
c := pb.NewGreeterClient(conn)

This package provides load balancing client if address has multiple IP addresses:

type GreeterClient struct {
	c *rpc.Client

func NewGreeterClient(address string, opts ...grpc.DialOption) GreeterClient {
	return GreeterClient{
		c: rpc.NewClient(address,
			func(cc *grpc.ClientConn) interface{} {
				return pb.NewGreeterClient(cc)
			}, opts...),

func (c GreeterClient) SayHello(ctx context.Context, in *pb.Req, opts...grpc.CallOption) (*pb.Resp, error) {
	var resp *pb.Resp
	var err error
	err = c.Call(ctx, c.client.Pick, "",
		func(client interface{}) error {
			resp, err = client.(GreeterClient).SayHello(ctx, in, opts...)
			return err
	return resp, err

c := NewGreeterClient(address, grpc.WithInsecure())

rpc call would return codes.Unavailable if no backends available for address.

TODO: use grpc's Balancer? TODO: use statefulset for sharding?


Package Files

client.go debug.go doc.go id.go retry.go

func Render Uses

func Render(w http.ResponseWriter, req *http.Request)

Render renders backend information. It is handled on /debug/backends on default http mux.

func RequestID Uses

func RequestID(r *gomapb.RequesterInfo) string

RequestID returns string identifier of this request.

func TagID Uses

func TagID(ctx context.Context, r *gomapb.RequesterInfo) (context.Context, string)

TagID tags request id in context.

type Client Uses

type Client struct {
    // contains filtered or unexported fields

Client represents load-balancing client.

func NewClient Uses

func NewClient(ctx context.Context, target string, newc func(cc *grpc.ClientConn) interface{}, opts ...Option) *Client

NewClient creates new load-balancing client. newc should return grpc client interface for given *grpc.ClientConn. target is <hostname>:<port>. TODO: support grpc new naming?

func (*Client) Call Uses

func (c *Client) Call(ctx context.Context, picker func(context.Context, interface{}) (*backend, error), key interface{}, f func(interface{}) error) error

Call calls new rpc call. picker and key will be used to pick backend. picker will be Client's Pick, or Shard. Pick will use key for backend addr, or empty for least loaded. Shard will use key for sharding. Rand will use key for *RandomState. f is called with grpc client inferface for selected backend.

func (*Client) Close Uses

func (c *Client) Close() error

func (*Client) Shard Uses

func (c *Client) Shard(ctx context.Context, key interface{}) (*backend, error)

Shard picks one backend from client's target to request the key, and returns selected backend.

type Option Uses

type Option func(*Client)

Option configures Client.

func DialOption Uses

func DialOption(opt grpc.DialOption) Option

DialOption returns an Option to configure dial option.

func DialOptions Uses

func DialOptions(opts ...grpc.DialOption) []Option

DialOptions converts grpc.DialOptions to Options.

type RetriableError Uses

type RetriableError struct {
    Err    error
    Max    int
    Delay  time.Duration
    Factor float64

RetriableError represents retriable error in Retry.Do.

func (RetriableError) Error Uses

func (e RetriableError) Error() string

type Retry Uses

type Retry struct {
    // MaxRetry represents how many times to retry.
    // If it is not positive, it retries while error is
    // codes.Unavailable or codes.ResourceExhausted.
    MaxRetry  int
    BaseDelay time.Duration
    MaxDelay  time.Duration

    // backoff factor. default is 1.6
    Factor float64

Retry handles rpc retry.

func (Retry) Do Uses

func (r Retry) Do(ctx context.Context, f func() error) error

Do calls f with retry, while f returns RetriableError, codes.Unavailable or codes.ResourceExhausted. It returns codes.DeadlineExceeded if ctx is cancelled. It returns last error if f returns error other than codes.Unavailable or codes.ResourceExhausted, or it reaches too many retries. It respects RetriableError.Delay or errdetail RetryInfo if it is specified as error details.


grpctestPackage grpctest provides a test server for unit tests that use gRPC.

Package rpc imports 22 packages (graph) and is imported by 11 packages. Updated 2020-10-27. Refresh now. Tools for package owners.