186 lines
5.7 KiB
Go
186 lines
5.7 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
|
|
"github.com/eclipse/paho.golang/autopaho"
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"tinygo.org/x/bluetooth"
|
|
|
|
"github.com/rs/zerolog"
|
|
log "github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type MQTTData struct {
|
|
Mac string `json:"mac"`
|
|
Temperature float32 `json:"temperature"`
|
|
Humidity int `json:"humidity"`
|
|
Battery int `json:"battery"`
|
|
}
|
|
|
|
const clientID = "sh.wallace.scott.xiaomi2mqtt"
|
|
|
|
func sendMQTT(ctx context.Context, channel chan MQTTData) {
|
|
mqttServer, err := url.Parse(os.Getenv("MQTT_HOST"))
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Unable to parse MQTT_HOST environment variable")
|
|
}
|
|
|
|
var msgs int
|
|
|
|
opts := autopaho.ClientConfig{
|
|
ServerUrls: []*url.URL{mqttServer},
|
|
KeepAlive: 20, // Keepalive message should be sent every 20 seconds
|
|
// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
|
|
CleanStartOnInitialConnection: false,
|
|
// SessionExpiryInterval - Seconds that a session will survive after disconnection.
|
|
// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
|
|
// the server will not queue messages while it is down. The specific setting will depend upon your needs
|
|
// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire)
|
|
SessionExpiryInterval: 60,
|
|
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
|
|
log.Info().Msg("mqtt connection up")
|
|
// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
|
|
// the connection drops)
|
|
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
|
|
Subscriptions: []paho.SubscribeOptions{
|
|
{Topic: "tele/+/SENSOR", QoS: 1},
|
|
},
|
|
}); err != nil {
|
|
log.Warn().Err(err).Msg("failed to subscribe. This is likely to mean no messages will be received.")
|
|
}
|
|
log.Info().Msg("mqtt subscription made")
|
|
},
|
|
OnConnectError: func(err error) { log.Error().Err(err).Msg("error whilst attempting connection") },
|
|
// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
|
|
ClientConfig: paho.ClientConfig{
|
|
// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
|
|
ClientID: clientID,
|
|
// OnPublishReceived is a slice of functions that will be called when a message is received.
|
|
// You can write the function(s) yourself or use the supplied Router
|
|
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
|
|
func(pr paho.PublishReceived) (bool, error) {
|
|
msgs++
|
|
if (msgs % 1000) == 0 {
|
|
log.Info().Int("message count", msgs).Msg("Alive")
|
|
}
|
|
// log.Info().Str("topic", pr.Packet.Topic).Bytes("payload", pr.Packet.Payload).Bool("retain", pr.Packet.Retain).Msg("received message")
|
|
return true, nil
|
|
}},
|
|
OnClientError: func(err error) { log.Error().Err(err).Msg("client error") },
|
|
OnServerDisconnect: func(d *paho.Disconnect) {
|
|
if d.Properties != nil {
|
|
log.Warn().Str("reason", d.Properties.ReasonString).Msg("server requested disconnect")
|
|
} else {
|
|
log.Warn().Uint8("reason_code", d.ReasonCode).Msg("server requested disconnect")
|
|
}
|
|
},
|
|
},
|
|
}
|
|
client, err := autopaho.NewConnection(ctx, opts) // starts process; will reconnect until context cancelled
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Unable to connect to MQTT")
|
|
}
|
|
// Wait for the connection to come up
|
|
if err = client.AwaitConnection(ctx); err != nil {
|
|
log.Fatal().Err(err).Msg("MQTT connection failed")
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case msg, ok := <-channel:
|
|
if !ok {
|
|
return // Channel closed
|
|
}
|
|
topic := fmt.Sprintf("tele/%s/SENSOR", msg.Mac)
|
|
payload, err := json.Marshal(msg)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Unable to marshal payload")
|
|
continue
|
|
}
|
|
if _, err = client.Publish(ctx, &paho.Publish{
|
|
QoS: 1,
|
|
Topic: topic,
|
|
Payload: payload,
|
|
}); err != nil {
|
|
if ctx.Err() == nil {
|
|
log.Error().Err(err).Msg("Publish failed")
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
log.Warn().Msg("received shutdown signal")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseData(result bluetooth.ScanResult, channel chan MQTTData) {
|
|
if strings.HasPrefix(result.LocalName(), "ATC_") {
|
|
for _, s := range result.ServiceData() {
|
|
if len(s.Data) < 10 {
|
|
continue
|
|
}
|
|
uuid := s.UUID.Bytes()
|
|
if bytes.Equal(uuid[12:14], []byte{0x1a, 0x18}) {
|
|
if s.Data[6] != 0 {
|
|
// Not a device that sends temperature at this location
|
|
return
|
|
}
|
|
temp := float32(int(binary.BigEndian.Uint16(s.Data[6:]))) / 10
|
|
hum := int(s.Data[8])
|
|
batt := int(s.Data[9])
|
|
mqttMsg := MQTTData{
|
|
Mac: result.Address.String(),
|
|
Temperature: temp,
|
|
Humidity: hum,
|
|
Battery: batt,
|
|
}
|
|
channel <- mqttMsg
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func dataToMQTT(channel chan MQTTData) func(adapter *bluetooth.Adapter, result bluetooth.ScanResult) {
|
|
return func(adapter *bluetooth.Adapter, result bluetooth.ScanResult) {
|
|
parseData(result, channel)
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
|
|
|
var adapter = bluetooth.DefaultAdapter
|
|
|
|
err := adapter.Enable()
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Unable to enable Bluetooth adapter")
|
|
}
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
channel := make(chan MQTTData, 100)
|
|
|
|
go sendMQTT(ctx, channel)
|
|
|
|
go func() {
|
|
log.Info().Msg("Scanning...")
|
|
err = adapter.Scan(dataToMQTT(channel))
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Scanning failed to initiate")
|
|
}
|
|
}()
|
|
|
|
<-ctx.Done()
|
|
log.Info().Msg("shutting down...")
|
|
}
|