xiaomi2mqtt/main.go
2026-03-23 09:30:43 +00:00

186 lines
5.7 KiB
Go

package main
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"tinygo.org/x/bluetooth"
"github.com/rs/zerolog"
log "github.com/rs/zerolog/log"
)
// MQTTData is a single sensor reading. Its JSON encoding, with the keys given
// by the struct tags, is the payload published to MQTT.
type MQTTData struct {
	// Mac is the address of the advertising device, formatted as a string.
	Mac string `json:"mac"`
	// Temperature is the decoded temperature value (raw reading divided by 10).
	Temperature float32 `json:"temperature"`
	// Humidity is the humidity byte taken from the advertisement.
	Humidity int `json:"humidity"`
	// Battery is the battery byte taken from the advertisement.
	Battery int `json:"battery"`
}
// clientID is the MQTT client identifier passed to the broker in sendMQTT's
// paho.ClientConfig.
const clientID = "sh.wallace.scott.xiaomi2mqtt"
// sendMQTT connects to the MQTT broker named by the MQTT_HOST environment
// variable and publishes every reading received on channel, JSON-encoded, to
// the topic "tele/<mac>/SENSOR" with QoS 1.
//
// It returns when channel is closed or ctx is cancelled. An unusable MQTT_HOST
// value, or a failure to bring the connection up, terminates the process via
// log.Fatal.
func sendMQTT(ctx context.Context, channel chan MQTTData) {
	rawHost := os.Getenv("MQTT_HOST")
	mqttServer, err := url.Parse(rawHost)
	if err != nil {
		log.Fatal().Err(err).Msg("Unable to parse MQTT_HOST environment variable")
	}
	// url.Parse reports no error for an empty string, and a bare "host:1883"
	// parses as scheme "host" with no host at all. Neither can be dialled, so
	// fail loudly here instead of waiting forever for a connection.
	if mqttServer.Host == "" {
		log.Fatal().Str("value", rawHost).Msg("MQTT_HOST environment variable must be a URL with a host, e.g. mqtt://broker:1883")
	}

	// starts process; will reconnect until context cancelled
	client, err := autopaho.NewConnection(ctx, newMQTTConfig(mqttServer))
	if err != nil {
		log.Fatal().Err(err).Msg("Unable to connect to MQTT")
	}
	// Wait for the connection to come up
	if err = client.AwaitConnection(ctx); err != nil {
		log.Fatal().Err(err).Msg("MQTT connection failed")
	}

	for {
		select {
		case msg, ok := <-channel:
			if !ok {
				return // Channel closed
			}
			topic := fmt.Sprintf("tele/%s/SENSOR", msg.Mac)
			payload, err := json.Marshal(msg)
			if err != nil {
				log.Error().Err(err).Msg("Unable to marshal payload")
				continue
			}
			_, err = client.Publish(ctx, &paho.Publish{
				QoS:     1,
				Topic:   topic,
				Payload: payload,
			})
			// A publish error caused by our own shutdown is expected; stay quiet.
			if err != nil && ctx.Err() == nil {
				log.Error().Err(err).Msg("Publish failed")
			}
		case <-ctx.Done():
			log.Warn().Msg("received shutdown signal")
			return
		}
	}
}

// newMQTTConfig builds the autopaho configuration used by sendMQTT to talk to server.
func newMQTTConfig(server *url.URL) autopaho.ClientConfig {
	// received counts inbound messages so that a liveness line can be logged
	// every 1000 messages.
	var received int

	return autopaho.ClientConfig{
		ServerUrls: []*url.URL{server},
		KeepAlive:  20, // Keepalive message should be sent every 20 seconds
		// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
		CleanStartOnInitialConnection: false,
		// SessionExpiryInterval - Seconds that a session will survive after disconnection.
		// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
		// the server will not queue messages while it is down. The specific setting will depend upon your needs
		// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire)
		SessionExpiryInterval: 60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			log.Info().Msg("mqtt connection up")
			// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
			// the connection drops)
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
				Subscriptions: []paho.SubscribeOptions{
					{Topic: "tele/+/SENSOR", QoS: 1},
				},
			}); err != nil {
				log.Warn().Err(err).Msg("failed to subscribe. This is likely to mean no messages will be received.")
				// Do not claim success below when the subscription failed.
				return
			}
			log.Info().Msg("mqtt subscription made")
		},
		OnConnectError: func(err error) { log.Error().Err(err).Msg("error whilst attempting connection") },
		// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
		ClientConfig: paho.ClientConfig{
			// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
			ClientID: clientID,
			// OnPublishReceived is a slice of functions that will be called when a message is received.
			// You can write the function(s) yourself or use the supplied Router
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				func(pr paho.PublishReceived) (bool, error) {
					received++
					if received%1000 == 0 {
						log.Info().Int("message count", received).Msg("Alive")
					}
					return true, nil
				},
			},
			OnClientError: func(err error) { log.Error().Err(err).Msg("client error") },
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					log.Warn().Str("reason", d.Properties.ReasonString).Msg("server requested disconnect")
				} else {
					log.Warn().Uint8("reason_code", d.ReasonCode).Msg("server requested disconnect")
				}
			},
		},
	}
}
// parseData inspects one BLE scan result and, when it comes from a device
// whose local name starts with "ATC_" and carries service data under the
// 16-bit UUID 0x181A, decodes a reading and sends it on channel.
//
// Bytes 6-7 of the service data are read as a signed big-endian temperature in
// tenths of a unit, byte 8 as humidity and byte 9 as battery level.
//
// The send on channel blocks while the channel is full.
func parseData(result bluetooth.ScanResult, channel chan MQTTData) {
	// Service-data length of an atc1441-format frame: MAC(6) + temperature(2) +
	// humidity(1) + battery %(1) + battery mV(2) + counter(1), per the firmware
	// documentation. NOTE(review): confirm against the devices in use.
	const atc1441PayloadLen = 13

	if !strings.HasPrefix(result.LocalName(), "ATC_") {
		return
	}
	for _, s := range result.ServiceData() {
		if len(s.Data) < 10 {
			continue
		}
		uuid := s.UUID.Bytes()
		if !bytes.Equal(uuid[12:14], []byte{0x1a, 0x18}) {
			continue
		}
		// A frame of exactly the atc1441 length is trusted as-is. The previous
		// check required byte 6 to be zero for every frame, which silently
		// dropped all readings of 25.6 and above and all negative readings.
		// Frames of any other length keep that heuristic, so nothing that was
		// published before is lost.
		if len(s.Data) != atc1441PayloadLen && s.Data[6] != 0 {
			// Not a device that sends temperature at this location
			continue
		}
		// Temperature is a signed 16-bit value; converting through int16 keeps
		// sub-zero readings negative instead of wrapping to ~6550.
		rawTemp := int16(binary.BigEndian.Uint16(s.Data[6:8]))
		channel <- MQTTData{
			Mac:         result.Address.String(),
			Temperature: float32(rawTemp) / 10,
			Humidity:    int(s.Data[8]),
			Battery:     int(s.Data[9]),
		}
	}
}
// dataToMQTT returns a scan callback that hands every scan result to parseData
// together with channel. The adapter argument of the callback is not used.
func dataToMQTT(channel chan MQTTData) func(adapter *bluetooth.Adapter, result bluetooth.ScanResult) {
	onResult := func(_ *bluetooth.Adapter, found bluetooth.ScanResult) {
		parseData(found, channel)
	}
	return onResult
}
// main enables the default Bluetooth adapter, starts the MQTT publisher and
// the BLE scan in background goroutines, and then blocks until SIGINT or
// SIGTERM arrives.
func main() {
	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})

	adapter := bluetooth.DefaultAdapter
	if err := adapter.Enable(); err != nil {
		log.Fatal().Err(err).Msg("Unable to enable Bluetooth adapter")
	}

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Readings flow from the scan callback to the publisher through this channel.
	channel := make(chan MQTTData, 100)
	go sendMQTT(ctx, channel)

	go func() {
		log.Info().Msg("Scanning...")
		// err is local to this goroutine; the previous code assigned to main's
		// variable from here. Without a running scan nothing can ever be
		// published, so a start-up failure ends the process (as a failed
		// adapter.Enable does) unless we are already shutting down.
		if err := adapter.Scan(dataToMQTT(channel)); err != nil && ctx.Err() == nil {
			log.Fatal().Err(err).Msg("Scanning failed to initiate")
		}
	}()

	<-ctx.Done()
	log.Info().Msg("shutting down...")

	// Ask the adapter to end the scan; an error here (for example when the scan
	// never started) does not matter because the process is exiting anyway.
	if err := adapter.StopScan(); err != nil {
		log.Debug().Err(err).Msg("Unable to stop Bluetooth scan")
	}
}