add http support

This commit is contained in:
Andrey
2021-01-25 12:25:11 +03:00
parent 876a1f0cc8
commit 8bbb4ab271
13 changed files with 361 additions and 119 deletions

View File

@@ -14,8 +14,9 @@ from homeassistant.const import DEVICE_CLASS_TEMPERATURE, DEVICE_CLASS_HUMIDITY
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import TEMP, HUM
from .exceptions import CannotConnect
from .const import TEMP, HUM, PATT_SPLIT, DOMAIN, CONF_HTTP
from .exceptions import CannotConnect, MqttNotConfigured
from .http import MegaView
TEMP_PATT = re.compile(r'temp:([01234567890\.]+)')
HUM_PATT = re.compile(r'hum:([01234567890\.]+)')
@@ -32,6 +33,10 @@ CLASSES = {
HUM: DEVICE_CLASS_HUMIDITY
}
class NoPort(Exception):
pass
class MegaD:
"""MegaD Hub"""
@@ -44,13 +49,24 @@ class MegaD:
mqtt: mqtt.MQTT,
lg: logging.Logger,
id: str,
mqtt_inputs: bool = True,
mqtt_id: str = None,
scan_interval=60,
port_to_scan=0,
nports=38,
inverted: typing.List[int] = None,
update_all=True,
**kwargs,
):
"""Initialize."""
if mqtt_inputs is None or mqtt_inputs == 'None' or mqtt_inputs is False:
self.http = hass.data[DOMAIN][CONF_HTTP]
self.http.allowed_hosts |= {host}
else:
self.http = None
self.update_all = update_all if update_all is not None else True
self.nports = nports
self.mqtt_inputs = mqtt_inputs
self.loop: asyncio.AbstractEventLoop = None
self.hass = hass
self.host = host
@@ -58,6 +74,7 @@ class MegaD:
self.mqtt = mqtt
self.id = id
self.lck = asyncio.Lock()
self._http_lck = asyncio.Lock()
self._notif_lck = asyncio.Lock()
self.cnd = asyncio.Condition()
self.online = True
@@ -89,14 +106,16 @@ class MegaD:
async def start(self):
self.loop = asyncio.get_event_loop()
self.subs = await self.mqtt.async_subscribe(
topic=f"{self.mqtt_id}/+",
msg_callback=self._process_msg,
qos=0,
)
if self.mqtt is not None:
self.subs = await self.mqtt.async_subscribe(
topic=f"{self.mqtt_id}/+",
msg_callback=self._process_msg,
qos=0,
)
async def stop(self):
self.subs()
if self.subs is not None:
self.subs()
for x in self._callbacks.values():
x.clear()
@@ -136,6 +155,9 @@ class MegaD:
offline
"""
self.lg.debug('poll')
if self.mqtt is None:
await self.get_all_ports()
return
if len(self.sensors) > 0:
await self.get_sensors()
else:
@@ -154,25 +176,51 @@ class MegaD:
return _id or 'megad/' + self.host.split('.')[-1]
async def send_command(self, port=None, cmd=None):
if port:
url = f"http://{self.host}/{self.sec}/?pt={port}&cmd={cmd}"
else:
url = f"http://{self.host}/{self.sec}/?cmd={cmd}"
self.lg.debug('run command: %s', url)
async with self.lck:
return await self.request(pt=port, cmd=cmd)
async def request(self, **kwargs):
cmd = '&'.join([f'{k}={v}' for k, v in kwargs.items() if v is not None])
url = f"http://{self.host}/{self.sec}/?{cmd}"
self.lg.debug('request: %s', url)
async with self._http_lck:
async with aiohttp.request("get", url=url) as req:
if req.status != 200:
self.lg.warning('%s returned %s (%s)', url, req.status, await req.text())
return False
return None
else:
return True
return await req.text()
async def save(self):
await self.send_command(cmd='s')
def parse_response(self, ret):
if ret is None:
raise NoPort()
if ':' in ret:
ret = PATT_SPLIT.split(ret)
ret = dict([
x.split(':') for x in ret if x.count(':') == 1
])
elif 'ON' in ret:
ret = {'value': 'ON'}
elif 'OFF' in ret:
ret = {'value': 'OFF'}
else:
ret = {'value': ret}
return ret
async def get_port(self, port):
"""Запрос состояния порта. Блокируется пока не придет какое-нибудь сообщение от меги или таймаут"""
"""
Запрос состояния порта. Состояние всегда возвращается в виде объекта, всегда сохраняется в центральное
хранилище values
"""
self.lg.debug(f'get port %s', port)
if self.mqtt is None:
ret = await self.request(pt=port, cmd='get')
ret = self.parse_response(ret)
self.values[port] = ret
return ret
async with self._notif_lck:
async with self.notifiers[port]:
cnd = self.notifiers[port]
@@ -184,13 +232,19 @@ class MegaD:
)
try:
await asyncio.wait_for(cnd.wait(), timeout=10)
return self.values[port]
return self.values.get(port)
except asyncio.TimeoutError:
self.lg.error(f'timeout when getting port {port}')
async def get_all_ports(self):
for x in range(37):
await self.get_port(x)
if not self.mqtt_inputs:
ret = await self.request(cmd='all')
for port, x in enumerate(ret.split(';')):
ret = self.parse_response(x)
self.values[port] = ret
else:
for x in range(self.nports + 1):
await self.get_port(x)
async def reboot(self, save=True):
await self.save()
@@ -198,7 +252,6 @@ class MegaD:
async def _notify(self, port, value):
async with self.notifiers[port]:
cnd = self.notifiers[port]
self.values[port] = value
cnd.notify_all()
def _process_msg(self, msg):
@@ -222,6 +275,7 @@ class MegaD:
value = None
try:
value = json.loads(msg.payload)
self.values[port] = value
for cb in self._callbacks[port]:
cb(value)
except Exception as exc:
@@ -235,7 +289,10 @@ class MegaD:
self.lg.debug(
f'subscribe %s %s', port, callback
)
self._callbacks[port].append(callback)
if self.mqtt_inputs:
self._callbacks[port].append(callback)
else:
self.http.callbacks[port].append(callback)
async def authenticate(self) -> bool:
"""Test if we can authenticate with the host."""
@@ -263,6 +320,8 @@ class MegaD:
)
async with aiohttp.request('get', url) as req:
html = await req.text()
if req.status != 200:
return
tree = BeautifulSoup(html, features="lxml")
pty = tree.find('select', attrs={'name': 'pty'})
if pty is None:
@@ -286,15 +345,16 @@ class MegaD:
self._scanned[port] = (pty, m)
return pty, m
async def scan_ports(self,):
for x in range(38):
async def scan_ports(self, nports=37):
for x in range(nports+1):
ret = await self.scan_port(x)
if ret:
yield [x, *ret]
self.nports = nports+1
async def get_config(self):
async def get_config(self, nports=37):
ret = defaultdict(lambda: defaultdict(list))
async for port, pty, m in self.scan_ports():
async for port, pty, m in self.scan_ports(nports):
if pty == "0":
ret['binary_sensor'][port].append({})
elif pty == "1" and m in ['0', '1']: