mijia-sensors/main.py

153 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""MiJia GATT to MQTT"""
import json
import os
import re
import time
import urllib.request
from dotenv import load_dotenv
import paho.mqtt.client as mqtt
from bluepy import btle
load_dotenv() # Cargamos las variables de entorno necesarias
PING_URL = os.getenv('PING_URL')
MQTT_SENSOR_NAME = os.getenv('MQTT_SENSOR_NAME')
MQTT_TOPIC_STATE = os.getenv('MQTT_TOPIC_STATE')
MQTT_TOPIC = os.getenv('MQTT_TOPIC')
MQTT_PUBLISH_DELAY = int(os.getenv('MQTT_PUBLISH_DELAY'))
MQTT_CLIENT_ID = os.getenv('MQTT_CLIENT_ID')
MQTT_SERVER = os.getenv('MQTT_SERVER')
MQTT_PORT = int(os.getenv('MQTT_PORT'))
MQTT_USER = os.getenv('MQTT_USER')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')
MIJIA_BTLE_ADDRESS = os.getenv('MIJIA_BTLE_ADDRESS')
MIJIA_BATTERY_SERVICE_UUID = btle.UUID('180f')
MIJIA_BATTERY_CHARACTERISTIC_UUID = btle.UUID('2a19')
MIJIA_DATA_SERVICE_UUID = btle.UUID('226c0000-6476-4566-7562-66734470666d')
MIJIA_DATA_CHARACTERISTIC_UUID = btle.UUID('226caa55-6476-4566-7562-66734470666d')
MIJIA_DATA_CHARACTERISTIC_HANDLE = 0x0010
BTLE_SUBSCRIBE_VALUE = bytes([0x01, 0x00])
BTLE_UNSUBSCRIBE_VALUE = bytes([0x00, 0x00])
battery = None
temperature = None
humidity = None
def on_connect(client, userdata, flags, rc):
client.publish(MQTT_TOPIC_STATE, 'online', 1, True)
class MyDelegate(btle.DefaultDelegate):
def __init__(self):
btle.DefaultDelegate.__init__(self)
def handleNotification(self, cHandle, data):
fetch_sensor_data(bytearray(data).decode('utf-8'))
def main():
mqttc = mqtt.Client(MQTT_CLIENT_ID)
mqttc.username_pw_set(MQTT_USER, MQTT_PASSWORD)
mqttc.will_set(MQTT_TOPIC_STATE, 'offline', 1, True)
mqttc.on_connect = on_connect
mqttc.connect(MQTT_SERVER, MQTT_PORT, 60)
mqttc.loop_start()
last_msg_time = time.time()
while True:
try:
print('Connecting to ' + MIJIA_BTLE_ADDRESS)
dev = btle.Peripheral(MIJIA_BTLE_ADDRESS)
print('Set delegate')
dev.setDelegate(MyDelegate())
# Get battery level
if battery is None:
fetch_battery_level(dev)
print('Battery level: ' + str(battery))
# Subscribe to data characteristic
if temperature is None or humidity is None:
dev.writeCharacteristic(MIJIA_DATA_CHARACTERISTIC_HANDLE, BTLE_SUBSCRIBE_VALUE, True)
while True:
if dev.waitForNotifications(1.0):
print('Temperature: ' + temperature)
print('Humidity: ' + humidity)
dev.writeCharacteristic(MIJIA_DATA_CHARACTERISTIC_HANDLE, BTLE_UNSUBSCRIBE_VALUE, True)
dev.disconnect()
break
if battery is not None and temperature is not None and humidity is not None:
delay_gap = time.time() - last_msg_time
if delay_gap < MQTT_PUBLISH_DELAY:
time.sleep(MQTT_PUBLISH_DELAY - delay_gap)
publish_sensor_data(mqttc)
last_msg_time = time.time()
reset_variables()
if PING_URL:
urllib.request.urlopen(PING_URL).read()
print("Done.")
return 0
except (btle.BTLEDisconnectError, IOError):
print("Disconnected :(")
def reset_variables():
global battery
global temperature
global humidity
battery = None
temperature = None
humidity = None
def fetch_battery_level(dev):
global battery
battery_service = dev.getServiceByUUID(MIJIA_BATTERY_SERVICE_UUID)
battery_characteristic = battery_service.getCharacteristics(MIJIA_BATTERY_CHARACTERISTIC_UUID)[0]
battery = ord(battery_characteristic.read())
def fetch_sensor_data(temp_hum):
global temperature
global humidity
pattern = re.compile('T=([\d.-]+) H=([\d.-]+)')
match = re.match(pattern, temp_hum)
if match:
temperature = match.group(1)
humidity = match.group(2)
def publish_sensor_data(mqttc):
payload = {
"id": MQTT_SENSOR_NAME,
"battery": battery,
"temperature": float(temperature),
"humidity": float(humidity)
}
mqttc.publish(MQTT_TOPIC, json.dumps(payload), 1, True)
time.sleep(MQTT_PUBLISH_DELAY)
if __name__ == '__main__':
print('Starting MiJia GATT client')
main()