"""
Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT
"""
import json
import os
from socket import gaierror
from typing import TypedDict

import bluetooth._bluetooth as bluez  # type: ignore[import]
from paho.mqtt import publish  # type: ignore[import]

from bluetooth_utils.bluetooth_utils import (  # type: ignore[import]
    disable_le_scan,
    enable_le_scan,
    parse_le_advertising_events,
    raw_packet_to_str,
    toggle_device,
)

# Minimum number of hex characters the handler needs: it reads the packet
# string up to (but excluding) offset 30 to reach the battery byte.
_MIN_PACKET_HEX_LEN = 30


class MQTTdata(TypedDict):
    """
    Sensor reading published to MQTT

    Keys:
        mac: MAC address of the sensor (used in the topic names)
        temperature: signed 16-bit raw value divided by 10
        humidity: humidity %
        battery: battery %
    """

    mac: str
    temperature: float
    humidity: int
    battery: int


def send_to_mqtt(data: MQTTdata) -> None:
    """
    Send data from LYWSD03MMC to MQTT

    Publishes one message per measurement plus one JSON message holding the
    whole reading, all under ``sensors/home/<mac>/``. The broker host is read
    from the ``MQTT_HOST`` environment variable (default ``mqtt``).

    Args:
        data (MQTTdata): reading to publish
    """
    msgs = [
        {
            "topic": f"sensors/home/{data['mac']}/temperature",
            "payload": data["temperature"],
        },
        {
            "topic": f"sensors/home/{data['mac']}/humidity",
            "payload": data["humidity"],
        },
        {
            "topic": f"sensors/home/{data['mac']}/battery",
            "payload": data["battery"],
        },
        {
            # f-string is required here: without it the literal text
            # "{data['mac']}" would be used as part of the topic.
            "topic": f"sensors/home/{data['mac']}/tele/SENSOR",
            "payload": json.dumps(data),
        },
    ]
    publish.multiple(msgs, hostname=os.environ.get("MQTT_HOST", "mqtt"))


def le_advertise_packet_handler(  # pylint: disable=unused-argument
    mac: str,
    adv_type: int,
    data: bytes,
    rssi: int,  # pylint: disable=unused-argument
) -> None:
    """
    Handle new Xiaomi LYWSD03MMC BTLE advertise packet

    Packets that do not carry the ATC preamble, or that are too short to
    contain all the fields, are ignored.

    Args:
        mac (str): MAC address of the sensor
        adv_type (int): NOT USED
        data (bytes): data from sensor
        rssi (int): NOT USED
    """
    data_str = raw_packet_to_str(data)

    # Check for ATC preamble
    if data_str[0:2] != "11" or data_str[6:10] != "1a18":
        return

    # A truncated packet would make int("", 16) raise and stop the scan loop.
    if len(data_str) < _MIN_PACKET_HEX_LEN:
        return

    # Temperature is a signed big-endian 16-bit value in tenths of a degree;
    # decoding it as unsigned would turn sub-zero readings into ~6500.
    raw_temp = int.from_bytes(
        bytes.fromhex(data_str[22:26]), byteorder="big", signed=True
    )
    mqtt_data: MQTTdata = {
        "mac": mac,
        "temperature": raw_temp / 10,
        "humidity": int(data_str[26:28], 16),
        "battery": int(data_str[28:30], 16),
    }

    try:
        send_to_mqtt(mqtt_data)
    except gaierror as error:
        # Broker hostname could not be resolved: report and keep scanning.
        print(f"[ERROR] {error}")


def main() -> None:
    """
    Main program

    Powers up the adapter, scans for LE advertising packets and forwards
    every matching packet to MQTT until interrupted with Ctrl-C.
    """
    # Use 0 for hci0
    dev_id = 0
    toggle_device(dev_id, True)

    try:
        sock = bluez.hci_open_dev(dev_id)  # pylint: disable=c-extension-no-member
    except Exception:
        print(f"Cannot open bluetooth device {dev_id}")
        raise

    # Set filter to "True" to see only one packet per device
    enable_le_scan(sock, filter_duplicates=False)

    try:
        # Called on new LE packet
        parse_le_advertising_events(
            sock,
            handler=le_advertise_packet_handler,
            debug=False,
        )
    except KeyboardInterrupt:
        # Scan until Ctrl-C: this is the normal way to stop the program.
        pass
    finally:
        # Always stop scanning, including when an unexpected error ends the
        # loop, so the adapter is not left in scan mode.
        disable_le_scan(sock)


if __name__ == "__main__":
    main()