From 876a1f0cc83bca9549487b2ce78ab1194c5921be Mon Sep 17 00:00:00 2001 From: Andrey Date: Sun, 24 Jan 2021 11:17:09 +0300 Subject: [PATCH] get port one more fix --- custom_components/mega/entities.py | 3 ++- custom_components/mega/hub.py | 36 +++++++++++++++++------------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/custom_components/mega/entities.py b/custom_components/mega/entities.py index e8b752c..b5879c6 100644 --- a/custom_components/mega/entities.py +++ b/custom_components/mega/entities.py @@ -1,4 +1,5 @@ import logging +import asyncio from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_NAME from homeassistant.core import State @@ -113,7 +114,7 @@ class MegaPushEntity(BaseMegaEntity): async def async_added_to_hass(self) -> None: await super().async_added_to_hass() - self.hass.async_create_task(self.mega.get_port(self.port)) + asyncio.create_task(self.mega.get_port(self.port)) class MegaOutPort(MegaPushEntity): diff --git a/custom_components/mega/hub.py b/custom_components/mega/hub.py index 5ac7247..c945914 100644 --- a/custom_components/mega/hub.py +++ b/custom_components/mega/hub.py @@ -58,6 +58,7 @@ class MegaD: self.mqtt = mqtt self.id = id self.lck = asyncio.Lock() + self._notif_lck = asyncio.Lock() self.cnd = asyncio.Condition() self.online = True self.entities: typing.List[Entity] = [] @@ -79,6 +80,7 @@ class MegaD: update_method=self.poll, update_interval=timedelta(seconds=self.poll_interval) if self.poll_interval else None, ) + self.notifiers = defaultdict(asyncio.Condition) if not mqtt_id: _id = host.split(".")[-1] self.mqtt_id = f"megad/{_id}" @@ -170,15 +172,21 @@ class MegaD: async def get_port(self, port): """Запрос состояния порта. Блокируется пока не придет какое-нибудь сообщение от меги или таймаут""" - async with self.cnd: - await self.mqtt.async_publish( - topic=f'{self.mqtt_id}/cmd', - payload=f'get:{port}', - qos=2, - retain=False, - ) - await asyncio.wait_for(self.cnd.wait(), timeout=15) - await asyncio.sleep(0.05) + self.lg.debug(f'get port %s', port) + async with self._notif_lck: + async with self.notifiers[port]: + cnd = self.notifiers[port] + await self.mqtt.async_publish( + topic=f'{self.mqtt_id}/cmd', + payload=f'get:{port}', + qos=2, + retain=False, + ) + try: + await asyncio.wait_for(cnd.wait(), timeout=10) + return self.values[port] + except asyncio.TimeoutError: + self.lg.error(f'timeout when getting port {port}') async def get_all_ports(self): for x in range(37): @@ -188,11 +196,10 @@ class MegaD: await self.save() async def _notify(self, port, value): - async with self.cnd: - self.last_update = datetime.now() + async with self.notifiers[port]: + cnd = self.notifiers[port] self.values[port] = value - self.last_port = port - self.cnd.notify_all() + cnd.notify_all() def _process_msg(self, msg): try: @@ -294,8 +301,7 @@ class MegaD: ret['light'][port].append({'dimmer': m == '1'}) elif pty == '3': try: - await self.get_port(port) - values = self.values.get(port) + values = await self.get_port(port) except asyncio.TimeoutError: self.lg.warning(f'timout on port {port}') continue