Compare commits

...

5 Commits

Author SHA1 Message Date
Théo Gaillard
80948edbb4 fix(gcode_shell_cmd): update comment to clarify config directory usage (#767) 2026-01-19 17:04:28 +01:00
Théo Gaillard
a455edba93 docs(fs_utils): add the documentation for create_symlink (#768) 2026-01-19 17:03:04 +01:00
Théo Gaillard
810ab3a2fa fix(fs-utils): enhance check_file_exist to support symlink resolution (#766) 2026-01-19 17:02:18 +01:00
dw-0
6c9a78496a fix(client_utils): ensure proper type conversion and hints for improved safety
- Standardize `str()` wrapping for color-applied strings in various returns.
- Refine type hints for better code clarity and robustness.
- Add null checks for port inputs to prevent potential errors.
2026-01-18 15:58:25 +01:00
dw-0
123ccde378 fix(core): standardize handling of None values for repo and version fields
- Improve local and remote version comparison by replacing default placeholders with None.
- Update repo and branch logic to handle None values consistently.
- Refactor type hints for better readability and accuracy.
2026-01-18 15:49:48 +01:00
10 changed files with 51 additions and 34 deletions

View File

@@ -78,10 +78,10 @@ def get_current_client_config() -> str:
installed = [c for c in clients if c.client_config.config_dir.exists()] installed = [c for c in clients if c.client_config.config_dir.exists()]
if not installed: if not installed:
return Color.apply("-", Color.CYAN) return str(Color.apply("-", Color.CYAN))
elif len(installed) == 1: elif len(installed) == 1:
cfg = installed[0].client_config cfg = installed[0].client_config
return Color.apply(cfg.display_name, Color.CYAN) return str(Color.apply(cfg.display_name, Color.CYAN))
# at this point, both client config folders exists, so we need to check # at this point, both client config folders exists, so we need to check
# which are actually included in the printer.cfg of all klipper instances # which are actually included in the printer.cfg of all klipper instances
@@ -100,18 +100,18 @@ def get_current_client_config() -> str:
# if both are included in the same file, we have a potential conflict # if both are included in the same file, we have a potential conflict
if includes_mainsail and includes_fluidd: if includes_mainsail and includes_fluidd:
return Color.apply("Conflict", Color.YELLOW) return str(Color.apply("Conflict", Color.YELLOW))
if not mainsail_includes and not fluidd_includes: if not mainsail_includes and not fluidd_includes:
# there are no includes at all, even though the client config folders exist # there are no includes at all, even though the client config folders exist
return Color.apply("-", Color.CYAN) return str(Color.apply("-", Color.CYAN))
elif len(fluidd_includes) > len(mainsail_includes): elif len(fluidd_includes) > len(mainsail_includes):
# there are more instances that include fluidd than mainsail # there are more instances that include fluidd than mainsail
return Color.apply(fluidd.client_config.display_name, Color.CYAN) return str(Color.apply(fluidd.client_config.display_name, Color.CYAN))
else: else:
# there are the same amount of non-conflicting includes for each config # there are the same amount of non-conflicting includes for each config
# or more instances include mainsail than fluidd # or more instances include mainsail than fluidd
return Color.apply(mainsail.client_config.display_name, Color.CYAN) return str(Color.apply(mainsail.client_config.display_name, Color.CYAN))
def enable_mainsail_remotemode() -> None: def enable_mainsail_remotemode() -> None:
@@ -152,10 +152,9 @@ def symlink_webui_nginx_log(
def get_local_client_version(client: BaseWebClient) -> str | None: def get_local_client_version(client: BaseWebClient) -> str | None:
relinfo_file = client.client_dir.joinpath("release_info.json") relinfo_file = client.client_dir.joinpath("release_info.json")
version_file = client.client_dir.joinpath(".version") version_file = client.client_dir.joinpath(".version")
default = "-"
if not client.client_dir.exists(): if not client.client_dir.exists():
return default return None
# try to get version from release_info.json first # try to get version from release_info.json first
if relinfo_file.is_file(): if relinfo_file.is_file():
@@ -177,11 +176,11 @@ def get_local_client_version(client: BaseWebClient) -> str | None:
try: try:
with open(version_file, "r") as f: with open(version_file, "r") as f:
line = f.readline().strip() line = f.readline().strip()
return line or default return line or None
except OSError: except OSError:
Logger.print_error("Unable to read '.version'") Logger.print_error("Unable to read '.version'")
return default return None
def get_remote_client_version(client: BaseWebClient) -> str | None: def get_remote_client_version(client: BaseWebClient) -> str | None:
@@ -446,9 +445,9 @@ def get_client_port_selection(
while True: while True:
_type = "Reconfigure" if reconfigure else "Configure" _type = "Reconfigure" if reconfigure else "Configure"
question = f"{_type} {client.display_name} for port" question = f"{_type} {client.display_name} for port"
port_input = get_number_input(question, min_value=80, default=port) port_input: int | None = get_number_input(question, min_value=80, default=port)
if port_input not in ports_in_use: if port_input and port_input not in ports_in_use:
client_settings: WebUiSettings = settings[client.name] client_settings: WebUiSettings = settings[client.name]
client_settings.port = port_input client_settings.port = port_input
settings.save() settings.save()

View File

@@ -97,7 +97,7 @@ class ClientInstallMenu(BaseMenu):
self.message_service.set_message(message) self.message_service.set_message(message)
def _get_current_port(self) -> int: def _get_current_port(self) -> int:
curr_port = get_nginx_listen_port(self.client.nginx_config) curr_port: int | None = get_nginx_listen_port(self.client.nginx_config)
if curr_port is None: if curr_port is None:
# if the port is not found in the config file we use # if the port is not found in the config file we use
# the default port from the kiauh settings as fallback # the default port from the kiauh settings as fallback

View File

@@ -95,8 +95,8 @@ class MainMenu(BaseMenu):
status_data: ComponentStatus = status_fn(*args) status_data: ComponentStatus = status_fn(*args)
code: int = status_data.status code: int = status_data.status
status: StatusText = StatusMap[code] status: StatusText = StatusMap[code]
owner: str = trunc_string(status_data.owner, 23) owner: str = trunc_string(status_data.owner, 23) if status_data.owner else '-'
repo: str = trunc_string(status_data.repo, 23) repo: str = trunc_string(status_data.repo, 23) if status_data.repo else '-'
instance_count: int = status_data.instances instance_count: int = status_data.instances
count_txt: str = "" count_txt: str = ""

View File

@@ -257,7 +257,7 @@ class UpdateMenu(BaseMenu):
def _format_local_status(self, local_version, remote_version) -> str: def _format_local_status(self, local_version, remote_version) -> str:
color = Color.RED color = Color.RED
if not local_version or local_version == '-': if local_version is None:
color = Color.RED color = Color.RED
elif local_version == remote_version: elif local_version == remote_version:
color = Color.GREEN color = Color.GREEN
@@ -290,7 +290,13 @@ class UpdateMenu(BaseMenu):
return self.status_data[name]["installed"] return self.status_data[name]["installed"]
def _is_update_available(self, name: str) -> bool: def _is_update_available(self, name: str) -> bool:
return self.status_data[name]["local"] != self.status_data[name]["remote"] local = self.status_data[name]["local"]
remote = self.status_data[name]["remote"]
if local is None or remote is None:
return False
return local != remote
def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None: def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None:
display_name = self.status_data[name]["display_name"] display_name = self.status_data[name]["display_name"]

View File

@@ -26,7 +26,7 @@ class ComponentStatus:
owner: str | None = None owner: str | None = None
repo: str | None = None repo: str | None = None
repo_url: str | None = None repo_url: str | None = None
branch: str = "" branch: str | None = None
local: str | None = None local: str | None = None
remote: str | None = None remote: str | None = None
instances: int | None = None instances: int | None = None

View File

@@ -97,7 +97,7 @@ class GcodeShellCmdExtension(BaseExtension):
def install_example_cfg(self, instances: List[Klipper]): def install_example_cfg(self, instances: List[Klipper]):
cfg_dirs = [instance.base.cfg_dir for instance in instances] cfg_dirs = [instance.base.cfg_dir for instance in instances]
# copy extension to klippy/extras # copy extension to config directories
for cfg_dir in cfg_dirs: for cfg_dir in cfg_dirs:
Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...") Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...")
if check_file_exist(cfg_dir.joinpath("shell_command.cfg")): if check_file_exist(cfg_dir.joinpath("shell_command.cfg")):

View File

@@ -42,7 +42,7 @@ def get_kiauh_version() -> str:
Helper method to get the current KIAUH version by reading the latest tag Helper method to get the current KIAUH version by reading the latest tag
:return: string of the latest tag or a default value if no tags exist :return: string of the latest tag or a default value if no tags exist
""" """
tags = get_local_tags(Path(__file__).parent.parent) tags: List[str] = get_local_tags(Path(__file__).parent.parent)
if tags: if tags:
return tags[-1] return tags[-1]
else: else:
@@ -108,7 +108,7 @@ def get_install_status(
from utils.instance_utils import get_instances from utils.instance_utils import get_instances
checks = [] checks = []
branch: str = "" branch: str | None = None
if repo_dir.exists(): if repo_dir.exists():
checks.append(True) checks.append(True)

View File

@@ -24,13 +24,16 @@ from core.logger import Logger
def check_file_exist(file_path: Path, sudo=False) -> bool: def check_file_exist(file_path: Path, sudo=False) -> bool:
""" """
Helper function for checking the existence of a file | Helper function for checking the existence of a file.
Also works with symlinks (returns False if broken) |
:param file_path: the absolute path of the file to check :param file_path: the absolute path of the file to check
:param sudo: use sudo if required :param sudo: use sudo if required
:return: True, if file exists, otherwise False :return: True, if file exists, otherwise False
""" """
if sudo: if sudo:
command = ["sudo", "find", file_path.as_posix()] # -L forces find to follow symlinks
# -maxdepth = 0 avoids losing time if `file_path` is a directory
command = ["sudo", "find", "-L", file_path.as_posix(), "-maxdepth", "0"]
try: try:
check_output(command, stderr=DEVNULL) check_output(command, stderr=DEVNULL)
return True return True
@@ -44,7 +47,16 @@ def check_file_exist(file_path: Path, sudo=False) -> bool:
def create_symlink(source: Path, target: Path, sudo=False) -> None: def create_symlink(source: Path, target: Path, sudo=False) -> None:
"""
Helper function to create a symlink from source to target
If the target file exists, it will be overwritten. |
:param source: the source file/directory
:param target: the target file/directory
:param sudo: use sudo if required
:return: None
"""
try: try:
# -f forcibly creates/overwrites the symlink
cmd = ["ln", "-sf", source.as_posix(), target.as_posix()] cmd = ["ln", "-sf", source.as_posix(), target.as_posix()]
if sudo: if sudo:
cmd.insert(0, "sudo") cmd.insert(0, "sudo")

View File

@@ -73,44 +73,44 @@ def git_pull_wrapper(target_dir: Path) -> None:
return return
def get_repo_name(repo: Path) -> Tuple[str, str]: def get_repo_name(repo: Path) -> Tuple[str | None, str | None]:
""" """
Helper method to extract the organisation and name of a repository | Helper method to extract the organisation and name of a repository |
:param repo: repository to extract the values from :param repo: repository to extract the values from
:return: String in form of "<orga>/<name>" or None :return: String in form of "<orga>/<name>" or None
""" """
if not repo.exists() or not repo.joinpath(".git").exists(): if not repo.exists() or not repo.joinpath(".git").exists():
return "-", "-" return None, None
try: try:
cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"] cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"]
result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8") result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8")
substrings: List[str] = result.strip().split("/")[-2:] substrings: List[str] = result.strip().split("/")[-2:]
orga: str = substrings[0] if substrings[0] else "-" orga: str | None = substrings[0] if substrings[0] else None
name: str = substrings[1] if substrings[1] else "-" name: str | None = substrings[1] if substrings[1] else None
return orga, name.replace(".git", "") return orga, name.replace(".git", "") if name else None
except CalledProcessError: except CalledProcessError:
return "-", "-" return None, None
def get_current_branch(repo: Path) -> str: def get_current_branch(repo: Path) -> str | None:
""" """
Get the current branch of a local Git repository Get the current branch of a local Git repository
:param repo: Path to the local Git repository :param repo: Path to the local Git repository
:return: Current branch :return: Current branch or None if not determinable
""" """
try: try:
cmd = ["git", "branch", "--show-current"] cmd = ["git", "branch", "--show-current"]
result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode( result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode(
encoding="utf-8" encoding="utf-8"
) )
return result.strip() if result else "-" return result.strip() if result else None
except CalledProcessError: except CalledProcessError:
return "-" return None
def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]: def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:

View File

@@ -156,7 +156,7 @@ def format_question(question: str, default=None) -> str:
if default is not None: if default is not None:
formatted_q += f" (default={default})" formatted_q += f" (default={default})"
return Color.apply(f"###### {formatted_q}: ", Color.CYAN) return str(Color.apply(f"###### {formatted_q}: ", Color.CYAN))
def validate_number_input(value: str, min_count: int, max_count: int | None) -> int: def validate_number_input(value: str, min_count: int, max_count: int | None) -> int: