SimpleCore helper MQTT extensions.

MqttExtensions#

class MqttExtensions()

MQTT-related functions.

publish#

@staticmethod
def publish(testcase,
            topic,
            msg,
            qos,
            retain,
            target,
            timeout: int = 5) -> bool

Publish a message to a topic via MQTT.

Arguments:

  • testcase object - testcase class reference

  • topic str - MQTT topic

  • msg object - Stringifiable object

  • qos int - QoS level to use (0, 1, or 2)

  • retain bool - Set retain flag

  • target str - Hostname or IP address

  • timeout int, optional - Timeout value. Defaults to 5.

Returns:

  • bool - True on success, False on failure
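A minimal sketch of calling publish and checking its boolean result. The import path for MqttExtensions is not given in this reference, so the example uses FakeMqttExtensions, a hypothetical stand-in that copies the documented signature and records calls instead of contacting a broker. The helper publish_json, the topic name, and the hostname are also illustrative assumptions. Serializing with json.dumps before publishing is a design choice for this sketch, made on the assumption that a "stringifiable" msg is converted with str(), which would not produce valid JSON for a dict.

```python
import json


class FakeMqttExtensions:
    """Hypothetical stand-in with the documented publish() signature.

    It records calls instead of contacting a broker, so the example runs offline.
    """

    sent = []

    @staticmethod
    def publish(testcase, topic, msg, qos, retain, target, timeout: int = 5) -> bool:
        # Assumption: the real helper stringifies msg before sending it.
        FakeMqttExtensions.sent.append((topic, str(msg), qos, retain, target, timeout))
        return True


def publish_json(mqtt, testcase, topic, data, target):
    """Serialize data to JSON, publish it with QoS 1 (not retained), and fail loudly."""
    ok = mqtt.publish(testcase, topic, json.dumps(data), qos=1, retain=False, target=target)
    if not ok:
        raise AssertionError(f"publish to {topic!r} on {target} failed")
    return ok


# testcase is passed as None here only because the stand-in ignores it.
publish_json(FakeMqttExtensions, None, "devices/1/state", {"on": True}, "broker.local")
```

With the real class, the same call shape should apply: pass the test case instance as the first argument and rely on the default timeout of 5 unless the broker is slow to respond.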

subscribe#

@staticmethod
def subscribe(testcase,
              topic,
              target,
              count: int = 1,
              timeout: int = 5) -> Union[str, None]

Subscribe to a topic and collect messages over MQTT.

Arguments:

  • testcase object - testcase class reference

  • topic str - Topic to subscribe to

  • target str - Hostname or IP address

  • count int, optional - Minimum number of messages to collect. Defaults to 1.

  • timeout int, optional - Timeout value. Defaults to 5.

Returns:

  • Union[str, None] - Collected messages, or None on timeout
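Because subscribe returns None on timeout rather than raising, callers need an explicit check before using the result. The sketch below shows one way to do that. require_messages is a hypothetical helper, not part of SimpleCore, and it makes no assumption about how several collected messages are combined into the returned string.

```python
from typing import Optional


def require_messages(result: Optional[str], topic: str) -> str:
    """Return the collected payload, or raise if subscribe() timed out (returned None)."""
    if result is None:
        raise TimeoutError(f"no message received on {topic!r} before the timeout")
    return result
```

Inside a test case this could wrap the call directly, for example require_messages(MqttExtensions.subscribe(self, topic, target, count=1, timeout=10), topic), so that a timeout surfaces as a clear error instead of a later failure on None.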

subscribe_as_json#

@staticmethod
def subscribe_as_json(testcase,
                      topic,
                      target,
                      count: int = 1,
                      timeout: int = 5) -> Union[dict, None]

Subscribe to a topic and collect messages over MQTT, parsed as JSON.

Arguments:

  • testcase object - testcase class reference

  • topic str - Topic to subscribe to

  • target str - Hostname or IP address

  • count int, optional - Minimum number of messages to collect. Defaults to 1.

  • timeout int, optional - Timeout value. Defaults to 5.

Returns:

  • Union[dict, None] - Collected messages parsed as JSON, or None on timeout or corrupted data
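The return contract described above (a dict on success, None on timeout or corrupted data) can be illustrated with a small parser. This is a sketch of the documented behaviour under stated assumptions, not the library's actual implementation: parse_json_or_none is a hypothetical name, and treating valid JSON that is not an object as "corrupted" is an assumption drawn from the dict return type.

```python
import json
from typing import Optional


def parse_json_or_none(raw: Optional[str]) -> Optional[dict]:
    """Mirror the documented contract: dict on success, None otherwise."""
    if raw is None:
        # Corresponds to the timeout case of the underlying subscribe call.
        return None
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Corresponds to the "corrupted data" case.
        return None
    # Assumption: only JSON objects count as valid, matching the dict return type.
    return parsed if isinstance(parsed, dict) else None
```

Since a timeout and a malformed payload both yield None, a caller that needs to tell them apart may prefer subscribe and parse the raw string itself.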