Documentation ¶
Index ¶
- Variables
- func Dial(reqTopic, replyTopic string, c mqttadapter.MQTTClientAdapter, qos byte) (*rpc.Client, error)
- func NewConn(requestTopic, responseTopic string, c mqttadapter.MQTTClientAdapter, qos byte) (io.ReadWriteCloser, error)
- func NewRPCClient(conn io.ReadWriteCloser) *rpc.Client
- func NewRPCClientCodec(encoding ContentEncoding) rpc.ClientCodec
- type CallOption
- type Client
- func (s *Client) Call(ctx context.Context, deviceID, serviceMethod string, args proto.Message, ...) error
- func (s *Client) Client() mqttadapter.MQTTClientAdapter
- func (s *Client) Close() error
- func (s *Client) IsConnected() bool
- func (s *Client) OnConnect(cb func()) int
- func (s *Client) Subscribe(topic string, qos byte, cb mqttadapter.MessageCallback)
- type ContentEncoding
- func (ContentEncoding) Descriptor() protoreflect.EnumDescriptor
- func (x ContentEncoding) Enum() *ContentEncoding
- func (ContentEncoding) EnumDescriptor() ([]byte, []int) deprecated
- func (x ContentEncoding) Number() protoreflect.EnumNumber
- func (x ContentEncoding) String() string
- func (ContentEncoding) Type() protoreflect.EnumType
- type EncodingGetter
- type MQTTContext
- func (c *MQTTContext) Bind(request interface{}) error
- func (c *MQTTContext) Encoding() ContentEncoding
- func (c *MQTTContext) ID() *rrpc.ID
- func (c *MQTTContext) Metadata() map[string]string
- func (c *MQTTContext) Method() string
- func (c *MQTTContext) PrometheusLabels() prometheus.Labels
- func (c *MQTTContext) Reply(res *rrpc.Response) bool
- func (c *MQTTContext) ReplyDesc() string
- type MetadataGetter
- type ProtobufClientCodec
- type ProtobufServerCodec
- type RPCClientCodec
- func (c *RPCClientCodec) Close() error
- func (c *RPCClientCodec) ReadResponseBody(x interface{}) error
- func (c *RPCClientCodec) ReadResponseHeader(r *rpc.Response) error
- func (c *RPCClientCodec) Reset(conn io.ReadWriteCloser)
- func (c *RPCClientCodec) SetEncoding(encoding ContentEncoding)
- func (c *RPCClientCodec) WriteRequest(r *rpc.Request, param interface{}) error
- type Request
- func (*Request) Descriptor() ([]byte, []int) deprecated
- func (x *Request) GetBody() *anypb.Any
- func (x *Request) GetEncoding() ContentEncoding
- func (x *Request) GetId() uint64
- func (x *Request) GetMetadata() map[string]string
- func (x *Request) GetMethod() string
- func (*Request) ProtoMessage()
- func (x *Request) ProtoReflect() protoreflect.Message
- func (x *Request) Reset()
- func (x *Request) String() string
- type RequestData
- type Response
- func (*Response) Descriptor() ([]byte, []int) deprecated
- func (x *Response) GetBody() *anypb.Any
- func (x *Response) GetEncoding() ContentEncoding
- func (x *Response) GetErrorMessage() string
- func (x *Response) GetId() uint64
- func (x *Response) GetMetadata() map[string]string
- func (x *Response) GetStatus() int32
- func (*Response) ProtoMessage()
- func (x *Response) ProtoReflect() protoreflect.Message
- func (x *Response) Reset()
- func (x *Response) String() string
- type ResponseData
- type Server
Constants ¶
This section is empty.
Variables ¶
var ( ContentEncoding_name = map[int32]string{ 0: "PLAIN", 1: "GZIP", 2: "DEFLATE", 3: "BROTLI", } ContentEncoding_value = map[string]int32{ "PLAIN": 0, "GZIP": 1, "DEFLATE": 2, "BROTLI": 3, } )
Enum value maps for ContentEncoding.
var ( // ErrInvalidRequestBody is an error indicating an invalid request body. ErrInvalidRequestBody = errors.New("[RRPC] invalid request body") // ErrMessageTypeNotMatch is an error indicating that the message type does not match. ErrMessageTypeNotMatch = errors.New("[RRPC] message type not match") // ErrUnknownContentEncoding is an error indicating an unknown content encoding. ErrUnknownContentEncoding = errors.New("[RRPC] unknown content encoding") )
var ( // ErrClientIsNotReady is an error indicating that the client is not ready. ErrClientIsNotReady = errors.New("[RRPC] client is not ready") )
var ( // ErrRequestNotProto is an error indicating that the request type is not a protobuf message. ErrRequestNotProto = errors.New("[RRPC] request type is not protobuf message") )
var ( // ErrRetailedMessage is an error indicating that retained messages are not allowed. ErrRetailedMessage = errors.New("[RRPC] reatain message is not allowed, please set retaind=false") )
var File_base_proto protoreflect.FileDescriptor
Functions ¶
func Dial ¶
func Dial(reqTopic, replyTopic string, c mqttadapter.MQTTClientAdapter, qos byte) (*rpc.Client, error)
Dial establishes a connection to the MQTT broker and returns a new RPC client. It takes the request topic, reply topic, MQTT client, and quality of service (QoS) as parameters. The function creates a new connection using NewConn and returns a new RPC client using reverserpc_pb.NewClient. If an error occurs during the connection establishment, it is returned along with a nil client.
func NewConn ¶
func NewConn(requestTopic, responseTopic string, c mqttadapter.MQTTClientAdapter, qos byte) (io.ReadWriteCloser, error)
NewConn creates a new connection for reverse RPC over MQTT. It takes the request topic, response topic, MQTT client, and quality of service (QoS) as parameters. It returns an io.ReadWriteCloser and an error.
func NewRPCClient ¶
func NewRPCClient(conn io.ReadWriteCloser) *rpc.Client
NewRPCClient creates a new RPC client with the given connection. It uses the RPCClientCodec with BROTLI content encoding.
func NewRPCClientCodec ¶
func NewRPCClientCodec(encoding ContentEncoding) rpc.ClientCodec
NewRPCClientCodec creates a new instance of RPCClientCodec with the given content encoding. The connection is set to nil.
Types ¶
type CallOption ¶
type CallOption func(o *callOpt)
CallOption represents an option for making a remote procedure call.
func WithEncoding ¶
func WithEncoding(encoding ContentEncoding) CallOption
WithEncoding sets the encoding for the RPC call. It takes a pb.ContentEncoding as a parameter and returns a CallOption. The encoding determines how the data is encoded before being sent over the network. Example usage:
opt := WithEncoding(pb.ContentEncoding_GZIP)
client.Call(ctx, deviceID, method, request, response, opt)
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents an MQTT client used for reverse RPC communication.
func NewClient ¶
func NewClient(client mqttadapter.MQTTClientAdapter, topicPrefix string, encoding ContentEncoding) *Client
NewClient creates a new instance of the Client struct with the provided parameters. It initializes the MQTT client, sets the topic prefix, quality of service (QoS), content encoding, and initializes the RPC client codec pool. The returned pointer to the Client struct can be used to interact with the MQTT client.
func (*Client) Call ¶
func (s *Client) Call(ctx context.Context, deviceID, serviceMethod string, args proto.Message, reply proto.Message, opts ...CallOption) error
Call invokes a remote procedure call (RPC) on the MQTT client. It sends the specified arguments to the specified service method and waits for the reply. The call options can be used to customize the behavior of the RPC call.
func (*Client) Client ¶
func (s *Client) Client() mqttadapter.MQTTClientAdapter
Client returns the MQTT client associated with the reverse RPC client.
func (*Client) Close ¶
Close closes the MQTT client connection. It disconnects the MQTT client and returns any error encountered.
func (*Client) IsConnected ¶
IsConnected returns a boolean value indicating whether the MQTT client is currently connected.
func (*Client) OnConnect ¶
OnConnect registers a callback function to be called when the MQTT client is connected. The callback function should have no arguments and no return value. It returns an integer value representing the registration ID.
func (*Client) Subscribe ¶
func (s *Client) Subscribe(topic string, qos byte, cb mqttadapter.MessageCallback)
Subscribe subscribes to a topic with the specified quality of service (QoS) level and registers a callback function to handle incoming messages. The topic parameter specifies the topic to subscribe to. The qos parameter specifies the desired QoS level for the subscription. The cb parameter is a callback function that will be called when a message is received on the subscribed topic.
type ContentEncoding ¶
type ContentEncoding int32
const ( ContentEncoding_PLAIN ContentEncoding = 0 ContentEncoding_GZIP ContentEncoding = 1 ContentEncoding_DEFLATE ContentEncoding = 2 ContentEncoding_BROTLI ContentEncoding = 3 )
func (ContentEncoding) Descriptor ¶
func (ContentEncoding) Descriptor() protoreflect.EnumDescriptor
func (ContentEncoding) Enum ¶
func (x ContentEncoding) Enum() *ContentEncoding
func (ContentEncoding) EnumDescriptor
deprecated
func (ContentEncoding) EnumDescriptor() ([]byte, []int)
Deprecated: Use ContentEncoding.Descriptor instead.
func (ContentEncoding) Number ¶
func (x ContentEncoding) Number() protoreflect.EnumNumber
func (ContentEncoding) String ¶
func (x ContentEncoding) String() string
func (ContentEncoding) Type ¶
func (ContentEncoding) Type() protoreflect.EnumType
type EncodingGetter ¶
type EncodingGetter interface {
Encoding() ContentEncoding
}
EncodingGetter is an interface for getting the content encoding.
type MQTTContext ¶
type MQTTContext struct {
// contains filtered or unexported fields
}
MQTTContext represents the context of an MQTT request.
func NewMQTTContext ¶
func NewMQTTContext(req *RequestData, svc *Server) *MQTTContext
NewMQTTContext creates a new MQTTContext instance.
func (*MQTTContext) Bind ¶
func (c *MQTTContext) Bind(request interface{}) error
Bind binds the request data to the given request object.
func (*MQTTContext) Encoding ¶
func (c *MQTTContext) Encoding() ContentEncoding
Encoding returns the content encoding of the MQTTContext.
func (*MQTTContext) Metadata ¶
func (c *MQTTContext) Metadata() map[string]string
Metadata returns the metadata of the MQTTContext.
func (*MQTTContext) Method ¶
func (c *MQTTContext) Method() string
Method returns the method of the MQTTContext.
func (*MQTTContext) PrometheusLabels ¶
func (c *MQTTContext) PrometheusLabels() prometheus.Labels
PrometheusLabels returns the Prometheus labels of the MQTTContext.
func (*MQTTContext) Reply ¶
func (c *MQTTContext) Reply(res *rrpc.Response) bool
Reply sends the response back to the client.
func (*MQTTContext) ReplyDesc ¶
func (c *MQTTContext) ReplyDesc() string
ReplyDesc returns the reply description of the MQTTContext.
type MetadataGetter ¶
MetadataGetter is an interface for getting metadata.
type ProtobufClientCodec ¶
type ProtobufClientCodec struct {
// contains filtered or unexported fields
}
ProtobufClientCodec is a codec used by the client to marshal and unmarshal RPC messages.
func NewClientCodecWithCompressor ¶
func NewClientCodecWithCompressor(compressor *compressor.CompressorManager) *ProtobufClientCodec
NewClientCodecWithCompressor creates a new instance of ProtobufClientCodec with the provided CompressorManager.
func NewProtobufClientCodec ¶
func NewProtobufClientCodec() *ProtobufClientCodec
NewProtobufClientCodec creates a new instance of ProtobufClientCodec with a default CompressorManager.
type ProtobufServerCodec ¶
type ProtobufServerCodec struct {
// contains filtered or unexported fields
}
ProtobufServerCodec is a codec used by the server to marshal and unmarshal RPC messages.
func NewProtobufServerCodec ¶
func NewProtobufServerCodec() *ProtobufServerCodec
NewProtobufServerCodec creates a new instance of ProtobufServerCodec with a default CompressorManager.
func NewServerCodecWithCompressor ¶
func NewServerCodecWithCompressor(compressor *compressor.CompressorManager) *ProtobufServerCodec
NewServerCodecWithCompressor creates a new instance of ProtobufServerCodec with the provided CompressorManager.
type RPCClientCodec ¶
type RPCClientCodec struct {
// contains filtered or unexported fields
}
RPCClientCodec is a custom implementation of rpc.ClientCodec interface. It handles encoding and decoding of requests and responses.
func NewRPCClientCodecWithConn ¶
func NewRPCClientCodecWithConn(conn io.ReadWriteCloser, encoding ContentEncoding) *RPCClientCodec
NewRPCClientCodecWithConn creates a new instance of RPCClientCodec with the given connection and content encoding.
func (*RPCClientCodec) Close ¶
func (c *RPCClientCodec) Close() error
Close closes the connection of the RPCClientCodec.
func (*RPCClientCodec) ReadResponseBody ¶
func (c *RPCClientCodec) ReadResponseBody(x interface{}) error
ReadResponseBody reads the RPC response body from the connection. It decodes the response body using protobuf and updates the provided target object.
func (*RPCClientCodec) ReadResponseHeader ¶
func (c *RPCClientCodec) ReadResponseHeader(r *rpc.Response) error
ReadResponseHeader reads the RPC response header from the connection. It decodes the response using the custom codec and updates the provided rpc.Response object.
func (*RPCClientCodec) Reset ¶
func (c *RPCClientCodec) Reset(conn io.ReadWriteCloser)
Reset resets the connection of the RPCClientCodec to the given connection.
func (*RPCClientCodec) SetEncoding ¶
func (c *RPCClientCodec) SetEncoding(encoding ContentEncoding)
SetEncoding sets the content encoding of the RPCClientCodec to the given encoding.
func (*RPCClientCodec) WriteRequest ¶
func (c *RPCClientCodec) WriteRequest(r *rpc.Request, param interface{}) error
WriteRequest writes the RPC request to the connection. It encodes the request body using protobuf and the specified content encoding.
type Request ¶
type Request struct { Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` Method string `protobuf:"bytes,2,opt,name=method,proto3" json:"method,omitempty"` Encoding ContentEncoding `protobuf:"varint,3,opt,name=encoding,proto3,enum=rrpc.base.v1.ContentEncoding" json:"encoding,omitempty"` Metadata map[string]string `` /* 157-byte string literal not displayed */ Body *anypb.Any `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` // contains filtered or unexported fields }
func (*Request) Descriptor
deprecated
func (*Request) GetEncoding ¶
func (x *Request) GetEncoding() ContentEncoding
func (*Request) GetMetadata ¶
func (*Request) ProtoMessage ¶
func (*Request) ProtoMessage()
func (*Request) ProtoReflect ¶
func (x *Request) ProtoReflect() protoreflect.Message
type RequestData ¶
RequestData represents a request received by the service.
func (*RequestData) GetReplyTopic ¶
func (r *RequestData) GetReplyTopic() string
GetReplyTopic returns the reply topic for the request. It replaces the word "request" with "response" in the original topic.
func (*RequestData) GetResponse ¶
func (r *RequestData) GetResponse() *ResponseData
GetResponse returns the response data for the request. It creates a new ResponseData object with the reply topic and the request's ID and encoding.
func (*RequestData) MakeErrResponse ¶
func (r *RequestData) MakeErrResponse(status int, err error) *ResponseData
MakeErrResponse creates an error response with the specified status code and error message. It sets the status code and error message in the response data and returns the modified response.
func (*RequestData) MakeOKResponse ¶
func (r *RequestData) MakeOKResponse(data proto.Message) *ResponseData
MakeOKResponse creates a successful response with the given data. It marshals the data into bytes using protocol buffers and sets the response status to 200. The data is stored in the response body as an Any message, with the type URL set to the type of the data. If an error occurs during marshaling, it returns nil.
type Response ¶
type Response struct { Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` Status int32 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"` ErrorMessage string `protobuf:"bytes,3,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` Encoding ContentEncoding `protobuf:"varint,5,opt,name=encoding,proto3,enum=rrpc.base.v1.ContentEncoding" json:"encoding,omitempty"` Metadata map[string]string `` /* 157-byte string literal not displayed */ Body *anypb.Any `protobuf:"bytes,6,opt,name=body,proto3" json:"body,omitempty"` // contains filtered or unexported fields }
func (*Response) Descriptor
deprecated
func (*Response) GetEncoding ¶
func (x *Response) GetEncoding() ContentEncoding
func (*Response) GetErrorMessage ¶
func (*Response) GetMetadata ¶
func (*Response) ProtoMessage ¶
func (*Response) ProtoMessage()
func (*Response) ProtoReflect ¶
func (x *Response) ProtoReflect() protoreflect.Message
type ResponseData ¶
ResponseData represents a response sent by the service.
type Server ¶
Server represents an MQTT service.
func NewServer ¶
func NewServer(client mqttadapter.MQTTClientAdapter, subscribeTopic string, options ...rrpc.ServerOption) *Server
NewServer creates a new Server instance with the provided MQTT client, subscribe topic, and options. It returns a pointer to the Server.
func (*Server) Close ¶
Close closes the service by disconnecting the IoT client. It returns an error if there was a problem disconnecting the client.
func (*Server) IsConnected ¶
IsConnected returns a boolean value indicating whether the service is connected to the IoT client.