""" Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT """ import os from datetime import datetime from socket import gaierror from typing import TypedDict import bluetooth._bluetooth as bluez # type: ignore[import] import paho.mqtt.publish as publish # type: ignore[import] from bluetooth_utils.bluetooth_utils import ( # type: ignore[import] disable_le_scan, enable_le_scan, parse_le_advertising_events, raw_packet_to_str, toggle_device, ) class MQTTdata(TypedDict): """ Class to define the MQTT data to send Args: TypedDict ([type]): timestamp, MAC addr, temp, humidity %, battery % """ timestamp: str mac: str temperature: float humidity: int battery: int def send_to_mqtt(data: MQTTdata) -> None: """ Send data from LYWSD03MMC to MQTT Args: data (MQTTdata): [description] """ msgs = [ { 'topic': f'sensors/home/{data["mac"]}/temperature', 'payload': data['temperature'], }, { 'topic': f'sensors/home/{data["mac"]}/humidity', 'payload': data['humidity'], }, { 'topic': f'sensors/home/{data["mac"]}/battery', 'payload': data['battery'], }, ] publish.multiple(msgs, hostname=os.environ.get('MQTT_HOST', 'mqtt')) def le_advertise_packet_handler( # pylint: disable=unused-argument mac: str, adv_type: int, data: bytes, rssi: int, ) -> None: """ Handle new Xiaomi LYWSD03MMC BTLE advertise packet Args: mac (str): MAC address of the sensor adv_type (int): NOT USED data (bytes): data from sensor rssi (int): NOT USED """ data_str = raw_packet_to_str(data) # Check for ATC preamble if data_str[0:2] == '11' and data_str[6:10] == '1a18': temp = int(data_str[22:26], 16) / 10 hum = int(data_str[26:28], 16) batt = int(data_str[28:30], 16) mqtt_data: MQTTdata = { 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'mac': mac, 'temperature': temp, 'humidity': hum, 'battery': batt, } try: send_to_mqtt(mqtt_data) except gaierror as error: print(f'[ERROR] {error}') if __name__ == '__main__': def main() -> None: """ Main program """ # Use 0 for hci0 dev_id = 0 toggle_device(dev_id, True) try: sock = bluez.hci_open_dev(dev_id) # pylint: disable=c-extension-no-member except: print(f'Cannot open bluetooth device {dev_id}') raise # Set filter to "True" to see only one packet per device enable_le_scan(sock, filter_duplicates=False) try: # Called on new LE packet parse_le_advertising_events( sock, handler=le_advertise_packet_handler, debug=False, ) # Scan until Ctrl-C except KeyboardInterrupt: disable_le_scan(sock) main()