mirror of
https://github.com/dw-0/kiauh.git
synced 2026-02-02 23:47:46 +05:00
Compare commits
8 Commits
45fde808d2
...
v6.0.12
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f9d4c823a | ||
|
|
c8df9427b3 | ||
|
|
5414aba299 | ||
|
|
80948edbb4 | ||
|
|
a455edba93 | ||
|
|
810ab3a2fa | ||
|
|
6c9a78496a | ||
|
|
123ccde378 |
@@ -8,7 +8,7 @@
|
||||
# ======================================================================= #
|
||||
|
||||
|
||||
from typing import List
|
||||
from typing import List, Optional
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
@@ -27,6 +27,7 @@ def run_client_config_removal(
|
||||
client_config: BaseWebClientConfig,
|
||||
kl_instances: List[Klipper],
|
||||
mr_instances: List[Moonraker],
|
||||
svc: Optional[BackupService] = None,
|
||||
) -> Message:
|
||||
completion_msg = Message(
|
||||
title=f"{client_config.display_name} Removal Process completed",
|
||||
@@ -36,12 +37,15 @@ def run_client_config_removal(
|
||||
if run_remove_routines(client_config.config_dir):
|
||||
completion_msg.text.append(f"● {client_config.display_name} removed")
|
||||
|
||||
BackupService().backup_printer_config_dir()
|
||||
if svc is None:
|
||||
svc = BackupService()
|
||||
|
||||
svc.backup_moonraker_conf()
|
||||
completion_msg = remove_moonraker_config_section(
|
||||
completion_msg, client_config, mr_instances
|
||||
)
|
||||
|
||||
svc.backup_printer_cfg()
|
||||
completion_msg = remove_printer_config_section(
|
||||
completion_msg, client_config, kl_instances
|
||||
)
|
||||
|
||||
@@ -41,6 +41,7 @@ def run_client_removal(
|
||||
)
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
svc = BackupService()
|
||||
|
||||
if backup_config:
|
||||
version = ""
|
||||
@@ -49,7 +50,6 @@ def run_client_removal(
|
||||
with open(src.joinpath(".version"), "r") as v:
|
||||
version = v.readlines()[0]
|
||||
|
||||
svc = BackupService()
|
||||
target_path = svc.backup_root.joinpath(f"{client.client_dir.name}_{version}")
|
||||
success = svc.backup_file(
|
||||
source_path=client.config_file,
|
||||
@@ -67,7 +67,7 @@ def run_client_removal(
|
||||
if remove_client_nginx_logs(client, kl_instances):
|
||||
completion_msg.text.append("● NGINX logs removed")
|
||||
|
||||
BackupService().backup_moonraker_conf()
|
||||
svc.backup_moonraker_conf()
|
||||
section = f"update_manager {client_name}"
|
||||
handled_instances: List[Moonraker] = remove_config_section(
|
||||
section, mr_instances
|
||||
@@ -83,6 +83,7 @@ def run_client_removal(
|
||||
client.client_config,
|
||||
kl_instances,
|
||||
mr_instances,
|
||||
svc,
|
||||
)
|
||||
if cfg_completion_msg.color == Color.GREEN:
|
||||
completion_msg.text.extend(cfg_completion_msg.text[1:])
|
||||
|
||||
@@ -78,10 +78,10 @@ def get_current_client_config() -> str:
|
||||
installed = [c for c in clients if c.client_config.config_dir.exists()]
|
||||
|
||||
if not installed:
|
||||
return Color.apply("-", Color.CYAN)
|
||||
return str(Color.apply("-", Color.CYAN))
|
||||
elif len(installed) == 1:
|
||||
cfg = installed[0].client_config
|
||||
return Color.apply(cfg.display_name, Color.CYAN)
|
||||
return str(Color.apply(cfg.display_name, Color.CYAN))
|
||||
|
||||
# at this point, both client config folders exists, so we need to check
|
||||
# which are actually included in the printer.cfg of all klipper instances
|
||||
@@ -100,18 +100,18 @@ def get_current_client_config() -> str:
|
||||
|
||||
# if both are included in the same file, we have a potential conflict
|
||||
if includes_mainsail and includes_fluidd:
|
||||
return Color.apply("Conflict", Color.YELLOW)
|
||||
return str(Color.apply("Conflict", Color.YELLOW))
|
||||
|
||||
if not mainsail_includes and not fluidd_includes:
|
||||
# there are no includes at all, even though the client config folders exist
|
||||
return Color.apply("-", Color.CYAN)
|
||||
return str(Color.apply("-", Color.CYAN))
|
||||
elif len(fluidd_includes) > len(mainsail_includes):
|
||||
# there are more instances that include fluidd than mainsail
|
||||
return Color.apply(fluidd.client_config.display_name, Color.CYAN)
|
||||
return str(Color.apply(fluidd.client_config.display_name, Color.CYAN))
|
||||
else:
|
||||
# there are the same amount of non-conflicting includes for each config
|
||||
# or more instances include mainsail than fluidd
|
||||
return Color.apply(mainsail.client_config.display_name, Color.CYAN)
|
||||
return str(Color.apply(mainsail.client_config.display_name, Color.CYAN))
|
||||
|
||||
|
||||
def enable_mainsail_remotemode() -> None:
|
||||
@@ -152,10 +152,9 @@ def symlink_webui_nginx_log(
|
||||
def get_local_client_version(client: BaseWebClient) -> str | None:
|
||||
relinfo_file = client.client_dir.joinpath("release_info.json")
|
||||
version_file = client.client_dir.joinpath(".version")
|
||||
default = "-"
|
||||
|
||||
if not client.client_dir.exists():
|
||||
return default
|
||||
return None
|
||||
|
||||
# try to get version from release_info.json first
|
||||
if relinfo_file.is_file():
|
||||
@@ -177,11 +176,11 @@ def get_local_client_version(client: BaseWebClient) -> str | None:
|
||||
try:
|
||||
with open(version_file, "r") as f:
|
||||
line = f.readline().strip()
|
||||
return line or default
|
||||
return line or None
|
||||
except OSError:
|
||||
Logger.print_error("Unable to read '.version'")
|
||||
|
||||
return default
|
||||
return None
|
||||
|
||||
|
||||
def get_remote_client_version(client: BaseWebClient) -> str | None:
|
||||
@@ -446,9 +445,9 @@ def get_client_port_selection(
|
||||
while True:
|
||||
_type = "Reconfigure" if reconfigure else "Configure"
|
||||
question = f"{_type} {client.display_name} for port"
|
||||
port_input = get_number_input(question, min_value=80, default=port)
|
||||
port_input: int | None = get_number_input(question, min_value=80, default=port)
|
||||
|
||||
if port_input not in ports_in_use:
|
||||
if port_input and port_input not in ports_in_use:
|
||||
client_settings: WebUiSettings = settings[client.name]
|
||||
client_settings.port = port_input
|
||||
settings.save()
|
||||
|
||||
@@ -97,7 +97,7 @@ class ClientInstallMenu(BaseMenu):
|
||||
self.message_service.set_message(message)
|
||||
|
||||
def _get_current_port(self) -> int:
|
||||
curr_port = get_nginx_listen_port(self.client.nginx_config)
|
||||
curr_port: int | None = get_nginx_listen_port(self.client.nginx_config)
|
||||
if curr_port is None:
|
||||
# if the port is not found in the config file we use
|
||||
# the default port from the kiauh settings as fallback
|
||||
|
||||
@@ -95,8 +95,8 @@ class MainMenu(BaseMenu):
|
||||
status_data: ComponentStatus = status_fn(*args)
|
||||
code: int = status_data.status
|
||||
status: StatusText = StatusMap[code]
|
||||
owner: str = trunc_string(status_data.owner, 23)
|
||||
repo: str = trunc_string(status_data.repo, 23)
|
||||
owner: str = trunc_string(status_data.owner, 23) if status_data.owner else '-'
|
||||
repo: str = trunc_string(status_data.repo, 23) if status_data.repo else '-'
|
||||
instance_count: int = status_data.instances
|
||||
|
||||
count_txt: str = ""
|
||||
|
||||
@@ -100,11 +100,11 @@ class SettingsMenu(BaseMenu):
|
||||
def trim_repo_url(repo: str) -> str:
|
||||
return repo.replace(".git", "").replace("https://", "").replace("git@", "")
|
||||
|
||||
if not klipper_status.repo == "-":
|
||||
if klipper_status.repo:
|
||||
url = trim_repo_url(klipper_status.repo_url)
|
||||
self.kl_repo_url = Color.apply(url, Color.CYAN)
|
||||
self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN)
|
||||
if not moonraker_status.repo == "-":
|
||||
if moonraker_status.repo:
|
||||
url = trim_repo_url(moonraker_status.repo_url)
|
||||
self.mr_repo_url = Color.apply(url, Color.CYAN)
|
||||
self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN)
|
||||
|
||||
@@ -257,7 +257,7 @@ class UpdateMenu(BaseMenu):
|
||||
|
||||
def _format_local_status(self, local_version, remote_version) -> str:
|
||||
color = Color.RED
|
||||
if not local_version or local_version == '-':
|
||||
if local_version is None:
|
||||
color = Color.RED
|
||||
elif local_version == remote_version:
|
||||
color = Color.GREEN
|
||||
@@ -290,7 +290,13 @@ class UpdateMenu(BaseMenu):
|
||||
return self.status_data[name]["installed"]
|
||||
|
||||
def _is_update_available(self, name: str) -> bool:
|
||||
return self.status_data[name]["local"] != self.status_data[name]["remote"]
|
||||
local = self.status_data[name]["local"]
|
||||
remote = self.status_data[name]["remote"]
|
||||
|
||||
if local is None or remote is None:
|
||||
return False
|
||||
|
||||
return local != remote
|
||||
|
||||
def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None:
|
||||
display_name = self.status_data[name]["display_name"]
|
||||
|
||||
@@ -22,6 +22,7 @@ from utils.instance_utils import get_instances
|
||||
class BackupService:
|
||||
def __init__(self):
|
||||
self._backup_root = Path.home().joinpath("kiauh_backups")
|
||||
self._timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
|
||||
@property
|
||||
def backup_root(self) -> Path:
|
||||
@@ -29,7 +30,7 @@ class BackupService:
|
||||
|
||||
@property
|
||||
def timestamp(self) -> str:
|
||||
return datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
return self._timestamp
|
||||
|
||||
################################################
|
||||
# GENERIC BACKUP METHODS
|
||||
@@ -68,10 +69,15 @@ class BackupService:
|
||||
backup_dir = self._backup_root.joinpath(target_path)
|
||||
|
||||
backup_dir.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy2(source_path, backup_dir.joinpath(filename))
|
||||
target_path = backup_dir.joinpath(filename)
|
||||
if target_path.exists():
|
||||
Logger.print_info(f"File '{target_path}' already exists. Skipping ...")
|
||||
return True
|
||||
|
||||
shutil.copy2(source_path, target_path)
|
||||
|
||||
Logger.print_ok(
|
||||
f"Successfully backed up '{source_path}' to '{backup_dir}'"
|
||||
f"Successfully backed up '{source_path}' to '{target_path}'"
|
||||
)
|
||||
return True
|
||||
|
||||
@@ -111,14 +117,25 @@ class BackupService:
|
||||
|
||||
if backup_path.exists():
|
||||
Logger.print_info(f"Reusing existing backup directory '{backup_path}'")
|
||||
|
||||
shutil.copytree(
|
||||
source_path,
|
||||
backup_path,
|
||||
dirs_exist_ok=True,
|
||||
symlinks=True,
|
||||
ignore_dangling_symlinks=True,
|
||||
)
|
||||
for item in source_path.rglob("*"):
|
||||
relative_path = item.relative_to(source_path)
|
||||
target_item = backup_path.joinpath(relative_path)
|
||||
if item.is_file():
|
||||
if not target_item.exists():
|
||||
target_item.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy2(item, target_item)
|
||||
else:
|
||||
Logger.print_info(f"File '{target_item}' already exists. Skipping...")
|
||||
elif item.is_dir():
|
||||
target_item.mkdir(parents=True, exist_ok=True)
|
||||
else:
|
||||
shutil.copytree(
|
||||
source_path,
|
||||
backup_path,
|
||||
dirs_exist_ok=True,
|
||||
symlinks=True,
|
||||
ignore_dangling_symlinks=True,
|
||||
)
|
||||
|
||||
Logger.print_ok(
|
||||
f"Successfully backed up '{source_path}' to '{backup_path}'"
|
||||
@@ -134,27 +151,29 @@ class BackupService:
|
||||
################################################
|
||||
|
||||
def backup_printer_cfg(self):
|
||||
"""Backup printer.cfg files of all Klipper instances.
|
||||
Files are backed up to:
|
||||
{backup_root}/{instance_data_dir_name}/printer_{timestamp}.cfg
|
||||
"""
|
||||
klipper_instances: List[Klipper] = get_instances(Klipper)
|
||||
for instance in klipper_instances:
|
||||
target_path: Path = self._backup_root.joinpath(
|
||||
instance.data_dir.name, f"config_{self.timestamp}"
|
||||
)
|
||||
target_path: Path = self._backup_root.joinpath(instance.data_dir.name)
|
||||
self.backup_file(
|
||||
source_path=instance.cfg_file,
|
||||
target_path=target_path,
|
||||
target_name=instance.cfg_file.name,
|
||||
)
|
||||
|
||||
def backup_moonraker_conf(self):
|
||||
"""Backup moonraker.conf files of all Moonraker instances.
|
||||
Files are backed up to:
|
||||
{backup_root}/{instance_data_dir_name}/moonraker_{timestamp}.conf
|
||||
"""
|
||||
moonraker_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
for instance in moonraker_instances:
|
||||
target_path: Path = self._backup_root.joinpath(
|
||||
instance.data_dir.name, f"config_{self.timestamp}"
|
||||
)
|
||||
target_path: Path = self._backup_root.joinpath(instance.data_dir.name)
|
||||
self.backup_file(
|
||||
source_path=instance.cfg_file,
|
||||
target_path=target_path,
|
||||
target_name=instance.cfg_file.name,
|
||||
)
|
||||
|
||||
def backup_printer_config_dir(self) -> None:
|
||||
|
||||
@@ -26,7 +26,7 @@ class ComponentStatus:
|
||||
owner: str | None = None
|
||||
repo: str | None = None
|
||||
repo_url: str | None = None
|
||||
branch: str = ""
|
||||
branch: str | None = None
|
||||
local: str | None = None
|
||||
remote: str | None = None
|
||||
instances: int | None = None
|
||||
|
||||
@@ -97,7 +97,7 @@ class GcodeShellCmdExtension(BaseExtension):
|
||||
|
||||
def install_example_cfg(self, instances: List[Klipper]):
|
||||
cfg_dirs = [instance.base.cfg_dir for instance in instances]
|
||||
# copy extension to klippy/extras
|
||||
# copy extension to config directories
|
||||
for cfg_dir in cfg_dirs:
|
||||
Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...")
|
||||
if check_file_exist(cfg_dir.joinpath("shell_command.cfg")):
|
||||
|
||||
@@ -42,7 +42,7 @@ def get_kiauh_version() -> str:
|
||||
Helper method to get the current KIAUH version by reading the latest tag
|
||||
:return: string of the latest tag or a default value if no tags exist
|
||||
"""
|
||||
tags = get_local_tags(Path(__file__).parent.parent)
|
||||
tags: List[str] = get_local_tags(Path(__file__).parent.parent)
|
||||
if tags:
|
||||
return tags[-1]
|
||||
else:
|
||||
@@ -108,7 +108,7 @@ def get_install_status(
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
checks = []
|
||||
branch: str = ""
|
||||
branch: str | None = None
|
||||
|
||||
if repo_dir.exists():
|
||||
checks.append(True)
|
||||
|
||||
@@ -24,13 +24,16 @@ from core.logger import Logger
|
||||
|
||||
def check_file_exist(file_path: Path, sudo=False) -> bool:
|
||||
"""
|
||||
Helper function for checking the existence of a file |
|
||||
Helper function for checking the existence of a file.
|
||||
Also works with symlinks (returns False if broken) |
|
||||
:param file_path: the absolute path of the file to check
|
||||
:param sudo: use sudo if required
|
||||
:return: True, if file exists, otherwise False
|
||||
"""
|
||||
if sudo:
|
||||
command = ["sudo", "find", file_path.as_posix()]
|
||||
# -L forces find to follow symlinks
|
||||
# -maxdepth = 0 avoids losing time if `file_path` is a directory
|
||||
command = ["sudo", "find", "-L", file_path.as_posix(), "-maxdepth", "0"]
|
||||
try:
|
||||
check_output(command, stderr=DEVNULL)
|
||||
return True
|
||||
@@ -44,7 +47,16 @@ def check_file_exist(file_path: Path, sudo=False) -> bool:
|
||||
|
||||
|
||||
def create_symlink(source: Path, target: Path, sudo=False) -> None:
|
||||
"""
|
||||
Helper function to create a symlink from source to target
|
||||
If the target file exists, it will be overwritten. |
|
||||
:param source: the source file/directory
|
||||
:param target: the target file/directory
|
||||
:param sudo: use sudo if required
|
||||
:return: None
|
||||
"""
|
||||
try:
|
||||
# -f forcibly creates/overwrites the symlink
|
||||
cmd = ["ln", "-sf", source.as_posix(), target.as_posix()]
|
||||
if sudo:
|
||||
cmd.insert(0, "sudo")
|
||||
|
||||
@@ -73,44 +73,44 @@ def git_pull_wrapper(target_dir: Path) -> None:
|
||||
return
|
||||
|
||||
|
||||
def get_repo_name(repo: Path) -> Tuple[str, str]:
|
||||
def get_repo_name(repo: Path) -> Tuple[str | None, str | None]:
|
||||
"""
|
||||
Helper method to extract the organisation and name of a repository |
|
||||
:param repo: repository to extract the values from
|
||||
:return: String in form of "<orga>/<name>" or None
|
||||
"""
|
||||
if not repo.exists() or not repo.joinpath(".git").exists():
|
||||
return "-", "-"
|
||||
return None, None
|
||||
|
||||
try:
|
||||
cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"]
|
||||
result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8")
|
||||
substrings: List[str] = result.strip().split("/")[-2:]
|
||||
|
||||
orga: str = substrings[0] if substrings[0] else "-"
|
||||
name: str = substrings[1] if substrings[1] else "-"
|
||||
orga: str | None = substrings[0] if substrings[0] else None
|
||||
name: str | None = substrings[1] if substrings[1] else None
|
||||
|
||||
return orga, name.replace(".git", "")
|
||||
return orga, name.replace(".git", "") if name else None
|
||||
|
||||
except CalledProcessError:
|
||||
return "-", "-"
|
||||
return None, None
|
||||
|
||||
|
||||
def get_current_branch(repo: Path) -> str:
|
||||
def get_current_branch(repo: Path) -> str | None:
|
||||
"""
|
||||
Get the current branch of a local Git repository
|
||||
:param repo: Path to the local Git repository
|
||||
:return: Current branch
|
||||
:return: Current branch or None if not determinable
|
||||
"""
|
||||
try:
|
||||
cmd = ["git", "branch", "--show-current"]
|
||||
result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode(
|
||||
encoding="utf-8"
|
||||
)
|
||||
return result.strip() if result else "-"
|
||||
return result.strip() if result else None
|
||||
|
||||
except CalledProcessError:
|
||||
return "-"
|
||||
return None
|
||||
|
||||
|
||||
def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:
|
||||
|
||||
@@ -156,7 +156,7 @@ def format_question(question: str, default=None) -> str:
|
||||
if default is not None:
|
||||
formatted_q += f" (default={default})"
|
||||
|
||||
return Color.apply(f"###### {formatted_q}: ", Color.CYAN)
|
||||
return str(Color.apply(f"###### {formatted_q}: ", Color.CYAN))
|
||||
|
||||
|
||||
def validate_number_input(value: str, min_count: int, max_count: int | None) -> int:
|
||||
|
||||
Reference in New Issue
Block a user