nrpc

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

README

NRPC

Build Status

NRPC 是一个基于 NATS 的 RPC 框架

通过 .proto 文件生成 NRPC 的客户端和服务端, 同时生 NATS 的消息处理器 MsgHandler.

为什么使用 NATS?

基于 NATS 的 Request-Reply 模型可以获得一些优于 gRPC 的特性:

  • 最小限度服务发现机制: 客户端和服务端仅需要知道 NATS 集群连接结节点, 客户端不需要做其它服务发现
  • 简单的负载均衡: 每个微服务连接相同的 NATS 集群, 使用 NATS Queue Groups 可以随机处理传入请求, 不需要额外的负载均衡器

当然在大规模请求下 NATS 集群本身可能成为一个瓶颈, 类似 gRPC 的流处理及高级认证等功能没有提供支持, 但是 NRPC 简化了操作复杂性, 如果您的应用规模适中仍不失为一个优秀的选择.

NRPC 已成功的在 OpsDash Saas 产品中应用并取得不错的进展.

概览

NRPC 附带了一个 protobuf 编译器插件 protoc-gen-nrpc,它可以 从 .proto 文件生成代码

给定一个 .proto 文件, 例如: helloworld.proto, 则可以这样使用:

$ ls
helloworld.proto
$ protoc --go_out=. --nrpc_out=. helloworld.proto
$ ls
helloworld.nrpc.go	helloworld.pb.go	helloworld.proto

生成的 .pb.go 文件由标准的 Go 插件生成, 包含了消息类的定义, .nrpc.go 文件由 nrpc 插件生成, 包含了服务接口定义以及服务端及客户端调用代码, NATS 消息 Handler.

代码结构:

RPC 模型

调用模式
Request/Reply 请求回复模式
标准模式

服务端订阅消息, 客户端发送 Request 消息获取 Reply 消息

流回复模式

该模式下, 客户端与服务端建立连接后, 服务端持续向客户端发送消息, 客户端方法签名生成回复消息回调函数用于循环读取服务端消息,直至服务端消息发送结束

示例:

Proto 定义:

option (nrpc.packageSubjectParams) = "instance"; // 包主题参数

service SvcCustomSubject {
	option (nrpc.serviceSubject) = 'custom_subject'; // 自定义服务主题
	rpc MtStreamedReply(StringArg) returns (SimpleStringReply) {
  	option (nrpc.streamedReply) = true;
	}
}

服务端实现:

func (s BasicServerImpl) MtStreamedReply(ctx context.Context, req *StringArg, send func(rep *SimpleStringReply)) error {
  if req.GetArg1() == "please fail" {
    panic("Failing")
  }
  if req.GetArg1() == "very long call" {
    select {
    case <-ctx.Done():
      return ctx.Err()
    case <-time.After(time.Minute):
      time.Sleep(time.Minute)
   return nil
    }
  }
  time.Sleep(time.Second)
  send(&SimpleStringReply{Reply: "msg1"})
  time.Sleep(250 * time.Millisecond)
  send(&SimpleStringReply{Reply: "msg2"})
  time.Sleep(250 * time.Millisecond)
  send(&SimpleStringReply{Reply: "msg3"})
  time.Sleep(250 * time.Millisecond)
 return nil
}

客户端调用:

client := NewSvcCustomSubjectClient(c, "default") // default 为包主题参数值
err := client.MtStreamedReply(context.Background(), &StringArg{Arg1: "arg"}, func(ctx context.Context, rep *SimpleStringReply) {
   fmt.Println("received", rep)
   resChan <- rep.GetReply()
})

结果:

 received reply:"msg1"
 received reply:"msg2"
 received reply:"msg3"
NoReply 无回复模式

客户端发布消息, 服务端需要实现消息处理逻辑, 无任何返回值

Proto 定义:

option (nrpc.packageSubjectParams) = "instance"; // 包主题参数

service SvcCustomSubject {
	option (nrpc.serviceSubject) = 'custom_subject'; // 自定义服务主题
  rpc MtRequestNoReply(StringArg) returns (nrpc.NoReply) {}
}

服务端实现:

func (s BasicServerImpl) MtRequestNoReply(ctx context.Context, req *StringArg) {
	s.t.Log("Will publish to MtRequestNoReply")
	s.t.Logf("client publish msg = %v\n", *req)
  // TODO: 处理客户端发送过来的消息, 对于长时处理任务, 可以创建个NoRequest API, 然后在这里给客户端发送消息,客户端订阅消息进行后续处理 如:
  s.handler.MtNoRequestPublish("default", &SimpleStringReply{Reply: "Hi there"})
}

客户端调用:

// 客户端订阅消息
c1 := NewSvcCustomSubjectClient(conn, "default")
arg := "[client.sync]req-noreply -> [server]process -> [server]publish to client"
// 创建同步消息订阅器
sub, err := c1.MtNoRequestSubscribeSync() // NoRequest 模式生成的订阅方法
if err != nil {
  t.Fatal(err)
}
defer sub.Unsubscribe()
err = c1.MtRequestNoReply(&StringArg{Arg1: arg}) // NoReply 客户端方法
if err != nil {
  t.Fatal(err)
}
// 获取消息
reply, err := sub.Next(10 * time.Second) 
if err != nil {
  t.Fatal(err)
}
NoRequest 无请求模式

服务端发布消息, 客户端订阅消息

  • 服务端生成

    服务端==不生成方法接口==, 转而生成 RPC方法名+Publish 方法, 用于消息发布

  • 客户端生成

    • RPC方法名+Subject 方法用于获取消息主题,
    • RPC方法名+Subscribe 用于异步订阅主题
    • RPC方法名+SubscribeSync 用于同步订阅主题
    • RPC方法名+SubscribeChan 用于管道订阅主题

Proto 定义:

option (nrpc.packageSubjectParams) = "instance"; // 包主题参数

service SvcCustomSubject {
	rpc MtNoRequest(nrpc.NoRequest) returns (SimpleStringReply) {}
}

服务端实现:

服务端==不生成方法接口==, 不需要具体实现, 转而生成 RPC方法名+Publish 方法, 服务端其它方法实现进行相关调用

客户端调用:

客户端直接订阅消息主题进行处理, 支持同步订阅, 异步订阅, 管道订阅

// 客户端订阅消息
c1 := NewSvcCustomSubjectClient(conn, "default")
repChan := make(chan string)
// 创建异步消息订阅器
sub, err := c1.MtNoRequestSubscribe(func(reply *SimpleStringReply) {
  defer close(repChan)
  repChan <- reply.GetReply()
})
if err != nil {
  t.Fatal(err)
}
defer sub.Unsubscribe()
// 获取消息
for rep := range repChan {
  t.Log(rep)
}
Protobuf 参数
  • nrpc.Void 空参数

    不改变调用模式, 服务端返回值带 error

  • nrpc.NoReply 改变调用模式 客户端使用PUB模式, 服务端不返回任何值(不带error)

  • nprc.NoRequest 改变调用模式, 不生成方法接口 服务端使用PUB模式发布消息, 方法签名 RPC方法名+Publish 客户端订阅消息,

    • 订阅方法签名 RPC方法名+Subscribe[Sync|Chan]
    • 获取消息主题方法签名RPC方法名+Subject

特性

查看链接获取详细信息:

安装

安装 protoc NRPC 插件:

$ go get github.com/teamlint/nrpc/protoc-gen-nrpc

编译运行示例服务端 greeter_server:

$ go get github.com/teamlint/nrpc/examples/helloworld/greeter_server
$ greeter_server
server is running, ^C quits.

编译运行示例客户端 greeter_client:

$ go get github.com/teamlint/nrpc/examples/helloworld/greeter_client
$ greeter_client
Greeting: Hello world

参考文档

学习如何使用 .proto 文件描述 gRPC 服务, 看这里. 了解更多 NATS, 访问 NATS 官网.

当前状态

当前仅提供对 Go 语言的支持

原始版本由 RapidLoop 构建, Teamlint 进行扩展完善.

TODO

  • NRPC client request timeout or CallOption or Context
  • user protogen refactor protoc-gen-nrpc
  • Hub 动态调用不同客户端, pkg实例参数?

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	SubjectRule_name = map[int32]string{
		0: "COPY",
		1: "TOLOWER",
	}
	SubjectRule_value = map[string]int32{
		"COPY":    0,
		"TOLOWER": 1,
	}
)

Enum value maps for SubjectRule.

View Source
var (
	Error_Type_name = map[int32]string{
		0: "CLIENT",
		1: "SERVER",
		3: "EOS",
		4: "SERVERTOOBUSY",
	}
	Error_Type_value = map[string]int32{
		"CLIENT":        0,
		"SERVER":        1,
		"EOS":           3,
		"SERVERTOOBUSY": 4,
	}
)

Enum value maps for Error_Type.

View Source
var (
	// A custom subject prefix to use instead of the package name
	//
	// optional string packageSubject = 50000;
	E_PackageSubject = &file_nrpc_proto_extTypes[0]
	// Parameters included in the subject at the package level
	//
	// repeated string packageSubjectParams = 50001;
	E_PackageSubjectParams = &file_nrpc_proto_extTypes[1]
	// Default rule to build a service subject from the service name
	//
	// optional nrpc.SubjectRule serviceSubjectRule = 50002;
	E_ServiceSubjectRule = &file_nrpc_proto_extTypes[2]
	// Default rule to build a method subject from its name
	//
	// optional nrpc.SubjectRule methodSubjectRule = 50003;
	E_MethodSubjectRule = &file_nrpc_proto_extTypes[3]
)

Extension fields to descriptor.FileOptions.

View Source
var (
	// A custom subject token to use instead of (service name + serviceSubjectRule)
	//
	// optional string serviceSubject = 51000;
	E_ServiceSubject = &file_nrpc_proto_extTypes[4]
	// Parameters included in the subject at the service level
	//
	// repeated string serviceSubjectParams = 51001;
	E_ServiceSubjectParams = &file_nrpc_proto_extTypes[5]
)

Extension fields to descriptor.ServiceOptions.

View Source
var (
	// A custom subject to use instead of (methor name + methodSubjectRule)
	//
	// optional string methodSubject = 52000;
	E_MethodSubject = &file_nrpc_proto_extTypes[6]
	// Parameters included in the subject at the method level
	//
	// repeated string methodSubjectParams = 52001;
	E_MethodSubjectParams = &file_nrpc_proto_extTypes[7]
	// If true, the method returns a stream of reply messages instead of just one
	//
	// optional bool streamedReply = 52002;
	E_StreamedReply = &file_nrpc_proto_extTypes[8]
)

Extension fields to descriptor.MethodOptions.

View Source
var ErrCanceled = errors.New("Call canceled")
View Source
var ErrEOS = errors.New("End of stream")
View Source
var ErrStreamInvalidMsgCount = errors.New("Stream reply received an incorrect number of messages")

ErrStreamInvalidMsgCount is when a stream reply gets a wrong number of messages

Functions

func Call

func Call(req proto.Message, rep proto.Message, nc NatsConn, subject string, encoding string, timeout time.Duration) error

func Marshal

func Marshal(encoding string, msg proto.Message) ([]byte, error)

func MarshalErrorResponse

func MarshalErrorResponse(encoding string, repErr *Error) ([]byte, error)

func ParseSubject

func ParseSubject(
	packageSubject string, packageParamsCount int,
	serviceSubject string, serviceParamsCount int,
	subject string,
) (packageParams []string, serviceParams []string,
	name string, tail []string, err error,
)

func ParseSubjectTail

func ParseSubjectTail(
	methodParamsCount int,
	tail []string,
) (
	methodParams []string, encoding string, err error,
)

func Publish

func Publish(resp proto.Message, withError *Error, nc NatsConn, subject string, encoding string) error

func Unmarshal

func Unmarshal(encoding string, data []byte, msg proto.Message) error

func UnmarshalResponse

func UnmarshalResponse(encoding string, data []byte, msg proto.Message) error

Types

type ContextKey

type ContextKey int

ContextKey type for storing values into context.Context

const (
	// RequestContextKey is the key for string the request into the context
	RequestContextKey ContextKey = iota
)

type Error

type Error struct {
	Type     Error_Type `protobuf:"varint,1,opt,name=type,proto3,enum=nrpc.Error_Type" json:"type,omitempty"`
	Message  string     `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	MsgCount uint32     `protobuf:"varint,3,opt,name=msgCount,proto3" json:"msgCount,omitempty"`
	// contains filtered or unexported fields
}

func CaptureErrors

func CaptureErrors(fn func() (proto.Message, error)) (msg proto.Message, replyError *Error)

CaptureErrors runs a handler and convert error and panics into proper Error

func (*Error) Descriptor deprecated

func (*Error) Descriptor() ([]byte, []int)

Deprecated: Use Error.ProtoReflect.Descriptor instead.

func (*Error) Error

func (e *Error) Error() string

func (*Error) GetMessage

func (x *Error) GetMessage() string

func (*Error) GetMsgCount

func (x *Error) GetMsgCount() uint32

func (*Error) GetType

func (x *Error) GetType() Error_Type

func (*Error) ProtoMessage

func (*Error) ProtoMessage()

func (*Error) ProtoReflect

func (x *Error) ProtoReflect() protoreflect.Message

func (*Error) Reset

func (x *Error) Reset()

func (*Error) String

func (x *Error) String() string

type Error_Type

type Error_Type int32
const (
	Error_CLIENT        Error_Type = 0
	Error_SERVER        Error_Type = 1
	Error_EOS           Error_Type = 3
	Error_SERVERTOOBUSY Error_Type = 4
)

func (Error_Type) Descriptor

func (Error_Type) Descriptor() protoreflect.EnumDescriptor

func (Error_Type) Enum

func (x Error_Type) Enum() *Error_Type

func (Error_Type) EnumDescriptor deprecated

func (Error_Type) EnumDescriptor() ([]byte, []int)

Deprecated: Use Error_Type.Descriptor instead.

func (Error_Type) Number

func (x Error_Type) Number() protoreflect.EnumNumber

func (Error_Type) String

func (x Error_Type) String() string

func (Error_Type) Type

type Heartbeat added in v0.0.3

type Heartbeat struct {
	Lastbeat bool `protobuf:"varint,1,opt,name=lastbeat,proto3" json:"lastbeat,omitempty"`
	// contains filtered or unexported fields
}

func (*Heartbeat) Descriptor deprecated added in v0.0.3

func (*Heartbeat) Descriptor() ([]byte, []int)

Deprecated: Use Heartbeat.ProtoReflect.Descriptor instead.

func (*Heartbeat) GetLastbeat added in v0.0.3

func (x *Heartbeat) GetLastbeat() bool

func (*Heartbeat) ProtoMessage added in v0.0.3

func (*Heartbeat) ProtoMessage()

func (*Heartbeat) ProtoReflect added in v0.0.3

func (x *Heartbeat) ProtoReflect() protoreflect.Message

func (*Heartbeat) Reset added in v0.0.3

func (x *Heartbeat) Reset()

func (*Heartbeat) String added in v0.0.3

func (x *Heartbeat) String() string

type KeepStreamAlive

type KeepStreamAlive struct {
	// contains filtered or unexported fields
}

func NewKeepStreamAlive

func NewKeepStreamAlive(nc NatsConn, subject string, encoding string, onError func()) *KeepStreamAlive

func (*KeepStreamAlive) Stop

func (k *KeepStreamAlive) Stop()

type NatsConn

type NatsConn interface {
	Publish(subj string, data []byte) error
	PublishRequest(subj, reply string, data []byte) error
	Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)

	Subscribe(subj string, handler nats.MsgHandler) (*nats.Subscription, error)
	ChanSubscribe(subj string, ch chan *nats.Msg) (*nats.Subscription, error)
	SubscribeSync(subj string) (*nats.Subscription, error)

	QueueSubscribe(subj, queue string, handler nats.MsgHandler) (*nats.Subscription, error)
	ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)
	QueueSubscribeSync(subj, queue string) (*nats.Subscription, error)
}

type NoReply

type NoReply struct {
	// contains filtered or unexported fields
}

func (*NoReply) Descriptor deprecated

func (*NoReply) Descriptor() ([]byte, []int)

Deprecated: Use NoReply.ProtoReflect.Descriptor instead.

func (*NoReply) ProtoMessage

func (*NoReply) ProtoMessage()

func (*NoReply) ProtoReflect

func (x *NoReply) ProtoReflect() protoreflect.Message

func (*NoReply) Reset

func (x *NoReply) Reset()

func (*NoReply) String

func (x *NoReply) String() string

type NoRequest

type NoRequest struct {
	// contains filtered or unexported fields
}

func (*NoRequest) Descriptor deprecated

func (*NoRequest) Descriptor() ([]byte, []int)

Deprecated: Use NoRequest.ProtoReflect.Descriptor instead.

func (*NoRequest) ProtoMessage

func (*NoRequest) ProtoMessage()

func (*NoRequest) ProtoReflect

func (x *NoRequest) ProtoReflect() protoreflect.Message

func (*NoRequest) Reset

func (x *NoRequest) Reset()

func (*NoRequest) String

func (x *NoRequest) String() string

type ReplyInboxMaker

type ReplyInboxMaker func(NatsConn) string

ReplyInboxMaker returns a new inbox subject for a given nats connection.

var GetReplyInbox ReplyInboxMaker = func(NatsConn) string {
	return nats.NewInbox()
}

GetReplyInbox is used by StreamCall to get a inbox subject It can be changed by a client lib that needs custom inbox subjects

type Request

type Request struct {
	Context context.Context
	Conn    NatsConn

	KeepStreamAlive *KeepStreamAlive
	StreamContext   context.Context
	StreamCancel    func()
	StreamMsgCount  uint32

	Subject     string
	MethodName  string
	SubjectTail []string

	CreatedAt time.Time
	StartedAt time.Time

	Encoding     string
	NoReply      bool
	ReplySubject string

	PackageParams map[string]string
	ServiceParams map[string]string

	AfterReply func(r *Request, success bool, replySuccess bool)

	Handler func(context.Context) (proto.Message, error)
	// contains filtered or unexported fields
}

Request is a server-side incoming request

func GetRequest

func GetRequest(ctx context.Context) *Request

GetRequest returns the Request associated with a context, or nil if absent

func NewRequest

func NewRequest(ctx context.Context, conn NatsConn, subject string, replySubject string) *Request

NewRequest creates a Request instance

func (*Request) Elapsed

func (r *Request) Elapsed() time.Duration

Elapsed duration since request was started

func (*Request) EnableStreamedReply

func (r *Request) EnableStreamedReply()

EnableStreamedReply enables the streamed reply mode

func (*Request) PackageParam

func (r *Request) PackageParam(key string) string

PackageParam returns a package parameter value, or "" if absent

func (*Request) Run

func (r *Request) Run() (msg proto.Message, replyError *Error)

Run the handler and capture any error. Returns the response or the error that should be returned to the caller

func (*Request) RunAndReply

func (r *Request) RunAndReply()

RunAndReply calls Run() and send the reply back to the caller

func (*Request) SendErrorTooBusy

func (r *Request) SendErrorTooBusy(msg string) error

SendErrorTooBusy cancels the request with a 'SERVERTOOBUSY' error

func (*Request) SendReply

func (r *Request) SendReply(resp proto.Message, withError *Error) error

SendReply sends a reply to the caller

func (*Request) SendStreamReply

func (r *Request) SendStreamReply(msg proto.Message)

SendStreamReply send a reply a part of a stream

func (*Request) ServiceParam

func (r *Request) ServiceParam(key string) string

ServiceParam returns a package parameter value, or "" if absent

func (*Request) SetPackageParam

func (r *Request) SetPackageParam(key, value string)

SetPackageParam sets a package param value

func (*Request) SetServiceParam

func (r *Request) SetServiceParam(key, value string)

SetServiceParam sets a service param value

func (*Request) StreamedReply

func (r *Request) StreamedReply() bool

StreamedReply returns true if the request reply is streamed

type StreamCallSubscription

type StreamCallSubscription struct {
	// contains filtered or unexported fields
}

func NewStreamCallSubscription

func NewStreamCallSubscription(
	ctx context.Context, nc NatsConn, encoding string, subject string,
	timeout time.Duration,
) (*StreamCallSubscription, error)

func StreamCall

func StreamCall(ctx context.Context, nc NatsConn, subject string, req proto.Message, encoding string, timeout time.Duration) (*StreamCallSubscription, error)

func (*StreamCallSubscription) Next

func (sub *StreamCallSubscription) Next(rep proto.Message) error

type SubjectRule

type SubjectRule int32
const (
	SubjectRule_COPY    SubjectRule = 0
	SubjectRule_TOLOWER SubjectRule = 1
)

func (SubjectRule) Descriptor

func (SubjectRule) Enum

func (x SubjectRule) Enum() *SubjectRule

func (SubjectRule) EnumDescriptor deprecated

func (SubjectRule) EnumDescriptor() ([]byte, []int)

Deprecated: Use SubjectRule.Descriptor instead.

func (SubjectRule) Number

func (x SubjectRule) Number() protoreflect.EnumNumber

func (SubjectRule) String

func (x SubjectRule) String() string

func (SubjectRule) Type

type Void

type Void struct {
	// contains filtered or unexported fields
}

func (*Void) Descriptor deprecated

func (*Void) Descriptor() ([]byte, []int)

Deprecated: Use Void.ProtoReflect.Descriptor instead.

func (*Void) ProtoMessage

func (*Void) ProtoMessage()

func (*Void) ProtoReflect

func (x *Void) ProtoReflect() protoreflect.Message

func (*Void) Reset

func (x *Void) Reset()

func (*Void) String

func (x *Void) String() string

type WorkerPool

type WorkerPool struct {
	Context context.Context
	// contains filtered or unexported fields
}

WorkerPool is a pool of workers

func NewWorkerPool

func NewWorkerPool(
	ctx context.Context,
	size uint,
	maxPending uint,
	maxPendingDuration time.Duration,
) *WorkerPool

NewWorkerPool creates a pool of workers

func (*WorkerPool) Close

func (pool *WorkerPool) Close(timeout time.Duration)

Close stops all the workers and wait for their completion If the workers do not stop before the timeout, their context is canceled Will never return if a request ignores the context

func (*WorkerPool) QueueRequest

func (pool *WorkerPool) QueueRequest(request *Request) error

QueueRequest adds a request to the queue Send a SERVERTOOBUSY error to the client if the queue is full

func (*WorkerPool) SetMaxPending

func (pool *WorkerPool) SetMaxPending(value uint)

SetMaxPending changes the queue size

func (*WorkerPool) SetMaxPendingDuration

func (pool *WorkerPool) SetMaxPendingDuration(value time.Duration)

SetMaxPendingDuration changes the max pending delay

func (*WorkerPool) SetSize

func (pool *WorkerPool) SetSize(size uint)

SetSize changes the number of workers

Directories

Path Synopsis
examples
alloptions
Package main generated by protoc-gen-nrpc.
Package main generated by protoc-gen-nrpc.
helloworld/helloworld
Package helloworld generated by protoc-gen-nrpc.
Package helloworld generated by protoc-gen-nrpc.
metrics_helloworld/helloworld
Package helloworld generated by protoc-gen-nrpc.
Package helloworld generated by protoc-gen-nrpc.
nooption
Package nooption generated by protoc-gen-nrpc.
Package nooption generated by protoc-gen-nrpc.
bindata
Package bindata generated by go-bindata.// sources: assets/nrpc.tmpl
Package bindata generated by go-bindata.// sources: assets/nrpc.tmpl

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL