mirror of
https://github.com/andvikt/mega_hacs.git
synced 2025-12-12 01:24:29 +05:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1aeaabfb3c | ||
|
|
a0bd8acac0 | ||
|
|
c48a3632d2 | ||
|
|
e06ba65ead | ||
|
|
22720a27bd | ||
|
|
d0769b5b02 | ||
|
|
9ae093dd91 | ||
|
|
f88109c3a6 |
@@ -20,7 +20,7 @@ from homeassistant.helpers.template import Template
|
||||
from .const import EVENT_BINARY_SENSOR, DOMAIN, CONF_CUSTOM, CONF_SKIP, CONF_INVERT, CONF_RESPONSE_TEMPLATE
|
||||
from .entities import MegaPushEntity
|
||||
from .hub import MegaD
|
||||
|
||||
from .tools import int_ignore
|
||||
|
||||
lg = logging.getLogger(__name__)
|
||||
|
||||
@@ -51,7 +51,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, asyn
|
||||
devices = []
|
||||
customize = hass.data.get(DOMAIN, {}).get(CONF_CUSTOM, {})
|
||||
for port, cfg in config_entry.data.get('binary_sensor', {}).items():
|
||||
port = int(port)
|
||||
port = int_ignore(port)
|
||||
c = customize.get(mid, {}).get(port, {})
|
||||
if c.get(CONF_SKIP, False):
|
||||
continue
|
||||
@@ -78,14 +78,15 @@ class MegaBinarySensor(BinarySensorEntity, MegaPushEntity):
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
val = self.mega.values.get(self.port, {}).get("value") \
|
||||
or self.mega.values.get(self.port, {}).get('m')
|
||||
val = self.mega.values.get(self.port, {})
|
||||
if isinstance(val, dict):
|
||||
val = val.get("value", val.get('m'))
|
||||
if val is None and self._state is not None:
|
||||
return self._state == 'ON'
|
||||
elif val is not None:
|
||||
if val in ['ON', 'OFF']:
|
||||
return val == 'ON' if not self.invert else val == 'OFF'
|
||||
else:
|
||||
if val in ['ON', 'OFF', '1', '0']:
|
||||
return val in ['ON', '1'] if not self.invert else val in ['OFF', '0']
|
||||
elif isinstance(val, int):
|
||||
return val != 1 if not self.invert else val == 1
|
||||
|
||||
def _update(self, payload: dict):
|
||||
|
||||
@@ -63,7 +63,7 @@ async def validate_input(hass: core.HomeAssistant, data):
|
||||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for mega."""
|
||||
|
||||
VERSION = 12
|
||||
VERSION = 15
|
||||
CONNECTION_CLASS = config_entries.CONN_CLASS_ASSUMED
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
|
||||
@@ -1,6 +1,19 @@
|
||||
from dataclasses import dataclass, field
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
inputs = [
|
||||
'eact',
|
||||
'inta',
|
||||
'misc',
|
||||
]
|
||||
selectors = [
|
||||
'pty',
|
||||
'm',
|
||||
'gr',
|
||||
'd',
|
||||
'ety',
|
||||
]
|
||||
|
||||
|
||||
@dataclass(frozen=True, eq=True)
|
||||
class Config:
|
||||
@@ -8,21 +21,16 @@ class Config:
|
||||
m: str = None
|
||||
gr: str = None
|
||||
d: str = None
|
||||
inta: str = field(compare=False, hash=False, default=None)
|
||||
ety: str = None
|
||||
inta: str = field(compare=False, hash=False, default=None)
|
||||
misc: str = field(compare=False, hash=False, default=None)
|
||||
eact: str = field(compare=False, hash=False, default=None)
|
||||
|
||||
|
||||
def parse_config(page: str):
|
||||
page = BeautifulSoup(page, features="lxml")
|
||||
ret = {}
|
||||
for x in [
|
||||
'pty',
|
||||
'm',
|
||||
'gr',
|
||||
'd',
|
||||
'ety',
|
||||
]:
|
||||
for x in selectors:
|
||||
v = page.find('select', attrs={'name': x})
|
||||
if v is None:
|
||||
continue
|
||||
@@ -31,12 +39,10 @@ def parse_config(page: str):
|
||||
if v:
|
||||
v = v['value']
|
||||
ret[x] = v
|
||||
v = page.find('input', attrs={'name': 'inta'})
|
||||
if v:
|
||||
ret['inta'] = v['value']
|
||||
v = page.find('input', attrs={'name': 'misc'})
|
||||
if v:
|
||||
ret['misc'] = v.get('checked', False)
|
||||
for x in inputs:
|
||||
v = page.find('input', attrs={'name': x})
|
||||
if v:
|
||||
ret[x] = v['value']
|
||||
return Config(**ret)
|
||||
|
||||
|
||||
|
||||
@@ -247,8 +247,8 @@ class MegaOutPort(MegaPushEntity):
|
||||
val = 0
|
||||
if val == 0:
|
||||
return self._brightness
|
||||
else:
|
||||
return val
|
||||
elif isinstance(val, (int, float)):
|
||||
return int(val / self.dimmer_scale)
|
||||
elif val is not None:
|
||||
val = val.get("value")
|
||||
if val is None:
|
||||
@@ -269,7 +269,8 @@ class MegaOutPort(MegaPushEntity):
|
||||
return
|
||||
if self.dimmer:
|
||||
val = safe_int(val)
|
||||
return val > 0 if not self.invert else val == 0
|
||||
if val is not None:
|
||||
return val > 0 if not self.invert else val == 0
|
||||
else:
|
||||
return val == 'ON' if not self.invert else val == 'OFF'
|
||||
elif val is not None:
|
||||
@@ -358,8 +359,10 @@ class MegaOutPort(MegaPushEntity):
|
||||
|
||||
|
||||
def safe_int(v):
|
||||
if v in ['ON', 'OFF']:
|
||||
return None
|
||||
if v == 'ON':
|
||||
return 1
|
||||
elif v == 'OFF':
|
||||
return 0
|
||||
try:
|
||||
return int(v)
|
||||
except (ValueError, TypeError):
|
||||
|
||||
@@ -15,7 +15,11 @@ from .tools import make_ints
|
||||
from . import hub as h
|
||||
_LOGGER = logging.getLogger(__name__).getChild('http')
|
||||
|
||||
ext = {f'ext{x}' for x in range(16)}
|
||||
|
||||
def is_ext(data: typing.Dict[str, typing.Any]):
|
||||
for x in data:
|
||||
if x.startswith('ext'):
|
||||
return True
|
||||
|
||||
|
||||
class MegaView(HomeAssistantView):
|
||||
@@ -95,17 +99,27 @@ class MegaView(HomeAssistantView):
|
||||
data['mega_id'] = hub.id
|
||||
ret = 'd' if hub.force_d else ''
|
||||
if port is not None:
|
||||
if set(data).issubset(ext):
|
||||
if is_ext(data):
|
||||
ret = '' # пока ответ всегда пустой, неясно какая будет реакция на непустой ответ
|
||||
for e in ext:
|
||||
if e in data:
|
||||
idx = e[-1]
|
||||
pt = f'{port}e{idx}'
|
||||
data['value'] = 'ON' if data[e] == '1' else 'OFF'
|
||||
if port in hub.extenders:
|
||||
pt_orig = port
|
||||
else:
|
||||
pt_orig = hub.ext_in.get(port)
|
||||
if pt_orig is None:
|
||||
hub.lg.warning(f'can not find extender for int port {port}')
|
||||
return Response(status=200)
|
||||
for e, v in data.items():
|
||||
if e.startswith('ext'):
|
||||
idx = e[3:]
|
||||
pt = f'{pt_orig}e{idx}'
|
||||
data['pt_orig'] = pt_orig
|
||||
data['value'] = 'ON' if v == '1' else 'OFF'
|
||||
data['m'] = 1 if data[e] == '0' else 0 # имитация поведения обычного входа, чтобы события обрабатывались аналогично
|
||||
hub.values[pt] = data
|
||||
for cb in self.callbacks[hub.id][pt]:
|
||||
cb(data)
|
||||
if pt in hub.ext_act:
|
||||
await hub.request(cmd=hub.ext_act[pt])
|
||||
else:
|
||||
hub.values[port] = data
|
||||
for cb in self.callbacks[hub.id][port]:
|
||||
|
||||
@@ -24,7 +24,7 @@ from .const import (
|
||||
)
|
||||
from .entities import set_events_off, BaseMegaEntity, MegaOutPort
|
||||
from .exceptions import CannotConnect, NoPort
|
||||
from .tools import make_ints
|
||||
from .tools import make_ints, int_ignore
|
||||
|
||||
TEMP_PATT = re.compile(r'temp:([01234567890\.]+)')
|
||||
HUM_PATT = re.compile(r'hum:([01234567890\.]+)')
|
||||
@@ -81,6 +81,8 @@ class MegaD:
|
||||
protected=True,
|
||||
restore_on_restart=False,
|
||||
extenders=None,
|
||||
ext_in=None,
|
||||
ext_act=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize."""
|
||||
@@ -96,6 +98,8 @@ class MegaD:
|
||||
else:
|
||||
self.http = None
|
||||
self.extenders = extenders or []
|
||||
self.ext_in = ext_in or {}
|
||||
self.ext_act = ext_act or {}
|
||||
self.poll_outs = poll_outs
|
||||
self.update_all = update_all if update_all is not None else True
|
||||
self.nports = nports
|
||||
@@ -394,7 +398,7 @@ class MegaD:
|
||||
if port == 'cmd':
|
||||
return
|
||||
try:
|
||||
port = int(port)
|
||||
port = int_ignore(port)
|
||||
except:
|
||||
self.lg.warning('can not process %s', msg)
|
||||
return
|
||||
@@ -423,7 +427,7 @@ class MegaD:
|
||||
asyncio.run_coroutine_threadsafe(self._notify(port, value), self.loop)
|
||||
|
||||
def subscribe(self, port, callback):
|
||||
port = int(port)
|
||||
port = int_ignore(port)
|
||||
self.lg.debug(
|
||||
f'subscribe %s %s', port, callback
|
||||
)
|
||||
@@ -514,6 +518,8 @@ class MegaD:
|
||||
ret = defaultdict(lambda: defaultdict(list))
|
||||
ret['mqtt_id'] = await self.get_mqtt_id()
|
||||
ret['extenders'] = extenders = []
|
||||
ret['ext_in'] = ext_int = {}
|
||||
ret['ext_acts'] = ext_acts = {}
|
||||
async for port, cfg in self.scan_ports(nports):
|
||||
if cfg.pty == "0":
|
||||
ret['binary_sensor'][port].append({})
|
||||
@@ -533,15 +539,19 @@ class MegaD:
|
||||
])
|
||||
elif cfg == MCP230:
|
||||
extenders.append(port)
|
||||
ext_int[int(cfg.inta)] = port
|
||||
values = await self.request(pt=port, cmd='get')
|
||||
values = values.split(';')
|
||||
for n in range(len(values)):
|
||||
ext_page = await self.request(pt=port, ext=n)
|
||||
ext_cfg = parse_config(ext_page)
|
||||
pt = f'{port}e{n}'
|
||||
if ext_cfg.ety == '1':
|
||||
ret['light'][f'{port}e{n}'].append({})
|
||||
ret['light'][pt].append({})
|
||||
elif ext_cfg.ety == '0':
|
||||
ret['binary_sensor'][f'{port}e{n}'].append({})
|
||||
if ext_cfg.eact:
|
||||
ext_acts[pt] = ext_cfg.eact
|
||||
ret['binary_sensor'][pt].append({})
|
||||
elif cfg == PCA9685:
|
||||
extenders.append(port)
|
||||
values = await self.request(pt=port, cmd='get')
|
||||
|
||||
@@ -26,6 +26,7 @@ from .const import (
|
||||
CONF_CUSTOM,
|
||||
CONF_SKIP,
|
||||
)
|
||||
from .tools import int_ignore
|
||||
|
||||
lg = logging.getLogger(__name__)
|
||||
|
||||
@@ -61,7 +62,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, asyn
|
||||
devices = []
|
||||
customize = hass.data.get(DOMAIN, {}).get(CONF_CUSTOM, {})
|
||||
for port, cfg in config_entry.data.get('light', {}).items():
|
||||
port = int(port)
|
||||
port = int_ignore(port)
|
||||
c = customize.get(mid, {}).get(port, {})
|
||||
if c.get(CONF_SKIP, False) or c.get(CONF_DOMAIN, 'light') != 'light':
|
||||
continue
|
||||
|
||||
@@ -22,6 +22,8 @@ from .const import CONF_KEY, TEMP, HUM, W1, W1BUS, CONF_CONV_TEMPLATE
|
||||
from .hub import MegaD
|
||||
import re
|
||||
|
||||
from .tools import int_ignore
|
||||
|
||||
lg = logging.getLogger(__name__)
|
||||
TEMP_PATT = re.compile(r'temp:([01234567890\.]+)')
|
||||
HUM_PATT = re.compile(r'hum:([01234567890\.]+)')
|
||||
@@ -81,7 +83,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, asyn
|
||||
hub: MegaD = hass.data['mega'][mid]
|
||||
devices = []
|
||||
for port, cfg in config_entry.data.get('sensor', {}).items():
|
||||
port = int(port)
|
||||
port = int_ignore(port)
|
||||
for data in cfg:
|
||||
hub.lg.debug(f'add sensor on port %s with data %s', port, data)
|
||||
sensor = Mega1WSensor(
|
||||
|
||||
@@ -18,6 +18,7 @@ from homeassistant.core import HomeAssistant
|
||||
from . import hub as h
|
||||
from .entities import MegaOutPort
|
||||
from .const import CONF_DIMMER, CONF_SWITCH, DOMAIN, CONF_CUSTOM, CONF_SKIP
|
||||
from .tools import int_ignore
|
||||
|
||||
_LOGGER = lg = logging.getLogger(__name__)
|
||||
|
||||
@@ -50,7 +51,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, asyn
|
||||
|
||||
customize = hass.data.get(DOMAIN, {}).get(CONF_CUSTOM, {})
|
||||
for port, cfg in config_entry.data.get('light', {}).items():
|
||||
port = int(port)
|
||||
port = int_ignore(port)
|
||||
c = customize.get(mid, {}).get(port, {})
|
||||
if c.get(CONF_SKIP, False) or c.get(CONF_DOMAIN, 'light') != 'switch':
|
||||
continue
|
||||
|
||||
@@ -10,4 +10,11 @@ def make_ints(d: dict):
|
||||
if 'm' not in d:
|
||||
d['m'] = 0
|
||||
if 'click' not in d:
|
||||
d['click'] = 0
|
||||
d['click'] = 0
|
||||
|
||||
|
||||
def int_ignore(x):
|
||||
try:
|
||||
return int(x)
|
||||
except (TypeError, ValueError):
|
||||
return x
|
||||
Reference in New Issue
Block a user