grpc

package
v0.0.0-...-c7154f0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 30, 2023 License: MIT, Apache-2.0 Imports: 18 Imported by: 0

README

gRPC Tunnels

This package is a hard fork of the project linked above.

There weren't any existing copyright notices, aside from the LICENSE file, so I've added the license header (to the original files), for clarity.

Other notable changes:

  • Deleted stream_adapter.go
  • Renamed and restructured some messages / fields
  • Used the Sesame gRPC metadata type (note that the one in the grpctunnel package was broken for binary header vals)
  • Renamed tunnel_client.go -> channel.go
  • Renamed tunnel_server.go -> tunnel.go
  • Renamed tunnel_test.go -> service_test.go
  • Simplified API by removing pointless ReverseTunnelChannel wrapper
  • Renamed TunnelChannel to Channel
  • Simplified and improved extensibility of API by removing duplicated method pairs (ServeTunnel vs ServeReverseTunnel, NewChannel vs NewReverseChannel), using the option pattern instead
  • Fixed unsafe send behavior via wrapper (see stream.go)
  • Added graceful stop support
  • Fixed CloseSend behavior (error handling)
  • Implemented flow control (involving fixing numerous deadlocks)

From the original readme

This library enables carrying gRPC over gRPC. There are a few niche use cases where this could be useful, but the most widely applicable one is likely for letting gRPC servers communicate in the reverse direction, sending requests to connected clients.

The tunnel is itself a gRPC service, which provides bidirectional streaming methods for forward and reverse tunneling. There is also an API for easily configuring the server handlers, be it on the server or (in the case of reverse tunnels) on the client. Similarly, there is an API for getting a "channel", from which you can create service stubs. This means the code that uses the stubs does not need to care whether it has a normal gRPC client connection or a stub that sends the data via a tunnel.

There is also an API for "light-weight" tunneling, in which a custom bidirectional stream is used to send messages back and forth. Each message acts as an RPC request or response, but all of them travel on a single stream (for pinning/affinity, for example).
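The idea of many request/response pairs sharing one stream can be illustrated without gRPC at all. The sketch below is an assumption-laden toy, not this package's implementation: a pair of Go channels stands in for the bidirectional stream, and the hypothetical frame, mux and serveUpper names exist only for this example. Each request carries an ID so that the response can be matched back to its caller.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// frame is a hypothetical message: one request or one response on the shared stream.
type frame struct {
	id      uint64
	payload string
}

// mux makes a single pair of frame channels look like a sequence of unary calls.
type mux struct {
	out     chan<- frame
	mu      sync.Mutex
	nextID  uint64
	pending map[uint64]chan string
}

func newMux(out chan<- frame, in <-chan frame) *mux {
	m := &mux{out: out, pending: make(map[uint64]chan string)}
	// A single reader dispatches each response to the call waiting on its ID.
	go func() {
		for f := range in {
			m.mu.Lock()
			ch := m.pending[f.id]
			delete(m.pending, f.id)
			m.mu.Unlock()
			if ch != nil {
				ch <- f.payload
			}
		}
	}()
	return m
}

// call sends one request and blocks until the matching response arrives.
func (m *mux) call(req string) string {
	ch := make(chan string, 1)
	m.mu.Lock()
	m.nextID++
	id := m.nextID
	m.pending[id] = ch
	m.mu.Unlock()
	m.out <- frame{id: id, payload: req}
	return <-ch
}

// serveUpper is a toy peer that answers every request with its upper-cased payload.
func serveUpper(in <-chan frame, out chan<- frame) {
	for f := range in {
		out <- frame{id: f.id, payload: strings.ToUpper(f.payload)}
	}
}

func main() {
	toServer := make(chan frame)
	toClient := make(chan frame)
	go serveUpper(toServer, toClient)

	m := newMux(toServer, toClient)
	fmt.Println(m.call("hello"))
	fmt.Println(m.call("tunnel"))
}
```

Because every call travels over the same pair of channels, all requests reach the same peer, which is the pinning/affinity property mentioned above.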

Documentation

Overview

Package grpc provides tools to support tunneling of gRPC services: carrying gRPC calls over a gRPC stream.

This support includes "pinning" an RPC channel to a single server, by sending all requests on a single gRPC stream. There are also tools for adapting certain kinds of bidirectional stream RPCs into a stub such that a single stream looks like a sequence of unary calls.

This support also includes "reverse services", where a client initiates a connection to a server, and the server then wraps that connection with an RPC stub, used to send requests from the server to that client (the client then sends responses back to the server).

Constants

This section is empty.

Variables

var (
	// OptChannel exposes all the options for NewChannel, available as methods.
	OptChannel ChannelOptions

	// OptTunnel exposes all the options for ServeTunnel, available as methods.
	OptTunnel TunnelOptions
)

Functions

func ServeTunnel

func ServeTunnel(options ...TunnelOption) error
Example (Reflection)

ExampleServeTunnel_reflection demonstrates how to enable the reflection API on the tunnel.

defer testutil.CheckNumGoroutines(nil, runtime.NumGoroutine(), false, time.Second*5)

var (
	tunneledService  grpchantesting.TestServer
	handlerMapConfig HandlerMapConfig = func(h *HandlerMap) {
		// once it's serving, h will be registered with the combined set of services from all
		// OptTunnel.Service provided HandlerMapConfig values
		reflection.Register(h)
		grpchantesting.RegisterTestServiceServer(h, &tunneledService)
	}
	tunnelService = &mockServer{openTunnel: func(stream grpctunnel.TunnelService_OpenTunnelServer) error {
		return ServeTunnel(
			OptTunnel.ServerStream(stream),
			OptTunnel.Service(handlerMapConfig),
		)
	}}
)

conn := testutil.NewBufconnClient(0, func(_ *bufconn.Listener, srv *grpc.Server) {
	grpctunnel.RegisterTunnelServiceServer(srv, tunnelService)
})
defer conn.Close()

tunnel, err := grpctunnel.NewTunnelServiceClient(conn).OpenTunnel(context.Background())
if err != nil {
	panic(err)
}

channel, err := NewChannel(OptChannel.ClientStream(tunnel))
if err != nil {
	panic(err)
}

stream, err := refl.NewServerReflectionClient(channel).ServerReflectionInfo(context.Background())
if err != nil {
	panic(err)
}

//lint:ignore SA1019 v1 isnt completely released yet
if err := stream.Send(&refl.ServerReflectionRequest{MessageRequest: &refl.ServerReflectionRequest_ListServices{}}); err != nil {
	panic(err)
}

res, err := stream.Recv()
if err != nil {
	panic(err)
}

var names []string
//lint:ignore SA1019 v1 isnt completely released yet
for _, svc := range res.GetListServicesResponse().GetService() {
	//lint:ignore SA1019 v1 isnt completely released yet
	names = append(names, svc.GetName())
}
sort.Strings(names)
for _, name := range names {
	fmt.Println(name)
}
Output:

grpc.reflection.v1.ServerReflection
grpc.reflection.v1alpha.ServerReflection
grpchantesting.TestService

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is a tunnel client, and implements grpc.ClientConnInterface. It is backed by a single stream, though this stream may be either a tunnel client, or a reverse tunnel server.

func NewChannel

func NewChannel(options ...ChannelOption) (*Channel, error)

func (*Channel) Canceled

func (c *Channel) Canceled() bool

func (*Channel) Close

func (c *Channel) Close() error

func (*Channel) Context

func (c *Channel) Context() context.Context

func (*Channel) Done

func (c *Channel) Done() <-chan struct{}

func (*Channel) Err

func (c *Channel) Err() error

func (*Channel) Invoke

func (c *Channel) Invoke(ctx context.Context, methodName string, req, resp interface{}, opts ...grpc.CallOption) error

func (*Channel) NewStream

func (c *Channel) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, opts ...grpc.CallOption) (grpc.ClientStream, error)

type ChannelOption

type ChannelOption func(c *channelConfig)

ChannelOption is an option that may be provided to NewChannel.

type ChannelOptions

type ChannelOptions struct{}

ChannelOptions exposes ChannelOption implementations as methods, which are available via the OptChannel package variable.

TODO: consider adding a StopSignal option (for graceful close support).

func (ChannelOptions) ClientStream

ClientStream configures the channel to use a client-side stream.

This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptChannel package variable.

func (ChannelOptions) ServerStream

ServerStream configures the channel to use a server-side stream.

This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptChannel package variable.
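The pattern described above (options exposed as methods on an empty struct, reached through a package variable, with one group that must be provided exactly once) can be sketched roughly as follows. This is an independent illustration, not the package's code: the stream, config, Options, Opt and newConfig names are hypothetical, and the real channelConfig fields and validation are unexported and not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// stream is a placeholder for a gRPC stream in this sketch.
type stream struct{ name string }

// config collects what the options set. Counting how often the stream group
// was provided lets the constructor enforce "exactly once".
type config struct {
	stream      *stream
	reverse     bool
	streamCount int
}

// Option mutates the config, in the style of ChannelOption.
type Option func(c *config)

// Options is an empty struct whose methods build Option values.
type Options struct{}

// Opt plays the role that the OptChannel package variable plays above.
var Opt Options

func (Options) ClientStream(s *stream) Option {
	return func(c *config) {
		c.stream, c.reverse = s, false
		c.streamCount++
	}
}

func (Options) ServerStream(s *stream) Option {
	return func(c *config) {
		c.stream, c.reverse = s, true
		c.streamCount++
	}
}

// newConfig applies all options, then validates the stream option group.
func newConfig(options ...Option) (*config, error) {
	var c config
	for _, o := range options {
		o(&c)
	}
	if c.streamCount != 1 {
		return nil, errors.New("stream option group must be provided exactly once")
	}
	if c.stream == nil {
		return nil, errors.New("stream must not be nil")
	}
	return &c, nil
}

func main() {
	c, err := newConfig(Opt.ServerStream(&stream{name: "reverse"}))
	fmt.Println(c.reverse, err)

	_, err = newConfig()
	fmt.Println(err)
}
```

Hanging the options off a variable keeps them discoverable through editor completion, and a single constructor replaces pairs of near-identical functions.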

type HandlerMap

type HandlerMap struct {
	// contains filtered or unexported fields
}

HandlerMap implements grpc.ServiceRegistrar, and may be provided to reflection.Register. WARNING: this type is only intended for use by HandlerMapConfig.

func (*HandlerMap) GetServiceInfo

func (x *HandlerMap) GetServiceInfo() map[string]grpc.ServiceInfo

func (*HandlerMap) RegisterService

func (x *HandlerMap) RegisterService(desc *grpc.ServiceDesc, impl interface{})

type HandlerMapConfig

type HandlerMapConfig func(h *HandlerMap)

HandlerMapConfig models service/handler/server configuration.

type TunnelOption

type TunnelOption func(c *tunnelConfig)

TunnelOption is an option that may be provided to ServeTunnel.

type TunnelOptions

type TunnelOptions struct{}

TunnelOptions exposes TunnelOption implementations as methods, which are available via the OptTunnel package variable.

func (TunnelOptions) ClientStream

ClientStream configures the tunnel to use a client-side stream.

This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptTunnel package variable.

func (TunnelOptions) ServerStream

ServerStream configures the tunnel to use a server-side stream.

This option group (ClientStream, ServerStream) must be provided exactly once. This method may be accessed via the OptTunnel package variable.

func (TunnelOptions) Service

func (TunnelOptions) Service(config HandlerMapConfig) TunnelOption

Service registers 0-n services into the internal handler, additive with any existing services. Duplicate handlers (for the same service) will result in an error. See also HandlerMap and HandlerMapConfig.

This option may be provided multiple times, and is optional. This method may be accessed via the OptTunnel package variable.
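The additive behavior with duplicate detection can be pictured with a simplified registry. This is only a sketch under assumptions: registry, configFunc and build are hypothetical names, services are identified by plain strings, and how the real HandlerMap surfaces the duplicate error is not shown in this documentation.

```go
package main

import "fmt"

// registry is a hypothetical, much simplified stand-in for HandlerMap.
type registry struct {
	handlers map[string]interface{}
	err      error
}

// register adds a handler and remembers the first duplicate as an error.
func (r *registry) register(name string, impl interface{}) {
	if r.err != nil {
		return
	}
	if _, ok := r.handlers[name]; ok {
		r.err = fmt.Errorf("service %q registered more than once", name)
		return
	}
	r.handlers[name] = impl
}

// configFunc mirrors the shape of HandlerMapConfig: a callback that registers
// zero or more services.
type configFunc func(r *registry)

// build applies every callback to one shared registry, so registrations from
// separate callbacks accumulate.
func build(configs ...configFunc) (*registry, error) {
	r := &registry{handlers: make(map[string]interface{})}
	for _, c := range configs {
		c(r)
	}
	if r.err != nil {
		return nil, r.err
	}
	return r, nil
}

func main() {
	a := func(r *registry) { r.register("svc.A", struct{}{}) }
	b := func(r *registry) { r.register("svc.B", struct{}{}) }

	r, err := build(a, b)
	fmt.Println(len(r.handlers), err)

	_, err = build(a, a)
	fmt.Println(err != nil)
}
```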

func (TunnelOptions) StopSignal

func (TunnelOptions) StopSignal(ch <-chan struct{}) TunnelOption

StopSignal configures a channel that should be closed to indicate that a (graceful) stop has been initiated. A nil value will result in an error.

This option may be provided at most once. This method may be accessed via the OptTunnel package variable.
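A stop signal of this kind is typically consumed with a select statement. The sketch below assumes a toy serve loop over strings and is not the package's implementation. In particular, "graceful" here only means that no new work is accepted once the channel is closed, while an item already being handled completes. What the package itself does during a graceful stop is not described in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// serve handles work items until the stop channel is closed or the work
// channel is drained. A nil stop channel is rejected, mirroring the rule
// stated above for StopSignal.
func serve(work <-chan string, stop <-chan struct{}) ([]string, error) {
	if stop == nil {
		return nil, errors.New("stop signal must not be nil")
	}
	var handled []string
	for {
		select {
		case <-stop:
			// Stop accepting new work; everything already handled is returned.
			return handled, nil
		case w, ok := <-work:
			if !ok {
				return handled, nil
			}
			handled = append(handled, w)
		}
	}
}

func main() {
	work := make(chan string)
	stop := make(chan struct{})
	done := make(chan []string)

	go func() {
		handled, _ := serve(work, stop)
		done <- handled
	}()

	work <- "a"
	work <- "b"
	close(stop) // closing (rather than sending) wakes every listener
	fmt.Println(<-done)
}
```

Closing the channel instead of sending a value means any number of goroutines can observe the signal, and it can be observed repeatedly.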

type TunnelServer

type TunnelServer struct {
	// If set, reverse tunnels will not be allowed. The server will reply to
	// OpenReverseTunnel requests with an "Unimplemented" error code.
	NoReverseTunnels bool
	// If reverse tunnels are allowed, this callback may be configured to
	// receive information when clients open a reverse tunnel.
	OnReverseTunnelConnect func(channel *Channel)
	// If reverse tunnels are allowed, this callback may be configured to
	// receive information when reverse tunnels are torn down.
	OnReverseTunnelDisconnect func(channel *Channel)
	// Optional function that accepts a reverse tunnel and returns an affinity
	// key. The affinity key values can be used to look up outbound channels,
	// for targeting calls to particular clients or groups of clients.
	AffinityKey func(channel *Channel) interface{}
	// Optional channel to signal graceful close. The channel should be closed
	// to indicate (graceful) stop has been started. Must be set prior to
	// using the server.
	StopSignal <-chan struct{}
	// contains filtered or unexported fields
}

TunnelServer provides an implementation for grpctunnel.TunnelServiceServer. You can register handlers with it, and it will then expose those handlers for incoming tunnels. If no handlers are registered, the server will reply to OpenTunnel requests with an "Unimplemented" error code. The server may still be used for reverse tunnels.

For reverse tunnels, if supported, all connected channels (e.g. all clients that have created reverse tunnels) are available. You can also configure a listener to receive notices when channels are connected and disconnected.
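The role of an affinity key can be sketched with a small index of connected clients. Everything here is hypothetical (client, index, connect, pick), and the round-robin choice within a group is purely an assumption for illustration. How TunnelServer selects among several channels that share a key is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for a connected reverse tunnel channel.
type client struct{ name, region string }

// index groups connected clients by the key returned from keyOf.
type index struct {
	mu      sync.Mutex
	keyOf   func(*client) interface{}
	byKey   map[interface{}][]*client
	counter map[interface{}]int
}

func newIndex(keyOf func(*client) interface{}) *index {
	return &index{
		keyOf:   keyOf,
		byKey:   make(map[interface{}][]*client),
		counter: make(map[interface{}]int),
	}
}

// connect records a newly connected client under its affinity key.
func (x *index) connect(c *client) {
	x.mu.Lock()
	defer x.mu.Unlock()
	k := x.keyOf(c)
	x.byKey[k] = append(x.byKey[k], c)
}

// pick returns one client for the key, rotating through the group
// (an assumed policy), or nil if no client has that key.
func (x *index) pick(key interface{}) *client {
	x.mu.Lock()
	defer x.mu.Unlock()
	group := x.byKey[key]
	if len(group) == 0 {
		return nil
	}
	c := group[x.counter[key]%len(group)]
	x.counter[key]++
	return c
}

func main() {
	x := newIndex(func(c *client) interface{} { return c.region })
	x.connect(&client{name: "a", region: "eu"})
	x.connect(&client{name: "b", region: "eu"})
	x.connect(&client{name: "c", region: "us"})

	fmt.Println(x.pick("eu").name, x.pick("eu").name, x.pick("eu").name)
	fmt.Println(x.pick("us").name, x.pick("ap") == nil)
}
```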

Example (Reflection)

This example demonstrates how to enable the reflection API on a TunnelServer.

defer testutil.CheckNumGoroutines(nil, runtime.NumGoroutine(), false, time.Second*5)

svc := new(TunnelServer)
reflection.Register(svc)
grpchantesting.RegisterTestServiceServer(svc, new(grpchantesting.TestServer))

conn := testutil.NewBufconnClient(0, func(_ *bufconn.Listener, srv *grpc.Server) { grpctunnel.RegisterTunnelServiceServer(srv, svc) })
defer conn.Close()

tunnel, err := grpctunnel.NewTunnelServiceClient(conn).OpenTunnel(context.Background())
if err != nil {
	panic(err)
}

channel, err := NewChannel(OptChannel.ClientStream(tunnel))
if err != nil {
	panic(err)
}

stream, err := refl.NewServerReflectionClient(channel).ServerReflectionInfo(context.Background())
if err != nil {
	panic(err)
}

//lint:ignore SA1019 v1 isnt completely released yet
if err := stream.Send(&refl.ServerReflectionRequest{MessageRequest: &refl.ServerReflectionRequest_ListServices{}}); err != nil {
	panic(err)
}

res, err := stream.Recv()
if err != nil {
	panic(err)
}

var names []string
//lint:ignore SA1019 v1 isnt completely released yet
for _, svc := range res.GetListServicesResponse().GetService() {
	//lint:ignore SA1019 v1 isnt completely released yet
	names = append(names, svc.GetName())
}
sort.Strings(names)
for _, name := range names {
	fmt.Println(name)
}
Output:

grpc.reflection.v1.ServerReflection
grpc.reflection.v1alpha.ServerReflection
grpchantesting.TestService

func (*TunnelServer) AllReverseTunnels

func (s *TunnelServer) AllReverseTunnels() []*Channel

func (*TunnelServer) AsChannel

func (s *TunnelServer) AsChannel() grpc.ClientConnInterface

func (*TunnelServer) FindChannel

func (s *TunnelServer) FindChannel(search func(*Channel) bool) *Channel

func (*TunnelServer) GetServiceInfo

func (s *TunnelServer) GetServiceInfo() map[string]grpc.ServiceInfo

func (*TunnelServer) KeyAsChannel

func (s *TunnelServer) KeyAsChannel(key interface{}) grpc.ClientConnInterface

func (*TunnelServer) OpenReverseTunnel

func (*TunnelServer) OpenTunnel

func (*TunnelServer) RegisterService

func (s *TunnelServer) RegisterService(desc *grpc.ServiceDesc, srv interface{})
