fix dimmer attr

This commit is contained in:
Викторов Андрей Германович
2023-10-15 19:23:46 +03:00
parent 39c23a62cf
commit bd98fa216d

View File

@@ -12,8 +12,21 @@ from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.restore_state import RestoreEntity
from . import hub as h
from .const import DOMAIN, CONF_CUSTOM, CONF_INVERT, EVENT_BINARY_SENSOR, LONG, \
LONG_RELEASE, RELEASE, PRESS, SINGLE_CLICK, DOUBLE_CLICK, EVENT_BINARY, CONF_SMOOTH, CONF_RANGE
from .const import (
DOMAIN,
CONF_CUSTOM,
CONF_INVERT,
EVENT_BINARY_SENSOR,
LONG,
LONG_RELEASE,
RELEASE,
PRESS,
SINGLE_CLICK,
DOUBLE_CLICK,
EVENT_BINARY,
CONF_SMOOTH,
CONF_RANGE,
)
_events_on = False
_LOGGER = logging.getLogger(__name__)
@@ -22,7 +35,7 @@ _LOGGER = logging.getLogger(__name__)
async def _set_events_on():
global _events_on, _task_set_ev_on
await asyncio.sleep(10)
_LOGGER.debug('events on')
_LOGGER.debug("events on")
_events_on = True
@@ -31,6 +44,7 @@ def set_events_off():
_events_on = False
_task_set_ev_on = None
_task_set_ev_on = None
@@ -40,20 +54,21 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
Also provides some basic entity information: unique_id, name, availiability
All base entities are polled in order to be online or offline
"""
def __init__(
self,
mega: 'h.MegaD',
port: typing.Union[int, str, typing.List[int]],
config_entry: ConfigEntry = None,
id_suffix=None,
name=None,
unique_id=None,
http_cmd='get',
addr: str=None,
index=None,
customize=None,
smooth=None,
**kwargs,
self,
mega: "h.MegaD",
port: typing.Union[int, str, typing.List[int]],
config_entry: ConfigEntry = None,
id_suffix=None,
name=None,
unique_id=None,
http_cmd="get",
addr: str = None,
index=None,
customize=None,
smooth=None,
**kwargs,
):
self._smooth = smooth
self.http_cmd = http_cmd
@@ -65,11 +80,19 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
self._mega_id = mega.id
self._lg = None
if not isinstance(port, list):
self._unique_id = unique_id or f"mega_{mega.id}_{port}" + \
(f"_{id_suffix}" if id_suffix else "")
_pt = port if not mega.new_naming else f'{port:02}' if isinstance(port, int) else port
self._name = name or f"{mega.id}_{_pt}" + \
(f"_{id_suffix}" if id_suffix else "")
self._unique_id = unique_id or f"mega_{mega.id}_{port}" + (
f"_{id_suffix}" if id_suffix else ""
)
_pt = (
port
if not mega.new_naming
else f"{port:02}"
if isinstance(port, int)
else port
)
self._name = name or f"{mega.id}_{_pt}" + (
f"_{id_suffix}" if id_suffix else ""
)
self._customize: dict = None
else:
assert id_suffix is not None
@@ -83,7 +106,7 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
self.addr = addr
self.id_suffix = id_suffix
self._can_smooth_hard = None
if self.http_cmd == 'ds2413':
if self.http_cmd == "ds2413":
self.mega.ds2413_ports |= {self.port}
super().__init__(coordinator=mega.updater)
@@ -92,12 +115,12 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
return False
def get_attribute(self, name, default=None):
attr = getattr(self, f'_{name}', None)
attr = getattr(self, f"_{name}", None)
if attr is None and self._state is not None:
if name == 'is_on':
if name == "is_on":
attr = self._state.state
else:
attr = self._state.attributes.get(f'{name}', default)
attr = self._state.attributes.get(f"{name}", default)
return attr if attr is not None else default
@property
@@ -118,7 +141,7 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
@property
def enabled(self):
if '<' in self.name:
if "<" in self.name:
return False
else:
return super().enabled
@@ -130,12 +153,17 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
if self.hass is None or self.entity_id is None:
return {}
if self._customize is None:
c_entity_id = self.hass.data.get(DOMAIN, {}).get(CONF_CUSTOM).get('entities', {}).get(self.entity_id, {})
c_entity_id = (
self.hass.data.get(DOMAIN, {})
.get(CONF_CUSTOM)
.get("entities", {})
.get(self.entity_id, {})
)
c = self.hass.data.get(DOMAIN, {}).get(CONF_CUSTOM) or {}
c = c.get(self._mega_id) or {}
c = c.get(self.port) or {}
if self.addr is not None and self.index is not None and isinstance(c, dict):
idx = self.addr.lower() + f'_a' if self.index == 0 else '_b'
idx = self.addr.lower() + f"_a" if self.index == 0 else "_b"
c = c.get(idx, {})
c.update(c_entity_id)
self._customize = c
@@ -147,18 +175,24 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
if isinstance(self.port, list):
pt_idx = self.id_suffix
else:
_pt = self.port if not self.mega.new_naming else f'{self.port:02}' if isinstance(self.port, int) else self.port
if isinstance(_pt, str) and 'e' in _pt:
pt_idx, _ = _pt.split('e')
_pt = (
self.port
if not self.mega.new_naming
else f"{self.port:02}"
if isinstance(self.port, int)
else self.port
)
if isinstance(_pt, str) and "e" in _pt:
pt_idx, _ = _pt.split("e")
else:
pt_idx = _pt
return DeviceInfo(
identifiers={
# Serial numbers are unique identifiers within a specific domain
(DOMAIN, f'{self._mega_id}', pt_idx)
(DOMAIN, f"{self._mega_id}", pt_idx)
},
name=self.name,
manufacturer='ab-log.ru',
manufacturer="ab-log.ru",
sw_version=self.mega.fw,
via_device=(DOMAIN, self._mega_id),
)
@@ -176,7 +210,13 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
c = self.customize.get(CONF_NAME)
if not isinstance(c, str):
if not isinstance(self.port, list):
_pt = self.port if not self.mega.new_naming else f'{self.port:02}' if isinstance(self.port, int) else self.port
_pt = (
self.port
if not self.mega.new_naming
else f"{self.port:02}"
if isinstance(self.port, int)
else self.port
)
c = self._name or f"{self.mega.id}_p{_pt}"
else:
c = self.id_suffix
@@ -192,7 +232,7 @@ class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
self._state = await self.async_get_last_state()
async def get_state(self):
self.lg.debug(f'state is %s', self.state)
self.lg.debug(f"state is %s", self.state)
self.async_write_ha_state()
@@ -212,51 +252,42 @@ class MegaPushEntity(BaseMegaEntity):
if self.hass is None:
return
self.async_write_ha_state()
self.lg.debug(f'state after update %s', self.state)
if not self.entity_id.startswith('binary_sensor'):
_LOGGER.debug('skip event because not a bnary sens')
self.lg.debug(f"state after update %s", self.state)
if not self.entity_id.startswith("binary_sensor"):
_LOGGER.debug("skip event because not a bnary sens")
return
ll: bool = self.mega.last_long.get(self.port, False)
if safe_int(value.get('click', 0)) == 1:
if safe_int(value.get("click", 0)) == 1:
self.hass.bus.async_fire(
event_type=EVENT_BINARY,
event_data={
'entity_id': self.entity_id,
'type': SINGLE_CLICK
}
event_data={"entity_id": self.entity_id, "type": SINGLE_CLICK},
)
elif safe_int(value.get('click', 0)) == 2:
elif safe_int(value.get("click", 0)) == 2:
self.hass.bus.async_fire(
event_type=EVENT_BINARY,
event_data={
'entity_id': self.entity_id,
'type': DOUBLE_CLICK
}
event_data={"entity_id": self.entity_id, "type": DOUBLE_CLICK},
)
elif safe_int(value.get('m', 0)) == 2:
elif safe_int(value.get("m", 0)) == 2:
self.mega.last_long[self.port] = True
self.hass.bus.async_fire(
event_type=EVENT_BINARY,
event_data={
'entity_id': self.entity_id,
'type': LONG
}
event_data={"entity_id": self.entity_id, "type": LONG},
)
elif safe_int(value.get('m', 0)) == 1:
elif safe_int(value.get("m", 0)) == 1:
self.hass.bus.async_fire(
event_type=EVENT_BINARY,
event_data={
'entity_id': self.entity_id,
'type': LONG_RELEASE if ll else RELEASE,
}
"entity_id": self.entity_id,
"type": LONG_RELEASE if ll else RELEASE,
},
)
elif safe_int(value.get('m', None)) == 0:
elif safe_int(value.get("m", None)) == 0:
self.hass.bus.async_fire(
event_type=EVENT_BINARY,
event_data={
'entity_id': self.entity_id,
'type': PRESS,
}
"entity_id": self.entity_id,
"type": PRESS,
},
)
self.mega.last_long[self.port] = False
return
@@ -266,21 +297,13 @@ class MegaPushEntity(BaseMegaEntity):
class MegaOutPort(MegaPushEntity):
def __init__(
self,
dimmer=False,
dimmer_scale=1,
*args, **kwargs
):
super().__init__(
*args, **kwargs
)
def __init__(self, dimmer=False, dimmer_scale=1, *args, **kwargs):
super().__init__(*args, **kwargs)
self._brightness = None
self._is_on = None
self.dimmer = dimmer
self.dimmer_scale = dimmer_scale
self.is_extender = isinstance(self.port, str) and 'e' in self.port
self.is_extender = isinstance(self.port, str) and "e" in self.port
self.task: asyncio.Task = None
self._restore_brightness = None
self._last_called: float = 0
@@ -313,9 +336,14 @@ class MegaOutPort(MegaPushEntity):
return
val = self.mega.values.get(self.port, {})
if isinstance(val, dict) and len(val) == 0 and self._state is not None:
ret = safe_int(self._state.attributes.get("brightness"), def_on=self.max_dim, def_off=0, def_val=0)
ret = safe_int(
self._state.attributes.get("brightness"),
def_on=self.max_dim,
def_off=0,
def_val=0,
)
ret = self._calc_brightness(ret)
elif isinstance(self.port, str) and 'e' in self.port:
elif isinstance(self.port, str) and "e" in self.port:
if isinstance(val, str):
val = safe_int(val, def_on=self.max_dim, def_off=0, def_val=0)
else:
@@ -340,46 +368,68 @@ class MegaOutPort(MegaPushEntity):
def is_on(self) -> bool:
val = self.mega.values.get(self.port, {})
if isinstance(val, dict) and len(val) == 0 and self._state is not None:
return self._state == 'ON'
elif isinstance(self.port, str) and 'e' in self.port and val:
return self._state == "ON"
elif isinstance(self.port, str) and "e" in self.port and val:
if val is None:
return
if self.dimmer:
if hasattr(self, "dimmer") and self.dimmer:
val = safe_int(val)
if val is not None:
return val > 0 if not self.invert else val == 0
else:
return val == 'ON' if not self.invert else val == 'OFF'
return val == "ON" if not self.invert else val == "OFF"
elif val is not None:
val = val.get("value")
if not isinstance(val, str) and self.index is not None and self.addr is not None:
if (
not isinstance(val, str)
and self.index is not None
and self.addr is not None
):
if not isinstance(val, dict):
self.mega.lg.warning(f'{self.entity_id}: {val} is not a dict')
self.mega.lg.warning(f"{self.entity_id}: {val} is not a dict")
return
_val = val.get(self.addr, val.get(self.addr.lower(), val.get(self.addr.upper())))
_val = val.get(
self.addr, val.get(self.addr.lower(), val.get(self.addr.upper()))
)
if not isinstance(_val, str):
self.mega.lg.warning(f'{self.entity_id}: can not get {self.addr} from {val}, recieved {_val}')
self.mega.lg.warning(
f"{self.entity_id}: can not get {self.addr} from {val}, recieved {_val}"
)
return
_val = _val.split('/')
_val = _val.split("/")
if len(_val) >= 2:
self.mega.lg.debug('%s parsed values: %s[%s]="%s"', self.entity_id, _val, self.index, _val)
self.mega.lg.debug(
'%s parsed values: %s[%s]="%s"',
self.entity_id,
_val,
self.index,
_val,
)
val = _val[self.index]
else:
self.mega.lg.warning(f'{self.entity_id}: {_val} has wrong length')
self.mega.lg.warning(f"{self.entity_id}: {_val} has wrong length")
return
elif self.index is not None and self.addr is None:
self.mega.lg.warning(f'{self.entity_id} does not has addr')
self.mega.lg.warning(f"{self.entity_id} does not has addr")
return
self.mega.lg.debug('%s.state = %s', self.entity_id, val)
self.mega.lg.debug("%s.state = %s", self.entity_id, val)
if not self.invert:
return val == 'ON' or str(val) == '1' or (safe_int(val) is not None and safe_int(val) > 0)
return (
val == "ON"
or str(val) == "1"
or (safe_int(val) is not None and safe_int(val) > 0)
)
else:
return val == 'OFF' or str(val) == '0' or (safe_int(val) is not None and safe_int(val) == 0)
return (
val == "OFF"
or str(val) == "0"
or (safe_int(val) is not None and safe_int(val) == 0)
)
@property
def cmd_port(self):
if self.index is not None:
return f'{self.port}A' if self.index == 0 else f'{self.port}B'
return f"{self.port}A" if self.index == 0 else f"{self.port}B"
else:
return self.port
@@ -400,9 +450,7 @@ class MegaOutPort(MegaPushEntity):
if isinstance(self.port, str):
self.mega.values[self.port] = value[0]
else:
self.mega.values[self.port] = {
'value': value[0]
}
self.mega.values[self.port] = {"value": value[0]}
if update_state:
self.async_write_ha_state()
@@ -412,13 +460,15 @@ class MegaOutPort(MegaPushEntity):
tm = (self.smooth.total_seconds() * pct) if transition is None else transition
if self.task is not None:
self.task.cancel()
self.task = asyncio.create_task(self.mega.smooth_dim(
(self.cmd_port, from_, to_),
time=tm,
can_smooth_hardware=self.can_smooth_hardware,
max_values=[255 if self.dimmer_scale == 1 else 4095],
updater=partial(self.update_from_smooth, update_state=update_state),
))
self.task = asyncio.create_task(
self.mega.smooth_dim(
(self.cmd_port, from_, to_),
time=tm,
can_smooth_hardware=self.can_smooth_hardware,
max_values=[255 if self.dimmer_scale == 1 else 4095],
updater=partial(self.update_from_smooth, update_state=update_state),
)
)
def _calc_brightness(self, brightness):
if brightness is None:
@@ -454,9 +504,9 @@ class MegaOutPort(MegaPushEntity):
brightness = self._calc_brightness(brightness)
_prev = safe_int(self.brightness) or 0
self._brightness = brightness
if self.dimmer and brightness == 0:
if hasattr(self, "dimmer") and self.dimmer and brightness == 0:
cmd = self.max_dim
elif self.dimmer:
elif hasattr(self, "dimmer") and self.dimmer:
cmd = min((brightness * self.dimmer_scale, self.max_dim))
if self.smooth_dim or transition:
self._set_dim_brightness(from_=_prev, to_=cmd, transition=transition)
@@ -471,7 +521,7 @@ class MegaOutPort(MegaPushEntity):
"cnt": round(transition / (abs(_prev - brightness) / 255)),
}
if self.addr:
_cmd['addr'] = self.addr
_cmd["addr"] = self.addr
if not (self.smooth_dim or transition):
await self.mega.request(**_cmd, priority=-1)
if self.index is not None:
@@ -480,29 +530,31 @@ class MegaOutPort(MegaPushEntity):
port=self.port,
force_http=True,
conv=False,
http_cmd='list',
http_cmd="list",
)
elif isinstance(self.port, str) and 'e' in self.port:
elif isinstance(self.port, str) and "e" in self.port:
if not self.dimmer:
self.mega.values[self.port] = 'ON' if not self.invert else 'OFF'
self.mega.values[self.port] = "ON" if not self.invert else "OFF"
else:
self.mega.values[self.port] = cmd
else:
self.mega.values[self.port] = {'value': cmd}
self.mega.values[self.port] = {"value": cmd}
await self.get_state()
async def async_turn_off(self, transition=None, **kwargs) -> None:
if (time.time() - self._last_called) < 0.1:
return
self._last_called = time.time()
self._restore_brightness = self._cal_reverse_brightness(safe_int(self._brightness))
self._restore_brightness = self._cal_reverse_brightness(
safe_int(self._brightness)
)
if not self.dimmer:
transition = None
cmd = "0" if not self.invert else "1"
_cmd = {"cmd": f"{self.cmd_port}:{cmd}"}
_prev = safe_int(self.brightness) or 0
if self.addr:
_cmd['addr'] = self.addr
_cmd["addr"] = self.addr
if not (self.smooth_dim or transition):
await self.mega.request(**_cmd, priority=-1)
else:
@@ -517,12 +569,12 @@ class MegaOutPort(MegaPushEntity):
port=self.port,
force_http=True,
conv=False,
http_cmd='list',
http_cmd="list",
)
elif isinstance(self.port, str) and 'e' in self.port:
self.mega.values[self.port] = 'OFF' if not self.invert else 'ON'
elif isinstance(self.port, str) and "e" in self.port:
self.mega.values[self.port] = "OFF" if not self.invert else "ON"
else:
self.mega.values[self.port] = {'value': cmd}
self.mega.values[self.port] = {"value": cmd}
await self.get_state()
async def async_will_remove_from_hass(self) -> None:
@@ -530,13 +582,12 @@ class MegaOutPort(MegaPushEntity):
self.task.cancel()
def safe_int(v, def_on=1, def_off=0, def_val=None):
if v == 'ON':
if v == "ON":
return def_on
elif v == 'OFF':
elif v == "OFF":
return def_off
try:
return int(v)
except (ValueError, TypeError):
return def_val
return def_val