mqtt

package
v0.0.0-...-fe06dec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TopicKafkaHeaderKey     = "topic"
	MessageIDKafkaHeaderKey = "mqtt_message_id"
	DateReceivedHeaderKey   = "date_received"
)

Variables

This section is empty.

Functions

func ControlMessageHandler

func ControlMessageHandler(ctx context.Context, kafkaWriter *kafka.Writer, topicVerifier *TopicVerifier) func(MQTT.Client, MQTT.Message)

func CreateBrokerConnection

func CreateBrokerConnection(brokerUrl string, brokerConfigFuncs ...MqttClientOptionsFunc) (MQTT.Client, error)

func DataMessageHandler

func DataMessageHandler() func(MQTT.Client, MQTT.Message)

func DefaultMessageHandler

func DefaultMessageHandler(topicVerifier *TopicVerifier, controlMessageHandler, dataMessageHandler func(MQTT.Client, MQTT.Message)) func(client MQTT.Client, message MQTT.Message)

func NewBrokerOptions

func NewBrokerOptions(brokerUrl string, opts ...MqttClientOptionsFunc) (*MQTT.ClientOptions, error)

func NewConnectorClientMQTTProxyFactory

func NewConnectorClientMQTTProxyFactory(cfg *config.Config, mqttClient MQTT.Client, topicBuilder *TopicBuilder) (controller.ConnectorClientProxyFactory, error)

func SendReconnectMessageToClient

func SendReconnectMessageToClient(mqttClient MQTT.Client, logger *logrus.Entry, topicBuilder *TopicBuilder, qos byte, publishTimeout time.Duration, clientID domain.ClientID, delay int) error

Types

type ConnectorClientMQTTProxy

type ConnectorClientMQTTProxy struct {
	Logger         *logrus.Entry
	Config         *config.Config
	OrgID          domain.OrgID
	AccountID      domain.AccountID
	ClientID       domain.ClientID
	Client         MQTT.Client
	TopicBuilder   *TopicBuilder
	Dispatchers    domain.Dispatchers
	CanonicalFacts domain.CanonicalFacts
	Tags           domain.Tags
}

func (*ConnectorClientMQTTProxy) Disconnect

func (cc *ConnectorClientMQTTProxy) Disconnect(ctx context.Context, message string) error

func (*ConnectorClientMQTTProxy) GetCanonicalFacts

func (cc *ConnectorClientMQTTProxy) GetCanonicalFacts(ctx context.Context) (domain.CanonicalFacts, error)

func (*ConnectorClientMQTTProxy) GetDispatchers

func (cc *ConnectorClientMQTTProxy) GetDispatchers(ctx context.Context) (domain.Dispatchers, error)

func (*ConnectorClientMQTTProxy) GetTags

func (*ConnectorClientMQTTProxy) Ping

func (*ConnectorClientMQTTProxy) Reconnect

func (cc *ConnectorClientMQTTProxy) Reconnect(ctx context.Context, message string, delay int) error

func (*ConnectorClientMQTTProxy) SendMessage

func (cc *ConnectorClientMQTTProxy) SendMessage(ctx context.Context, directive string, metadata interface{}, payload interface{}) (*uuid.UUID, error)

type ConnectorClientMQTTProxyFactory

type ConnectorClientMQTTProxyFactory struct {
	// contains filtered or unexported fields
}

func (*ConnectorClientMQTTProxyFactory) CreateProxy

func (ccpf *ConnectorClientMQTTProxyFactory) CreateProxy(ctx context.Context, orgID domain.OrgID, account domain.AccountID, client_id domain.ClientID, canonicalFacts domain.CanonicalFacts, dispatchers domain.Dispatchers, tags domain.Tags) (controller.ConnectorClient, error)

type MqttClientOptionsFunc

type MqttClientOptionsFunc func(*MQTT.ClientOptions) error

func WithAutoReconnect

func WithAutoReconnect(autoReconnect bool) MqttClientOptionsFunc

func WithCleanSession

func WithCleanSession(cleanSession bool) MqttClientOptionsFunc

func WithClientID

func WithClientID(clientID string) MqttClientOptionsFunc

func WithConnectionLostHandler

func WithConnectionLostHandler(handler func(MQTT.Client, error)) MqttClientOptionsFunc

func WithDefaultPublishHandler

func WithDefaultPublishHandler(msgHdlr MQTT.MessageHandler) MqttClientOptionsFunc

func WithJwtAsHttpHeader

func WithJwtAsHttpHeader(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc

func WithJwtReconnectingHandler

func WithJwtReconnectingHandler(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc

func WithOnConnectHandler

func WithOnConnectHandler(handler func(MQTT.Client)) MqttClientOptionsFunc

func WithProtocolVersion

func WithProtocolVersion(protocolVersion uint) MqttClientOptionsFunc

func WithResumeSubs

func WithResumeSubs(resumeSubs bool) MqttClientOptionsFunc

func WithTlsConfig

func WithTlsConfig(tlsConfig *tls.Config) MqttClientOptionsFunc

type Subscriber

type Subscriber struct {
	Topic      string
	EntryPoint MQTT.MessageHandler
	Qos        byte
}

type TopicBuilder

type TopicBuilder struct {
	// contains filtered or unexported fields
}

func NewTopicBuilder

func NewTopicBuilder(prefix string) *TopicBuilder

func (*TopicBuilder) BuildIncomingWildcardControlTopic

func (tb *TopicBuilder) BuildIncomingWildcardControlTopic() string

func (*TopicBuilder) BuildIncomingWildcardDataTopic

func (tb *TopicBuilder) BuildIncomingWildcardDataTopic() string

func (*TopicBuilder) BuildOutgoingControlTopic

func (tb *TopicBuilder) BuildOutgoingControlTopic(clientID domain.ClientID) string

func (*TopicBuilder) BuildOutgoingDataTopic

func (tb *TopicBuilder) BuildOutgoingDataTopic(clientID domain.ClientID) string

type TopicType

type TopicType int8
const (
	ControlTopicType TopicType = 0
	DataTopicType    TopicType = 1
)

type TopicVerifier

type TopicVerifier struct {
	// contains filtered or unexported fields
}

func NewTopicVerifier

func NewTopicVerifier(prefix string) *TopicVerifier

func (*TopicVerifier) VerifyIncomingTopic

func (tv *TopicVerifier) VerifyIncomingTopic(topic string) (TopicType, domain.ClientID, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL