151 lines
4.3 KiB
Python
Executable file
151 lines
4.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
"""MiJia GATT to MQTT"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import time
|
|
import urllib.request
|
|
|
|
from dotenv import load_dotenv
|
|
import paho.mqtt.client as mqtt
|
|
from bluepy import btle
|
|
|
|
load_dotenv() # Cargamos las variables de entorno necesarias
|
|
|
|
PING_URL = os.getenv('PING_URL')
|
|
|
|
MQTT_SENSOR_NAME = os.getenv('MQTT_SENSOR_NAME')
|
|
MQTT_TOPIC_STATE = os.getenv('MQTT_TOPIC_STATE')
|
|
MQTT_TOPIC = os.getenv('MQTT_TOPIC')
|
|
|
|
MQTT_PUBLISH_DELAY = int(os.getenv('MQTT_PUBLISH_DELAY'))
|
|
MQTT_CLIENT_ID = os.getenv('MQTT_CLIENT_ID')
|
|
|
|
MQTT_SERVER = os.getenv('MQTT_SERVER')
|
|
MQTT_PORT = int(os.getenv('MQTT_PORT'))
|
|
MQTT_USER = os.getenv('MQTT_USER')
|
|
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')
|
|
|
|
MIJIA_BTLE_ADDRESS = os.getenv('MIJIA_BTLE_ADDRESS')
|
|
|
|
MIJIA_BATTERY_SERVICE_UUID = btle.UUID('180f')
|
|
MIJIA_BATTERY_CHARACTERISTIC_UUID = btle.UUID('2a19')
|
|
|
|
MIJIA_DATA_SERVICE_UUID = btle.UUID('226c0000-6476-4566-7562-66734470666d')
|
|
MIJIA_DATA_CHARACTERISTIC_UUID = btle.UUID('226caa55-6476-4566-7562-66734470666d')
|
|
MIJIA_DATA_CHARACTERISTIC_HANDLE = 0x0010
|
|
|
|
BTLE_SUBSCRIBE_VALUE = bytes([0x01, 0x00])
|
|
BTLE_UNSUBSCRIBE_VALUE = bytes([0x00, 0x00])
|
|
|
|
battery = None
|
|
temperature = None
|
|
humidity = None
|
|
|
|
|
|
def on_connect(client, userdata, flags, rc):
|
|
client.publish(MQTT_TOPIC_STATE, 'online', 1, True)
|
|
|
|
|
|
class MyDelegate(btle.DefaultDelegate):
|
|
def __init__(self):
|
|
btle.DefaultDelegate.__init__(self)
|
|
|
|
def handleNotification(self, cHandle, data):
|
|
fetch_sensor_data(bytearray(data).decode('utf-8'))
|
|
|
|
|
|
def main():
|
|
mqttc = mqtt.Client(MQTT_CLIENT_ID)
|
|
mqttc.username_pw_set(MQTT_USER, MQTT_PASSWORD)
|
|
mqttc.will_set(MQTT_TOPIC_STATE, 'offline', 1, True)
|
|
mqttc.on_connect = on_connect
|
|
|
|
mqttc.connect(MQTT_SERVER, MQTT_PORT, 60)
|
|
mqttc.loop_start()
|
|
|
|
last_msg_time = time.time()
|
|
|
|
while True:
|
|
try:
|
|
print('Connecting to ' + MIJIA_BTLE_ADDRESS)
|
|
dev = btle.Peripheral(MIJIA_BTLE_ADDRESS)
|
|
print('Set delegate')
|
|
dev.setDelegate(MyDelegate())
|
|
|
|
# Get battery level
|
|
if battery is None:
|
|
fetch_battery_level(dev)
|
|
print('Battery level: ' + str(battery))
|
|
|
|
# Subscribe to data characteristic
|
|
if temperature is None or humidity is None:
|
|
dev.writeCharacteristic(MIJIA_DATA_CHARACTERISTIC_HANDLE, BTLE_SUBSCRIBE_VALUE, True)
|
|
while True:
|
|
if dev.waitForNotifications(1.0):
|
|
print('Temperature: ' + temperature)
|
|
print('Humidity: ' + humidity)
|
|
dev.writeCharacteristic(MIJIA_DATA_CHARACTERISTIC_HANDLE, BTLE_UNSUBSCRIBE_VALUE, True)
|
|
dev.disconnect()
|
|
break
|
|
|
|
if battery is not None and temperature is not None and humidity is not None:
|
|
delay_gap = time.time() - last_msg_time
|
|
if delay_gap < MQTT_PUBLISH_DELAY:
|
|
time.sleep(MQTT_PUBLISH_DELAY - delay_gap)
|
|
|
|
publish_sensor_data(mqttc)
|
|
last_msg_time = time.time()
|
|
reset_variables()
|
|
if PING_URL:
|
|
urllib.request.urlopen(PING_URL).read()
|
|
break
|
|
|
|
except (btle.BTLEDisconnectError, IOError):
|
|
print("Disconnected :(")
|
|
|
|
def reset_variables():
|
|
global battery
|
|
global temperature
|
|
global humidity
|
|
|
|
battery = None
|
|
temperature = None
|
|
humidity = None
|
|
|
|
|
|
def fetch_battery_level(dev):
|
|
global battery
|
|
|
|
battery_service = dev.getServiceByUUID(MIJIA_BATTERY_SERVICE_UUID)
|
|
battery_characteristic = battery_service.getCharacteristics(MIJIA_BATTERY_CHARACTERISTIC_UUID)[0]
|
|
battery = ord(battery_characteristic.read())
|
|
|
|
|
|
def fetch_sensor_data(temp_hum):
|
|
global temperature
|
|
global humidity
|
|
|
|
pattern = re.compile('T=([\d.-]+) H=([\d.-]+)')
|
|
match = re.match(pattern, temp_hum)
|
|
if match:
|
|
temperature = match.group(1)
|
|
humidity = match.group(2)
|
|
|
|
|
|
def publish_sensor_data(mqttc):
|
|
payload = {
|
|
"id": MQTT_SENSOR_NAME,
|
|
"battery": battery,
|
|
"temperature": float(temperature),
|
|
"humidity": float(humidity)
|
|
}
|
|
mqttc.publish(MQTT_TOPIC, json.dumps(payload), 1, True)
|
|
time.sleep(MQTT_PUBLISH_DELAY)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print('Starting MiJia GATT client')
|
|
main()
|