""" Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT """ import os from datetime import datetime import bluetooth._bluetooth as bluez import paho.mqtt.publish as publish from bluetooth_utils.bluetooth_utils import ( disable_le_scan, enable_le_scan, parse_le_advertising_events, raw_packet_to_str, toggle_device, ) def send_to_mqtt(data: dict): """ Send data from LYWSD03MMC to MQTT """ msgs = [ { 'topic': f'sensors/home/{data["mac"]}/temperature', 'payload': data['temperature'], }, { 'topic': f'sensors/home/{data["mac"]}/humidity', 'payload': data['humidity'], }, { 'topic': f'sensors/home/{data["mac"]}/battery', 'payload': data['battery'], }, ] publish.multiple(msgs, hostname=os.environ.get('MQTT_HOST', 'mqtt')) def le_advertise_packet_handler(mac: str, _, data: bytes, __): """ Handle new Xiaomi LYWSD03MMC BTLE advertise packet """ data_str = raw_packet_to_str(data) # Check for ATC preamble if data_str[0:2] == '11' and data_str[6:10] == '1a18': temp = int(data_str[22:26], 16) / 10 hum = int(data_str[26:28], 16) batt = int(data_str[28:30], 16) mqtt_data = { 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'mac': mac, 'temperature': temp, 'humidity': hum, 'battery': batt, } send_to_mqtt(mqtt_data) if __name__ == '__main__': def main(): """ Main program """ # Use 0 for hci0 dev_id = 0 toggle_device(dev_id, True) try: sock = bluez.hci_open_dev(dev_id) # pylint: disable=c-extension-no-member except: print(f'Cannot open bluetooth device {dev_id}') raise # Set filter to "True" to see only one packet per device enable_le_scan(sock, filter_duplicates=False) try: # Called on new LE packet parse_le_advertising_events( sock, handler=le_advertise_packet_handler, debug=False ) # Scan until Ctrl-C except KeyboardInterrupt: disable_le_scan(sock) main()