mirror of
https://github.com/andvikt/mega_hacs.git
synced 2025-12-11 17:14:28 +05:00
600 lines
19 KiB
Python
600 lines
19 KiB
Python
import logging
|
|
import asyncio
|
|
import time
|
|
import typing
|
|
from datetime import timedelta
|
|
from functools import partial
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_NAME
|
|
from homeassistant.core import State
|
|
from homeassistant.helpers.entity import DeviceInfo
|
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
|
from homeassistant.helpers.restore_state import RestoreEntity
|
|
from . import hub as h
|
|
from .const import (
|
|
DOMAIN,
|
|
CONF_CUSTOM,
|
|
CONF_INVERT,
|
|
LONG,
|
|
LONG_RELEASE,
|
|
RELEASE,
|
|
PRESS,
|
|
SINGLE_CLICK,
|
|
DOUBLE_CLICK,
|
|
EVENT_BINARY,
|
|
CONF_SMOOTH,
|
|
CONF_RANGE,
|
|
)
|
|
|
|
# Module-wide flag: set to True by _set_events_on() after its delay and reset
# to False by set_events_off().
_events_on = False
# Logger shared by every entity defined in this module.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def _set_events_on():
    """Enable event emission after a fixed 10 second delay.

    Sleeps for 10 seconds, logs a debug message and then flips the
    module-level ``_events_on`` flag to True.
    """
    # Only ``_events_on`` is rebound here, so it is the only name that has to
    # be declared global.
    global _events_on
    await asyncio.sleep(10)
    _LOGGER.debug("events on")
    _events_on = True
|
|
|
|
|
|
def set_events_off():
    """Switch event emission off and forget the pending enable-task handle."""
    global _task_set_ev_on, _events_on
    # Both module-level names are reset; the two assignments are independent.
    _task_set_ev_on = None
    _events_on = False
|
|
|
|
|
|
# Module-level slot that set_events_off() resets to None.
# NOTE(review): presumably holds the asyncio task running _set_events_on();
# the assignment of a task is not visible in this file section - confirm.
_task_set_ev_on = None
|
|
|
|
|
|
class BaseMegaEntity(CoordinatorEntity, RestoreEntity):
    """
    Base Mega's entity. It is responsible for storing reference to mega hub
    Also provides some basic entity information: unique_id, name, availability
    All base entities are polled in order to be online or offline
    """

    def __init__(
        self,
        mega: "h.MegaD",
        port: typing.Union[int, str, typing.List[int]],
        config_entry: ConfigEntry = None,
        id_suffix=None,
        name=None,
        unique_id=None,
        http_cmd="get",
        addr: str = None,
        index=None,
        customize=None,
        smooth=None,
        **kwargs,
    ):
        """Register the entity on the hub and derive its ids and default name.

        :param mega: hub object; the entity appends itself to ``mega.entities``
            and uses ``mega.updater`` as its coordinator.
        :param port: a single port (int or str) or a list of ports. For a list
            ``id_suffix``, ``name`` and a ``customize`` dict are mandatory.
        :param config_entry: config entry stored on the entity as-is.
        :param id_suffix: optional suffix appended to the generated unique id
            and name.
        :param name: explicit name; generated from hub id and port if omitted.
        :param unique_id: explicit unique id; generated if omitted.
        :param http_cmd: stored as-is; the value ``"ds2413"`` additionally
            registers the port in ``mega.ds2413_ports``.
        :param addr: stored as-is; used together with ``index`` when looking
            up customization.
        :param index: stored as-is; see ``addr``.
        :param customize: ready customization dict (list ports only).
        :param smooth: stored as ``_smooth`` for subclasses.
        :param kwargs: accepted and ignored.
        """
        self._smooth = smooth
        self.http_cmd = http_cmd
        # Last state restored from the recorder, filled in async_added_to_hass.
        self._state: State = None
        self.port = port
        self.config_entry = config_entry
        self.mega = mega
        mega.entities.append(self)
        self._mega_id = mega.id
        self._lg = None
        if not isinstance(port, list):
            suffix = f"_{id_suffix}" if id_suffix else ""
            self._unique_id = unique_id or f"mega_{mega.id}_{port}" + suffix
            self._name = name or f"{mega.id}_{self._port_label()}" + suffix
            # Resolved lazily by the ``customize`` property.
            self._customize: dict = None
        else:
            assert id_suffix is not None
            assert name is not None
            assert isinstance(customize, dict)
            self._unique_id = unique_id or f"mega_{mega.id}_{id_suffix}"
            self._name = name
            self._customize = customize

        self.index = index
        self.addr = addr
        self.id_suffix = id_suffix
        # Cache for the ``can_smooth_hardware`` property.
        self._can_smooth_hard = None
        if self.http_cmd == "ds2413":
            self.mega.ds2413_ports |= {self.port}
        super().__init__(coordinator=mega.updater)

    def _port_label(self):
        """Return the port as used in names: zero-padded for int ports when
        the hub uses the new naming scheme, otherwise the port unchanged."""
        port = self.port
        if self.mega.new_naming and isinstance(port, int):
            return f"{port:02}"
        return port

    @property
    def is_ws(self):
        """False in the base class; subclasses may override."""
        return False

    def get_attribute(self, name, default=None):
        """Return ``self._<name>``, falling back to the restored state.

        When the private attribute is missing or None and a restored state
        exists, ``is_on`` maps to the restored state value and any other name
        to the restored attribute of the same name. ``default`` is returned
        when nothing is found.
        """
        attr = getattr(self, f"_{name}", None)
        if attr is None and self._state is not None:
            if name == "is_on":
                attr = self._state.state
            else:
                attr = self._state.attributes.get(f"{name}", default)
        return attr if attr is not None else default

    @property
    def can_smooth_hardware(self):
        """Whether the hub reports hardware smoothing for this entity's port.

        The answer is computed once and cached in ``_can_smooth_hard``.
        """
        if self._can_smooth_hard is None:
            if self.is_ws:
                self._can_smooth_hard = False
            # ``elif`` (not ``if``): otherwise the ``is_ws`` verdict above is
            # overwritten straight away by the port lookup.
            elif not isinstance(self.port, list):
                self._can_smooth_hard = self.port in self.mega.smooth
            elif any(isinstance(x, str) for x in self.port):
                self._can_smooth_hard = False
            else:
                # NOTE(review): this tests the whole list for membership in
                # ``mega.smooth`` (kept as in the original) - confirm whether
                # a per-port check was intended.
                self._can_smooth_hard = self.port in self.mega.smooth
        return self._can_smooth_hard

    @property
    def enabled(self):
        """Entities whose name contains ``<`` are always reported disabled."""
        if "<" in self.name:
            return False
        return super().enabled

    @property
    def customize(self):
        """Customization dict for this entity.

        Returns the dict given to the constructor if any. Otherwise, once the
        entity is attached to hass, merges the per-hub/per-port section of
        ``hass.data[DOMAIN][CONF_CUSTOM]`` with the per-entity section
        (``entities`` -> entity_id) and caches the result. Returns an empty
        dict (uncached) while ``hass`` or ``entity_id`` is not yet available.
        """
        if self._customize is not None:
            return self._customize
        if self.hass is None or self.entity_id is None:
            return {}

        # ``or {}`` guards against a missing/None custom section, which used
        # to raise AttributeError on the chained ``.get``.
        custom = self.hass.data.get(DOMAIN, {}).get(CONF_CUSTOM) or {}
        entity_overrides = (custom.get("entities") or {}).get(self.entity_id) or {}

        port_cfg = custom.get(self._mega_id) or {}
        port_cfg = port_cfg.get(self.port) or {}
        if self.addr is not None and self.index is not None and isinstance(port_cfg, dict):
            # Parenthesised on purpose: without the parentheses the conditional
            # applied to the whole concatenation and index != 0 gave just "_b".
            idx = self.addr.lower() + ("_a" if self.index == 0 else "_b")
            port_cfg = port_cfg.get(idx) or {}

        # Merge into a copy so the shared configuration stored in hass.data
        # is not polluted with per-entity overrides.
        merged = dict(port_cfg) if isinstance(port_cfg, dict) else {}
        merged.update(entity_overrides)
        self._customize = merged
        return self._customize

    @property
    def device_info(self) -> DeviceInfo:
        """Device registry description built from hub id and port."""
        if isinstance(self.port, list):
            pt_idx = self.id_suffix
        else:
            _pt = self._port_label()
            if isinstance(_pt, str) and "e" in _pt:
                # For ports written like "<main>e<sub>" only the part before
                # "e" identifies the device.
                pt_idx, _ = _pt.split("e")
            else:
                pt_idx = _pt
        return DeviceInfo(
            identifiers={
                # Serial numbers are unique identifiers within a specific domain
                # NOTE(review): this is a 3-tuple; left unchanged so existing
                # device registry entries keep matching.
                (DOMAIN, f"{self._mega_id}", pt_idx)
            },
            name=self.name,
            manufacturer="ab-log.ru",
            sw_version=self.mega.fw,
            via_device=(DOMAIN, self._mega_id),
        )

    @property
    def lg(self) -> logging.Logger:
        """Module logger."""
        return _LOGGER

    @property
    def available(self) -> bool:
        """Mirrors the hub's ``online`` flag."""
        return self.mega.online

    @property
    def name(self):
        """Customized name if it is a string, otherwise the default name."""
        c = self.customize.get(CONF_NAME)
        if not isinstance(c, str):
            if not isinstance(self.port, list):
                c = self._name or f"{self.mega.id}_p{self._port_label()}"
            else:
                c = self.id_suffix
        return c

    @property
    def unique_id(self):
        """Unique id fixed at construction time."""
        return self._unique_id

    async def async_added_to_hass(self) -> None:
        """Run the parent hooks, then load the last stored state."""
        await super().async_added_to_hass()
        self._state = await self.async_get_last_state()

    async def get_state(self):
        """Log the current state and write it to Home Assistant."""
        self.lg.debug("state is %s", self.state)
        self.async_write_ha_state()
|
|
|
|
|
|
class MegaPushEntity(BaseMegaEntity):
    """
    Entity that is refreshed by values pushed from the hub: it subscribes a
    callback for its port and, for binary sensors, turns the pushed payload
    into click / press / release events on the Home Assistant bus.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mega.subscribe(self.port, callback=self.__update)
        self.is_first_update = True

    def __update(self, value: dict):
        """Hub callback: apply the payload, publish state, emit button events."""
        self._update(value)
        if self.hass is None:
            # Not attached to Home Assistant yet: nothing to publish.
            return
        self.async_write_ha_state()
        self.lg.debug("state after update %s", self.state)
        if not self.entity_id.startswith("binary_sensor"):
            _LOGGER.debug("skip event because not a bnary sens")
            return

        def fire(kind):
            # Every event shares the same shape; only the "type" differs.
            self.hass.bus.async_fire(
                event_type=EVENT_BINARY,
                event_data={"entity_id": self.entity_id, "type": kind},
            )

        # Remember whether the previous message on this port was a long press
        # before this message possibly changes it.
        was_long: bool = self.mega.last_long.get(self.port, False)
        clicks = safe_int(value.get("click", 0))
        if clicks == 1:
            fire(SINGLE_CLICK)
            return
        if clicks == 2:
            fire(DOUBLE_CLICK)
            return

        # "m" is only inspected when the payload is not a click.
        mode = safe_int(value.get("m"))
        if mode == 2:
            self.mega.last_long[self.port] = True
            fire(LONG)
        elif mode == 1:
            fire(LONG_RELEASE if was_long else RELEASE)
        elif mode == 0:
            fire(PRESS)
            self.mega.last_long[self.port] = False

    def _update(self, payload: dict):
        """Hook for subclasses to consume the pushed payload; no-op here."""
        pass
|
|
|
|
|
|
class MegaOutPort(MegaPushEntity):
    """
    Output port entity: a plain on/off output or, when ``dimmer`` is set, a
    dimmable one. State is read from ``mega.values[port]`` with a fallback to
    the state restored by Home Assistant, and commands are sent through
    ``mega.request`` or, for smooth transitions, ``mega.smooth_dim``.
    """

    def __init__(self, dimmer=False, dimmer_scale=1, *args, **kwargs):
        """
        :param dimmer: True when the output supports brightness.
        :param dimmer_scale: multiplier from brightness to the device value;
            16 selects a 0..4095 device range, anything else 0..255.
        """
        super().__init__(*args, **kwargs)
        self._brightness = None
        self._is_on = None
        self.dimmer = dimmer
        self.dimmer_scale = dimmer_scale
        self.is_extender = isinstance(self.port, str) and "e" in self.port
        # Currently running smooth-dim task, if any.
        self.task: asyncio.Task = None
        self._restore_brightness = None
        # Timestamp of the last accepted turn_on/turn_off (debounce).
        self._last_called: float = 0

    # @property
    # def assumed_state(self) -> bool:
    #     return True if self.index is not None or self.mega.mqtt is None else False

    @property
    def _is_extender_port(self) -> bool:
        """True for string ports containing "e". Derived from ``port`` so it
        also works before ``__init__`` has set ``is_extender``."""
        return isinstance(self.port, str) and "e" in self.port

    @property
    def max_dim(self):
        """Largest device value: 4095 for scale 16, otherwise 255."""
        return 4095 if self.dimmer_scale == 16 else 255

    @property
    def range(self) -> typing.List[int]:
        """Customized ``[low, high]`` device brightness range, default [0, 255]."""
        return self.customize.get(CONF_RANGE, [0, 255])

    @property
    def invert(self):
        """Customized inversion flag, default False."""
        return self.customize.get(CONF_INVERT, False)

    @property
    def brightness(self):
        """Brightness mapped back from the configured range to 0..255.

        Returns None for non-dimmers and when the hub value dict has no
        ``value`` entry.
        """
        if not self.dimmer:
            return None
        ret = None
        val = self.mega.values.get(self.port, {})
        if isinstance(val, dict) and len(val) == 0 and self._state is not None:
            # Nothing received from the hub yet: use the restored attribute.
            ret = safe_int(
                self._state.attributes.get("brightness"),
                def_on=self.max_dim,
                def_off=0,
                def_val=0,
            )
            ret = self._calc_brightness(ret)
        elif self._is_extender_port:
            # Only string readings are parsed; any other type counts as 0 and
            # therefore falls back to the last commanded brightness.
            if isinstance(val, str):
                val = safe_int(val, def_on=self.max_dim, def_off=0, def_val=0)
            else:
                val = 0
            if val == 0:
                ret = self._brightness
            elif isinstance(val, (int, float)):
                ret = int(val / self.dimmer_scale)
        elif val is not None:
            # Tolerate a raw (non-dict) reading instead of failing on ``.get``.
            raw = val.get("value") if isinstance(val, dict) else val
            if raw is None:
                return None
            try:
                ret = int(raw)
            except (ValueError, TypeError, OverflowError):
                # Unparseable reading: treated like "no brightness" below.
                pass
        return self._cal_reverse_brightness(ret)

    @property
    def is_on(self) -> bool:
        """Current on/off state, honouring ``invert``.

        May return None when the state cannot be determined.
        """
        val = self.mega.values.get(self.port, {})
        if isinstance(val, dict) and len(val) == 0 and self._state is not None:
            # Compare the restored state *value*; comparing the State object
            # itself to a string was always False.
            return str(self._state.state).lower() == "on"

        # A reading exists unless it is None or an empty dict. Falsy scalars
        # such as 0 are real readings and must not reach the dict handling.
        has_reading = val is not None and not (isinstance(val, dict) and not val)
        if self._is_extender_port and has_reading:
            if getattr(self, "dimmer", False):
                level = safe_int(val)
                if level is None:
                    return None
                return level > 0 if not self.invert else level == 0
            return val == "ON" if not self.invert else val == "OFF"

        if val is None:
            return None
        if isinstance(val, dict):
            val = val.get("value")

        if (
            not isinstance(val, str)
            and self.index is not None
            and self.addr is not None
        ):
            # Addressed channel: the value is expected to be a dict keyed by
            # address whose entry looks like "<a>/<b>".
            if not isinstance(val, dict):
                self.mega.lg.warning(f"{self.entity_id}: {val} is not a dict")
                return None
            _val = val.get(
                self.addr, val.get(self.addr.lower(), val.get(self.addr.upper()))
            )
            if not isinstance(_val, str):
                self.mega.lg.warning(
                    f"{self.entity_id}: can not get {self.addr} from {val}, recieved {_val}"
                )
                return None
            _val = _val.split("/")
            if len(_val) >= 2:
                val = _val[self.index]
                # Log the selected element (the list was logged twice before).
                self.mega.lg.debug(
                    '%s parsed values: %s[%s]="%s"',
                    self.entity_id,
                    _val,
                    self.index,
                    val,
                )
            else:
                self.mega.lg.warning(f"{self.entity_id}: {_val} has wrong length")
                return None
        elif self.index is not None and self.addr is None:
            self.mega.lg.warning(f"{self.entity_id} does not has addr")
            return None

        self.mega.lg.debug("%s.state = %s", self.entity_id, val)
        as_int = safe_int(val)
        if not self.invert:
            return val == "ON" or str(val) == "1" or (as_int is not None and as_int > 0)
        return val == "OFF" or str(val) == "0" or (as_int is not None and as_int == 0)

    @property
    def cmd_port(self):
        """Port used in commands: suffixed with A (index 0) or B when indexed."""
        if self.index is not None:
            return f"{self.port}A" if self.index == 0 else f"{self.port}B"
        return self.port

    @property
    def smooth(self) -> timedelta:
        """Customized smooth duration, else one built from the constructor's
        ``smooth`` seconds; None when neither is set."""
        ret = self.customize.get(CONF_SMOOTH)
        if ret is None and self._smooth:
            ret = timedelta(seconds=self._smooth)
        return ret

    @property
    def smooth_dim(self):
        """Truthy when a dimmer should change brightness smoothly."""
        if not self.dimmer:
            return False
        return self.smooth or self.can_smooth_hardware

    def update_from_smooth(self, value, update_state=False):
        """Updater passed to ``mega.smooth_dim``: store ``value[0]`` as the
        port value and optionally publish the state."""
        if isinstance(self.port, str):
            self.mega.values[self.port] = value[0]
        else:
            self.mega.values[self.port] = {"value": value[0]}
        if update_state:
            self.async_write_ha_state()

    def _set_dim_brightness(self, from_, to_, transition):
        """Start a smooth change from ``from_`` to ``to_`` as a background
        task, cancelling a previous one that is still stored."""
        full_scale = 255 if self.dimmer_scale == 1 else 4095
        pct = abs(to_ - from_) / full_scale
        # Intermediate states are only published for transitions over 3 s.
        update_state = transition is not None and transition > 3
        # NOTE(review): ``self.smooth`` can be None when only hardware
        # smoothing is available and no transition is given - confirm the
        # intended duration for that case.
        tm = (self.smooth.total_seconds() * pct) if transition is None else transition
        if self.task is not None:
            self.task.cancel()
        self.task = asyncio.create_task(
            self.mega.smooth_dim(
                (self.cmd_port, from_, to_),
                time=tm,
                can_smooth_hardware=self.can_smooth_hardware,
                max_values=[full_scale],
                updater=partial(self.update_from_smooth, update_state=update_state),
            )
        )

    def _calc_brightness(self, brightness):
        """Map a 0..255 brightness into the configured ``range``."""
        if brightness is None:
            brightness = 0
        pct = min(max(0, brightness / 255), 1)
        low, high = self.range
        return round(pct * (high - low) + low)

    def _cal_reverse_brightness(self, brightness):
        """Map a value inside the configured ``range`` back to 0..255."""
        if brightness is None:
            brightness = 0
        low, high = self.range
        span = high - low
        if span == 0:
            # Collapsed range: avoid dividing by zero; any non-zero value at
            # or above the single level counts as fully on.
            pct = 1 if (brightness > 0 and brightness >= low) else 0
        else:
            pct = min(max(0, (brightness - low) / span), 1)
        return round(pct * 255)

    async def async_turn_on(self, brightness=None, transition=None, **kwargs):
        """Turn the output on, optionally with a brightness and transition.

        Calls arriving less than 0.1 s after the previous accepted call are
        ignored.
        """
        if (time.time() - self._last_called) < 0.1:
            return
        self._last_called = time.time()
        if not self.dimmer:
            transition = None
        if not self.is_on and (brightness is None or brightness == 0):
            brightness = self._restore_brightness
        brightness = brightness or self.brightness or 255
        brightness = self._calc_brightness(brightness)
        # Must be read before ``_brightness`` is overwritten below.
        _prev = safe_int(self.brightness) or 0
        self._brightness = brightness
        if self.dimmer and brightness == 0:
            cmd = self.max_dim
        elif self.dimmer:
            cmd = min((brightness * self.dimmer_scale, self.max_dim))
            if self.smooth_dim or transition:
                self._set_dim_brightness(from_=_prev, to_=cmd, transition=transition)
        else:
            cmd = 1 if not self.invert else 0
        if transition is None:
            _cmd = {"cmd": f"{self.cmd_port}:{cmd}"}
        else:
            # Guard the division: previous and target brightness may be equal.
            delta = abs(_prev - brightness) / 255
            _cmd = {
                "pt": f"{self.cmd_port}",
                "pwm": cmd,
                "cnt": round(transition / delta) if delta else 0,
            }
        if self.addr:
            _cmd["addr"] = self.addr
        if not (self.smooth_dim or transition):
            await self.mega.request(**_cmd, priority=-1)
        if self.index is not None:
            # refresh the current state for ds2413
            await self.mega.get_port(
                port=self.port,
                force_http=True,
                conv=False,
                http_cmd="list",
            )
        elif self._is_extender_port:
            if not self.dimmer:
                self.mega.values[self.port] = "ON" if not self.invert else "OFF"
            else:
                self.mega.values[self.port] = cmd
        else:
            self.mega.values[self.port] = {"value": cmd}
        await self.get_state()

    async def async_turn_off(self, transition=None, **kwargs) -> None:
        """Turn the output off, remembering the brightness for the next
        turn-on. Debounced like ``async_turn_on``."""
        if (time.time() - self._last_called) < 0.1:
            return
        self._last_called = time.time()
        self._restore_brightness = self._cal_reverse_brightness(
            safe_int(self._brightness)
        )
        if not self.dimmer:
            transition = None
        cmd = "0" if not self.invert else "1"
        _cmd = {"cmd": f"{self.cmd_port}:{cmd}"}
        _prev = safe_int(self.brightness) or 0
        if self.addr:
            _cmd["addr"] = self.addr
        if not (self.smooth_dim or transition):
            await self.mega.request(**_cmd, priority=-1)
        else:
            self._set_dim_brightness(
                from_=_prev,
                to_=0,
                transition=transition,
            )
        if self.index is not None:
            # refresh the current state for ds2413
            await self.mega.get_port(
                port=self.port,
                force_http=True,
                conv=False,
                http_cmd="list",
            )
        elif self._is_extender_port:
            self.mega.values[self.port] = "OFF" if not self.invert else "ON"
        else:
            self.mega.values[self.port] = {"value": cmd}
        await self.get_state()

    async def async_will_remove_from_hass(self) -> None:
        """Cancel a running smooth-dim task, then run the parent hooks."""
        if self.task is not None:
            self.task.cancel()
        await super().async_will_remove_from_hass()
|
|
|
|
|
|
def safe_int(v, def_on=1, def_off=0, def_val=None):
    """Convert ``v`` to int without raising.

    :param v: value to convert; the strings ``"ON"`` / ``"OFF"`` are mapped
        to ``def_on`` / ``def_off``.
    :param def_on: result for ``"ON"``.
    :param def_off: result for ``"OFF"``.
    :param def_val: result when ``v`` cannot be converted.
    :return: the converted integer or one of the defaults.
    """
    if v == "ON":
        return def_on
    if v == "OFF":
        return def_off
    try:
        return int(v)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers infinite floats, which int() rejects.
        return def_val
|
|
|
|
|
|
def safe_float(v):
    """Convert ``v`` to float, returning None when it cannot be converted."""
    try:
        return float(v)
    except (ValueError, TypeError, OverflowError):
        # Only conversion failures are swallowed; a bare ``except`` would also
        # hide KeyboardInterrupt and SystemExit.
        return None
|