Make more compatible with Tasmota's output

This commit is contained in:
Scott Wallace 2022-12-10 15:32:16 +00:00
parent 35a900ed78
commit fe8c948676
Signed by: scott
GPG key ID: AA742FDC5AFE2A72

View file

@ -2,7 +2,6 @@
Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT
""" """
import os import os
from datetime import datetime
from socket import gaierror from socket import gaierror
from typing import TypedDict from typing import TypedDict
@ -26,7 +25,6 @@ class MQTTdata(TypedDict):
TypedDict ([type]): timestamp, MAC addr, temp, humidity %, battery % TypedDict ([type]): timestamp, MAC addr, temp, humidity %, battery %
""" """
timestamp: str
mac: str mac: str
temperature: float temperature: float
humidity: int humidity: int
@ -40,28 +38,34 @@ def send_to_mqtt(data: MQTTdata) -> None:
Args: Args:
data (MQTTdata): [description] data (MQTTdata): [description]
""" """
msgs = [ msgs = [
{ {
'topic': f'sensors/home/{data["mac"]}/temperature', "topic": f"sensors/home/{data['mac']}/temperature",
'payload': data['temperature'], "payload": data["temperature"],
}, },
{ {
'topic': f'sensors/home/{data["mac"]}/humidity', "topic": f"sensors/home/{data['mac']}/humidity",
'payload': data['humidity'], "payload": data["humidity"],
}, },
{ {
'topic': f'sensors/home/{data["mac"]}/battery', "topic": f"sensors/home/{data['mac']}/battery",
'payload': data['battery'], "payload": data["battery"],
},
{
"topic": "sensors/home/{data['mac']}/tele/SENSOR",
"payload": data,
}, },
] ]
publish.multiple(msgs, hostname=os.environ.get('MQTT_HOST', 'mqtt'))
publish.multiple(msgs, hostname=os.environ.get("MQTT_HOST", "mqtt"))
def le_advertise_packet_handler( # pylint: disable=unused-argument def le_advertise_packet_handler( # pylint: disable=unused-argument
mac: str, mac: str,
adv_type: int, adv_type: int,
data: bytes, data: bytes,
rssi: int, rssi: int, # pylint: disable=unused-argument
) -> None: ) -> None:
""" """
Handle new Xiaomi LYWSD03MMC BTLE advertise packet Handle new Xiaomi LYWSD03MMC BTLE advertise packet
@ -75,24 +79,23 @@ def le_advertise_packet_handler( # pylint: disable=unused-argument
data_str = raw_packet_to_str(data) data_str = raw_packet_to_str(data)
# Check for ATC preamble # Check for ATC preamble
if data_str[0:2] == '11' and data_str[6:10] == '1a18': if data_str[0:2] == "11" and data_str[6:10] == "1a18":
temp = int(data_str[22:26], 16) / 10 temp = int(data_str[22:26], 16) / 10
hum = int(data_str[26:28], 16) hum = int(data_str[26:28], 16)
batt = int(data_str[28:30], 16) batt = int(data_str[28:30], 16)
mqtt_data: MQTTdata = { mqtt_data: MQTTdata = {
'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "mac": mac,
'mac': mac, "temperature": temp,
'temperature': temp, "humidity": hum,
'humidity': hum, "battery": batt,
'battery': batt,
} }
try: try:
send_to_mqtt(mqtt_data) send_to_mqtt(mqtt_data)
except gaierror as error: except gaierror as error:
print(f'[ERROR] {error}') print(f"[ERROR] {error}")
if __name__ == '__main__': if __name__ == "__main__":
def main() -> None: def main() -> None:
""" """
@ -106,7 +109,7 @@ if __name__ == '__main__':
try: try:
sock = bluez.hci_open_dev(dev_id) # pylint: disable=c-extension-no-member sock = bluez.hci_open_dev(dev_id) # pylint: disable=c-extension-no-member
except: except:
print(f'Cannot open bluetooth device {dev_id}') print(f"Cannot open bluetooth device {dev_id}")
raise raise
# Set filter to "True" to see only one packet per device # Set filter to "True" to see only one packet per device
@ -123,5 +126,4 @@ if __name__ == '__main__':
except KeyboardInterrupt: except KeyboardInterrupt:
disable_le_scan(sock) disable_le_scan(sock)
main() main()