feat: allow configuration of multiple repos in kiauh.cfg (#668)

* remove existing simple_config_parser directory

* Squashed 'kiauh/core/submodules/simple_config_parser/' content from commit da22e6a

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: da22e6ad9ca4bc121c39dc3bc6c63175a72e78a2

* Squashed 'kiauh/core/submodules/simple_config_parser/' changes from da22e6a..9ae5749

9ae5749 fix: comment out file writing in test
1ac4e3d refactor: improve section writing

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: 9ae574930dfe82107a3712c7c72b3aa777588996

* Squashed 'kiauh/core/submodules/simple_config_parser/' changes from 9ae5749..53e8408

53e8408 fix: do not add a blank line before writing a section header
dc77569 test: add test for removing option before writing

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: 53e840853f12318dcac68196fb74c1843cb75808

* Squashed 'kiauh/core/submodules/simple_config_parser/' changes from 53e8408..4a6e5f2

4a6e5f2 refactor: full rework of the internal storage of the parsed config

git-subtree-dir: kiauh/core/submodules/simple_config_parser
git-subtree-split: 4a6e5f23cb1f298f0a3efbf042186b16c91763c7

* refactor!: switching repos now offers list of repositories to choose from

this rework aligns more with the feature provided in kiauh v5.

Signed-off-by: Dominik Willner <th33xitus@gmail.com>

---------

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
dw-0
2025-03-29 16:18:20 +01:00
committed by GitHub
parent b99e6612e2
commit 88742ab496
28 changed files with 876 additions and 377 deletions

View File

@@ -2,12 +2,26 @@
backup_before_update: False
[klipper]
repo_url: https://github.com/Klipper3d/klipper
branch: master
# add custom repositories here, if at least one is given, the first in the list will be used by default
# otherwise the official repository is used
#
# format: https://github.com/username/repository, branch
# example: https://github.com/Klipper3d/klipper, master
#
# branch is optional, if given, it must be preceded by a comma, if not given, 'master' is used
repositories:
https://github.com/Klipper3d/klipper
[moonraker]
repo_url: https://github.com/Arksine/moonraker
branch: master
# add custom repositories here, if at least one is given, the first in the list will be used by default
# otherwise the official repository is used
#
# format: https://github.com/username/repository, branch
# example: https://github.com/Arksine/moonraker, master
#
# branch is optional, if given, it must be preceded by a comma, if not given, 'master' is used
repositories:
https://github.com/Arksine/moonraker
[mainsail]
port: 80

View File

@@ -15,6 +15,7 @@ from components.klipper import (
EXIT_KLIPPER_SETUP,
KLIPPER_DIR,
KLIPPER_ENV_DIR,
KLIPPER_REPO_URL,
KLIPPER_REQ_FILE,
)
from components.klipper.klipper import Klipper
@@ -161,7 +162,7 @@ class KlipperSetupService:
backup_klipper_dir()
InstanceManager.stop_all(self.klipper_list)
git_pull_wrapper(self.settings.klipper.repo_url, KLIPPER_DIR)
git_pull_wrapper("", KLIPPER_DIR)
install_klipper_packages()
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
InstanceManager.start_all(self.klipper_list)
@@ -266,9 +267,10 @@ class KlipperSetupService:
check_user_groups()
def __install_deps(self) -> None:
repo = self.settings.klipper.repo_url
branch = self.settings.klipper.branch
default_repo = (KLIPPER_REPO_URL, "master")
repo = self.settings.klipper.repositories
# pull the first repo defined in kiauh.cfg or fallback to the official Klipper repo
repo, branch = (repo[0].url, repo[0].branch) if repo else default_repo
git_clone_wrapper(repo, KLIPPER_DIR, branch)
try:

View File

@@ -18,6 +18,7 @@ from components.moonraker import (
MOONRAKER_DIR,
MOONRAKER_ENV_DIR,
MOONRAKER_INSTALL_SCRIPT,
MOONRAKER_REPO_URL,
MOONRAKER_REQ_FILE,
MOONRAKER_SPEEDUPS_REQ_FILE,
POLKIT_FILE,
@@ -181,9 +182,10 @@ def check_moonraker_install_requirements(klipper_list: List[Klipper]) -> bool:
def setup_moonraker_prerequesites() -> None:
settings = KiauhSettings()
repo = settings.moonraker.repo_url
branch = settings.moonraker.branch
default_repo = (MOONRAKER_REPO_URL, "master")
repo = settings.moonraker.repositories
# pull the first repo defined in kiauh.cfg or fallback to the official Moonraker repo
repo, branch = (repo[0].url, repo[0].branch) if repo else default_repo
git_clone_wrapper(repo, MOONRAKER_DIR, branch)
# install moonraker dependencies and create python virtualenv
@@ -259,7 +261,7 @@ def update_moonraker() -> None:
instances = get_instances(Moonraker)
InstanceManager.stop_all(instances)
git_pull_wrapper(repo=settings.moonraker.repo_url, target_dir=MOONRAKER_DIR)
git_pull_wrapper("", target_dir=MOONRAKER_DIR)
# install possible new system packages
install_moonraker_packages()

View File

@@ -91,7 +91,8 @@ class Logger:
color = Logger._get_dialog_color(title, custom_color)
dialog_title = Logger._get_dialog_title(title, custom_title)
print("\n" * margin_top)
if margin_top > 0:
print("\n" * margin_top, end="")
print(Color.apply(BORDER_TOP, color))
@@ -111,7 +112,8 @@ class Logger:
print(Color.apply(BORDER_BOTTOM, color))
print("\n" * margin_bottom)
if margin_bottom > 0:
print("\n" * margin_bottom, end="")
@staticmethod
def _get_dialog_title(

View File

@@ -0,0 +1,79 @@
# ======================================================================= #
# Copyright (C) 2020 - 2025 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from typing import List, Literal, Type
from core.logger import Logger
from core.menus import Option
from core.menus.base_menu import BaseMenu
from core.settings.kiauh_settings import KiauhSettings, Repository
from core.types.color import Color
from procedures.switch_repo import run_switch_repo_routine
class RepoSelectMenu(BaseMenu):
def __init__(
self,
name: Literal["klipper", "moonraker"],
repos: List[Repository],
previous_menu: Type[BaseMenu] | None = None,
) -> None:
super().__init__()
self.title_color = Color.CYAN
self.previous_menu = previous_menu
self.settings = KiauhSettings()
self.input_label_txt = "Select repository"
self.name = name
self.repos = repos
if self.name == "klipper":
self.title = "Klipper Repository Selection Menu"
elif self.name == "moonraker":
self.title = "Moonraker Repository Selection Menu"
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.settings_menu import SettingsMenu
self.previous_menu = (
previous_menu if previous_menu is not None else SettingsMenu
)
def set_options(self) -> None:
self.options = {}
if not self.repos:
return
for idx, repo in enumerate(self.repos, start=1):
self.options[str(idx)] = Option(
method=self.select_repository, opt_data=repo
)
def print_menu(self) -> None:
menu = "╟───────────────────────────────────────────────────────╢\n"
menu += "║ Available Repositories: ║\n"
menu += "╟───────────────────────────────────────────────────────╢\n"
for idx, repo in enumerate(self.repos, start=1):
url = f"● Repo: {repo.url.replace('.git', '')}"
branch = f"└► Branch: {repo.branch}"
menu += f"{idx}) {Color.apply(url, Color.CYAN):<59}\n"
menu += f"{Color.apply(branch, Color.CYAN):<59}\n"
menu += "╟───────────────────────────────────────────────────────╢\n"
print(menu, end="")
def select_repository(self, **kwargs) -> None:
repo: Repository = kwargs.get("opt_data")
Logger.print_status(
f"Switching to {self.name.capitalize()}'s new source repository ..."
)
run_switch_repo_routine(self.name, repo.url, repo.branch)

View File

@@ -9,20 +9,17 @@
from __future__ import annotations
import textwrap
from pathlib import Path
from typing import Literal, Tuple, Type
from typing import Type
from components.klipper import KLIPPER_DIR, KLIPPER_REPO_URL
from components.klipper.klipper_utils import get_klipper_status
from components.moonraker import MOONRAKER_DIR, MOONRAKER_REPO_URL
from components.moonraker.utils.utils import get_moonraker_status
from core.logger import DialogType, Logger
from core.menus import Option
from core.menus.base_menu import BaseMenu
from core.settings.kiauh_settings import KiauhSettings, RepoSettings
from core.menus.repo_select_menu import RepoSelectMenu
from core.settings.kiauh_settings import KiauhSettings
from core.types.color import Color
from procedures.switch_repo import run_switch_repo_routine
from utils.input_utils import get_confirm, get_string_input
from core.types.component_status import ComponentStatus
# noinspection PyUnusedLocal
@@ -37,8 +34,14 @@ class SettingsMenu(BaseMenu):
self.mainsail_unstable: bool | None = None
self.fluidd_unstable: bool | None = None
self.auto_backups_enabled: bool | None = None
na: str = "Not available!"
self.kl_repo_url: str = Color.apply(na, Color.RED)
self.kl_branch: str = Color.apply(na, Color.RED)
self.mr_repo_url: str = Color.apply(na, Color.RED)
self.mr_branch: str = Color.apply(na, Color.RED)
self._load_settings()
print(self.klipper_status)
def set_previous_menu(self, previous_menu: Type[BaseMenu] | None) -> None:
from core.menus.main_menu import MainMenu
@@ -47,54 +50,39 @@ class SettingsMenu(BaseMenu):
def set_options(self) -> None:
self.options = {
"1": Option(method=self.set_klipper_repo),
"2": Option(method=self.set_moonraker_repo),
"1": Option(method=self.switch_klipper_repo),
"2": Option(method=self.switch_moonraker_repo),
"3": Option(method=self.toggle_mainsail_release),
"4": Option(method=self.toggle_fluidd_release),
"5": Option(method=self.toggle_backup_before_update),
}
def print_menu(self) -> None:
color = Color.CYAN
checked = f"[{Color.apply('x', Color.GREEN)}]"
unchecked = "[ ]"
kl_repo: str = Color.apply(self.klipper_status.repo, color)
kl_branch: str = Color.apply(self.klipper_status.branch, color)
kl_owner: str = Color.apply(self.klipper_status.owner, color)
mr_repo: str = Color.apply(self.moonraker_status.repo, color)
mr_branch: str = Color.apply(self.moonraker_status.branch, color)
mr_owner: str = Color.apply(self.moonraker_status.owner, color)
o1 = checked if self.mainsail_unstable else unchecked
o2 = checked if self.fluidd_unstable else unchecked
o3 = checked if self.auto_backups_enabled else unchecked
menu = textwrap.dedent(
f"""
╟───────────────────────────────────────────────────────╢
Klipper:
● Repo: {kl_repo:51}
● Owner: {kl_owner:51}
Branch: {kl_branch:51}
1) Switch Klipper source repository
● Current repository:
└► Repo: {self.kl_repo_url:50}
└► Branch: {self.kl_branch:48}
╟───────────────────────────────────────────────────────╢
Moonraker:
● Repo: {mr_repo:51}
● Owner: {mr_owner:51}
Branch: {mr_branch:51}
2) Switch Moonraker source repository
● Current repository:
└► Repo: {self.mr_repo_url:50}
└► Branch: {self.mr_branch:48}
╟───────────────────────────────────────────────────────╢
║ Install unstable releases: ║
{o1} Mainsail
{o2} Fluidd
3) {o1} Mainsail ║
4) {o2} Fluidd ║
╟───────────────────────────────────────────────────────╢
║ Auto-Backup: ║
{o3} Automatic backup before update ║
╟───────────────────────────────────────────────────────╢
║ 1) Set Klipper source repository ║
║ 2) Set Moonraker source repository ║
║ ║
║ 3) Toggle unstable Mainsail releases ║
║ 4) Toggle unstable Fluidd releases ║
║ ║
║ 5) Toggle automatic backups before updates ║
5) {o3} Backup before update
╟───────────────────────────────────────────────────────╢
"""
)[1:]
@@ -106,103 +94,43 @@ class SettingsMenu(BaseMenu):
self.mainsail_unstable = self.settings.mainsail.unstable_releases
self.fluidd_unstable = self.settings.fluidd.unstable_releases
# by default, we show the status of the installed repositories
self.klipper_status = get_klipper_status()
self.moonraker_status = get_moonraker_status()
# if the repository is not installed, we show the status of the settings from the config file
if self.klipper_status.repo == "-":
url_parts = self.settings.klipper.repo_url.split("/")
self.klipper_status.repo = url_parts[-1]
self.klipper_status.owner = url_parts[-2]
self.klipper_status.branch = self.settings.klipper.branch
if self.moonraker_status.repo == "-":
url_parts = self.settings.moonraker.repo_url.split("/")
self.moonraker_status.repo = url_parts[-1]
self.moonraker_status.owner = url_parts[-2]
self.moonraker_status.branch = self.settings.moonraker.branch
klipper_status: ComponentStatus = get_klipper_status()
moonraker_status: ComponentStatus = get_moonraker_status()
def _gather_input(
self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path
) -> Tuple[str, str]:
warn_msg = [
"There is only basic input validation in place! "
"Make sure your the input is valid and has no typos or invalid characters!"
]
def trim_repo_url(repo: str) -> str:
return repo.replace(".git", "").replace("https://", "").replace("git@", "")
if repo_dir.exists():
warn_msg.extend(
[
"For the change to take effect, the new repository will be cloned. "
"A backup of the old repository will be created.",
"\n\n",
"Make sure you don't have any ongoing prints running, as the services "
"will be restarted during this process! You will loose any ongoing print!",
]
)
if not klipper_status.repo == "-":
url = trim_repo_url(klipper_status.repo_url)
self.kl_repo_url = Color.apply(url, Color.CYAN)
self.kl_branch = Color.apply(klipper_status.branch, Color.CYAN)
if not moonraker_status.repo == "-":
url = trim_repo_url(moonraker_status.repo_url)
self.mr_repo_url = Color.apply(url, Color.CYAN)
self.mr_branch = Color.apply(moonraker_status.branch, Color.CYAN)
Logger.print_dialog(DialogType.ATTENTION, warn_msg)
repo = get_string_input(
"Enter new repository URL",
regex=r"^[\w/.:-]+$",
default=KLIPPER_REPO_URL if repo_name == "klipper" else MOONRAKER_REPO_URL,
)
branch = get_string_input(
"Enter new branch name", regex=r"^.+$", default="master"
)
return repo, branch
def _set_repo(
self, repo_name: Literal["klipper", "moonraker"], repo_dir: Path
) -> None:
repo_url, branch = self._gather_input(repo_name, repo_dir)
display_name = repo_name.capitalize()
def _warn_no_repos(self, name: str) -> None:
Logger.print_dialog(
DialogType.CUSTOM,
[
f"New {display_name} repository URL:",
f"{repo_url}",
f"New {display_name} repository branch:",
f"{branch}",
],
DialogType.WARNING,
[f"No {name} repositories configured in kiauh.cfg!"],
center_content=True,
)
if get_confirm("Apply changes?", allow_go_back=True):
repo: RepoSettings = self.settings[repo_name]
repo.repo_url = repo_url
repo.branch = branch
self.settings.save()
self._load_settings()
Logger.print_ok("Changes saved!")
else:
Logger.print_info(
f"Changing of {display_name} source repository canceled ..."
)
def switch_klipper_repo(self, **kwargs) -> None:
name = "Klipper"
repos = self.settings.klipper.repositories
if not repos:
self._warn_no_repos(name)
return
RepoSelectMenu(name.lower(), repos=repos, previous_menu=self.__class__).run()
self._switch_repo(repo_name, repo_dir)
def _switch_repo(
self, name: Literal["klipper", "moonraker"], repo_dir: Path
) -> None:
if not repo_dir.exists():
def switch_moonraker_repo(self, **kwargs) -> None:
name = "Moonraker"
repos = self.settings.moonraker.repositories
if not repos:
self._warn_no_repos(name)
return
Logger.print_status(
f"Switching to {name.capitalize()}'s new source repository ..."
)
repo: RepoSettings = self.settings[name]
run_switch_repo_routine(name, repo)
def set_klipper_repo(self, **kwargs) -> None:
self._set_repo("klipper", KLIPPER_DIR)
def set_moonraker_repo(self, **kwargs) -> None:
self._set_repo("moonraker", MOONRAKER_DIR)
RepoSelectMenu(name.lower(), repos=repos, previous_menu=self.__class__).run()
def toggle_mainsail_release(self, **kwargs) -> None:
self.mainsail_unstable = not self.mainsail_unstable

View File

@@ -9,14 +9,16 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from typing import Any, List
from core.backup_manager.backup_manager import BackupManager
from core.logger import DialogType, Logger
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
NoOptionError,
NoSectionError,
SimpleConfigParser,
)
from utils.input_utils import get_confirm
from utils.sys_utils import kill
from kiauh import PROJECT_ROOT
@@ -25,32 +27,53 @@ DEFAULT_CFG = PROJECT_ROOT.joinpath("default.kiauh.cfg")
CUSTOM_CFG = PROJECT_ROOT.joinpath("kiauh.cfg")
class NoValueError(Exception):
"""Raised when a required value is not defined for an option"""
def __init__(self, section: str, option: str):
msg = f"Missing value for option '{option}' in section '{section}'"
super().__init__(msg)
class InvalidValueError(Exception):
"""Raised when a value is invalid for an option"""
def __init__(self, section: str, option: str, value: str):
msg = f"Invalid value '{value}' for option '{option}' in section '{section}'"
super().__init__(msg)
@dataclass
class AppSettings:
backup_before_update: bool | None = field(default=None)
@dataclass
class Repository:
url: str
branch: str
@dataclass
class RepoSettings:
repo_url: str | None = field(default=None)
branch: str | None = field(default=None)
repositories: List[Repository] | None = field(default=None)
@dataclass
class WebUiSettings:
port: str | None = field(default=None)
port: int | None = field(default=None)
unstable_releases: bool | None = field(default=None)
# noinspection PyUnusedLocal
# noinspection PyMethodMayBeStatic
class KiauhSettings:
_instance = None
__instance = None
def __new__(cls, *args, **kwargs) -> "KiauhSettings":
if cls._instance is None:
cls._instance = super(KiauhSettings, cls).__new__(cls, *args, **kwargs)
return cls._instance
if cls.__instance is None:
cls.__instance = super(KiauhSettings, cls).__new__(cls, *args, **kwargs)
return cls.__instance
def __repr__(self) -> str:
return (
@@ -100,20 +123,30 @@ class KiauhSettings:
def _load_config(self) -> None:
if not CUSTOM_CFG.exists() and not DEFAULT_CFG.exists():
self._kill()
self.__kill()
cfg = CUSTOM_CFG if CUSTOM_CFG.exists() else DEFAULT_CFG
self.config.read_file(cfg)
self._validate_cfg()
self._apply_settings_from_file()
needs_migration = self._check_deprecated_repo_config()
if needs_migration:
self._prompt_migration_dialog()
return
else:
# Only validate if no migration was needed
self._validate_cfg()
self.__set_internal_state()
def _validate_cfg(self) -> None:
def __err_and_kill(error: str) -> None:
Logger.print_error(f"Error validating kiauh.cfg: {error}")
kill()
try:
self._validate_bool("kiauh", "backup_before_update")
self._validate_str("klipper", "repo_url")
self._validate_str("klipper", "branch")
self._validate_repositories("klipper", "repositories")
self._validate_repositories("moonraker", "repositories")
self._validate_int("mainsail", "port")
self._validate_bool("mainsail", "unstable_releases")
@@ -123,16 +156,16 @@ class KiauhSettings:
except ValueError:
err = f"Invalid value for option '{self._v_option}' in section '{self._v_section}'"
Logger.print_error(err)
kill()
__err_and_kill(err)
except NoSectionError:
err = f"Missing section '{self._v_section}' in config file"
Logger.print_error(err)
kill()
__err_and_kill(err)
except NoOptionError:
err = f"Missing option '{self._v_option}' in section '{self._v_section}'"
Logger.print_error(err)
kill()
__err_and_kill(err)
except NoValueError:
err = f"Missing value for option '{self._v_option}' in section '{self._v_section}'"
__err_and_kill(err)
def _validate_bool(self, section: str, option: str) -> None:
self._v_section, self._v_option = (section, option)
@@ -149,14 +182,38 @@ class KiauhSettings:
if not v:
raise ValueError
def _apply_settings_from_file(self) -> None:
def _validate_repositories(self, section: str, option: str) -> None:
self._v_section, self._v_option = (section, option)
repos = self.config.getval(section, option)
if not repos:
raise NoValueError(section, option)
for repo in repos:
if repo.strip().startswith("#") or repo.strip().startswith(";"):
continue
try:
if "," in repo:
url, branch = repo.strip().split(",")
if not url:
raise InvalidValueError(section, option, repo)
else:
url = repo.strip()
if not url:
raise InvalidValueError(section, option, repo)
except ValueError:
raise InvalidValueError(section, option, repo)
def __set_internal_state(self) -> None:
self.kiauh.backup_before_update = self.config.getboolean(
"kiauh", "backup_before_update"
)
self.klipper.repo_url = self.config.getval("klipper", "repo_url")
self.klipper.branch = self.config.getval("klipper", "branch")
self.moonraker.repo_url = self.config.getval("moonraker", "repo_url")
self.moonraker.branch = self.config.getval("moonraker", "branch")
kl_repos = self.config.getval("klipper", "repositories")
self.klipper.repositories = self.__set_repo_state(kl_repos)
mr_repos = self.config.getval("moonraker", "repositories")
self.moonraker.repositories = self.__set_repo_state(mr_repos)
self.mainsail.port = self.config.getint("mainsail", "port")
self.mainsail.unstable_releases = self.config.getboolean(
"mainsail", "unstable_releases"
@@ -166,28 +223,147 @@ class KiauhSettings:
"fluidd", "unstable_releases"
)
def _set_config_options_state(self) -> None:
self.config.set_option(
"kiauh",
"backup_before_update",
str(self.kiauh.backup_before_update),
)
self.config.set_option("klipper", "repo_url", self.klipper.repo_url)
self.config.set_option("klipper", "branch", self.klipper.branch)
self.config.set_option("moonraker", "repo_url", self.moonraker.repo_url)
self.config.set_option("moonraker", "branch", self.moonraker.branch)
self.config.set_option("mainsail", "port", str(self.mainsail.port))
self.config.set_option(
"mainsail",
"unstable_releases",
str(self.mainsail.unstable_releases),
)
self.config.set_option("fluidd", "port", str(self.fluidd.port))
self.config.set_option(
"fluidd", "unstable_releases", str(self.fluidd.unstable_releases)
)
def __set_repo_state(self, repos: List[str]) -> List[Repository]:
_repos: List[Repository] = []
for repo in repos:
if repo.strip().startswith("#") or repo.strip().startswith(";"):
continue
if "," in repo:
url, branch = repo.strip().split(",")
if not branch:
branch = "master"
else:
url = repo.strip()
branch = "master"
_repos.append(Repository(url.strip(), branch.strip()))
return _repos
def _kill(self) -> None:
def _set_config_options_state(self) -> None:
"""Updates the config with current settings, preserving values that haven't been modified"""
if self.kiauh.backup_before_update is not None:
self.config.set_option(
"kiauh",
"backup_before_update",
str(self.kiauh.backup_before_update),
)
# Handle repositories
if self.klipper.repositories is not None:
repos = [f"{repo.url}, {repo.branch}" for repo in self.klipper.repositories]
self.config.set_option("klipper", "repositories", repos)
if self.moonraker.repositories is not None:
repos = [
f"{repo.url}, {repo.branch}" for repo in self.moonraker.repositories
]
self.config.set_option("moonraker", "repositories", repos)
# Handle Mainsail settings
if self.mainsail.port is not None:
self.config.set_option("mainsail", "port", str(self.mainsail.port))
if self.mainsail.unstable_releases is not None:
self.config.set_option(
"mainsail",
"unstable_releases",
str(self.mainsail.unstable_releases),
)
# Handle Fluidd settings
if self.fluidd.port is not None:
self.config.set_option("fluidd", "port", str(self.fluidd.port))
if self.fluidd.unstable_releases is not None:
self.config.set_option(
"fluidd", "unstable_releases", str(self.fluidd.unstable_releases)
)
def _check_deprecated_repo_config(self) -> bool:
# repo_url and branch are deprecated - 2025.03.23
for section in ["klipper", "moonraker"]:
if self.config.has_option(section, "repo_url") or self.config.has_option(
section, "branch"
):
return True
return False
def _prompt_migration_dialog(self) -> None:
migration_1: List[str] = [
"The old 'repo_url' and 'branch' options are now combined under 'repositories'.",
"\n\n",
"Example format:",
"[klipper]",
"repositories:",
" https://github.com/Klipper3d/klipper, master",
"\n\n",
"[moonraker]",
"repositories:",
" https://github.com/Arksine/moonraker, master",
]
Logger.print_dialog(
DialogType.ATTENTION,
[
"Deprecated repository configuration found!",
"KAIUH can now attempt to automatically migrate your configuration.",
"\n\n",
*migration_1,
],
)
if get_confirm("Migrate to the new format?"):
self._migrate_repo_config()
else:
Logger.print_dialog(
DialogType.ERROR,
[
"Please update your configuration file manually.",
],
center_content=True,
)
kill()
def _migrate_repo_config(self) -> None:
bm = BackupManager()
if not bm.backup_file(CUSTOM_CFG):
Logger.print_dialog(
DialogType.ERROR,
[
"Failed to create backup of kiauh.cfg. Aborting migration. Please migrate manually."
],
)
kill()
# run migrations
try:
# migrate deprecated repo_url and branch options - 2025.03.23
for section in ["klipper", "moonraker"]:
if not self.config.has_section(section):
continue
repo_url = self.config.getval(section, "repo_url", fallback="")
branch = self.config.getval(section, "branch", fallback="master")
if repo_url:
# create repositories option with the old values
repositories = [f"{repo_url}, {branch}\n"]
self.config.set_option(section, "repositories", repositories)
# remove deprecated options
self.config.remove_option(section, "repo_url")
self.config.remove_option(section, "branch")
Logger.print_ok(f"Successfully migrated {section} configuration")
self.config.write_file(CUSTOM_CFG)
self.config.read_file(CUSTOM_CFG) # reload config
# Validate the migrated config
self._validate_cfg()
self.__set_internal_state()
except Exception as e:
Logger.print_error(f"Error migrating configuration: {e}")
Logger.print_error("Please migrate manually.")
kill()
def __kill(self) -> None:
Logger.print_dialog(
DialogType.ERROR,
[

View File

@@ -3,4 +3,48 @@
A custom config parser inspired by Python's configparser module.
Specialized for handling Klipper style config files.
---
### When parsing a config file, it will be split into the following elements:
- Header: All lines before the first section
- Section: A section is defined by a line starting with a `[` and ending with a `]`
- Option: A line starting with a word, followed by a `:` or `=` and a value
- Option Block: A line starting with a word, followed by a `:` or `=` and a newline
- Comment: A line starting with a `#` or `;`
- Blank: A line containing only whitespace characters
---
### Internally, the config is stored as a dictionary of sections, each containing a header and a list of elements:
```python
config = {
"section_name": {
"header": "[section_name]\n",
"elements": [
{
"type": "comment",
"content": "# This is a comment\n"
},
{
"type": "option",
"name": "option1",
"value": "value1",
"raw": "option1: value1\n"
},
{
"type": "blank",
"content": "\n"
},
{
"type": "option_block",
"name": "option2",
"value": [
"value2",
"value3"
],
"raw": "option2:"
}
]
}
}
```

View File

@@ -36,7 +36,7 @@ extend-select = ["I"]
[tool.pytest.ini_options]
minversion = "8.2.1"
testpaths = ["tests/**/*.py"]
addopts = "--cov --cov-config=pyproject.toml --cov-report=html"
addopts = "-svvv --cov --cov-config=pyproject.toml --cov-report=html"
[tool.coverage.run]
branch = true

View File

@@ -6,6 +6,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import re
from enum import Enum
# definition of section line:
# - then line MUST start with an opening square bracket - it is the first section marker
@@ -60,3 +61,11 @@ BOOLEAN_STATES = {
}
HEADER_IDENT = "#_header"
INDENT = " " * 4
class LineType(Enum):
OPTION = "option"
OPTION_BLOCK = "option_block"
COMMENT = "comment"
BLANK = "blank"

View File

@@ -8,8 +8,6 @@
from __future__ import annotations
import secrets
import string
from pathlib import Path
from typing import Callable, Dict, List
@@ -20,7 +18,7 @@ from ..simple_config_parser.constants import (
LINE_COMMENT_RE,
OPTION_RE,
OPTIONS_BLOCK_START_RE,
SECTION_RE,
SECTION_RE, LineType, INDENT,
)
_UNSET = object()
@@ -49,6 +47,13 @@ class NoOptionError(Exception):
msg = f"Option '{option}' in section '{section}' is not defined"
super().__init__(msg)
class UnknownLineError(Exception):
"""Raised when a line is not recognized as any known type"""
def __init__(self, line: str):
msg = f"Unknown line: '{line}'"
super().__init__(msg)
# noinspection PyMethodMayBeStatic
class SimpleConfigParser:
@@ -59,7 +64,6 @@ class SimpleConfigParser:
self.config: Dict = {}
self.current_section: str | None = None
self.current_opt_block: str | None = None
self.current_collector: str | None = None
self.in_option_block: bool = False
def _match_section(self, line: str) -> bool:
@@ -85,28 +89,40 @@ class SimpleConfigParser:
def _parse_line(self, line: str) -> None:
"""Parses a line and determines its type"""
if self._match_section(line):
self.current_collector = None
self.current_opt_block = None
self.current_section = SECTION_RE.match(line).group(1)
self.config[self.current_section] = {"_raw": line}
self.config[self.current_section] = {
"header": line,
"elements": []
}
elif self._match_option(line):
self.current_collector = None
self.current_opt_block = None
option = OPTION_RE.match(line).group(1)
value = OPTION_RE.match(line).group(2)
self.config[self.current_section][option] = {"_raw": line, "value": value}
self.config[self.current_section]["elements"].append({
"type": LineType.OPTION.value,
"name": option,
"value": value,
"raw": line
})
elif self._match_options_block_start(line):
self.current_collector = None
option = OPTIONS_BLOCK_START_RE.match(line).group(1)
self.current_opt_block = option
self.config[self.current_section][option] = {"_raw": line, "value": []}
self.config[self.current_section]["elements"].append({
"type": LineType.OPTION_BLOCK.value,
"name": option,
"value": [],
"raw": line
})
elif self.current_opt_block is not None:
self.config[self.current_section][self.current_opt_block]["value"].append(
line
)
# we are in an option block, so we add the line to the option's value
for element in reversed(self.config[self.current_section]["elements"]):
if element["type"] == LineType.OPTION_BLOCK.value and element["name"] == self.current_opt_block:
element["value"].append(line.strip()) # indentation is removed
break
elif self._match_empty_line(line) or self._match_line_comment(line):
self.current_opt_block = None
@@ -116,15 +132,11 @@ class SimpleConfigParser:
if not self.current_section:
self.config.setdefault(HEADER_IDENT, []).append(line)
else:
section = self.config[self.current_section]
# set the current collector to a new value, so that continuous
# empty lines or comments are collected into the same collector
if not self.current_collector:
self.current_collector = self._generate_rand_id()
section[self.current_collector] = []
section[self.current_collector].append(line)
element_type = LineType.BLANK.value if self._match_empty_line(line) else LineType.COMMENT.value
self.config[self.current_section]["elements"].append({
"type": element_type,
"content": line
})
def read_file(self, file: Path) -> None:
"""Read and parse a config file"""
@@ -132,41 +144,46 @@ class SimpleConfigParser:
for line in file:
self._parse_line(line)
# print(json.dumps(self.config, indent=4))
def write_file(self, path: str | Path) -> None:
"""Write the config to a file"""
if path is None:
raise ValueError("File path cannot be None")
def write_file(self, file: Path) -> None:
"""Write the current config to the config file"""
if not file:
raise ValueError("No config file specified")
with open(path, "w", encoding="utf-8") as f:
if HEADER_IDENT in self.config:
for line in self.config[HEADER_IDENT]:
f.write(line)
with open(file, "w") as file:
self._write_header(file)
self._write_sections(file)
sections = self.get_sections()
for i, section in enumerate(sections):
f.write(self.config[section]["header"])
def _write_header(self, file) -> None:
"""Write the header to the config file"""
for line in self.config.get(HEADER_IDENT, []):
file.write(line)
for element in self.config[section]["elements"]:
if element["type"] == LineType.OPTION.value:
f.write(element["raw"])
elif element["type"] == LineType.OPTION_BLOCK.value:
f.write(element["raw"])
for line in element["value"]:
f.write(INDENT + line.strip() + "\n")
elif element["type"] in [LineType.COMMENT.value, LineType.BLANK.value]:
f.write(element["content"])
else:
raise UnknownLineError(element["raw"])
def _write_sections(self, file) -> None:
"""Write the sections to the config file"""
for section in self.get_sections():
for key, value in self.config[section].items():
self._write_section_content(file, key, value)
# Ensure file ends with a single newline
if sections: # Only if we have any sections
last_section = sections[-1]
last_elements = self.config[last_section]["elements"]
def _write_section_content(self, file, key, value) -> None:
"""Write the content of a section to the config file"""
if key == "_raw":
file.write(value)
elif key.startswith("#_"):
for line in value:
file.write(line)
elif isinstance(value["value"], list):
file.write(value["_raw"])
for line in value["value"]:
file.write(line)
else:
file.write(value["_raw"])
if last_elements:
last_element = last_elements[-1]
if "raw" in last_element:
last_line = last_element["raw"]
else: # comment or blank line
last_line = last_element["content"]
if not last_line.endswith("\n"):
f.write("\n")
def get_sections(self) -> List[str]:
"""Return a list of all section names, but exclude any section starting with '#_'"""
@@ -189,29 +206,40 @@ class SimpleConfigParser:
if len(self.get_sections()) >= 1:
self._check_set_section_spacing()
self.config[section] = {"_raw": f"[{section}]\n"}
self.config[section] = {
"header": f"[{section}]\n",
"elements": []
}
def _check_set_section_spacing(self):
"""Check if there is a blank line between the last section and the new section"""
prev_section_name: str = self.get_sections()[-1]
prev_section_content: Dict = self.config[prev_section_name]
last_option_name: str = list(prev_section_content.keys())[-1]
prev_section = self.config[prev_section_name]
prev_elements = prev_section["elements"]
if last_option_name.startswith("#_"):
last_elem_value: str = prev_section_content[last_option_name][-1]
if prev_elements:
last_element = prev_elements[-1]
# if the last section is a collector, we first check if the last element
# in the collector ends with a newline. if it does not, we append a newline.
# this can happen if the config file does not end with a newline.
if not last_elem_value.endswith("\n"):
prev_section_content[last_option_name][-1] = f"{last_elem_value}\n"
# If the last element is a comment or blank line
if last_element["type"] in [LineType.COMMENT.value, LineType.BLANK.value]:
last_content = last_element["content"]
# if the last item in a collector is not a newline, we append a newline, so
# that the new section is seperated from the options of the previous section
# by a newline
if last_elem_value != "\n":
prev_section_content[last_option_name].append("\n")
else:
prev_section_content[self._generate_rand_id()] = ["\n"]
# If the last element doesn't end with a newline, add one
if not last_content.endswith("\n"):
last_element["content"] += "\n"
# If the last element is not a blank line, add a blank line
if last_content.strip() != "":
prev_elements.append({
"type": "blank",
"content": "\n"
})
else:
# If the last element is an option, add a blank line
prev_elements.append({
"type": LineType.BLANK.value,
"content": "\n"
})
def remove_section(self, section: str) -> None:
"""Remove a section from the config"""
@@ -219,12 +247,12 @@ class SimpleConfigParser:
def get_options(self, section: str) -> List[str]:
"""Return a list of all option names for a given section"""
return list(
filter(
lambda option: option != "_raw" and not option.startswith("#_"),
self.config[section].keys(),
)
)
options = []
if self.has_section(section):
for element in self.config[section]["elements"]:
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value]:
options.append(element["name"])
return options
def has_option(self, section: str, option: str) -> bool:
"""Check if an option exists in a section"""
@@ -238,22 +266,53 @@ class SimpleConfigParser:
if not self.has_section(section):
self.add_section(section)
if not self.has_option(section, option):
self.config[section][option] = {
"_raw": f"{option}:\n"
if isinstance(value, list)
else f"{option}: {value}\n",
# Check if option already exists
for element in self.config[section]["elements"]:
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value] and element["name"] == option:
# Update existing option
if isinstance(value, list):
element["type"] = LineType.OPTION_BLOCK.value
element["value"] = value
element["raw"] = f"{option}:\n"
else:
element["type"] = LineType.OPTION.value
element["value"] = value
element["raw"] = f"{option}: {value}\n"
return
# Option doesn't exist, create new one
if isinstance(value, list):
new_element = {
"type": LineType.OPTION_BLOCK.value,
"name": option,
"value": value,
"raw": f"{option}:\n"
}
else:
opt = self.config[section][option]
if not isinstance(value, list):
opt["_raw"] = opt["_raw"].replace(opt["value"], value)
opt["value"] = value
new_element = {
"type": LineType.OPTION.value,
"name": option,
"value": value,
"raw": f"{option}: {value}\n"
}
# scan through elements to find the last option, after which we insert the new option
insert_pos = 0
elements = self.config[section]["elements"]
for i, element in enumerate(elements):
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value]:
insert_pos = i + 1
elements.insert(insert_pos, new_element)
def remove_option(self, section: str, option: str) -> None:
"""Remove an option from a section"""
self.config[section].pop(option, None)
if self.has_section(section):
elements = self.config[section]["elements"]
for i, element in enumerate(elements):
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value] and element["name"] == option:
elements.pop(i)
break
def getval(
self, section: str, option: str, fallback: str | _UNSET = _UNSET
@@ -269,7 +328,23 @@ class SimpleConfigParser:
raise NoSectionError(section)
if option not in self.get_options(section):
raise NoOptionError(option, section)
return self.config[section][option]["value"]
# Find the option in the elements list
for element in self.config[section]["elements"]:
if element["type"] in [LineType.OPTION.value, LineType.OPTION_BLOCK.value] and element["name"] == option:
raw_value = element["value"]
if isinstance(raw_value, str) and raw_value.endswith("\n"):
return raw_value[:-1].strip()
elif isinstance(raw_value, list):
values: List[str] = []
for i, val in enumerate(raw_value):
val = val.strip().strip("\n")
if len(val) < 1:
continue
values.append(val.strip())
return values
return str(raw_value)
raise NoOptionError(option, section)
except (NoSectionError, NoOptionError):
if fallback is _UNSET:
raise
@@ -317,9 +392,3 @@ class SimpleConfigParser:
raise ValueError(
f"Cannot convert {self.getval(section, option)} to {conv.__name__}"
) from e
def _generate_rand_id(self) -> str:
"""Generate a random id with 6 characters"""
chars = string.ascii_letters + string.digits
rand_string = "".join(secrets.choice(chars) for _ in range(12))
return f"#_{rand_string}"

View File

@@ -25,8 +25,8 @@ option_4: value_4
#option_5: value_5
option_5 = this.is.value-5
multi_option:
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
option_5_1: value_5_1

View File

@@ -25,9 +25,9 @@ option_4: value_4
#option_5: value_5
option_5 = this.is.value-5
multi_option:
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
option_5_1: value_5_1
# config ending with a comment

View File

@@ -25,22 +25,22 @@ option_4: value_4
#option_5: value_5
option_5 = this.is.value-5
multi_option:
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
# these are multi-line values
value_5_1
value_5_2 ; here is a comment
value_5_3
option_5_1: value_5_1
[gcode_macro M117]
rename_existing: M117.1
gcode:
{% if rawparams %}
{% set escaped_msg = rawparams.split(';', 1)[0].split('\x23', 1)[0]|replace('"', '\\"') %}
SET_DISPLAY_TEXT MSG="{escaped_msg}"
RESPOND TYPE=command MSG="{escaped_msg}"
{% else %}
SET_DISPLAY_TEXT
{% endif %}
{% if rawparams %}
{% set escaped_msg = rawparams.split(';', 1)[0].split('\x23', 1)[0]|replace('"', '\\"') %}
SET_DISPLAY_TEXT MSG="{escaped_msg}"
RESPOND TYPE=command MSG="{escaped_msg}"
{% else %}
SET_DISPLAY_TEXT
{% endif %}
# SDCard 'looping' (aka Marlin M808 commands) support
#
@@ -48,47 +48,47 @@ gcode:
[sdcard_loop]
[gcode_macro M486]
gcode:
# Parameters known to M486 are as follows:
# [C<flag>] Cancel the current object
# [P<index>] Cancel the object with the given index
# [S<index>] Set the index of the current object.
# If the object with the given index has been canceled, this will cause
# the firmware to skip to the next object. The value -1 is used to
# indicate something that isnt an object and shouldnt be skipped.
# [T<count>] Reset the state and set the number of objects
# [U<index>] Un-cancel the object with the given index. This command will be
# ignored if the object has already been skipped
# Parameters known to M486 are as follows:
# [C<flag>] Cancel the current object
# [P<index>] Cancel the object with the given index
# [S<index>] Set the index of the current object.
# If the object with the given index has been canceled, this will cause
# the firmware to skip to the next object. The value -1 is used to
# indicate something that isnt an object and shouldnt be skipped.
# [T<count>] Reset the state and set the number of objects
# [U<index>] Un-cancel the object with the given index. This command will be
# ignored if the object has already been skipped
{% if 'exclude_object' not in printer %}
{action_raise_error("[exclude_object] is not enabled")}
{% endif %}
{% if 'T' in params %}
EXCLUDE_OBJECT RESET=1
{% for i in range(params.T | int) %}
EXCLUDE_OBJECT_DEFINE NAME={i}
{% endfor %}
{% endif %}
{% if 'C' in params %}
EXCLUDE_OBJECT CURRENT=1
{% endif %}
{% if 'P' in params %}
EXCLUDE_OBJECT NAME={params.P}
{% endif %}
{% if 'S' in params %}
{% if params.S == '-1' %}
{% if printer.exclude_object.current_object %}
EXCLUDE_OBJECT_END NAME={printer.exclude_object.current_object}
{% endif %}
{% else %}
EXCLUDE_OBJECT_START NAME={params.S}
{% if 'exclude_object' not in printer %}
{action_raise_error("[exclude_object] is not enabled")}
{% endif %}
{% endif %}
{% if 'U' in params %}
EXCLUDE_OBJECT RESET=1 NAME={params.U}
{% endif %}
{% if 'T' in params %}
EXCLUDE_OBJECT RESET=1
{% for i in range(params.T | int) %}
EXCLUDE_OBJECT_DEFINE NAME={i}
{% endfor %}
{% endif %}
{% if 'C' in params %}
EXCLUDE_OBJECT CURRENT=1
{% endif %}
{% if 'P' in params %}
EXCLUDE_OBJECT NAME={params.P}
{% endif %}
{% if 'S' in params %}
{% if params.S == '-1' %}
{% if printer.exclude_object.current_object %}
EXCLUDE_OBJECT_END NAME={printer.exclude_object.current_object}
{% endif %}
{% else %}
EXCLUDE_OBJECT_START NAME={params.S}
{% endif %}
{% endif %}
{% if 'U' in params %}
EXCLUDE_OBJECT RESET=1 NAME={params.U}
{% endif %}

View File

@@ -0,0 +1,8 @@
[section_1]
# comment
option_1: value_1
option_2: value_2 ; comment
new_option: new_value
[section_2]
option_3: value_3

View File

@@ -0,0 +1,7 @@
[section_1]
# comment
option_1: value_1
option_2: value_2 ; comment
[section_2]
option_3: value_3

View File

@@ -0,0 +1,7 @@
[section_1]
# comment
option_1: value_1
option_2: value_2 ; comment
[section_2]
option_3: value_3

View File

@@ -0,0 +1,8 @@
[section_1]
# comment
option_1: value_1
option_to_remove: value_to_remove
option_2: value_2 ; comment
[section_2]
option_3: value_3

View File

@@ -0,0 +1,7 @@
[section_1]
option_1: value_1
option_2: value_2
# comment
[section_2]
option_5: value_5

View File

@@ -0,0 +1,11 @@
[section_1]
option_1: value_1
option_2: value_2
# comment
[section_to_remove]
option_3: value_3
option_4: value_4
[section_2]
option_5: value_5

View File

@@ -5,11 +5,12 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import json
from pathlib import Path
import pytest
from src.simple_config_parser.constants import HEADER_IDENT
from src.simple_config_parser.constants import HEADER_IDENT, LineType
from src.simple_config_parser.simple_config_parser import SimpleConfigParser
from tests.utils import load_testdata_from_file
@@ -33,16 +34,17 @@ def test_section_parsing(parser):
), f"Expected keys: {expected_keys}, got: {parser.config.keys()}"
assert parser.in_option_block is False
assert parser.current_section == parser.get_sections()[-1]
assert parser.config["section_2"]["_raw"] == "[section_2] ; comment"
assert parser.config["section_2"] is not None
assert parser.config["section_2"]["header"] == "[section_2] ; comment"
assert parser.config["section_2"]["elements"] is not None
assert len(parser.config["section_2"]["elements"]) > 0
def test_option_parsing(parser):
assert parser.config["section_1"]["option_1"]["value"] == "value_1"
assert parser.config["section_1"]["option_1"]["_raw"] == "option_1: value_1"
assert parser.config["section_3"]["option_3"]["value"] == "value_3"
assert (
parser.config["section_3"]["option_3"]["_raw"] == "option_3: value_3 # comment"
)
assert parser.config["section_1"]["elements"][0]["type"] == LineType.OPTION.value
assert parser.config["section_1"]["elements"][0]["name"] == "option_1"
assert parser.config["section_1"]["elements"][0]["value"] == "value_1"
assert parser.config["section_1"]["elements"][0]["raw"] == "option_1: value_1"
def test_header_parsing(parser):
@@ -51,12 +53,27 @@ def test_header_parsing(parser):
assert len(header) > 0
def test_collector_parsing(parser):
section = "section_2"
section_content = list(parser.config[section].keys())
coll_name = [name for name in section_content if name.startswith("#_")][0]
collector = parser.config[section][coll_name]
assert collector is not None
assert isinstance(collector, list)
assert len(collector) > 0
assert "; comment" in collector
def test_option_block_parsing(parser):
section = "section number 5"
option_block = None
for element in parser.config[section]["elements"]:
if (element["type"] == LineType.OPTION_BLOCK.value and
element["name"] == "multi_option"):
option_block = element
break
assert option_block is not None, "multi_option block not found"
assert option_block["type"] == LineType.OPTION_BLOCK.value
assert option_block["name"] == "multi_option"
assert option_block["raw"] == "multi_option:"
expected_values = [
"# these are multi-line values",
"value_5_1",
"value_5_2 ; here is a comment",
"value_5_3"
]
assert option_block["value"] == expected_values, (
f"Expected values: {expected_values}, "
f"got: {option_block['value']}"
)

View File

@@ -8,6 +8,7 @@
import pytest
from src.simple_config_parser.constants import LineType
from src.simple_config_parser.simple_config_parser import (
NoOptionError,
NoSectionError,
@@ -148,13 +149,11 @@ def test_getfloat_fallback(parser):
def test_set_existing_option(parser):
parser.set_option("section_1", "new_option", "new_value")
assert parser.getval("section_1", "new_option") == "new_value"
assert parser.config["section_1"]["new_option"]["_raw"] == "new_option: new_value\n"
parser.set_option("section_1", "new_option", "new_value_2")
assert parser.getval("section_1", "new_option") == "new_value_2"
assert (
parser.config["section_1"]["new_option"]["_raw"] == "new_option: new_value_2\n"
)
assert parser.config["section_1"]["elements"][4] is not None
assert parser.config["section_1"]["elements"][4]["type"] == LineType.OPTION.value
assert parser.config["section_1"]["elements"][4]["name"] == "new_option"
assert parser.config["section_1"]["elements"][4]["value"] == "new_value"
assert parser.config["section_1"]["elements"][4]["raw"] == "new_option: new_value\n"
def test_set_new_option(parser):
@@ -170,7 +169,16 @@ def test_set_new_option(parser):
"value_2",
"value_3",
]
assert parser.config["section_2"]["array_option"]["_raw"] == "array_option:\n"
assert parser.config["section_2"]["elements"][1] is not None
assert parser.config["section_2"]["elements"][1]["type"] == LineType.OPTION_BLOCK.value
assert parser.config["section_2"]["elements"][1]["name"] == "array_option"
assert parser.config["section_2"]["elements"][1]["value"] == [
"value_1",
"value_2",
"value_3",
]
assert parser.config["section_2"]["elements"][1]["raw"] == "array_option:\n"
def test_remove_option(parser):

View File

@@ -41,16 +41,15 @@ def test_add_section(parser):
new_section = parser.config["new_section"]
assert isinstance(new_section, dict)
assert new_section["_raw"] == "[new_section]\n"
# this should be the collector, added by the parser before
# then second section was added
assert list(new_section.keys())[-1].startswith("#_")
assert "\n" in new_section[list(new_section.keys())[-1]]
assert new_section["header"] == "[new_section]\n"
assert new_section["elements"] is not None
assert new_section["elements"] == []
new_section2 = parser.config["new_section2"]
assert isinstance(new_section2, dict)
assert new_section2["_raw"] == "[new_section2]\n"
assert new_section2["header"] == "[new_section2]\n"
assert new_section2["elements"] is not None
assert new_section2["elements"] == []
def test_add_section_duplicate(parser):

View File

@@ -39,3 +39,81 @@ def test_write_to_file(tmp_path):
with open(TEST_DATA_PATH, "r") as original, open(tmp_file, "r") as written:
assert original.read() == written.read()
def test_remove_option_and_write(tmp_path):
# Setup paths
test_dir = BASE_DIR.joinpath("write_tests/remove_option")
input_file = test_dir.joinpath("input.cfg")
expected_file = test_dir.joinpath("expected.cfg")
output_file = Path(tmp_path).joinpath("output.cfg")
# Read input file and remove option
parser = SimpleConfigParser()
parser.read_file(input_file)
parser.remove_option("section_1", "option_to_remove")
# Write modified config
parser.write_file(output_file)
# parser.write_file(test_dir.joinpath("output.cfg"))
# Compare with expected output
with open(expected_file, "r") as expected, open(output_file, "r") as actual:
assert expected.read() == actual.read()
# Additional verification
parser2 = SimpleConfigParser()
parser2.read_file(output_file)
assert not parser2.has_option("section_1", "option_to_remove")
def test_remove_section_and_write(tmp_path):
# Setup paths
test_dir = BASE_DIR.joinpath("write_tests/remove_section")
input_file = test_dir.joinpath("input.cfg")
expected_file = test_dir.joinpath("expected.cfg")
output_file = Path(tmp_path).joinpath("output.cfg")
# Read input file and remove section
parser = SimpleConfigParser()
parser.read_file(input_file)
parser.remove_section("section_to_remove")
# Write modified config
parser.write_file(output_file)
# parser.write_file(test_dir.joinpath("output.cfg"))
# Compare with expected output
with open(expected_file, "r") as expected, open(output_file, "r") as actual:
assert expected.read() == actual.read()
# Additional verification
parser2 = SimpleConfigParser()
parser2.read_file(output_file)
assert not parser2.has_section("section_to_remove")
assert "section_1" in parser2.get_sections()
assert "section_2" in parser2.get_sections()
def test_add_option_and_write(tmp_path):
# Setup paths
test_dir = BASE_DIR.joinpath("write_tests/add_option")
input_file = test_dir.joinpath("input.cfg")
expected_file = test_dir.joinpath("expected.cfg")
output_file = Path(tmp_path).joinpath("output.cfg")
# Read input file and add option
parser = SimpleConfigParser()
parser.read_file(input_file)
parser.set_option("section_1", "new_option", "new_value")
# Write modified config
parser.write_file(output_file)
# parser.write_file(test_dir.joinpath("output.cfg"))
# Compare with expected output
with open(expected_file, "r") as expected, open(output_file, "r") as actual:
assert expected.read() == actual.read()
# Additional verification
parser2 = SimpleConfigParser()
parser2.read_file(output_file)
assert parser2.has_option("section_1", "new_option")
assert parser2.getval("section_1", "new_option") == "new_value"

View File

@@ -25,6 +25,7 @@ class ComponentStatus:
status: StatusCode
owner: str | None = None
repo: str | None = None
repo_url: str | None = None
branch: str = ""
local: str | None = None
remote: str | None = None

View File

@@ -31,7 +31,6 @@ from components.moonraker.moonraker_setup import install_moonraker_packages
from core.backup_manager.backup_manager import BackupManager, BackupManagerException
from core.instance_manager.instance_manager import InstanceManager
from core.logger import Logger
from core.settings.kiauh_settings import RepoSettings
from utils.git_utils import GitException, get_repo_name, git_clone_wrapper
from utils.instance_utils import get_instances
from utils.sys_utils import (
@@ -46,7 +45,7 @@ class RepoSwitchFailedException(Exception):
def run_switch_repo_routine(
name: Literal["klipper", "moonraker"], repo_settings: RepoSettings
name: Literal["klipper", "moonraker"], repo_url: str, branch: str
) -> None:
repo_dir: Path = KLIPPER_DIR if name == "klipper" else MOONRAKER_DIR
env_dir: Path = KLIPPER_ENV_DIR if name == "klipper" else MOONRAKER_ENV_DIR
@@ -78,10 +77,6 @@ def run_switch_repo_routine(
backup_dir,
)
# step 3: read repo url and branch from settings
repo_url = repo_settings.repo_url
branch = repo_settings.branch
if not (repo_url or branch):
error = f"Invalid repository URL ({repo_url}) or branch ({branch})!"
raise ValueError(error)

View File

@@ -29,6 +29,7 @@ from utils.git_utils import (
get_local_tags,
get_remote_commit,
get_repo_name,
get_repo_url,
)
from utils.instance_utils import get_instances
from utils.sys_utils import (
@@ -133,11 +134,14 @@ def get_install_status(
status = 1 # incomplete
org, repo = get_repo_name(repo_dir)
repo_url = get_repo_url(repo_dir) if repo_dir.exists() else None
return ComponentStatus(
status=status,
instances=instances,
owner=org,
repo=repo,
repo_url=repo_url,
branch=branch,
local=get_local_commit(repo_dir),
remote=get_remote_commit(repo_dir),

View File

@@ -58,15 +58,17 @@ def git_clone_wrapper(
raise GitException(f"Error removing existing repository: {e.strerror}")
def git_pull_wrapper(repo: str, target_dir: Path) -> None:
# !todo: remove url parameter, as it is not really required. may be a reason to remove this function completely
def git_pull_wrapper(url: str, target_dir: Path) -> None:
"""
A function that updates a repository using git pull.
:param repo: The repository to update.
:param url: The repo url - only used for logging.
:param target_dir: The directory of the repository.
:return: None
"""
Logger.print_status(f"Updating repository '{repo}' ...")
_repo = f"'{url}'" if url else ""
Logger.print_status(f"Updating repository {_repo}...")
try:
git_cmd_pull(target_dir)
except CalledProcessError:
@@ -337,3 +339,25 @@ def rollback_repository(repo_dir: Path, instance: Type[InstanceType]) -> None:
Logger.print_error(f"An error occured during repo rollback:\n{e}")
InstanceManager.start_all(instances)
def get_repo_url(repo_dir: Path) -> str | None:
"""
Get the remote repository URL for a git repository
:param repo_dir: Path to the git repository
:return: URL of the remote repository or None if not found
"""
if not repo_dir.exists():
return None
try:
result = run(
["git", "config", "--get", "remote.origin.url"],
cwd=repo_dir,
capture_output=True,
text=True,
check=True,
)
return result.stdout.strip()
except CalledProcessError:
return None