totem

package module
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2023 License: Apache-2.0 Imports: 49 Imported by: 13

README

Totem is a Go library that can turn a single gRPC stream into bidirectional unary gRPC servers.

Background

Streaming RPCs enable several useful design patterns for client-server connections that can't be done with unary RPCs: for example, keeping track of long-lived client connections and sending server-initiated requests to those clients. However, implementing bidirectional messaging over streams can quickly become very complicated for non-trivial use cases. Totem enables these design patterns and abstracts away the underlying stream, allowing you to implement your streaming RPC in terms of simpler unary RPCs.

Examples

See the examples directory for example code.

Documentation

Index

Constants

View Source
const Forward = "(forward)"

Method placeholder to distinguish forwarded raw RPC messages.

View Source
const (
	ServerReflection_ListServices_FullMethodName = "/totem.ServerReflection/ListServices"
)
View Source
const TracerName = "totem"

Variables

View Source
var (
	ReplicationStrategy_name = map[int32]string{
		0: "First",
		1: "Broadcast",
	}
	ReplicationStrategy_value = map[string]int32{
		"First":     0,
		"Broadcast": 1,
	}
)

Enum value maps for ReplicationStrategy.

View Source
var (
	// optional totem.QOS qos = 10001;
	E_Qos = &file_github_com_kralicky_totem_extensions_proto_extTypes[0]
)

Extension fields to descriptorpb.MethodOptions.

View Source
var (
	// optional totem.Visibility visibility = 10002;
	E_Visibility = &file_github_com_kralicky_totem_extensions_proto_extTypes[1]
)

Extension fields to descriptorpb.ServiceOptions.

View Source
var ErrTimeout = fmt.Errorf("timed out")
View Source
var File_github_com_kralicky_totem_extensions_proto protoreflect.FileDescriptor
View Source
var File_github_com_kralicky_totem_totem_proto protoreflect.FileDescriptor
View Source
var ServerReflection_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "totem.ServerReflection",
	HandlerType: (*ServerReflectionServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "ListServices",
			Handler:    _ServerReflection_ListServices_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "github.com/kralicky/totem/totem.proto",
}

ServerReflection_ServiceDesc is the grpc.ServiceDesc for ServerReflection service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy).

View Source
var TracingEnabled = false

Controls whether or not tracing is enabled. Must only be set once at startup. Defaults to false.

Functions

func RegisterServerReflectionServer added in v1.1.14

func RegisterServerReflectionServer(s grpc.ServiceRegistrar, srv ServerReflectionServer)

func TracerProvider added in v1.2.0

func TracerProvider(opts ...resource.Option) (_tp trace.TracerProvider)

func WaitErrOrTimeout

func WaitErrOrTimeout(errC <-chan error, timeout time.Duration) error

Types

type ClientConn added in v1.1.14

type ClientConn struct {
	// contains filtered or unexported fields
}

func (*ClientConn) Invoke added in v1.1.14

func (cc *ClientConn) Invoke(
	ctx context.Context,
	method string,
	req any,
	reply any,
	callOpts ...grpc.CallOption,
) error

func (*ClientConn) NewStream added in v1.1.14

func (cc *ClientConn) NewStream(
	ctx context.Context,
	desc *grpc.StreamDesc,
	method string,
	opts ...grpc.CallOption,
) (grpc.ClientStream, error)

type ClientStream

type ClientStream interface {
	Stream
	grpc.ClientStream
}

type DiscoveryRequest added in v1.1.14

type DiscoveryRequest struct {
	Initiator     string   `protobuf:"bytes,1,opt,name=initiator,proto3" json:"initiator,omitempty"`
	Visited       []string `protobuf:"bytes,2,rep,name=visited,proto3" json:"visited,omitempty"`
	RemainingHops int32    `protobuf:"varint,3,opt,name=remainingHops,proto3" json:"remainingHops,omitempty"`
	// contains filtered or unexported fields
}

func (*DiscoveryRequest) Descriptor deprecated added in v1.1.14

func (*DiscoveryRequest) Descriptor() ([]byte, []int)

Deprecated: Use DiscoveryRequest.ProtoReflect.Descriptor instead.

func (*DiscoveryRequest) GetInitiator added in v1.1.14

func (x *DiscoveryRequest) GetInitiator() string

func (*DiscoveryRequest) GetRemainingHops added in v1.1.14

func (x *DiscoveryRequest) GetRemainingHops() int32

func (*DiscoveryRequest) GetVisited added in v1.1.14

func (x *DiscoveryRequest) GetVisited() []string

func (*DiscoveryRequest) ProtoMessage added in v1.1.14

func (*DiscoveryRequest) ProtoMessage()

func (*DiscoveryRequest) ProtoReflect added in v1.1.14

func (x *DiscoveryRequest) ProtoReflect() protoreflect.Message

func (*DiscoveryRequest) Reset added in v1.1.14

func (x *DiscoveryRequest) Reset()

func (*DiscoveryRequest) String added in v1.1.14

func (x *DiscoveryRequest) String() string

type InterceptorConfig added in v1.1.14

type InterceptorConfig struct {
	// This interceptor functions similarly to a standard unary server interceptor,
	// and will be called for RPCs that are about to be invoked locally. When
	// an RPC is passed through to a spliced stream, this interceptor will not
	// be called.
	Incoming grpc.UnaryServerInterceptor

	// This interceptor functions similarly to a standard unary client interceptor,
	// with the one caveat that the [grpc.ClientConn] passed to the interceptor
	// will always be nil, and must not be used. The interceptor should still
	// forward the nil argument to the invoker for potential forward compatibility.
	// This interceptor is not called for RPCs being passed through to a spliced
	// stream.
	Outgoing grpc.UnaryClientInterceptor
}

type MD added in v1.1.14

type MD struct {
	Data map[string]*MDValues `` /* 149-byte string literal not displayed */
	// contains filtered or unexported fields
}

func FromMD added in v1.1.14

func FromMD(md metadata.MD) *MD

func (*MD) Descriptor deprecated added in v1.1.14

func (*MD) Descriptor() ([]byte, []int)

Deprecated: Use MD.ProtoReflect.Descriptor instead.

func (*MD) GetData added in v1.1.14

func (x *MD) GetData() map[string]*MDValues

func (*MD) KV added in v1.1.14

func (md *MD) KV() []attribute.KeyValue

func (*MD) Keys added in v1.1.14

func (md *MD) Keys() []string

func (*MD) ProtoMessage added in v1.1.14

func (*MD) ProtoMessage()

func (*MD) ProtoReflect added in v1.1.14

func (x *MD) ProtoReflect() protoreflect.Message

func (*MD) Reset added in v1.1.14

func (x *MD) Reset()

func (*MD) String added in v1.1.14

func (x *MD) String() string

func (*MD) ToMD added in v1.1.14

func (md *MD) ToMD() metadata.MD

type MDValues added in v1.1.14

type MDValues struct {
	Items []string `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
	// contains filtered or unexported fields
}

func (*MDValues) Descriptor deprecated added in v1.1.14

func (*MDValues) Descriptor() ([]byte, []int)

Deprecated: Use MDValues.ProtoReflect.Descriptor instead.

func (*MDValues) GetItems added in v1.1.14

func (x *MDValues) GetItems() []string

func (*MDValues) ProtoMessage added in v1.1.14

func (*MDValues) ProtoMessage()

func (*MDValues) ProtoReflect added in v1.1.14

func (x *MDValues) ProtoReflect() protoreflect.Message

func (*MDValues) Reset added in v1.1.14

func (x *MDValues) Reset()

func (*MDValues) String added in v1.1.14

func (x *MDValues) String() string

type MethodInvoker

type MethodInvoker interface {
	Invoke(ctx context.Context, rpc *RPC) ([]byte, error)
	TopologyFlags() TopologyFlags
}

type MetricsExporter added in v1.1.14

type MetricsExporter struct {
	// contains filtered or unexported fields
}

func NewMetricsExporter added in v1.1.14

func NewMetricsExporter(provider metric.MeterProvider, staticAttrs ...attribute.KeyValue) *MetricsExporter

func (*MetricsExporter) TrackRxBytes added in v1.1.14

func (m *MetricsExporter) TrackRxBytes(service, method string, count int64)

func (*MetricsExporter) TrackSvcRxLatency added in v1.1.14

func (m *MetricsExporter) TrackSvcRxLatency(service, method string, latency time.Duration)

func (*MetricsExporter) TrackSvcTxLatency added in v1.1.14

func (m *MetricsExporter) TrackSvcTxLatency(service, method string, latency time.Duration)

func (*MetricsExporter) TrackTxBytes added in v1.1.14

func (m *MetricsExporter) TrackTxBytes(service, method string, count int64)

type QOS added in v1.1.14

type QOS struct {
	ReplicationStrategy ReplicationStrategy `protobuf:"varint,1,opt,name=replicationStrategy,proto3,enum=totem.ReplicationStrategy" json:"replicationStrategy,omitempty"`
	// contains filtered or unexported fields
}

func (*QOS) Descriptor deprecated added in v1.1.14

func (*QOS) Descriptor() ([]byte, []int)

Deprecated: Use QOS.ProtoReflect.Descriptor instead.

func (*QOS) GetReplicationStrategy added in v1.1.14

func (x *QOS) GetReplicationStrategy() ReplicationStrategy

func (*QOS) ProtoMessage added in v1.1.14

func (*QOS) ProtoMessage()

func (*QOS) ProtoReflect added in v1.1.14

func (x *QOS) ProtoReflect() protoreflect.Message

func (*QOS) Reset added in v1.1.14

func (x *QOS) Reset()

func (*QOS) String added in v1.1.14

func (x *QOS) String() string

type RPC

type RPC struct {
	Tag         uint64 `protobuf:"varint,1,opt,name=tag,proto3" json:"tag,omitempty"`
	ServiceName string `protobuf:"bytes,2,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
	MethodName  string `protobuf:"bytes,3,opt,name=methodName,proto3" json:"methodName,omitempty"`
	// Types that are assignable to Content:
	//
	//	*RPC_Request
	//	*RPC_Response
	Content  isRPC_Content `protobuf_oneof:"content"`
	Metadata *MD           `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

func (*RPC) Descriptor deprecated

func (*RPC) Descriptor() ([]byte, []int)

Deprecated: Use RPC.ProtoReflect.Descriptor instead.

func (*RPC) GetContent

func (m *RPC) GetContent() isRPC_Content

func (*RPC) GetMetadata added in v1.1.14

func (x *RPC) GetMetadata() *MD

func (*RPC) GetMethodName added in v1.1.14

func (x *RPC) GetMethodName() string

func (*RPC) GetRequest

func (x *RPC) GetRequest() []byte

func (*RPC) GetResponse

func (x *RPC) GetResponse() *Response

func (*RPC) GetServiceName added in v1.1.14

func (x *RPC) GetServiceName() string

func (*RPC) GetTag

func (x *RPC) GetTag() uint64

func (*RPC) ProtoMessage

func (*RPC) ProtoMessage()

func (*RPC) ProtoReflect

func (x *RPC) ProtoReflect() protoreflect.Message

func (*RPC) QualifiedMethodName added in v1.1.14

func (r *RPC) QualifiedMethodName() string

func (*RPC) Reset

func (x *RPC) Reset()

func (*RPC) String

func (x *RPC) String() string

type RPC_Request

type RPC_Request struct {
	Request []byte `protobuf:"bytes,4,opt,name=request,proto3,oneof"`
}

type RPC_Response

type RPC_Response struct {
	Response *Response `protobuf:"bytes,5,opt,name=response,proto3,oneof"`
}

type ReplicationStrategy added in v1.1.14

type ReplicationStrategy int32
const (
	ReplicationStrategy_First     ReplicationStrategy = 0
	ReplicationStrategy_Broadcast ReplicationStrategy = 1
)

func (ReplicationStrategy) Descriptor added in v1.1.14

func (ReplicationStrategy) Enum added in v1.1.14

func (ReplicationStrategy) EnumDescriptor deprecated added in v1.1.14

func (ReplicationStrategy) EnumDescriptor() ([]byte, []int)

Deprecated: Use ReplicationStrategy.Descriptor instead.

func (ReplicationStrategy) Number added in v1.1.14

func (ReplicationStrategy) String added in v1.1.14

func (x ReplicationStrategy) String() string

func (ReplicationStrategy) Type added in v1.1.14

type Response

type Response struct {
	Response    []byte         `protobuf:"bytes,1,opt,name=response,proto3" json:"response,omitempty"`
	StatusProto *status.Status `protobuf:"bytes,2,opt,name=statusProto,proto3" json:"statusProto,omitempty"`
	// contains filtered or unexported fields
}

func (*Response) Descriptor deprecated

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) GetResponse

func (x *Response) GetResponse() []byte

func (*Response) GetStatus added in v1.1.14

func (r *Response) GetStatus() *status.Status

func (*Response) GetStatusProto added in v1.1.14

func (x *Response) GetStatusProto() *status.Status

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) ProtoReflect

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset

func (x *Response) Reset()

func (*Response) String

func (x *Response) String() string

type Server

type Server struct {
	ServerOptions
	// contains filtered or unexported fields
}

func NewServer

func NewServer(stream Stream, opts ...ServerOption) (*Server, error)

func (*Server) Context added in v1.1.14

func (r *Server) Context() context.Context

Returns the server's stream context. Only valid after Serve has been called.

func (*Server) RegisterService

func (r *Server) RegisterService(desc *grpc.ServiceDesc, impl interface{})

Implements grpc.ServiceRegistrar.

func (*Server) Serve

func (r *Server) Serve() (grpc.ClientConnInterface, <-chan error)

Serve starts the totem server, which takes control of the stream and begins handling incoming and outgoing RPCs.

func (*Server) Splice

func (r *Server) Splice(stream Stream, opts ...ServerOption) error

Splice configures this server to forward any incoming RPCs for the given service(s) to a different totem stream. The totem server will handle closing the spliced stream.

type ServerOption added in v1.1.14

type ServerOption func(*ServerOptions)

func WithDiscoveryHopLimit added in v1.2.0

func WithDiscoveryHopLimit(limit int32) ServerOption

func WithInterceptors added in v1.1.14

func WithInterceptors(config InterceptorConfig) ServerOption

func WithMetrics added in v1.1.14

func WithMetrics(provider *metric.MeterProvider, staticAttrs ...attribute.KeyValue) ServerOption

func WithName added in v1.1.14

func WithName(name string) ServerOption

func WithTracerOptions added in v1.2.0

func WithTracerOptions(opts ...resource.Option) ServerOption

type ServerOptions added in v1.1.14

type ServerOptions struct {
	// contains filtered or unexported fields
}

type ServerReflectionClient added in v1.1.14

type ServerReflectionClient interface {
	ListServices(ctx context.Context, in *DiscoveryRequest, opts ...grpc.CallOption) (*ServiceInfo, error)
}

ServerReflectionClient is the client API for ServerReflection service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

func NewServerReflectionClient added in v1.1.14

func NewServerReflectionClient(cc grpc.ClientConnInterface) ServerReflectionClient

type ServerReflectionServer added in v1.1.14

type ServerReflectionServer interface {
	ListServices(context.Context, *DiscoveryRequest) (*ServiceInfo, error)
	// contains filtered or unexported methods
}

ServerReflectionServer is the server API for the ServerReflection service. All implementations must embed UnimplementedServerReflectionServer for forward compatibility.

type ServerStream

type ServerStream interface {
	Stream
	grpc.ServerStream
}

type ServiceHandler added in v1.1.14

type ServiceHandler struct {
	Descriptor     *descriptorpb.ServiceDescriptorProto
	MethodInvokers map[string]MethodInvoker
	MethodQOS      map[string]*QOS
	TopologyFlags  TopologyFlags
	// contains filtered or unexported fields
}

func NewDefaultServiceHandler added in v1.1.14

func NewDefaultServiceHandler(
	ctx context.Context,
	descriptor *descriptorpb.ServiceDescriptorProto,
	invoker MethodInvoker,
) *ServiceHandler

type ServiceHandlerList added in v1.1.14

type ServiceHandlerList struct {
	// contains filtered or unexported fields
}

func (*ServiceHandlerList) Append added in v1.1.14

func (s *ServiceHandlerList) Append(sh *ServiceHandler)

func (*ServiceHandlerList) First added in v1.1.14

func (s *ServiceHandlerList) First() *ServiceHandler

func (*ServiceHandlerList) Len added in v1.1.14

func (s *ServiceHandlerList) Len() int

func (*ServiceHandlerList) Range added in v1.1.14

func (s *ServiceHandlerList) Range(fn func(sh *ServiceHandler) bool) bool

type ServiceInfo added in v1.1.14

type ServiceInfo struct {
	Services []*descriptorpb.ServiceDescriptorProto `protobuf:"bytes,1,rep,name=services,proto3" json:"services,omitempty"`
	// contains filtered or unexported fields
}

func (*ServiceInfo) Descriptor deprecated added in v1.1.14

func (*ServiceInfo) Descriptor() ([]byte, []int)

Deprecated: Use ServiceInfo.ProtoReflect.Descriptor instead.

func (*ServiceInfo) GetServices added in v1.1.14

func (x *ServiceInfo) GetServices() []*descriptorpb.ServiceDescriptorProto

func (*ServiceInfo) MethodNames added in v1.1.14

func (i *ServiceInfo) MethodNames() []string

func (*ServiceInfo) ProtoMessage added in v1.1.14

func (*ServiceInfo) ProtoMessage()

func (*ServiceInfo) ProtoReflect added in v1.1.14

func (x *ServiceInfo) ProtoReflect() protoreflect.Message

func (*ServiceInfo) Reset added in v1.1.14

func (x *ServiceInfo) Reset()

func (*ServiceInfo) ServiceNames added in v1.2.0

func (i *ServiceInfo) ServiceNames() []string

func (*ServiceInfo) String added in v1.1.14

func (x *ServiceInfo) String() string

type Stream

type Stream interface {
	Send(*RPC) error
	Recv() (*RPC, error)
	Context() context.Context
}

type StreamController added in v1.1.14

type StreamController struct {
	UnimplementedServerReflectionServer
	StreamControllerOptions
	// contains filtered or unexported fields
}

func NewStreamController added in v1.1.14

func NewStreamController(stream Stream, opts StreamControllerOptions) *StreamController

NewStreamController creates a new stream controller for the given stream and method set. There can be at most one stream controller per stream.

func (*StreamController) CloseOrRecv added in v1.1.14

func (sh *StreamController) CloseOrRecv() error

If the stream is a client stream, this will call CloseSend on the stream. If the stream is a server stream, this will call Recv on the stream until it returns io.EOF.

func (*StreamController) Kick added in v1.1.14

func (sh *StreamController) Kick(err error)

func (*StreamController) ListServices added in v1.1.14

func (sh *StreamController) ListServices(ctx context.Context, req *DiscoveryRequest) (*ServiceInfo, error)

func (*StreamController) NewInvoker added in v1.1.14

func (sh *StreamController) NewInvoker() *streamControllerInvoker

func (*StreamController) RegisterServiceHandler added in v1.1.14

func (sh *StreamController) RegisterServiceHandler(handler *ServiceHandler)

func (*StreamController) Reply added in v1.1.14

func (sh *StreamController) Reply(ctx context.Context, tag uint64, data []byte)

func (*StreamController) ReplyErr added in v1.1.14

func (sh *StreamController) ReplyErr(ctx context.Context, tag uint64, reply error)

func (*StreamController) Request added in v1.1.14

func (sh *StreamController) Request(ctx context.Context, m *RPC) <-chan *RPC

func (*StreamController) Run added in v1.1.14

func (sh *StreamController) Run(ctx context.Context) error

Run will start the stream controller and block until the stream is finished. This function should only be called once.

type StreamControllerOptions added in v1.1.14

type StreamControllerOptions struct {
	Metrics *MetricsExporter
	Name    string
	Logger  *zap.Logger

	// Rx/Tx metrics are tracked in the following places:
	// - For outgoing requests/incoming replies, in the clientconn.
	// - For incoming requests/outgoing replies, in the localServiceInvoker.
	WorkerPoolParams WorkerPoolParameters

	// TracerOptions should contain service name/namespace keys if multiple
	// services are running in the same process.
	TracerOptions []resource.Option

	BaseTopologyFlags TopologyFlags
}

type TopologyFlags added in v1.2.0

type TopologyFlags int
const (
	TopologyLocal TopologyFlags = 1 << iota
	TopologySelf
	TopologySpliced
)

func (TopologyFlags) DisplayName added in v1.2.0

func (tf TopologyFlags) DisplayName() string

type UnimplementedServerReflectionServer added in v1.1.14

type UnimplementedServerReflectionServer struct {
}

UnimplementedServerReflectionServer must be embedded to have forward compatible implementations.

func (UnimplementedServerReflectionServer) ListServices added in v1.1.14

type UnsafeServerReflectionServer added in v1.1.14

type UnsafeServerReflectionServer interface {
	// contains filtered or unexported methods
}

UnsafeServerReflectionServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to ServerReflectionServer will result in compilation errors.

type Visibility added in v1.1.14

type Visibility struct {
	SplicedClients bool `protobuf:"varint,1,opt,name=splicedClients,proto3" json:"splicedClients,omitempty"`
	// contains filtered or unexported fields
}

func (*Visibility) Descriptor deprecated added in v1.1.14

func (*Visibility) Descriptor() ([]byte, []int)

Deprecated: Use Visibility.ProtoReflect.Descriptor instead.

func (*Visibility) GetSplicedClients added in v1.1.14

func (x *Visibility) GetSplicedClients() bool

func (*Visibility) ProtoMessage added in v1.1.14

func (*Visibility) ProtoMessage()

func (*Visibility) ProtoReflect added in v1.1.14

func (x *Visibility) ProtoReflect() protoreflect.Message

func (*Visibility) Reset added in v1.1.14

func (x *Visibility) Reset()

func (*Visibility) String added in v1.1.14

func (x *Visibility) String() string

type WorkerPoolParameters added in v1.2.0

type WorkerPoolParameters struct {
	MaxWorkers       int
	MinWorkers       int
	MaxCapacity      int
	ResizingStrategy pond.ResizingStrategy
	IdleTimeout      time.Duration
}

func DefaultWorkerPoolParams added in v1.2.0

func DefaultWorkerPoolParams() WorkerPoolParameters

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL