rpc

package
v2.10.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Server role names.
//
// Note that the identifier Server maps to the string "backend", not "server".
//
// NOTE(review): presumably these are the values passed as serverType to Find
// and ReqBuilder.SetServerType — confirm against callers.
const (
	Balancer  = "balancer"
	Connector = "connector"
	Server    = "backend"
	Database  = "database"
	Loader    = "loader"
)
View Source
// Default names and well-known string values for queues, suffixes, exchanges,
// routing keys and replies.
//
// DefaultSuffix and DefaultRtKey are both the empty string. TopicEXType is
// spelled with an upper-case "EX", unlike DefaultExType and FanoutExType.
//
// NOTE(review): "direct", "fanout" and "topic" match the AMQP exchange type
// names; presumably they are meant for SetExType — confirm.
const (
	DefaultQueue  = "dq"
	DefaultSuffix = ""
	DefaultExName = "exchange"
	DefaultExType = "direct"
	FanoutExType  = "fanout"
	TopicEXType   = "topic"
	DefaultRtKey  = ""
	DefaultReply  = "reply"
	JsonSuffix    = "json"
)
View Source
// Code type names.
//
// NOTE(review): presumably the codeType strings accepted by SetCodeType,
// GetCoder and RegEncoder — confirm against callers.
const (
	CodeTypeJson  = "json"
	CodeTypeProto = "proto"
)
View Source
// Message types carried in MsgRpc.MsgType.
//
// Every constant is declared with the explicit MessageType type. A constant
// spec that has its own initializer but no type yields an untyped constant, so
// writing the type only on the first entry would leave the others as plain
// untyped integers rather than MessageType values.
const (
	MsgTypeRequest  MessageType = 0x00
	MsgTypePublish  MessageType = 0x01
	MsgTypeResponse MessageType = 0x02
)

Message types

Variables

View Source
// Sentinel errors whose messages describe RabbitMQ conditions: a closed
// connection, a blocked state, and a publish timeout.
//
// NOTE(review): these use the prefix "Error" while ErrInvalidMessage uses the
// conventional "Err" prefix; renaming would break callers, so it is only noted.
var (
	ErrorClosed  = errors.New("rabbitmq closed connection")
	ErrorBlocked = errors.New("rabbitmq blocked")
	ErrorTimeout = errors.New("rabbitmq publish timeout")
)
View Source
// ErrInvalidMessage is a sentinel error with the message "invalid message".
//
// NOTE(review): presumably returned when a received message cannot be
// decoded — confirm where it is produced.
var (
	ErrInvalidMessage = errors.New("invalid message")
)

Functions

func DefRpcClose added in v2.7.0

func DefRpcClose()

func DefRpcInit added in v2.0.6

func DefRpcInit()

Common (shared) call methods.

func DefaultCallback

func DefaultCallback(req *MsgRpc) []byte

func Find added in v2.5.2

func Find(serverType string, arg any, options ...discover.FilterOption) *treaty.Server

func Publish

func Publish(s ReqBuilder) error

func PublishBroadcast added in v2.0.5

func PublishBroadcast(s ReqBuilder) error

func QueuePublish added in v2.0.5

func QueuePublish(s ReqBuilder) error

func QueueRequest added in v2.0.5

func QueueRequest(s ReqBuilder) error

func RemoveFindCache added in v2.5.2

func RemoveFindCache(arg any)

func Request

func Request(s ReqBuilder) error

Types

type CallbackFunc

// CallbackFunc is the signature of a message callback: it receives a *MsgRpc
// and returns a byte slice. DefaultCallback, ServerBase.HandleSelfEvent and
// ServerBase.HandleBroadcastEvent all have this shape.
//
// NOTE(review): the returned bytes are presumably the reply payload — confirm.
type CallbackFunc func(req *MsgRpc) []byte

type DefaultRpcEncoder

// DefaultRpcEncoder is the encoder type returned by NewRpcEncoder, which
// builds it from a serialize.Serializer. Its exported methods (Encode, Decode,
// EncodeMsg, DecodeMsg, Response) match the EncoderRpc method set.
type DefaultRpcEncoder struct {
	// contains filtered or unexported fields
}

func NewRpcEncoder

func NewRpcEncoder(encoder serialize.Serializer) *DefaultRpcEncoder

func (*DefaultRpcEncoder) Decode

func (r *DefaultRpcEncoder) Decode(data []byte, rpcMsg *MsgRpc) error

func (*DefaultRpcEncoder) DecodeMsg

func (r *DefaultRpcEncoder) DecodeMsg(data []byte, v any) error

func (*DefaultRpcEncoder) Encode

func (r *DefaultRpcEncoder) Encode(rpcMsg *MsgRpc) ([]byte, error)

Encode Protocol (wire layout):

	--------<length>--------|--type--|----<MsgId>------|-<data>-
	----------3byte---------|-1 byte-|-----4 byte------|--------

func (*DefaultRpcEncoder) EncodeMsg added in v2.6.9

func (r *DefaultRpcEncoder) EncodeMsg(v any) ([]byte, error)

func (*DefaultRpcEncoder) Response

func (r *DefaultRpcEncoder) Response(v any) []byte

type EncoderRpc

// EncoderRpc converts between MsgRpc values (or arbitrary values) and raw
// bytes.
//
// NOTE(review): EncodeMsg and DecodeMsg take an arbitrary value instead of a
// *MsgRpc; presumably they handle only the payload — confirm.
type EncoderRpc interface {
	// Encode turns rpcMsg into bytes.
	Encode(rpcMsg *MsgRpc) ([]byte, error)
	// Decode reads data into rpcMsg.
	Decode(data []byte, rpcMsg *MsgRpc) error
	// EncodeMsg turns v into bytes.
	EncodeMsg(v any) ([]byte, error)
	// DecodeMsg reads data into v.
	DecodeMsg(data []byte, v any) error
	// Response turns v into bytes; unlike EncodeMsg it returns no error.
	Response(v any) []byte
}

type Handler

// Handler is created by NewHandler and exposes Register and DealMsg, the same
// method set as the MsgHandler interface.
type Handler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler() *Handler

func (*Handler) DealMsg

func (h *Handler) DealMsg(codeType string, server ServerRpc, req *MsgRpc) ([]byte, error)

func (*Handler) Register

func (h *Handler) Register(msgId int32, v any)

type HandlerItem

// HandlerItem groups a MessageType with a reflect.Type and a reflect.Value.
//
// NOTE(review): presumably one registered handler — Func being the handler
// function and InType its input argument type — confirm against
// Handler.Register.
type HandlerItem struct {
	MsgType MessageType
	InType  reflect.Type
	Func    reflect.Value
}

type HttpHandler added in v2.0.2

// HttpHandler is anything that can be started with Run.
type HttpHandler interface {
	// Run takes zero or more addresses and returns an error.
	Run(addr ...string) error
}

type MessageType

// MessageType is a one-byte tag identifying the kind of a MsgRpc; see the
// MsgType* constants.
type MessageType byte

type MsgHandler added in v2.0.3

// MsgHandler registers values under message ids and processes incoming
// messages. *Handler provides both methods.
type MsgHandler interface {
	// Register associates v with msgId.
	// NOTE(review): v is presumably a handler function — confirm.
	Register(msgId int32, v any)
	// DealMsg processes req for the given code type and server, returning
	// bytes or an error.
	DealMsg(codeType string, server ServerRpc, req *MsgRpc) ([]byte, error)
}

type MsgRpc

// MsgRpc is the message value passed to encoders, handlers and callbacks. It
// holds a message type, a 32-bit message id, and an arbitrary data value.
type MsgRpc struct {
	MsgType MessageType
	MsgId   int32
	MsgData any
}

type NatsRpc

// NatsRpc is the NATS-based RPC type created by NewRpcNats. Its method set
// covers every method of the ServerRpc interface.
//
// Client is a *nats.Conn and Options holds nats.Option values. RpcCoder maps a
// string key to an EncoderRpc.
//
// NOTE(review): the RpcCoder key is presumably the code type used by
// RegEncoder and GetCoder; the WithNats* options presumably set the
// like-named fields — confirm.
type NatsRpc struct {
	Endpoints   []string
	Options     []nats.Option
	Client      *nats.Conn
	DialTimeout time.Duration
	RpcCoder    map[string]EncoderRpc
	Server      *treaty.Server
	DebugMsg    bool
	Prefix      string
	Finder      *discover.Finder
}

func NewRpcNats

func NewRpcNats(opts ...NatsRpcOption) *NatsRpc

func (*NatsRpc) Close added in v2.6.9

func (r *NatsRpc) Close() error

func (*NatsRpc) DealMsg

func (r *NatsRpc) DealMsg(msg *nats.Msg, callback CallbackFunc, coder EncoderRpc)

func (*NatsRpc) DecodeMsg

func (r *NatsRpc) DecodeMsg(codeType string, data []byte, v any) error

func (*NatsRpc) EncodeMsg

func (r *NatsRpc) EncodeMsg(coder EncoderRpc, msgType MessageType, msgId int32, req any) ([]byte, error)

func (*NatsRpc) Find

func (r *NatsRpc) Find(serverType string, arg any, options ...discover.FilterOption) *treaty.Server

func (*NatsRpc) GetCoder

func (r *NatsRpc) GetCoder(codeType string) EncoderRpc

func (*NatsRpc) GetServer

func (r *NatsRpc) GetServer() *treaty.Server

func (*NatsRpc) Publish

func (r *NatsRpc) Publish(s ReqBuilder) error

func (*NatsRpc) PublishBroadcast

func (r *NatsRpc) PublishBroadcast(s ReqBuilder) error

func (*NatsRpc) QueuePublish added in v2.0.3

func (r *NatsRpc) QueuePublish(s ReqBuilder) error

func (*NatsRpc) QueueRequest added in v2.0.3

func (r *NatsRpc) QueueRequest(s ReqBuilder) error

func (*NatsRpc) QueueSubscribe

func (r *NatsRpc) QueueSubscribe(s RssBuilder) error

func (*NatsRpc) RegEncoder

func (r *NatsRpc) RegEncoder(typ string, encoder EncoderRpc)

func (*NatsRpc) RemoveFindCache

func (r *NatsRpc) RemoveFindCache(arg any)

func (*NatsRpc) Request

func (r *NatsRpc) Request(s ReqBuilder) error

func (*NatsRpc) Response

func (r *NatsRpc) Response(codeType string, v any) []byte

func (*NatsRpc) SendMsg added in v2.7.0

func (r *NatsRpc) SendMsg(s ReqBuilder) error

func (*NatsRpc) Subscribe

func (r *NatsRpc) Subscribe(s RssBuilder) error

func (*NatsRpc) SubscribeBroadcast

func (r *NatsRpc) SubscribeBroadcast(s RssBuilder) error

type NatsRpcOption

// NatsRpcOption is a functional option that receives a *NatsRpc. NewRpcNats
// accepts any number of them, and the WithNats* functions return them.
type NatsRpcOption func(r *NatsRpc)

func WithNatsDebugMsg

func WithNatsDebugMsg(debug bool) NatsRpcOption

func WithNatsDialTimeout

func WithNatsDialTimeout(timeout time.Duration) NatsRpcOption

func WithNatsEndpoints

func WithNatsEndpoints(endpoints []string) NatsRpcOption

func WithNatsOptions

func WithNatsOptions(opts ...nats.Option) NatsRpcOption

func WithNatsPrefix

func WithNatsPrefix(prefix string) NatsRpcOption

func WithNatsServer

func WithNatsServer(server *treaty.Server) NatsRpcOption

type Option added in v2.0.3

// Option is a functional option that receives a *ServerBase. NewServerBase and
// NewServerQueue accept any number of them.
type Option func(b *ServerBase)

func WithBroadcastEventHandler added in v2.0.3

func WithBroadcastEventHandler(handler CallbackFunc) Option

func WithInnerMsgHandler added in v2.0.3

func WithInnerMsgHandler(handler MsgHandler) Option

func WithPlugin added in v2.0.3

func WithPlugin(plugin ServerPlugin) Option

func WithPlugins added in v2.0.3

func WithPlugins(plugins []ServerPlugin) Option

func WithSelfEventHandler added in v2.0.3

func WithSelfEventHandler(handler CallbackFunc) Option

type RabbitMqRpc added in v2.6.9

// RabbitMqRpc is the RabbitMQ-based RPC type created by NewRpcRabbitMq. Its
// method set covers every method of the ServerRpc interface.
//
// Client is an *amqp.Connection and ChanPool is a pool of *amqp.Channel.
// RpcCoder maps a string key to an EncoderRpc.
//
// NOTE(review): ReplyQueues presumably stores the *RabbitReplyQueue values
// returned by GetReplyQueue — confirm.
type RabbitMqRpc struct {
	Endpoints   []string // only the first address is used
	DebugMsg    bool
	Prefix      string
	RpcCoder    map[string]EncoderRpc
	Server      *treaty.Server
	Finder      *discover.Finder
	Client      *amqp.Connection
	DialTimeout time.Duration
	ReplyQueues sync.Map
	ChanPool    *pool.Pool[*amqp.Channel]
	// contains filtered or unexported fields
}

func NewRpcRabbitMq added in v2.6.9

func NewRpcRabbitMq(opts ...RabbitMqRpcOption) *RabbitMqRpc

func (*RabbitMqRpc) AddBlockedNotifier added in v2.7.3

func (r *RabbitMqRpc) AddBlockedNotifier(notifier chan amqp.Blocking)

func (*RabbitMqRpc) AddCloseNotifier added in v2.7.3

func (r *RabbitMqRpc) AddCloseNotifier(notifier chan *amqp.Error)

func (*RabbitMqRpc) Close added in v2.6.9

func (r *RabbitMqRpc) Close() error

func (*RabbitMqRpc) DealMsg added in v2.6.9

func (r *RabbitMqRpc) DealMsg(s RssBuilder, ch *amqp.Channel, msg amqp.Delivery, callback CallbackFunc, coder EncoderRpc)

func (*RabbitMqRpc) DecodeMsg added in v2.6.9

func (r *RabbitMqRpc) DecodeMsg(codeType string, data []byte, v any) error

func (*RabbitMqRpc) EncodeMsg added in v2.6.9

func (r *RabbitMqRpc) EncodeMsg(coder EncoderRpc, msgType MessageType, msgId int32, req any) ([]byte, error)

func (*RabbitMqRpc) EncodeMsgRaw added in v2.7.0

func (r *RabbitMqRpc) EncodeMsgRaw(coder EncoderRpc, msgType MessageType, msgId int32, req any) ([]byte, error)

func (*RabbitMqRpc) Find added in v2.6.9

func (r *RabbitMqRpc) Find(serverType string, arg any, options ...discover.FilterOption) *treaty.Server

func (*RabbitMqRpc) GetCoder added in v2.6.9

func (r *RabbitMqRpc) GetCoder(codeType string) EncoderRpc

func (*RabbitMqRpc) GetReplyQueue added in v2.7.0

func (r *RabbitMqRpc) GetReplyQueue(subReply string) (*RabbitReplyQueue, error)

func (*RabbitMqRpc) GetServer added in v2.6.9

func (r *RabbitMqRpc) GetServer() *treaty.Server

func (*RabbitMqRpc) Publish added in v2.6.9

func (r *RabbitMqRpc) Publish(s ReqBuilder) error

Publish sends a message.

func (*RabbitMqRpc) PublishBroadcast added in v2.6.9

func (r *RabbitMqRpc) PublishBroadcast(s ReqBuilder) error

func (*RabbitMqRpc) QueuePublish added in v2.6.9

func (r *RabbitMqRpc) QueuePublish(s ReqBuilder) error

func (*RabbitMqRpc) QueueRequest added in v2.6.9

func (r *RabbitMqRpc) QueueRequest(s ReqBuilder) error

func (*RabbitMqRpc) QueueSubscribe added in v2.6.9

func (r *RabbitMqRpc) QueueSubscribe(s RssBuilder) error

func (*RabbitMqRpc) RegEncoder added in v2.6.9

func (r *RabbitMqRpc) RegEncoder(typ string, encoder EncoderRpc)

func (*RabbitMqRpc) RemoveFindCache added in v2.6.9

func (r *RabbitMqRpc) RemoveFindCache(arg any)

func (*RabbitMqRpc) Request added in v2.6.9

func (r *RabbitMqRpc) Request(s ReqBuilder) error

func (*RabbitMqRpc) Response added in v2.6.9

func (r *RabbitMqRpc) Response(codeType string, v any) []byte

func (*RabbitMqRpc) SendMsg added in v2.7.0

func (r *RabbitMqRpc) SendMsg(s ReqBuilder) error

SendMsg sends a message.

func (*RabbitMqRpc) Subscribe added in v2.6.9

func (r *RabbitMqRpc) Subscribe(s RssBuilder) error

func (*RabbitMqRpc) SubscribeBroadcast added in v2.6.9

func (r *RabbitMqRpc) SubscribeBroadcast(s RssBuilder) error

type RabbitMqRpcOption added in v2.6.9

// RabbitMqRpcOption is a functional option that receives a *RabbitMqRpc.
// NewRpcRabbitMq accepts any number of them, and the WithRabbit* functions
// return them.
type RabbitMqRpcOption func(r *RabbitMqRpc)

func WithRabbitBlockedNotifier added in v2.7.3

func WithRabbitBlockedNotifier(notifier chan amqp.Blocking) RabbitMqRpcOption

func WithRabbitCloseNotifier added in v2.7.3

func WithRabbitCloseNotifier(notifier chan *amqp.Error) RabbitMqRpcOption

func WithRabbitMqDebugMsg added in v2.6.9

func WithRabbitMqDebugMsg(debug bool) RabbitMqRpcOption

func WithRabbitMqDialTimeout added in v2.6.9

func WithRabbitMqDialTimeout(timeout time.Duration) RabbitMqRpcOption

func WithRabbitMqEndpoints added in v2.6.9

func WithRabbitMqEndpoints(endpoints []string) RabbitMqRpcOption

func WithRabbitMqPrefix added in v2.6.9

func WithRabbitMqPrefix(prefix string) RabbitMqRpcOption

func WithRabbitMqServer added in v2.6.9

func WithRabbitMqServer(server *treaty.Server) RabbitMqRpcOption

type RabbitReplyQueue added in v2.7.0

// RabbitReplyQueue is created by NewRabbitReplyQueue from a queue name, a
// receive-only channel of amqp.Delivery and a coder map; it is also the type
// returned by RabbitMqRpc.GetReplyQueue.
//
// NOTE(review): WaitMap is presumably keyed by RabbitWaitItem.CorrId, and
// WaitReply presumably consumes WaitChan and ReplyChan — confirm.
type RabbitReplyQueue struct {
	QueueName string
	WaitMap   map[string]*RabbitWaitItem
	WaitChan  chan *RabbitWaitItem
	ReplyChan <-chan amqp.Delivery
	RpcCoder  map[string]EncoderRpc
}

func NewRabbitReplyQueue added in v2.7.0

func NewRabbitReplyQueue(name string, ch <-chan amqp.Delivery, rpcCoder map[string]EncoderRpc) *RabbitReplyQueue

func (*RabbitReplyQueue) WaitReply added in v2.7.0

func (r *RabbitReplyQueue) WaitReply()

type RabbitWaitItem added in v2.7.0

// RabbitWaitItem is the element type of RabbitReplyQueue.WaitMap and
// RabbitReplyQueue.WaitChan. It holds a correlation id, a code type, an
// arbitrary data value and a channel of *MsgRpc.
//
// NOTE(review): presumably one pending request; MsgReply would deliver the
// decoded reply and MsgData the value to decode into — confirm.
type RabbitWaitItem struct {
	CorrId   string
	CodeType string
	MsgData  any
	MsgReply chan *MsgRpc
}

type ReqBuilder

// ReqBuilder describes an outgoing request. It is created by DefaultReqBuilder
// or NewReqBuilder. Each Set* method returns *ReqBuilder so calls can be
// chained. Build returns a ReqBuilder value, which is the argument type of the
// Publish/Request family of functions.
type ReqBuilder struct {
	// contains filtered or unexported fields
}

func DefaultReqBuilder added in v2.0.3

func DefaultReqBuilder() *ReqBuilder

func NewReqBuilder

func NewReqBuilder(server *treaty.Server) *ReqBuilder

func (*ReqBuilder) Build

func (r *ReqBuilder) Build() ReqBuilder

func (*ReqBuilder) SetCodeType

func (r *ReqBuilder) SetCodeType(codeType string) *ReqBuilder

func (*ReqBuilder) SetDialTimeout added in v2.3.9

func (r *ReqBuilder) SetDialTimeout(d time.Duration) *ReqBuilder

func (*ReqBuilder) SetExName added in v2.6.9

func (r *ReqBuilder) SetExName(name string) *ReqBuilder

func (*ReqBuilder) SetExType added in v2.6.9

func (r *ReqBuilder) SetExType(typ string) *ReqBuilder

func (*ReqBuilder) SetMsgId

func (r *ReqBuilder) SetMsgId(msgId int32) *ReqBuilder

func (*ReqBuilder) SetQueue added in v2.0.3

func (r *ReqBuilder) SetQueue(queue string) *ReqBuilder

func (*ReqBuilder) SetReq

func (r *ReqBuilder) SetReq(req any) *ReqBuilder

func (*ReqBuilder) SetResp

func (r *ReqBuilder) SetResp(resp any) *ReqBuilder

func (*ReqBuilder) SetRtKey added in v2.6.9

func (r *ReqBuilder) SetRtKey(rtKey string) *ReqBuilder

func (*ReqBuilder) SetServer

func (r *ReqBuilder) SetServer(server *treaty.Server) *ReqBuilder

func (*ReqBuilder) SetServerType

func (r *ReqBuilder) SetServerType(serverType string) *ReqBuilder

func (*ReqBuilder) SetSuffix

func (r *ReqBuilder) SetSuffix(suffix string) *ReqBuilder

type RssBuilder

// RssBuilder describes a subscription. It is created by NewRssBuilder. Each
// Set* method returns *RssBuilder so calls can be chained. Build returns an
// RssBuilder value, which is the argument type of the Subscribe family of
// methods.
type RssBuilder struct {
	// contains filtered or unexported fields
}

func NewRssBuilder

func NewRssBuilder(server *treaty.Server) *RssBuilder

func (*RssBuilder) Build

func (r *RssBuilder) Build() RssBuilder

func (*RssBuilder) SetCallback

func (r *RssBuilder) SetCallback(callback CallbackFunc) *RssBuilder

func (*RssBuilder) SetCodeType

func (r *RssBuilder) SetCodeType(codeType string) *RssBuilder

func (*RssBuilder) SetDialTimeout added in v2.7.0

func (r *RssBuilder) SetDialTimeout(d time.Duration) *RssBuilder

func (*RssBuilder) SetExName added in v2.6.9

func (r *RssBuilder) SetExName(name string) *RssBuilder

func (*RssBuilder) SetExType added in v2.6.9

func (r *RssBuilder) SetExType(typ string) *RssBuilder

func (*RssBuilder) SetParallel

func (r *RssBuilder) SetParallel(parallel bool) *RssBuilder

func (*RssBuilder) SetQueue

func (r *RssBuilder) SetQueue(queue string) *RssBuilder

func (*RssBuilder) SetRtKey added in v2.6.9

func (r *RssBuilder) SetRtKey(rtKey string) *RssBuilder

func (*RssBuilder) SetServer

func (r *RssBuilder) SetServer(server *treaty.Server) *RssBuilder

func (*RssBuilder) SetSuffix

func (r *RssBuilder) SetSuffix(suffix string) *RssBuilder

type ServerBase added in v2.0.3

// ServerBase is created by NewServerBase from a *treaty.Server and optional
// Option values. Its methods include every method of the ServerEntity
// interface (Init, AfterInit, BeforeShutdown, Shutdown, HandleSelfEvent,
// HandleBroadcastEvent).
//
// It exposes the server description, a ServerRpc and an *RssBuilder.
type ServerBase struct {
	Server     *treaty.Server
	Rpc        ServerRpc
	SubBuilder *RssBuilder
	// contains filtered or unexported fields
}

func NewServerBase added in v2.0.3

func NewServerBase(s *treaty.Server, options ...Option) *ServerBase

func (*ServerBase) AddPlugin added in v2.0.3

func (s *ServerBase) AddPlugin(plugin ServerPlugin)

func (*ServerBase) AfterInit added in v2.0.3

func (s *ServerBase) AfterInit()

func (*ServerBase) BeforeShutdown added in v2.0.3

func (s *ServerBase) BeforeShutdown()

func (*ServerBase) HandleBroadcastEvent added in v2.0.3

func (s *ServerBase) HandleBroadcastEvent(req *MsgRpc) []byte

func (*ServerBase) HandleSelfEvent added in v2.0.3

func (s *ServerBase) HandleSelfEvent(req *MsgRpc) []byte

HandleSelfEvent handles internal events.

func (*ServerBase) Init added in v2.0.3

func (s *ServerBase) Init()

func (*ServerBase) Register added in v2.0.3

func (s *ServerBase) Register(msgId int32, v any)

func (*ServerBase) ServerMaintain added in v2.7.2

func (s *ServerBase) ServerMaintain(req *treaty.ServerMaintainReq)

func (*ServerBase) SetBroadcastEventHandler added in v2.0.3

func (s *ServerBase) SetBroadcastEventHandler(handler CallbackFunc)

func (*ServerBase) SetInnerMsgHandler added in v2.0.3

func (s *ServerBase) SetInnerMsgHandler(handler MsgHandler)

func (*ServerBase) SetSelfEventHandler added in v2.0.3

func (s *ServerBase) SetSelfEventHandler(handler CallbackFunc)

func (*ServerBase) Shutdown added in v2.0.3

func (s *ServerBase) Shutdown()

type ServerCreator

// ServerCreator is a factory function that takes a *treaty.Server and returns
// a ServerEntity or an error.
type ServerCreator func(s *treaty.Server) (ServerEntity, error)

type ServerEntity

// ServerEntity is the lifecycle and event-handling contract of a server.
type ServerEntity interface {
	Init()                                   // initialization
	AfterInit()                              // runs after initialization
	BeforeShutdown()                         // runs before the service shuts down
	Shutdown()                               // service shutdown
	HandleSelfEvent(req *MsgRpc) []byte      // handles the server's own events
	HandleBroadcastEvent(req *MsgRpc) []byte // handles broadcast events
}

ServerEntity server entity

type ServerPlugin added in v2.0.3

// ServerPlugin is a server extension. Each lifecycle hook receives the
// *ServerBase. Plugins can be supplied through WithPlugin, WithPlugins or
// ServerBase.AddPlugin.
type ServerPlugin interface {
	Init(s *ServerBase)           // initialization
	AfterInit(s *ServerBase)      // runs after initialization
	BeforeShutdown(s *ServerBase) // runs before the service shuts down
	Shutdown(s *ServerBase)       // service shutdown
}

ServerPlugin is a server extension.

type ServerQueue added in v2.0.3

// ServerQueue embeds *ServerBase and declares its own AfterInit method. It is
// created by NewServerQueue, which takes a queue name in addition to the
// NewServerBase arguments.
type ServerQueue struct {
	*ServerBase
	// contains filtered or unexported fields
}

ServerQueue base queue

func NewServerQueue added in v2.0.3

func NewServerQueue(queue string, s *treaty.Server, options ...Option) *ServerQueue

func (*ServerQueue) AfterInit added in v2.0.3

func (s *ServerQueue) AfterInit()

type ServerRpc

// ServerRpc is the transport-independent RPC interface returned by
// NewRpcServer. Both *NatsRpc and *RabbitMqRpc declare every method listed
// here.
type ServerRpc interface {
	RegEncoder(typ string, encoder EncoderRpc)                                        // register an encoder
	Subscribe(s RssBuilder) error                                                     // self subscribe
	SubscribeBroadcast(s RssBuilder) error                                            // broadcast subscribe
	QueueSubscribe(s RssBuilder) error                                                // queue self subscribe
	SendMsg(s ReqBuilder) error                                                       // send a message directly
	Publish(s ReqBuilder) error                                                       // publish
	QueuePublish(s ReqBuilder) error                                                  // queue publish
	PublishBroadcast(s ReqBuilder) error                                              // broadcast publish
	Request(s ReqBuilder) error                                                       // request
	QueueRequest(s ReqBuilder) error                                                  // queue request
	Response(codeType string, v any) []byte                                           // build the response bytes for a message
	DecodeMsg(codeType string, data []byte, v any) error                              // decode a message
	GetCoder(codeType string) EncoderRpc                                              // get the encoder for a code type
	GetServer() *treaty.Server                                                        // get the current server
	Find(serverType string, arg any, options ...discover.FilterOption) *treaty.Server // find a server
	RemoveFindCache(arg any)                                                          // clear the find cache
	Close() error                                                                     // close the rpc
}

ServerRpc rpc interface

func NewRpcServer

func NewRpcServer(cfg config.RpcConf, server *treaty.Server) ServerRpc

NewRpcServer create rpc server

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL