get port one more fix

This commit is contained in:
Andrey
2021-01-24 11:17:09 +03:00
parent 18d0aee391
commit 876a1f0cc8
2 changed files with 23 additions and 16 deletions

View File

@@ -1,4 +1,5 @@
import logging import logging
import asyncio
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME from homeassistant.const import CONF_NAME
from homeassistant.core import State from homeassistant.core import State
@@ -113,7 +114,7 @@ class MegaPushEntity(BaseMegaEntity):
async def async_added_to_hass(self) -> None: async def async_added_to_hass(self) -> None:
await super().async_added_to_hass() await super().async_added_to_hass()
self.hass.async_create_task(self.mega.get_port(self.port)) asyncio.create_task(self.mega.get_port(self.port))
class MegaOutPort(MegaPushEntity): class MegaOutPort(MegaPushEntity):

View File

@@ -58,6 +58,7 @@ class MegaD:
self.mqtt = mqtt self.mqtt = mqtt
self.id = id self.id = id
self.lck = asyncio.Lock() self.lck = asyncio.Lock()
self._notif_lck = asyncio.Lock()
self.cnd = asyncio.Condition() self.cnd = asyncio.Condition()
self.online = True self.online = True
self.entities: typing.List[Entity] = [] self.entities: typing.List[Entity] = []
@@ -79,6 +80,7 @@ class MegaD:
update_method=self.poll, update_method=self.poll,
update_interval=timedelta(seconds=self.poll_interval) if self.poll_interval else None, update_interval=timedelta(seconds=self.poll_interval) if self.poll_interval else None,
) )
self.notifiers = defaultdict(asyncio.Condition)
if not mqtt_id: if not mqtt_id:
_id = host.split(".")[-1] _id = host.split(".")[-1]
self.mqtt_id = f"megad/{_id}" self.mqtt_id = f"megad/{_id}"
@@ -170,15 +172,21 @@ class MegaD:
async def get_port(self, port): async def get_port(self, port):
"""Запрос состояния порта. Блокируется пока не придет какое-нибудь сообщение от меги или таймаут""" """Запрос состояния порта. Блокируется пока не придет какое-нибудь сообщение от меги или таймаут"""
async with self.cnd: self.lg.debug(f'get port %s', port)
await self.mqtt.async_publish( async with self._notif_lck:
topic=f'{self.mqtt_id}/cmd', async with self.notifiers[port]:
payload=f'get:{port}', cnd = self.notifiers[port]
qos=2, await self.mqtt.async_publish(
retain=False, topic=f'{self.mqtt_id}/cmd',
) payload=f'get:{port}',
await asyncio.wait_for(self.cnd.wait(), timeout=15) qos=2,
await asyncio.sleep(0.05) retain=False,
)
try:
await asyncio.wait_for(cnd.wait(), timeout=10)
return self.values[port]
except asyncio.TimeoutError:
self.lg.error(f'timeout when getting port {port}')
async def get_all_ports(self): async def get_all_ports(self):
for x in range(37): for x in range(37):
@@ -188,11 +196,10 @@ class MegaD:
await self.save() await self.save()
async def _notify(self, port, value): async def _notify(self, port, value):
async with self.cnd: async with self.notifiers[port]:
self.last_update = datetime.now() cnd = self.notifiers[port]
self.values[port] = value self.values[port] = value
self.last_port = port cnd.notify_all()
self.cnd.notify_all()
def _process_msg(self, msg): def _process_msg(self, msg):
try: try:
@@ -294,8 +301,7 @@ class MegaD:
ret['light'][port].append({'dimmer': m == '1'}) ret['light'][port].append({'dimmer': m == '1'})
elif pty == '3': elif pty == '3':
try: try:
await self.get_port(port) values = await self.get_port(port)
values = self.values.get(port)
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.lg.warning(f'timout on port {port}') self.lg.warning(f'timout on port {port}')
continue continue