mqtt

package
v1.2.0
Warning

This package is not in the latest version of its module.

Published: Mar 28, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

README

Venom - Executor MQTT

Step to read and write MQTT topics.

An example can be found in tests/mqtt.yml

Input

ClientType can be one of:

  • "publisher"
  • "subscriber"
  • "persistent_queue"

As the name suggests, "persistent_queue" creates a persistent queue by setting MQTT's clean session property so that the broker keeps the session, which lets later "subscriber" steps retrieve data. It is paired with "persistSubscription": true requests the persistent topic subscription and false releases it. The "persistent_queue" and "subscriber" steps must use the same client id so that the broker can track state across connections. Remember to remove the topic registration when done. The option is named "persistent_queue" rather than MQTT's more usual clean_session to reduce unexpected behaviour when "persistent_queue" is left out of the step config.

Limitations and Future Improvements

Limitations

Venom starts each step with no state held across steps, and an MQTT client that subscribes after a message has been sent will not receive that message. We therefore need a mechanism to register persistent topics for a later mqtt step. Tests have to add extra steps to pre-register a persistent topic subscription and to de-register it at the end of a sequence of steps.
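The register, publish, subscribe, de-register ordering described above can be sketched in Go. This is only an illustration and not venom's code: the `step` type is a hypothetical trimmed-down mirror of a few executor fields, `checkSequence` is a hypothetical helper, and the client id and topic values are made up. It assumes the subscriber is meant to see messages published by an earlier step.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical, trimmed-down mirror of the executor's input fields.
type step struct {
	ClientType          string
	PersistSubscription bool
	ClientID            string
	Topic               string
}

// checkSequence checks the ordering the README asks for: a "subscriber" step
// must run while a persistent subscription registered under the same client
// id is active, and every registration must be released by the end.
func checkSequence(steps []step) error {
	active := map[string]bool{} // client id -> persistent subscription registered
	for i, s := range steps {
		switch s.ClientType {
		case "persistent_queue":
			if s.PersistSubscription {
				active[s.ClientID] = true
			} else {
				delete(active, s.ClientID)
			}
		case "subscriber":
			if !active[s.ClientID] {
				return fmt.Errorf("step %d: subscriber %q has no registered persistent subscription", i, s.ClientID)
			}
		case "publisher":
			// Publishers need no registration.
		default:
			return fmt.Errorf("step %d: unknown client type %q", i, s.ClientType)
		}
	}
	if len(active) > 0 {
		return errors.New("persistent subscription was never released")
	}
	return nil
}

func main() {
	steps := []step{
		{ClientType: "persistent_queue", PersistSubscription: true, ClientID: "venom-test", Topic: "test/topic"},
		{ClientType: "publisher", Topic: "test/topic"},
		{ClientType: "subscriber", ClientID: "venom-test", Topic: "test/topic"},
		{ClientType: "persistent_queue", PersistSubscription: false, ClientID: "venom-test", Topic: "test/topic"},
	}
	fmt.Println(checkSequence(steps))
}
```

The helper tracks registrations per client id because the broker associates a persistent session with the client id, which is why the two step types have to share it.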

MQTT could deliver messages after connection but before the executor gets a chance to bind a subscription handler. For "subscriber" steps we therefore attach a handler using AddRoute before connecting, and subscribe only once the connection is established. This avoids message loss when a persistent subscription is in use.
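Why the handler has to exist before the connection is made can be shown with a toy model. The sketch below does not use the real MQTT client API: `toyClient`, `addRoute`, `connect` and `receive` are hypothetical names, and the only behaviour modelled is that messages held by the broker are delivered at connect time to whichever handlers exist at that moment.

```go
package main

import "fmt"

// toyClient is a hypothetical in-memory stand-in for an MQTT client.
type toyClient struct {
	routes  map[string]func(payload string)
	dropped int
}

func newToyClient() *toyClient {
	return &toyClient{routes: map[string]func(string){}}
}

// addRoute registers a handler for a topic; it may be called before connect.
func (c *toyClient) addRoute(topic string, h func(string)) {
	c.routes[topic] = h
}

// connect delivers the messages the broker held for this session.
// Messages for a topic with no registered handler are counted as dropped.
func (c *toyClient) connect(queued map[string][]string) {
	for topic, payloads := range queued {
		h, ok := c.routes[topic]
		for _, p := range payloads {
			if ok {
				h(p)
			} else {
				c.dropped++
			}
		}
	}
}

// receive runs one connection and reports how many queued messages were
// handled and how many were dropped.
func receive(routeFirst bool) (handled, dropped int) {
	queued := map[string][]string{"test/topic": {"m1", "m2"}}
	c := newToyClient()
	h := func(string) { handled++ }
	if routeFirst {
		c.addRoute("test/topic", h)
		c.connect(queued)
	} else {
		c.connect(queued)
		c.addRoute("test/topic", h)
	}
	return handled, c.dropped
}

func main() {
	fmt.Println(receive(true))  // 2 0
	fmt.Println(receive(false)) // 0 2
}
```

With the route attached first, both queued messages reach the handler; with the order reversed, both are lost before the handler exists.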

Future work
  • Add the ability to obtain message content from files rather than from within the YAML config. This need not be specific to this executor.
  • Add support for codecs so that serialisation formats other than JSON can be used. This should really be a capability that any executor can take advantage of, rather than solved for each one individually.
  • Add TLS support for the MQTT interface.

Documentation

Constants

const Name = "mqtt"

Name of executor

Variables

This section is empty.

Functions

func New

func New() venom.Executor

New returns a new Executor

Types

type Executor

type Executor struct {
	Addrs string `json:"addrs" yaml:"addrs"`

	// ClientType must be "publisher", "subscriber" or "persistent_queue"
	ClientType          string `json:"client_type" yaml:"clientType"`
	PersistSubscription bool   `json:"persist_subscription" yaml:"persistSubscription"`
	ClientID            string `json:"client_id" yaml:"clientId"`

	// Subscription topic
	Topics []string `json:"topics" yaml:"topics"`

	// Maximum number of messages to read. Once the limit is reached, the subscriber stops reading
	MessageLimit int `json:"message_limit" yaml:"messageLimit"`

	// Represents the MQTT connection timeout. In milliseconds. Default 5000
	ConnectTimeout int64 `json:"connect_timeout,omitempty" yaml:"connectTimeout,omitempty"`

	// Represents the timeout for reading messages. In milliseconds. Default 5000
	Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"`

	// Used when ClientType is "publisher"
	// Messages are the messages sent by the publisher
	Messages []Message `json:"messages" yaml:"messages"`
	QOS      byte      `json:"qos" yaml:"qos"`
}
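As a rough illustration of how the JSON tags and the millisecond timeout fields fit together, the sketch below decodes a step into a local struct. It is not how venom itself loads a step: `stepConfig` is a hypothetical local mirror of a subset of Executor, `msOrDefault` and `parseStep` are hypothetical helpers, the broker address is made up, and treating an unset (zero) timeout as the documented default of 5000 ms is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// stepConfig is a hypothetical local mirror of a subset of Executor,
// reusing the JSON tags documented above.
type stepConfig struct {
	Addrs          string   `json:"addrs"`
	ClientType     string   `json:"client_type"`
	ClientID       string   `json:"client_id"`
	Topics         []string `json:"topics"`
	MessageLimit   int      `json:"message_limit"`
	ConnectTimeout int64    `json:"connect_timeout,omitempty"`
	Timeout        int64    `json:"timeout,omitempty"`
}

// defaultTimeoutMs is the default stated in the field comments.
const defaultTimeoutMs = 5000

// msOrDefault converts a millisecond field to a time.Duration and
// substitutes the default when the field is unset (assumed behaviour).
func msOrDefault(ms int64) time.Duration {
	if ms <= 0 {
		ms = defaultTimeoutMs
	}
	return time.Duration(ms) * time.Millisecond
}

// parseStep decodes a JSON document into a stepConfig.
func parseStep(raw string) (stepConfig, error) {
	var c stepConfig
	err := json.Unmarshal([]byte(raw), &c)
	return c, err
}

func main() {
	raw := `{"addrs":"tcp://localhost:1883","client_type":"subscriber","client_id":"venom-test","topics":["test/topic"],"message_limit":1,"timeout":2000}`
	c, err := parseStep(raw)
	if err != nil {
		panic(err)
	}
	// connect_timeout is absent, so the default applies; timeout is 2000 ms.
	fmt.Println(c.ClientType, c.Topics, msOrDefault(c.ConnectTimeout), msOrDefault(c.Timeout))
}
```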

func (Executor) GetDefaultAssertions

func (Executor) GetDefaultAssertions() *venom.StepAssertions

GetDefaultAssertions returns the default assertions for this executor

func (Executor) Run

func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, error)

type Message

type Message struct {
	Topic    string `json:"topic" yaml:"topic"`
	QOS      byte   `json:"qos" yaml:"qos"`
	Retained bool   `json:"retained" yaml:"retained"`
	Payload  string `json:"payload" yaml:"payload"`
}

Message represents a message sent to or received from an MQTT broker

type Result

type Result struct {
	TimeSeconds  float64       `json:"timeseconds" yaml:"timeSeconds"`
	Topics       []string      `json:"topics" yaml:"topics"`
	Messages     []interface{} `json:"messages" yaml:"messages"`
	MessagesJSON []interface{} `json:"messagesjson" yaml:"messagesJSON"`
	Err          string        `json:"err" yaml:"error"`
}

Result represents a step result.
