Compare commits

...

12 Commits

Author SHA1 Message Date
Théo Gaillard
757344128a fix(extensions_menu): prevent extension index collisions during loading (#783)
* fix(extensions_menu): prevent extension index collisions during loading

* feat(extensions_menu): add GITHUB_ISSUES_URL for reporting extension loading issues
2026-03-22 13:43:20 +01:00
Théo Gaillard
7ca08f9b30 feat(instance_manager): add interactive stopping of Klipper instances (#784)
* feat(instance_manager): add interactive stopping of Klipper instances with user confirmation

* fix(instance_utils): remove unnecessary 'self' parameter from stop_klipper_instances_interactively function

* refactor(tmc_autotune): replace internal stop function with direct call to stop_klipper_instances_interactively
2026-03-22 12:01:15 +01:00
David Rios
308079a821 fix: incorrect Logger.warn (#778)
Fix method name
2026-02-27 22:45:58 +01:00
Théo Gaillard
09a5d96b63 feat(tmc_autotune): add initial implementation of TMC Autotune extens… (#771)
* feat(tmc_autotune): add initial implementation of TMC Autotune extension with installation and update functionalities

* fix: remove useless comments

* fix: added support for Kalico style plugins directory

* refactor: extract method for moonraker update manager section removal, automatically reloading moonraker upon call
2026-02-03 20:46:24 +01:00
dw-0
1f9d4c823a fix(settings_menu): fix regression by checking for the correct condition (#773) 2026-01-31 11:13:37 +01:00
dw-0
c8df9427b3 fix(backup): improve reusability of backup service and enhance file handling
- Refactor `BackupService` instance management for better reuse across methods.
- Avoid redundant file backups by checking for existing files.
- Enhance directory backup logic to handle nested files and directories more efficiently.
- Standardize timestamp initialization for consistent time-based backup operations.
2026-01-31 10:59:53 +01:00
Théo Gaillard
5414aba299 fix(backup_service): backup methods use proper paths (#769) (#770)
* fix(backup_service): streamline backup methods for proper paths and add docstring

* fix(backup_service): add proper destination_path in verbose output

* fix(backup_service): replaces html headers with { } to avoid rendering

* nitpick(backup_service): correct variable name for backup destination path in logging
2026-01-29 18:52:48 +01:00
Théo Gaillard
80948edbb4 fix(gcode_shell_cmd): update comment to clarify config directory usage (#767) 2026-01-19 17:04:28 +01:00
Théo Gaillard
a455edba93 docs(fs_utils): add the documentation for create_symlink (#768) 2026-01-19 17:03:04 +01:00
Théo Gaillard
810ab3a2fa fix(fs-utils): enhance check_file_exist to support symlink resolution (#766) 2026-01-19 17:02:18 +01:00
dw-0
6c9a78496a fix(client_utils): ensure proper type conversion and hints for improved safety
- Standardize `str()` wrapping for color-applied strings in various returns.
- Refine type hints for better code clarity and robustness.
- Add null checks for port inputs to prevent potential errors.
2026-01-18 15:58:25 +01:00
dw-0
123ccde378 fix(core): standardize handling of None values for repo and version fields
- Improve local and remote version comparison by replacing default placeholders with None.
- Update repo and branch logic to handle None values consistently.
- Refactor type hints for better readability and accuracy.
2026-01-18 15:49:48 +01:00
21 changed files with 561 additions and 64 deletions

View File

@@ -113,7 +113,7 @@ def check_user_groups() -> None:
if not get_confirm(f"Add user '{CURRENT_USER}' to group(s) now?"): if not get_confirm(f"Add user '{CURRENT_USER}' to group(s) now?"):
log = "Skipped adding user to required groups. You might encounter issues." log = "Skipped adding user to required groups. You might encounter issues."
Logger.warn(log) Logger.print_warn(log)
return return
try: try:

View File

@@ -8,7 +8,7 @@
# ======================================================================= # # ======================================================================= #
from typing import List from typing import List, Optional
from components.klipper.klipper import Klipper from components.klipper.klipper import Klipper
from components.moonraker.moonraker import Moonraker from components.moonraker.moonraker import Moonraker
@@ -27,6 +27,7 @@ def run_client_config_removal(
client_config: BaseWebClientConfig, client_config: BaseWebClientConfig,
kl_instances: List[Klipper], kl_instances: List[Klipper],
mr_instances: List[Moonraker], mr_instances: List[Moonraker],
svc: Optional[BackupService] = None,
) -> Message: ) -> Message:
completion_msg = Message( completion_msg = Message(
title=f"{client_config.display_name} Removal Process completed", title=f"{client_config.display_name} Removal Process completed",
@@ -36,12 +37,15 @@ def run_client_config_removal(
if run_remove_routines(client_config.config_dir): if run_remove_routines(client_config.config_dir):
completion_msg.text.append(f"{client_config.display_name} removed") completion_msg.text.append(f"{client_config.display_name} removed")
BackupService().backup_printer_config_dir() if svc is None:
svc = BackupService()
svc.backup_moonraker_conf()
completion_msg = remove_moonraker_config_section( completion_msg = remove_moonraker_config_section(
completion_msg, client_config, mr_instances completion_msg, client_config, mr_instances
) )
svc.backup_printer_cfg()
completion_msg = remove_printer_config_section( completion_msg = remove_printer_config_section(
completion_msg, client_config, kl_instances completion_msg, client_config, kl_instances
) )

View File

@@ -41,6 +41,7 @@ def run_client_removal(
) )
mr_instances: List[Moonraker] = get_instances(Moonraker) mr_instances: List[Moonraker] = get_instances(Moonraker)
kl_instances: List[Klipper] = get_instances(Klipper) kl_instances: List[Klipper] = get_instances(Klipper)
svc = BackupService()
if backup_config: if backup_config:
version = "" version = ""
@@ -49,7 +50,6 @@ def run_client_removal(
with open(src.joinpath(".version"), "r") as v: with open(src.joinpath(".version"), "r") as v:
version = v.readlines()[0] version = v.readlines()[0]
svc = BackupService()
target_path = svc.backup_root.joinpath(f"{client.client_dir.name}_{version}") target_path = svc.backup_root.joinpath(f"{client.client_dir.name}_{version}")
success = svc.backup_file( success = svc.backup_file(
source_path=client.config_file, source_path=client.config_file,
@@ -67,7 +67,7 @@ def run_client_removal(
if remove_client_nginx_logs(client, kl_instances): if remove_client_nginx_logs(client, kl_instances):
completion_msg.text.append("● NGINX logs removed") completion_msg.text.append("● NGINX logs removed")
BackupService().backup_moonraker_conf() svc.backup_moonraker_conf()
section = f"update_manager {client_name}" section = f"update_manager {client_name}"
handled_instances: List[Moonraker] = remove_config_section( handled_instances: List[Moonraker] = remove_config_section(
section, mr_instances section, mr_instances
@@ -83,6 +83,7 @@ def run_client_removal(
client.client_config, client.client_config,
kl_instances, kl_instances,
mr_instances, mr_instances,
svc,
) )
if cfg_completion_msg.color == Color.GREEN: if cfg_completion_msg.color == Color.GREEN:
completion_msg.text.extend(cfg_completion_msg.text[1:]) completion_msg.text.extend(cfg_completion_msg.text[1:])

View File

@@ -78,10 +78,10 @@ def get_current_client_config() -> str:
installed = [c for c in clients if c.client_config.config_dir.exists()] installed = [c for c in clients if c.client_config.config_dir.exists()]
if not installed: if not installed:
return Color.apply("-", Color.CYAN) return str(Color.apply("-", Color.CYAN))
elif len(installed) == 1: elif len(installed) == 1:
cfg = installed[0].client_config cfg = installed[0].client_config
return Color.apply(cfg.display_name, Color.CYAN) return str(Color.apply(cfg.display_name, Color.CYAN))
# at this point, both client config folders exists, so we need to check # at this point, both client config folders exists, so we need to check
# which are actually included in the printer.cfg of all klipper instances # which are actually included in the printer.cfg of all klipper instances
@@ -100,18 +100,18 @@ def get_current_client_config() -> str:
# if both are included in the same file, we have a potential conflict # if both are included in the same file, we have a potential conflict
if includes_mainsail and includes_fluidd: if includes_mainsail and includes_fluidd:
return Color.apply("Conflict", Color.YELLOW) return str(Color.apply("Conflict", Color.YELLOW))
if not mainsail_includes and not fluidd_includes: if not mainsail_includes and not fluidd_includes:
# there are no includes at all, even though the client config folders exist # there are no includes at all, even though the client config folders exist
return Color.apply("-", Color.CYAN) return str(Color.apply("-", Color.CYAN))
elif len(fluidd_includes) > len(mainsail_includes): elif len(fluidd_includes) > len(mainsail_includes):
# there are more instances that include fluidd than mainsail # there are more instances that include fluidd than mainsail
return Color.apply(fluidd.client_config.display_name, Color.CYAN) return str(Color.apply(fluidd.client_config.display_name, Color.CYAN))
else: else:
# there are the same amount of non-conflicting includes for each config # there are the same amount of non-conflicting includes for each config
# or more instances include mainsail than fluidd # or more instances include mainsail than fluidd
return Color.apply(mainsail.client_config.display_name, Color.CYAN) return str(Color.apply(mainsail.client_config.display_name, Color.CYAN))
def enable_mainsail_remotemode() -> None: def enable_mainsail_remotemode() -> None:
@@ -152,10 +152,9 @@ def symlink_webui_nginx_log(
def get_local_client_version(client: BaseWebClient) -> str | None: def get_local_client_version(client: BaseWebClient) -> str | None:
relinfo_file = client.client_dir.joinpath("release_info.json") relinfo_file = client.client_dir.joinpath("release_info.json")
version_file = client.client_dir.joinpath(".version") version_file = client.client_dir.joinpath(".version")
default = "-"
if not client.client_dir.exists(): if not client.client_dir.exists():
return default return None
# try to get version from release_info.json first # try to get version from release_info.json first
if relinfo_file.is_file(): if relinfo_file.is_file():
@@ -177,11 +176,11 @@ def get_local_client_version(client: BaseWebClient) -> str | None:
try: try:
with open(version_file, "r") as f: with open(version_file, "r") as f:
line = f.readline().strip() line = f.readline().strip()
return line or default return line or None
except OSError: except OSError:
Logger.print_error("Unable to read '.version'") Logger.print_error("Unable to read '.version'")
return default return None
def get_remote_client_version(client: BaseWebClient) -> str | None: def get_remote_client_version(client: BaseWebClient) -> str | None:
@@ -446,9 +445,9 @@ def get_client_port_selection(
while True: while True:
_type = "Reconfigure" if reconfigure else "Configure" _type = "Reconfigure" if reconfigure else "Configure"
question = f"{_type} {client.display_name} for port" question = f"{_type} {client.display_name} for port"
port_input = get_number_input(question, min_value=80, default=port) port_input: int | None = get_number_input(question, min_value=80, default=port)
if port_input not in ports_in_use: if port_input and port_input not in ports_in_use:
client_settings: WebUiSettings = settings[client.name] client_settings: WebUiSettings = settings[client.name]
client_settings.port = port_input client_settings.port = port_input
settings.save() settings.save()

View File

@@ -97,7 +97,7 @@ class ClientInstallMenu(BaseMenu):
self.message_service.set_message(message) self.message_service.set_message(message)
def _get_current_port(self) -> int: def _get_current_port(self) -> int:
curr_port = get_nginx_listen_port(self.client.nginx_config) curr_port: int | None = get_nginx_listen_port(self.client.nginx_config)
if curr_port is None: if curr_port is None:
# if the port is not found in the config file we use # if the port is not found in the config file we use
# the default port from the kiauh settings as fallback # the default port from the kiauh settings as fallback

View File

@@ -95,8 +95,8 @@ class MainMenu(BaseMenu):
status_data: ComponentStatus = status_fn(*args) status_data: ComponentStatus = status_fn(*args)
code: int = status_data.status code: int = status_data.status
status: StatusText = StatusMap[code] status: StatusText = StatusMap[code]
owner: str = trunc_string(status_data.owner, 23) owner: str = trunc_string(status_data.owner, 23) if status_data.owner else '-'
repo: str = trunc_string(status_data.repo, 23) repo: str = trunc_string(status_data.repo, 23) if status_data.repo else '-'
instance_count: int = status_data.instances instance_count: int = status_data.instances
count_txt: str = "" count_txt: str = ""

View File

@@ -100,11 +100,11 @@ class SettingsMenu(BaseMenu):
def trim_repo_url(repo: str) -> str: def trim_repo_url(repo: str) -> str:
return repo.replace(".git", "").replace("https://", "").replace("git@", "") return repo.replace(".git", "").replace("https://", "").replace("git@", "")
if not klipper_status.repo == "-": if klipper_status.repo:
url = trim_repo_url(klipper_status.repo_url) url = trim_repo_url(klipper_status.repo_url)
self.kl_repo_url = Color.apply(url, Color.CYAN) self.kl_repo_url = Color.apply(url, Color.CYAN)
self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN) self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN)
if not moonraker_status.repo == "-": if moonraker_status.repo:
url = trim_repo_url(moonraker_status.repo_url) url = trim_repo_url(moonraker_status.repo_url)
self.mr_repo_url = Color.apply(url, Color.CYAN) self.mr_repo_url = Color.apply(url, Color.CYAN)
self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN) self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN)

View File

@@ -257,7 +257,7 @@ class UpdateMenu(BaseMenu):
def _format_local_status(self, local_version, remote_version) -> str: def _format_local_status(self, local_version, remote_version) -> str:
color = Color.RED color = Color.RED
if not local_version or local_version == '-': if local_version is None:
color = Color.RED color = Color.RED
elif local_version == remote_version: elif local_version == remote_version:
color = Color.GREEN color = Color.GREEN
@@ -290,7 +290,13 @@ class UpdateMenu(BaseMenu):
return self.status_data[name]["installed"] return self.status_data[name]["installed"]
def _is_update_available(self, name: str) -> bool: def _is_update_available(self, name: str) -> bool:
return self.status_data[name]["local"] != self.status_data[name]["remote"] local = self.status_data[name]["local"]
remote = self.status_data[name]["remote"]
if local is None or remote is None:
return False
return local != remote
def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None: def _run_update_routine(self, name: str, update_fn: Callable, *args) -> None:
display_name = self.status_data[name]["display_name"] display_name = self.status_data[name]["display_name"]

View File

@@ -22,6 +22,7 @@ from utils.instance_utils import get_instances
class BackupService: class BackupService:
def __init__(self): def __init__(self):
self._backup_root = Path.home().joinpath("kiauh_backups") self._backup_root = Path.home().joinpath("kiauh_backups")
self._timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
@property @property
def backup_root(self) -> Path: def backup_root(self) -> Path:
@@ -29,7 +30,7 @@ class BackupService:
@property @property
def timestamp(self) -> str: def timestamp(self) -> str:
return datetime.now().strftime("%Y%m%d-%H%M%S") return self._timestamp
################################################ ################################################
# GENERIC BACKUP METHODS # GENERIC BACKUP METHODS
@@ -68,10 +69,15 @@ class BackupService:
backup_dir = self._backup_root.joinpath(target_path) backup_dir = self._backup_root.joinpath(target_path)
backup_dir.mkdir(parents=True, exist_ok=True) backup_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(source_path, backup_dir.joinpath(filename)) target_path = backup_dir.joinpath(filename)
if target_path.exists():
Logger.print_info(f"File '{target_path}' already exists. Skipping ...")
return True
shutil.copy2(source_path, target_path)
Logger.print_ok( Logger.print_ok(
f"Successfully backed up '{source_path}' to '{backup_dir}'" f"Successfully backed up '{source_path}' to '{target_path}'"
) )
return True return True
@@ -111,14 +117,25 @@ class BackupService:
if backup_path.exists(): if backup_path.exists():
Logger.print_info(f"Reusing existing backup directory '{backup_path}'") Logger.print_info(f"Reusing existing backup directory '{backup_path}'")
for item in source_path.rglob("*"):
shutil.copytree( relative_path = item.relative_to(source_path)
source_path, target_item = backup_path.joinpath(relative_path)
backup_path, if item.is_file():
dirs_exist_ok=True, if not target_item.exists():
symlinks=True, target_item.parent.mkdir(parents=True, exist_ok=True)
ignore_dangling_symlinks=True, shutil.copy2(item, target_item)
) else:
Logger.print_info(f"File '{target_item}' already exists. Skipping...")
elif item.is_dir():
target_item.mkdir(parents=True, exist_ok=True)
else:
shutil.copytree(
source_path,
backup_path,
dirs_exist_ok=True,
symlinks=True,
ignore_dangling_symlinks=True,
)
Logger.print_ok( Logger.print_ok(
f"Successfully backed up '{source_path}' to '{backup_path}'" f"Successfully backed up '{source_path}' to '{backup_path}'"
@@ -134,27 +151,29 @@ class BackupService:
################################################ ################################################
def backup_printer_cfg(self): def backup_printer_cfg(self):
"""Backup printer.cfg files of all Klipper instances.
Files are backed up to:
{backup_root}/{instance_data_dir_name}/printer_{timestamp}.cfg
"""
klipper_instances: List[Klipper] = get_instances(Klipper) klipper_instances: List[Klipper] = get_instances(Klipper)
for instance in klipper_instances: for instance in klipper_instances:
target_path: Path = self._backup_root.joinpath( target_path: Path = self._backup_root.joinpath(instance.data_dir.name)
instance.data_dir.name, f"config_{self.timestamp}"
)
self.backup_file( self.backup_file(
source_path=instance.cfg_file, source_path=instance.cfg_file,
target_path=target_path, target_path=target_path,
target_name=instance.cfg_file.name,
) )
def backup_moonraker_conf(self): def backup_moonraker_conf(self):
"""Backup moonraker.conf files of all Moonraker instances.
Files are backed up to:
{backup_root}/{instance_data_dir_name}/moonraker_{timestamp}.conf
"""
moonraker_instances: List[Moonraker] = get_instances(Moonraker) moonraker_instances: List[Moonraker] = get_instances(Moonraker)
for instance in moonraker_instances: for instance in moonraker_instances:
target_path: Path = self._backup_root.joinpath( target_path: Path = self._backup_root.joinpath(instance.data_dir.name)
instance.data_dir.name, f"config_{self.timestamp}"
)
self.backup_file( self.backup_file(
source_path=instance.cfg_file, source_path=instance.cfg_file,
target_path=target_path, target_path=target_path,
target_name=instance.cfg_file.name,
) )
def backup_printer_config_dir(self) -> None: def backup_printer_config_dir(self) -> None:

View File

@@ -26,7 +26,7 @@ class ComponentStatus:
owner: str | None = None owner: str | None = None
repo: str | None = None repo: str | None = None
repo_url: str | None = None repo_url: str | None = None
branch: str = "" branch: str | None = None
local: str | None = None local: str | None = None
remote: str | None = None remote: str | None = None
instances: int | None = None instances: int | None = None

View File

@@ -10,3 +10,4 @@
from pathlib import Path from pathlib import Path
EXTENSION_ROOT = Path(__file__).resolve().parents[1].joinpath("extensions") EXTENSION_ROOT = Path(__file__).resolve().parents[1].joinpath("extensions")
GITHUB_ISSUES_URL = "https://github.com/dw-0/kiauh/issues"

View File

@@ -19,7 +19,7 @@ from core.logger import Logger
from core.menus import Option from core.menus import Option
from core.menus.base_menu import BaseMenu from core.menus.base_menu import BaseMenu
from core.types.color import Color from core.types.color import Color
from extensions import EXTENSION_ROOT from extensions import EXTENSION_ROOT, GITHUB_ISSUES_URL
from extensions.base_extension import BaseExtension from extensions.base_extension import BaseExtension
@@ -56,6 +56,22 @@ class ExtensionsMenu(BaseMenu):
with open(metadata_json, "r") as m: with open(metadata_json, "r") as m:
# read extension metadata from json # read extension metadata from json
metadata = json.load(m).get("metadata") metadata = json.load(m).get("metadata")
index = str(metadata.get("index"))
# Prevent collisions where one extension silently overrides another.
if index in ext_dict:
existing_name = ext_dict[index].metadata.get("display_name")
duplicate_name = metadata.get("display_name")
Logger.print_warn(
"Failed loading extension"
f" {ext}: duplicate index '{index}'"
f" already used by '{existing_name}'."
f" Skipping '{duplicate_name}'."
f" Please report this at {GITHUB_ISSUES_URL}."
)
continue
int(index)
module_name = metadata.get("module") module_name = metadata.get("module")
module_path = f"kiauh.extensions.{ext.name}.{module_name}" module_path = f"kiauh.extensions.{ext.name}.{module_name}"
@@ -73,10 +89,20 @@ class ExtensionsMenu(BaseMenu):
# instantiate the extension with its metadata and add to dict # instantiate the extension with its metadata and add to dict
ext_instance: BaseExtension = ext_class(metadata) ext_instance: BaseExtension = ext_class(metadata)
ext_dict[f"{metadata.get('index')}"] = ext_instance ext_dict[index] = ext_instance
except (IOError, json.JSONDecodeError, ImportError) as e: except (
print(f"Failed loading extension {ext}: {e}") IOError,
json.JSONDecodeError,
ImportError,
TypeError,
ValueError,
AttributeError,
) as e:
Logger.print_warn(
f"Failed loading extension {ext}: {e}. "
f"Please report this at {GITHUB_ISSUES_URL}."
)
return dict(sorted(ext_dict.items(), key=lambda x: int(x[0]))) return dict(sorted(ext_dict.items(), key=lambda x: int(x[0])))

View File

@@ -97,7 +97,7 @@ class GcodeShellCmdExtension(BaseExtension):
def install_example_cfg(self, instances: List[Klipper]): def install_example_cfg(self, instances: List[Klipper]):
cfg_dirs = [instance.base.cfg_dir for instance in instances] cfg_dirs = [instance.base.cfg_dir for instance in instances]
# copy extension to klippy/extras # copy extension to config directories
for cfg_dir in cfg_dirs: for cfg_dir in cfg_dirs:
Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...") Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...")
if check_file_exist(cfg_dir.joinpath("shell_command.cfg")): if check_file_exist(cfg_dir.joinpath("shell_command.cfg")):

View File

@@ -0,0 +1,28 @@
# ======================================================================= #
# Copyright (C) 2020 - 2026 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from pathlib import Path
# repo
TMCA_REPO = "https://github.com/andrewmcgr/klipper_tmc_autotune"
# directories
TMCA_DIR = Path.home().joinpath("klipper_tmc_autotune")
MODULE_PATH = Path(__file__).resolve().parent
KLIPPER_DIR = Path.home().joinpath("klipper")
KLIPPER_EXTRAS = KLIPPER_DIR.joinpath("klippy/extras")
KLIPPER_PLUGINS = KLIPPER_DIR.joinpath("klippy/plugins")
KLIPPER_EXTENSIONS_PATH = (
KLIPPER_PLUGINS if KLIPPER_PLUGINS.is_dir() else KLIPPER_EXTRAS
)
# files
TMCA_EXAMPLE_CONFIG = TMCA_DIR.joinpath("docs/example.cfg")
# names
TMCA_MOONRAKER_UPDATER_NAME = "update_manager klipper_tmc_autotune"

View File

@@ -0,0 +1,13 @@
{
"metadata": {
"index": 13,
"module": "tmc_autotune_extension",
"maintained_by": "theogayar",
"display_name": "Klipper TMC Autotune",
"description": [
"Klipper extension for automatic configuration and tuning of TMC drivers."
],
"repo": "https://github.com/andrewmcgr/klipper_tmc_autotune",
"updates": true
}
}

View File

@@ -0,0 +1,347 @@
# ======================================================================= #
# Copyright (C) 2020 - 2026 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import shutil
from typing import List
from components.klipper.klipper import Klipper
from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from core.services.backup_service import BackupService
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
SimpleConfigParser,
)
from extensions.base_extension import BaseExtension
from extensions.tmc_autotune import (
KLIPPER_DIR,
KLIPPER_EXTENSIONS_PATH,
TMCA_DIR,
TMCA_EXAMPLE_CONFIG,
TMCA_MOONRAKER_UPDATER_NAME,
TMCA_REPO,
)
from utils.config_utils import add_config_section, remove_config_section
from utils.fs_utils import check_file_exist, create_symlink, run_remove_routines
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import get_confirm
from utils.instance_utils import get_instances, stop_klipper_instances_interactively
from utils.sys_utils import check_python_version
# noinspection PyMethodMayBeStatic
class TmcAutotuneExtension(BaseExtension):
def install_extension(self, **kwargs) -> None:
Logger.print_status("Installing Klipper TMC Autotune...")
# Check for Python 3.x, aligned with upstream install script
if not check_python_version(3, 0):
Logger.print_warn("Python 3.x is required. Aborting install.")
return
klipper_dir_exists = check_file_exist(KLIPPER_DIR)
if not klipper_dir_exists:
Logger.print_warn(
"No Klipper directory found! Unable to install extension."
)
return
tmca_exists = (
check_file_exist(TMCA_DIR)
and check_file_exist(KLIPPER_EXTENSIONS_PATH.joinpath("autotune_tmc.py"))
and check_file_exist(KLIPPER_EXTENSIONS_PATH.joinpath("motor_constants.py"))
and check_file_exist(KLIPPER_EXTENSIONS_PATH.joinpath("motor_database.cfg"))
)
overwrite = True
if tmca_exists:
overwrite = get_confirm(
question="Extension seems to be installed already. Overwrite?",
default_choice=True,
allow_go_back=False,
)
if not overwrite:
Logger.print_warn("Installation aborted due to user request.")
return
add_moonraker_update_section = get_confirm(
question="Add Klipper TMC Autotune to Moonraker update manager(s)?",
default_choice=True,
allow_go_back=False,
)
create_example_config = get_confirm(
question="Create an example autotune_tmc.cfg for each instance?",
default_choice=True,
allow_go_back=False,
)
kl_instances = get_instances(Klipper)
if not stop_klipper_instances_interactively(
kl_instances, "installation of TMC Autotune"
):
return
try:
git_clone_wrapper(TMCA_REPO, TMCA_DIR, force=True)
Logger.print_info("Creating symlinks in Klipper extras directory...")
create_symlink(
TMCA_DIR.joinpath("autotune_tmc.py"),
KLIPPER_EXTENSIONS_PATH.joinpath("autotune_tmc.py"),
)
create_symlink(
TMCA_DIR.joinpath("motor_constants.py"),
KLIPPER_EXTENSIONS_PATH.joinpath("motor_constants.py"),
)
create_symlink(
TMCA_DIR.joinpath("motor_database.cfg"),
KLIPPER_EXTENSIONS_PATH.joinpath("motor_database.cfg"),
)
Logger.print_ok(
"Symlinks created successfully for all instances.", end="\n\n"
)
if create_example_config:
self._install_example_cfg(kl_instances)
else:
Logger.print_info(
"Skipping example config creation as per user request."
)
Logger.print_warn(
"Make sure to create and include an autotune_tmc.cfg in your printer.cfg in order to use the extension!"
)
if add_moonraker_update_section:
mr_instances = get_instances(Moonraker)
self._add_moonraker_update_manager_section(mr_instances)
else:
Logger.print_info(
"Skipping update section creation as per user request."
)
Logger.print_warn(
"Make sure to create the corresponding section in your moonraker.conf in order to have it appear in your frontend update manager!"
)
except Exception as e:
Logger.print_error(f"Error during Klipper TMC Autotune installation:\n{e}")
if kl_instances:
InstanceManager.start_all(kl_instances)
return
if kl_instances:
InstanceManager.start_all(kl_instances)
if create_example_config:
Logger.print_dialog(
DialogType.ATTENTION,
[
"Basic configuration files were created per instance. You must edit them to enable the extension.",
"Documentation:",
f"{TMCA_REPO}",
"\n\n",
"IMPORTANT:",
"Define [autotune_tmc] sections ONLY in 'autotune_tmc.cfg'. ",
"Do NOT add them to 'printer.cfg', contrary to official docs. "
"While not fatal, mixing configs breaks file segmentation and is bad practice.",
],
margin_bottom=1,
)
Logger.print_ok("Klipper TMC Autotune installed successfully!")
def update_extension(self, **kwargs) -> None:
extension_installed = check_file_exist(TMCA_DIR)
if not extension_installed:
Logger.print_info("Extension does not seem to be installed! Skipping ...")
return
backup_before_update = get_confirm(
question="Backup Klipper TMC Autotune directory before update?",
default_choice=True,
allow_go_back=True,
)
kl_instances = get_instances(Klipper)
if not stop_klipper_instances_interactively(
kl_instances, "update of TMC Autotune"
):
return
Logger.print_status("Updating Klipper TMC Autotune...")
try:
if backup_before_update:
Logger.print_status("Backing up Klipper TMC Autotune directory...")
svc = BackupService()
svc.backup_directory(
source_path=TMCA_DIR,
backup_name="klipper_tmc_autotune",
)
Logger.print_ok("Backup completed successfully.")
git_pull_wrapper(TMCA_DIR)
except Exception as e:
Logger.print_error(f"Error during Klipper TMC Autotune update:\n{e}")
if kl_instances:
InstanceManager.start_all(kl_instances)
return
if kl_instances:
InstanceManager.start_all(kl_instances)
Logger.print_ok("Klipper TMC Autotune updated successfully.", end="\n\n")
def remove_extension(self, **kwargs) -> None:
extension_installed = check_file_exist(TMCA_DIR)
if not extension_installed:
Logger.print_info("Extension does not seem to be installed! Skipping ...")
return
kl_instances = get_instances(Klipper)
if not stop_klipper_instances_interactively(
kl_instances, "removal of TMC Autotune"
):
return
try:
Logger.print_info("Removing Klipper TMC Autotune extension ...")
run_remove_routines(TMCA_DIR)
Logger.print_info("Removing symlinks from Klipper extras directory ...")
run_remove_routines(KLIPPER_EXTENSIONS_PATH.joinpath("autotune_tmc.py"))
run_remove_routines(KLIPPER_EXTENSIONS_PATH.joinpath("motor_constants.py"))
run_remove_routines(KLIPPER_EXTENSIONS_PATH.joinpath("motor_database.cfg"))
mr_instances: List[Moonraker] = get_instances(Moonraker)
self._remove_moonraker_update_manager_section(mr_instances)
Logger.print_info("Removing include from printer.cfg files ...")
BackupService().backup_printer_cfg()
remove_config_section("include autotune_tmc.cfg", kl_instances)
Logger.print_dialog(
DialogType.ATTENTION,
[
"Manual edits to 'printer.cfg' may be required if using exotic stepper configurations.",
"\n\n",
"NOTE:",
"'autotune_tmc.cfg' is NOT removed automatically. ",
"Please delete it manually if no longer needed.",
],
margin_bottom=1,
)
except Exception as e:
Logger.print_error(f"Unable to remove extension:\n{e}")
if kl_instances:
InstanceManager.start_all(kl_instances)
return
if kl_instances:
InstanceManager.start_all(kl_instances)
Logger.print_ok("Klipper TMC Autotune removed successfully.")
def _install_example_cfg(self, kl_instances: List[Klipper]):
cfg_dirs = [instance.base.cfg_dir for instance in kl_instances]
for cfg_dir in cfg_dirs:
Logger.print_status(f"Create autotune_tmc.cfg in '{cfg_dir}' ...")
if check_file_exist(cfg_dir.joinpath("autotune_tmc.cfg")):
Logger.print_info("File already exists! Skipping ...")
continue
try:
shutil.copy(TMCA_EXAMPLE_CONFIG, cfg_dir.joinpath("autotune_tmc.cfg"))
Logger.print_ok("Done!")
except OSError as e:
Logger.print_error(f"Unable to create example config: {e}")
BackupService().backup_printer_cfg()
section = "include autotune_tmc.cfg"
cfg_files = [instance.cfg_file for instance in kl_instances]
for cfg_file in cfg_files:
Logger.print_status(f"Include autotune_tmc.cfg in '{cfg_file}' ...")
scp = SimpleConfigParser()
scp.read_file(cfg_file)
if scp.has_section(section):
Logger.print_info("Section already defined! Skipping ...")
continue
scp.add_section(section)
scp.write_file(cfg_file)
Logger.print_ok("Done!")
def _add_moonraker_update_manager_section(
self, mr_instances: List[Moonraker]
) -> None:
if not mr_instances:
Logger.print_dialog(
DialogType.WARNING,
[
"Moonraker not found! Klipper TMC Autotune update manager support "
"for Moonraker will not be added to moonraker.conf.",
],
)
if not get_confirm(
"Continue Klipper TMC Autotune installation?",
default_choice=False,
allow_go_back=True,
):
Logger.print_info("Installation aborted due to user request.")
return
BackupService().backup_moonraker_conf()
add_config_section(
section=TMCA_MOONRAKER_UPDATER_NAME,
instances=mr_instances,
options=[
("type", "git_repo"),
("channel", "dev"),
("path", TMCA_DIR.as_posix()),
("origin", TMCA_REPO),
("managed_services", "klipper"),
("primary_branch", "main"),
],
)
InstanceManager.restart_all(mr_instances)
Logger.print_ok(
"Klipper TMC Autotune successfully added to Moonraker update manager(s)!"
)
def _remove_moonraker_update_manager_section(
self, mr_instances: List[Moonraker]
) -> None:
if not mr_instances:
Logger.print_dialog(
DialogType.WARNING,
[
"Moonraker not found! Klipper TMC Autotune update manager support "
"for Moonraker will not be removed from moonraker.conf.",
],
)
return
BackupService().backup_moonraker_conf()
remove_config_section("update_manager klipper_tmc_autotune", mr_instances)
InstanceManager.restart_all(mr_instances)
Logger.print_ok(
"Klipper TMC Autotune successfully removed from Moonraker update manager(s)!"
)

View File

@@ -42,7 +42,7 @@ def get_kiauh_version() -> str:
Helper method to get the current KIAUH version by reading the latest tag Helper method to get the current KIAUH version by reading the latest tag
:return: string of the latest tag or a default value if no tags exist :return: string of the latest tag or a default value if no tags exist
""" """
tags = get_local_tags(Path(__file__).parent.parent) tags: List[str] = get_local_tags(Path(__file__).parent.parent)
if tags: if tags:
return tags[-1] return tags[-1]
else: else:
@@ -108,7 +108,7 @@ def get_install_status(
from utils.instance_utils import get_instances from utils.instance_utils import get_instances
checks = [] checks = []
branch: str = "" branch: str | None = None
if repo_dir.exists(): if repo_dir.exists():
checks.append(True) checks.append(True)

View File

@@ -24,13 +24,16 @@ from core.logger import Logger
def check_file_exist(file_path: Path, sudo=False) -> bool: def check_file_exist(file_path: Path, sudo=False) -> bool:
""" """
Helper function for checking the existence of a file | Helper function for checking the existence of a file.
Also works with symlinks (returns False if broken) |
:param file_path: the absolute path of the file to check :param file_path: the absolute path of the file to check
:param sudo: use sudo if required :param sudo: use sudo if required
:return: True, if file exists, otherwise False :return: True, if file exists, otherwise False
""" """
if sudo: if sudo:
command = ["sudo", "find", file_path.as_posix()] # -L forces find to follow symlinks
# -maxdepth = 0 avoids losing time if `file_path` is a directory
command = ["sudo", "find", "-L", file_path.as_posix(), "-maxdepth", "0"]
try: try:
check_output(command, stderr=DEVNULL) check_output(command, stderr=DEVNULL)
return True return True
@@ -44,7 +47,16 @@ def check_file_exist(file_path: Path, sudo=False) -> bool:
def create_symlink(source: Path, target: Path, sudo=False) -> None: def create_symlink(source: Path, target: Path, sudo=False) -> None:
"""
Helper function to create a symlink from source to target
If the target file exists, it will be overwritten. |
:param source: the source file/directory
:param target: the target file/directory
:param sudo: use sudo if required
:return: None
"""
try: try:
# -f forcibly creates/overwrites the symlink
cmd = ["ln", "-sf", source.as_posix(), target.as_posix()] cmd = ["ln", "-sf", source.as_posix(), target.as_posix()]
if sudo: if sudo:
cmd.insert(0, "sudo") cmd.insert(0, "sudo")

View File

@@ -73,44 +73,44 @@ def git_pull_wrapper(target_dir: Path) -> None:
return return
def get_repo_name(repo: Path) -> Tuple[str, str]: def get_repo_name(repo: Path) -> Tuple[str | None, str | None]:
""" """
Helper method to extract the organisation and name of a repository | Helper method to extract the organisation and name of a repository |
:param repo: repository to extract the values from :param repo: repository to extract the values from
:return: String in form of "<orga>/<name>" or None :return: String in form of "<orga>/<name>" or None
""" """
if not repo.exists() or not repo.joinpath(".git").exists(): if not repo.exists() or not repo.joinpath(".git").exists():
return "-", "-" return None, None
try: try:
cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"] cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"]
result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8") result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8")
substrings: List[str] = result.strip().split("/")[-2:] substrings: List[str] = result.strip().split("/")[-2:]
orga: str = substrings[0] if substrings[0] else "-" orga: str | None = substrings[0] if substrings[0] else None
name: str = substrings[1] if substrings[1] else "-" name: str | None = substrings[1] if substrings[1] else None
return orga, name.replace(".git", "") return orga, name.replace(".git", "") if name else None
except CalledProcessError: except CalledProcessError:
return "-", "-" return None, None
def get_current_branch(repo: Path) -> str: def get_current_branch(repo: Path) -> str | None:
""" """
Get the current branch of a local Git repository Get the current branch of a local Git repository
:param repo: Path to the local Git repository :param repo: Path to the local Git repository
:return: Current branch :return: Current branch or None if not determinable
""" """
try: try:
cmd = ["git", "branch", "--show-current"] cmd = ["git", "branch", "--show-current"]
result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode( result: str = check_output(cmd, stderr=DEVNULL, cwd=repo).decode(
encoding="utf-8" encoding="utf-8"
) )
return result.strip() if result else "-" return result.strip() if result else None
except CalledProcessError: except CalledProcessError:
return "-" return None
def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]: def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:

View File

@@ -156,7 +156,7 @@ def format_question(question: str, default=None) -> str:
if default is not None: if default is not None:
formatted_q += f" (default={default})" formatted_q += f" (default={default})"
return Color.apply(f"###### {formatted_q}: ", Color.CYAN) return str(Color.apply(f"###### {formatted_q}: ", Color.CYAN))
def validate_number_input(value: str, min_count: int, max_count: int | None) -> int: def validate_number_input(value: str, min_count: int, max_count: int | None) -> int:

View File

@@ -12,8 +12,12 @@ import re
from pathlib import Path from pathlib import Path
from typing import List from typing import List
from components.klipper.klipper import Klipper
from core.constants import SYSTEMD from core.constants import SYSTEMD
from core.instance_manager.base_instance import SUFFIX_BLACKLIST from core.instance_manager.base_instance import SUFFIX_BLACKLIST
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from utils.input_utils import get_confirm
from utils.instance_type import InstanceType from utils.instance_type import InstanceType
@@ -56,3 +60,40 @@ def get_instance_suffix(name: str, file_path: Path) -> str:
# otherwise there is and hyphen left, and we return the part after the hyphen # otherwise there is and hyphen left, and we return the part after the hyphen
suffix = file_path.stem[len(name) :] suffix = file_path.stem[len(name) :]
return suffix[1:] if suffix else "" return suffix[1:] if suffix else ""
def stop_klipper_instances_interactively(
kl_instances: List[Klipper], operation_name: str = "operation"
) -> bool:
"""
Interactively stops all active Klipper instances, warning the user that ongoing prints will be disrupted.
:param kl_instances: List of Klipper instances to stop.
:param operation_name: Optional name of the operation being performed (for user messaging). Do NOT capitalize.
:return: True if instances were stopped or no instances found, False if operation was aborted.
"""
if not kl_instances:
Logger.print_warn("No instances found, skipping instance stopping.")
return True
Logger.print_dialog(
DialogType.ATTENTION,
[
"Do NOT continue if there are ongoing prints running",
f"All Klipper instances will be restarted during the {operation_name} and "
"ongoing prints WILL FAIL.",
],
)
stop_klipper = get_confirm(
question=f"Stop Klipper now and proceed with {operation_name}?",
default_choice=False,
allow_go_back=True,
)
if stop_klipper:
InstanceManager.stop_all(kl_instances)
return True
else:
Logger.print_warn(f"{operation_name.capitalize()} aborted due to user request.")
return False