SimpleCore helper mqtt extensions.
MqttExtensions#
class MqttExtensions()
MQTT related functions.
publish#
@staticmethod
def publish(testcase,
topic,
msg,
qos,
retain,
target,
timeout: int = 5) -> bool
Publish a topic via MQTT
Arguments:
testcaseobject - testcase class referencetopicstr - MQTT topicmsgobject - Stringifiable objectqosint - QOS to useretainbool - Set retain flagtargetstr - Hostname or IP addresstimeoutint, optional - Timeout value. Defaults to 5.
Returns:
bool- True on success, false on failure
subscribe#
@staticmethod
def subscribe(testcase,
topic,
target,
count: int = 1,
timeout: int = 5) -> Union[str, None]
Get a message over MQTT
Arguments:
testcaseobject - testcase class referencetopicstr - Topic to subscribe totargetstr - Hostname or IP addresscountint - Minimum number of messages to collecttimeoutint, optional - Timout value. Defaults to 5.
Returns:
Union[str, None]: Collected messages or None on timeout
subscribe_as_json#
@staticmethod
def subscribe_as_json(testcase,
topic,
target,
count: int = 1,
timeout: int = 5) -> Union[dict, None]
Get a message over MQTT as json
Arguments:
testcaseobject - testcase class referencetopicstr - Topic to subscribe totargetstr - Hostname or IP addresscountint - Minimum number of messages to collecttimeoutint, optional - Timout value. Defaults to 5.
Returns:
Union[dict, None]: Collected messages as json or None on timeout or corrupted data