рефакторинг, конфиг yaml

This commit is contained in:
Andrey
2021-01-22 10:27:02 +03:00
parent c3b9474d56
commit bb4ce882f5
12 changed files with 409 additions and 400 deletions

View File

@@ -1,21 +1,21 @@
import asyncio
import json
import logging
from collections import defaultdict
from datetime import datetime
from functools import wraps
from datetime import datetime, timedelta
import aiohttp
import typing
from bs4 import BeautifulSoup
import re
import json
from bs4 import BeautifulSoup
from homeassistant.components import mqtt
from homeassistant.const import DEVICE_CLASS_TEMPERATURE, DEVICE_CLASS_HUMIDITY
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import TEMP, HUM
from .exceptions import CannotConnect
import re
TEMP_PATT = re.compile(r'temp:([01234567890\.]+)')
HUM_PATT = re.compile(r'hum:([01234567890\.]+)')
@@ -38,6 +38,7 @@ class MegaD:
def __init__(
self,
hass: HomeAssistant,
loop: asyncio.AbstractEventLoop,
host: str,
password: str,
mqtt: mqtt.MQTT,
@@ -50,29 +51,51 @@ class MegaD:
**kwargs,
):
"""Initialize."""
self.loop: asyncio.AbstractEventLoop = None
self.hass = hass
self.host = host
self.sec = password
self.mqtt = mqtt
self.id = id
self.lck = asyncio.Lock()
self.is_alive = asyncio.Condition()
self.cnd = asyncio.Condition()
self.online = True
self.entities: typing.List[Entity] = []
self.poll_interval = scan_interval
self.subscriptions = []
self.subs = None
self.lg: logging.Logger = lg.getChild(self.id)
self._scanned = {}
self.sensors = []
self.port_to_scan = port_to_scan
self.inverted = inverted or []
self.last_update = datetime.now()
self._callbacks: typing.DefaultDict[int, typing.List[typing.Callable[[dict], typing.Coroutine]]] = defaultdict(list)
self._loop = loop
self.values = {}
self.updater = DataUpdateCoordinator(
hass,
self.lg,
name="sensors",
update_method=self.poll,
update_interval=timedelta(seconds=self.poll_interval) if self.poll_interval else None,
)
if not mqtt_id:
_id = host.split(".")[-1]
self.mqtt_id = f"megad/{_id}"
else:
self.mqtt_id = mqtt_id
self._loop: asyncio.AbstractEventLoop = None
async def start(self):
self.loop = asyncio.get_event_loop()
self.subs = await self.mqtt.async_subscribe(
topic=f"{self.mqtt_id}/+",
msg_callback=self._process_msg,
qos=0,
)
async def stop(self):
self.subs()
for x in self._callbacks.values():
x.clear()
async def add_entity(self, ent):
async with self.lck:
@@ -80,55 +103,41 @@ class MegaD:
async def get_sensors(self):
self.lg.debug(self.sensors)
_ports = {x.port for x in self.sensors}
for x in _ports:
for x in self.sensors:
await self.get_port(x)
await asyncio.sleep(0.1)
@property
def is_online(self):
return (datetime.now() - self.last_update).total_seconds() < (self.poll_interval + 10)
def _warn_offline(self):
if self.online:
self.lg.warning('mega is offline')
self.hass.states.async_set(
f'mega.{self.id}',
'offline',
)
self.online = False
def _notify_online(self):
if not self.online:
self.hass.states.async_set(
f'mega.{self.id}',
'online',
)
self.online = True
async def poll(self):
"""
Send get port 0 every poll_interval. When answer is received, mega.<id> becomes online else mega.<id> becomes
offline
"""
self._loop = asyncio.get_event_loop()
while True:
if len(self.sensors) > 0:
await self.get_sensors()
else:
await self.get_port(self.port_to_scan)
await asyncio.sleep(1)
if (datetime.now() - self.last_update).total_seconds() > (self.poll_interval + 10):
await self.get_port(self.port_to_scan)
await asyncio.sleep(1)
if (datetime.now() - self.last_update).total_seconds() > (self.poll_interval + 10):
self.lg.warning('mega is offline')
self.hass.states.async_set(
f'mega.{self.id}',
'offline',
)
self.online = False
else:
self.hass.states.async_set(
f'mega.{self.id}',
'online',
)
self.online = True
for x in self.entities:
try:
await x.async_update_ha_state()
except RuntimeError:
pass
await asyncio.sleep(self.poll_interval - 1)
async def _async_notify(self):
async with self.is_alive:
self.is_alive.notify_all()
def _notify(self, *args):
asyncio.run_coroutine_threadsafe(self._async_notify(), self._loop)
self.lg.debug('poll')
if len(self.sensors) > 0:
await self.get_sensors()
else:
await self.get_port(self.port_to_scan)
return self.values
async def get_mqtt_id(self):
async with aiohttp.request(
@@ -159,85 +168,63 @@ class MegaD:
await self.send_command(cmd='s')
async def get_port(self, port):
"""
Опрашивает порт с помощью mqtt. Ждет ответ, возвращает ответ.
:param port:
:return:
"""
ftr = asyncio.get_event_loop().create_future()
def cb(msg):
self.last_update = datetime.now()
try:
if '"value":NA' in msg.payload.decode():
if not ftr.done():
ftr.set_result(None)
return
ret = json.loads(msg.payload).get('value')
if not ftr.done():
ftr.set_result(ret)
except Exception as exc:
ret = None
self.lg.exception(f'while parsing response from port {port}: {msg.payload}')
ftr.set_result(None)
self.lg.debug(
f'port: %s response: %s', port, ret
"""Запрос состояния порта. Блокируется пока не придет какое-нибудь сообщение от меги или таймаут"""
async with self.cnd:
await self.mqtt.async_publish(
topic=f'{self.mqtt_id}/cmd',
payload=f'get:{port}',
qos=2,
retain=False,
)
async with self.lck:
unsub = await self.mqtt.async_subscribe(
topic=f'{self.mqtt_id}/{port}',
msg_callback=cb,
qos=1,
)
try:
await self.mqtt.async_publish(
topic=f'{self.mqtt_id}/cmd',
payload=f'get:{port}',
qos=1,
retain=False,
)
return await asyncio.wait_for(ftr, timeout=2)
except asyncio.TimeoutError:
self.lg.warning(f'timeout on port {port}')
finally:
unsub()
await asyncio.wait_for(self.cnd.wait(), timeout=15)
async def get_all_ports(self):
for x in range(37):
asyncio.create_task(self.get_port(x))
await self.get_port(x)
async def reboot(self, save=True):
await self.save()
# await self.send_command(cmd=)
async def subscribe(self, port, callback):
@wraps(callback)
def wrapper(msg):
self.lg.debug(
'process incomming message: %s', msg
)
async def _notify(self, port, value):
async with self.cnd:
self.last_update = datetime.now()
return callback(msg)
self.values[port] = value
self.cnd.notify()
def _process_msg(self, msg):
try:
_, port = msg.topic.split('/')
except ValueError:
self.lg.warning('can not process %s', msg)
return
if port == 'cmd':
return
try:
port = int(port)
except:
self.lg.warning('can not process %s', msg)
return
self.lg.debug(
f'subscribe %s %s', port, wrapper
'process incomming message: %s', msg
)
subs = await self.mqtt.async_subscribe(
topic=f"{self.mqtt_id}/{port}",
msg_callback=wrapper,
qos=0,
)
self.subscriptions.append(subs)
value = None
try:
value = json.loads(msg.payload)
for cb in self._callbacks[port]:
cb(value)
except Exception as exc:
self.lg.warning(f'could not parse json ({msg.payload}): {exc}')
return
finally:
asyncio.run_coroutine_threadsafe(self._notify(port, value), self.loop)
def unsubscribe_all(self):
self.lg.info('unsubscribe')
for x in self.subscriptions:
self.lg.debug('unsubscribe %s', x)
x()
def subscribe(self, port, callback):
port = int(port)
self.lg.debug(
f'subscribe %s %s', port, callback
)
self._callbacks[port].append(callback)
async def authenticate(self) -> bool:
"""Test if we can authenticate with the host."""
@@ -302,11 +289,18 @@ class MegaD:
elif pty == "1" and m in ['0', '1']:
ret['light'][port].append({'dimmer': m == '1'})
elif pty == '3':
values = await self.get_port(port)
try:
await self.get_port(port)
values = self.values.get(port)
except asyncio.TimeoutError:
self.lg.warning(f'timout on port {port}')
continue
self.lg.debug(f'values: %s', values)
if values is None:
self.lg.warning(f'port {port} is of type sensor but did not respond, skipping it')
self.lg.warning(f'port {port} is of type sensor but response is None, skipping it')
continue
if isinstance(values, dict) and 'value' in values:
values = values['value']
if isinstance(values, str) and TEMP_PATT.search(values):
values = {TEMP: values}
elif not isinstance(values, dict):
@@ -315,10 +309,10 @@ class MegaD:
self.lg.debug(f'add sensor {key}')
ret['sensor'][port].append(dict(
key=key,
patt=PATTERNS.get(key),
unit_of_measurement=UNITS.get(key, UNITS[TEMP]),
# TODO: make other units, make options in config flow
device_class=CLASSES.get(key, CLASSES[TEMP]),
id_suffix=key,
))
return ret