Compare commits

...

12 Commits

Author SHA1 Message Date
dw-0
cf4e915430 cicd: restrict worflow runs to develop branch
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-13 18:26:23 +01:00
CODeRUS
c901cd1fdf feat(advanced): install input shaper dependencies (#662)
* feat(advanced): install input shaper dependencies

Signed-off-by: Andrey Kozhevnikov <coderusinbox@gmail.com>

* chore: fix formatting/wording

also add a quick check if the klipper env exists

Signed-off-by: Dominik Willner <th33xitus@gmail.com>

---------

Signed-off-by: Andrey Kozhevnikov <coderusinbox@gmail.com>
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
Co-authored-by: dw-0 <th33xitus@gmail.com>
2025-03-13 18:26:23 +01:00
Aleksei Sviridkin
da3c37a872 feat(git_utils): Support for blolbless clone mode in git_cmd_clone (#640)
* feat(git_utils): enhance git_cmd_clone with depth and single-branch options

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix(git_utils): add a newline for better readability in git_cmd_clone

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* feat(git_utils): enhance git_cmd_clone with optional depth and single-branch parameters

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* feat(git_utils): update git_cmd_clone to support blolbless cloning option

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* revert formatting changes

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix another formatting changes

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix(git_utils): correct indentation for improved readability in get_local_tags function

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix(git_utils): rename blolbless parameter to blobless and update documentation for clarity

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* refactor: enable the blobless clone feature for all regular clones

skip checkout step if brach is master or main

Signed-off-by: Dominik Willner <th33xitus@gmail.com>

---------

Signed-off-by: Aleksei Sviridkin <f@lex.la>
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
Co-authored-by: dw-0 <th33xitus@gmail.com>
2025-03-13 18:26:23 +01:00
dw-0
8f436646cd cicd: add action for fast-forward check and merge (#660)
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-09 12:45:46 +01:00
dw-0
760f131d1c fix(klipper): handle file access exception for dietpi version file (#658)
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-09 08:32:14 +01:00
dw-0
41804f0eaa style: ruff format
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-09 08:32:14 +01:00
dw-0
d3c9bcc38c refactor(klipper): move setup functions into KlipperSetupService
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-09 08:32:14 +01:00
dw-0
7fc36f3e68 feat(moonraker): display moonraker ip address after install
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-08 11:41:29 +01:00
CODeRUS
a4942b9404 fix: Add pkg-config to klipper packages (#655)
Signed-off-by: Andrey Kozhevnikov <coderusinbox@gmail.com>
2025-03-08 11:41:29 +01:00
dw-0
9e0a8a0081 Release v5.1.3
Release v5.1.3
2025-02-23 12:42:44 +01:00
dw-0
6082528628 Merge pull request #648 from Arksine/dev-v5-moonraker-deps-fix
fix: add support for Moonraker's dependency requirement specifiers to V5
2025-02-23 12:32:17 +01:00
Eric Callahan
9e92e4a36a fix: parse moonraker deps with requirement specifiers
Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
2025-02-22 16:50:28 -05:00
38 changed files with 1061 additions and 537 deletions

View File

@@ -11,5 +11,5 @@ end_of_line = lf
[*.py] [*.py]
max_line_length = 88 max_line_length = 88
[*.sh] [*.{sh,yml,yaml}]
indent_size = 2 indent_size = 2

28
.github/workflows/fast-forward.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
name: fast-forward
on:
issue_comment:
types: [ created, edited ]
jobs:
fast-forward:
# Only run if the comment contains the /fast-forward command.
if: |
contains(github.event.comment.body, '/fast-forward') &&
github.event.issue.pull_request &&
github.base_ref == 'develop'
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
issues: write
steps:
- name: Fast forwarding
uses: sequoia-pgp/fast-forward@v1
with:
merge: true
# To reduce the workflow's verbosity, use 'on-error'
# to only post a comment when an error occurs, or 'never' to
# never post a comment. (In all cases the information is
# still available in the step's summary.)
comment: on-error

27
.github/workflows/pull-request.yml vendored Normal file
View File

@@ -0,0 +1,27 @@
name: pull-request
on:
pull_request:
branches:
- develop
types: [ opened, reopened, synchronize ]
jobs:
check-fast-forward:
runs-on: ubuntu-latest
permissions:
contents: read
# We appear to need write permission for both pull-requests and
# issues in order to post a comment to a pull request.
pull-requests: write
issues: write
steps:
- name: Checking if fast forwarding is possible
uses: sequoia-pgp/fast-forward@v1
with:
merge: false
# To reduce the workflow's verbosity, use 'on-error'
# to only post a comment when an error occurs, or 'never' to
# never post a comment. (In all cases the information is
# still available in the step's summary.)
comment: on-error

View File

@@ -1,117 +0,0 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from typing import List
from components.klipper import KLIPPER_DIR, KLIPPER_ENV_DIR
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import print_instance_overview
from core.instance_manager.instance_manager import InstanceManager
from core.logger import Logger
from core.services.message_service import Message
from core.types.color import Color
from utils.fs_utils import run_remove_routines
from utils.input_utils import get_selection_input
from utils.instance_utils import get_instances
from utils.sys_utils import unit_file_exists
def run_klipper_removal(
remove_service: bool,
remove_dir: bool,
remove_env: bool,
) -> Message:
completion_msg = Message(
title="Klipper Removal Process completed",
color=Color.GREEN,
)
klipper_instances: List[Klipper] = get_instances(Klipper)
if remove_service:
Logger.print_status("Removing Klipper instances ...")
if klipper_instances:
instances_to_remove = select_instances_to_remove(klipper_instances)
remove_instances(instances_to_remove)
instance_names = [i.service_file_path.stem for i in instances_to_remove]
txt = f"● Klipper instances removed: {', '.join(instance_names)}"
completion_msg.text.append(txt)
else:
Logger.print_info("No Klipper Services installed! Skipped ...")
if (remove_dir or remove_env) and unit_file_exists("klipper", suffix="service"):
completion_msg.text = [
"Some Klipper services are still installed:",
f"'{KLIPPER_DIR}' was not removed, even though selected for removal.",
f"'{KLIPPER_ENV_DIR}' was not removed, even though selected for removal.",
]
else:
if remove_dir:
Logger.print_status("Removing Klipper local repository ...")
if run_remove_routines(KLIPPER_DIR):
completion_msg.text.append("● Klipper local repository removed")
if remove_env:
Logger.print_status("Removing Klipper Python environment ...")
if run_remove_routines(KLIPPER_ENV_DIR):
completion_msg.text.append("● Klipper Python environment removed")
if completion_msg.text:
completion_msg.text.insert(0, "The following actions were performed:")
else:
completion_msg.color = Color.YELLOW
completion_msg.centered = True
completion_msg.text = ["Nothing to remove."]
return completion_msg
def select_instances_to_remove(instances: List[Klipper]) -> List[Klipper] | None:
start_index = 1
options = [str(i + start_index) for i in range(len(instances))]
options.extend(["a", "b"])
instance_map = {options[i]: instances[i] for i in range(len(instances))}
print_instance_overview(
instances,
start_index=start_index,
show_index=True,
show_select_all=True,
)
selection = get_selection_input("Select Klipper instance to remove", options)
instances_to_remove = []
if selection == "b":
return None
elif selection == "a":
instances_to_remove.extend(instances)
else:
instances_to_remove.append(instance_map[selection])
return instances_to_remove
def remove_instances(
instance_list: List[Klipper] | None,
) -> None:
if not instance_list:
return
for instance in instance_list:
Logger.print_status(f"Removing instance {instance.service_file_path.stem} ...")
InstanceManager.remove(instance)
delete_klipper_env_file(instance)
def delete_klipper_env_file(instance: Klipper):
Logger.print_status(f"Remove '{instance.env_file}'")
if not instance.env_file.exists():
msg = f"Env file in {instance.base.sysd_dir} not found. Skipped ..."
Logger.print_info(msg)
return
run_remove_routines(instance.env_file)

View File

@@ -1,239 +0,0 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Tuple
from components.klipper import (
EXIT_KLIPPER_SETUP,
KLIPPER_DIR,
KLIPPER_ENV_DIR,
KLIPPER_INSTALL_SCRIPT,
KLIPPER_REQ_FILE,
)
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_select_custom_name_dialog,
)
from components.klipper.klipper_utils import (
assign_custom_name,
backup_klipper_dir,
check_user_groups,
create_example_printer_cfg,
get_install_count,
handle_disruptive_system_packages,
)
from components.moonraker.moonraker import Moonraker
from components.webui_client.client_utils import (
get_existing_clients,
)
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from core.settings.kiauh_settings import KiauhSettings
from utils.common import check_install_dependencies
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import get_confirm
from utils.instance_utils import get_instances
from utils.sys_utils import (
cmd_sysctl_manage,
cmd_sysctl_service,
create_python_venv,
install_python_requirements,
parse_packages_from_file,
)
def install_klipper() -> None:
Logger.print_status("Installing Klipper ...")
klipper_list: List[Klipper] = get_instances(Klipper)
moonraker_list: List[Moonraker] = get_instances(Moonraker)
match_moonraker: bool = False
# if there are more moonraker instances than klipper instances, ask the user to
# match the klipper instance count to the count of moonraker instances with the same suffix
if len(moonraker_list) > len(klipper_list):
is_confirmed = display_moonraker_info(moonraker_list)
if not is_confirmed:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
match_moonraker = True
install_count, name_dict = get_install_count_and_name_dict(
klipper_list, moonraker_list
)
if install_count == 0:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
is_multi_install = install_count > 1 or (len(name_dict) >= 1 and install_count >= 1)
if not name_dict and install_count == 1:
name_dict = {0: ""}
elif is_multi_install and not match_moonraker:
custom_names = use_custom_names_or_go_back()
if custom_names is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
handle_instance_names(install_count, name_dict, custom_names)
create_example_cfg = get_confirm("Create example printer.cfg?")
# run the actual installation
try:
run_klipper_setup(klipper_list, name_dict, create_example_cfg)
except Exception as e:
Logger.print_error(e)
Logger.print_error("Klipper installation failed!")
return
def run_klipper_setup(
klipper_list: List[Klipper], name_dict: Dict[int, str], create_example_cfg: bool
) -> None:
if not klipper_list:
setup_klipper_prerequesites()
for i in name_dict:
# skip this iteration if there is already an instance with the name
if name_dict[i] in [n.suffix for n in klipper_list]:
continue
instance = Klipper(suffix=name_dict[i])
instance.create()
cmd_sysctl_service(instance.service_file_path.name, "enable")
if create_example_cfg:
# if a client-config is installed, include it in the new example cfg
clients = get_existing_clients()
create_example_printer_cfg(instance, clients)
cmd_sysctl_service(instance.service_file_path.name, "start")
cmd_sysctl_manage("daemon-reload")
# step 4: check/handle conflicting packages/services
handle_disruptive_system_packages()
# step 5: check for required group membership
check_user_groups()
def handle_instance_names(
install_count: int, name_dict: Dict[int, str], custom_names: bool
) -> None:
for i in range(install_count): # 3
key: int = len(name_dict.keys()) + 1
if custom_names:
assign_custom_name(key, name_dict)
else:
name_dict[key] = str(len(name_dict) + 1)
def get_install_count_and_name_dict(
klipper_list: List[Klipper], moonraker_list: List[Moonraker]
) -> Tuple[int, Dict[int, str]]:
install_count: int | None
if len(moonraker_list) > len(klipper_list):
install_count = len(moonraker_list)
name_dict = {i: moonraker.suffix for i, moonraker in enumerate(moonraker_list)}
else:
install_count = get_install_count()
name_dict = {i: klipper.suffix for i, klipper in enumerate(klipper_list)}
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return 0, {}
return install_count, name_dict
def setup_klipper_prerequesites() -> None:
settings = KiauhSettings()
repo = settings.klipper.repo_url
branch = settings.klipper.branch
git_clone_wrapper(repo, KLIPPER_DIR, branch)
# install klipper dependencies and create python virtualenv
try:
install_klipper_packages()
if create_python_venv(KLIPPER_ENV_DIR):
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
except Exception:
Logger.print_error("Error during installation of Klipper requirements!")
raise
def install_klipper_packages() -> None:
script = KLIPPER_INSTALL_SCRIPT
packages = parse_packages_from_file(script)
# Add dbus requirement for DietPi distro
if Path("/boot/dietpi/.version").exists():
packages.append("dbus")
check_install_dependencies({*packages})
def update_klipper() -> None:
Logger.print_dialog(
DialogType.WARNING,
[
"Do NOT continue if there are ongoing prints running!",
"All Klipper instances will be restarted during the update process and "
"ongoing prints WILL FAIL.",
],
)
if not get_confirm("Update Klipper now?"):
return
settings = KiauhSettings()
if settings.kiauh.backup_before_update:
backup_klipper_dir()
instances = get_instances(Klipper)
InstanceManager.stop_all(instances)
git_pull_wrapper(repo=settings.klipper.repo_url, target_dir=KLIPPER_DIR)
# install possible new system packages
install_klipper_packages()
# install possible new python dependencies
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
InstanceManager.start_all(instances)
def use_custom_names_or_go_back() -> bool | None:
print_select_custom_name_dialog()
_input: bool | None = get_confirm(
"Assign custom names?",
False,
allow_go_back=True,
)
return _input
def display_moonraker_info(moonraker_list: List[Moonraker]) -> bool:
# todo: only show the klipper instances that are not already installed
Logger.print_dialog(
DialogType.INFO,
[
"Existing Moonraker instances detected:",
*[f"{m.service_file_path.stem}" for m in moonraker_list],
"\n\n",
"The following Klipper instances will be installed:",
*[f"● klipper-{m.suffix}" for m in moonraker_list],
],
)
_input: bool = get_confirm("Proceed with installation?")
return _input

View File

@@ -11,6 +11,7 @@ from __future__ import annotations
import grp import grp
import os import os
import shutil import shutil
from pathlib import Path
from subprocess import CalledProcessError, run from subprocess import CalledProcessError, run
from typing import Dict, List from typing import Dict, List
@@ -18,6 +19,7 @@ from components.klipper import (
KLIPPER_BACKUP_DIR, KLIPPER_BACKUP_DIR,
KLIPPER_DIR, KLIPPER_DIR,
KLIPPER_ENV_DIR, KLIPPER_ENV_DIR,
KLIPPER_INSTALL_SCRIPT,
MODULE_PATH, MODULE_PATH,
) )
from components.klipper.klipper import Klipper from components.klipper.klipper import Klipper
@@ -37,10 +39,15 @@ from core.submodules.simple_config_parser.src.simple_config_parser.simple_config
SimpleConfigParser, SimpleConfigParser,
) )
from core.types.component_status import ComponentStatus from core.types.component_status import ComponentStatus
from utils.common import get_install_status from utils.common import check_install_dependencies, get_install_status
from utils.fs_utils import check_file_exist
from utils.input_utils import get_confirm, get_number_input, get_string_input from utils.input_utils import get_confirm, get_number_input, get_string_input
from utils.instance_utils import get_instances from utils.instance_utils import get_instances
from utils.sys_utils import cmd_sysctl_service from utils.sys_utils import (
cmd_sysctl_service,
install_python_packages,
parse_packages_from_file,
)
def get_klipper_status() -> ComponentStatus: def get_klipper_status() -> ComponentStatus:
@@ -194,3 +201,56 @@ def backup_klipper_dir() -> None:
bm = BackupManager() bm = BackupManager()
bm.backup_directory("klipper", source=KLIPPER_DIR, target=KLIPPER_BACKUP_DIR) bm.backup_directory("klipper", source=KLIPPER_DIR, target=KLIPPER_BACKUP_DIR)
bm.backup_directory("klippy-env", source=KLIPPER_ENV_DIR, target=KLIPPER_BACKUP_DIR) bm.backup_directory("klippy-env", source=KLIPPER_ENV_DIR, target=KLIPPER_BACKUP_DIR)
def install_klipper_packages() -> None:
script = KLIPPER_INSTALL_SCRIPT
packages = parse_packages_from_file(script)
# Add pkg-config for rp2040 build
packages.append("pkg-config")
# Add dbus requirement for DietPi distro
if check_file_exist(Path("/boot/dietpi/.version")):
packages.append("dbus")
check_install_dependencies({*packages})
def install_input_shaper_deps() -> None:
if not KLIPPER_ENV_DIR.exists():
Logger.print_warn("Required Klipper python environment not found!")
return
Logger.print_dialog(
DialogType.CUSTOM,
[
"Resonance measurements and shaper auto-calibration require additional "
"software dependencies which are not installed by default. "
"If you agree, the following additional system packages will be installed:",
"● python3-numpy",
"● python3-matplotlib",
"● libatlas-base-dev",
"● libopenblas-dev",
"\n\n",
"Also, the following Python package will be installed:",
"● numpy",
],
custom_title="Install Input Shaper Dependencies",
)
if not get_confirm(
"Do you want to install the required packages?", default_choice=False
):
return
apt_deps = (
"python3-numpy",
"python3-matplotlib",
"libatlas-base-dev",
"libopenblas-dev",
)
check_install_dependencies({*apt_deps})
py_deps = ("numpy",)
install_python_packages(KLIPPER_ENV_DIR, {*py_deps})

View File

@@ -11,7 +11,7 @@ from __future__ import annotations
import textwrap import textwrap
from typing import Type from typing import Type
from components.klipper import klipper_remove from components.klipper.services.klipper_setup_service import KlipperSetupService
from core.menus import FooterType, Option from core.menus import FooterType, Option
from core.menus.base_menu import BaseMenu from core.menus.base_menu import BaseMenu
from core.types.color import Color from core.types.color import Color
@@ -27,11 +27,13 @@ class KlipperRemoveMenu(BaseMenu):
self.previous_menu: Type[BaseMenu] | None = previous_menu self.previous_menu: Type[BaseMenu] | None = previous_menu
self.footer_type = FooterType.BACK self.footer_type = FooterType.BACK
self.remove_klipper_service = False self.rm_svc = False
self.remove_klipper_dir = False self.rm_dir = False
self.remove_klipper_env = False self.rm_env = False
self.select_state = False self.select_state = False
self.klsvc = KlipperSetupService()
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None: def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.remove_menu import RemoveMenu from core.menus.remove_menu import RemoveMenu
@@ -49,10 +51,10 @@ class KlipperRemoveMenu(BaseMenu):
def print_menu(self) -> None: def print_menu(self) -> None:
checked = f"[{Color.apply('x', Color.CYAN)}]" checked = f"[{Color.apply('x', Color.CYAN)}]"
unchecked = "[ ]" unchecked = "[ ]"
o1 = checked if self.remove_klipper_service else unchecked o1 = checked if self.rm_svc else unchecked
o2 = checked if self.remove_klipper_dir else unchecked o2 = checked if self.rm_dir else unchecked
o3 = checked if self.remove_klipper_env else unchecked o3 = checked if self.rm_env else unchecked
sel_state = f"{'Select'if not self.select_state else 'Deselect'} everything" sel_state = f"{'Select' if not self.select_state else 'Deselect'} everything"
menu = textwrap.dedent( menu = textwrap.dedent(
f""" f"""
╟───────────────────────────────────────────────────────╢ ╟───────────────────────────────────────────────────────╢
@@ -73,37 +75,28 @@ class KlipperRemoveMenu(BaseMenu):
def toggle_all(self, **kwargs) -> None: def toggle_all(self, **kwargs) -> None:
self.select_state = not self.select_state self.select_state = not self.select_state
self.remove_klipper_service = self.select_state self.rm_svc = self.select_state
self.remove_klipper_dir = self.select_state self.rm_dir = self.select_state
self.remove_klipper_env = self.select_state self.rm_env = self.select_state
def toggle_remove_klipper_service(self, **kwargs) -> None: def toggle_remove_klipper_service(self, **kwargs) -> None:
self.remove_klipper_service = not self.remove_klipper_service self.rm_svc = not self.rm_svc
def toggle_remove_klipper_dir(self, **kwargs) -> None: def toggle_remove_klipper_dir(self, **kwargs) -> None:
self.remove_klipper_dir = not self.remove_klipper_dir self.rm_dir = not self.rm_dir
def toggle_remove_klipper_env(self, **kwargs) -> None: def toggle_remove_klipper_env(self, **kwargs) -> None:
self.remove_klipper_env = not self.remove_klipper_env self.rm_env = not self.rm_env
def run_removal_process(self, **kwargs) -> None: def run_removal_process(self, **kwargs) -> None:
if ( if not self.rm_svc and not self.rm_dir and not self.rm_env:
not self.remove_klipper_service
and not self.remove_klipper_dir
and not self.remove_klipper_env
):
msg = "Nothing selected! Select options to remove first." msg = "Nothing selected! Select options to remove first."
print(Color.apply(msg, Color.RED)) print(Color.apply(msg, Color.RED))
return return
completion_msg = klipper_remove.run_klipper_removal( self.klsvc.remove(self.rm_svc, self.rm_dir, self.rm_env)
self.remove_klipper_service,
self.remove_klipper_dir,
self.remove_klipper_env,
)
self.message_service.set_message(completion_msg)
self.remove_klipper_service = False self.rm_svc = False
self.remove_klipper_dir = False self.rm_dir = False
self.remove_klipper_env = False self.rm_env = False
self.select_state = False self.select_state = False

View File

@@ -0,0 +1,46 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from typing import List
from components.klipper.klipper import Klipper
from utils.instance_utils import get_instances
class KlipperInstanceService:
__cls_instance = None
__instances: List[Klipper] = []
def __new__(cls) -> "KlipperInstanceService":
if cls.__cls_instance is None:
cls.__cls_instance = super(KlipperInstanceService, cls).__new__(cls)
return cls.__cls_instance
def __init__(self) -> None:
if not hasattr(self, "__initialized"):
self.__initialized = False
if self.__initialized:
return
self.__initialized = True
def load_instances(self) -> None:
self.__instances = get_instances(Klipper)
def create_new_instance(self, suffix: str) -> Klipper:
instance = Klipper(suffix)
self.__instances.append(instance)
return instance
def get_all_instances(self) -> List[Klipper]:
return self.__instances
def get_instance_by_suffix(self, suffix: str) -> Klipper | None:
instances: List[Klipper] = [i for i in self.__instances if i.suffix == suffix]
return instances[0] if instances else None

View File

@@ -0,0 +1,362 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from copy import copy
from typing import Dict, List, Tuple
from components.klipper import (
EXIT_KLIPPER_SETUP,
KLIPPER_DIR,
KLIPPER_ENV_DIR,
KLIPPER_REQ_FILE,
)
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_instance_overview,
print_select_custom_name_dialog,
)
from components.klipper.klipper_utils import (
assign_custom_name,
backup_klipper_dir,
check_user_groups,
create_example_printer_cfg,
get_install_count,
handle_disruptive_system_packages,
install_klipper_packages,
)
from components.klipper.services.klipper_instance_service import KlipperInstanceService
from components.moonraker.moonraker import Moonraker
from components.moonraker.services.moonraker_instance_service import (
MoonrakerInstanceService,
)
from components.webui_client.client_utils import (
get_existing_clients,
)
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from core.services.message_service import Message, MessageService
from core.settings.kiauh_settings import KiauhSettings
from core.types.color import Color
from utils.fs_utils import run_remove_routines
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import get_confirm, get_selection_input
from utils.sys_utils import (
cmd_sysctl_manage,
create_python_venv,
install_python_requirements,
unit_file_exists,
)
# noinspection PyMethodMayBeStatic
class KlipperSetupService:
__cls_instance = None
kisvc: KlipperInstanceService
misvc: MoonrakerInstanceService
msgsvc = MessageService
settings: KiauhSettings
klipper_list: List[Klipper]
moonraker_list: List[Moonraker]
def __new__(cls) -> "KlipperSetupService":
if cls.__cls_instance is None:
cls.__cls_instance = super(KlipperSetupService, cls).__new__(cls)
return cls.__cls_instance
def __init__(self) -> None:
if not hasattr(self, "__initialized"):
self.__initialized = False
if self.__initialized:
return
self.__initialized = True
self.__init_state()
def __init_state(self) -> None:
self.settings = KiauhSettings()
self.kisvc = KlipperInstanceService()
self.kisvc.load_instances()
self.klipper_list = self.kisvc.get_all_instances()
self.misvc = MoonrakerInstanceService()
self.misvc.load_instances()
self.moonraker_list = self.misvc.get_all_instances()
self.msgsvc = MessageService()
def __refresh_state(self) -> None:
self.kisvc.load_instances()
self.klipper_list = self.kisvc.get_all_instances()
self.misvc.load_instances()
self.moonraker_list = self.misvc.get_all_instances()
def install(self) -> None:
Logger.print_status("Installing Klipper ...")
match_moonraker: bool = False
# if there are more moonraker instances than klipper instances, ask the user to
# match the klipper instance count to the count of moonraker instances with the same suffix
if len(self.moonraker_list) > len(self.klipper_list):
is_confirmed = self.__display_moonraker_info()
if not is_confirmed:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
match_moonraker = True
install_count, name_dict = self.__get_install_count_and_name_dict()
if install_count == 0:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
is_multi_install = install_count > 1 or (
len(name_dict) >= 1 and install_count >= 1
)
if not name_dict and install_count == 1:
name_dict = {0: ""}
elif is_multi_install and not match_moonraker:
custom_names = self.__use_custom_names_or_go_back()
if custom_names is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
self.__handle_instance_names(install_count, name_dict, custom_names)
create_example_cfg = get_confirm("Create example printer.cfg?")
# run the actual installation
try:
self.__run_setup(name_dict, create_example_cfg)
except Exception as e:
Logger.print_error(e)
Logger.print_error("Klipper installation failed!")
return
def update(self) -> None:
Logger.print_dialog(
DialogType.WARNING,
[
"Do NOT continue if there are ongoing prints running!",
"All Klipper instances will be restarted during the update process and "
"ongoing prints WILL FAIL.",
],
)
if not get_confirm("Update Klipper now?"):
return
self.__refresh_state()
if self.settings.kiauh.backup_before_update:
backup_klipper_dir()
InstanceManager.stop_all(self.klipper_list)
git_pull_wrapper(self.settings.klipper.repo_url, KLIPPER_DIR)
install_klipper_packages()
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
InstanceManager.start_all(self.klipper_list)
def remove(
self,
remove_service: bool,
remove_dir: bool,
remove_env: bool,
) -> None:
self.__refresh_state()
completion_msg = Message(
title="Klipper Removal Process completed",
color=Color.GREEN,
)
if remove_service:
Logger.print_status("Removing Klipper instances ...")
if self.klipper_list:
instances_to_remove = self.__get_instances_to_remove()
self.__remove_instances(instances_to_remove)
if instances_to_remove:
instance_names = [
i.service_file_path.stem for i in instances_to_remove
]
txt = f"● Klipper instances removed: {', '.join(instance_names)}"
completion_msg.text.append(txt)
else:
Logger.print_info("No Klipper Services installed! Skipped ...")
if (remove_dir or remove_env) and unit_file_exists("klipper", suffix="service"):
completion_msg.text = [
"Some Klipper services are still installed:",
f"'{KLIPPER_DIR}' was not removed, even though selected for removal.",
f"'{KLIPPER_ENV_DIR}' was not removed, even though selected for removal.",
]
else:
if remove_dir:
Logger.print_status("Removing Klipper local repository ...")
if run_remove_routines(KLIPPER_DIR):
completion_msg.text.append("● Klipper local repository removed")
if remove_env:
Logger.print_status("Removing Klipper Python environment ...")
if run_remove_routines(KLIPPER_ENV_DIR):
completion_msg.text.append("● Klipper Python environment removed")
if completion_msg.text:
completion_msg.text.insert(0, "The following actions were performed:")
else:
completion_msg.color = Color.YELLOW
completion_msg.centered = True
completion_msg.text = ["Nothing to remove."]
self.msgsvc.set_message(completion_msg)
def __get_install_count_and_name_dict(self) -> Tuple[int, Dict[int, str]]:
install_count: int | None
if len(self.moonraker_list) > len(self.klipper_list):
install_count = len(self.moonraker_list)
name_dict = {
i: moonraker.suffix for i, moonraker in enumerate(self.moonraker_list)
}
else:
install_count = get_install_count()
name_dict = {
i: klipper.suffix for i, klipper in enumerate(self.klipper_list)
}
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return 0, {}
return install_count, name_dict
def __run_setup(self, name_dict: Dict[int, str], create_example_cfg: bool) -> None:
if not self.klipper_list:
self.__install_deps()
for i in name_dict:
# skip this iteration if there is already an instance with the name
if name_dict[i] in [n.suffix for n in self.klipper_list]:
continue
instance = Klipper(suffix=name_dict[i])
instance.create()
InstanceManager.enable(instance)
if create_example_cfg:
# if a client-config is installed, include it in the new example cfg
clients = get_existing_clients()
create_example_printer_cfg(instance, clients)
InstanceManager.start(instance)
cmd_sysctl_manage("daemon-reload")
# step 4: check/handle conflicting packages/services
handle_disruptive_system_packages()
# step 5: check for required group membership
check_user_groups()
def __install_deps(self) -> None:
repo = self.settings.klipper.repo_url
branch = self.settings.klipper.branch
git_clone_wrapper(repo, KLIPPER_DIR, branch)
try:
install_klipper_packages()
if create_python_venv(KLIPPER_ENV_DIR):
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
except Exception:
Logger.print_error("Error during installation of Klipper requirements!")
raise
def __display_moonraker_info(self) -> bool:
# todo: only show the klipper instances that are not already installed
Logger.print_dialog(
DialogType.INFO,
[
"Existing Moonraker instances detected:",
*[f"{m.service_file_path.stem}" for m in self.moonraker_list],
"\n\n",
"The following Klipper instances will be installed:",
*[f"● klipper-{m.suffix}" for m in self.moonraker_list],
],
)
_input: bool = get_confirm("Proceed with installation?")
return _input
def __handle_instance_names(
self, install_count: int, name_dict: Dict[int, str], custom_names: bool
) -> None:
for i in range(install_count): # 3
key: int = len(name_dict.keys()) + 1
if custom_names:
assign_custom_name(key, name_dict)
else:
name_dict[key] = str(len(name_dict) + 1)
def __use_custom_names_or_go_back(self) -> bool | None:
print_select_custom_name_dialog()
_input: bool | None = get_confirm(
"Assign custom names?",
False,
allow_go_back=True,
)
return _input
def __get_instances_to_remove(self) -> List[Klipper] | None:
start_index = 1
curr_instances: List[Klipper] = self.klipper_list
instance_count = len(curr_instances)
options = [str(i + start_index) for i in range(instance_count)]
options.extend(["a", "b"])
instance_map = {options[i]: self.klipper_list[i] for i in range(instance_count)}
print_instance_overview(
self.klipper_list,
start_index=start_index,
show_index=True,
show_select_all=True,
)
selection = get_selection_input("Select Klipper instance to remove", options)
if selection == "b":
return None
elif selection == "a":
return copy(self.klipper_list)
return [instance_map[selection]]
def __remove_instances(
self,
instance_list: List[Klipper] | None,
) -> None:
if not instance_list:
return
for instance in instance_list:
Logger.print_status(
f"Removing instance {instance.service_file_path.stem} ..."
)
InstanceManager.remove(instance)
self.__delete_klipper_env_file(instance)
self.__refresh_state()
def __delete_klipper_env_file(self, instance: Klipper):
Logger.print_status(f"Remove '{instance.env_file}'")
if not instance.env_file.exists():
msg = f"Env file in {instance.base.sysd_dir} not found. Skipped ..."
Logger.print_info(msg)
return
run_remove_routines(instance.env_file)

View File

@@ -48,7 +48,7 @@ def print_moonraker_overview(
for i, k in enumerate(instance_map): for i, k in enumerate(instance_map):
mr_name = instance_map.get(k) mr_name = instance_map.get(k)
m = f"<-> {mr_name}" if mr_name != "" else "" m = f"<-> {mr_name}" if mr_name != "" else ""
line = Color.apply(f"{f'{i+1})' if show_index else ''} {k} {m}", Color.CYAN) line = Color.apply(f"{f'{i + 1})' if show_index else ''} {k} {m}", Color.CYAN)
dialog += f"{line:<63}\n" dialog += f"{line:<63}\n"
warn_l1 = Color.apply("PLEASE NOTE:", Color.YELLOW) warn_l1 = Color.apply("PLEASE NOTE:", Color.YELLOW)

View File

@@ -27,6 +27,9 @@ from components.moonraker import (
) )
from components.moonraker.moonraker import Moonraker from components.moonraker.moonraker import Moonraker
from components.moonraker.moonraker_dialogs import print_moonraker_overview from components.moonraker.moonraker_dialogs import print_moonraker_overview
from components.moonraker.services.moonraker_instance_service import (
MoonrakerInstanceService,
)
from components.moonraker.utils.sysdeps_parser import SysDepsParser from components.moonraker.utils.sysdeps_parser import SysDepsParser
from components.moonraker.utils.utils import ( from components.moonraker.utils.utils import (
backup_moonraker_dir, backup_moonraker_dir,
@@ -39,8 +42,9 @@ from components.webui_client.client_utils import (
) )
from components.webui_client.mainsail_data import MainsailData from components.webui_client.mainsail_data import MainsailData
from core.instance_manager.instance_manager import InstanceManager from core.instance_manager.instance_manager import InstanceManager
from core.logger import Logger from core.logger import DialogType, Logger
from core.settings.kiauh_settings import KiauhSettings from core.settings.kiauh_settings import KiauhSettings
from core.types.color import Color
from utils.common import check_install_dependencies from utils.common import check_install_dependencies
from utils.fs_utils import check_file_exist from utils.fs_utils import check_file_exist
from utils.git_utils import git_clone_wrapper, git_pull_wrapper from utils.git_utils import git_clone_wrapper, git_pull_wrapper
@@ -54,6 +58,7 @@ from utils.sys_utils import (
cmd_sysctl_manage, cmd_sysctl_manage,
cmd_sysctl_service, cmd_sysctl_service,
create_python_venv, create_python_venv,
get_ipv4_addr,
install_python_requirements, install_python_requirements,
parse_packages_from_file, parse_packages_from_file,
) )
@@ -65,12 +70,18 @@ def install_moonraker() -> None:
if not check_moonraker_install_requirements(klipper_list): if not check_moonraker_install_requirements(klipper_list):
return return
moonraker_list: List[Moonraker] = get_instances(Moonraker) instance_service = MoonrakerInstanceService()
instances: List[Moonraker] = [] instance_service.load_instances()
moonraker_list: List[Moonraker] = instance_service.get_all_instances()
new_instances: List[Moonraker] = []
selected_option: str | Klipper selected_option: str | Klipper
if len(klipper_list) == 1: if len(klipper_list) == 1:
instances.append(Moonraker(klipper_list[0].suffix)) suffix: str = klipper_list[0].suffix
new_inst = instance_service.create_new_instance(suffix)
new_instances.append(new_inst)
else: else:
print_moonraker_overview( print_moonraker_overview(
klipper_list, klipper_list,
@@ -89,12 +100,16 @@ def install_moonraker() -> None:
return return
if selected_option == "a": if selected_option == "a":
instances.extend([Moonraker(k.suffix) for k in klipper_list]) new_inst_list: List[Moonraker] = [
instance_service.create_new_instance(k.suffix) for k in klipper_list
]
new_instances.extend(new_inst_list)
else: else:
klipper_instance: Klipper | None = options.get(selected_option) klipper_instance: Klipper | None = options.get(selected_option)
if klipper_instance is None: if klipper_instance is None:
raise Exception("Error selecting instance!") raise Exception("Error selecting instance!")
instances.append(Moonraker(klipper_instance.suffix)) new_inst = instance_service.create_new_instance(klipper_instance.suffix)
new_instances.append(new_inst)
create_example_cfg = get_confirm("Create example moonraker.conf?") create_example_cfg = get_confirm("Create example moonraker.conf?")
@@ -103,8 +118,8 @@ def install_moonraker() -> None:
setup_moonraker_prerequesites() setup_moonraker_prerequesites()
install_moonraker_polkit() install_moonraker_polkit()
used_ports_map = {m.suffix: m.port for m in moonraker_list} ports_map = instance_service.get_instance_port_map()
for instance in instances: for instance in new_instances:
instance.create() instance.create()
cmd_sysctl_service(instance.service_file_path.name, "enable") cmd_sysctl_service(instance.service_file_path.name, "enable")
@@ -112,7 +127,7 @@ def install_moonraker() -> None:
# if a webclient and/or it's config is installed, patch # if a webclient and/or it's config is installed, patch
# its update section to the config # its update section to the config
clients = get_existing_clients() clients = get_existing_clients()
create_example_moonraker_conf(instance, used_ports_map, clients) create_example_moonraker_conf(instance, ports_map, clients)
cmd_sysctl_service(instance.service_file_path.name, "start") cmd_sysctl_service(instance.service_file_path.name, "start")
@@ -123,6 +138,30 @@ def install_moonraker() -> None:
if MainsailData().client_dir.exists() and len(moonraker_list) > 1: if MainsailData().client_dir.exists() and len(moonraker_list) > 1:
enable_mainsail_remotemode() enable_mainsail_remotemode()
instance_service.load_instances()
new_instances = [
instance_service.get_instance_by_suffix(i.suffix) for i in new_instances
]
ip: str = get_ipv4_addr()
# noinspection HttpUrlsUsage
url_list = [
f"{i.service_file_path.stem}: http://{ip}:{i.port}"
for i in new_instances
if i.port
]
dialog_content = []
if url_list:
dialog_content.append("You can access Moonraker via the following URL:")
dialog_content.extend(url_list)
Logger.print_dialog(
DialogType.CUSTOM,
custom_title="Moonraker successfully installed!",
custom_color=Color.GREEN,
content=dialog_content,
)
except Exception as e: except Exception as e:
Logger.print_error(f"Error while installing Moonraker: {e}") Logger.print_error(f"Error while installing Moonraker: {e}")
return return
@@ -160,7 +199,8 @@ def install_moonraker_packages() -> None:
moonraker_deps = [] moonraker_deps = []
if MOONRAKER_DEPS_JSON_FILE.exists(): if MOONRAKER_DEPS_JSON_FILE.exists():
Logger.print_info( Logger.print_info(
f"Parsing system dependencies from {MOONRAKER_DEPS_JSON_FILE.name} ...") f"Parsing system dependencies from {MOONRAKER_DEPS_JSON_FILE.name} ..."
)
parser = SysDepsParser() parser = SysDepsParser()
sysdeps = load_sysdeps_json(MOONRAKER_DEPS_JSON_FILE) sysdeps = load_sysdeps_json(MOONRAKER_DEPS_JSON_FILE)
moonraker_deps.extend(parser.parse_dependencies(sysdeps)) moonraker_deps.extend(parser.parse_dependencies(sysdeps))
@@ -168,7 +208,8 @@ def install_moonraker_packages() -> None:
elif MOONRAKER_INSTALL_SCRIPT.exists(): elif MOONRAKER_INSTALL_SCRIPT.exists():
Logger.print_warn(f"{MOONRAKER_DEPS_JSON_FILE.name} not found!") Logger.print_warn(f"{MOONRAKER_DEPS_JSON_FILE.name} not found!")
Logger.print_info( Logger.print_info(
f"Parsing system dependencies from {MOONRAKER_INSTALL_SCRIPT.name} ...") f"Parsing system dependencies from {MOONRAKER_INSTALL_SCRIPT.name} ..."
)
moonraker_deps = parse_packages_from_file(MOONRAKER_INSTALL_SCRIPT) moonraker_deps = parse_packages_from_file(MOONRAKER_INSTALL_SCRIPT)
if not moonraker_deps: if not moonraker_deps:

View File

@@ -0,0 +1,49 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from typing import Dict, List
from components.moonraker.moonraker import Moonraker
from utils.instance_utils import get_instances
class MoonrakerInstanceService:
__cls_instance = None
__instances: List[Moonraker] = []
def __new__(cls) -> "MoonrakerInstanceService":
if cls.__cls_instance is None:
cls.__cls_instance = super(MoonrakerInstanceService, cls).__new__(cls)
return cls.__cls_instance
def __init__(self) -> None:
if not hasattr(self, "__initialized"):
self.__initialized = False
if self.__initialized:
return
self.__initialized = True
def load_instances(self) -> None:
self.__instances = get_instances(Moonraker)
def create_new_instance(self, suffix: str) -> Moonraker:
instance = Moonraker(suffix)
self.__instances.append(instance)
return instance
def get_all_instances(self) -> List[Moonraker]:
return self.__instances
def get_instance_by_suffix(self, suffix: str) -> Moonraker | None:
instances: List[Moonraker] = [i for i in self.__instances if i.suffix == suffix]
return instances[0] if instances else None
def get_instance_port_map(self) -> Dict[str, int]:
return {i.suffix: i.port for i in self.__instances}

View File

@@ -34,19 +34,23 @@ def _get_distro_info() -> Dict[str, Any]:
return dict( return dict(
distro_id=release_info.get("ID", ""), distro_id=release_info.get("ID", ""),
distro_version=release_info.get("VERSION_ID", ""), distro_version=release_info.get("VERSION_ID", ""),
aliases=release_info.get("ID_LIKE", "").split() aliases=release_info.get("ID_LIKE", "").split(),
) )
def _convert_version(version: str) -> Tuple[str | int, ...]: def _convert_version(version: str) -> Tuple[str | int, ...]:
version = version.strip() version = version.strip()
ver_match = re.match(r"\d+(\.\d+)*((?:-|\.).+)?", version) ver_match = re.match(r"\d+(\.\d+)*((?:-|\.).+)?", version)
if ver_match is not None: if ver_match is not None:
return tuple([ return tuple(
int(part) if part.isdigit() else part [
for part in re.split(r"\.|-", version) int(part) if part.isdigit() else part
]) for part in re.split(r"\.|-", version)
]
)
return (version,) return (version,)
class SysDepsParser: class SysDepsParser:
def __init__(self, distro_info: Dict[str, Any] | None = None) -> None: def __init__(self, distro_info: Dict[str, Any] | None = None) -> None:
if distro_info is None: if distro_info is None:
@@ -86,14 +90,16 @@ class SysDepsParser:
if logical_op not in ("and", "or"): if logical_op not in ("and", "or"):
logging.info( logging.info(
f"Invalid logical operator {logical_op} in requirement " f"Invalid logical operator {logical_op} in requirement "
f"specifier: {full_spec}") f"specifier: {full_spec}"
)
return None return None
last_logical_op = logical_op last_logical_op = logical_op
continue continue
elif last_logical_op is None: elif last_logical_op is None:
logging.info( logging.info(
f"Requirement specifier contains two seqential expressions " f"Requirement specifier contains two seqential expressions "
f"without a logical operator: {full_spec}") f"without a logical operator: {full_spec}"
)
return None return None
dep_parts = re.split(r"(==|!=|<=|>=|<|>)", exp.strip()) dep_parts = re.split(r"(==|!=|<=|>=|<|>)", exp.strip())
req_var = dep_parts[0].strip().lower() req_var = dep_parts[0].strip().lower()
@@ -123,7 +129,7 @@ class SysDepsParser:
"==": lambda x, y: x == y, "==": lambda x, y: x == y,
"!=": lambda x, y: x != y, "!=": lambda x, y: x != y,
">=": lambda x, y: x >= y, ">=": lambda x, y: x >= y,
"<=": lambda x, y: x <= y "<=": lambda x, y: x <= y,
}.get(operator, lambda x, y: False) }.get(operator, lambda x, y: False)
result = compfunc(left_op, right_op) result = compfunc(left_op, right_op)
if last_logical_op == "and": if last_logical_op == "and":

View File

@@ -140,6 +140,7 @@ def backup_moonraker_db_dir() -> None:
name, source=instance.db_dir, target=MOONRAKER_DB_BACKUP_DIR name, source=instance.db_dir, target=MOONRAKER_DB_BACKUP_DIR
) )
def load_sysdeps_json(file: Path) -> Dict[str, List[str]]: def load_sysdeps_json(file: Path) -> Dict[str, List[str]]:
try: try:
sysdeps: Dict[str, List[str]] = json.loads(file.read_bytes()) sysdeps: Dict[str, List[str]] = json.loads(file.read_bytes())

View File

@@ -57,7 +57,7 @@ class ClientRemoveMenu(BaseMenu):
o1 = checked if self.remove_client else unchecked o1 = checked if self.remove_client else unchecked
o2 = checked if self.remove_client_cfg else unchecked o2 = checked if self.remove_client_cfg else unchecked
o3 = checked if self.backup_config_json else unchecked o3 = checked if self.backup_config_json else unchecked
sel_state = f"{'Select'if not self.select_state else 'Deselect'} everything" sel_state = f"{'Select' if not self.select_state else 'Deselect'} everything"
menu = textwrap.dedent( menu = textwrap.dedent(
f""" f"""
╟───────────────────────────────────────────────────────╢ ╟───────────────────────────────────────────────────────╢

View File

@@ -86,7 +86,12 @@ class BackupManager:
date = get_current_date().get("date") date = get_current_date().get("date")
time = get_current_date().get("time") time = get_current_date().get("time")
backup_target = target.joinpath(f"{name.lower()}-{date}-{time}") backup_target = target.joinpath(f"{name.lower()}-{date}-{time}")
shutil.copytree(source, backup_target, ignore=self.ignore_folders_func, ignore_dangling_symlinks=True) shutil.copytree(
source,
backup_target,
ignore=self.ignore_folders_func,
ignore_dangling_symlinks=True,
)
Logger.print_ok("Backup successful!") Logger.print_ok("Backup successful!")
return backup_target return backup_target

View File

@@ -27,6 +27,13 @@ class DialogType(Enum):
LINE_WIDTH = 53 LINE_WIDTH = 53
BORDER_TOP: str = "┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓"
BORDER_BOTTOM: str = "┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛"
BORDER_TITLE: str = "┠───────────────────────────────────────────────────────┨"
BORDER_LEFT: str = ""
BORDER_RIGHT: str = ""
class Logger: class Logger:
@staticmethod @staticmethod
def print_info(msg, prefix=True, start="", end="\n") -> None: def print_info(msg, prefix=True, start="", end="\n") -> None:
@@ -81,23 +88,29 @@ class Logger:
:param margin_top: The number of empty lines to print before the dialog. :param margin_top: The number of empty lines to print before the dialog.
:param margin_bottom: The number of empty lines to print after the dialog. :param margin_bottom: The number of empty lines to print after the dialog.
""" """
dialog_color = Logger._get_dialog_color(title, custom_color) color = Logger._get_dialog_color(title, custom_color)
dialog_title = Logger._get_dialog_title(title, custom_title) dialog_title = Logger._get_dialog_title(title, custom_title)
dialog_title_formatted = Logger._format_dialog_title(dialog_title, dialog_color)
dialog_content = Logger.format_content(
content,
LINE_WIDTH,
dialog_color,
center_content,
)
top = Logger._format_top_border(dialog_color)
bottom = Logger._format_bottom_border(dialog_color)
print("\n" * margin_top) print("\n" * margin_top)
print(
f"{top}{dialog_title_formatted}{dialog_content}{bottom}", print(Color.apply(BORDER_TOP, color))
end="",
) if dialog_title:
print(Color.apply(f"{dialog_title:^{LINE_WIDTH}}", color))
print(Color.apply(BORDER_TITLE, color))
if content:
print(
Logger.format_content(
content,
LINE_WIDTH,
color,
center_content,
)
)
print(Color.apply(BORDER_BOTTOM, color))
print("\n" * margin_bottom) print("\n" * margin_bottom)
@staticmethod @staticmethod
@@ -119,31 +132,6 @@ class Logger:
return color return color
@staticmethod
def _format_top_border(color: Color) -> str:
_border = Color.apply(
"┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓\n", color
)
return _border
@staticmethod
def _format_bottom_border(color: Color) -> str:
_border = Color.apply(
"\n┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛", color
)
return _border
@staticmethod
def _format_dialog_title(title: str | None, color: Color) -> str:
if title is None:
return ""
_title = Color.apply(f"{title:^{LINE_WIDTH}}\n", color)
_title += Color.apply(
"┠───────────────────────────────────────────────────────┨\n", color
)
return _title
@staticmethod @staticmethod
def format_content( def format_content(
content: List[str], content: List[str],

View File

@@ -13,6 +13,7 @@ from typing import Type
from components.klipper import KLIPPER_DIR from components.klipper import KLIPPER_DIR
from components.klipper.klipper import Klipper from components.klipper.klipper import Klipper
from components.klipper.klipper_utils import install_input_shaper_deps
from components.klipper_firmware.menus.klipper_build_menu import ( from components.klipper_firmware.menus.klipper_build_menu import (
KlipperBuildFirmwareMenu, KlipperBuildFirmwareMenu,
KlipperKConfigMenu, KlipperKConfigMenu,
@@ -50,9 +51,10 @@ class AdvancedMenu(BaseMenu):
"2": Option(method=self.flash), "2": Option(method=self.flash),
"3": Option(method=self.build_flash), "3": Option(method=self.build_flash),
"4": Option(method=self.get_id), "4": Option(method=self.get_id),
"5": Option(method=self.klipper_rollback), "5": Option(method=self.input_shaper),
"6": Option(method=self.moonraker_rollback), "6": Option(method=self.klipper_rollback),
"7": Option(method=self.change_hostname), "7": Option(method=self.moonraker_rollback),
"8": Option(method=self.change_hostname),
} }
def print_menu(self) -> None: def print_menu(self) -> None:
@@ -60,11 +62,13 @@ class AdvancedMenu(BaseMenu):
""" """
╟───────────────────────────┬───────────────────────────╢ ╟───────────────────────────┬───────────────────────────╢
║ Klipper Firmware: │ Repository Rollback: ║ ║ Klipper Firmware: │ Repository Rollback: ║
║ 1) [Build] │ 5) [Klipper] ║ ║ 1) [Build] │ 6) [Klipper] ║
║ 2) [Flash] │ 6) [Moonraker] ║ ║ 2) [Flash] │ 7) [Moonraker] ║
║ 3) [Build + Flash] │ ║ ║ 3) [Build + Flash] │ ║
║ 4) [Get MCU ID] │ System: ║ ║ 4) [Get MCU ID] │ System: ║
║ │ 7) [Change hostname] ║ ║ │ 8) [Change hostname] ║
║ Extra Dependencies: │ ║
║ 5) [Input Shaper] │ ║
╟───────────────────────────┴───────────────────────────╢ ╟───────────────────────────┴───────────────────────────╢
""" """
)[1:] )[1:]
@@ -97,3 +101,6 @@ class AdvancedMenu(BaseMenu):
def change_hostname(self, **kwargs) -> None: def change_hostname(self, **kwargs) -> None:
change_system_hostname() change_system_hostname()
def input_shaper(self, **kwargs) -> None:
install_input_shaper_deps()

View File

@@ -12,7 +12,7 @@ import textwrap
from typing import Type from typing import Type
from components.crowsnest.crowsnest import install_crowsnest from components.crowsnest.crowsnest import install_crowsnest
from components.klipper import klipper_setup from components.klipper.services.klipper_setup_service import KlipperSetupService
from components.klipperscreen.klipperscreen import install_klipperscreen from components.klipperscreen.klipperscreen import install_klipperscreen
from components.moonraker import moonraker_setup from components.moonraker import moonraker_setup
from components.webui_client.client_config.client_config_setup import ( from components.webui_client.client_config.client_config_setup import (
@@ -36,6 +36,7 @@ class InstallMenu(BaseMenu):
self.title = "Installation Menu" self.title = "Installation Menu"
self.title_color = Color.GREEN self.title_color = Color.GREEN
self.previous_menu: Type[BaseMenu] | None = previous_menu self.previous_menu: Type[BaseMenu] | None = previous_menu
self.klsvc = KlipperSetupService()
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None: def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.main_menu import MainMenu from core.menus.main_menu import MainMenu
@@ -75,7 +76,7 @@ class InstallMenu(BaseMenu):
print(menu, end="") print(menu, end="")
def install_klipper(self, **kwargs) -> None: def install_klipper(self, **kwargs) -> None:
klipper_setup.install_klipper() self.klsvc.install()
def install_moonraker(self, **kwargs) -> None: def install_moonraker(self, **kwargs) -> None:
moonraker_setup.install_moonraker() moonraker_setup.install_moonraker()

View File

@@ -40,7 +40,6 @@ class SettingsMenu(BaseMenu):
self._load_settings() self._load_settings()
print(self.klipper_status) print(self.klipper_status)
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None: def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.main_menu import MainMenu from core.menus.main_menu import MainMenu
@@ -122,18 +121,24 @@ class SettingsMenu(BaseMenu):
self.moonraker_status.owner = url_parts[-2] self.moonraker_status.owner = url_parts[-2]
self.moonraker_status.branch = self.settings.moonraker.branch self.moonraker_status.branch = self.settings.moonraker.branch
def _gather_input(self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path) -> Tuple[str, str]: def _gather_input(
self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path
) -> Tuple[str, str]:
warn_msg = [ warn_msg = [
"There is only basic input validation in place! " "There is only basic input validation in place! "
"Make sure your the input is valid and has no typos or invalid characters!"] "Make sure your the input is valid and has no typos or invalid characters!"
]
if repo_dir.exists(): if repo_dir.exists():
warn_msg.extend([ warn_msg.extend(
"For the change to take effect, the new repository will be cloned. " [
"A backup of the old repository will be created.", "For the change to take effect, the new repository will be cloned. "
"\n\n", "A backup of the old repository will be created.",
"Make sure you don't have any ongoing prints running, as the services " "\n\n",
"will be restarted during this process! You will loose any ongoing print!"]) "Make sure you don't have any ongoing prints running, as the services "
"will be restarted during this process! You will loose any ongoing print!",
]
)
Logger.print_dialog(DialogType.ATTENTION, warn_msg) Logger.print_dialog(DialogType.ATTENTION, warn_msg)
@@ -143,14 +148,14 @@ class SettingsMenu(BaseMenu):
default=KLIPPER_REPO_URL if repo_name == "klipper" else MOONRAKER_REPO_URL, default=KLIPPER_REPO_URL if repo_name == "klipper" else MOONRAKER_REPO_URL,
) )
branch = get_string_input( branch = get_string_input(
"Enter new branch name", "Enter new branch name", regex=r"^.+$", default="master"
regex=r"^.+$",
default="master"
) )
return repo, branch return repo, branch
def _set_repo(self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path) -> None: def _set_repo(
self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path
) -> None:
repo_url, branch = self._gather_input(repo_name, repo_dir) repo_url, branch = self._gather_input(repo_name, repo_dir)
display_name = repo_name.capitalize() display_name = repo_name.capitalize()
Logger.print_dialog( Logger.print_dialog(
@@ -180,11 +185,15 @@ class SettingsMenu(BaseMenu):
self._switch_repo(repo_name, repo_dir) self._switch_repo(repo_name, repo_dir)
def _switch_repo(self, name: Literal["klipper", "moonraker"], repo_dir: Path ) -> None: def _switch_repo(
self, name: Literal["klipper", "moonraker"], repo_dir: Path
) -> None:
if not repo_dir.exists(): if not repo_dir.exists():
return return
Logger.print_status(f"Switching to {name.capitalize()}'s new source repository ...") Logger.print_status(
f"Switching to {name.capitalize()}'s new source repository ..."
)
repo: RepoSettings = self.settings[name] repo: RepoSettings = self.settings[name]
run_switch_repo_routine(name, repo) run_switch_repo_routine(name, repo)

View File

@@ -12,10 +12,10 @@ import textwrap
from typing import Callable, List, Type from typing import Callable, List, Type
from components.crowsnest.crowsnest import get_crowsnest_status, update_crowsnest from components.crowsnest.crowsnest import get_crowsnest_status, update_crowsnest
from components.klipper.klipper_setup import update_klipper
from components.klipper.klipper_utils import ( from components.klipper.klipper_utils import (
get_klipper_status, get_klipper_status,
) )
from components.klipper.services.klipper_setup_service import KlipperSetupService
from components.klipperscreen.klipperscreen import ( from components.klipperscreen.klipperscreen import (
get_klipperscreen_status, get_klipperscreen_status,
update_klipperscreen, update_klipperscreen,
@@ -193,7 +193,8 @@ class UpdateMenu(BaseMenu):
self.upgrade_system_packages() self.upgrade_system_packages()
def update_klipper(self, **kwargs) -> None: def update_klipper(self, **kwargs) -> None:
self._run_update_routine("klipper", update_klipper) klsvc = KlipperSetupService()
self._run_update_routine("klipper", klsvc.update)
def update_moonraker(self, **kwargs) -> None: def update_moonraker(self, **kwargs) -> None:
self._run_update_routine("moonraker", update_moonraker) self._run_update_routine("moonraker", update_moonraker)

View File

@@ -6,6 +6,7 @@
# # # #
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List from typing import List
@@ -23,12 +24,13 @@ class Message:
class MessageService: class MessageService:
_instance = None __cls_instance = None
__message: Message | None
def __new__(cls) -> "MessageService": def __new__(cls) -> "MessageService":
if cls._instance is None: if cls.__cls_instance is None:
cls._instance = super(MessageService, cls).__new__(cls) cls.__cls_instance = super(MessageService, cls).__new__(cls)
return cls._instance return cls.__cls_instance
def __init__(self) -> None: def __init__(self) -> None:
if not hasattr(self, "__initialized"): if not hasattr(self, "__initialized"):
@@ -36,24 +38,24 @@ class MessageService:
if self.__initialized: if self.__initialized:
return return
self.__initialized = True self.__initialized = True
self.message = None self.__message = None
def set_message(self, message: Message) -> None: def set_message(self, message: Message) -> None:
self.message = message self.__message = message
def display_message(self) -> None: def display_message(self) -> None:
if self.message is None: if self.__message is None:
return return
Logger.print_dialog( Logger.print_dialog(
title=DialogType.CUSTOM, title=DialogType.CUSTOM,
content=self.message.text, content=self.__message.text,
custom_title=self.message.title, custom_title=self.__message.title,
custom_color=self.message.color, custom_color=self.__message.color,
center_content=self.message.centered, center_content=self.__message.centered,
) )
self.__clear_message() self.__clear_message()
def __clear_message(self) -> None: def __clear_message(self) -> None:
self.message = None self.__message = None

View File

@@ -10,11 +10,11 @@
# ======================================================================= # # ======================================================================= #
import os import os
import shutil
import subprocess import subprocess
from pathlib import Path
from core.constants import SYSTEMD from core.constants import SYSTEMD
from core.logger import Logger from core.logger import Logger
from pathlib import Path
from extensions.base_extension import BaseExtension from extensions.base_extension import BaseExtension
from extensions.klipper_backup import ( from extensions.klipper_backup import (
KLIPPERBACKUP_CONFIG_DIR, KLIPPERBACKUP_CONFIG_DIR,
@@ -29,7 +29,6 @@ from utils.sys_utils import cmd_sysctl_manage, remove_system_service, unit_file_
class KlipperbackupExtension(BaseExtension): class KlipperbackupExtension(BaseExtension):
def remove_extension(self, **kwargs) -> None: def remove_extension(self, **kwargs) -> None:
if not check_file_exist(KLIPPERBACKUP_DIR): if not check_file_exist(KLIPPERBACKUP_DIR):
Logger.print_info("Extension does not seem to be installed! Skipping ...") Logger.print_info("Extension does not seem to be installed! Skipping ...")
@@ -48,29 +47,44 @@ class KlipperbackupExtension(BaseExtension):
cmd_sysctl_manage("daemon-reload") cmd_sysctl_manage("daemon-reload")
cmd_sysctl_manage("reset-failed") cmd_sysctl_manage("reset-failed")
else: else:
Logger.print_error(f"Unknown unit type {unit_type} of {full_service_name}") Logger.print_error(
f"Unknown unit type {unit_type} of {full_service_name}"
)
except: except:
Logger.print_error(f"Failed to remove {full_service_name}: {str(e)}") Logger.print_error(f"Failed to remove {full_service_name}: {str(e)}")
def check_crontab_entry(entry) -> bool: def check_crontab_entry(entry) -> bool:
try: try:
crontab_content = subprocess.check_output(["crontab", "-l"], stderr=subprocess.DEVNULL, text=True) crontab_content = subprocess.check_output(
["crontab", "-l"], stderr=subprocess.DEVNULL, text=True
)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return False return False
return any(entry in line for line in crontab_content.splitlines()) return any(entry in line for line in crontab_content.splitlines())
def remove_moonraker_entry(): def remove_moonraker_entry():
original_file_path = MOONRAKER_CONF original_file_path = MOONRAKER_CONF
comparison_file_path = os.path.join(str(KLIPPERBACKUP_DIR), "install-files", "moonraker.conf") comparison_file_path = os.path.join(
if not (os.path.exists(original_file_path) and os.path.exists(comparison_file_path)): str(KLIPPERBACKUP_DIR), "install-files", "moonraker.conf"
)
if not (
os.path.exists(original_file_path)
and os.path.exists(comparison_file_path)
):
return False return False
with open(original_file_path, "r") as original_file, open(comparison_file_path, "r") as comparison_file: with open(original_file_path, "r") as original_file, open(
comparison_file_path, "r"
) as comparison_file:
original_content = original_file.read() original_content = original_file.read()
comparison_content = comparison_file.read() comparison_content = comparison_file.read()
if comparison_content in original_content: if comparison_content in original_content:
Logger.print_status("Removing Klipper-Backup moonraker entry ...") Logger.print_status("Removing Klipper-Backup moonraker entry ...")
modified_content = original_content.replace(comparison_content, "").strip() modified_content = original_content.replace(
modified_content = "\n".join(line for line in modified_content.split("\n") if line.strip()) comparison_content, ""
).strip()
modified_content = "\n".join(
line for line in modified_content.split("\n") if line.strip()
)
with open(original_file_path, "w") as original_file: with open(original_file_path, "w") as original_file:
original_file.write(modified_content) original_file.write(modified_content)
Logger.print_ok("Klipper-Backup moonraker entry successfully removed!") Logger.print_ok("Klipper-Backup moonraker entry successfully removed!")
@@ -79,7 +93,11 @@ class KlipperbackupExtension(BaseExtension):
if get_confirm("Do you really want to remove the extension?", True, False): if get_confirm("Do you really want to remove the extension?", True, False):
# Remove systemd timer and services # Remove systemd timer and services
service_names = ["klipper-backup-on-boot", "klipper-backup-filewatch", "klipper-backup"] service_names = [
"klipper-backup-on-boot",
"klipper-backup-filewatch",
"klipper-backup",
]
unit_types = ["timer", "service"] unit_types = ["timer", "service"]
for service_name in service_names: for service_name in service_names:
@@ -91,10 +109,23 @@ class KlipperbackupExtension(BaseExtension):
try: try:
if check_crontab_entry("/klipper-backup/script.sh"): if check_crontab_entry("/klipper-backup/script.sh"):
Logger.print_status("Removing Klipper-Backup crontab entry ...") Logger.print_status("Removing Klipper-Backup crontab entry ...")
crontab_content = subprocess.check_output(["crontab", "-l"], text=True) crontab_content = subprocess.check_output(
modified_content = "\n".join(line for line in crontab_content.splitlines() if "/klipper-backup/script.sh" not in line) ["crontab", "-l"], text=True
subprocess.run(["crontab", "-"], input=modified_content + "\n", text=True, check=True) )
Logger.print_ok("Klipper-Backup crontab entry successfully removed!") modified_content = "\n".join(
line
for line in crontab_content.splitlines()
if "/klipper-backup/script.sh" not in line
)
subprocess.run(
["crontab", "-"],
input=modified_content + "\n",
text=True,
check=True,
)
Logger.print_ok(
"Klipper-Backup crontab entry successfully removed!"
)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
Logger.print_error("Unable to remove the Klipper-Backup cron entry") Logger.print_error("Unable to remove the Klipper-Backup cron entry")
@@ -102,7 +133,9 @@ class KlipperbackupExtension(BaseExtension):
try: try:
remove_moonraker_entry() remove_moonraker_entry()
except: except:
Logger.print_error("Unable to remove the Klipper-Backup moonraker entry") Logger.print_error(
"Unable to remove the Klipper-Backup moonraker entry"
)
# Remove Klipper-backup extension # Remove Klipper-backup extension
Logger.print_status("Removing Klipper-Backup extension ...") Logger.print_status("Removing Klipper-Backup extension ...")
@@ -112,7 +145,7 @@ class KlipperbackupExtension(BaseExtension):
remove_with_sudo(KLIPPERBACKUP_CONFIG_DIR) remove_with_sudo(KLIPPERBACKUP_CONFIG_DIR)
Logger.print_ok("Extension Klipper-Backup successfully removed!") Logger.print_ok("Extension Klipper-Backup successfully removed!")
except: except:
Logger.print_error(f"Unable to remove Klipper-Backup extension") Logger.print_error("Unable to remove Klipper-Backup extension")
def install_extension(self, **kwargs) -> None: def install_extension(self, **kwargs) -> None:
if not KLIPPERBACKUP_DIR.exists(): if not KLIPPERBACKUP_DIR.exists():

View File

@@ -11,8 +11,8 @@ from typing import List
from components.klipper.klipper import Klipper from components.klipper.klipper import Klipper
from components.moonraker.moonraker import Moonraker from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager
from core.instance_manager.base_instance import SUFFIX_BLACKLIST from core.instance_manager.base_instance import SUFFIX_BLACKLIST
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger from core.logger import DialogType, Logger
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import ( from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
SimpleConfigParser, SimpleConfigParser,
@@ -309,8 +309,12 @@ class ObicoExtension(BaseExtension):
def _check_and_opt_link_instances(self) -> None: def _check_and_opt_link_instances(self) -> None:
Logger.print_status("Checking link status of Obico instances ...") Logger.print_status("Checking link status of Obico instances ...")
suffix_blacklist: List[str] = [suffix for suffix in SUFFIX_BLACKLIST if suffix != 'obico'] suffix_blacklist: List[str] = [
ob_instances: List[MoonrakerObico] = get_instances(MoonrakerObico, suffix_blacklist=suffix_blacklist) suffix for suffix in SUFFIX_BLACKLIST if suffix != "obico"
]
ob_instances: List[MoonrakerObico] = get_instances(
MoonrakerObico, suffix_blacklist=suffix_blacklist
)
unlinked_instances: List[MoonrakerObico] = [ unlinked_instances: List[MoonrakerObico] = [
obico for obico in ob_instances if not obico.is_linked obico for obico in ob_instances if not obico.is_linked
] ]

View File

@@ -45,9 +45,7 @@ class Octoapp:
self.base: BaseInstance = BaseInstance(Moonraker, self.suffix) self.base: BaseInstance = BaseInstance(Moonraker, self.suffix)
self.base.log_file_name = self.log_file_name self.base.log_file_name = self.log_file_name
self.service_file_path: Path = get_service_file_path( self.service_file_path: Path = get_service_file_path(Octoapp, self.suffix)
Octoapp, self.suffix
)
self.store_dir = self.base.data_dir.joinpath("store") self.store_dir = self.base.data_dir.joinpath("store")
self.cfg_file = self.base.cfg_dir.joinpath(OA_CFG_NAME) self.cfg_file = self.base.cfg_dir.joinpath(OA_CFG_NAME)
self.sys_cfg_file = self.base.cfg_dir.joinpath(OA_SYS_CFG_NAME) self.sys_cfg_file = self.base.cfg_dir.joinpath(OA_SYS_CFG_NAME)

View File

@@ -9,8 +9,8 @@
import json import json
from typing import List from typing import List
from components.moonraker.moonraker import Moonraker
from components.klipper.klipper import Klipper from components.klipper.klipper import Klipper
from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger from core.logger import DialogType, Logger
from extensions.base_extension import BaseExtension from extensions.base_extension import BaseExtension
@@ -107,9 +107,7 @@ class OctoappExtension(BaseExtension):
) )
except Exception as e: except Exception as e:
Logger.print_error( Logger.print_error(f"Error during OctoApp for Klipper installation:\n{e}")
f"Error during OctoApp for Klipper installation:\n{e}"
)
def update_extension(self, **kwargs) -> None: def update_extension(self, **kwargs) -> None:
Logger.print_status("Updating OctoApp for Klipper ...") Logger.print_status("Updating OctoApp for Klipper ...")
@@ -183,7 +181,6 @@ class OctoappExtension(BaseExtension):
run_remove_routines(OA_DIR) run_remove_routines(OA_DIR)
def _remove_OA_store_dirs(self) -> None: def _remove_OA_store_dirs(self) -> None:
Logger.print_status("Removing OctoApp for Klipper store directory ...") Logger.print_status("Removing OctoApp for Klipper store directory ...")
@@ -197,7 +194,6 @@ class OctoappExtension(BaseExtension):
run_remove_routines(store_dir) run_remove_routines(store_dir)
def _remove_OA_env(self) -> None: def _remove_OA_env(self) -> None:
Logger.print_status("Removing OctoApp for Klipper environment ...") Logger.print_status("Removing OctoApp for Klipper environment ...")

View File

@@ -116,10 +116,7 @@ class MoonrakerTelegramBot:
"%TELEGRAM_BOT_DIR%", "%TELEGRAM_BOT_DIR%",
self.bot_dir.as_posix(), self.bot_dir.as_posix(),
) )
env_file_content = env_file_content.replace( env_file_content = env_file_content.replace("%CFG%", self.cfg_file.as_posix())
"%CFG%",
self.cfg_file.as_posix()
)
env_file_content = env_file_content.replace( env_file_content = env_file_content.replace(
"%LOG%", "%LOG%",
self.base.log_dir.joinpath(self.log_file_name).as_posix(), self.base.log_dir.joinpath(self.log_file_name).as_posix(),

View File

@@ -19,7 +19,7 @@ from components.klipper import (
KLIPPER_REQ_FILE, KLIPPER_REQ_FILE,
) )
from components.klipper.klipper import Klipper from components.klipper.klipper import Klipper
from components.klipper.klipper_setup import install_klipper_packages from components.klipper.klipper_utils import install_klipper_packages
from components.moonraker import ( from components.moonraker import (
MOONRAKER_BACKUP_DIR, MOONRAKER_BACKUP_DIR,
MOONRAKER_DIR, MOONRAKER_DIR,

View File

@@ -192,5 +192,5 @@ def moonraker_exists(name: str = "") -> List[Moonraker]:
def trunc_string(input_str: str, length: int) -> str: def trunc_string(input_str: str, length: int) -> str:
if len(input_str) > length: if len(input_str) > length:
return f"{input_str[:length - 3]}..." return f"{input_str[: length - 3]}..."
return input_str return input_str

View File

@@ -10,6 +10,7 @@
# ======================================================================= # # ======================================================================= #
from __future__ import annotations from __future__ import annotations
import os
import re import re
import shutil import shutil
from pathlib import Path from pathlib import Path
@@ -29,15 +30,15 @@ def check_file_exist(file_path: Path, sudo=False) -> bool:
:return: True, if file exists, otherwise False :return: True, if file exists, otherwise False
""" """
if sudo: if sudo:
command = ["sudo", "find", file_path.as_posix()]
try: try:
command = ["sudo", "find", file_path.as_posix()]
check_output(command, stderr=DEVNULL) check_output(command, stderr=DEVNULL)
return True return True
except CalledProcessError: except CalledProcessError:
return False return False
else: else:
if file_path.exists(): if os.access(file_path, os.F_OK):
return True return file_path.exists()
else: else:
return False return False

View File

@@ -26,9 +26,10 @@ def git_clone_wrapper(
) -> None: ) -> None:
""" """
Clones a repository from the given URL and checks out the specified branch if given. Clones a repository from the given URL and checks out the specified branch if given.
The clone will be performed with the '--filter=blob:none' flag to perform a blobless clone.
:param repo: The URL of the repository to clone. :param repo: The URL of the repository to clone.
:param branch: The branch to check out. If None, the default branch will be checked out. :param branch: The branch to check out. If None, master or main, no checkout will be performed.
:param target_dir: The directory where the repository will be cloned. :param target_dir: The directory where the repository will be cloned.
:param force: Force the cloning of the repository even if it already exists. :param force: Force the cloning of the repository even if it already exists.
:return: None :return: None
@@ -43,8 +44,11 @@ def git_clone_wrapper(
return return
shutil.rmtree(target_dir) shutil.rmtree(target_dir)
git_cmd_clone(repo, target_dir) git_cmd_clone(repo, target_dir, blobless=True)
git_cmd_checkout(branch, target_dir)
if branch not in ("master", "main"):
git_cmd_checkout(branch, target_dir)
except CalledProcessError: except CalledProcessError:
log = "An unexpected error occured during cloning of the repository." log = "An unexpected error occured during cloning of the repository."
Logger.print_error(log) Logger.print_error(log)
@@ -132,8 +136,10 @@ def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:
tags: List[str] = result.split("\n")[:-1] tags: List[str] = result.split("\n")[:-1]
return sorted(tags, key=lambda x: [int(i) if i.isdigit() else i for i in return sorted(
re.split(r'(\d+)', x)]) tags,
key=lambda x: [int(i) if i.isdigit() else i for i in re.split(r"(\d+)", x)],
)
except CalledProcessError: except CalledProcessError:
return [] return []
@@ -253,11 +259,23 @@ def get_remote_commit(repo: Path) -> str | None:
return None return None
def git_cmd_clone(repo: str, target_dir: Path) -> None: def git_cmd_clone(repo: str, target_dir: Path, blobless: bool = False) -> None:
try: """
command = ["git", "clone", repo, target_dir.as_posix()] Clones a repository with optional blobless clone.
run(command, check=True)
:param repo: URL of the repository to clone.
:param target_dir: Path where the repository will be cloned.
:param blobless: If True, perform a blobless clone by adding the '--filter=blob:none' flag.
"""
try:
command = ["git", "clone"]
if blobless:
command.append("--filter=blob:none")
command += [repo, target_dir.as_posix()]
run(command, check=True)
Logger.print_ok("Clone successful!") Logger.print_ok("Clone successful!")
except CalledProcessError as e: except CalledProcessError as e:
error = e.stderr.decode() if e.stderr else "Unknown error" error = e.stderr.decode() if e.stderr else "Unknown error"

View File

@@ -17,7 +17,9 @@ from core.instance_manager.base_instance import SUFFIX_BLACKLIST
from utils.instance_type import InstanceType from utils.instance_type import InstanceType
def get_instances(instance_type: type, suffix_blacklist: List[str] = SUFFIX_BLACKLIST) -> List[InstanceType]: def get_instances(
instance_type: type, suffix_blacklist: List[str] = SUFFIX_BLACKLIST
) -> List[InstanceType]:
from utils.common import convert_camelcase_to_kebabcase from utils.common import convert_camelcase_to_kebabcase
if not isinstance(instance_type, type): if not isinstance(instance_type, type):

View File

@@ -197,6 +197,38 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
raise VenvCreationFailedException(log) raise VenvCreationFailedException(log)
def install_python_packages(target: Path, packages: List[str]) -> None:
"""
Installs the python packages based on a provided packages list |
:param target: Path of the virtualenv
:param packages: str list of required packages
:return: None
"""
try:
# always update pip before installing requirements
update_python_pip(target)
Logger.print_status("Installing Python requirements ...")
command = [
target.joinpath("bin/pip").as_posix(),
"install",
]
for pkg in packages:
command.append(pkg)
result = run(command, stderr=PIPE, text=True)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)
raise VenvCreationFailedException("Installing Python requirements failed!")
Logger.print_ok("Installing Python requirements successful!")
except Exception as e:
log = f"Error installing Python requirements: {e}"
Logger.print_error(log)
raise VenvCreationFailedException(log)
def update_system_package_lists(silent: bool, rls_info_change=False) -> None: def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
""" """
Updates the systems package list | Updates the systems package list |

View File

@@ -5,12 +5,13 @@ requires-python = ">=3.8"
dev=["ruff", "mypy"] dev=["ruff", "mypy"]
[tool.ruff] [tool.ruff]
required-version = ">=0.3.4" required-version = ">=0.9.10"
respect-gitignore = true respect-gitignore = true
exclude = [".git",".github", "./docs"] exclude = [".git",".github", "./docs", "kiauh/core/submodules"]
line-length = 88 line-length = 88
indent-width = 4 indent-width = 4
output-format = "full" output-format = "full"
target-version = "py38"
[tool.ruff.format] [tool.ruff.format]
indent-style = "space" indent-style = "space"

View File

@@ -304,6 +304,8 @@ function install_klipper_packages() {
packages=$(grep "PKGLIST=" "${install_script}" | cut -d'"' -f2 | sed 's/\${PKGLIST}//g' | tr -d '\n') packages=$(grep "PKGLIST=" "${install_script}" | cut -d'"' -f2 | sed 's/\${PKGLIST}//g' | tr -d '\n')
### add dfu-util for octopi-images ### add dfu-util for octopi-images
packages+=" dfu-util" packages+=" dfu-util"
### add pkg-config for rp2040 build
packages+=" pkg-config"
### add dbus requirement for DietPi distro ### add dbus requirement for DietPi distro
[[ -e "/boot/dietpi/.version" ]] && packages+=" dbus" [[ -e "/boot/dietpi/.version" ]] && packages+=" dbus"

View File

@@ -147,7 +147,177 @@ function install_moonraker_dependencies() {
### read PKGLIST from official install-script ### read PKGLIST from official install-script
status_msg "Reading dependencies..." status_msg "Reading dependencies..."
# shellcheck disable=SC2016 # shellcheck disable=SC2016
packages=$(cat $package_json | tr -d ' \n{}' | cut -d "]" -f1 | cut -d":" -f2 | tr -d '"[' | sed 's/,/ /g') packages=$(python3 - << EOF
from __future__ import annotations
import shlex
import re
import pathlib
import logging
import json
from typing import Tuple, Dict, List, Any
def _get_distro_info() -> Dict[str, Any]:
release_file = pathlib.Path("/etc/os-release")
release_info: Dict[str, str] = {}
with release_file.open("r") as f:
lexer = shlex.shlex(f, posix=True)
lexer.whitespace_split = True
for item in list(lexer):
if "=" in item:
key, val = item.split("=", maxsplit=1)
release_info[key] = val
return dict(
distro_id=release_info.get("ID", ""),
distro_version=release_info.get("VERSION_ID", ""),
aliases=release_info.get("ID_LIKE", "").split()
)
def _convert_version(version: str) -> Tuple[str | int, ...]:
version = version.strip()
ver_match = re.match(r"\d+(\.\d+)*((?:-|\.).+)?", version)
if ver_match is not None:
return tuple([
int(part) if part.isdigit() else part
for part in re.split(r"\.|-", version)
])
return (version,)
class SysDepsParser:
def __init__(self, distro_info: Dict[str, Any] | None = None) -> None:
if distro_info is None:
distro_info = _get_distro_info()
self.distro_id: str = distro_info.get("distro_id", "")
self.aliases: List[str] = distro_info.get("aliases", [])
self.distro_version: Tuple[int | str, ...] = tuple()
version = distro_info.get("distro_version")
if version:
self.distro_version = _convert_version(version)
def _parse_spec(self, full_spec: str) -> str | None:
parts = full_spec.split(";", maxsplit=1)
if len(parts) == 1:
return full_spec
pkg_name = parts[0].strip()
expressions = re.split(r"( and | or )", parts[1].strip())
if not len(expressions) & 1:
logging.info(
f"Requirement specifier is missing an expression "
f"between logical operators : {full_spec}"
)
return None
last_result: bool = True
last_logical_op: str | None = "and"
for idx, exp in enumerate(expressions):
if idx & 1:
if last_logical_op is not None:
logging.info(
"Requirement specifier contains sequential logical "
f"operators: {full_spec}"
)
return None
logical_op = exp.strip()
if logical_op not in ("and", "or"):
logging.info(
f"Invalid logical operator {logical_op} in requirement "
f"specifier: {full_spec}")
return None
last_logical_op = logical_op
continue
elif last_logical_op is None:
logging.info(
f"Requirement specifier contains two seqential expressions "
f"without a logical operator: {full_spec}")
return None
dep_parts = re.split(r"(==|!=|<=|>=|<|>)", exp.strip())
req_var = dep_parts[0].strip().lower()
if len(dep_parts) != 3:
logging.info(f"Invalid comparison, must be 3 parts: {full_spec}")
return None
elif req_var == "distro_id":
left_op: str | Tuple[int | str, ...] = self.distro_id
right_op = dep_parts[2].strip().strip("\"'")
elif req_var == "distro_version":
if not self.distro_version:
logging.info(
"Distro Version not detected, cannot satisfy requirement: "
f"{full_spec}"
)
return None
left_op = self.distro_version
right_op = _convert_version(dep_parts[2].strip().strip("\"'"))
else:
logging.info(f"Invalid requirement specifier: {full_spec}")
return None
operator = dep_parts[1].strip()
try:
compfunc = {
"<": lambda x, y: x < y,
">": lambda x, y: x > y,
"==": lambda x, y: x == y,
"!=": lambda x, y: x != y,
">=": lambda x, y: x >= y,
"<=": lambda x, y: x <= y
}.get(operator, lambda x, y: False)
result = compfunc(left_op, right_op)
if last_logical_op == "and":
last_result &= result
else:
last_result |= result
last_logical_op = None
except Exception:
logging.exception(f"Error comparing requirements: {full_spec}")
return None
if last_result:
return pkg_name
return None
def parse_dependencies(self, sys_deps: Dict[str, List[str]]) -> List[str]:
if not self.distro_id:
logging.info(
"Failed to detect current distro ID, cannot parse dependencies"
)
return []
all_ids = [self.distro_id] + self.aliases
for distro_id in all_ids:
if distro_id in sys_deps:
if not sys_deps[distro_id]:
logging.info(
f"Dependency data contains an empty package definition "
f"for linux distro '{distro_id}'"
)
continue
processed_deps: List[str] = []
for dep in sys_deps[distro_id]:
parsed_dep = self._parse_spec(dep)
if parsed_dep is not None:
processed_deps.append(parsed_dep)
return processed_deps
else:
logging.info(
f"Dependency data has no package definition for linux "
f"distro '{self.distro_id}'"
)
return []
# *** SYSTEM DEPENDENCIES START ***
system_deps = {
"debian": [
"python3-virtualenv", "python3-dev", "libopenjp2-7", "libsodium-dev",
"zlib1g-dev", "libjpeg-dev", "packagekit",
"wireless-tools; distro_id != 'ubuntu' or distro_version <= '24.04'",
"iw; distro_id == 'ubuntu' and distro_version >= '24.10'", "curl",
"build-essential"
],
}
system_deps_json = pathlib.Path("$package_json")
system_deps = json.loads(system_deps_json.read_bytes())
parser = SysDepsParser()
pkgs = parser.parse_dependencies(system_deps)
if pkgs:
print(' '.join(pkgs), end="")
exit(0)
EOF
)
echo "${cyan}${packages}${white}" | tr '[:space:]' '\n' echo "${cyan}${packages}${white}" | tr '[:space:]' '\n'
read -r -a packages <<< "${packages}" read -r -a packages <<< "${packages}"