Documentation ¶
Overview ¶
Package grpc provides tools to support tunneling of gRPC services: carrying gRPC calls over a gRPC stream.
This support includes "pinning" an RPC channel to a single server, by sending all requests on a single gRPC stream. There are also tools for adapting certain kinds of bidirectional stream RPCs into a stub such that a single stream looks like a sequence of unary calls.
This support also includes "reverse services", where a client can initiate a connection to a server and subsequently the server can then wrap that connection with an RPC stub, used to send requests from the server to that client (and the client then replies and sends responses back to the server).
Index ¶
- Variables
- func ServeTunnel(options ...TunnelOption) error
- type Channel
- func (c *Channel) Canceled() bool
- func (c *Channel) Close() error
- func (c *Channel) Context() context.Context
- func (c *Channel) Done() <-chan struct{}
- func (c *Channel) Err() error
- func (c *Channel) Invoke(ctx context.Context, methodName string, req, resp interface{}, ...) error
- func (c *Channel) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, ...) (grpc.ClientStream, error)
- type ChannelOption
- type ChannelOptions
- type HandlerMap
- type HandlerMapConfig
- type TunnelOption
- type TunnelOptions
- func (TunnelOptions) ClientStream(stream grpctunnel.TunnelService_OpenReverseTunnelClient) TunnelOption
- func (TunnelOptions) ServerStream(stream grpctunnel.TunnelService_OpenTunnelServer) TunnelOption
- func (TunnelOptions) Service(config HandlerMapConfig) TunnelOption
- func (TunnelOptions) StopSignal(ch <-chan struct{}) TunnelOption
- type TunnelServer
- func (s *TunnelServer) AllReverseTunnels() []*Channel
- func (s *TunnelServer) AsChannel() grpc.ClientConnInterface
- func (s *TunnelServer) FindChannel(search func(*Channel) bool) *Channel
- func (s *TunnelServer) GetServiceInfo() map[string]grpc.ServiceInfo
- func (s *TunnelServer) KeyAsChannel(key interface{}) grpc.ClientConnInterface
- func (s *TunnelServer) OpenReverseTunnel(stream grpctunnel.TunnelService_OpenReverseTunnelServer) error
- func (s *TunnelServer) OpenTunnel(stream grpctunnel.TunnelService_OpenTunnelServer) error
- func (s *TunnelServer) RegisterService(desc *grpc.ServiceDesc, srv interface{})
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // OptChannel exposes all the options for NewChannel, available as methods. OptChannel ChannelOptions // OptTunnel exposes all the options for ServeTunnel, available as methods. OptTunnel TunnelOptions )
Functions ¶
func ServeTunnel ¶
func ServeTunnel(options ...TunnelOption) error
Example (Reflection) ¶
ExampleServeTunnel_reflection demonstrates how to enable the reflection API on the tunnel.
defer testutil.CheckNumGoroutines(nil, runtime.NumGoroutine(), false, time.Second*5) var ( tunneledService grpchantesting.TestServer handlerMapConfig HandlerMapConfig = func(h *HandlerMap) { // once it's serving, h will be registered with the combined set of services from all // OptTunnel.Service provided HandlerMapConfig values reflection.Register(h) grpchantesting.RegisterTestServiceServer(h, &tunneledService) } tunnelService = &mockServer{openTunnel: func(stream grpctunnel.TunnelService_OpenTunnelServer) error { return ServeTunnel( OptTunnel.ServerStream(stream), OptTunnel.Service(handlerMapConfig), ) }} ) conn := testutil.NewBufconnClient(0, func(_ *bufconn.Listener, srv *grpc.Server) { grpctunnel.RegisterTunnelServiceServer(srv, tunnelService) }) defer conn.Close() tunnel, err := grpctunnel.NewTunnelServiceClient(conn).OpenTunnel(context.Background()) if err != nil { panic(err) } channel, err := NewChannel(OptChannel.ClientStream(tunnel)) if err != nil { panic(err) } stream, err := refl.NewServerReflectionClient(channel).ServerReflectionInfo(context.Background()) if err != nil { panic(err) } //lint:ignore SA1019 v1 isnt completely released yet if err := stream.Send(&refl.ServerReflectionRequest{MessageRequest: &refl.ServerReflectionRequest_ListServices{}}); err != nil { panic(err) } res, err := stream.Recv() if err != nil { panic(err) } var names []string //lint:ignore SA1019 v1 isnt completely released yet for _, svc := range res.GetListServicesResponse().GetService() { //lint:ignore SA1019 v1 isnt completely released yet names = append(names, svc.GetName()) } sort.Strings(names) for _, name := range names { fmt.Println(name) }
Output: grpc.reflection.v1.ServerReflection grpc.reflection.v1alpha.ServerReflection grpchantesting.TestService
Types ¶
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel is a tunnel client, and implements grpc.ClientConnInterface. It is backed by a single stream, though this stream may be either a tunnel client, or a reverse tunnel server.
func NewChannel ¶
func NewChannel(options ...ChannelOption) (*Channel, error)
func (*Channel) NewStream ¶
func (c *Channel) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, opts ...grpc.CallOption) (grpc.ClientStream, error)
type ChannelOption ¶
type ChannelOption func(c *channelConfig)
ChannelOption is an option that may be provided to NewChannel.
type ChannelOptions ¶
type ChannelOptions struct{}
ChannelOptions exposes ChannelOption implementations as methods, which are available via the OptChannel package variable. TODO consider adding StopSignal option (for graceful close support)
func (ChannelOptions) ClientStream ¶
func (ChannelOptions) ClientStream(stream grpctunnel.TunnelService_OpenTunnelClient) ChannelOption
ClientStream configures the channel to use a client-side stream.
This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptChannel package variable.
func (ChannelOptions) ServerStream ¶
func (ChannelOptions) ServerStream(stream grpctunnel.TunnelService_OpenReverseTunnelServer) ChannelOption
ServerStream configures the channel to use a server-side stream.
This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptChannel package variable.
type HandlerMap ¶
type HandlerMap struct {
// contains filtered or unexported fields
}
HandlerMap implements grpc.ServiceRegistrar, and may be provided to reflection.Register. WARNING this type is only intended for use by HandlerMapConfig.
func (*HandlerMap) GetServiceInfo ¶
func (x *HandlerMap) GetServiceInfo() map[string]grpc.ServiceInfo
func (*HandlerMap) RegisterService ¶
func (x *HandlerMap) RegisterService(desc *grpc.ServiceDesc, impl interface{})
type HandlerMapConfig ¶
type HandlerMapConfig func(h *HandlerMap)
HandlerMapConfig models service/handler/server configuration.
type TunnelOption ¶
type TunnelOption func(c *tunnelConfig)
TunnelOption is an option that may be provided to ServeTunnel.
type TunnelOptions ¶
type TunnelOptions struct{}
TunnelOptions exposes TunnelOption implementations as methods, which are available via the OptTunnel package variable.
func (TunnelOptions) ClientStream ¶
func (TunnelOptions) ClientStream(stream grpctunnel.TunnelService_OpenReverseTunnelClient) TunnelOption
ClientStream configures the tunnel to use a client-side stream.
This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptTunnel package variable.
func (TunnelOptions) ServerStream ¶
func (TunnelOptions) ServerStream(stream grpctunnel.TunnelService_OpenTunnelServer) TunnelOption
ServerStream configures the channel to use a server-side stream.
This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptTunnel package variable.
func (TunnelOptions) Service ¶
func (TunnelOptions) Service(config HandlerMapConfig) TunnelOption
Service registers 0-n services into the internal handler, additive with any existing services. Duplicate handlers (for the same service) will result in an error. See also HandlerMap and HandlerMapConfig.
This option may be provided multiple times, and is optional. This method may be accessed via the OptTunnel package variable.
func (TunnelOptions) StopSignal ¶
func (TunnelOptions) StopSignal(ch <-chan struct{}) TunnelOption
StopSignal configures a channel that should be closed to indicate that a (graceful) stop has been initiated. A nil value will result in an error.
This option may be provided at most once. This method may be accessed via the OptTunnel package variable.
type TunnelServer ¶
type TunnelServer struct { // If set, reverse tunnels will not be allowed. The server will reply to // OpenReverseTunnel requests with an "Unimplemented" error code. NoReverseTunnels bool // If reverse tunnels are allowed, this callback may be configured to // receive information when clients open a reverse tunnel. OnReverseTunnelConnect func(channel *Channel) // If reverse tunnels are allowed, this callback may be configured to // receive information when reverse tunnels are torn down. OnReverseTunnelDisconnect func(channel *Channel) // Optional function that accepts a reverse tunnel and returns an affinity // key. The affinity key values can be used to look up outbound channels, // for targeting calls to particular clients or groups of clients. AffinityKey func(channel *Channel) interface{} // Optional channel to signal graceful close. The channel should be closed // to indicate (graceful) stop has been started. Must be set prior to // using the server. StopSignal <-chan struct{} // contains filtered or unexported fields }
TunnelServer provides an implementation for grpctunnel.TunnelServiceServer. You can register handlers with it, and it will then expose those handlers for incoming tunnels. If no handlers are registered, the server will reply to OpenTunnel requests with an "Unimplemented" error code. The server may still be used for reverse tunnels
For reverse tunnels, if supported, all connected channels (e.g. all clients that have created reverse tunnels) are available. You can also configure a listener to receive notices when channels are connected and disconnected.
Example (Reflection) ¶
ExampleServeTunnel_reflection demonstrates how to enable the reflection API on the tunnel.
defer testutil.CheckNumGoroutines(nil, runtime.NumGoroutine(), false, time.Second*5) svc := new(TunnelServer) reflection.Register(svc) grpchantesting.RegisterTestServiceServer(svc, new(grpchantesting.TestServer)) conn := testutil.NewBufconnClient(0, func(_ *bufconn.Listener, srv *grpc.Server) { grpctunnel.RegisterTunnelServiceServer(srv, svc) }) defer conn.Close() tunnel, err := grpctunnel.NewTunnelServiceClient(conn).OpenTunnel(context.Background()) if err != nil { panic(err) } channel, err := NewChannel(OptChannel.ClientStream(tunnel)) if err != nil { panic(err) } stream, err := refl.NewServerReflectionClient(channel).ServerReflectionInfo(context.Background()) if err != nil { panic(err) } //lint:ignore SA1019 v1 isnt completely released yet if err := stream.Send(&refl.ServerReflectionRequest{MessageRequest: &refl.ServerReflectionRequest_ListServices{}}); err != nil { panic(err) } res, err := stream.Recv() if err != nil { panic(err) } var names []string //lint:ignore SA1019 v1 isnt completely released yet for _, svc := range res.GetListServicesResponse().GetService() { //lint:ignore SA1019 v1 isnt completely released yet names = append(names, svc.GetName()) } sort.Strings(names) for _, name := range names { fmt.Println(name) }
Output: grpc.reflection.v1.ServerReflection grpc.reflection.v1alpha.ServerReflection grpchantesting.TestService
func (*TunnelServer) AllReverseTunnels ¶
func (s *TunnelServer) AllReverseTunnels() []*Channel
func (*TunnelServer) AsChannel ¶
func (s *TunnelServer) AsChannel() grpc.ClientConnInterface
func (*TunnelServer) FindChannel ¶
func (s *TunnelServer) FindChannel(search func(*Channel) bool) *Channel
func (*TunnelServer) GetServiceInfo ¶
func (s *TunnelServer) GetServiceInfo() map[string]grpc.ServiceInfo
func (*TunnelServer) KeyAsChannel ¶
func (s *TunnelServer) KeyAsChannel(key interface{}) grpc.ClientConnInterface
func (*TunnelServer) OpenReverseTunnel ¶
func (s *TunnelServer) OpenReverseTunnel(stream grpctunnel.TunnelService_OpenReverseTunnelServer) error
func (*TunnelServer) OpenTunnel ¶
func (s *TunnelServer) OpenTunnel(stream grpctunnel.TunnelService_OpenTunnelServer) error
func (*TunnelServer) RegisterService ¶
func (s *TunnelServer) RegisterService(desc *grpc.ServiceDesc, srv interface{})