mirror of
https://github.com/dw-0/kiauh.git
synced 2026-02-05 00:47:45 +05:00
Compare commits
12 Commits
657d919378
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09a5d96b63 | ||
|
|
1f9d4c823a | ||
|
|
c8df9427b3 | ||
|
|
5414aba299 | ||
|
|
80948edbb4 | ||
|
|
a455edba93 | ||
|
|
810ab3a2fa | ||
|
|
6c9a78496a | ||
|
|
123ccde378 | ||
|
|
45fde808d2 | ||
|
|
8ba134f574 | ||
|
|
f951936b20 |
@@ -236,9 +236,11 @@ class KlipperSelectMcuConnectionMenu(BaseMenu):
|
|||||||
if len(self.flash_options.mcu_list) < 1:
|
if len(self.flash_options.mcu_list) < 1:
|
||||||
Logger.print_warn("No MCUs found!")
|
Logger.print_warn("No MCUs found!")
|
||||||
Logger.print_warn("Make sure they are connected and repeat this step.")
|
Logger.print_warn("Make sure they are connected and repeat this step.")
|
||||||
|
time.sleep(3)
|
||||||
|
return
|
||||||
|
|
||||||
# if standalone is True, we only display the MCUs to the user and return
|
# if standalone is True, we only display the MCUs to the user and return
|
||||||
if self.__standalone and len(self.flash_options.mcu_list) > 0:
|
if self.__standalone:
|
||||||
Logger.print_ok("The following MCUs were found:", prefix=False)
|
Logger.print_ok("The following MCUs were found:", prefix=False)
|
||||||
for i, mcu in enumerate(self.flash_options.mcu_list):
|
for i, mcu in enumerate(self.flash_options.mcu_list):
|
||||||
print(f" ● MCU #{i}: {Color.CYAN}{mcu}{Color.RST}")
|
print(f" ● MCU #{i}: {Color.CYAN}{mcu}{Color.RST}")
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
# ======================================================================= #
|
# ======================================================================= #
|
||||||
|
|
||||||
|
|
||||||
from typing import List
|
from typing import List, Optional
|
||||||
|
|
||||||
from components.klipper.klipper import Klipper
|
from components.klipper.klipper import Klipper
|
||||||
from components.moonraker.moonraker import Moonraker
|
from components.moonraker.moonraker import Moonraker
|
||||||
@@ -27,6 +27,7 @@ def run_client_config_removal(
|
|||||||
client_config: BaseWebClientConfig,
|
client_config: BaseWebClientConfig,
|
||||||
kl_instances: List[Klipper],
|
kl_instances: List[Klipper],
|
||||||
mr_instances: List[Moonraker],
|
mr_instances: List[Moonraker],
|
||||||
|
svc: Optional[BackupService] = None,
|
||||||
) -> Message:
|
) -> Message:
|
||||||
completion_msg = Message(
|
completion_msg = Message(
|
||||||
title=f"{client_config.display_name} Removal Process completed",
|
title=f"{client_config.display_name} Removal Process completed",
|
||||||
@@ -36,12 +37,15 @@ def run_client_config_removal(
|
|||||||
if run_remove_routines(client_config.config_dir):
|
if run_remove_routines(client_config.config_dir):
|
||||||
completion_msg.text.append(f"● {client_config.display_name} removed")
|
completion_msg.text.append(f"● {client_config.display_name} removed")
|
||||||
|
|
||||||
BackupService().backup_printer_config_dir()
|
if svc is None:
|
||||||
|
svc = BackupService()
|
||||||
|
|
||||||
|
svc.backup_moonraker_conf()
|
||||||
completion_msg = remove_moonraker_config_section(
|
completion_msg = remove_moonraker_config_section(
|
||||||
completion_msg, client_config, mr_instances
|
completion_msg, client_config, mr_instances
|
||||||
)
|
)
|
||||||
|
|
||||||
|
svc.backup_printer_cfg()
|
||||||
completion_msg = remove_printer_config_section(
|
completion_msg = remove_printer_config_section(
|
||||||
completion_msg, client_config, kl_instances
|
completion_msg, client_config, kl_instances
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ def run_client_removal(
|
|||||||
)
|
)
|
||||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||||
|
svc = BackupService()
|
||||||
|
|
||||||
if backup_config:
|
if backup_config:
|
||||||
version = ""
|
version = ""
|
||||||
@@ -49,7 +50,6 @@ def run_client_removal(
|
|||||||
with open(src.joinpath(".version"), "r") as v:
|
with open(src.joinpath(".version"), "r") as v:
|
||||||
version = v.readlines()[0]
|
version = v.readlines()[0]
|
||||||
|
|
||||||
svc = BackupService()
|
|
||||||
target_path = svc.backup_root.joinpath(f"{client.client_dir.name}_{version}")
|
target_path = svc.backup_root.joinpath(f"{client.client_dir.name}_{version}")
|
||||||
success = svc.backup_file(
|
success = svc.backup_file(
|
||||||
source_path=client.config_file,
|
source_path=client.config_file,
|
||||||
@@ -67,7 +67,7 @@ def run_client_removal(
|
|||||||
if remove_client_nginx_logs(client, kl_instances):
|
if remove_client_nginx_logs(client, kl_instances):
|
||||||
completion_msg.text.append("● NGINX logs removed")
|
completion_msg.text.append("● NGINX logs removed")
|
||||||
|
|
||||||
BackupService().backup_moonraker_conf()
|
svc.backup_moonraker_conf()
|
||||||
section = f"update_manager {client_name}"
|
section = f"update_manager {client_name}"
|
||||||
handled_instances: List[Moonraker] = remove_config_section(
|
handled_instances: List[Moonraker] = remove_config_section(
|
||||||
section, mr_instances
|
section, mr_instances
|
||||||
@@ -83,6 +83,7 @@ def run_client_removal(
|
|||||||
client.client_config,
|
client.client_config,
|
||||||
kl_instances,
|
kl_instances,
|
||||||
mr_instances,
|
mr_instances,
|
||||||
|
svc,
|
||||||
)
|
)
|
||||||
if cfg_completion_msg.color == Color.GREEN:
|
if cfg_completion_msg.color == Color.GREEN:
|
||||||
completion_msg.text.extend(cfg_completion_msg.text[1:])
|
completion_msg.text.extend(cfg_completion_msg.text[1:])
|
||||||
|
|||||||
@@ -78,10 +78,10 @@ def get_current_client_config() -> str:
|
|||||||
installed = [c for c in clients if c.client_config.config_dir.exists()]
|
installed = [c for c in clients if c.client_config.config_dir.exists()]
|
||||||
|
|
||||||
if not installed:
|
if not installed:
|
||||||
return Color.apply("-", Color.CYAN)
|
return str(Color.apply("-", Color.CYAN))
|
||||||
elif len(installed) == 1:
|
elif len(installed) == 1:
|
||||||
cfg = installed[0].client_config
|
cfg = installed[0].client_config
|
||||||
return Color.apply(cfg.display_name, Color.CYAN)
|
return str(Color.apply(cfg.display_name, Color.CYAN))
|
||||||
|
|
||||||
# at this point, both client config folders exists, so we need to check
|
# at this point, both client config folders exists, so we need to check
|
||||||
# which are actually included in the printer.cfg of all klipper instances
|
# which are actually included in the printer.cfg of all klipper instances
|
||||||
@@ -100,18 +100,18 @@ def get_current_client_config() -> str:
|
|||||||
|
|
||||||
# if both are included in the same file, we have a potential conflict
|
# if both are included in the same file, we have a potential conflict
|
||||||
if includes_mainsail and includes_fluidd:
|
if includes_mainsail and includes_fluidd:
|
||||||
return Color.apply("Conflict", Color.YELLOW)
|
return str(Color.apply("Conflict", Color.YELLOW))
|
||||||
|
|
||||||
if not mainsail_includes and not fluidd_includes:
|
if not mainsail_includes and not fluidd_includes:
|
||||||
# there are no includes at all, even though the client config folders exist
|
# there are no includes at all, even though the client config folders exist
|
||||||
return Color.apply("-", Color.CYAN)
|
return str(Color.apply("-", Color.CYAN))
|
||||||
elif len(fluidd_includes) > len(mainsail_includes):
|
elif len(fluidd_includes) > len(mainsail_includes):
|
||||||
# there are more instances that include fluidd than mainsail
|
# there are more instances that include fluidd than mainsail
|
||||||
return Color.apply(fluidd.client_config.display_name, Color.CYAN)
|
return str(Color.apply(fluidd.client_config.display_name, Color.CYAN))
|
||||||
else:
|
else:
|
||||||
# there are the same amount of non-conflicting includes for each config
|
# there are the same amount of non-conflicting includes for each config
|
||||||
# or more instances include mainsail than fluidd
|
# or more instances include mainsail than fluidd
|
||||||
return Color.apply(mainsail.client_config.display_name, Color.CYAN)
|
return str(Color.apply(mainsail.client_config.display_name, Color.CYAN))
|
||||||
|
|
||||||
|
|
||||||
def enable_mainsail_remotemode() -> None:
|
def enable_mainsail_remotemode() -> None:
|
||||||
@@ -152,10 +152,9 @@ def symlink_webui_nginx_log(
|
|||||||
def get_local_client_version(client: BaseWebClient) -> str | None:
|
def get_local_client_version(client: BaseWebClient) -> str | None:
|
||||||
relinfo_file = client.client_dir.joinpath("release_info.json")
|
relinfo_file = client.client_dir.joinpath("release_info.json")
|
||||||
version_file = client.client_dir.joinpath(".version")
|
version_file = client.client_dir.joinpath(".version")
|
||||||
default = "n/a"
|
|
||||||
|
|
||||||
if not client.client_dir.exists():
|
if not client.client_dir.exists():
|
||||||
return default
|
return None
|
||||||
|
|
||||||
# try to get version from release_info.json first
|
# try to get version from release_info.json first
|
||||||
if relinfo_file.is_file():
|
if relinfo_file.is_file():
|
||||||
@@ -177,11 +176,11 @@ def get_local_client_version(client: BaseWebClient) -> str | None:
|
|||||||
try:
|
try:
|
||||||
with open(version_file, "r") as f:
|
with open(version_file, "r") as f:
|
||||||
line = f.readline().strip()
|
line = f.readline().strip()
|
||||||
return line or default
|
return line or None
|
||||||
except OSError:
|
except OSError:
|
||||||
Logger.print_error("Unable to read '.version'")
|
Logger.print_error("Unable to read '.version'")
|
||||||
|
|
||||||
return default
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_remote_client_version(client: BaseWebClient) -> str | None:
|
def get_remote_client_version(client: BaseWebClient) -> str | None:
|
||||||
@@ -446,9 +445,9 @@ def get_client_port_selection(
|
|||||||
while True:
|
while True:
|
||||||
_type = "Reconfigure" if reconfigure else "Configure"
|
_type = "Reconfigure" if reconfigure else "Configure"
|
||||||
question = f"{_type} {client.display_name} for port"
|
question = f"{_type} {client.display_name} for port"
|
||||||
port_input = get_number_input(question, min_value=80, default=port)
|
port_input: int | None = get_number_input(question, min_value=80, default=port)
|
||||||
|
|
||||||
if port_input not in ports_in_use:
|
if port_input and port_input not in ports_in_use:
|
||||||
client_settings: WebUiSettings = settings[client.name]
|
client_settings: WebUiSettings = settings[client.name]
|
||||||
client_settings.port = port_input
|
client_settings.port = port_input
|
||||||
settings.save()
|
settings.save()
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ class ClientInstallMenu(BaseMenu):
|
|||||||
self.message_service.set_message(message)
|
self.message_service.set_message(message)
|
||||||
|
|
||||||
def _get_current_port(self) -> int:
|
def _get_current_port(self) -> int:
|
||||||
curr_port = get_nginx_listen_port(self.client.nginx_config)
|
curr_port: int | None = get_nginx_listen_port(self.client.nginx_config)
|
||||||
if curr_port is None:
|
if curr_port is None:
|
||||||
# if the port is not found in the config file we use
|
# if the port is not found in the config file we use
|
||||||
# the default port from the kiauh settings as fallback
|
# the default port from the kiauh settings as fallback
|
||||||
|
|||||||
@@ -95,8 +95,8 @@ class MainMenu(BaseMenu):
|
|||||||
status_data: ComponentStatus = status_fn(*args)
|
status_data: ComponentStatus = status_fn(*args)
|
||||||
code: int = status_data.status
|
code: int = status_data.status
|
||||||
status: StatusText = StatusMap[code]
|
status: StatusText = StatusMap[code]
|
||||||
owner: str = trunc_string(status_data.owner, 23)
|
owner: str = trunc_string(status_data.owner, 23) if status_data.owner else '-'
|
||||||
repo: str = trunc_string(status_data.repo, 23)
|
repo: str = trunc_string(status_data.repo, 23) if status_data.repo else '-'
|
||||||
instance_count: int = status_data.instances
|
instance_count: int = status_data.instances
|
||||||
|
|
||||||
count_txt: str = ""
|
count_txt: str = ""
|
||||||
|
|||||||
@@ -100,11 +100,11 @@ class SettingsMenu(BaseMenu):
|
|||||||
def trim_repo_url(repo: str) -> str:
|
def trim_repo_url(repo: str) -> str:
|
||||||
return repo.replace(".git", "").replace("https://", "").replace("git@", "")
|
return repo.replace(".git", "").replace("https://", "").replace("git@", "")
|
||||||
|
|
||||||
if not klipper_status.repo == "-":
|
if klipper_status.repo:
|
||||||
url = trim_repo_url(klipper_status.repo_url)
|
url = trim_repo_url(klipper_status.repo_url)
|
||||||
self.kl_repo_url = Color.apply(url, Color.CYAN)
|
self.kl_repo_url = Color.apply(url, Color.CYAN)
|
||||||
self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN)
|
self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN)
|
||||||
if not moonraker_status.repo == "-":
|
if moonraker_status.repo:
|
||||||
url = trim_repo_url(moonraker_status.repo_url)
|
url = trim_repo_url(moonraker_status.repo_url)
|
||||||
self.mr_repo_url = Color.apply(url, Color.CYAN)
|
self.mr_repo_url = Color.apply(url, Color.CYAN)
|
||||||
self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN)
|
self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN)
|
||||||
|
|||||||
@@ -257,14 +257,14 @@ class UpdateMenu(BaseMenu):
|
|||||||
|
|
||||||
def _format_local_status(self, local_version, remote_version) -> str:
|
def _format_local_status(self, local_version, remote_version) -> str:
|
||||||
color = Color.RED
|
color = Color.RED
|
||||||
if not local_version:
|
if local_version is None:
|
||||||
color = Color.RED
|
color = Color.RED
|
||||||
elif local_version == remote_version:
|
elif local_version == remote_version:
|
||||||
color = Color.GREEN
|
color = Color.GREEN
|
||||||
elif local_version != remote_version:
|
elif local_version != remote_version:
|
||||||
color = Color.YELLOW
|
color = Color.YELLOW
|
||||||
|
|
||||||
return Color.apply(local_version or "-", color)
|
return str(Color.apply(local_version or '-', color))
|
||||||
|
|
||||||
def _set_status_data(self, name: str, status_fn: Callable, *args) -> None:
|
def _set_status_data(self, name: str, status_fn: Callable, *args) -> None:
|
||||||
comp_status: ComponentStatus = status_fn(*args)
|
comp_status: ComponentStatus = status_fn(*args)
|
||||||
@@ -290,7 +290,13 @@ class UpdateMenu(BaseMenu):
|
|||||||
return self.status_data[name]["installed"]
|
return self.status_data[name]["installed"]
|
||||||
|
|
||||||
def _is_update_available(self, name: str) -> bool:
|
def _is_update_available(self, name: str) -> bool:
|
||||||
return self.status_data[name]["local"] != self.status_data[name]["remote"]
|
local = self.status_data[name]["local"]
|
||||||
|
remote = self.status_data[name]["remote"]
|
||||||
|
|
||||||
|
if local is None or remote is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return local != remote
|
||||||
|
|
||||||
def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None:
|
def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None:
|
||||||
display_name = self.status_data[name]["display_name"]
|
display_name = self.status_data[name]["display_name"]
|
||||||
@@ -306,6 +312,27 @@ class UpdateMenu(BaseMenu):
|
|||||||
|
|
||||||
update_fn(*args)
|
update_fn(*args)
|
||||||
|
|
||||||
|
self._refresh_component_status(name)
|
||||||
|
|
||||||
|
def _refresh_component_status(self, name: str) -> None:
|
||||||
|
"""Refresh the status data for a component after an update."""
|
||||||
|
if name == "klipper":
|
||||||
|
self._set_status_data("klipper", get_klipper_status)
|
||||||
|
elif name == "moonraker":
|
||||||
|
self._set_status_data("moonraker", get_moonraker_status)
|
||||||
|
elif name == "mainsail":
|
||||||
|
self._set_status_data("mainsail", get_client_status, self.mainsail_data, True)
|
||||||
|
elif name == "mainsail_config":
|
||||||
|
self._set_status_data("mainsail_config", get_client_config_status, self.mainsail_data)
|
||||||
|
elif name == "fluidd":
|
||||||
|
self._set_status_data("fluidd", get_client_status, self.fluidd_data, True)
|
||||||
|
elif name == "fluidd_config":
|
||||||
|
self._set_status_data("fluidd_config", get_client_config_status, self.fluidd_data)
|
||||||
|
elif name == "klipperscreen":
|
||||||
|
self._set_status_data("klipperscreen", get_klipperscreen_status)
|
||||||
|
elif name == "crowsnest":
|
||||||
|
self._set_status_data("crowsnest", get_crowsnest_status)
|
||||||
|
|
||||||
def _run_system_updates(self) -> None:
|
def _run_system_updates(self) -> None:
|
||||||
if not self.packages:
|
if not self.packages:
|
||||||
Logger.print_info("No system upgrades available!")
|
Logger.print_info("No system upgrades available!")
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ from utils.instance_utils import get_instances
|
|||||||
class BackupService:
|
class BackupService:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._backup_root = Path.home().joinpath("kiauh_backups")
|
self._backup_root = Path.home().joinpath("kiauh_backups")
|
||||||
|
self._timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def backup_root(self) -> Path:
|
def backup_root(self) -> Path:
|
||||||
@@ -29,7 +30,7 @@ class BackupService:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def timestamp(self) -> str:
|
def timestamp(self) -> str:
|
||||||
return datetime.now().strftime("%Y%m%d-%H%M%S")
|
return self._timestamp
|
||||||
|
|
||||||
################################################
|
################################################
|
||||||
# GENERIC BACKUP METHODS
|
# GENERIC BACKUP METHODS
|
||||||
@@ -68,10 +69,15 @@ class BackupService:
|
|||||||
backup_dir = self._backup_root.joinpath(target_path)
|
backup_dir = self._backup_root.joinpath(target_path)
|
||||||
|
|
||||||
backup_dir.mkdir(parents=True, exist_ok=True)
|
backup_dir.mkdir(parents=True, exist_ok=True)
|
||||||
shutil.copy2(source_path, backup_dir.joinpath(filename))
|
target_path = backup_dir.joinpath(filename)
|
||||||
|
if target_path.exists():
|
||||||
|
Logger.print_info(f"File '{target_path}' already exists. Skipping ...")
|
||||||
|
return True
|
||||||
|
|
||||||
|
shutil.copy2(source_path, target_path)
|
||||||
|
|
||||||
Logger.print_ok(
|
Logger.print_ok(
|
||||||
f"Successfully backed up '{source_path}' to '{backup_dir}'"
|
f"Successfully backed up '{source_path}' to '{target_path}'"
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -111,7 +117,18 @@ class BackupService:
|
|||||||
|
|
||||||
if backup_path.exists():
|
if backup_path.exists():
|
||||||
Logger.print_info(f"Reusing existing backup directory '{backup_path}'")
|
Logger.print_info(f"Reusing existing backup directory '{backup_path}'")
|
||||||
|
for item in source_path.rglob("*"):
|
||||||
|
relative_path = item.relative_to(source_path)
|
||||||
|
target_item = backup_path.joinpath(relative_path)
|
||||||
|
if item.is_file():
|
||||||
|
if not target_item.exists():
|
||||||
|
target_item.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
shutil.copy2(item, target_item)
|
||||||
|
else:
|
||||||
|
Logger.print_info(f"File '{target_item}' already exists. Skipping...")
|
||||||
|
elif item.is_dir():
|
||||||
|
target_item.mkdir(parents=True, exist_ok=True)
|
||||||
|
else:
|
||||||
shutil.copytree(
|
shutil.copytree(
|
||||||
source_path,
|
source_path,
|
||||||
backup_path,
|
backup_path,
|
||||||
@@ -134,27 +151,29 @@ class BackupService:
|
|||||||
################################################
|
################################################
|
||||||
|
|
||||||
def backup_printer_cfg(self):
|
def backup_printer_cfg(self):
|
||||||
|
"""Backup printer.cfg files of all Klipper instances.
|
||||||
|
Files are backed up to:
|
||||||
|
{backup_root}/{instance_data_dir_name}/printer_{timestamp}.cfg
|
||||||
|
"""
|
||||||
klipper_instances: List[Klipper] = get_instances(Klipper)
|
klipper_instances: List[Klipper] = get_instances(Klipper)
|
||||||
for instance in klipper_instances:
|
for instance in klipper_instances:
|
||||||
target_path: Path = self._backup_root.joinpath(
|
target_path: Path = self._backup_root.joinpath(instance.data_dir.name)
|
||||||
instance.data_dir.name, f"config_{self.timestamp}"
|
|
||||||
)
|
|
||||||
self.backup_file(
|
self.backup_file(
|
||||||
source_path=instance.cfg_file,
|
source_path=instance.cfg_file,
|
||||||
target_path=target_path,
|
target_path=target_path,
|
||||||
target_name=instance.cfg_file.name,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def backup_moonraker_conf(self):
|
def backup_moonraker_conf(self):
|
||||||
|
"""Backup moonraker.conf files of all Moonraker instances.
|
||||||
|
Files are backed up to:
|
||||||
|
{backup_root}/{instance_data_dir_name}/moonraker_{timestamp}.conf
|
||||||
|
"""
|
||||||
moonraker_instances: List[Moonraker] = get_instances(Moonraker)
|
moonraker_instances: List[Moonraker] = get_instances(Moonraker)
|
||||||
for instance in moonraker_instances:
|
for instance in moonraker_instances:
|
||||||
target_path: Path = self._backup_root.joinpath(
|
target_path: Path = self._backup_root.joinpath(instance.data_dir.name)
|
||||||
instance.data_dir.name, f"config_{self.timestamp}"
|
|
||||||
)
|
|
||||||
self.backup_file(
|
self.backup_file(
|
||||||
source_path=instance.cfg_file,
|
source_path=instance.cfg_file,
|
||||||
target_path=target_path,
|
target_path=target_path,
|
||||||
target_name=instance.cfg_file.name,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def backup_printer_config_dir(self) -> None:
|
def backup_printer_config_dir(self) -> None:
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ class ComponentStatus:
|
|||||||
owner: str | None = None
|
owner: str | None = None
|
||||||
repo: str | None = None
|
repo: str | None = None
|
||||||
repo_url: str | None = None
|
repo_url: str | None = None
|
||||||
branch: str = ""
|
branch: str | None = None
|
||||||
local: str | None = None
|
local: str | None = None
|
||||||
remote: str | None = None
|
remote: str | None = None
|
||||||
instances: int | None = None
|
instances: int | None = None
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ class GcodeShellCmdExtension(BaseExtension):
|
|||||||
|
|
||||||
def install_example_cfg(self, instances: List[Klipper]):
|
def install_example_cfg(self, instances: List[Klipper]):
|
||||||
cfg_dirs = [instance.base.cfg_dir for instance in instances]
|
cfg_dirs = [instance.base.cfg_dir for instance in instances]
|
||||||
# copy extension to klippy/extras
|
# copy extension to config directories
|
||||||
for cfg_dir in cfg_dirs:
|
for cfg_dir in cfg_dirs:
|
||||||
Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...")
|
Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...")
|
||||||
if check_file_exist(cfg_dir.joinpath("shell_command.cfg")):
|
if check_file_exist(cfg_dir.joinpath("shell_command.cfg")):
|
||||||
|
|||||||
28
kiauh/extensions/tmc_autotune/__init__.py
Normal file
28
kiauh/extensions/tmc_autotune/__init__.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
# ======================================================================= #
|
||||||
|
# Copyright (C) 2020 - 2026 Dominik Willner <th33xitus@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||||
|
# https://github.com/dw-0/kiauh #
|
||||||
|
# #
|
||||||
|
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||||
|
# ======================================================================= #
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# repo
|
||||||
|
TMCA_REPO = "https://github.com/andrewmcgr/klipper_tmc_autotune"
|
||||||
|
|
||||||
|
# directories
|
||||||
|
TMCA_DIR = Path.home().joinpath("klipper_tmc_autotune")
|
||||||
|
MODULE_PATH = Path(__file__).resolve().parent
|
||||||
|
KLIPPER_DIR = Path.home().joinpath("klipper")
|
||||||
|
KLIPPER_EXTRAS = KLIPPER_DIR.joinpath("klippy/extras")
|
||||||
|
KLIPPER_PLUGINS = KLIPPER_DIR.joinpath("klippy/plugins")
|
||||||
|
KLIPPER_EXTENSIONS_PATH = (
|
||||||
|
KLIPPER_PLUGINS if KLIPPER_PLUGINS.is_dir() else KLIPPER_EXTRAS
|
||||||
|
)
|
||||||
|
|
||||||
|
# files
|
||||||
|
TMCA_EXAMPLE_CONFIG = TMCA_DIR.joinpath("docs/example.cfg")
|
||||||
|
|
||||||
|
# names
|
||||||
|
TMCA_MOONRAKER_UPDATER_NAME = "update_manager klipper_tmc_autotune"
|
||||||
13
kiauh/extensions/tmc_autotune/metadata.json
Normal file
13
kiauh/extensions/tmc_autotune/metadata.json
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
{
|
||||||
|
"metadata": {
|
||||||
|
"index": 13,
|
||||||
|
"module": "tmc_autotune_extension",
|
||||||
|
"maintained_by": "theogayar",
|
||||||
|
"display_name": "Klipper TMC Autotune",
|
||||||
|
"description": [
|
||||||
|
"Klipper extension for automatic configuration and tuning of TMC drivers."
|
||||||
|
],
|
||||||
|
"repo": "https://github.com/andrewmcgr/klipper_tmc_autotune",
|
||||||
|
"updates": true
|
||||||
|
}
|
||||||
|
}
|
||||||
384
kiauh/extensions/tmc_autotune/tmc_autotune_extension.py
Normal file
384
kiauh/extensions/tmc_autotune/tmc_autotune_extension.py
Normal file
@@ -0,0 +1,384 @@
|
|||||||
|
# ======================================================================= #
|
||||||
|
# Copyright (C) 2020 - 2026 Dominik Willner <th33xitus@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||||
|
# https://github.com/dw-0/kiauh #
|
||||||
|
# #
|
||||||
|
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||||
|
# ======================================================================= #
|
||||||
|
import shutil
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from components.klipper.klipper import Klipper
|
||||||
|
from components.moonraker.moonraker import Moonraker
|
||||||
|
from core.instance_manager.instance_manager import InstanceManager
|
||||||
|
from core.logger import DialogType, Logger
|
||||||
|
from core.services.backup_service import BackupService
|
||||||
|
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
|
||||||
|
SimpleConfigParser,
|
||||||
|
)
|
||||||
|
from extensions.base_extension import BaseExtension
|
||||||
|
from extensions.tmc_autotune import (
|
||||||
|
KLIPPER_DIR,
|
||||||
|
KLIPPER_EXTENSIONS_PATH,
|
||||||
|
TMCA_DIR,
|
||||||
|
TMCA_EXAMPLE_CONFIG,
|
||||||
|
TMCA_MOONRAKER_UPDATER_NAME,
|
||||||
|
TMCA_REPO,
|
||||||
|
)
|
||||||
|
from utils.config_utils import add_config_section, remove_config_section
|
||||||
|
from utils.fs_utils import check_file_exist, create_symlink, run_remove_routines
|
||||||
|
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
|
||||||
|
from utils.input_utils import get_confirm
|
||||||
|
from utils.instance_utils import get_instances
|
||||||
|
from utils.sys_utils import check_python_version
|
||||||
|
|
||||||
|
|
||||||
|
# noinspection PyMethodMayBeStatic
|
||||||
|
class TmcAutotuneExtension(BaseExtension):
|
||||||
|
def install_extension(self, **kwargs) -> None:
|
||||||
|
Logger.print_status("Installing Klipper TMC Autotune...")
|
||||||
|
|
||||||
|
# Check for Python 3.x, aligned with upstream install script
|
||||||
|
if not check_python_version(3, 0):
|
||||||
|
Logger.print_warn("Python 3.x is required. Aborting install.")
|
||||||
|
return
|
||||||
|
|
||||||
|
klipper_dir_exists = check_file_exist(KLIPPER_DIR)
|
||||||
|
if not klipper_dir_exists:
|
||||||
|
Logger.print_warn(
|
||||||
|
"No Klipper directory found! Unable to install extension."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
tmca_exists = (
|
||||||
|
check_file_exist(TMCA_DIR)
|
||||||
|
and check_file_exist(KLIPPER_EXTENSIONS_PATH.joinpath("autotune_tmc.py"))
|
||||||
|
and check_file_exist(KLIPPER_EXTENSIONS_PATH.joinpath("motor_constants.py"))
|
||||||
|
and check_file_exist(KLIPPER_EXTENSIONS_PATH.joinpath("motor_database.cfg"))
|
||||||
|
)
|
||||||
|
|
||||||
|
overwrite = True
|
||||||
|
if tmca_exists:
|
||||||
|
overwrite = get_confirm(
|
||||||
|
question="Extension seems to be installed already. Overwrite?",
|
||||||
|
default_choice=True,
|
||||||
|
allow_go_back=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not overwrite:
|
||||||
|
Logger.print_warn("Installation aborted due to user request.")
|
||||||
|
return
|
||||||
|
|
||||||
|
add_moonraker_update_section = get_confirm(
|
||||||
|
question="Add Klipper TMC Autotune to Moonraker update manager(s)?",
|
||||||
|
default_choice=True,
|
||||||
|
allow_go_back=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
create_example_config = get_confirm(
|
||||||
|
question="Create an example autotune_tmc.cfg for each instance?",
|
||||||
|
default_choice=True,
|
||||||
|
allow_go_back=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
kl_instances = get_instances(Klipper)
|
||||||
|
|
||||||
|
if not self._stop_klipper_instances_interactively(
|
||||||
|
kl_instances, "installation of TMC Autotune"
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
git_clone_wrapper(TMCA_REPO, TMCA_DIR, force=True)
|
||||||
|
|
||||||
|
Logger.print_info("Creating symlinks in Klipper extras directory...")
|
||||||
|
create_symlink(
|
||||||
|
TMCA_DIR.joinpath("autotune_tmc.py"),
|
||||||
|
KLIPPER_EXTENSIONS_PATH.joinpath("autotune_tmc.py"),
|
||||||
|
)
|
||||||
|
create_symlink(
|
||||||
|
TMCA_DIR.joinpath("motor_constants.py"),
|
||||||
|
KLIPPER_EXTENSIONS_PATH.joinpath("motor_constants.py"),
|
||||||
|
)
|
||||||
|
create_symlink(
|
||||||
|
TMCA_DIR.joinpath("motor_database.cfg"),
|
||||||
|
KLIPPER_EXTENSIONS_PATH.joinpath("motor_database.cfg"),
|
||||||
|
)
|
||||||
|
Logger.print_ok(
|
||||||
|
"Symlinks created successfully for all instances.", end="\n\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
if create_example_config:
|
||||||
|
self._install_example_cfg(kl_instances)
|
||||||
|
else:
|
||||||
|
Logger.print_info(
|
||||||
|
"Skipping example config creation as per user request."
|
||||||
|
)
|
||||||
|
Logger.print_warn(
|
||||||
|
"Make sure to create and include an autotune_tmc.cfg in your printer.cfg in order to use the extension!"
|
||||||
|
)
|
||||||
|
|
||||||
|
if add_moonraker_update_section:
|
||||||
|
mr_instances = get_instances(Moonraker)
|
||||||
|
self._add_moonraker_update_manager_section(mr_instances)
|
||||||
|
else:
|
||||||
|
Logger.print_info(
|
||||||
|
"Skipping update section creation as per user request."
|
||||||
|
)
|
||||||
|
Logger.print_warn(
|
||||||
|
"Make sure to create the corresponding section in your moonraker.conf in order to have it appear in your frontend update manager!"
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
Logger.print_error(f"Error during Klipper TMC Autotune installation:\n{e}")
|
||||||
|
|
||||||
|
if kl_instances:
|
||||||
|
InstanceManager.start_all(kl_instances)
|
||||||
|
return
|
||||||
|
|
||||||
|
if kl_instances:
|
||||||
|
InstanceManager.start_all(kl_instances)
|
||||||
|
|
||||||
|
if create_example_config:
|
||||||
|
Logger.print_dialog(
|
||||||
|
DialogType.ATTENTION,
|
||||||
|
[
|
||||||
|
"Basic configuration files were created per instance. You must edit them to enable the extension.",
|
||||||
|
"Documentation:",
|
||||||
|
f"{TMCA_REPO}",
|
||||||
|
"\n\n",
|
||||||
|
"IMPORTANT:",
|
||||||
|
"Define [autotune_tmc] sections ONLY in 'autotune_tmc.cfg'. ",
|
||||||
|
"Do NOT add them to 'printer.cfg', contrary to official docs. "
|
||||||
|
"While not fatal, mixing configs breaks file segmentation and is bad practice.",
|
||||||
|
],
|
||||||
|
margin_bottom=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
Logger.print_ok("Klipper TMC Autotune installed successfully!")
|
||||||
|
|
||||||
|
def update_extension(self, **kwargs) -> None:
|
||||||
|
extension_installed = check_file_exist(TMCA_DIR)
|
||||||
|
if not extension_installed:
|
||||||
|
Logger.print_info("Extension does not seem to be installed! Skipping ...")
|
||||||
|
return
|
||||||
|
|
||||||
|
backup_before_update = get_confirm(
|
||||||
|
question="Backup Klipper TMC Autotune directory before update?",
|
||||||
|
default_choice=True,
|
||||||
|
allow_go_back=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
kl_instances = get_instances(Klipper)
|
||||||
|
|
||||||
|
if not self._stop_klipper_instances_interactively(
|
||||||
|
kl_instances, "update of TMC Autotune"
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
|
Logger.print_status("Updating Klipper TMC Autotune...")
|
||||||
|
try:
|
||||||
|
if backup_before_update:
|
||||||
|
Logger.print_status("Backing up Klipper TMC Autotune directory...")
|
||||||
|
svc = BackupService()
|
||||||
|
svc.backup_directory(
|
||||||
|
source_path=TMCA_DIR,
|
||||||
|
backup_name="klipper_tmc_autotune",
|
||||||
|
)
|
||||||
|
Logger.print_ok("Backup completed successfully.")
|
||||||
|
|
||||||
|
git_pull_wrapper(TMCA_DIR)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
Logger.print_error(f"Error during Klipper TMC Autotune update:\n{e}")
|
||||||
|
|
||||||
|
if kl_instances:
|
||||||
|
InstanceManager.start_all(kl_instances)
|
||||||
|
return
|
||||||
|
|
||||||
|
if kl_instances:
|
||||||
|
InstanceManager.start_all(kl_instances)
|
||||||
|
|
||||||
|
Logger.print_ok("Klipper TMC Autotune updated successfully.", end="\n\n")
|
||||||
|
|
||||||
|
def remove_extension(self, **kwargs) -> None:
|
||||||
|
extension_installed = check_file_exist(TMCA_DIR)
|
||||||
|
if not extension_installed:
|
||||||
|
Logger.print_info("Extension does not seem to be installed! Skipping ...")
|
||||||
|
return
|
||||||
|
|
||||||
|
kl_instances = get_instances(Klipper)
|
||||||
|
|
||||||
|
if not self._stop_klipper_instances_interactively(
|
||||||
|
kl_instances, "removal of TMC Autotune"
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
Logger.print_info("Removing Klipper TMC Autotune extension ...")
|
||||||
|
run_remove_routines(TMCA_DIR)
|
||||||
|
Logger.print_info("Removing symlinks from Klipper extras directory ...")
|
||||||
|
run_remove_routines(KLIPPER_EXTENSIONS_PATH.joinpath("autotune_tmc.py"))
|
||||||
|
run_remove_routines(KLIPPER_EXTENSIONS_PATH.joinpath("motor_constants.py"))
|
||||||
|
run_remove_routines(KLIPPER_EXTENSIONS_PATH.joinpath("motor_database.cfg"))
|
||||||
|
|
||||||
|
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||||
|
self._remove_moonraker_update_manager_section(mr_instances)
|
||||||
|
|
||||||
|
Logger.print_info("Removing include from printer.cfg files ...")
|
||||||
|
BackupService().backup_printer_cfg()
|
||||||
|
remove_config_section("include autotune_tmc.cfg", kl_instances)
|
||||||
|
|
||||||
|
Logger.print_dialog(
|
||||||
|
DialogType.ATTENTION,
|
||||||
|
[
|
||||||
|
"Manual edits to 'printer.cfg' may be required if using exotic stepper configurations.",
|
||||||
|
"\n\n",
|
||||||
|
"NOTE:",
|
||||||
|
"'autotune_tmc.cfg' is NOT removed automatically. ",
|
||||||
|
"Please delete it manually if no longer needed.",
|
||||||
|
],
|
||||||
|
margin_bottom=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
Logger.print_error(f"Unable to remove extension:\n{e}")
|
||||||
|
|
||||||
|
if kl_instances:
|
||||||
|
InstanceManager.start_all(kl_instances)
|
||||||
|
return
|
||||||
|
|
||||||
|
if kl_instances:
|
||||||
|
InstanceManager.start_all(kl_instances)
|
||||||
|
|
||||||
|
Logger.print_ok("Klipper TMC Autotune removed successfully.")
|
||||||
|
|
||||||
|
def _install_example_cfg(self, kl_instances: List[Klipper]):
|
||||||
|
cfg_dirs = [instance.base.cfg_dir for instance in kl_instances]
|
||||||
|
|
||||||
|
for cfg_dir in cfg_dirs:
|
||||||
|
Logger.print_status(f"Create autotune_tmc.cfg in '{cfg_dir}' ...")
|
||||||
|
if check_file_exist(cfg_dir.joinpath("autotune_tmc.cfg")):
|
||||||
|
Logger.print_info("File already exists! Skipping ...")
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
shutil.copy(TMCA_EXAMPLE_CONFIG, cfg_dir.joinpath("autotune_tmc.cfg"))
|
||||||
|
Logger.print_ok("Done!")
|
||||||
|
except OSError as e:
|
||||||
|
Logger.print_error(f"Unable to create example config: {e}")
|
||||||
|
|
||||||
|
BackupService().backup_printer_cfg()
|
||||||
|
|
||||||
|
section = "include autotune_tmc.cfg"
|
||||||
|
cfg_files = [instance.cfg_file for instance in kl_instances]
|
||||||
|
for cfg_file in cfg_files:
|
||||||
|
Logger.print_status(f"Include autotune_tmc.cfg in '{cfg_file}' ...")
|
||||||
|
scp = SimpleConfigParser()
|
||||||
|
scp.read_file(cfg_file)
|
||||||
|
if scp.has_section(section):
|
||||||
|
Logger.print_info("Section already defined! Skipping ...")
|
||||||
|
continue
|
||||||
|
scp.add_section(section)
|
||||||
|
scp.write_file(cfg_file)
|
||||||
|
Logger.print_ok("Done!")
|
||||||
|
|
||||||
|
def _add_moonraker_update_manager_section(
|
||||||
|
self, mr_instances: List[Moonraker]
|
||||||
|
) -> None:
|
||||||
|
if not mr_instances:
|
||||||
|
Logger.print_dialog(
|
||||||
|
DialogType.WARNING,
|
||||||
|
[
|
||||||
|
"Moonraker not found! Klipper TMC Autotune update manager support "
|
||||||
|
"for Moonraker will not be added to moonraker.conf.",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
if not get_confirm(
|
||||||
|
"Continue Klipper TMC Autotune installation?",
|
||||||
|
default_choice=False,
|
||||||
|
allow_go_back=True,
|
||||||
|
):
|
||||||
|
Logger.print_info("Installation aborted due to user request.")
|
||||||
|
return
|
||||||
|
|
||||||
|
BackupService().backup_moonraker_conf()
|
||||||
|
|
||||||
|
add_config_section(
|
||||||
|
section=TMCA_MOONRAKER_UPDATER_NAME,
|
||||||
|
instances=mr_instances,
|
||||||
|
options=[
|
||||||
|
("type", "git_repo"),
|
||||||
|
("channel", "dev"),
|
||||||
|
("path", TMCA_DIR.as_posix()),
|
||||||
|
("origin", TMCA_REPO),
|
||||||
|
("managed_services", "klipper"),
|
||||||
|
("primary_branch", "main"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
InstanceManager.restart_all(mr_instances)
|
||||||
|
|
||||||
|
Logger.print_ok(
|
||||||
|
"Klipper TMC Autotune successfully added to Moonraker update manager(s)!"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _remove_moonraker_update_manager_section(
|
||||||
|
self, mr_instances: List[Moonraker]
|
||||||
|
) -> None:
|
||||||
|
if not mr_instances:
|
||||||
|
Logger.print_dialog(
|
||||||
|
DialogType.WARNING,
|
||||||
|
[
|
||||||
|
"Moonraker not found! Klipper TMC Autotune update manager support "
|
||||||
|
"for Moonraker will not be removed from moonraker.conf.",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
BackupService().backup_moonraker_conf()
|
||||||
|
|
||||||
|
remove_config_section("update_manager klipper_tmc_autotune", mr_instances)
|
||||||
|
InstanceManager.restart_all(mr_instances)
|
||||||
|
|
||||||
|
Logger.print_ok(
|
||||||
|
"Klipper TMC Autotune successfully removed from Moonraker update manager(s)!"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _stop_klipper_instances_interactively(
|
||||||
|
self, kl_instances: List[Klipper], operation_name: str = "operation"
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Interactively stops all active Klipper instances, warning the user that ongoing prints will be disrupted.
|
||||||
|
|
||||||
|
:param kl_instances: List of Klipper instances to stop.
|
||||||
|
:param operation_name: Optional name of the operation being performed (for user messaging). Do NOT capitalize.
|
||||||
|
:return: True if instances were stopped or no instances found, False if operation was aborted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not kl_instances:
|
||||||
|
Logger.print_warn("No instances found, skipping instance stopping.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
Logger.print_dialog(
|
||||||
|
DialogType.ATTENTION,
|
||||||
|
[
|
||||||
|
"Do NOT continue if there are ongoing prints running",
|
||||||
|
f"All Klipper instances will be restarted during the {operation_name} and "
|
||||||
|
"ongoing prints WILL FAIL.",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
stop_klipper = get_confirm(
|
||||||
|
question=f"Stop Klipper now and proceed with {operation_name}?",
|
||||||
|
default_choice=False,
|
||||||
|
allow_go_back=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if stop_klipper:
|
||||||
|
InstanceManager.stop_all(kl_instances)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
Logger.print_warn(
|
||||||
|
f"{operation_name.capitalize()} aborted due to user request."
|
||||||
|
)
|
||||||
|
return False
|
||||||
@@ -42,7 +42,7 @@ def get_kiauh_version() -> str:
|
|||||||
Helper method to get the current KIAUH version by reading the latest tag
|
Helper method to get the current KIAUH version by reading the latest tag
|
||||||
:return: string of the latest tag or a default value if no tags exist
|
:return: string of the latest tag or a default value if no tags exist
|
||||||
"""
|
"""
|
||||||
tags = get_local_tags(Path(__file__).parent.parent)
|
tags: List[str] = get_local_tags(Path(__file__).parent.parent)
|
||||||
if tags:
|
if tags:
|
||||||
return tags[-1]
|
return tags[-1]
|
||||||
else:
|
else:
|
||||||
@@ -108,7 +108,7 @@ def get_install_status(
|
|||||||
from utils.instance_utils import get_instances
|
from utils.instance_utils import get_instances
|
||||||
|
|
||||||
checks = []
|
checks = []
|
||||||
branch: str = ""
|
branch: str | None = None
|
||||||
|
|
||||||
if repo_dir.exists():
|
if repo_dir.exists():
|
||||||
checks.append(True)
|
checks.append(True)
|
||||||
|
|||||||
@@ -24,13 +24,16 @@ from core.logger import Logger
|
|||||||
|
|
||||||
def check_file_exist(file_path: Path, sudo=False) -> bool:
|
def check_file_exist(file_path: Path, sudo=False) -> bool:
|
||||||
"""
|
"""
|
||||||
Helper function for checking the existence of a file |
|
Helper function for checking the existence of a file.
|
||||||
|
Also works with symlinks (returns False if broken) |
|
||||||
:param file_path: the absolute path of the file to check
|
:param file_path: the absolute path of the file to check
|
||||||
:param sudo: use sudo if required
|
:param sudo: use sudo if required
|
||||||
:return: True, if file exists, otherwise False
|
:return: True, if file exists, otherwise False
|
||||||
"""
|
"""
|
||||||
if sudo:
|
if sudo:
|
||||||
command = ["sudo", "find", file_path.as_posix()]
|
# -L forces find to follow symlinks
|
||||||
|
# -maxdepth = 0 avoids losing time if `file_path` is a directory
|
||||||
|
command = ["sudo", "find", "-L", file_path.as_posix(), "-maxdepth", "0"]
|
||||||
try:
|
try:
|
||||||
check_output(command, stderr=DEVNULL)
|
check_output(command, stderr=DEVNULL)
|
||||||
return True
|
return True
|
||||||
@@ -44,7 +47,16 @@ def check_file_exist(file_path: Path, sudo=False) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
def create_symlink(source: Path, target: Path, sudo=False) -> None:
|
def create_symlink(source: Path, target: Path, sudo=False) -> None:
|
||||||
|
"""
|
||||||
|
Helper function to create a symlink from source to target
|
||||||
|
If the target file exists, it will be overwritten. |
|
||||||
|
:param source: the source file/directory
|
||||||
|
:param target: the target file/directory
|
||||||
|
:param sudo: use sudo if required
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
|
# -f forcibly creates/overwrites the symlink
|
||||||
cmd = ["ln", "-sf", source.as_posix(), target.as_posix()]
|
cmd = ["ln", "-sf", source.as_posix(), target.as_posix()]
|
||||||
if sudo:
|
if sudo:
|
||||||
cmd.insert(0, "sudo")
|
cmd.insert(0, "sudo")
|
||||||
|
|||||||
@@ -73,44 +73,44 @@ def git_pull_wrapper(target_dir: Path) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def get_repo_name(repo: Path) -> Tuple[str, str]:
|
def get_repo_name(repo: Path) -> Tuple[str | None, str | None]:
|
||||||
"""
|
"""
|
||||||
Helper method to extract the organisation and name of a repository |
|
Helper method to extract the organisation and name of a repository |
|
||||||
:param repo: repository to extract the values from
|
:param repo: repository to extract the values from
|
||||||
:return: String in form of "<orga>/<name>" or None
|
:return: String in form of "<orga>/<name>" or None
|
||||||
"""
|
"""
|
||||||
if not repo.exists() or not repo.joinpath(".git").exists():
|
if not repo.exists() or not repo.joinpath(".git").exists():
|
||||||
return "-", "-"
|
return None, None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"]
|
cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"]
|
||||||
result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8")
|
result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8")
|
||||||
substrings: List[str] = result.strip().split("/")[-2:]
|
substrings: List[str] = result.strip().split("/")[-2:]
|
||||||
|
|
||||||
orga: str = substrings[0] if substrings[0] else "-"
|
orga: str | None = substrings[0] if substrings[0] else None
|
||||||
name: str = substrings[1] if substrings[1] else "-"
|
name: str | None = substrings[1] if substrings[1] else None
|
||||||
|
|
||||||
return orga, name.replace(".git", "")
|
return orga, name.replace(".git", "") if name else None
|
||||||
|
|
||||||
except CalledProcessError:
|
except CalledProcessError:
|
||||||
return "-", "-"
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
def get_current_branch(repo: Path) -> str:
|
def get_current_branch(repo: Path) -> str | None:
|
||||||
"""
|
"""
|
||||||
Get the current branch of a local Git repository
|
Get the current branch of a local Git repository
|
||||||
:param repo: Path to the local Git repository
|
:param repo: Path to the local Git repository
|
||||||
:return: Current branch
|
:return: Current branch or None if not determinable
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
cmd = ["git", "branch", "--show-current"]
|
cmd = ["git", "branch", "--show-current"]
|
||||||
result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode(
|
result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode(
|
||||||
encoding="utf-8"
|
encoding="utf-8"
|
||||||
)
|
)
|
||||||
return result.strip() if result else "-"
|
return result.strip() if result else None
|
||||||
|
|
||||||
except CalledProcessError:
|
except CalledProcessError:
|
||||||
return "-"
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:
|
def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:
|
||||||
|
|||||||
@@ -156,7 +156,7 @@ def format_question(question: str, default=None) -> str:
|
|||||||
if default is not None:
|
if default is not None:
|
||||||
formatted_q += f" (default={default})"
|
formatted_q += f" (default={default})"
|
||||||
|
|
||||||
return Color.apply(f"###### {formatted_q}: ", Color.CYAN)
|
return str(Color.apply(f"###### {formatted_q}: ", Color.CYAN))
|
||||||
|
|
||||||
|
|
||||||
def validate_number_input(value: str, min_count: int, max_count: int | None) -> int:
|
def validate_number_input(value: str, min_count: int, max_count: int | None) -> int:
|
||||||
|
|||||||
Reference in New Issue
Block a user