2021-06-16 07:00:43 +01:00
|
|
|
"""
|
|
|
|
Read Xiaomi LYWSD03MMC advertised packets and send them to MQTT
|
|
|
|
"""
|
|
|
|
import os
|
|
|
|
from datetime import datetime
|
2021-06-17 08:46:26 +01:00
|
|
|
from socket import gaierror
|
2021-12-11 16:13:22 +00:00
|
|
|
from typing import TypedDict
|
2021-06-16 07:00:43 +01:00
|
|
|
|
2021-12-11 16:13:22 +00:00
|
|
|
import bluetooth._bluetooth as bluez # type: ignore[import]
|
2022-08-09 09:52:36 +01:00
|
|
|
from paho.mqtt import publish # type: ignore[import]
|
2021-06-16 07:00:43 +01:00
|
|
|
|
2021-12-11 16:13:22 +00:00
|
|
|
from bluetooth_utils.bluetooth_utils import ( # type: ignore[import]
|
2021-06-16 16:23:24 +01:00
|
|
|
disable_le_scan,
|
|
|
|
enable_le_scan,
|
|
|
|
parse_le_advertising_events,
|
|
|
|
raw_packet_to_str,
|
|
|
|
toggle_device,
|
|
|
|
)
|
2021-06-16 07:00:43 +01:00
|
|
|
|
|
|
|
|
2021-12-11 16:13:22 +00:00
|
|
|
class MQTTdata(TypedDict):
    """
    Shape of a single sensor reading passed around before publishing

    Keys:
        timestamp: time of the reading, as text
        mac: MAC address of the sensor
        temperature: temperature value
        humidity: humidity %
        battery: battery %
    """

    # Identification of the reading
    timestamp: str
    mac: str

    # Measured values
    temperature: float
    humidity: int
    battery: int
|
|
|
|
|
|
|
|
|
|
|
|
def send_to_mqtt(data: MQTTdata) -> None:
    """
    Publish one LYWSD03MMC reading to MQTT

    Temperature, humidity and battery are each published on their own
    topic below ``sensors/home/<mac>/`` in a single ``publish.multiple``
    call. The broker host name is taken from the ``MQTT_HOST`` environment
    variable and defaults to ``mqtt``.

    Args:
        data (MQTTdata): reading to publish (its timestamp is not sent)
    """
    broker_host = os.environ.get('MQTT_HOST', 'mqtt')

    temperature_msg = {
        'topic': f'sensors/home/{data["mac"]}/temperature',
        'payload': data['temperature'],
    }
    humidity_msg = {
        'topic': f'sensors/home/{data["mac"]}/humidity',
        'payload': data['humidity'],
    }
    battery_msg = {
        'topic': f'sensors/home/{data["mac"]}/battery',
        'payload': data['battery'],
    }

    # Order is kept: temperature, humidity, battery
    publish.multiple(
        [temperature_msg, humidity_msg, battery_msg],
        hostname=broker_host,
    )
|
|
|
|
|
|
|
|
|
2021-12-11 16:13:22 +00:00
|
|
|
def le_advertise_packet_handler(  # pylint: disable=unused-argument
    mac: str,
    adv_type: int,
    data: bytes,
    rssi: int,
) -> None:
    """
    Handle new Xiaomi LYWSD03MMC BTLE advertise packet

    The packet is turned into a hex string. Packets without the ATC
    preamble, or too short to contain a complete reading, are ignored.
    For the others, temperature, humidity and battery are decoded and
    handed to send_to_mqtt together with the MAC and the current local time.

    A name-resolution failure (gaierror) while publishing is printed and
    swallowed so scanning can go on; any other error propagates.

    Args:
        mac (str): MAC address of the sensor
        adv_type (int): NOT USED
        data (bytes): data from sensor
        rssi (int): NOT USED
    """
    data_str = raw_packet_to_str(data)

    # Check for ATC preamble
    if data_str[0:2] != '11' or data_str[6:10] != '1a18':
        return

    # The fields read below end at hex digit 30. A truncated packet would
    # otherwise make int('', 16) raise ValueError and abort the scan loop.
    if len(data_str) < 30:
        return

    # Temperature is a 16-bit value in tenths of a degree. The ATC firmware
    # documents it as a signed int16, so decode two's complement: without
    # this, -1.0 degrees (0xfff6) would be reported as 6552.6.
    raw_temp = int(data_str[22:26], 16)
    if raw_temp >= 0x8000:
        raw_temp -= 0x10000
    temp = raw_temp / 10
    hum = int(data_str[26:28], 16)
    batt = int(data_str[28:30], 16)

    mqtt_data: MQTTdata = {
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'mac': mac,
        'temperature': temp,
        'humidity': hum,
        'battery': batt,
    }
    try:
        send_to_mqtt(mqtt_data)
    except gaierror as error:
        print(f'[ERROR] {error}')
|
2021-06-16 07:00:43 +01:00
|
|
|
|
|
|
|
|
|
|
|
def main() -> None:
    """
    Main program

    Opens Bluetooth device hci0, enables LE scanning and dispatches every
    advertising packet to le_advertise_packet_handler until Ctrl-C is
    pressed. LE scanning is disabled again on the way out, whether the
    loop ends through Ctrl-C or through an unexpected error.

    Raises:
        Exception: whatever opening the device raised, re-raised after a
            message is printed.
    """
    # Use 0 for hci0
    dev_id = 0
    toggle_device(dev_id, True)

    try:
        sock = bluez.hci_open_dev(dev_id)  # pylint: disable=c-extension-no-member
    except Exception:
        # Not a bare except: Ctrl-C / SystemExit should not be reported as
        # a failure to open the device.
        print(f'Cannot open bluetooth device {dev_id}')
        raise

    # Set filter to "True" to see only one packet per device
    enable_le_scan(sock, filter_duplicates=False)

    try:
        # Called on new LE packet
        parse_le_advertising_events(
            sock,
            handler=le_advertise_packet_handler,
            debug=False,
        )
        # Scan until Ctrl-C
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; exit quietly.
        pass
    finally:
        # Always leave the adapter with scanning switched off, also when
        # the loop dies from an error other than Ctrl-C.
        disable_le_scan(sock)


if __name__ == '__main__':
    main()
|