Compare commits

...

17 Commits

Author SHA1 Message Date
dw-0
33113e72e9 fix: exception raised on pip warning (#688)
pip seems to write to stderr on warnings, caused by retries. even if the process exits with 0.

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-05-31 17:44:02 +02:00
dw-0
6f59fd06aa fix: do not upgrade pip before installing packages (#680)
pip 25 seems to introduce some compatibility issues.

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-05-02 20:08:32 +02:00
dw-0
56ea43ccb6 refactor: improve typesafety KiauhSettings
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-04-14 21:19:38 +02:00
dw-0
25e22c993f chore(scp): update SimpleConfigParser
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-04-14 21:15:12 +02:00
dw-0
ead521b377 refactor: replace mypy with pyright
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-04-14 21:07:56 +02:00
dw-0
3c952ccc12 refactor: use sane fallbacks on missing kiauh config options
for some options a warning is print if the fallback is used

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-04-12 15:12:11 +02:00
dw-0
c8f713c00e fix: no validation of optional_speedups option
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-04-12 00:36:34 +02:00
Pavel Sorejs
95cf809378 feat: add option to customize python binary for Klipper and Moonraker, add option to not install Moonraker speedups (#671)
Add option to cusomize python binary for klipper and moonraker. Add option to not install moonraker speedups.
2025-04-06 22:23:39 +02:00
dw-0
c91816d13f feat(extension): add Spoolman Docker installer (#669)
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-30 17:57:46 +02:00
dw-0
1a6f06eaf2 refactor(moonraker): move setup functions into MoonrakerSetupService
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-29 23:00:06 +01:00
dw-0
ea8621af0c refactor(git_utils): remove unnecessary url parameter in git_pull_wrapper
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-29 16:49:08 +01:00
dw-0
88742ab496 feat: allow configuration of multiple repos in kiauh.cfg (#668)
* remove existing simple_config_parser directory

* Squashed 'kiauh/core/submodules/simple_config_parser/' content from commit da22e6a

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: da22e6ad9ca4bc121c39dc3bc6c63175a72e78a2

* Squashed 'kiauh/core/submodules/simple_config_parser/' changes from da22e6a..9ae5749

9ae5749 fix: comment out file writing in test
1ac4e3d refactor: improve section writing

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: 9ae574930dfe82107a3712c7c72b3aa777588996

* Squashed 'kiauh/core/submodules/simple_config_parser/' changes from 9ae5749..53e8408

53e8408 fix: do not add a blank line before writing a section header
dc77569 test: add test for removing option before writing

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: 53e840853f12318dcac68196fb74c1843cb75808

* Squashed 'kiauh/core/submodules/simple_config_parser/' changes from 53e8408..4a6e5f2

4a6e5f2 refactor: full rework of the internal storage of the parsed config

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: 4a6e5f23cb1f298f0a3efbf042186b16c91763c7

* refactor!: switching repos now offers list of repositories to choose from

this rework aligns more with the feature provided in kiauh v5.

Signed-off-by: Dominik Willner <th33xitus@gmail.com>

---------

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-29 16:18:20 +01:00
dw-0
b99e6612e2 feat(ci): add automated release workflow for fast-forward and tagging
Adds a new GitHub Actions workflow that:
- Fast-forwards master branch from develop
- Creates and pushes a new release tag
- Requires manual trigger with tag name input

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-19 21:43:09 +01:00
dw-0
cf4e915430 cicd: restrict worflow runs to develop branch
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-13 18:26:23 +01:00
CODeRUS
c901cd1fdf feat(advanced): install input shaper dependencies (#662)
* feat(advanced): install input shaper dependencies

Signed-off-by: Andrey Kozhevnikov <coderusinbox@gmail.com>

* chore: fix formatting/wording

also add a quick check if the klipper env exists

Signed-off-by: Dominik Willner <th33xitus@gmail.com>

---------

Signed-off-by: Andrey Kozhevnikov <coderusinbox@gmail.com>
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
Co-authored-by: dw-0 <th33xitus@gmail.com>
2025-03-13 18:26:23 +01:00
Aleksei Sviridkin
da3c37a872 feat(git_utils): Support for blolbless clone mode in git_cmd_clone (#640)
* feat(git_utils): enhance git_cmd_clone with depth and single-branch options

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix(git_utils): add a newline for better readability in git_cmd_clone

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* feat(git_utils): enhance git_cmd_clone with optional depth and single-branch parameters

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* feat(git_utils): update git_cmd_clone to support blolbless cloning option

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* revert formatting changes

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix another formatting changes

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix(git_utils): correct indentation for improved readability in get_local_tags function

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* fix(git_utils): rename blolbless parameter to blobless and update documentation for clarity

Signed-off-by: Aleksei Sviridkin <f@lex.la>

* refactor: enable the blobless clone feature for all regular clones

skip checkout step if brach is master or main

Signed-off-by: Dominik Willner <th33xitus@gmail.com>

---------

Signed-off-by: Aleksei Sviridkin <f@lex.la>
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
Co-authored-by: dw-0 <th33xitus@gmail.com>
2025-03-13 18:26:23 +01:00
dw-0
8f436646cd cicd: add action for fast-forward check and merge (#660)
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2025-03-09 12:45:46 +01:00
59 changed files with 2305 additions and 964 deletions

View File

@@ -11,5 +11,5 @@ end_of_line = lf
[*.py]
max_line_length = 88
[*.sh]
indent_size = 2
[*.{sh,yml,yaml,json}]
indent_size = 2

View File

@@ -0,0 +1,33 @@
name: Release - Fast-Forward and Tag
on:
workflow_dispatch:
inputs:
tag_name:
description: 'Provide a tag name (e.g. v1.0.0)'
required: true
type: string
jobs:
ff-and-tag:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
ref: 'master'
- name: Merge Fast Forward
uses: MaximeHeckel/github-action-merge-fast-forward@v1.1.0
with:
branchtomerge: origin/develop
branch: master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Create and Push Tag
run: |
git tag ${{ inputs.tag_name }}
git push origin ${{ inputs.tag_name }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -2,12 +2,30 @@
backup_before_update: False
[klipper]
repo_url: https://github.com/Klipper3d/klipper
branch: master
# add custom repositories here, if at least one is given, the first in the list will be used by default
# otherwise the official repository is used
#
# format: https://github.com/username/repository, branch
# example: https://github.com/Klipper3d/klipper, master
#
# branch is optional, if given, it must be preceded by a comma, if not given, 'master' is used
repositories:
https://github.com/Klipper3d/klipper
[moonraker]
repo_url: https://github.com/Arksine/moonraker
branch: master
# Moonraker supports two optional Python packages that can be used to reduce its CPU load
# If set to true, those packages will be installed during the Moonraker installation
optional_speedups: True
# add custom repositories here, if at least one is given, the first in the list will be used by default
# otherwise the official repository is used
#
# format: https://github.com/username/repository, branch
# example: https://github.com/Arksine/moonraker, master
#
# branch is optional, if given, it must be preceded by a comma, if not given, 'master' is used
repositories:
https://github.com/Arksine/moonraker
[mainsail]
port: 80

View File

@@ -72,7 +72,7 @@ def install_crowsnest() -> None:
Logger.print_info("Installer will prompt you for sudo password!")
try:
run(
f"sudo make install",
"sudo make install",
cwd=CROWSNEST_DIR,
shell=True,
check=True,
@@ -134,7 +134,7 @@ def update_crowsnest() -> None:
target=CROWSNEST_BACKUP_DIR,
)
git_pull_wrapper(CROWSNEST_REPO, CROWSNEST_DIR)
git_pull_wrapper(CROWSNEST_DIR)
deps = parse_packages_from_file(CROWSNEST_INSTALL_SCRIPT)
check_install_dependencies({*deps})

View File

@@ -43,7 +43,11 @@ from utils.common import check_install_dependencies, get_install_status
from utils.fs_utils import check_file_exist
from utils.input_utils import get_confirm, get_number_input, get_string_input
from utils.instance_utils import get_instances
from utils.sys_utils import cmd_sysctl_service, parse_packages_from_file
from utils.sys_utils import (
cmd_sysctl_service,
install_python_packages,
parse_packages_from_file,
)
def get_klipper_status() -> ComponentStatus:
@@ -211,3 +215,42 @@ def install_klipper_packages() -> None:
packages.append("dbus")
check_install_dependencies({*packages})
def install_input_shaper_deps() -> None:
if not KLIPPER_ENV_DIR.exists():
Logger.print_warn("Required Klipper python environment not found!")
return
Logger.print_dialog(
DialogType.CUSTOM,
[
"Resonance measurements and shaper auto-calibration require additional "
"software dependencies which are not installed by default. "
"If you agree, the following additional system packages will be installed:",
"● python3-numpy",
"● python3-matplotlib",
"● libatlas-base-dev",
"● libopenblas-dev",
"\n\n",
"Also, the following Python package will be installed:",
"● numpy",
],
custom_title="Install Input Shaper Dependencies",
)
if not get_confirm(
"Do you want to install the required packages?", default_choice=False
):
return
apt_deps = (
"python3-numpy",
"python3-matplotlib",
"libatlas-base-dev",
"libopenblas-dev",
)
check_install_dependencies({*apt_deps})
py_deps = ("numpy",)
install_python_packages(KLIPPER_ENV_DIR, {*py_deps})

View File

@@ -15,6 +15,7 @@ from components.klipper import (
EXIT_KLIPPER_SETUP,
KLIPPER_DIR,
KLIPPER_ENV_DIR,
KLIPPER_REPO_URL,
KLIPPER_REQ_FILE,
)
from components.klipper.klipper import Klipper
@@ -101,6 +102,8 @@ class KlipperSetupService:
self.moonraker_list = self.misvc.get_all_instances()
def install(self) -> None:
self.__refresh_state()
Logger.print_status("Installing Klipper ...")
match_moonraker: bool = False
@@ -161,7 +164,7 @@ class KlipperSetupService:
backup_klipper_dir()
InstanceManager.stop_all(self.klipper_list)
git_pull_wrapper(self.settings.klipper.repo_url, KLIPPER_DIR)
git_pull_wrapper(KLIPPER_DIR)
install_klipper_packages()
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
InstanceManager.start_all(self.klipper_list)
@@ -266,14 +269,15 @@ class KlipperSetupService:
check_user_groups()
def __install_deps(self) -> None:
repo = self.settings.klipper.repo_url
branch = self.settings.klipper.branch
default_repo = (KLIPPER_REPO_URL, "master")
repo = self.settings.klipper.repositories
# pull the first repo defined in kiauh.cfg or fallback to the official Klipper repo
repo, branch = (repo[0].url, repo[0].branch) if repo else default_repo
git_clone_wrapper(repo, KLIPPER_DIR, branch)
try:
install_klipper_packages()
if create_python_venv(KLIPPER_ENV_DIR):
if create_python_venv(KLIPPER_ENV_DIR, False, False, self.settings.klipper.use_python_binary):
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
except Exception:
Logger.print_error("Error during installation of Klipper requirements!")

View File

@@ -386,7 +386,7 @@ class KlipperSelectSDFlashBoardMenu(BaseMenu):
self.flash_options.selected_baudrate = get_number_input(
question="Please set the baud rate",
default=250000,
min_count=0,
min_value=0,
allow_go_back=True,
)
KlipperFlashOverviewMenu(previous_menu=self.__class__).run()

View File

@@ -126,7 +126,7 @@ def update_klipperscreen() -> None:
if settings.kiauh.backup_before_update:
backup_klipperscreen_dir()
git_pull_wrapper(KLIPPERSCREEN_REPO, KLIPPERSCREEN_DIR)
git_pull_wrapper(KLIPPERSCREEN_DIR)
install_python_requirements(KLIPPERSCREEN_ENV_DIR, KLIPPERSCREEN_REQ_FILE)

View File

@@ -11,8 +11,8 @@ from __future__ import annotations
import textwrap
from typing import Type
from components.moonraker import moonraker_remove
from core.menus import Option
from components.moonraker.services.moonraker_setup_service import MoonrakerSetupService
from core.menus import FooterType, Option
from core.menus.base_menu import BaseMenu
from core.types.color import Color
@@ -21,14 +21,19 @@ from core.types.color import Color
class MoonrakerRemoveMenu(BaseMenu):
def __init__(self, previous_menu: Type[BaseMenu] | None = None):
super().__init__()
self.title = "Remove Moonraker"
self.title_color = Color.RED
self.previous_menu: Type[BaseMenu] | None = previous_menu
self.remove_moonraker_service = False
self.remove_moonraker_dir = False
self.remove_moonraker_env = False
self.remove_moonraker_polkit = False
self.selection_state = False
self.footer_type = FooterType.BACK
self.rm_svc = False
self.rm_dir = False
self.rm_env = False
self.rm_pk = False
self.select_state = False
self.mrsvc = MoonrakerSetupService()
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.remove_menu import RemoveMenu
@@ -48,17 +53,18 @@ class MoonrakerRemoveMenu(BaseMenu):
def print_menu(self) -> None:
checked = f"[{Color.apply('x', Color.CYAN)}]"
unchecked = "[ ]"
o1 = checked if self.remove_moonraker_service else unchecked
o2 = checked if self.remove_moonraker_dir else unchecked
o3 = checked if self.remove_moonraker_env else unchecked
o4 = checked if self.remove_moonraker_polkit else unchecked
o1 = checked if self.rm_svc else unchecked
o2 = checked if self.rm_dir else unchecked
o3 = checked if self.rm_env else unchecked
o4 = checked if self.rm_pk else unchecked
sel_state = f"{'Select' if not self.select_state else 'Deselect'} everything"
menu = textwrap.dedent(
f"""
╟───────────────────────────────────────────────────────╢
║ Enter a number and hit enter to select / deselect ║
║ the specific option for removal. ║
╟───────────────────────────────────────────────────────╢
║ a) {self._get_selection_state_str():37}
║ a) {sel_state:49}
╟───────────────────────────────────────────────────────╢
║ 1) {o1} Remove Service ║
║ 2) {o2} Remove Local Repository ║
@@ -72,57 +78,33 @@ class MoonrakerRemoveMenu(BaseMenu):
print(menu, end="")
def toggle_all(self, **kwargs) -> None:
self.selection_state = not self.selection_state
self.remove_moonraker_service = self.selection_state
self.remove_moonraker_dir = self.selection_state
self.remove_moonraker_env = self.selection_state
self.remove_moonraker_polkit = self.selection_state
self.select_state = not self.select_state
self.rm_svc = self.select_state
self.rm_dir = self.select_state
self.rm_env = self.select_state
self.rm_pk = self.select_state
def toggle_remove_moonraker_service(self, **kwargs) -> None:
self.remove_moonraker_service = not self.remove_moonraker_service
self.rm_svc = not self.rm_svc
def toggle_remove_moonraker_dir(self, **kwargs) -> None:
self.remove_moonraker_dir = not self.remove_moonraker_dir
self.rm_dir = not self.rm_dir
def toggle_remove_moonraker_env(self, **kwargs) -> None:
self.remove_moonraker_env = not self.remove_moonraker_env
self.rm_env = not self.rm_env
def toggle_remove_moonraker_polkit(self, **kwargs) -> None:
self.remove_moonraker_polkit = not self.remove_moonraker_polkit
self.rm_pk = not self.rm_pk
def run_removal_process(self, **kwargs) -> None:
if (
not self.remove_moonraker_service
and not self.remove_moonraker_dir
and not self.remove_moonraker_env
and not self.remove_moonraker_polkit
):
print(
Color.apply(
"Nothing selected! Select options to remove first.", Color.RED
)
)
if not self.rm_svc and not self.rm_dir and not self.rm_env and not self.rm_pk:
msg = "Nothing selected! Select options to remove first."
print(Color.apply(msg, Color.RED))
return
moonraker_remove.run_moonraker_removal(
self.remove_moonraker_service,
self.remove_moonraker_dir,
self.remove_moonraker_env,
self.remove_moonraker_polkit,
)
self.mrsvc.remove(self.rm_svc, self.rm_dir, self.rm_env, self.rm_pk)
self.remove_moonraker_service = False
self.remove_moonraker_dir = False
self.remove_moonraker_env = False
self.remove_moonraker_polkit = False
self._go_back()
def _get_selection_state_str(self) -> str:
return (
"Select everything" if not self.selection_state else "Deselect everything"
)
def _go_back(self, **kwargs) -> None:
if self.previous_menu is not None:
self.previous_menu().run()
self.rm_svc = False
self.rm_dir = False
self.rm_env = False
self.rm_pk = False

View File

@@ -1,121 +0,0 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from subprocess import DEVNULL, PIPE, CalledProcessError, run
from typing import List
from components.klipper.klipper_dialogs import print_instance_overview
from components.moonraker import MOONRAKER_DIR, MOONRAKER_ENV_DIR
from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager
from core.logger import Logger
from utils.fs_utils import run_remove_routines
from utils.input_utils import get_selection_input
from utils.instance_utils import get_instances
from utils.sys_utils import unit_file_exists
def run_moonraker_removal(
remove_service: bool,
remove_dir: bool,
remove_env: bool,
remove_polkit: bool,
) -> None:
instances = get_instances(Moonraker)
if remove_service:
Logger.print_status("Removing Moonraker instances ...")
if instances:
instances_to_remove = select_instances_to_remove(instances)
remove_instances(instances_to_remove)
else:
Logger.print_info("No Moonraker Services installed! Skipped ...")
delete_remaining: bool = remove_polkit or remove_dir or remove_env
if delete_remaining and unit_file_exists("moonraker", suffix="service"):
Logger.print_info("There are still other Moonraker services installed")
Logger.print_info(
"● Moonraker PolicyKit rules were not removed.", prefix=False
)
Logger.print_info(f"'{MOONRAKER_DIR}' was not removed.", prefix=False)
Logger.print_info(f"'{MOONRAKER_ENV_DIR}' was not removed.", prefix=False)
else:
if remove_polkit:
Logger.print_status("Removing all Moonraker policykit rules ...")
remove_polkit_rules()
if remove_dir:
Logger.print_status("Removing Moonraker local repository ...")
run_remove_routines(MOONRAKER_DIR)
if remove_env:
Logger.print_status("Removing Moonraker Python environment ...")
run_remove_routines(MOONRAKER_ENV_DIR)
def select_instances_to_remove(
instances: List[Moonraker],
) -> List[Moonraker] | None:
start_index = 1
options = [str(i + start_index) for i in range(len(instances))]
options.extend(["a", "b"])
instance_map = {options[i]: instances[i] for i in range(len(instances))}
print_instance_overview(
instances,
start_index=start_index,
show_index=True,
show_select_all=True,
)
selection = get_selection_input("Select Moonraker instance to remove", options)
instances_to_remove = []
if selection == "b":
return None
elif selection == "a":
instances_to_remove.extend(instances)
else:
instances_to_remove.append(instance_map[selection])
return instances_to_remove
def remove_instances(
instance_list: List[Moonraker] | None,
) -> None:
if not instance_list:
Logger.print_info("No Moonraker instances found. Skipped ...")
return
for instance in instance_list:
Logger.print_status(f"Removing instance {instance.service_file_path.stem} ...")
InstanceManager.remove(instance)
delete_moonraker_env_file(instance)
def remove_polkit_rules() -> None:
if not MOONRAKER_DIR.exists():
log = "Cannot remove policykit rules. Moonraker directory not found."
Logger.print_warn(log)
return
try:
cmd = [f"{MOONRAKER_DIR}/scripts/set-policykit-rules.sh", "--clear"]
run(cmd, stderr=PIPE, stdout=DEVNULL, check=True)
except CalledProcessError as e:
Logger.print_error(f"Error while removing policykit rules: {e}")
Logger.print_ok("Policykit rules successfully removed!")
def delete_moonraker_env_file(instance: Moonraker):
Logger.print_status(f"Remove '{instance.env_file}'")
if not instance.env_file.exists():
msg = f"Env file in {instance.base.sysd_dir} not found. Skipped ..."
Logger.print_info(msg)
return
run_remove_routines(instance.env_file)

View File

@@ -1,269 +0,0 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import subprocess
from typing import List
from components.klipper.klipper import Klipper
from components.moonraker import (
EXIT_MOONRAKER_SETUP,
MOONRAKER_DEPS_JSON_FILE,
MOONRAKER_DIR,
MOONRAKER_ENV_DIR,
MOONRAKER_INSTALL_SCRIPT,
MOONRAKER_REQ_FILE,
MOONRAKER_SPEEDUPS_REQ_FILE,
POLKIT_FILE,
POLKIT_LEGACY_FILE,
POLKIT_SCRIPT,
POLKIT_USR_FILE,
)
from components.moonraker.moonraker import Moonraker
from components.moonraker.moonraker_dialogs import print_moonraker_overview
from components.moonraker.services.moonraker_instance_service import (
MoonrakerInstanceService,
)
from components.moonraker.utils.sysdeps_parser import SysDepsParser
from components.moonraker.utils.utils import (
backup_moonraker_dir,
create_example_moonraker_conf,
load_sysdeps_json,
)
from components.webui_client.client_utils import (
enable_mainsail_remotemode,
get_existing_clients,
)
from components.webui_client.mainsail_data import MainsailData
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from core.settings.kiauh_settings import KiauhSettings
from core.types.color import Color
from utils.common import check_install_dependencies
from utils.fs_utils import check_file_exist
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import (
get_confirm,
get_selection_input,
)
from utils.instance_utils import get_instances
from utils.sys_utils import (
check_python_version,
cmd_sysctl_manage,
cmd_sysctl_service,
create_python_venv,
get_ipv4_addr,
install_python_requirements,
parse_packages_from_file,
)
def install_moonraker() -> None:
klipper_list: List[Klipper] = get_instances(Klipper)
if not check_moonraker_install_requirements(klipper_list):
return
instance_service = MoonrakerInstanceService()
instance_service.load_instances()
moonraker_list: List[Moonraker] = instance_service.get_all_instances()
new_instances: List[Moonraker] = []
selected_option: str | Klipper
if len(klipper_list) == 1:
suffix: str = klipper_list[0].suffix
new_inst = instance_service.create_new_instance(suffix)
new_instances.append(new_inst)
else:
print_moonraker_overview(
klipper_list,
moonraker_list,
show_index=True,
show_select_all=True,
)
options = {str(i + 1): k for i, k in enumerate(klipper_list)}
additional_options = {"a": None, "b": None}
options = {**options, **additional_options}
question = "Select Klipper instance to setup Moonraker for"
selected_option = get_selection_input(question, options)
if selected_option == "b":
Logger.print_status(EXIT_MOONRAKER_SETUP)
return
if selected_option == "a":
new_inst_list: List[Moonraker] = [
instance_service.create_new_instance(k.suffix) for k in klipper_list
]
new_instances.extend(new_inst_list)
else:
klipper_instance: Klipper | None = options.get(selected_option)
if klipper_instance is None:
raise Exception("Error selecting instance!")
new_inst = instance_service.create_new_instance(klipper_instance.suffix)
new_instances.append(new_inst)
create_example_cfg = get_confirm("Create example moonraker.conf?")
try:
check_install_dependencies()
setup_moonraker_prerequesites()
install_moonraker_polkit()
ports_map = instance_service.get_instance_port_map()
for instance in new_instances:
instance.create()
cmd_sysctl_service(instance.service_file_path.name, "enable")
if create_example_cfg:
# if a webclient and/or it's config is installed, patch
# its update section to the config
clients = get_existing_clients()
create_example_moonraker_conf(instance, ports_map, clients)
cmd_sysctl_service(instance.service_file_path.name, "start")
cmd_sysctl_manage("daemon-reload")
# if mainsail is installed, and we installed
# multiple moonraker instances, we enable mainsails remote mode
if MainsailData().client_dir.exists() and len(moonraker_list) > 1:
enable_mainsail_remotemode()
instance_service.load_instances()
new_instances = [
instance_service.get_instance_by_suffix(i.suffix) for i in new_instances
]
ip: str = get_ipv4_addr()
# noinspection HttpUrlsUsage
url_list = [
f"{i.service_file_path.stem}: http://{ip}:{i.port}"
for i in new_instances
if i.port
]
dialog_content = []
if url_list:
dialog_content.append("You can access Moonraker via the following URL:")
dialog_content.extend(url_list)
Logger.print_dialog(
DialogType.CUSTOM,
custom_title="Moonraker successfully installed!",
custom_color=Color.GREEN,
content=dialog_content,
)
except Exception as e:
Logger.print_error(f"Error while installing Moonraker: {e}")
return
def check_moonraker_install_requirements(klipper_list: List[Klipper]) -> bool:
def check_klipper_instances() -> bool:
if len(klipper_list) >= 1:
return True
Logger.print_warn("Klipper not installed!")
Logger.print_warn("Moonraker cannot be installed! Install Klipper first.")
return False
return check_python_version(3, 7) and check_klipper_instances()
def setup_moonraker_prerequesites() -> None:
settings = KiauhSettings()
repo = settings.moonraker.repo_url
branch = settings.moonraker.branch
git_clone_wrapper(repo, MOONRAKER_DIR, branch)
# install moonraker dependencies and create python virtualenv
install_moonraker_packages()
if create_python_venv(MOONRAKER_ENV_DIR):
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_SPEEDUPS_REQ_FILE)
def install_moonraker_packages() -> None:
Logger.print_status("Parsing Moonraker system dependencies ...")
moonraker_deps = []
if MOONRAKER_DEPS_JSON_FILE.exists():
Logger.print_info(
f"Parsing system dependencies from {MOONRAKER_DEPS_JSON_FILE.name} ..."
)
parser = SysDepsParser()
sysdeps = load_sysdeps_json(MOONRAKER_DEPS_JSON_FILE)
moonraker_deps.extend(parser.parse_dependencies(sysdeps))
elif MOONRAKER_INSTALL_SCRIPT.exists():
Logger.print_warn(f"{MOONRAKER_DEPS_JSON_FILE.name} not found!")
Logger.print_info(
f"Parsing system dependencies from {MOONRAKER_INSTALL_SCRIPT.name} ..."
)
moonraker_deps = parse_packages_from_file(MOONRAKER_INSTALL_SCRIPT)
if not moonraker_deps:
raise ValueError("Error parsing Moonraker dependencies!")
check_install_dependencies({*moonraker_deps})
def install_moonraker_polkit() -> None:
Logger.print_status("Installing Moonraker policykit rules ...")
legacy_file_exists = check_file_exist(POLKIT_LEGACY_FILE, True)
polkit_file_exists = check_file_exist(POLKIT_FILE, True)
usr_file_exists = check_file_exist(POLKIT_USR_FILE, True)
if legacy_file_exists or (polkit_file_exists and usr_file_exists):
Logger.print_info("Moonraker policykit rules are already installed.")
return
try:
command = [POLKIT_SCRIPT, "--disable-systemctl"]
result = subprocess.run(
command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
text=True,
)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)
Logger.print_error("Installing Moonraker policykit rules failed!")
return
Logger.print_ok("Moonraker policykit rules successfully installed!")
except subprocess.CalledProcessError as e:
log = f"Error while installing Moonraker policykit rules: {e.stderr.decode()}"
Logger.print_error(log)
def update_moonraker() -> None:
if not get_confirm("Update Moonraker now?"):
return
settings = KiauhSettings()
if settings.kiauh.backup_before_update:
backup_moonraker_dir()
instances = get_instances(Moonraker)
InstanceManager.stop_all(instances)
git_pull_wrapper(repo=settings.moonraker.repo_url, target_dir=MOONRAKER_DIR)
# install possible new system packages
install_moonraker_packages()
# install possible new python dependencies
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)
InstanceManager.start_all(instances)

View File

@@ -0,0 +1,408 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from copy import copy
from subprocess import DEVNULL, PIPE, CalledProcessError, run
from typing import List
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import print_instance_overview
from components.klipper.services.klipper_instance_service import KlipperInstanceService
from components.moonraker import (
EXIT_MOONRAKER_SETUP,
MOONRAKER_DIR,
MOONRAKER_ENV_DIR,
MOONRAKER_REPO_URL,
MOONRAKER_REQ_FILE,
MOONRAKER_SPEEDUPS_REQ_FILE,
POLKIT_FILE,
POLKIT_LEGACY_FILE,
POLKIT_SCRIPT,
POLKIT_USR_FILE,
)
from components.moonraker.moonraker import Moonraker
from components.moonraker.moonraker_dialogs import print_moonraker_overview
from components.moonraker.services.moonraker_instance_service import (
MoonrakerInstanceService,
)
from components.moonraker.utils.utils import (
backup_moonraker_dir,
create_example_moonraker_conf,
install_moonraker_packages,
remove_polkit_rules,
)
from components.webui_client.client_utils import (
enable_mainsail_remotemode,
get_existing_clients,
)
from components.webui_client.mainsail_data import MainsailData
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from core.services.message_service import Message, MessageService
from core.settings.kiauh_settings import KiauhSettings
from core.types.color import Color
from utils.common import check_install_dependencies
from utils.fs_utils import check_file_exist, run_remove_routines
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import (
get_confirm,
get_selection_input,
)
from utils.sys_utils import (
check_python_version,
cmd_sysctl_manage,
cmd_sysctl_service,
create_python_venv,
get_ipv4_addr,
install_python_requirements,
unit_file_exists,
)
# noinspection PyMethodMayBeStatic
class MoonrakerSetupService:
__cls_instance = None
kisvc: KlipperInstanceService
misvc: MoonrakerInstanceService
msgsvc = MessageService
settings: KiauhSettings
klipper_list: List[Klipper]
moonraker_list: List[Moonraker]
def __new__(cls) -> "MoonrakerSetupService":
if cls.__cls_instance is None:
cls.__cls_instance = super(MoonrakerSetupService, cls).__new__(cls)
return cls.__cls_instance
def __init__(self) -> None:
if not hasattr(self, "__initialized"):
self.__initialized = False
if self.__initialized:
return
self.__initialized = True
self.__init_state()
def __init_state(self) -> None:
self.settings = KiauhSettings()
self.kisvc = KlipperInstanceService()
self.kisvc.load_instances()
self.klipper_list = self.kisvc.get_all_instances()
self.misvc = MoonrakerInstanceService()
self.misvc.load_instances()
self.moonraker_list = self.misvc.get_all_instances()
self.msgsvc = MessageService()
def __refresh_state(self) -> None:
self.kisvc.load_instances()
self.klipper_list = self.kisvc.get_all_instances()
self.misvc.load_instances()
self.moonraker_list = self.misvc.get_all_instances()
def install(self) -> None:
self.__refresh_state()
if not self.__check_requirements(self.klipper_list):
return
new_instances: List[Moonraker] = []
selected_option: str | Klipper
if len(self.klipper_list) == 1:
suffix: str = self.klipper_list[0].suffix
new_inst = self.misvc.create_new_instance(suffix)
new_instances.append(new_inst)
else:
print_moonraker_overview(
self.klipper_list,
self.moonraker_list,
show_index=True,
show_select_all=True,
)
options = {str(i + 1): k for i, k in enumerate(self.klipper_list)}
additional_options = {"a": None, "b": None}
options = {**options, **additional_options}
question = "Select Klipper instance to setup Moonraker for"
selected_option = get_selection_input(question, options)
if selected_option == "b":
Logger.print_status(EXIT_MOONRAKER_SETUP)
return
if selected_option == "a":
new_inst_list: List[Moonraker] = [
self.misvc.create_new_instance(k.suffix) for k in self.klipper_list
]
new_instances.extend(new_inst_list)
else:
klipper_instance: Klipper | None = options.get(selected_option)
if klipper_instance is None:
raise Exception("Error selecting instance!")
new_inst = self.misvc.create_new_instance(klipper_instance.suffix)
new_instances.append(new_inst)
create_example_cfg = get_confirm("Create example moonraker.conf?")
try:
self.__run_setup(new_instances, create_example_cfg)
except Exception as e:
Logger.print_error(f"Error while installing Moonraker: {e}")
return
def update(self) -> None:
Logger.print_dialog(
DialogType.WARNING,
[
"Be careful if there are ongoing prints running!",
"All Moonraker instances will be restarted during the update process and "
"ongoing prints COULD FAIL.",
],
)
if not get_confirm("Update Moonraker now?"):
return
self.__refresh_state()
if self.settings.kiauh.backup_before_update:
backup_moonraker_dir()
InstanceManager.stop_all(self.moonraker_list)
git_pull_wrapper(MOONRAKER_DIR)
install_moonraker_packages()
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)
InstanceManager.start_all(self.moonraker_list)
def remove(
self,
remove_service: bool,
remove_dir: bool,
remove_env: bool,
remove_polkit: bool,
) -> None:
self.__refresh_state()
completion_msg = Message(
title="Moonraker Removal Process completed",
color=Color.GREEN,
)
if remove_service:
Logger.print_status("Removing Moonraker instances ...")
if self.moonraker_list:
instances_to_remove = self.__get_instances_to_remove()
self.__remove_instances(instances_to_remove)
if instances_to_remove:
instance_names = [
i.service_file_path.stem for i in instances_to_remove
]
txt = f"● Moonraker instances removed: {', '.join(instance_names)}"
completion_msg.text.append(txt)
else:
Logger.print_info("No Moonraker Services installed! Skipped ...")
if (remove_polkit or remove_dir or remove_env) and unit_file_exists(
"moonraker", suffix="service"
):
completion_msg.text = [
"Some Klipper services are still installed:",
"● Moonraker PolicyKit rules were not removed, even though selected for removal.",
f"'{MOONRAKER_DIR}' was not removed, even though selected for removal.",
f"'{MOONRAKER_ENV_DIR}' was not removed, even though selected for removal.",
]
else:
if remove_polkit:
Logger.print_status("Removing all Moonraker policykit rules ...")
if remove_polkit_rules():
completion_msg.text.append("● Moonraker policykit rules removed")
if remove_dir:
Logger.print_status("Removing Moonraker local repository ...")
if run_remove_routines(MOONRAKER_DIR):
completion_msg.text.append("● Moonraker local repository removed")
if remove_env:
Logger.print_status("Removing Moonraker Python environment ...")
if run_remove_routines(MOONRAKER_ENV_DIR):
completion_msg.text.append("● Moonraker Python environment removed")
if completion_msg.text:
completion_msg.text.insert(0, "The following actions were performed:")
else:
completion_msg.color = Color.YELLOW
completion_msg.centered = True
completion_msg.text = ["Nothing to remove."]
self.msgsvc.set_message(completion_msg)
def __run_setup(
self, new_instances: List[Moonraker], create_example_cfg: bool
) -> None:
check_install_dependencies()
self.__install_deps()
ports_map = self.misvc.get_instance_port_map()
for i in new_instances:
i.create()
cmd_sysctl_service(i.service_file_path.name, "enable")
if create_example_cfg:
# if a webclient and/or it's config is installed, patch
# its update section to the config
clients = get_existing_clients()
create_example_moonraker_conf(i, ports_map, clients)
cmd_sysctl_service(i.service_file_path.name, "start")
cmd_sysctl_manage("daemon-reload")
# if mainsail is installed, and we installed
# multiple moonraker instances, we enable mainsails remote mode
if MainsailData().client_dir.exists() and len(self.moonraker_list) > 1:
enable_mainsail_remotemode()
self.misvc.load_instances()
new_instances = [
self.misvc.get_instance_by_suffix(i.suffix) for i in new_instances
]
ip: str = get_ipv4_addr()
# noinspection HttpUrlsUsage
url_list = [
f"{i.service_file_path.stem}: http://{ip}:{i.port}"
for i in new_instances
if i.port
]
dialog_content = []
if url_list:
dialog_content.append("You can access Moonraker via the following URL:")
dialog_content.extend(url_list)
Logger.print_dialog(
DialogType.CUSTOM,
custom_title="Moonraker successfully installed!",
custom_color=Color.GREEN,
content=dialog_content,
)
def __check_requirements(self, klipper_list: List[Klipper]) -> bool:
is_klipper_installed = len(klipper_list) >= 1
if not is_klipper_installed:
Logger.print_warn("Klipper not installed!")
Logger.print_warn("Moonraker cannot be installed! Install Klipper first.")
is_python_ok = check_python_version(3, 7)
return is_klipper_installed and is_python_ok
def __install_deps(self) -> None:
default_repo = (MOONRAKER_REPO_URL, "master")
repo = self.settings.moonraker.repositories
# pull the first repo defined in kiauh.cfg or fallback to the official Moonraker repo
repo, branch = (repo[0].url, repo[0].branch) if repo else default_repo
git_clone_wrapper(repo, MOONRAKER_DIR, branch)
try:
install_moonraker_packages()
if create_python_venv(MOONRAKER_ENV_DIR, False, False, self.settings.moonraker.use_python_binary):
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)
if self.settings.moonraker.optional_speedups:
install_python_requirements(
MOONRAKER_ENV_DIR, MOONRAKER_SPEEDUPS_REQ_FILE
)
self.__install_polkit()
except Exception:
Logger.print_error("Error during installation of Moonraker requirements!")
raise
def __install_polkit(self) -> None:
Logger.print_status("Installing Moonraker policykit rules ...")
legacy_file_exists = check_file_exist(POLKIT_LEGACY_FILE, True)
polkit_file_exists = check_file_exist(POLKIT_FILE, True)
usr_file_exists = check_file_exist(POLKIT_USR_FILE, True)
if legacy_file_exists or (polkit_file_exists and usr_file_exists):
Logger.print_info("Moonraker policykit rules are already installed.")
return
try:
command = [POLKIT_SCRIPT, "--disable-systemctl"]
result = run(
command,
stderr=PIPE,
stdout=DEVNULL,
text=True,
)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)
Logger.print_error("Installing Moonraker policykit rules failed!")
return
Logger.print_ok("Moonraker policykit rules successfully installed!")
except CalledProcessError as e:
log = (
f"Error while installing Moonraker policykit rules: {e.stderr.decode()}"
)
Logger.print_error(log)
def __get_instances_to_remove(self) -> List[Moonraker] | None:
start_index = 1
curr_instances: List[Moonraker] = self.moonraker_list
instance_count = len(curr_instances)
options = [str(i + start_index) for i in range(instance_count)]
options.extend(["a", "b"])
instance_map = {
options[i]: self.moonraker_list[i] for i in range(instance_count)
}
print_instance_overview(
self.moonraker_list,
start_index=start_index,
show_index=True,
show_select_all=True,
)
selection = get_selection_input("Select Moonraker instance to remove", options)
if selection == "b":
return None
elif selection == "a":
return copy(self.moonraker_list)
return [instance_map[selection]]
def __remove_instances(
self,
instance_list: List[Moonraker] | None,
) -> None:
if not instance_list:
return
for instance in instance_list:
Logger.print_status(
f"Removing instance {instance.service_file_path.stem} ..."
)
InstanceManager.remove(instance)
self.__delete_env_file(instance)
self.__refresh_state()
def __delete_env_file(self, instance: Moonraker):
Logger.print_status(f"Remove '{instance.env_file}'")
if not instance.env_file.exists():
msg = f"Env file in {instance.base.sysd_dir} not found. Skipped ..."
Logger.print_info(msg)
return
run_remove_routines(instance.env_file)

View File

@@ -9,6 +9,7 @@
import json
import shutil
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError, run
from typing import Dict, List, Optional
from components.moonraker import (
@@ -16,10 +17,13 @@ from components.moonraker import (
MOONRAKER_BACKUP_DIR,
MOONRAKER_DB_BACKUP_DIR,
MOONRAKER_DEFAULT_PORT,
MOONRAKER_DEPS_JSON_FILE,
MOONRAKER_DIR,
MOONRAKER_ENV_DIR,
MOONRAKER_INSTALL_SCRIPT,
)
from components.moonraker.moonraker import Moonraker
from components.moonraker.utils.sysdeps_parser import SysDepsParser
from components.webui_client.base_data import BaseWebClient
from core.backup_manager.backup_manager import BackupManager
from core.logger import Logger
@@ -27,10 +31,11 @@ from core.submodules.simple_config_parser.src.simple_config_parser.simple_config
SimpleConfigParser,
)
from core.types.component_status import ComponentStatus
from utils.common import get_install_status
from utils.common import check_install_dependencies, get_install_status
from utils.instance_utils import get_instances
from utils.sys_utils import (
get_ipv4_addr,
parse_packages_from_file,
)
@@ -38,6 +43,46 @@ def get_moonraker_status() -> ComponentStatus:
return get_install_status(MOONRAKER_DIR, MOONRAKER_ENV_DIR, Moonraker)
def install_moonraker_packages() -> None:
Logger.print_status("Parsing Moonraker system dependencies ...")
moonraker_deps = []
if MOONRAKER_DEPS_JSON_FILE.exists():
Logger.print_info(
f"Parsing system dependencies from {MOONRAKER_DEPS_JSON_FILE.name} ..."
)
parser = SysDepsParser()
sysdeps = load_sysdeps_json(MOONRAKER_DEPS_JSON_FILE)
moonraker_deps.extend(parser.parse_dependencies(sysdeps))
elif MOONRAKER_INSTALL_SCRIPT.exists():
Logger.print_warn(f"{MOONRAKER_DEPS_JSON_FILE.name} not found!")
Logger.print_info(
f"Parsing system dependencies from {MOONRAKER_INSTALL_SCRIPT.name} ..."
)
moonraker_deps = parse_packages_from_file(MOONRAKER_INSTALL_SCRIPT)
if not moonraker_deps:
raise ValueError("Error parsing Moonraker dependencies!")
check_install_dependencies({*moonraker_deps})
def remove_polkit_rules() -> bool:
if not MOONRAKER_DIR.exists():
log = "Cannot remove policykit rules. Moonraker directory not found."
Logger.print_warn(log)
return False
try:
cmd = [f"{MOONRAKER_DIR}/scripts/set-policykit-rules.sh", "--clear"]
run(cmd, stderr=PIPE, stdout=DEVNULL, check=True)
return True
except CalledProcessError as e:
Logger.print_error(f"Error while removing policykit rules: {e}")
return False
def create_example_moonraker_conf(
instance: Moonraker,
ports_map: Dict[str, int],

View File

@@ -106,7 +106,7 @@ def update_client_config(client: BaseWebClient) -> None:
if settings.kiauh.backup_before_update:
backup_client_config_data(client)
git_pull_wrapper(client_config.repo_url, client_config.config_dir)
git_pull_wrapper(client_config.config_dir)
Logger.print_ok(f"Successfully updated {client_config.display_name}.")
Logger.print_info("Restart Klipper to reload the configuration!")

View File

@@ -414,7 +414,7 @@ def get_client_port_selection(
while True:
_type = "Reconfigure" if reconfigure else "Configure"
question = f"{_type} {client.display_name} for port"
port_input = get_number_input(question, min_count=80, default=port)
port_input = get_number_input(question, min_value=80, default=port)
if port_input not in ports_in_use:
client_settings: WebUiSettings = settings[client.name]

View File

@@ -91,7 +91,8 @@ class Logger:
color = Logger._get_dialog_color(title, custom_color)
dialog_title = Logger._get_dialog_title(title, custom_title)
print("\n" * margin_top)
if margin_top > 0:
print("\n" * margin_top, end="")
print(Color.apply(BORDER_TOP, color))
@@ -111,7 +112,8 @@ class Logger:
print(Color.apply(BORDER_BOTTOM, color))
print("\n" * margin_bottom)
if margin_bottom > 0:
print("\n" * margin_bottom, end="")
@staticmethod
def _get_dialog_title(

View File

@@ -13,6 +13,7 @@ from typing import Type
from components.klipper import KLIPPER_DIR
from components.klipper.klipper import Klipper
from components.klipper.klipper_utils import install_input_shaper_deps
from components.klipper_firmware.menus.klipper_build_menu import (
KlipperBuildFirmwareMenu,
KlipperKConfigMenu,
@@ -50,9 +51,10 @@ class AdvancedMenu(BaseMenu):
"2": Option(method=self.flash),
"3": Option(method=self.build_flash),
"4": Option(method=self.get_id),
"5": Option(method=self.klipper_rollback),
"6": Option(method=self.moonraker_rollback),
"7": Option(method=self.change_hostname),
"5": Option(method=self.input_shaper),
"6": Option(method=self.klipper_rollback),
"7": Option(method=self.moonraker_rollback),
"8": Option(method=self.change_hostname),
}
def print_menu(self) -> None:
@@ -60,11 +62,13 @@ class AdvancedMenu(BaseMenu):
"""
╟───────────────────────────┬───────────────────────────╢
║ Klipper Firmware: │ Repository Rollback: ║
║ 1) [Build] │ 5) [Klipper] ║
║ 2) [Flash] │ 6) [Moonraker] ║
║ 1) [Build] │ 6) [Klipper] ║
║ 2) [Flash] │ 7) [Moonraker] ║
║ 3) [Build + Flash] │ ║
║ 4) [Get MCU ID] │ System: ║
║ │ 7) [Change hostname] ║
║ │ 8) [Change hostname] ║
║ Extra Dependencies: │ ║
║ 5) [Input Shaper] │ ║
╟───────────────────────────┴───────────────────────────╢
"""
)[1:]
@@ -97,3 +101,6 @@ class AdvancedMenu(BaseMenu):
def change_hostname(self, **kwargs) -> None:
change_system_hostname()
def input_shaper(self, **kwargs) -> None:
install_input_shaper_deps()

View File

@@ -14,7 +14,7 @@ from typing import Type
from components.crowsnest.crowsnest import install_crowsnest
from components.klipper.services.klipper_setup_service import KlipperSetupService
from components.klipperscreen.klipperscreen import install_klipperscreen
from components.moonraker import moonraker_setup
from components.moonraker.services.moonraker_setup_service import MoonrakerSetupService
from components.webui_client.client_config.client_config_setup import (
install_client_config,
)
@@ -37,6 +37,7 @@ class InstallMenu(BaseMenu):
self.title_color = Color.GREEN
self.previous_menu: Type[BaseMenu] | None = previous_menu
self.klsvc = KlipperSetupService()
self.mrsvc = MoonrakerSetupService()
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.main_menu import MainMenu
@@ -79,7 +80,7 @@ class InstallMenu(BaseMenu):
self.klsvc.install()
def install_moonraker(self, **kwargs) -> None:
moonraker_setup.install_moonraker()
self.mrsvc.install()
def install_mainsail(self, **kwargs) -> None:
client: MainsailData = MainsailData()

View File

@@ -0,0 +1,79 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from typing import List, Literal, Type
from core.logger import Logger
from core.menus import Option
from core.menus.base_menu import BaseMenu
from core.settings.kiauh_settings import KiauhSettings, Repository
from core.types.color import Color
from procedures.switch_repo import run_switch_repo_routine
class RepoSelectMenu(BaseMenu):
def __init__(
self,
name: Literal["klipper", "moonraker"],
repos: List[Repository],
previous_menu: Type[BaseMenu] | None = None,
) -> None:
super().__init__()
self.title_color = Color.CYAN
self.previous_menu = previous_menu
self.settings = KiauhSettings()
self.input_label_txt = "Select repository"
self.name = name
self.repos = repos
if self.name == "klipper":
self.title = "Klipper Repository Selection Menu"
elif self.name == "moonraker":
self.title = "Moonraker Repository Selection Menu"
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.settings_menu import SettingsMenu
self.previous_menu = (
previous_menu if previous_menu is not None else SettingsMenu
)
def set_options(self) -> None:
self.options = {}
if not self.repos:
return
for idx, repo in enumerate(self.repos, start=1):
self.options[str(idx)] = Option(
method=self.select_repository, opt_data=repo
)
def print_menu(self) -> None:
menu = "╟───────────────────────────────────────────────────────╢\n"
menu += "║ Available Repositories: ║\n"
menu += "╟───────────────────────────────────────────────────────╢\n"
for idx, repo in enumerate(self.repos, start=1):
url = f"● Repo: {repo.url.replace('.git', '')}"
branch = f"└► Branch: {repo.branch}"
menu += f"{idx}) {Color.apply(url, Color.CYAN):<59}\n"
menu += f"{Color.apply(branch, Color.CYAN):<59}\n"
menu += "╟───────────────────────────────────────────────────────╢\n"
print(menu, end="")
def select_repository(self, **kwargs) -> None:
repo: Repository = kwargs.get("opt_data")
Logger.print_status(
f"Switching to {self.name.capitalize()}'s new source repository ..."
)
run_switch_repo_routine(self.name, repo.url, repo.branch)

View File

@@ -9,20 +9,17 @@
from __future__ import annotations
import textwrap
from pathlib import Path
from typing import Literal, Tuple, Type
from typing import Type
from components.klipper import KLIPPER_DIR, KLIPPER_REPO_URL
from components.klipper.klipper_utils import get_klipper_status
from components.moonraker import MOONRAKER_DIR, MOONRAKER_REPO_URL
from components.moonraker.utils.utils import get_moonraker_status
from core.logger import DialogType, Logger
from core.menus import Option
from core.menus.base_menu import BaseMenu
from core.settings.kiauh_settings import KiauhSettings, RepoSettings
from core.menus.repo_select_menu import RepoSelectMenu
from core.settings.kiauh_settings import KiauhSettings
from core.types.color import Color
from procedures.switch_repo import run_switch_repo_routine
from utils.input_utils import get_confirm, get_string_input
from core.types.component_status import ComponentStatus
# noinspection PyUnusedLocal
@@ -37,8 +34,14 @@ class SettingsMenu(BaseMenu):
self.mainsail_unstable: bool | None = None
self.fluidd_unstable: bool | None = None
self.auto_backups_enabled: bool | None = None
na: str = "Not available!"
self.kl_repo_url: str = Color.apply(na, Color.RED)
self.kl_branch: str = Color.apply(na, Color.RED)
self.mr_repo_url: str = Color.apply(na, Color.RED)
self.mr_branch: str = Color.apply(na, Color.RED)
self._load_settings()
print(self.klipper_status)
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.main_menu import MainMenu
@@ -47,54 +50,39 @@ class SettingsMenu(BaseMenu):
def set_options(self) -> None:
self.options = {
"1": Option(method=self.set_klipper_repo),
"2": Option(method=self.set_moonraker_repo),
"1": Option(method=self.switch_klipper_repo),
"2": Option(method=self.switch_moonraker_repo),
"3": Option(method=self.toggle_mainsail_release),
"4": Option(method=self.toggle_fluidd_release),
"5": Option(method=self.toggle_backup_before_update),
}
def print_menu(self) -> None:
color = Color.CYAN
checked = f"[{Color.apply('x', Color.GREEN)}]"
unchecked = "[ ]"
kl_repo: str = Color.apply(self.klipper_status.repo, color)
kl_branch: str = Color.apply(self.klipper_status.branch, color)
kl_owner: str = Color.apply(self.klipper_status.owner, color)
mr_repo: str = Color.apply(self.moonraker_status.repo, color)
mr_branch: str = Color.apply(self.moonraker_status.branch, color)
mr_owner: str = Color.apply(self.moonraker_status.owner, color)
o1 = checked if self.mainsail_unstable else unchecked
o2 = checked if self.fluidd_unstable else unchecked
o3 = checked if self.auto_backups_enabled else unchecked
menu = textwrap.dedent(
f"""
╟───────────────────────────────────────────────────────╢
Klipper:
● Repo: {kl_repo:51}
● Owner: {kl_owner:51}
Branch: {kl_branch:51}
1) Switch Klipper source repository
● Current repository:
└► Repo: {self.kl_repo_url:50}
└► Branch: {self.kl_branch:48}
╟───────────────────────────────────────────────────────╢
Moonraker:
● Repo: {mr_repo:51}
● Owner: {mr_owner:51}
Branch: {mr_branch:51}
2) Switch Moonraker source repository
● Current repository:
└► Repo: {self.mr_repo_url:50}
└► Branch: {self.mr_branch:48}
╟───────────────────────────────────────────────────────╢
║ Install unstable releases: ║
{o1} Mainsail
{o2} Fluidd
3) {o1} Mainsail ║
4) {o2} Fluidd ║
╟───────────────────────────────────────────────────────╢
║ Auto-Backup: ║
{o3} Automatic backup before update ║
╟───────────────────────────────────────────────────────╢
║ 1) Set Klipper source repository ║
║ 2) Set Moonraker source repository ║
║ ║
║ 3) Toggle unstable Mainsail releases ║
║ 4) Toggle unstable Fluidd releases ║
║ ║
║ 5) Toggle automatic backups before updates ║
5) {o3} Backup before update
╟───────────────────────────────────────────────────────╢
"""
)[1:]
@@ -106,103 +94,43 @@ class SettingsMenu(BaseMenu):
self.mainsail_unstable = self.settings.mainsail.unstable_releases
self.fluidd_unstable = self.settings.fluidd.unstable_releases
# by default, we show the status of the installed repositories
self.klipper_status = get_klipper_status()
self.moonraker_status = get_moonraker_status()
# if the repository is not installed, we show the status of the settings from the config file
if self.klipper_status.repo == "-":
url_parts = self.settings.klipper.repo_url.split("/")
self.klipper_status.repo = url_parts[-1]
self.klipper_status.owner = url_parts[-2]
self.klipper_status.branch = self.settings.klipper.branch
if self.moonraker_status.repo == "-":
url_parts = self.settings.moonraker.repo_url.split("/")
self.moonraker_status.repo = url_parts[-1]
self.moonraker_status.owner = url_parts[-2]
self.moonraker_status.branch = self.settings.moonraker.branch
klipper_status: ComponentStatus = get_klipper_status()
moonraker_status: ComponentStatus = get_moonraker_status()
def _gather_input(
self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path
) -> Tuple[str, str]:
warn_msg = [
"There is only basic input validation in place! "
"Make sure your the input is valid and has no typos or invalid characters!"
]
def trim_repo_url(repo: str) -> str:
return repo.replace(".git", "").replace("https://", "").replace("git@", "")
if repo_dir.exists():
warn_msg.extend(
[
"For the change to take effect, the new repository will be cloned. "
"A backup of the old repository will be created.",
"\n\n",
"Make sure you don't have any ongoing prints running, as the services "
"will be restarted during this process! You will loose any ongoing print!",
]
)
if not klipper_status.repo == "-":
url = trim_repo_url(klipper_status.repo_url)
self.kl_repo_url = Color.apply(url, Color.CYAN)
self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN)
if not moonraker_status.repo == "-":
url = trim_repo_url(moonraker_status.repo_url)
self.mr_repo_url = Color.apply(url, Color.CYAN)
self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN)
Logger.print_dialog(DialogType.ATTENTION, warn_msg)
repo = get_string_input(
"Enter new repository URL",
regex=r"^[\w/.:-]+$",
default=KLIPPER_REPO_URL if repo_name == "klipper" else MOONRAKER_REPO_URL,
)
branch = get_string_input(
"Enter new branch name", regex=r"^.+$", default="master"
)
return repo, branch
def _set_repo(
self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path
) -> None:
repo_url, branch = self._gather_input(repo_name, repo_dir)
display_name = repo_name.capitalize()
def _warn_no_repos(self, name: str) -> None:
Logger.print_dialog(
DialogType.CUSTOM,
[
f"New {display_name} repository URL:",
f"{repo_url}",
f"New {display_name} repository branch:",
f"{branch}",
],
DialogType.WARNING,
[f"No {name} repositories configured in kiauh.cfg!"],
center_content=True,
)
if get_confirm("Apply changes?", allow_go_back=True):
repo: RepoSettings = self.settings[repo_name]
repo.repo_url = repo_url
repo.branch = branch
self.settings.save()
self._load_settings()
Logger.print_ok("Changes saved!")
else:
Logger.print_info(
f"Changing of {display_name} source repository canceled ..."
)
def switch_klipper_repo(self, **kwargs) -> None:
name = "Klipper"
repos = self.settings.klipper.repositories
if not repos:
self._warn_no_repos(name)
return
RepoSelectMenu(name.lower(), repos=repos, previous_menu=self.__class__).run()
self._switch_repo(repo_name, repo_dir)
def _switch_repo(
self, name: Literal["klipper", "moonraker"], repo_dir: Path
) -> None:
if not repo_dir.exists():
def switch_moonraker_repo(self, **kwargs) -> None:
name = "Moonraker"
repos = self.settings.moonraker.repositories
if not repos:
self._warn_no_repos(name)
return
Logger.print_status(
f"Switching to {name.capitalize()}'s new source repository ..."
)
repo: RepoSettings = self.settings[name]
run_switch_repo_routine(name, repo)
def set_klipper_repo(self, **kwargs) -> None:
self._set_repo("klipper", KLIPPER_DIR)
def set_moonraker_repo(self, **kwargs) -> None:
self._set_repo("moonraker", MOONRAKER_DIR)
RepoSelectMenu(name.lower(), repos=repos, previous_menu=self.__class__).run()
def toggle_mainsail_release(self, **kwargs) -> None:
self.mainsail_unstable = not self.mainsail_unstable

View File

@@ -20,7 +20,7 @@ from components.klipperscreen.klipperscreen import (
get_klipperscreen_status,
update_klipperscreen,
)
from components.moonraker.moonraker_setup import update_moonraker
from components.moonraker.services.moonraker_setup_service import MoonrakerSetupService
from components.moonraker.utils.utils import get_moonraker_status
from components.webui_client.client_config.client_config_setup import (
update_client_config,
@@ -197,7 +197,8 @@ class UpdateMenu(BaseMenu):
self._run_update_routine("klipper", klsvc.update)
def update_moonraker(self, **kwargs) -> None:
self._run_update_routine("moonraker", update_moonraker)
mrsvc = MoonrakerSetupService()
self._run_update_routine("moonraker", mrsvc.update)
def update_mainsail(self, **kwargs) -> None:
self._run_update_routine(

View File

@@ -8,15 +8,18 @@
# ======================================================================= #
from __future__ import annotations
import shutil
from dataclasses import dataclass, field
from typing import Any
from typing import Any, Callable, List, TypeVar
from components.klipper import KLIPPER_REPO_URL
from components.moonraker import MOONRAKER_REPO_URL
from core.backup_manager.backup_manager import BackupManager
from core.logger import DialogType, Logger
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
NoOptionError,
NoSectionError,
SimpleConfigParser,
)
from utils.input_utils import get_confirm
from utils.sys_utils import kill
from kiauh import PROJECT_ROOT
@@ -24,6 +27,16 @@ from kiauh import PROJECT_ROOT
DEFAULT_CFG = PROJECT_ROOT.joinpath("default.kiauh.cfg")
CUSTOM_CFG = PROJECT_ROOT.joinpath("kiauh.cfg")
T = TypeVar("T")
class InvalidValueError(Exception):
"""Raised when a value is invalid for an option"""
def __init__(self, section: str, option: str, value: str):
msg = f"Invalid value '{value}' for option '{option}' in section '{section}'"
super().__init__(msg)
@dataclass
class AppSettings:
@@ -31,26 +44,40 @@ class AppSettings:
@dataclass
class RepoSettings:
repo_url: str | None = field(default=None)
branch: str | None = field(default=None)
class Repository:
url: str
branch: str
@dataclass
class KlipperSettings:
repositories: List[Repository] | None = field(default=None)
use_python_binary: str | None = field(default=None)
@dataclass
class MoonrakerSettings:
optional_speedups: bool | None = field(default=None)
repositories: List[Repository] | None = field(default=None)
use_python_binary: str | None = field(default=None)
@dataclass
class WebUiSettings:
port: str | None = field(default=None)
port: int | None = field(default=None)
unstable_releases: bool | None = field(default=None)
# noinspection PyUnusedLocal
# noinspection PyMethodMayBeStatic
class KiauhSettings:
_instance = None
__instance = None
__initialized = False
def __new__(cls, *args, **kwargs) -> "KiauhSettings":
if cls._instance is None:
cls._instance = super(KiauhSettings, cls).__new__(cls, *args, **kwargs)
return cls._instance
if cls.__instance is None:
cls.__instance = super(KiauhSettings, cls).__new__(cls, *args, **kwargs)
return cls.__instance
def __repr__(self) -> str:
return (
@@ -63,20 +90,20 @@ class KiauhSettings:
return getattr(self, item)
def __init__(self) -> None:
if not hasattr(self, "__initialized"):
self.__initialized = False
if self.__initialized:
return
self.__initialized = True
self.config = SimpleConfigParser()
self.kiauh = AppSettings()
self.klipper = RepoSettings()
self.moonraker = RepoSettings()
self.klipper = KlipperSettings()
self.moonraker = MoonrakerSettings()
self.mainsail = WebUiSettings()
self.fluidd = WebUiSettings()
self._load_config()
self.__read_config_set_internal_state()
# todo: refactor this, at least rename to something else!
def get(self, section: str, option: str) -> str | int | bool:
"""
Get a value from the settings state by providing the section and option name as
@@ -94,107 +121,294 @@ class KiauhSettings:
raise
def save(self) -> None:
self._set_config_options_state()
self.config.write_file(CUSTOM_CFG)
self._load_config()
self.__write_internal_state_to_cfg()
self.__read_config_set_internal_state()
def _load_config(self) -> None:
def __read_config_set_internal_state(self) -> None:
if not CUSTOM_CFG.exists() and not DEFAULT_CFG.exists():
self._kill()
cfg = CUSTOM_CFG if CUSTOM_CFG.exists() else DEFAULT_CFG
self.config.read_file(cfg)
self._validate_cfg()
self._apply_settings_from_file()
def _validate_cfg(self) -> None:
try:
self._validate_bool("kiauh", "backup_before_update")
self._validate_str("klipper", "repo_url")
self._validate_str("klipper", "branch")
self._validate_int("mainsail", "port")
self._validate_bool("mainsail", "unstable_releases")
self._validate_int("fluidd", "port")
self._validate_bool("fluidd", "unstable_releases")
except ValueError:
err = f"Invalid value for option '{self._v_option}' in section '{self._v_section}'"
Logger.print_error(err)
kill()
except NoSectionError:
err = f"Missing section '{self._v_section}' in config file"
Logger.print_error(err)
kill()
except NoOptionError:
err = f"Missing option '{self._v_option}' in section '{self._v_section}'"
Logger.print_error(err)
Logger.print_dialog(
DialogType.ERROR,
[
"No KIAUH configuration file found! Please make sure you have at least "
"one of the following configuration files in KIAUH's root directory:",
"● default.kiauh.cfg",
"● kiauh.cfg",
],
)
kill()
def _validate_bool(self, section: str, option: str) -> None:
self._v_section, self._v_option = (section, option)
(bool(self.config.getboolean(section, option)))
# copy default config to custom config if it does not exist
if not CUSTOM_CFG.exists():
shutil.copyfile(DEFAULT_CFG, CUSTOM_CFG)
def _validate_int(self, section: str, option: str) -> None:
self._v_section, self._v_option = (section, option)
int(self.config.getint(section, option))
self.config.read_file(CUSTOM_CFG)
def _validate_str(self, section: str, option: str) -> None:
self._v_section, self._v_option = (section, option)
v = self.config.getval(section, option)
# check if there are deprecated repo_url and branch options in the kiauh.cfg
if self._check_deprecated_repo_config():
self._prompt_migration_dialog()
if not v:
raise ValueError
self.__set_internal_state()
def _apply_settings_from_file(self) -> None:
self.kiauh.backup_before_update = self.config.getboolean(
"kiauh", "backup_before_update"
)
self.klipper.repo_url = self.config.getval("klipper", "repo_url")
self.klipper.branch = self.config.getval("klipper", "branch")
self.moonraker.repo_url = self.config.getval("moonraker", "repo_url")
self.moonraker.branch = self.config.getval("moonraker", "branch")
self.mainsail.port = self.config.getint("mainsail", "port")
self.mainsail.unstable_releases = self.config.getboolean(
"mainsail", "unstable_releases"
)
self.fluidd.port = self.config.getint("fluidd", "port")
self.fluidd.unstable_releases = self.config.getboolean(
"fluidd", "unstable_releases"
)
def _set_config_options_state(self) -> None:
self.config.set_option(
def __set_internal_state(self) -> None:
# parse Kiauh options
self.kiauh.backup_before_update = self.__read_from_cfg(
"kiauh",
"backup_before_update",
str(self.kiauh.backup_before_update),
)
self.config.set_option("klipper", "repo_url", self.klipper.repo_url)
self.config.set_option("klipper", "branch", self.klipper.branch)
self.config.set_option("moonraker", "repo_url", self.moonraker.repo_url)
self.config.set_option("moonraker", "branch", self.moonraker.branch)
self.config.set_option("mainsail", "port", str(self.mainsail.port))
self.config.set_option(
"mainsail",
"unstable_releases",
str(self.mainsail.unstable_releases),
)
self.config.set_option("fluidd", "port", str(self.fluidd.port))
self.config.set_option(
"fluidd", "unstable_releases", str(self.fluidd.unstable_releases)
self.config.getboolean,
False,
)
def _kill(self) -> None:
# parse Klipper options
self.klipper.use_python_binary = self.__read_from_cfg(
"klipper",
"use_python_binary",
self.config.getval,
None,
True,
)
kl_repos: List[str] = self.__read_from_cfg(
"klipper",
"repositories",
self.config.getvals,
[KLIPPER_REPO_URL],
)
self.klipper.repositories = self.__set_repo_state("klipper", kl_repos)
# parse Moonraker options
self.moonraker.use_python_binary = self.__read_from_cfg(
"moonraker",
"use_python_binary",
self.config.getval,
None,
True,
)
self.moonraker.optional_speedups = self.__read_from_cfg(
"moonraker",
"optional_speedups",
self.config.getboolean,
True,
)
mr_repos: List[str] = self.__read_from_cfg(
"moonraker",
"repositories",
self.config.getvals,
[MOONRAKER_REPO_URL],
)
self.moonraker.repositories = self.__set_repo_state("moonraker", mr_repos)
# parse Mainsail options
self.mainsail.port = self.__read_from_cfg(
"mainsail",
"port",
self.config.getint,
80,
)
self.mainsail.unstable_releases = self.__read_from_cfg(
"mainsail",
"unstable_releases",
self.config.getboolean,
False,
)
# parse Fluidd options
self.fluidd.port = self.__read_from_cfg(
"fluidd",
"port",
self.config.getint,
80,
)
self.fluidd.unstable_releases = self.__read_from_cfg(
"fluidd",
"unstable_releases",
self.config.getboolean,
False,
)
def __check_option_exists(
self, section: str, option: str, fallback: Any, silent: bool = False
) -> bool:
has_section = self.config.has_section(section)
has_option = self.config.has_option(section, option)
if not (has_section and has_option):
if not silent:
Logger.print_warn(
f"Option '{option}' in section '{section}' not defined. Falling back to '{fallback}'."
)
return False
return True
def __read_bool_from_cfg(
self,
section: str,
option: str,
fallback: bool | None = None,
silent: bool = False,
) -> bool | None:
if not self.__check_option_exists(section, option, fallback, silent):
return fallback
return self.config.getboolean(section, option, fallback)
def __read_from_cfg(
self,
section: str,
option: str,
getter: Callable[[str, str, T | None], T],
fallback: T = None,
silent: bool = False,
) -> T:
if not self.__check_option_exists(section, option, fallback, silent):
return fallback
return getter(section, option, fallback)
def __set_repo_state(self, section: str, repos: List[str]) -> List[Repository]:
_repos: List[Repository] = []
for repo in repos:
try:
if repo.strip().startswith("#") or repo.strip().startswith(";"):
continue
if "," in repo:
url, branch = repo.strip().split(",")
if not branch:
branch = "master"
else:
url = repo.strip()
branch = "master"
# url must not be empty otherwise it's considered
# as an unrecoverable, invalid configuration
if not url:
raise InvalidValueError(section, "repositories", repo)
_repos.append(Repository(url.strip(), branch.strip()))
except InvalidValueError as e:
Logger.print_error(f"Error parsing kiauh.cfg: {e}")
kill()
return _repos
def __write_internal_state_to_cfg(self) -> None:
"""Updates the config with current settings, preserving values that haven't been modified"""
if self.kiauh.backup_before_update is not None:
self.config.set_option(
"kiauh",
"backup_before_update",
str(self.kiauh.backup_before_update),
)
# Handle repositories
if self.klipper.repositories is not None:
repos = [f"{repo.url}, {repo.branch}" for repo in self.klipper.repositories]
self.config.set_option("klipper", "repositories", repos)
if self.moonraker.repositories is not None:
repos = [
f"{repo.url}, {repo.branch}" for repo in self.moonraker.repositories
]
self.config.set_option("moonraker", "repositories", repos)
# Handle Mainsail settings
if self.mainsail.port is not None:
self.config.set_option("mainsail", "port", str(self.mainsail.port))
if self.mainsail.unstable_releases is not None:
self.config.set_option(
"mainsail",
"unstable_releases",
str(self.mainsail.unstable_releases),
)
# Handle Fluidd settings
if self.fluidd.port is not None:
self.config.set_option("fluidd", "port", str(self.fluidd.port))
if self.fluidd.unstable_releases is not None:
self.config.set_option(
"fluidd", "unstable_releases", str(self.fluidd.unstable_releases)
)
self.config.write_file(CUSTOM_CFG)
def _check_deprecated_repo_config(self) -> bool:
# repo_url and branch are deprecated - 2025.03.23
for section in ["klipper", "moonraker"]:
if self.config.has_option(section, "repo_url") or self.config.has_option(
section, "branch"
):
return True
return False
def _prompt_migration_dialog(self) -> None:
migration_1: List[str] = [
"Options 'repo_url' and 'branch' are now combined into a 'repositories' option.",
"\n\n",
"● Old format:",
" [klipper]",
" repo_url: https://github.com/Klipper3d/klipper",
" branch: master",
"\n\n",
"● New format:",
" [klipper]",
" repositories:",
" https://github.com/Klipper3d/klipper, master",
]
Logger.print_dialog(
DialogType.ERROR,
DialogType.ATTENTION,
[
"No KIAUH configuration file found! Please make sure you have at least "
"one of the following configuration files in KIAUH's root directory:",
"● default.kiauh.cfg",
"● kiauh.cfg",
"Deprecated kiauh.cfg configuration found!",
"KAIUH can now attempt to automatically migrate the configuration.",
"\n\n",
*migration_1,
],
)
kill()
if get_confirm("Migrate to the new format?"):
self._migrate_repo_config()
else:
Logger.print_dialog(
DialogType.ERROR,
[
"Please update the configuration file manually.",
],
center_content=True,
)
kill()
def _migrate_repo_config(self) -> None:
bm = BackupManager()
if not bm.backup_file(CUSTOM_CFG):
Logger.print_dialog(
DialogType.ERROR,
[
"Failed to create backup of kiauh.cfg. Aborting migration. Please migrate manually."
],
)
kill()
# run migrations
try:
# migrate deprecated repo_url and branch options - 2025.03.23
for section in ["klipper", "moonraker"]:
if not self.config.has_section(section):
continue
repo_url = self.config.getval(section, "repo_url", fallback="")
branch = self.config.getval(section, "branch", fallback="master")
if repo_url:
# create repositories option with the old values
repositories = [f"{repo_url}, {branch}\n"]
self.config.set_option(section, "repositories", repositories)
# remove deprecated options
self.config.remove_option(section, "repo_url")
self.config.remove_option(section, "branch")
Logger.print_ok(f"Successfully migrated {section} configuration")
self.config.write_file(CUSTOM_CFG)
self.config.read_file(CUSTOM_CFG) # reload config
except Exception as e:
Logger.print_error(f"Error migrating configuration: {e}")
Logger.print_error("Please migrate manually.")
kill()

View File

@@ -3,4 +3,48 @@
A custom config parser inspired by Python's configparser module.
Specialized for handling Klipper style config files.
---
### When parsing a config file, it will be split into the following elements:
- Header: All lines before the first section
- Section: A section is defined by a line starting with a `[` and ending with a `]`
- Option: A line starting with a word, followed by a `:` or `=` and a value
- Option Block: A line starting with a word, followed by a `:` or `=` and a newline
- Comment: A line starting with a `#` or `;`
- Blank: A line containing only whitespace characters
---
### Internally, the config is stored as a dictionary of sections, each containing a header and a list of elements:
```python
config = {
"section_name": {
"header": "[section_name]\n",
"elements": [
{
"type": "comment",
"content": "# This is a comment\n"
},
{
"type": "option",
"name": "option1",
"value": "value1",
"raw": "option1: value1\n"
},
{
"type": "blank",
"content": "\n"
},
{
"type": "option_block",
"name": "option2",
"value": [
"value2",
"value3"
],
"raw": "option2:"
}
]
}
}
```

View File

@@ -36,7 +36,7 @@ extend-select = ["I"]
[tool.pytest.ini_options]
minversion = "8.2.1"
testpaths = ["tests/**/*.py"]
addopts = "--cov --cov-config=pyproject.toml --cov-report=html"
addopts = "-svvv --cov --cov-config=pyproject.toml --cov-report=html"
[tool.coverage.run]
branch = true

View File

@@ -6,6 +6,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import re
from enum import Enum
# definition of section line:
# - then line MUST start with an opening square bracket - it is the first section marker
@@ -60,3 +61,11 @@ BOOLEAN_STATES = {
}
HEADER_IDENT = "#_header"
INDENT = " " * 4
class LineType(Enum):
OPTION = "option"
OPTION_BLOCK = "option_block"
COMMENT = "comment"
BLANK = "blank"

View File

@@ -8,8 +8,6 @@
from __future__ import annotations
import secrets
import string
from pathlib import Path
from typing import Callable, Dict, List
@@ -20,7 +18,7 @@ from ..simple_config_parser.constants import (
LINE_COMMENT_RE,
OPTION_RE,
OPTIONS_BLOCK_START_RE,
SECTION_RE,
SECTION_RE, LineType, INDENT,
)
_UNSET = object()
@@ -49,6 +47,13 @@ class NoOptionError(Exception):
msg = f"Option '{option}' in section '{section}' is not defined"
super().__init__(msg)
class UnknownLineError(Exception):
"""Raised when a line is not recognized as any known type"""
def __init__(self, line: str):
msg = f"Unknown line: '{line}'"
super().__init__(msg)
# noinspection PyMethodMayBeStatic
class SimpleConfigParser:
@@ -59,7 +64,6 @@ class SimpleConfigParser:
self.config: Dict = {}
self.current_section: str | None = None
self.current_opt_block: str | None = None
self.current_collector: str | None = None
self.in_option_block: bool = False
def _match_section(self, line: str) -> bool:
@@ -85,28 +89,40 @@ class SimpleConfigParser:
def _parse_line(self, line: str) -> None:
"""Parses a line and determines its type"""
if self._match_section(line):
self.current_collector = None
self.current_opt_block = None
self.current_section = SECTION_RE.match(line).group(1)
self.config[self.current_section] = {"_raw": line}
self.config[self.current_section] = {
"header": line,
"elements": []
}
elif self._match_option(line):
self.current_collector = None
self.current_opt_block = None
option = OPTION_RE.match(line).group(1)
value = OPTION_RE.match(line).group(2)
self.config[self.current_section][option] = {"_raw": line, "value": value}
self.config[self.current_section]["elements"].append({
"type": LineType.OPTION.value,
"name": option,
"value": value,
"raw": line
})
elif self._match_options_block_start(line):
self.current_collector = None
option = OPTIONS_BLOCK_START_RE.match(line).group(1)
self.current_opt_block = option
self.config[self.current_section][option] = {"_raw": line, "value": []}
self.config[self.current_section]["elements"].append({
"type": LineType.OPTION_BLOCK.value,
"name": option,
"value": [],
"raw": line
})
elif self.current_opt_block is not None:
self.config[self.current_section][self.current_opt_block]["value"].append(
line
)
# we are in an option block, so we add the line to the option's value
for element in reversed(self.config[self.current_section]["elements"]):
if element["type"] == LineType.OPTION_BLOCK.value and element["name"] == self.current_opt_block:
element["value"].append(line.strip()) # indentation is removed
break
elif self._match_empty_line(line) or self._match_line_comment(line):
self.current_opt_block = None
@@ -116,15 +132,11 @@ class SimpleConfigParser:
if not self.current_section:
self.config.setdefault(HEADER_IDENT, []).append(line)
else:
section = self.config[self.current_section]
# set the current collector to a new value, so that continuous
# empty lines or comments are collected into the same collector
if not self.current_collector:
self.current_collector = self._generate_rand_id()
section[self.current_collector] = []
section[self.current_collector].append(line)
element_type = LineType.BLANK.value if self._match_empty_line(line) else LineType.COMMENT.value
self.config[self.current_section]["elements"].append({
"type": element_type,
"content": line
})
def read_file(self, file: Path) -> None:
"""Read and parse a config file"""
@@ -132,41 +144,46 @@ class SimpleConfigParser:
for line in file:
self._parse_line(line)
# print(json.dumps(self.config, indent=4))
def write_file(self, path: str | Path) -> None:
"""Write the config to a file"""
if path is None:
raise ValueError("File path cannot be None")
def write_file(self, file: Path) -> None:
"""Write the current config to the config file"""
if not file:
raise ValueError("No config file specified")
with open(path, "w", encoding="utf-8") as f:
if HEADER_IDENT in self.config:
for line in self.config[HEADER_IDENT]:
f.write(line)
with open(file, "w") as file:
self._write_header(file)
self._write_sections(file)
sections = self.get_sections()
for i, section in enumerate(sections):
f.write(self.config[section]["header"])
def _write_header(self, file) -> None:
"""Write the header to the config file"""
for line in self.config.get(HEADER_IDENT, []):
file.write(line)
for element in self.config[section]["elements"]:
if element["type"] == LineType.OPTION.value:
f.write(element["raw"])
elif element["type"] == LineType.OPTION_BLOCK.value:
f.write(element["raw"])
for line in element["value"]:
f.write(INDENT + line.strip() + "\n")
elif element["type"] in [LineType.COMMENT.value, LineType.BLANK.value]:
f.write(element["content"])
else:
raise UnknownLineError(element["raw"])
def _write_sections(self, file) -> None:
"""Write the sections to the config file"""
for section in self.get_sections():
for key, value in self.config[section].items():
self._write_section_content(file, key, value)
# Ensure file ends with a single newline
if sections: # Only if we have any sections
last_section = sections[-1]
last_elements = self.config[last_section]["elements"]
def _write_section_content(self, file, key, value) -> None:
"""Write the content of a section to the config file"""
if key == "_raw":
file.write(value)
elif key.startswith("#_"):
for line in value:
file.write(line)
elif isinstance(value["value"], list):
file.write(value["_raw"])
for line in value["value"]:
file.write(line)
else:
file.write(value["_raw"])
if last_elements:
last_element = last_elements[-1]
if "raw" in last_element:
last_line = last_element["raw"]
else: # comment or blank line
last_line = last_element["content"]
if not last_line.endswith("\n"):
f.write("\n")
def get_sections(self) -> List[str]:
"""Return a list of all section names, but exclude any section starting with '#_'"""
@@ -189,29 +206,40 @@ class SimpleConfigParser:
if len(self.get_sections()) >= 1:
self._check_set_section_spacing()
self.config[section] = {"_raw": f"[{section}]\n"}
self.config[section] = {
"header": f"[{section}]\n",
"elements": []
}
def _check_set_section_spacing(self):
"""Check if there is a blank line between the last section and the new section"""
prev_section_name: str = self.get_sections()[-1]
prev_section_content: Dict = self.config[prev_section_name]
last_option_name: str = list(prev_section_content.keys())[-1]
prev_section = self.config[prev_section_name]
prev_elements = prev_section["elements"]
if last_option_name.startswith("#_"):
last_elem_value: str = prev_section_content[last_option_name][-1]
if prev_elements:
last_element = prev_elements[-1]
# if the last section is a collector, we first check if the last element
# in the collector ends with a newline. if it does not, we append a newline.
# this can happen if the config file does not end with a newline.
if not last_elem_value.endswith("\n"):
prev_section_content[last_option_name][-1] = f"{last_elem_value}\n"
# If the last element is a comment or blank line
if last_element["type"] in [LineType.COMMENT.value, LineType.BLANK.value]:
last_content = last_element["content"]
# if the last item in a collector is not a newline, we append a newline, so
# that the new section is seperated from the options of the previous section
# by a newline
if last_elem_value != "\n":
prev_section_content[last_option_name].append("\n")
else:
prev_section_content[self._generate_rand_id()] = ["\n"]
# If the last element doesn't end with a newline, add one
if not last_content.endswith("\n"):
last_element["content"] += "\n"
# If the last element is not a blank line, add a blank line
if last_content.strip() != "":
prev_elements.append({
"type": "blank",
"content": "\n"
})
else:
# If the last element is an option, add a blank line
prev_elements.append({
"type": LineType.BLANK.value,
"content": "\n"
})
def remove_section(self, section: str) -> None:
"""Remove a section from the config"""
@@ -219,12 +247,12 @@ class SimpleConfigParser:
def get_options(self, section: str) -> List[str]:
"""Return a list of all option names for a given section"""
return list(
filter(
lambda option: option != "_raw" and not option.startswith("#_"),
self.config[section].keys(),
)
)
options = []
if self.has_section(section):
for element in self.config[section]["elements"]:
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value]:
options.append(element["name"])
return options
def has_option(self, section: str, option: str) -> bool:
"""Check if an option exists in a section"""
@@ -238,26 +266,55 @@ class SimpleConfigParser:
if not self.has_section(section):
self.add_section(section)
if not self.has_option(section, option):
self.config[section][option] = {
"_raw": f"{option}:\n"
if isinstance(value, list)
else f"{option}: {value}\n",
# Check if option already exists
for element in self.config[section]["elements"]:
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value] and element["name"] == option:
# Update existing option
if isinstance(value, list):
element["type"] = LineType.OPTION_BLOCK.value
element["value"] = value
element["raw"] = f"{option}:\n"
else:
element["type"] = LineType.OPTION.value
element["value"] = value
element["raw"] = f"{option}: {value}\n"
return
# Option doesn't exist, create new one
if isinstance(value, list):
new_element = {
"type": LineType.OPTION_BLOCK.value,
"name": option,
"value": value,
"raw": f"{option}:\n"
}
else:
opt = self.config[section][option]
if not isinstance(value, list):
opt["_raw"] = opt["_raw"].replace(opt["value"], value)
opt["value"] = value
new_element = {
"type": LineType.OPTION.value,
"name": option,
"value": value,
"raw": f"{option}: {value}\n"
}
# scan through elements to find the last option, after which we insert the new option
insert_pos = 0
elements = self.config[section]["elements"]
for i, element in enumerate(elements):
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value]:
insert_pos = i + 1
elements.insert(insert_pos, new_element)
def remove_option(self, section: str, option: str) -> None:
"""Remove an option from a section"""
self.config[section].pop(option, None)
if self.has_section(section):
elements = self.config[section]["elements"]
for i, element in enumerate(elements):
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value] and element["name"] == option:
elements.pop(i)
break
def getval(
self, section: str, option: str, fallback: str | _UNSET = _UNSET
) -> str | List[str]:
def getval(self, section: str, option: str, fallback: str | _UNSET = _UNSET) -> str:
"""
Return the value of the given option in the given section
@@ -269,7 +326,35 @@ class SimpleConfigParser:
raise NoSectionError(section)
if option not in self.get_options(section):
raise NoOptionError(option, section)
return self.config[section][option]["value"]
for element in self.config[section]["elements"]:
if element["type"] is LineType.OPTION.value and element["name"] == option:
return str(element["value"].strip().replace("\n", ""))
return ""
except (NoSectionError, NoOptionError):
if fallback is _UNSET:
raise
return fallback
def getvals(self, section: str, option: str, fallback: List[str] | _UNSET = _UNSET) -> List[str]:
"""
Return the values of the given multi-line option in the given section
If the key is not found and 'fallback' is provided, it is used as
a fallback value.
"""
try:
if section not in self.get_sections():
raise NoSectionError(section)
if option not in self.get_options(section):
raise NoOptionError(option, section)
for element in self.config[section]["elements"]:
if element["type"] is LineType.OPTION_BLOCK.value and element["name"] == option:
return [val.strip() for val in element["value"] if val.strip()]
return []
except (NoSectionError, NoOptionError):
if fallback is _UNSET:
raise
@@ -317,9 +402,3 @@ class SimpleConfigParser:
raise ValueError(
f"Cannot convert {self.getval(section, option)} to {conv.__name__}"
) from e
def _generate_rand_id(self) -> str:
"""Generate a random id with 6 characters"""
chars = string.ascii_letters + string.digits
rand_string = "".join(secrets.choice(chars) for _ in range(12))
return f"#_{rand_string}"

View File

@@ -25,8 +25,8 @@ option_4: value_4
#option_5: value_5
option_5 = this.is.value-5
multi_option:
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
option_5_1: value_5_1

View File

@@ -25,9 +25,9 @@ option_4: value_4
#option_5: value_5
option_5 = this.is.value-5
multi_option:
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
option_5_1: value_5_1
# config ending with a comment

View File

@@ -25,22 +25,22 @@ option_4: value_4
#option_5: value_5
option_5 = this.is.value-5
multi_option:
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
option_5_1: value_5_1
[gcode_macro M117]
rename_existing: M117.1
gcode:
{% if rawparams %}
{% set escaped_msg = rawparams.split(';', 1)[0].split('\x23', 1)[0]|replace('"', '\\"') %}
SET_DISPLAY_TEXT MSG="{escaped_msg}"
RESPOND TYPE=command MSG="{escaped_msg}"
{% else %}
SET_DISPLAY_TEXT
{% endif %}
{% if rawparams %}
{% set escaped_msg = rawparams.split(';', 1)[0].split('\x23', 1)[0]|replace('"', '\\"') %}
SET_DISPLAY_TEXT MSG="{escaped_msg}"
RESPOND TYPE=command MSG="{escaped_msg}"
{% else %}
SET_DISPLAY_TEXT
{% endif %}
# SDCard 'looping' (aka Marlin M808 commands) support
#
@@ -48,47 +48,47 @@ gcode:
[sdcard_loop]
[gcode_macro M486]
gcode:
# Parameters known to M486 are as follows:
# [C<flag>] Cancel the current object
# [P<index>] Cancel the object with the given index
# [S<index>] Set the index of the current object.
# If the object with the given index has been canceled, this will cause
# the firmware to skip to the next object. The value -1 is used to
# indicate something that isnt an object and shouldnt be skipped.
# [T<count>] Reset the state and set the number of objects
# [U<index>] Un-cancel the object with the given index. This command will be
# ignored if the object has already been skipped
# Parameters known to M486 are as follows:
# [C<flag>] Cancel the current object
# [P<index>] Cancel the object with the given index
# [S<index>] Set the index of the current object.
# If the object with the given index has been canceled, this will cause
# the firmware to skip to the next object. The value -1 is used to
# indicate something that isnt an object and shouldnt be skipped.
# [T<count>] Reset the state and set the number of objects
# [U<index>] Un-cancel the object with the given index. This command will be
# ignored if the object has already been skipped
{% if 'exclude_object' not in printer %}
{action_raise_error("[exclude_object] is not enabled")}
{% endif %}
{% if 'T' in params %}
EXCLUDE_OBJECT RESET=1
{% for i in range(params.T | int) %}
EXCLUDE_OBJECT_DEFINE NAME={i}
{% endfor %}
{% endif %}
{% if 'C' in params %}
EXCLUDE_OBJECT CURRENT=1
{% endif %}
{% if 'P' in params %}
EXCLUDE_OBJECT NAME={params.P}
{% endif %}
{% if 'S' in params %}
{% if params.S == '-1' %}
{% if printer.exclude_object.current_object %}
EXCLUDE_OBJECT_END NAME={printer.exclude_object.current_object}
{% endif %}
{% else %}
EXCLUDE_OBJECT_START NAME={params.S}
{% if 'exclude_object' not in printer %}
{action_raise_error("[exclude_object] is not enabled")}
{% endif %}
{% endif %}
{% if 'U' in params %}
EXCLUDE_OBJECT RESET=1 NAME={params.U}
{% endif %}
{% if 'T' in params %}
EXCLUDE_OBJECT RESET=1
{% for i in range(params.T | int) %}
EXCLUDE_OBJECT_DEFINE NAME={i}
{% endfor %}
{% endif %}
{% if 'C' in params %}
EXCLUDE_OBJECT CURRENT=1
{% endif %}
{% if 'P' in params %}
EXCLUDE_OBJECT NAME={params.P}
{% endif %}
{% if 'S' in params %}
{% if params.S == '-1' %}
{% if printer.exclude_object.current_object %}
EXCLUDE_OBJECT_END NAME={printer.exclude_object.current_object}
{% endif %}
{% else %}
EXCLUDE_OBJECT_START NAME={params.S}
{% endif %}
{% endif %}
{% if 'U' in params %}
EXCLUDE_OBJECT RESET=1 NAME={params.U}
{% endif %}

View File

@@ -0,0 +1,8 @@
[section_1]
# comment
option_1: value_1
option_2: value_2 ; comment
new_option: new_value
[section_2]
option_3: value_3

View File

@@ -0,0 +1,7 @@
[section_1]
# comment
option_1: value_1
option_2: value_2 ; comment
[section_2]
option_3: value_3

View File

@@ -0,0 +1,7 @@
[section_1]
# comment
option_1: value_1
option_2: value_2 ; comment
[section_2]
option_3: value_3

View File

@@ -0,0 +1,8 @@
[section_1]
# comment
option_1: value_1
option_to_remove: value_to_remove
option_2: value_2 ; comment
[section_2]
option_3: value_3

View File

@@ -0,0 +1,7 @@
[section_1]
option_1: value_1
option_2: value_2
# comment
[section_2]
option_5: value_5

View File

@@ -0,0 +1,11 @@
[section_1]
option_1: value_1
option_2: value_2
# comment
[section_to_remove]
option_3: value_3
option_4: value_4
[section_2]
option_5: value_5

View File

@@ -5,11 +5,12 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import json
from pathlib import Path
import pytest
from src.simple_config_parser.constants import HEADER_IDENT
from src.simple_config_parser.constants import HEADER_IDENT, LineType
from src.simple_config_parser.simple_config_parser import SimpleConfigParser
from tests.utils import load_testdata_from_file
@@ -33,16 +34,17 @@ def test_section_parsing(parser):
), f"Expected keys: {expected_keys}, got: {parser.config.keys()}"
assert parser.in_option_block is False
assert parser.current_section == parser.get_sections()[-1]
assert parser.config["section_2"]["_raw"] == "[section_2] ; comment"
assert parser.config["section_2"] is not None
assert parser.config["section_2"]["header"] == "[section_2] ; comment"
assert parser.config["section_2"]["elements"] is not None
assert len(parser.config["section_2"]["elements"]) > 0
def test_option_parsing(parser):
assert parser.config["section_1"]["option_1"]["value"] == "value_1"
assert parser.config["section_1"]["option_1"]["_raw"] == "option_1: value_1"
assert parser.config["section_3"]["option_3"]["value"] == "value_3"
assert (
parser.config["section_3"]["option_3"]["_raw"] == "option_3: value_3 # comment"
)
assert parser.config["section_1"]["elements"][0]["type"] == LineType.OPTION.value
assert parser.config["section_1"]["elements"][0]["name"] == "option_1"
assert parser.config["section_1"]["elements"][0]["value"] == "value_1"
assert parser.config["section_1"]["elements"][0]["raw"] == "option_1: value_1"
def test_header_parsing(parser):
@@ -51,12 +53,27 @@ def test_header_parsing(parser):
assert len(header) > 0
def test_collector_parsing(parser):
section = "section_2"
section_content = list(parser.config[section].keys())
coll_name = [name for name in section_content if name.startswith("#_")][0]
collector = parser.config[section][coll_name]
assert collector is not None
assert isinstance(collector, list)
assert len(collector) > 0
assert "; comment" in collector
def test_option_block_parsing(parser):
section = "section number 5"
option_block = None
for element in parser.config[section]["elements"]:
if (element["type"] == LineType.OPTION_BLOCK.value and
element["name"] == "multi_option"):
option_block = element
break
assert option_block is not None, "multi_option block not found"
assert option_block["type"] == LineType.OPTION_BLOCK.value
assert option_block["name"] == "multi_option"
assert option_block["raw"] == "multi_option:"
expected_values = [
"# these are multi-line values",
"value_5_1",
"value_5_2 ; here is a comment",
"value_5_3"
]
assert option_block["value"] == expected_values, (
f"Expected values: {expected_values}, "
f"got: {option_block['value']}"
)

View File

@@ -8,6 +8,7 @@
import pytest
from src.simple_config_parser.constants import LineType
from src.simple_config_parser.simple_config_parser import (
NoOptionError,
NoSectionError,
@@ -50,7 +51,7 @@ def test_getval(parser):
assert parser.getval("section_2", "option_2") == "value_2"
# test multiline option values
ml_val = parser.getval("section number 5", "multi_option")
ml_val = parser.getvals("section number 5", "multi_option")
assert isinstance(ml_val, list)
assert len(ml_val) > 0
@@ -148,13 +149,11 @@ def test_getfloat_fallback(parser):
def test_set_existing_option(parser):
parser.set_option("section_1", "new_option", "new_value")
assert parser.getval("section_1", "new_option") == "new_value"
assert parser.config["section_1"]["new_option"]["_raw"] == "new_option: new_value\n"
parser.set_option("section_1", "new_option", "new_value_2")
assert parser.getval("section_1", "new_option") == "new_value_2"
assert (
parser.config["section_1"]["new_option"]["_raw"] == "new_option: new_value_2\n"
)
assert parser.config["section_1"]["elements"][4] is not None
assert parser.config["section_1"]["elements"][4]["type"] == LineType.OPTION.value
assert parser.config["section_1"]["elements"][4]["name"] == "new_option"
assert parser.config["section_1"]["elements"][4]["value"] == "new_value"
assert parser.config["section_1"]["elements"][4]["raw"] == "new_option: new_value\n"
def test_set_new_option(parser):
@@ -165,12 +164,21 @@ def test_set_new_option(parser):
assert parser.getval("new_section", "very_new_option") == "very_new_value"
parser.set_option("section_2", "array_option", ["value_1", "value_2", "value_3"])
assert parser.getval("section_2", "array_option") == [
assert parser.getvals("section_2", "array_option") == [
"value_1",
"value_2",
"value_3",
]
assert parser.config["section_2"]["array_option"]["_raw"] == "array_option:\n"
assert parser.config["section_2"]["elements"][1] is not None
assert parser.config["section_2"]["elements"][1]["type"] == LineType.OPTION_BLOCK.value
assert parser.config["section_2"]["elements"][1]["name"] == "array_option"
assert parser.config["section_2"]["elements"][1]["value"] == [
"value_1",
"value_2",
"value_3",
]
assert parser.config["section_2"]["elements"][1]["raw"] == "array_option:\n"
def test_remove_option(parser):

View File

@@ -41,16 +41,15 @@ def test_add_section(parser):
new_section = parser.config["new_section"]
assert isinstance(new_section, dict)
assert new_section["_raw"] == "[new_section]\n"
# this should be the collector, added by the parser before
# then second section was added
assert list(new_section.keys())[-1].startswith("#_")
assert "\n" in new_section[list(new_section.keys())[-1]]
assert new_section["header"] == "[new_section]\n"
assert new_section["elements"] is not None
assert new_section["elements"] == []
new_section2 = parser.config["new_section2"]
assert isinstance(new_section2, dict)
assert new_section2["_raw"] == "[new_section2]\n"
assert new_section2["header"] == "[new_section2]\n"
assert new_section2["elements"] is not None
assert new_section2["elements"] == []
def test_add_section_duplicate(parser):

View File

@@ -39,3 +39,81 @@ def test_write_to_file(tmp_path):
with open(TEST_DATA_PATH, "r") as original, open(tmp_file, "r") as written:
assert original.read() == written.read()
def test_remove_option_and_write(tmp_path):
# Setup paths
test_dir = BASE_DIR.joinpath("write_tests/remove_option")
input_file = test_dir.joinpath("input.cfg")
expected_file = test_dir.joinpath("expected.cfg")
output_file = Path(tmp_path).joinpath("output.cfg")
# Read input file and remove option
parser = SimpleConfigParser()
parser.read_file(input_file)
parser.remove_option("section_1", "option_to_remove")
# Write modified config
parser.write_file(output_file)
# parser.write_file(test_dir.joinpath("output.cfg"))
# Compare with expected output
with open(expected_file, "r") as expected, open(output_file, "r") as actual:
assert expected.read() == actual.read()
# Additional verification
parser2 = SimpleConfigParser()
parser2.read_file(output_file)
assert not parser2.has_option("section_1", "option_to_remove")
def test_remove_section_and_write(tmp_path):
# Setup paths
test_dir = BASE_DIR.joinpath("write_tests/remove_section")
input_file = test_dir.joinpath("input.cfg")
expected_file = test_dir.joinpath("expected.cfg")
output_file = Path(tmp_path).joinpath("output.cfg")
# Read input file and remove section
parser = SimpleConfigParser()
parser.read_file(input_file)
parser.remove_section("section_to_remove")
# Write modified config
parser.write_file(output_file)
# parser.write_file(test_dir.joinpath("output.cfg"))
# Compare with expected output
with open(expected_file, "r") as expected, open(output_file, "r") as actual:
assert expected.read() == actual.read()
# Additional verification
parser2 = SimpleConfigParser()
parser2.read_file(output_file)
assert not parser2.has_section("section_to_remove")
assert "section_1" in parser2.get_sections()
assert "section_2" in parser2.get_sections()
def test_add_option_and_write(tmp_path):
# Setup paths
test_dir = BASE_DIR.joinpath("write_tests/add_option")
input_file = test_dir.joinpath("input.cfg")
expected_file = test_dir.joinpath("expected.cfg")
output_file = Path(tmp_path).joinpath("output.cfg")
# Read input file and add option
parser = SimpleConfigParser()
parser.read_file(input_file)
parser.set_option("section_1", "new_option", "new_value")
# Write modified config
parser.write_file(output_file)
# parser.write_file(test_dir.joinpath("output.cfg"))
# Compare with expected output
with open(expected_file, "r") as expected, open(output_file, "r") as actual:
assert expected.read() == actual.read()
# Additional verification
parser2 = SimpleConfigParser()
parser2.read_file(output_file)
assert parser2.has_option("section_1", "new_option")
assert parser2.getval("section_1", "new_option") == "new_value"

View File

@@ -25,6 +25,7 @@ class ComponentStatus:
status: StatusCode
owner: str | None = None
repo: str | None = None
repo_url: str | None = None
branch: str = ""
local: str | None = None
remote: str | None = None

View File

@@ -104,7 +104,7 @@ class MobilerakerExtension(BaseExtension):
if settings.kiauh.backup_before_update:
self._backup_mobileraker_dir()
git_pull_wrapper(MOBILERAKER_REPO, MOBILERAKER_DIR)
git_pull_wrapper(MOBILERAKER_DIR)
install_python_requirements(MOBILERAKER_ENV_DIR, MOBILERAKER_REQ_FILE)

View File

@@ -145,7 +145,7 @@ class ObicoExtension(BaseExtension):
instances = get_instances(MoonrakerObico)
InstanceManager.stop_all(instances)
git_pull_wrapper(OBICO_REPO, OBICO_DIR)
git_pull_wrapper(OBICO_DIR)
self._install_dependencies()
InstanceManager.start_all(instances)

View File

@@ -43,7 +43,7 @@ class PrettyGcodeExtension(BaseExtension):
port = get_number_input(
"On which port should PrettyGCode run",
min_count=0,
min_value=0,
default=7136,
allow_go_back=True,
)
@@ -78,7 +78,7 @@ class PrettyGcodeExtension(BaseExtension):
def update_extension(self, **kwargs) -> None:
Logger.print_status("Updating PrettyGCode for Klipper ...")
try:
git_pull_wrapper(PGC_REPO, PGC_DIR)
git_pull_wrapper(PGC_DIR)
except Exception as e:
Logger.print_error(f"Error during PrettyGCode for Klipper update: {e}")

View File

@@ -0,0 +1,16 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parent
SPOOLMAN_DOCKER_IMAGE = "ghcr.io/donkie/spoolman:latest"
SPOOLMAN_DIR = Path.home().joinpath("spoolman")
SPOOLMAN_DATA_DIR = SPOOLMAN_DIR.joinpath("data")
SPOOLMAN_COMPOSE_FILE = SPOOLMAN_DIR.joinpath("docker-compose.yml")
SPOOLMAN_DEFAULT_PORT = 7912

View File

@@ -0,0 +1,14 @@
services:
spoolman:
image: ghcr.io/donkie/spoolman:latest
restart: unless-stopped
volumes:
# Mount the host machine's ./data directory into the container's /home/app/.local/share/spoolman directory
- type: bind
source: ./data # This is where the data will be stored locally. Could also be set to for example `source: /home/pi/printer_data/spoolman`.
target: /home/app/.local/share/spoolman # Do NOT modify this line
ports:
# Map the host machine's port 7912 to the container's port 8000
- "7912:8000"
environment:
- TZ=Europe/Stockholm # Optional, defaults to UTC

View File

@@ -0,0 +1,18 @@
{
"metadata": {
"index": 11,
"module": "spoolman_extension",
"maintained_by": "dw-0",
"display_name": "Spoolman (Docker)",
"description": [
"Filament manager for 3D printing",
"- Track your filament inventory",
"- Monitor filament usage",
"- Manage vendors, materials, and spools",
"- Integrates with Moonraker",
"\n\n",
"Note: This extension installs Spoolman using Docker. Docker must be installed on your system before installing Spoolman."
],
"updates": true
}
}

View File

@@ -0,0 +1,190 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import CalledProcessError, run
from components.moonraker.moonraker import Moonraker
from core.instance_manager.base_instance import BaseInstance
from core.logger import Logger
from extensions.spoolman import (
MODULE_PATH,
SPOOLMAN_COMPOSE_FILE,
SPOOLMAN_DIR,
SPOOLMAN_DOCKER_IMAGE,
)
from utils.sys_utils import get_system_timezone
@dataclass
class Spoolman:
suffix: str
base: BaseInstance = field(init=False, repr=False)
dir: Path = SPOOLMAN_DIR
data_dir: Path = field(init=False)
def __post_init__(self):
self.base: BaseInstance = BaseInstance(Moonraker, self.suffix)
self.data_dir = self.base.data_dir
@staticmethod
def is_container_running() -> bool:
"""Check if the Spoolman container is running"""
try:
result = run(
["docker", "compose", "-f", str(SPOOLMAN_COMPOSE_FILE), "ps", "-q"],
capture_output=True,
text=True,
check=True,
)
return bool(result.stdout.strip())
except CalledProcessError:
return False
@staticmethod
def is_docker_available() -> bool:
"""Check if Docker is installed and available"""
try:
run(["docker", "--version"], capture_output=True, check=True)
return True
except (CalledProcessError, FileNotFoundError):
return False
@staticmethod
def is_docker_compose_available() -> bool:
"""Check if Docker Compose is installed and available"""
try:
# Try modern docker compose command
run(["docker", "compose", "version"], capture_output=True, check=True)
return True
except (CalledProcessError, FileNotFoundError):
# Try legacy docker-compose command
try:
run(["docker-compose", "--version"], capture_output=True, check=True)
return True
except (CalledProcessError, FileNotFoundError):
return False
@staticmethod
def create_docker_compose() -> bool:
"""Copy the docker-compose.yml file for Spoolman and set system timezone"""
try:
shutil.copy(
MODULE_PATH.joinpath("assets/docker-compose.yml"),
SPOOLMAN_COMPOSE_FILE,
)
# get system timezone
timezone = get_system_timezone()
with open(SPOOLMAN_COMPOSE_FILE, "r") as f:
content = f.read()
content = content.replace("TZ=Europe/Stockholm", f"TZ={timezone}")
with open(SPOOLMAN_COMPOSE_FILE, "w") as f:
f.write(content)
return True
except Exception as e:
Logger.print_error(f"Error creating Docker Compose file: {e}")
return False
@staticmethod
def start_container() -> bool:
"""Start the Spoolman container"""
try:
run(
["docker", "compose", "-f", str(SPOOLMAN_COMPOSE_FILE), "up", "-d"],
check=True,
)
return True
except CalledProcessError as e:
Logger.print_error(f"Failed to start Spoolman container: {e}")
return False
@staticmethod
def update_container() -> bool:
"""Update the Spoolman container"""
def __get_image_id() -> str:
"""Get the image ID of the Spoolman Docker image"""
try:
result = run(
["docker", "images", "-q", SPOOLMAN_DOCKER_IMAGE],
capture_output=True,
text=True,
check=True,
)
return result.stdout.strip()
except CalledProcessError:
raise Exception("Failed to get Spoolman Docker image ID")
try:
old_image_id = __get_image_id()
Logger.print_status("Pulling latest Spoolman image...")
Spoolman.pull_image()
new_image_id = __get_image_id()
Logger.print_status("Tearing down old Spoolman container...")
Spoolman.tear_down_container()
Logger.print_status("Spinning up new Spoolman container...")
Spoolman.start_container()
if old_image_id != new_image_id:
Logger.print_status("Removing old Spoolman image...")
run(["docker", "rmi", old_image_id], check=True)
return True
except CalledProcessError as e:
Logger.print_error(f"Failed to update Spoolman container: {e}")
return False
@staticmethod
def tear_down_container() -> bool:
"""Stop and remove the Spoolman container"""
try:
run(
["docker", "compose", "-f", str(SPOOLMAN_COMPOSE_FILE), "down"],
check=True,
)
return True
except CalledProcessError as e:
Logger.print_error(f"Failed to tear down Spoolman container: {e}")
return False
@staticmethod
def pull_image() -> bool:
"""Pull the Spoolman Docker image"""
try:
run(["docker", "pull", SPOOLMAN_DOCKER_IMAGE], check=True)
return True
except CalledProcessError as e:
Logger.print_error(f"Failed to pull Spoolman Docker image: {e}")
return False
@staticmethod
def remove_image() -> bool:
"""Remove the Spoolman Docker image"""
try:
image_exists = run(
["docker", "images", "-q", SPOOLMAN_DOCKER_IMAGE],
capture_output=True,
text=True,
).stdout.strip()
if not image_exists:
Logger.print_info("Spoolman Docker image not found. Nothing to remove.")
return False
run(["docker", "rmi", SPOOLMAN_DOCKER_IMAGE], check=True)
return True
except CalledProcessError as e:
Logger.print_error(f"Failed to remove Spoolman Docker image: {e}")
return False

View File

@@ -0,0 +1,344 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import re
from subprocess import CalledProcessError, run
from typing import List, Tuple
from components.moonraker.moonraker import Moonraker
from components.moonraker.services.moonraker_instance_service import (
MoonrakerInstanceService,
)
from core.backup_manager.backup_manager import BackupManager
from core.instance_manager.instance_manager import InstanceManager
from core.logger import DialogType, Logger
from extensions.base_extension import BaseExtension
from extensions.spoolman import (
SPOOLMAN_COMPOSE_FILE,
SPOOLMAN_DATA_DIR,
SPOOLMAN_DEFAULT_PORT,
SPOOLMAN_DIR,
)
from extensions.spoolman.spoolman import Spoolman
from utils.config_utils import (
add_config_section,
remove_config_section,
)
from utils.fs_utils import run_remove_routines
from utils.input_utils import get_confirm, get_number_input
from utils.sys_utils import get_ipv4_addr
# noinspection PyMethodMayBeStatic
class SpoolmanExtension(BaseExtension):
ip: str = ""
port: int = SPOOLMAN_DEFAULT_PORT
def install_extension(self, **kwargs) -> None:
Logger.print_status("Installing Spoolman using Docker...")
docker_available, docker_compose_available = self.__check_docker_prereqs()
if not docker_available or not docker_compose_available:
return
if not self.__handle_existing_installation():
self.ip: str = get_ipv4_addr()
self.__run_setup()
# noinspection HttpUrlsUsage
Logger.print_dialog(
DialogType.SUCCESS,
[
"Spoolman successfully installed using Docker!",
"You can access Spoolman via the following URL:",
f"http://{self.ip}:{self.port}",
],
center_content=True,
)
def update_extension(self, **kwargs) -> None:
Logger.print_status("Updating Spoolman Docker container...")
if not SPOOLMAN_DIR.exists() or not SPOOLMAN_COMPOSE_FILE.exists():
Logger.print_error("Spoolman installation not found or incomplete.")
return
docker_available, docker_compose_available = self.__check_docker_prereqs()
if not docker_available or not docker_compose_available:
return
Logger.print_status("Updating Spoolman container...")
if not Spoolman.update_container():
return
Logger.print_dialog(
DialogType.SUCCESS,
["Spoolman Docker container successfully updated!"],
center_content=True,
)
def remove_extension(self, **kwargs) -> None:
Logger.print_status("Removing Spoolman Docker container...")
if not SPOOLMAN_DIR.exists():
Logger.print_info("Spoolman is not installed. Nothing to remove.")
return
docker_available, docker_compose_available = self.__check_docker_prereqs()
if not docker_available or not docker_compose_available:
return
# remove moonraker integration
mrsvc = MoonrakerInstanceService()
mrsvc.load_instances()
mr_instances: List[Moonraker] = mrsvc.get_all_instances()
Logger.print_status("Removing Spoolman configuration from moonraker.conf...")
remove_config_section("spoolman", mr_instances)
Logger.print_status("Removing Spoolman from moonraker.asvc...")
self.__remove_from_moonraker_asvc()
# stop and remove the container if docker-compose exists
if SPOOLMAN_COMPOSE_FILE.exists():
Logger.print_status("Stopping and removing Spoolman container...")
if Spoolman.tear_down_container():
Logger.print_ok("Spoolman container removed!")
else:
Logger.print_error(
"Failed to remove Spoolman container! Please remove it manually."
)
if Spoolman.remove_image():
Logger.print_ok("Spoolman container and image removed!")
else:
Logger.print_error(
"Failed to remove Spoolman image! Please remove it manually."
)
# backup Spoolman directory to ~/spoolman_data-<timestamp> before removing it
try:
bm = BackupManager()
result = bm.backup_directory(
f"{SPOOLMAN_DIR.name}_data",
source=SPOOLMAN_DIR,
target=SPOOLMAN_DIR.parent,
)
if result:
Logger.print_ok(f"Spoolman data backed up to {result}")
Logger.print_status("Removing Spoolman directory...")
if run_remove_routines(SPOOLMAN_DIR):
Logger.print_ok("Spoolman directory removed!")
else:
Logger.print_error(
"Failed to remove Spoolman directory! Please remove it manually."
)
except Exception as e:
Logger.print_error(f"Failed to backup Spoolman directory: {e}")
Logger.print_info("Skipping Spoolman directory removal...")
Logger.print_dialog(
DialogType.SUCCESS,
["Spoolman successfully removed!"],
center_content=True,
)
def __run_setup(self) -> None:
# Create Spoolman directory and data directory
Logger.print_status("Setting up Spoolman directories...")
SPOOLMAN_DIR.mkdir(parents=True)
Logger.print_ok(f"Directory {SPOOLMAN_DIR} created!")
SPOOLMAN_DATA_DIR.mkdir(parents=True)
Logger.print_ok(f"Directory {SPOOLMAN_DATA_DIR} created!")
# Set correct permissions for data directory
try:
Logger.print_status("Setting permissions for Spoolman data directory...")
run(["chown", "1000:1000", str(SPOOLMAN_DATA_DIR)], check=True)
Logger.print_ok("Permissions set!")
except CalledProcessError:
Logger.print_warn(
"Could not set permissions on data directory. This might cause issues."
)
Logger.print_status("Creating Docker Compose file...")
if Spoolman.create_docker_compose():
Logger.print_ok("Docker Compose file created!")
else:
Logger.print_error("Failed to create Docker Compose file!")
self.__port_config_prompt()
Logger.print_status("Spinning up Spoolman container...")
if Spoolman.start_container():
Logger.print_ok("Spoolman container started!")
else:
Logger.print_error("Failed to start Spoolman container!")
if self.__add_moonraker_integration():
Logger.print_ok("Spoolman integration added to Moonraker!")
else:
Logger.print_info("Moonraker integration skipped.")
def __check_docker_prereqs(self) -> Tuple[bool, bool]:
# check if Docker is available
is_docker_available = Spoolman.is_docker_available()
if not is_docker_available:
Logger.print_error("Docker is not installed or not available.")
Logger.print_info(
"Please install Docker first: https://docs.docker.com/engine/install/"
)
# check if Docker Compose is available
is_docker_compose_available = Spoolman.is_docker_compose_available()
if not is_docker_compose_available:
Logger.print_error("Docker Compose is not installed or not available.")
return is_docker_available, is_docker_compose_available
def __port_config_prompt(self) -> None:
"""Prompt for advanced configuration options"""
Logger.print_dialog(
DialogType.INFO,
[
"You can configure Spoolman to run on a different port than the default. "
"Make sure you don't select a port which is already in use by "
"another application. Your input will not be validated! "
"The default port is 7912.",
],
)
if not get_confirm("Continue with default port 7912?", default_choice=True):
self.__set_port()
def __set_port(self) -> None:
"""Configure advanced options for Spoolman Docker container"""
port = get_number_input(
"Which port should Spoolman run on?",
default=SPOOLMAN_DEFAULT_PORT,
min_value=1024,
max_value=65535,
)
if port != SPOOLMAN_DEFAULT_PORT:
self.port = port
with open(SPOOLMAN_COMPOSE_FILE, "r") as f:
content = f.read()
port_mapping_pattern = r'"(\d+):8000"'
content = re.sub(port_mapping_pattern, f'"{port}:8000"', content)
with open(SPOOLMAN_COMPOSE_FILE, "w") as f:
f.write(content)
Logger.print_ok(f"Port set to {port}...")
def __handle_existing_installation(self) -> bool:
if not (SPOOLMAN_DIR.exists() and SPOOLMAN_DIR.is_dir()):
return False
compose_file_exists = SPOOLMAN_COMPOSE_FILE.exists()
container_running = Spoolman.is_container_running()
if container_running and compose_file_exists:
Logger.print_info("Spoolman is already installed!")
return True
elif container_running and not compose_file_exists:
Logger.print_status(
"Spoolman container is running but Docker Compose file is missing..."
)
if get_confirm(
"Do you want to recreate the Docker Compose file?",
default_choice=True,
):
Spoolman.create_docker_compose()
self.__port_config_prompt()
return True
elif not container_running and compose_file_exists:
Logger.print_status(
"Docker Compose file exists but container is not running..."
)
Spoolman.start_container()
return True
return False
def __add_moonraker_integration(self) -> bool:
"""Enable Moonraker integration for Spoolman Docker container"""
if not get_confirm("Add Moonraker integration?", default_choice=True):
return False
Logger.print_status("Adding Spoolman integration to Moonraker...")
# read port from the docker-compose file
port = SPOOLMAN_DEFAULT_PORT
if SPOOLMAN_COMPOSE_FILE.exists():
with open(SPOOLMAN_COMPOSE_FILE, "r") as f:
content = f.read()
# Extract port from the port mapping
port_match = re.search(r'"(\d+):8000"', content)
if port_match:
port = port_match.group(1)
mrsvc = MoonrakerInstanceService()
mrsvc.load_instances()
mr_instances = mrsvc.get_all_instances()
# noinspection HttpUrlsUsage
add_config_section(
section="spoolman",
instances=mr_instances,
options=[("server", f"http://{self.ip}:{port}")],
)
Logger.print_status("Adding Spoolman to moonraker.asvc...")
self.__add_to_moonraker_asvc()
InstanceManager.restart_all(mr_instances)
return True
def __add_to_moonraker_asvc(self) -> None:
"""Add Spoolman to moonraker.asvc"""
mrsvc = MoonrakerInstanceService()
mrsvc.load_instances()
mr_instances = mrsvc.get_all_instances()
for instance in mr_instances:
asvc_path = instance.data_dir.joinpath("moonraker.asvc")
if asvc_path.exists():
if "Spoolman" in open(asvc_path).read():
Logger.print_info(f"Spoolman already in {asvc_path}. Skipping...")
continue
with open(asvc_path, "a") as f:
f.write("Spoolman\n")
Logger.print_ok(f"Spoolman added to {asvc_path}!")
def __remove_from_moonraker_asvc(self) -> None:
"""Remove Spoolman from moonraker.asvc"""
mrsvc = MoonrakerInstanceService()
mrsvc.load_instances()
mr_instances = mrsvc.get_all_instances()
for instance in mr_instances:
asvc_path = instance.data_dir.joinpath("moonraker.asvc")
if asvc_path.exists():
if "Spoolman" not in open(asvc_path).read():
Logger.print_info(f"Spoolman not in {asvc_path}. Skipping...")
continue
with open(asvc_path, "r") as f:
lines = f.readlines()
new_lines = [line for line in lines if "Spoolman" not in line]
with open(asvc_path, "w") as f:
f.writelines(new_lines)
Logger.print_ok(f"Spoolman removed from {asvc_path}!")

View File

@@ -135,7 +135,7 @@ class TelegramBotExtension(BaseExtension):
instances = get_instances(MoonrakerTelegramBot)
InstanceManager.stop_all(instances)
git_pull_wrapper(TG_BOT_REPO, TG_BOT_DIR)
git_pull_wrapper(TG_BOT_DIR)
self._install_dependencies()
InstanceManager.start_all(instances)

View File

@@ -27,11 +27,12 @@ from components.moonraker import (
MOONRAKER_REQ_FILE,
)
from components.moonraker.moonraker import Moonraker
from components.moonraker.moonraker_setup import install_moonraker_packages
from components.moonraker.services.moonraker_setup_service import (
install_moonraker_packages,
)
from core.backup_manager.backup_manager import BackupManager, BackupManagerException
from core.instance_manager.instance_manager import InstanceManager
from core.logger import Logger
from core.settings.kiauh_settings import RepoSettings
from utils.git_utils import GitException, get_repo_name, git_clone_wrapper
from utils.instance_utils import get_instances
from utils.sys_utils import (
@@ -46,7 +47,7 @@ class RepoSwitchFailedException(Exception):
def run_switch_repo_routine(
name: Literal["klipper", "moonraker"], repo_settings: RepoSettings
name: Literal["klipper", "moonraker"], repo_url: str, branch: str
) -> None:
repo_dir: Path = KLIPPER_DIR if name == "klipper" else MOONRAKER_DIR
env_dir: Path = KLIPPER_ENV_DIR if name == "klipper" else MOONRAKER_ENV_DIR
@@ -78,10 +79,6 @@ def run_switch_repo_routine(
backup_dir,
)
# step 3: read repo url and branch from settings
repo_url = repo_settings.repo_url
branch = repo_settings.branch
if not (repo_url or branch):
error = f"Invalid repository URL ({repo_url}) or branch ({branch})!"
raise ValueError(error)

View File

@@ -29,6 +29,7 @@ from utils.git_utils import (
get_local_tags,
get_remote_commit,
get_repo_name,
get_repo_url,
)
from utils.instance_utils import get_instances
from utils.sys_utils import (
@@ -133,11 +134,14 @@ def get_install_status(
status = 1 # incomplete
org, repo = get_repo_name(repo_dir)
repo_url = get_repo_url(repo_dir) if repo_dir.exists() else None
return ComponentStatus(
status=status,
instances=instances,
owner=org,
repo=repo,
repo_url=repo_url,
branch=branch,
local=get_local_commit(repo_dir),
remote=get_remote_commit(repo_dir),

View File

@@ -26,9 +26,10 @@ def git_clone_wrapper(
) -> None:
"""
Clones a repository from the given URL and checks out the specified branch if given.
The clone will be performed with the '--filter=blob:none' flag to perform a blobless clone.
:param repo: The URL of the repository to clone.
:param branch: The branch to check out. If None, the default branch will be checked out.
:param branch: The branch to check out. If None, master or main, no checkout will be performed.
:param target_dir: The directory where the repository will be cloned.
:param force: Force the cloning of the repository even if it already exists.
:return: None
@@ -43,8 +44,11 @@ def git_clone_wrapper(
return
shutil.rmtree(target_dir)
git_cmd_clone(repo, target_dir)
git_cmd_checkout(branch, target_dir)
git_cmd_clone(repo, target_dir, blobless=True)
if branch not in ("master", "main"):
git_cmd_checkout(branch, target_dir)
except CalledProcessError:
log = "An unexpected error occured during cloning of the repository."
Logger.print_error(log)
@@ -54,15 +58,14 @@ def git_clone_wrapper(
raise GitException(f"Error removing existing repository: {e.strerror}")
def git_pull_wrapper(repo: str, target_dir: Path) -> None:
def git_pull_wrapper(target_dir: Path) -> None:
"""
A function that updates a repository using git pull.
:param repo: The repository to update.
:param target_dir: The directory of the repository.
:return: None
"""
Logger.print_status(f"Updating repository '{repo}' ...")
Logger.print_status("Updating repository ...")
try:
git_cmd_pull(target_dir)
except CalledProcessError:
@@ -255,11 +258,23 @@ def get_remote_commit(repo: Path) -> str | None:
return None
def git_cmd_clone(repo: str, target_dir: Path) -> None:
try:
command = ["git", "clone", repo, target_dir.as_posix()]
run(command, check=True)
def git_cmd_clone(repo: str, target_dir: Path, blobless: bool = False) -> None:
"""
Clones a repository with optional blobless clone.
:param repo: URL of the repository to clone.
:param target_dir: Path where the repository will be cloned.
:param blobless: If True, perform a blobless clone by adding the '--filter=blob:none' flag.
"""
try:
command = ["git", "clone"]
if blobless:
command.append("--filter=blob:none")
command += [repo, target_dir.as_posix()]
run(command, check=True)
Logger.print_ok("Clone successful!")
except CalledProcessError as e:
error = e.stderr.decode() if e.stderr else "Unknown error"
@@ -321,3 +336,25 @@ def rollback_repository(repo_dir: Path, instance: Type[InstanceType]) -> None:
Logger.print_error(f"An error occured during repo rollback:\n{e}")
InstanceManager.start_all(instances)
def get_repo_url(repo_dir: Path) -> str | None:
"""
Get the remote repository URL for a git repository
:param repo_dir: Path to the git repository
:return: URL of the remote repository or None if not found
"""
if not repo_dir.exists():
return None
try:
result = run(
["git", "config", "--get", "remote.origin.url"],
cwd=repo_dir,
capture_output=True,
text=True,
check=True,
)
return result.stdout.strip()
except CalledProcessError:
return None

View File

@@ -52,16 +52,16 @@ def get_confirm(question: str, default_choice=True, allow_go_back=False) -> bool
def get_number_input(
question: str,
min_count: int,
max_count: int | None = None,
min_value: int,
max_value: int | None = None,
default: int | None = None,
allow_go_back: bool = False,
) -> int | None:
"""
Helper method to get a number input from the user
:param question: The question to display
:param min_count: The lowest allowed value
:param max_count: The highest allowed value (or None)
:param min_value: The lowest allowed value
:param max_value: The highest allowed value (or None)
:param default: Optional default value
:param allow_go_back: Navigate back to a previous dialog
:return: Either the validated number input, or None on go_back
@@ -77,7 +77,7 @@ def get_number_input(
return default
try:
return validate_number_input(_input, min_count, max_count)
return validate_number_input(_input, min_value, max_value)
except ValueError:
Logger.print_error(INVALID_CHOICE)

View File

@@ -95,6 +95,7 @@ def create_python_venv(
target: Path,
force: bool = False,
allow_access_to_system_site_packages: bool = False,
use_python_binary: str | None = None
) -> bool:
"""
Create a python 3 virtualenv at the provided target destination.
@@ -103,36 +104,46 @@ def create_python_venv(
:param target: Path where to create the virtualenv at
:param force: Force recreation of the virtualenv
:param allow_access_to_system_site_packages: give the virtual environment access to the system site-packages dir
:param use_python_binary: allows to override default python binary
:return: bool
"""
Logger.print_status("Set up Python virtual environment ...")
cmd = ["virtualenv", "-p", "/usr/bin/python3", target.as_posix()]
# If binarry override is not set, we use default defined here
python_binary = use_python_binary if use_python_binary else "/usr/bin/python3"
cmd = ["virtualenv", "-p", python_binary, target.as_posix()]
cmd.append(
"--system-site-packages"
) if allow_access_to_system_site_packages else None
if not target.exists():
try:
run(cmd, check=True)
Logger.print_ok("Setup of virtualenv successful!")
return True
except CalledProcessError as e:
Logger.print_error(f"Error setting up virtualenv:\n{e}")
return False
else:
if not force and not get_confirm(
"Virtualenv already exists. Re-create?", default_choice=False
):
Logger.print_info("Skipping re-creation of virtualenv ...")
return False
try:
shutil.rmtree(target)
create_python_venv(target)
return True
except OSError as e:
log = f"Error removing existing virtualenv: {e.strerror}"
Logger.print_error(log, False)
return False
n = 2
while(n > 0):
if not target.exists():
try:
run(cmd, check=True)
Logger.print_ok("Setup of virtualenv successful!")
return True
except CalledProcessError as e:
Logger.print_error(f"Error setting up virtualenv:\n{e}")
return False
else:
if n == 1:
# This case should never happen,
# but the function should still behave correctly
Logger.print_error("Virtualenv still exists after deletion.")
return False
if not force and not get_confirm(
"Virtualenv already exists. Re-create?", default_choice=False
):
Logger.print_info("Skipping re-creation of virtualenv ...")
return False
try:
shutil.rmtree(target)
n -= 1
except OSError as e:
log = f"Error removing existing virtualenv: {e.strerror}"
Logger.print_error(log, False)
return False
def update_python_pip(target: Path) -> None:
@@ -173,9 +184,6 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
:return: None
"""
try:
# always update pip before installing requirements
update_python_pip(target)
Logger.print_status("Installing Python requirements ...")
command = [
target.joinpath("bin/pip").as_posix(),
@@ -185,7 +193,36 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
]
result = run(command, stderr=PIPE, text=True)
if result.returncode != 0 or result.stderr:
if result.returncode != 0:
Logger.print_error(f"{result.stderr}", False)
raise VenvCreationFailedException("Installing Python requirements failed!")
Logger.print_ok("Installing Python requirements successful!")
except Exception as e:
log = f"Error installing Python requirements: {e}"
Logger.print_error(log)
raise VenvCreationFailedException(log)
def install_python_packages(target: Path, packages: List[str]) -> None:
"""
Installs the python packages based on a provided packages list |
:param target: Path of the virtualenv
:param packages: str list of required packages
:return: None
"""
try:
Logger.print_status("Installing Python requirements ...")
command = [
target.joinpath("bin/pip").as_posix(),
"install",
]
for pkg in packages:
command.append(pkg)
result = run(command, stderr=PIPE, text=True)
if result.returncode != 0:
Logger.print_error(f"{result.stderr}", False)
raise VenvCreationFailedException("Installing Python requirements failed!")
@@ -327,11 +364,12 @@ def get_ipv4_addr() -> str:
try:
# doesn't even have to be reachable
s.connect(("192.255.255.255", 1))
return str(s.getsockname()[0])
except Exception:
return "127.0.0.1"
finally:
ipv4: str = str(s.getsockname()[0])
s.close()
return ipv4
except Exception:
s.close()
return "127.0.0.1"
def download_file(url: str, target: Path, show_progress=True) -> None:
@@ -568,3 +606,33 @@ def get_distro_info() -> Tuple[str, str]:
raise ValueError("Error reading distro version!")
return distro_id.lower(), distro_version
def get_system_timezone() -> str:
timezone = "UTC"
try:
with open("/etc/timezone", "r") as f:
timezone = f.read().strip()
except FileNotFoundError:
# fallback to reading timezone from timedatectl
try:
result = run(
["timedatectl", "show", "--property=Timezone"],
capture_output=True,
text=True,
check=True,
)
timezone = result.stdout.strip().split("=")[1]
except CalledProcessError:
# fallback if timedatectl fails, try reading from readlink
try:
result = run(
["readlink", "-f", "/etc/localtime"],
capture_output=True,
text=True,
check=True,
)
timezone = result.stdout.strip().split("zoneinfo/")[1]
except (CalledProcessError, IndexError):
Logger.print_warn("Could not determine system timezone, using UTC")
return timezone

View File

@@ -2,7 +2,7 @@
requires-python = ">=3.8"
[project.optional-dependencies]
dev=["ruff", "mypy"]
dev=["ruff", "pyright"]
[tool.ruff]
required-version = ">=0.9.10"
@@ -20,14 +20,3 @@ quote-style = "double"
[tool.ruff.lint]
extend-select = ["I"]
[tool.mypy]
python_version = "3.8"
platform = "linux"
# strict = true # TODO: enable this once everything is else is handled
check_untyped_defs = true
ignore_missing_imports = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_return_any = true
warn_unreachable = true

6
pyrightconfig.json Normal file
View File

@@ -0,0 +1,6 @@
{
"pythonVersion": "3.8",
"pythonPlatform": "Linux",
"typeCheckingMode": "standard",
"venvPath": "./.kiauh-env"
}

2
requirements-dev.txt Normal file
View File

@@ -0,0 +1,2 @@
ruff (>=0.9.10)
pyright

View File

@@ -280,7 +280,6 @@ function create_klipper_virtualenv() {
status_msg "Installing $("python${python_version}" -V) virtual environment..."
if virtualenv -p "python${python_version}" "${KLIPPY_ENV}"; then
(( python_version == 3 )) && "${KLIPPY_ENV}"/bin/pip install -U pip
"${KLIPPY_ENV}"/bin/pip install -r "${KLIPPER_DIR}"/scripts/klippy-requirements.txt
else
log_error "failure while creating python3 klippy-env"

View File

@@ -336,7 +336,6 @@ function create_moonraker_virtualenv() {
[[ -d ${MOONRAKER_ENV} ]] && rm -rf "${MOONRAKER_ENV}"
if virtualenv -p /usr/bin/python3 "${MOONRAKER_ENV}"; then
"${MOONRAKER_ENV}"/bin/pip install -U pip
"${MOONRAKER_ENV}"/bin/pip install -r "${MOONRAKER_DIR}/scripts/moonraker-requirements.txt"
else
log_error "failure while creating python3 moonraker-env"