xiaomi2mqtt/get_data.py

119 lines
2.7 KiB
Python

"""
Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT
"""
import json
import os
from socket import gaierror
from typing import TypedDict
import bluetooth._bluetooth as bluez # type: ignore[import]
from paho.mqtt import publish # type: ignore[import]
from bluetooth_utils.bluetooth_utils import ( # type: ignore[import]
disable_le_scan,
enable_le_scan,
parse_le_advertising_events,
raw_packet_to_str,
toggle_device,
)
class MQTTdata(TypedDict):
"""
Class to define the MQTT data to send
Args:
TypedDict ([type]): timestamp, MAC addr, temp, humidity %, battery %
"""
mac: str
temperature: float
humidity: int
battery: int
def send_to_mqtt(data: MQTTdata) -> None:
"""
Send data from LYWSD03MMC to MQTT
Args:
data (MQTTdata): [description]
"""
msgs = [
{
"topic": f"tele/{data['mac']}/SENSOR",
"payload": json.dumps(data),
},
]
publish.multiple(msgs, hostname=os.environ.get("MQTT_HOST", "mqtt"))
def le_advertise_packet_handler( # pylint: disable=unused-argument
mac: str,
adv_type: int,
data: bytes,
rssi: int, # pylint: disable=unused-argument
) -> None:
"""
Handle new Xiaomi LYWSD03MMC BTLE advertise packet
Args:
mac (str): MAC address of the sensor
adv_type (int): NOT USED
data (bytes): data from sensor
rssi (int): NOT USED
"""
data_str = raw_packet_to_str(data)
# Check for ATC preamble
if data_str[0:2] == "11" and data_str[6:10] == "1a18":
temp = int(data_str[22:26], 16) / 10
hum = int(data_str[26:28], 16)
batt = int(data_str[28:30], 16)
mqtt_data: MQTTdata = {
"mac": mac,
"temperature": temp,
"humidity": hum,
"battery": batt,
}
try:
send_to_mqtt(mqtt_data)
except gaierror as error:
print(f"[ERROR] {error}")
if __name__ == "__main__":
def main() -> None:
"""
Main program
"""
# Use 0 for hci0
dev_id = 0
toggle_device(dev_id, True)
try:
sock = bluez.hci_open_dev(dev_id) # pylint: disable=c-extension-no-member
except:
print(f"Cannot open bluetooth device {dev_id}")
raise
# Set filter to "True" to see only one packet per device
enable_le_scan(sock, filter_duplicates=False)
try:
# Called on new LE packet
parse_le_advertising_events(
sock,
handler=le_advertise_packet_handler,
debug=False,
)
# Scan until Ctrl-C
except KeyboardInterrupt:
disable_le_scan(sock)
main()