mqttpb

package
v0.0.0-...-4208786 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ContentEncoding_name = map[int32]string{
		0: "PLAIN",
		1: "GZIP",
		2: "DEFLATE",
		3: "BROTLI",
	}
	ContentEncoding_value = map[string]int32{
		"PLAIN":   0,
		"GZIP":    1,
		"DEFLATE": 2,
		"BROTLI":  3,
	}
)

Enum value maps for ContentEncoding.

View Source
var (
	// ErrInvalidRequestBody is an error indicating an invalid request body.
	ErrInvalidRequestBody = errors.New("[RRPC] invalid request body")

	// ErrMessageTypeNotMatch is an error indicating that the message type does not match.
	ErrMessageTypeNotMatch = errors.New("[RRPC] message type not match")

	// ErrUnknownContentEncoding is an error indicating an unknown content encoding.
	ErrUnknownContentEncoding = errors.New("[RRPC] unknown content encoding")
)
View Source
var (
	// ErrClientIsNotReady is an error indicating that the client is not ready.
	ErrClientIsNotReady = errors.New("[RRPC] client is not ready")
)
View Source
var (
	// ErrRequestNotProto is an error indicating that the request type is not a protobuf message.
	ErrRequestNotProto = errors.New("[RRPC] request type is not protobuf message")
)
View Source
var (
	// ErrRetailedMessage is an error indicating that retain message is not allowed.
	ErrRetailedMessage = errors.New("[RRPC] reatain message is not allowed, please set retaind=false")
)

Functions

func Dial

func Dial(reqTopic, replyTopic string, c mqttadapter.MQTTClientAdapter, qos byte) (*rpc.Client, error)

Dial establishes a connection to the MQTT broker and returns a new RPC client. It takes the request topic, reply topic, MQTT client, and quality of service (QoS) as parameters. The function creates a new connection using NewConn and returns a new RPC client using reverserpc_pb.NewClient. If an error occurs during the connection establishment, it is returned along with a nil client.

func NewConn

func NewConn(requestTopic, responseTopic string, c mqttadapter.MQTTClientAdapter, qos byte) (io.ReadWriteCloser, error)

NewConn creates a new connection for reverse RPC over MQTT. It takes the request topic, response topic, MQTT client, and quality of service (QoS) as parameters. It returns an io.ReadWriteCloser and an error.

func NewRPCClient

func NewRPCClient(conn io.ReadWriteCloser) *rpc.Client

NewRPCClient creates a new RPC client with the given connection. It uses the RPCClientCodec with BROTLI content encoding.

func NewRPCClientCodec

func NewRPCClientCodec(encoding ContentEncoding) rpc.ClientCodec

NewRPCClientCodec creates a new instance of RPCClientCodec with the given content encoding. The connection is set to nil.

Types

type CallOption

type CallOption func(o *callOpt)

CallOption represents an option for making a remote procedure call.

func WithEncoding

func WithEncoding(encoding ContentEncoding) CallOption

WithEncoding sets the encoding for the RPC call. It takes a pb.ContentEncoding as a parameter and returns a CallOption. The encoding determines how the data is encoded before being sent over the network. Example usage:

opt := WithEncoding(pb.ContentEncoding_GZIP)
client.Call(ctx, method, request, response, opt)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents an MQTT client used for reverse RPC communication.

func NewClient

func NewClient(client mqttadapter.MQTTClientAdapter, topicPrefix string, encoding ContentEncoding) *Client

NewClient creates a new instance of the Client struct with the provided parameters. It initializes the MQTT client, sets the topic prefix, quality of service (QoS), content encoding, and initializes the RPC client codec pool. The returned pointer to the Client struct can be used to interact with the MQTT client.

func (*Client) Call

func (s *Client) Call(ctx context.Context, deviceID, serviceMethod string, args proto.Message, reply proto.Message, opts ...CallOption) error

Call invokes a remote procedure call (RPC) on the MQTT client. It sends the specified arguments to the specified service method and waits for the reply. The call options can be used to customize the behavior of the RPC call.

func (*Client) Client

func (s *Client) Client() mqttadapter.MQTTClientAdapter

Client returns the MQTT client associated with the reverse RPC client.

func (*Client) Close

func (s *Client) Close() error

Close closes the MQTT client connection. It disconnects the MQTT client and returns any error encountered.

func (*Client) IsConnected

func (s *Client) IsConnected() bool

IsConnected returns a boolean value indicating whether the MQTT client is currently connected.

func (*Client) OnConnect

func (s *Client) OnConnect(cb func()) int

OnConnect registers a callback function to be called when the MQTT client is connected. The callback function should have no arguments and no return value. It returns an integer value representing the registration ID.

func (*Client) Subscribe

func (s *Client) Subscribe(topic string, qos byte, cb mqttadapter.MessageCallback)

Subscribe subscribes to a topic with the specified quality of service (QoS) level and registers a callback function to handle incoming messages. The topic parameter specifies the topic to subscribe to. The qos parameter specifies the desired QoS level for the subscription. The cb parameter is a callback function that will be called when a message is received on the subscribed topic.

type ContentEncoding

type ContentEncoding int32
const (
	ContentEncoding_PLAIN   ContentEncoding = 0
	ContentEncoding_GZIP    ContentEncoding = 1
	ContentEncoding_DEFLATE ContentEncoding = 2
	ContentEncoding_BROTLI  ContentEncoding = 3
)

func (ContentEncoding) Descriptor

func (ContentEncoding) Enum

func (x ContentEncoding) Enum() *ContentEncoding

func (ContentEncoding) EnumDescriptor deprecated

func (ContentEncoding) EnumDescriptor() ([]byte, []int)

Deprecated: Use ContentEncoding.Descriptor instead.

func (ContentEncoding) Number

func (ContentEncoding) String

func (x ContentEncoding) String() string

func (ContentEncoding) Type

type EncodingGetter

type EncodingGetter interface {
	Encoding() ContentEncoding
}

EncodingGetter is an interface for getting the content encoding.

type MQTTContext

type MQTTContext struct {
	// contains filtered or unexported fields
}

MQTTContext represents the context of an MQTT request.

func NewMQTTContext

func NewMQTTContext(req *RequestData, svc *Server) *MQTTContext

NewMQTTContext creates a new MQTTContext instance.

func (*MQTTContext) Bind

func (c *MQTTContext) Bind(request interface{}) error

Bind binds the request data to the given request object.

func (*MQTTContext) Encoding

func (c *MQTTContext) Encoding() ContentEncoding

Encoding returns the content encoding of the MQTTContext.

func (*MQTTContext) ID

func (c *MQTTContext) ID() *rrpc.ID

ID returns the ID of the MQTTContext.

func (*MQTTContext) Metadata

func (c *MQTTContext) Metadata() map[string]string

Metadata returns the metadata of the MQTTContext.

func (*MQTTContext) Method

func (c *MQTTContext) Method() string

Method returns the method of the MQTTContext.

func (*MQTTContext) PrometheusLabels

func (c *MQTTContext) PrometheusLabels() prometheus.Labels

PrometheusLabels returns the Prometheus labels of the MQTTContext.

func (*MQTTContext) Reply

func (c *MQTTContext) Reply(res *rrpc.Response) bool

Reply sends the response back to the client.

func (*MQTTContext) ReplyDesc

func (c *MQTTContext) ReplyDesc() string

ReplyDesc returns the reply description of the MQTTContext.

type MetadataGetter

type MetadataGetter interface {
	Metadata() map[string]string
}

MetadataGetter is an interface for getting metadata.

type ProtobufClientCodec

type ProtobufClientCodec struct {
	// contains filtered or unexported fields
}

ProtobufClientCodec is a codec used by the client to marshal and unmarshal RPC messages.

func NewClientCodecWithCompressor

func NewClientCodecWithCompressor(compressor *compressor.CompressorManager) *ProtobufClientCodec

NewClientCodecWithCompressor creates a new instance of ClientCodec with the provided CompressorManager.

func NewProtobufClientCodec

func NewProtobufClientCodec() *ProtobufClientCodec

NewProtobufClientCodec creates a new instance of ClientCodec with a default CompressorManager.

func (*ProtobufClientCodec) Marshal

func (c *ProtobufClientCodec) Marshal(req *Request) ([]byte, error)

Marshal marshals the Request message into bytes. It compresses the request body if it is not nil.

func (*ProtobufClientCodec) Unmarshal

func (c *ProtobufClientCodec) Unmarshal(body []byte, res *Response) error

Unmarshal unmarshals the bytes into the Response message. It decompresses the response body if it is not nil.

type ProtobufServerCodec

type ProtobufServerCodec struct {
	// contains filtered or unexported fields
}

ProtobufServerCodec is a codec used by the server to marshal and unmarshal RPC messages.

func NewProtobufServerCodec

func NewProtobufServerCodec() *ProtobufServerCodec

NewProtobufServerCodec creates a new instance of ServerCodec with a default CompressorManager.

func NewServerCodecWithCompressor

func NewServerCodecWithCompressor(compressor *compressor.CompressorManager) *ProtobufServerCodec

NewServerCodecWithCompressor creates a new instance of ServerCodec with the provided CompressorManager.

func (*ProtobufServerCodec) Marshal

func (c *ProtobufServerCodec) Marshal(res *Response) ([]byte, error)

Marshal marshals the Response message into bytes. It compresses the response body if it is not nil.

func (*ProtobufServerCodec) Unmarshal

func (c *ProtobufServerCodec) Unmarshal(body []byte, req *Request) error

Unmarshal unmarshals the bytes into the Request message. It decompresses the request body if it is not nil.

type RPCClientCodec

type RPCClientCodec struct {
	// contains filtered or unexported fields
}

RPCClientCodec is a custom implementation of rpc.ClientCodec interface. It handles encoding and decoding of requests and responses.

func NewRPCClientCodecWithConn

func NewRPCClientCodecWithConn(conn io.ReadWriteCloser, encoding ContentEncoding) *RPCClientCodec

NewRPCClientCodecWithConn creates a new instance of RPCClientCodec with the given connection and content encoding.

func (*RPCClientCodec) Close

func (c *RPCClientCodec) Close() error

Close closes the connection of the RPCClientCodec.

func (*RPCClientCodec) ReadResponseBody

func (c *RPCClientCodec) ReadResponseBody(x interface{}) error

ReadResponseBody reads the RPC response body from the connection. It decodes the response body using protobuf and updates the provided target object.

func (*RPCClientCodec) ReadResponseHeader

func (c *RPCClientCodec) ReadResponseHeader(r *rpc.Response) error

ReadResponseHeader reads the RPC response header from the connection. It decodes the response using the custom codec and updates the provided rpc.Response object.

func (*RPCClientCodec) Reset

func (c *RPCClientCodec) Reset(conn io.ReadWriteCloser)

Reset resets the connection of the RPCClientCodec to the given connection.

func (*RPCClientCodec) SetEncoding

func (c *RPCClientCodec) SetEncoding(encoding ContentEncoding)

SetEncoding sets the content encoding of the RPCClientCodec to the given encoding.

func (*RPCClientCodec) WriteRequest

func (c *RPCClientCodec) WriteRequest(r *rpc.Request, param interface{}) error

WriteRequest writes the RPC request to the connection. It encodes the request body using protobuf and the specified content encoding.

type Request

type Request struct {
	Id       uint64            `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	Method   string            `protobuf:"bytes,2,opt,name=method,proto3" json:"method,omitempty"`
	Encoding ContentEncoding   `protobuf:"varint,3,opt,name=encoding,proto3,enum=rrpc.base.v1.ContentEncoding" json:"encoding,omitempty"`
	Metadata map[string]string `` /* 157-byte string literal not displayed */
	Body     *anypb.Any        `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"`
	// contains filtered or unexported fields
}

func (*Request) Descriptor deprecated

func (*Request) Descriptor() ([]byte, []int)

Deprecated: Use Request.ProtoReflect.Descriptor instead.

func (*Request) GetBody

func (x *Request) GetBody() *anypb.Any

func (*Request) GetEncoding

func (x *Request) GetEncoding() ContentEncoding

func (*Request) GetId

func (x *Request) GetId() uint64

func (*Request) GetMetadata

func (x *Request) GetMetadata() map[string]string

func (*Request) GetMethod

func (x *Request) GetMethod() string

func (*Request) ProtoMessage

func (*Request) ProtoMessage()

func (*Request) ProtoReflect

func (x *Request) ProtoReflect() protoreflect.Message

func (*Request) Reset

func (x *Request) Reset()

func (*Request) String

func (x *Request) String() string

type RequestData

type RequestData struct {
	Topic string
	Request
}

RequestData represents a request received by the service.

func (*RequestData) GetReplyTopic

func (r *RequestData) GetReplyTopic() string

GetReplyTopic returns the reply topic for the request. It replaces the word "request" with "response" in the original topic.

func (*RequestData) GetResponse

func (r *RequestData) GetResponse() *ResponseData

GetResponse returns the response data for the request. It creates a new ResponseData object with the reply topic and the request's ID and encoding.

func (*RequestData) MakeErrResponse

func (r *RequestData) MakeErrResponse(status int, err error) *ResponseData

MakeErrResponse creates an error response with the specified status code and error message. It sets the status code and error message in the response data and returns the modified response.

func (*RequestData) MakeOKResponse

func (r *RequestData) MakeOKResponse(data proto.Message) *ResponseData

MakeOKResponse creates a successful response with the given data. It marshals the data into bytes using protocol buffers and sets the response status to 200. The data is stored in the response body as an Any message, with the type URL set to the type of the data. If an error occurs during marshaling, it returns nil.

type Response

type Response struct {
	Id           uint64            `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	Status       int32             `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"`
	ErrorMessage string            `protobuf:"bytes,3,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"`
	Encoding     ContentEncoding   `protobuf:"varint,5,opt,name=encoding,proto3,enum=rrpc.base.v1.ContentEncoding" json:"encoding,omitempty"`
	Metadata     map[string]string `` /* 157-byte string literal not displayed */
	Body         *anypb.Any        `protobuf:"bytes,6,opt,name=body,proto3" json:"body,omitempty"`
	// contains filtered or unexported fields
}

func (*Response) Descriptor deprecated

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) GetBody

func (x *Response) GetBody() *anypb.Any

func (*Response) GetEncoding

func (x *Response) GetEncoding() ContentEncoding

func (*Response) GetErrorMessage

func (x *Response) GetErrorMessage() string

func (*Response) GetId

func (x *Response) GetId() uint64

func (*Response) GetMetadata

func (x *Response) GetMetadata() map[string]string

func (*Response) GetStatus

func (x *Response) GetStatus() int32

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) ProtoReflect

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset

func (x *Response) Reset()

func (*Response) String

func (x *Response) String() string

type ResponseData

type ResponseData struct {
	Topic string
	Response
}

ResponseData represents a response sent by the service.

type Server

type Server struct {
	*rrpc.Server
	// contains filtered or unexported fields
}

Server represents a MQTT service.

func NewServer

func NewServer(client mqttadapter.MQTTClientAdapter, subscribeTopic string, options ...rrpc.ServerOption) *Server

NewServer creates a new Service instance with the provided MQTT client and options. It returns a pointer to the Service and an error, if any.

func (*Server) Close

func (s *Server) Close() error

Close closes the service by disconnecting the IoT client. It returns an error if there was a problem disconnecting the client.

func (*Server) IsConnected

func (s *Server) IsConnected() bool

IsConnected returns a boolean value indicating whether the service is connected to the IoT client.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL