SimpleCore helper MQTT extensions.
MqttExtensions#
class MqttExtensions()
MQTT related functions.
publish#
@staticmethod
def publish(testcase,
topic,
msg,
qos,
retain,
target,
timeout: int = 5) -> bool
Publish a message to a topic via MQTT.
Arguments:
testcase
object - Testcase class reference
topic
str - MQTT topic
msg
object - Stringifiable object
qos
int - QoS level to use
retain
bool - Set retain flag
target
str - Hostname or IP address
timeout
int, optional - Timeout value. Defaults to 5.
Returns:
bool - True on success, False on failure
subscribe#
@staticmethod
def subscribe(testcase,
topic,
target,
count: int = 1,
timeout: int = 5) -> Union[str, None]
Collect one or more messages from a topic over MQTT.
Arguments:
testcase
object - Testcase class reference
topic
str - Topic to subscribe to
target
str - Hostname or IP address
count
int - Minimum number of messages to collect. Defaults to 1.
timeout
int, optional - Timeout value. Defaults to 5.
Returns:
Union[str, None]: Collected messages or None on timeout
subscribe_as_json#
@staticmethod
def subscribe_as_json(testcase,
topic,
target,
count: int = 1,
timeout: int = 5) -> Union[dict, None]
Collect one or more messages from a topic over MQTT and return them as JSON.
Arguments:
testcase
object - Testcase class reference
topic
str - Topic to subscribe to
target
str - Hostname or IP address
count
int - Minimum number of messages to collect. Defaults to 1.
timeout
int, optional - Timeout value. Defaults to 5.
Returns:
Union[dict, None]: Collected messages as JSON, or None on timeout or corrupted data