natsrpc

Version: v0.7.0
Published: Jan 25, 2024 License: Apache-2.0 Imports: 13 Imported by: 1

README

  _   _       _______ _____   _____  _____   _____ 
 | \ | |   /\|__   __/ ____| |  __ \|  __ \ / ____|
 |  \| |  /  \  | | | (___   | |__) | |__) | |     
 | . ` | / /\ \ | |  \___ \  |  _  /|  ___/| |     
 | |\  |/ ____ \| |  ____) | | | \ \| |    | |____ 
 |_| \_/_/    \_\_| |_____/  |_|  \_\_|     \_____|

What is NATSRPC

NATSRPC is an RPC framework that uses NATS as its message transport and gRPC-style definitions for its service interfaces.

Motivation

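Sending and receiving messages over NATS means hand-writing subject, request, reply, and handler code, which is tedious and error-prone. gRPC must connect to a known endpoint before it can send a request. NATSRPC aims to let you define interfaces the way gRPC does while, like NATS, not caring about network location: subscribing and sending is all an RPC call needs.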

Feature

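  • Interfaces are defined gRPC-style; simple to use, with one-step code generation
  • Supports namespace isolation; messages can also be sent to a specific id
  • Multiple service instances are load-balanced (NATS picks a random member within the same group)
  • Supports headers and returning errors
  • Supports single-goroutine and multi-goroutine handling
  • Supports middleware
  • Supports delayed replies
  • Supports custom encoders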

How It Works

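At the upper layer, Server, Service, and Client wrap nats.Conn and Subscription.
At the lower layer, messages are carried by NATS request and publish. Each Service creates a Subscription whose subject is the service name; if the service has publish methods, it creates one more subscription to receive published messages.
When a Client sends a request, the subject is the service name and the method name is passed in the NATS message header.
When the Service receives a message, it extracts the method name and calls the matching handler; the handler's result is returned to the Client through the NATS message's reply subject.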
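
The dispatch flow described above can be illustrated with a small in-memory model. This is only a sketch and not natsrpc's implementation: Msg, Service, handler, and the "method" header key are hypothetical stand-ins, and no NATS connection is involved.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg is a simplified stand-in for a NATS message (hypothetical type).
type Msg struct {
	Subject string            // the service name
	Header  map[string]string // carries the method name
	Data    []byte            // encoded request payload
}

// handler turns a request payload into a reply payload.
type handler func(data []byte) ([]byte, error)

// methodHeader is an assumed header key; natsrpc's real key may differ.
const methodHeader = "method"

// Service maps method names to handlers, mimicking one subscription per service.
type Service struct {
	Name     string
	Handlers map[string]handler
}

// Dispatch reads the method name from the header and calls the matching handler.
func (s *Service) Dispatch(m Msg) ([]byte, error) {
	if m.Subject != s.Name {
		return nil, errors.New("wrong subject")
	}
	h, ok := s.Handlers[m.Header[methodHeader]]
	if !ok {
		return nil, errors.New("no method")
	}
	return h(m.Data)
}

func main() {
	svc := &Service{
		Name: "example.Greeter",
		Handlers: map[string]handler{
			"Hello": func(data []byte) ([]byte, error) {
				return append([]byte("hello "), data...), nil
			},
		},
	}
	reply, err := svc.Dispatch(Msg{
		Subject: "example.Greeter",
		Header:  map[string]string{methodHeader: "Hello"},
		Data:    []byte("natsrpc"),
	})
	fmt.Println(string(reply), err)
}
```

In the real framework the returned bytes would travel back on the message's reply subject rather than as a function result.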

Install Tools

  1. protoc (v3.17.3) for Linux/MacOS/Windows
  2. protoc-gen-gogo: go install github.com/gogo/protobuf/protoc-gen-gogo@v1.3.2
  3. protoc-gen-natsrpc: go install github.com/byebyebruce/natsrpc/cmd/protoc-gen-natsrpc@v0.7.0

Quick Start

  1. Add the package

    go get github.com/byebyebruce/natsrpc
    
  2. Define the service interface in example.proto

    syntax = "proto3";
    
    package example;
    option go_package = "github.com/byebyebruce/natsrpc/example;example";
    
    message HelloRequest {
      string name = 1;
    }
    
    message HelloReply {
      string message = 1;
    }
    
    service Greeter {
      rpc Hello (HelloRequest) returns (HelloReply) {}
    }
    
  3. Generate the client and server code

    protoc --proto_path=. \
    --gogo_out=paths=source_relative:. \
    --natsrpc_out=paths=source_relative:. \
    *.proto
    
  4. Implement the interface on the server side and create the service

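    // HelloSvc implements the generated Greeter server interface.
    type HelloSvc struct {
    }
    
    func (s *HelloSvc) Hello(ctx context.Context, req *example.HelloRequest) (*example.HelloReply, error) {
        return &example.HelloReply{
            Message: "hello " + req.Name,
        }, nil
    }
    
    func main() {
        // nats_url is a *string defined elsewhere, e.g. with flag.String.
        conn, err := nats.Connect(*nats_url)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
    
        server, err := natsrpc.NewServer(conn)
        if err != nil {
            log.Fatal(err)
        }
        defer server.Close(context.Background())
    
        svc, err := example.RegisterGreeterNRServer(server, &HelloSvc{})
        if err != nil {
            log.Fatal(err)
        }
        defer svc.Close()
    
        // Block forever while the service handles requests.
        select {}
    }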
    
    
  5. Call the RPC from the client

    client := natsrpc.NewClient(conn)
    
    cli := example.NewGreeterNRClient(client)
    rsp, err := cli.Hello(context.Background(), &example.HelloRequest{Name: "natsrpc"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(rsp.Message)
    

Examples

See the example directory in the repository.

Bench Tool

  1. Request: go run ./example/tool/request_bench -url=nats://127.0.0.1:4222
  2. Broadcast (publish): go run ./example/tool/publish_bench -url=nats://127.0.0.1:4222

TODO

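  • Change the service definition file to the gRPC standard
  • Support returning errors
  • Support headers
  • Generate a Client interface
  • Support middleware
  • Multi-threaded by default, with single-thread handling also supported
  • Support a goroutine pool
  • Support a byte pool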

Documentation

Index

Constants

const (
	SupportVersion_0_7_0 = true
)
const (
	Version = "v0.7.0"
)

Variables

var (
	ErrHeaderFormat     = errors.New("natsrpc: header format error")
	ErrDuplicateService = errors.New("natsrpc: duplicate service")
	ErrNoMethod         = errors.New("natsrpc: no method")
	ErrNoMeta           = errors.New("natsrpc: no meta data")
	ErrEmptyReply       = errors.New("natsrpc: reply is empty")

	// ErrReplyLater is not a real error. Return it from a handler
	// when you want to reply to the message later.
	ErrReplyLater = errors.New("natsrpc: reply later")
)
var DefaultClientOptions = ClientOptions{
	// contains filtered or unexported fields
}

DefaultClientOptions is the default set of client options.

var DefaultServerOptions = ServerOptions{
	// contains filtered or unexported fields
}

DefaultServerOptions is the default set of server options.

var DefaultServiceOptions = ServiceOptions{
	// contains filtered or unexported fields
}

DefaultServiceOptions is the default set of service options.

var (
	// optional bool publish = 2360;
	E_Publish = &file_natsrpc_proto_extTypes[0] // true means publish (broadcast, no return value); false means request (a return value is required)
)

Extension fields to descriptorpb.MethodOptions.

var File_natsrpc_proto protoreflect.FileDescriptor

Functions

func CallHeader added in v0.6.0

func CallHeader(ctx context.Context) map[string]string

CallHeader returns the header of the current call.

func MakeReplyFunc added in v0.6.0

func MakeReplyFunc[T any](ctx context.Context) (replay func(T, error) error)

MakeReplyFunc builds a function for sending a delayed reply.

func Reply added in v0.1.0

func Reply(ctx context.Context, rep interface{}, repErr error) error

Reply sends a reply manually. When a handler needs to return its result later, it can return nil, ErrReplyLater and then call Reply from somewhere else.

For example:

func XXHandle(ctx context.Context, req *XXReq) (*XXRep, error) {
	go func() {
		time.Sleep(time.Second)
		Reply(ctx, &XXRep{}, nil)
	}()
	return nil, ErrReplyLater
}
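
The reply-later pattern can be modelled without NATS. The sketch below is an illustration under assumptions: errReplyLater stands in for ErrReplyLater, and replyFunc, handle, and call are hypothetical names; a channel plays the role of the reply subject.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errReplyLater is a local stand-in for natsrpc.ErrReplyLater.
var errReplyLater = errors.New("reply later")

// replyFunc delivers a result to the caller (here, over a channel).
type replyFunc func(rep string, err error)

// handle returns errReplyLater and replies from another goroutine.
func handle(req string, reply replyFunc) (string, error) {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate slow work
		reply("done: "+req, nil)
	}()
	return "", errReplyLater
}

// call runs the handler. It replies immediately unless the handler
// asked to reply later, in which case the handler's goroutine replies.
func call(req string) string {
	out := make(chan string, 1)
	reply := func(rep string, err error) { out <- rep }
	rep, err := handle(req, reply)
	if !errors.Is(err, errReplyLater) {
		reply(rep, err)
	}
	return <-out
}

func main() {
	fmt.Println(call("job"))
}
```

The key point is that the sentinel value tells the dispatcher to skip its own automatic reply, so exactly one reply is sent per request.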

Types

type CallOption added in v0.0.5

type CallOption func(options *CallOptions)

CallOption is a per-call option.

func WithCallHeader added in v0.1.0

func WithCallHeader(hd map[string]string) CallOption

WithCallHeader sets the header for the call.

type CallOptions added in v0.1.0

type CallOptions struct {
	// contains filtered or unexported fields
}

CallOptions holds the call options.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the RPC client.

func NewClient

func NewClient(conn *nats.Conn, opts ...ClientOption) *Client

NewClient is the constructor.

func (*Client) Publish

func (c *Client) Publish(service, method string, req interface{}, opt ...CallOption) error

Publish publishes a message.

func (*Client) Request

func (c *Client) Request(ctx context.Context, service, method string, req interface{}, rep interface{}, opt ...CallOption) error

Request sends a request.

type ClientInterface added in v0.7.0

type ClientInterface interface {
	// Publish publishes a message.
	Publish(service, method string, req interface{}, opt ...CallOption) error

	// Request sends a request.
	Request(ctx context.Context, service, method string, req interface{}, rep interface{}, opt ...CallOption) error
}

ClientInterface is the client interface.

type ClientOption added in v0.0.5

type ClientOption func(options *ClientOptions)

func WithClientEncoder added in v0.6.0

func WithClientEncoder(encoder Encoder) ClientOption

WithClientEncoder sets the encoder.

func WithClientID added in v0.0.5

func WithClientID(id string) ClientOption

WithClientID sets the call id (it does not override clientOptions.id; it only identifies this call).

func WithClientNamespace added in v0.0.5

func WithClientNamespace(namespace string) ClientOption

WithClientNamespace sets the namespace (cluster).

type ClientOptions added in v0.6.0

type ClientOptions struct {
	// contains filtered or unexported fields
}

ClientOptions holds the client options.
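
ClientOption and ClientOptions follow the functional-options pattern. Because the real fields are unexported, the sketch below uses hypothetical names (clientOptions, withNamespace, withID, newOptions) purely to show how such options are typically applied on top of defaults.

```go
package main

import "fmt"

// clientOptions is a hypothetical options struct; the fields are assumptions.
type clientOptions struct {
	namespace string
	id        string
}

// clientOption mutates an options value, like natsrpc.ClientOption.
type clientOption func(*clientOptions)

// withNamespace returns an option that sets the namespace.
func withNamespace(ns string) clientOption {
	return func(o *clientOptions) { o.namespace = ns }
}

// withID returns an option that sets the id.
func withID(id string) clientOption {
	return func(o *clientOptions) { o.id = id }
}

// defaultClientOptions plays the role of DefaultClientOptions.
var defaultClientOptions = clientOptions{}

// newOptions copies the defaults and applies each option in order.
func newOptions(opts ...clientOption) clientOptions {
	o := defaultClientOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newOptions(withNamespace("dev"), withID("1"))
	fmt.Printf("%+v\n", o)
}
```

Later options win when two of them set the same field, since they are applied in argument order.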

type Encoder added in v0.6.0

type Encoder interface {
	// Encode encodes v into bytes.
	Encode(v interface{}) ([]byte, error)

	// Decode decodes data into vPtr.
	Decode(data []byte, vPtr interface{}) error
}

Encoder is the encoder interface.
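
As an illustration of a custom encoder, here is a minimal sketch backed by the standard library's encoding/json. The Encoder interface is redeclared locally to keep the example self-contained; JSONEncoder and HelloRequest are hypothetical names and are not part of natsrpc.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder mirrors the natsrpc.Encoder interface shown above.
type Encoder interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, vPtr interface{}) error
}

// JSONEncoder is a hypothetical Encoder backed by encoding/json.
type JSONEncoder struct{}

// Encode marshals v to JSON.
func (JSONEncoder) Encode(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// Decode unmarshals JSON data into vPtr, which must be a pointer.
func (JSONEncoder) Decode(data []byte, vPtr interface{}) error {
	return json.Unmarshal(data, vPtr)
}

// Compile-time check that JSONEncoder satisfies Encoder.
var _ Encoder = JSONEncoder{}

// HelloRequest is a sample message type for the round trip.
type HelloRequest struct {
	Name string `json:"name"`
}

func main() {
	var enc Encoder = JSONEncoder{}
	b, err := enc.Encode(&HelloRequest{Name: "natsrpc"})
	if err != nil {
		panic(err)
	}
	var req HelloRequest
	if err := enc.Decode(b, &req); err != nil {
		panic(err)
	}
	fmt.Println(string(b), req.Name)
}
```

With the real package, a value like this would presumably be passed to WithClientEncoder and WithServerEncoder; both sides need the same encoding for messages to round-trip.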

type Handler added in v0.6.0

type Handler func(svc interface{}, ctx context.Context, req interface{}) (interface{}, error)

type Interceptor added in v0.6.0

type Interceptor func(ctx context.Context, method string, req interface{}, invoker Invoker) (interface{}, error)

type Invoker added in v0.6.0

type Invoker func(ctx context.Context, req interface{}) (interface{}, error)
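
An Interceptor wraps the call to the handler, which it reaches through the Invoker. The sketch below redeclares both types locally and shows a hypothetical timing interceptor; timing, hello, and run are illustrative names, not part of natsrpc.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Invoker and Interceptor mirror the natsrpc types shown above.
type Invoker func(ctx context.Context, req interface{}) (interface{}, error)
type Interceptor func(ctx context.Context, method string, req interface{}, invoker Invoker) (interface{}, error)

// timing returns a hypothetical interceptor that reports how long each call took.
func timing(logf func(string)) Interceptor {
	return func(ctx context.Context, method string, req interface{}, invoker Invoker) (interface{}, error) {
		start := time.Now()
		rep, err := invoker(ctx, req) // call the wrapped handler
		logf(fmt.Sprintf("%s took %s err=%v", method, time.Since(start), err))
		return rep, err
	}
}

// hello is a sample handler with the Invoker signature.
func hello(ctx context.Context, req interface{}) (interface{}, error) {
	return "hello " + req.(string), nil
}

// run sends one request through the interceptor and returns the reply and the log lines.
func run(name string) (interface{}, []string) {
	var logs []string
	ic := timing(func(s string) { logs = append(logs, s) })
	rep, _ := ic(context.Background(), "Hello", name, hello)
	return rep, logs
}

func main() {
	rep, logs := run("natsrpc")
	fmt.Println(rep, len(logs))
}
```

With the real package, an interceptor of this shape would presumably be installed through WithServiceInterceptor.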

type MethodDesc added in v0.6.0

type MethodDesc struct {
	MethodName  string       // method name
	Handler     Handler      // method handler function
	IsPublish   bool         // whether the method is a publish
	RequestType reflect.Type // request type
}

MethodDesc describes a method.

func (MethodDesc) NewRequest added in v0.6.0

func (md MethodDesc) NewRequest() any

NewRequest creates a request. It is planned to be optimized into a static form later.
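
One plausible way to create a request from a reflect.Type is reflect.New. The sketch below assumes that RequestType holds the struct type (not a pointer type) and that a fresh pointer is returned; whether natsrpc does exactly this is an assumption, and methodDesc, helloDesc, and HelloRequest are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// HelloRequest is a sample request type.
type HelloRequest struct {
	Name string
}

// methodDesc is a trimmed-down, hypothetical version of MethodDesc.
type methodDesc struct {
	MethodName  string
	RequestType reflect.Type // assumed to be the struct type, not a pointer
}

// newRequest allocates a zero value of RequestType and returns a pointer to it.
func (md methodDesc) newRequest() any {
	return reflect.New(md.RequestType).Interface()
}

// helloDesc describes a sample Hello method.
var helloDesc = methodDesc{
	MethodName:  "Hello",
	RequestType: reflect.TypeOf(HelloRequest{}),
}

func main() {
	req := helloDesc.newRequest()
	fmt.Printf("%T\n", req)
}
```

The returned pointer can then be handed to a decoder, which fills it in before the handler is called.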

type Server

type Server struct {
	Encoder
	// contains filtered or unexported fields
}

Server is the RPC server.

func NewServer

func NewServer(conn *nats.Conn, option ...ServerOption) (*Server, error)

NewServer is the constructor.

func (*Server) Close

func (s *Server) Close(ctx context.Context) (err error)

Close closes the server.

func (*Server) Register

func (s *Server) Register(sd ServiceDesc, val interface{}, opts ...ServiceOption) (ServiceInterface, error)

Register registers a service.

func (*Server) Remove added in v0.6.0

func (s *Server) Remove(name string) bool

Remove removes a service.

func (*Server) UnSubscribeAll added in v0.6.0

func (s *Server) UnSubscribeAll() error

UnSubscribeAll cancels all subscriptions.

type ServerInterface added in v0.7.0

type ServerInterface interface {
	Encoder
	Remove(string) bool
}

type ServerOption added in v0.0.5

type ServerOption func(options *ServerOptions)

ServerOption server option

func WithErrorHandler added in v0.1.0

func WithErrorHandler(h func(interface{})) ServerOption

WithErrorHandler sets the error handler.

func WithServerEncoder added in v0.6.0

func WithServerEncoder(encoder Encoder) ServerOption

WithServerEncoder sets the encoder.

func WithServerRecovery added in v0.0.5

func WithServerRecovery(h func(interface{})) ServerOption

WithServerRecovery sets the recover handler.

type ServerOptions added in v0.6.0

type ServerOptions struct {
	// contains filtered or unexported fields
}

ServerOptions holds the server options.

type Service added in v0.0.2

type Service struct {
	// contains filtered or unexported fields
}

Service is a service.

func NewService added in v0.6.0

func NewService(server ServerInterface, sd ServiceDesc, i interface{}, options ServiceOptions) (*Service, error)

NewService creates a service.

func (*Service) Call added in v0.6.0

func (s *Service) Call(ctx context.Context, methodName string, b []byte, interceptor Interceptor) ([]byte, error)

func (*Service) Close added in v0.0.2

func (s *Service) Close() bool

Close closes the service and cancels all of its subscriptions.

func (*Service) Name added in v0.0.2

func (s *Service) Name() string

Name returns the service name.

type ServiceDesc added in v0.6.0

type ServiceDesc struct {
	ServiceName string       // service name
	Methods     []MethodDesc // method list
	Metadata    string       // metadata
}

ServiceDesc describes a service.

type ServiceInterface added in v0.7.0

type ServiceInterface interface {
	// Name returns the service name.
	Name() string

	// Close closes the service.
	Close() bool
}

ServiceInterface is the service interface.

type ServiceOption added in v0.0.5

type ServiceOption func(options *ServiceOptions)

ServiceOption Service option

func WithServiceID added in v0.0.5

func WithServiceID(id string) ServiceOption

WithServiceID sets the service id.

func WithServiceInterceptor added in v0.6.0

func WithServiceInterceptor(i Interceptor) ServiceOption

WithServiceInterceptor sets the handler interceptor.

func WithServiceNamespace added in v0.0.5

func WithServiceNamespace(namespace string) ServiceOption

WithServiceNamespace sets the namespace (cluster).

func WithServiceSingleGoroutine added in v0.6.0

func WithServiceSingleGoroutine() ServiceOption

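WithServiceSingleGoroutine makes the service handle messages on a single goroutine, without concurrency, for cases where messages must be processed in order.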
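
The difference between single-goroutine and concurrent handling can be shown with plain channels. This is a generic sketch, not natsrpc's scheduler; processSequential and processConcurrent are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// processSequential handles messages one at a time on a single goroutine,
// so the handler sees them in arrival order.
func processSequential(msgs []int, handle func(int)) {
	ch := make(chan int)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range ch {
			handle(m)
		}
	}()
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	wg.Wait()
}

// processConcurrent starts one goroutine per message; handling order
// is not guaranteed and the handler must be safe for concurrent use.
func processConcurrent(msgs []int, handle func(int)) {
	var wg sync.WaitGroup
	for _, m := range msgs {
		wg.Add(1)
		go func(m int) {
			defer wg.Done()
			handle(m)
		}(m)
	}
	wg.Wait()
}

func main() {
	var order []int
	processSequential([]int{1, 2, 3}, func(m int) { order = append(order, m) })
	fmt.Println(order)

	var mu sync.Mutex
	sum := 0
	processConcurrent([]int{1, 2, 3}, func(m int) {
		mu.Lock()
		sum += m
		mu.Unlock()
	})
	fmt.Println(sum)
}
```

Sequential handling trades throughput for ordering, which is why it suits the ordered-processing case mentioned above.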

func WithServiceTimeout added in v0.0.5

func WithServiceTimeout(timeout time.Duration) ServiceOption

WithServiceTimeout sets the timeout.

type ServiceOptions added in v0.6.0

type ServiceOptions struct {
	// contains filtered or unexported fields
}

ServiceOptions holds the Service options.

type ServiceRegistrar added in v0.6.0

type ServiceRegistrar interface {
	// Register registers a service.
	Register(sd ServiceDesc, svc any, opt ...ServiceOption) (ServiceInterface, error)
}

ServiceRegistrar registers services.
