From cf4a2a954ba8923c716a9992dec793cd2a90dd5c Mon Sep 17 00:00:00 2001 From: Scott Wallace Date: Sat, 11 Dec 2021 16:13:22 +0000 Subject: [PATCH] Strict typing --- get_data.py | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/get_data.py b/get_data.py index 8307c2a..91d5f32 100644 --- a/get_data.py +++ b/get_data.py @@ -4,11 +4,12 @@ Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT import os from datetime import datetime from socket import gaierror +from typing import TypedDict -import bluetooth._bluetooth as bluez -import paho.mqtt.publish as publish +import bluetooth._bluetooth as bluez # type: ignore[import] +import paho.mqtt.publish as publish # type: ignore[import] -from bluetooth_utils.bluetooth_utils import ( +from bluetooth_utils.bluetooth_utils import ( # type: ignore[import] disable_le_scan, enable_le_scan, parse_le_advertising_events, @@ -17,9 +18,27 @@ from bluetooth_utils.bluetooth_utils import ( ) -def send_to_mqtt(data: dict): +class MQTTdata(TypedDict): + """ + Class to define the MQTT data to send + + Args: + TypedDict ([type]): timestamp, MAC addr, temp, humidity %, battery % + """ + + timestamp: str + mac: str + temperature: float + humidity: int + battery: int + + +def send_to_mqtt(data: MQTTdata) -> None: """ Send data from LYWSD03MMC to MQTT + + Args: + data (MQTTdata): [description] """ msgs = [ { @@ -38,22 +57,29 @@ def send_to_mqtt(data: dict): publish.multiple(msgs, hostname=os.environ.get('MQTT_HOST', 'mqtt')) -def le_advertise_packet_handler( +def le_advertise_packet_handler( # pylint: disable=unused-argument mac: str, - adv_type: int, # pylint: disable=unused-argument + adv_type: int, data: bytes, - rssi: int, # pylint: disable=unused-argument -): + rssi: int, +) -> None: """ Handle new Xiaomi LYWSD03MMC BTLE advertise packet + + Args: + mac (str): MAC address of the sensor + adv_type (int): NOT USED + data (bytes): data from sensor + rssi (int): NOT USED """ data_str = raw_packet_to_str(data) + # Check for ATC preamble if data_str[0:2] == '11' and data_str[6:10] == '1a18': temp = int(data_str[22:26], 16) / 10 hum = int(data_str[26:28], 16) batt = int(data_str[28:30], 16) - mqtt_data = { + mqtt_data: MQTTdata = { 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'mac': mac, 'temperature': temp, @@ -68,7 +94,7 @@ def le_advertise_packet_handler( if __name__ == '__main__': - def main(): + def main() -> None: """ Main program """