refactor: overhaul of the klipper setup process

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
dw-0
2024-07-27 21:13:56 +02:00
parent fee2dd0bda
commit 871bedb76b
9 changed files with 211 additions and 406 deletions

View File

@@ -6,10 +6,9 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import json
from dataclasses import dataclass
from pathlib import Path
from subprocess import CalledProcessError, run
from typing import List
from components.klipper import (
KLIPPER_CFG_NAME,
@@ -27,49 +26,24 @@ from utils.logger import Logger
# noinspection PyMethodMayBeStatic
@dataclass
class Klipper(BaseInstance):
@classmethod
def blacklist(cls) -> List[str]:
return ["None", "mcu"]
klipper_dir: Path = KLIPPER_DIR
env_dir: Path = KLIPPER_ENV_DIR
cfg_file: Path = None
log: Path = None
serial: Path = None
uds: Path = None
def __init__(self, suffix: str = ""):
def __init__(self, suffix: str = "") -> None:
super().__init__(instance_type=self, suffix=suffix)
self.klipper_dir: Path = KLIPPER_DIR
self.env_dir: Path = KLIPPER_ENV_DIR
self._cfg_file = self.cfg_dir.joinpath(KLIPPER_CFG_NAME)
self._log = self.log_dir.joinpath(KLIPPER_LOG_NAME)
self._serial = self.comms_dir.joinpath(KLIPPER_SERIAL_NAME)
self._uds = self.comms_dir.joinpath(KLIPPER_UDS_NAME)
def __repr__(self):
return json.dumps(
{
"suffix": self.suffix,
"klipper_dir": self.klipper_dir.as_posix(),
"env_dir": self.env_dir.as_posix(),
"cfg_file": self.cfg_file.as_posix(),
"log": self.log.as_posix(),
"serial": self.serial.as_posix(),
"uds": self.uds.as_posix(),
},
indent=4,
)
@property
def cfg_file(self) -> Path:
return self._cfg_file
@property
def log(self) -> Path:
return self._log
@property
def serial(self) -> Path:
return self._serial
@property
def uds(self) -> Path:
return self._uds
def __post_init__(self) -> None:
super().__post_init__()
self.cfg_file = self.cfg_dir.joinpath(KLIPPER_CFG_NAME)
self.log = self.log_dir.joinpath(KLIPPER_LOG_NAME)
self.serial = self.comms_dir.joinpath(KLIPPER_SERIAL_NAME)
self.uds = self.comms_dir.joinpath(KLIPPER_UDS_NAME)
def create(self) -> None:
from utils.sys_utils import create_env_file, create_service_file

View File

@@ -90,9 +90,19 @@ def print_select_custom_name_dialog():
dialog = textwrap.dedent(
f"""
╔═══════════════════════════════════════════════════════╗
You can now assign a custom name to each instance.
Do you want to assign a custom name to each instance?
║ ║
║ Assigning a custom name will create a Klipper service ║
║ and a printer directory with the chosen name. ║
║ ║
║ Example for custom name 'kiauh': ║
║ ● Klipper service: klipper-kiauh.service ║
║ ● Printer directory: printer_kiauh_data ║
║ ║
║ If skipped, each instance will get an index assigned ║
║ in ascending order, starting at index '1'.
║ in ascending order, starting at '1' in case of a new
║ installation. Otherwise, the index will be derived ║
║ from amount of already existing instances. ║
║ ║
{line1:<63}
{line2:<63}

View File

@@ -6,8 +6,10 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Tuple
from components.klipper import (
EXIT_KLIPPER_SETUP,
@@ -17,18 +19,16 @@ from components.klipper import (
KLIPPER_REQ_FILE,
)
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_select_custom_name_dialog,
)
from components.klipper.klipper_utils import (
add_to_existing,
assign_custom_name,
backup_klipper_dir,
check_is_single_to_multi_conversion,
check_user_groups,
create_example_printer_cfg,
get_install_count,
handle_disruptive_system_packages,
handle_instance_naming,
handle_to_multi_instance_conversion,
init_name_scheme,
update_name_scheme,
)
from components.moonraker.moonraker import Moonraker
from components.webui_client.client_utils import (
@@ -49,57 +49,65 @@ from utils.sys_utils import (
def install_klipper() -> None:
kl_im = InstanceManager(Klipper)
Logger.print_status("Installing Klipper ...")
# ask to add new instances, if there are existing ones
if kl_im.instances and not add_to_existing():
Logger.print_status(EXIT_KLIPPER_SETUP)
return
klipper_list: List[Klipper] = InstanceManager(Klipper).instances
moonraker_list: List[Moonraker] = InstanceManager(Moonraker).instances
match_moonraker: bool = False
install_count = get_install_count()
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
# if there are more moonraker instances than klipper instances, ask the user to
# match the klipper instance count to the count of moonraker instances with the same suffix
if len(moonraker_list) > len(klipper_list):
is_confirmed = display_moonraker_info(moonraker_list)
if not is_confirmed:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
match_moonraker = True
# create a dict of the size of the existing instances + install count
name_dict = {c: "" for c in range(len(kl_im.instances) + install_count)}
name_scheme = init_name_scheme(kl_im.instances, install_count)
mr_im = InstanceManager(Moonraker)
name_scheme = update_name_scheme(
name_scheme, name_dict, kl_im.instances, mr_im.instances
install_count, name_dict = get_install_count_and_name_dict(
klipper_list, moonraker_list
)
handle_instance_naming(name_dict, name_scheme)
if install_count == 0 or name_dict == {}:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
is_multi_install = install_count > 1 or (len(name_dict) >= 1 and install_count >= 1)
if not name_dict and install_count == 1:
name_dict = {0: ""}
elif is_multi_install and not match_moonraker:
custom_names = use_custom_names_or_go_back()
if custom_names is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
handle_instance_names(install_count, name_dict, custom_names)
create_example_cfg = get_confirm("Create example printer.cfg?")
# run the actual installation
try:
if not kl_im.instances:
check_install_dependencies(["git", "python3-virtualenv"])
setup_klipper_prerequesites()
count = 0
for name in name_dict:
if name_dict[name] in [n.suffix for n in kl_im.instances]:
continue
if check_is_single_to_multi_conversion(kl_im.instances):
handle_to_multi_instance_conversion(name_dict[name])
continue
count += 1
create_klipper_instance(name_dict[name], create_example_cfg)
if count == install_count:
break
cmd_sysctl_manage("daemon-reload")
run_klipper_setup(klipper_list, name_dict, create_example_cfg)
except Exception as e:
Logger.print_error(e)
Logger.print_error("Klipper installation failed!")
return
def run_klipper_setup(
klipper_list: List[Klipper], name_dict: Dict[int, str], example_cfg: bool
) -> None:
if not klipper_list:
setup_klipper_prerequesites()
for i in name_dict:
# skip this iteration if there is already an instance with the name
if name_dict[i] in [n.suffix for n in klipper_list]:
continue
create_klipper_instance(name_dict[i], example_cfg)
cmd_sysctl_manage("daemon-reload")
# step 4: check/handle conflicting packages/services
handle_disruptive_system_packages()
@@ -107,6 +115,35 @@ def install_klipper() -> None:
check_user_groups()
def handle_instance_names(
install_count: int, name_dict: Dict[int, str], custom_names: bool
) -> None:
for i in range(install_count):
index = len(name_dict) + i + 1
if custom_names:
assign_custom_name(index, name_dict)
else:
name_dict[i + 1] = str(index)
def get_install_count_and_name_dict(
klipper_list: List[Klipper], moonraker_list: List[Moonraker]
) -> Tuple[int, Dict[int, str]]:
if len(moonraker_list) > len(klipper_list):
install_count = len(moonraker_list)
name_dict = {i: moonraker.suffix for i, moonraker in enumerate(moonraker_list)}
else:
install_count = get_install_count()
name_dict = {i: klipper.suffix for i, klipper in enumerate(klipper_list)}
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return 0, {}
return install_count, name_dict
def setup_klipper_prerequesites() -> None:
settings = KiauhSettings()
repo = settings.klipper.repo_url
@@ -176,3 +213,29 @@ def create_klipper_instance(name: str, create_example_cfg: bool) -> None:
clients = get_existing_clients()
create_example_printer_cfg(new_instance, clients)
kl_im.start_instance()
def use_custom_names_or_go_back() -> bool | None:
print_select_custom_name_dialog()
return get_confirm(
"Assign custom names?",
False,
allow_go_back=True,
)
def display_moonraker_info(moonraker_list: List[Moonraker]) -> bool:
# todo: only show the klipper instances that are not already installed
Logger.print_dialog(
DialogType.INFO,
[
"Existing Moonraker instances detected:",
*[f"{m.get_service_file_name()}" for m in moonraker_list],
"\n\n",
"The following Klipper instances will be installed:",
*[f"● klipper-{m.suffix}" for m in moonraker_list],
],
padding_top=0,
padding_bottom=0,
)
return get_confirm("Proceed with installation?")

View File

@@ -6,13 +6,13 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import grp
import os
import re
import shutil
from subprocess import CalledProcessError, run
from typing import Dict, List, Optional, Union
from typing import Dict, List
from components.klipper import (
KLIPPER_BACKUP_DIR,
@@ -23,23 +23,17 @@ from components.klipper import (
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_instance_overview,
print_select_custom_name_dialog,
print_select_instance_count_dialog,
)
from components.moonraker.moonraker import Moonraker
from components.moonraker.moonraker_utils import moonraker_to_multi_conversion
from components.webui_client.base_data import BaseWebClient
from components.webui_client.client_config.client_config_setup import (
create_client_config_symlink,
)
from core.backup_manager.backup_manager import BackupManager
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
from core.instance_manager.name_scheme import NameScheme
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
SimpleConfigParser,
)
from utils import PRINTER_CFG_BACKUP_DIR
from utils.common import get_install_status
from utils.constants import CURRENT_USER
from utils.input_utils import get_confirm, get_number_input, get_string_input
@@ -52,75 +46,13 @@ def get_klipper_status() -> ComponentStatus:
return get_install_status(KLIPPER_DIR, KLIPPER_ENV_DIR, Klipper)
def check_is_multi_install(
existing_instances: List[Klipper], install_count: int
) -> bool:
return not existing_instances and install_count > 1
def check_is_single_to_multi_conversion(
existing_instances: List[Klipper],
) -> bool:
return len(existing_instances) == 1 and existing_instances[0].suffix == ""
def init_name_scheme(
existing_instances: List[Klipper], install_count: int
) -> NameScheme:
if check_is_multi_install(
existing_instances, install_count
) or check_is_single_to_multi_conversion(existing_instances):
print_select_custom_name_dialog()
if get_confirm("Assign custom names?", False, allow_go_back=True):
return NameScheme.CUSTOM
else:
return NameScheme.INDEX
else:
return NameScheme.SINGLE
def update_name_scheme(
name_scheme: NameScheme,
name_dict: Dict[int, str],
klipper_instances: List[Klipper],
moonraker_instances: List[Moonraker],
) -> NameScheme:
# if there are more moonraker instances installed
# than klipper, we load their names into the name_dict,
# as we will detect and enforce that naming scheme
if len(moonraker_instances) > len(klipper_instances):
update_name_dict(name_dict, moonraker_instances)
return detect_name_scheme(moonraker_instances)
elif len(klipper_instances) > 1:
update_name_dict(name_dict, klipper_instances)
return detect_name_scheme(klipper_instances)
else:
return name_scheme
def update_name_dict(name_dict: Dict[int, str], instances: List[BaseInstance]) -> None:
for k, v in enumerate(instances):
name_dict[k] = v.suffix
def handle_instance_naming(name_dict: Dict[int, str], name_scheme: NameScheme) -> None:
if name_scheme == NameScheme.SINGLE:
return
for k in name_dict:
if name_dict[k] == "" and name_scheme == NameScheme.INDEX:
name_dict[k] = str(k + 1)
elif name_dict[k] == "" and name_scheme == NameScheme.CUSTOM:
assign_custom_name(k, name_dict)
def add_to_existing() -> bool:
kl_instances = InstanceManager(Klipper).instances
print_instance_overview(kl_instances)
return get_confirm("Add new instances?", allow_go_back=True)
def get_install_count() -> Union[int, None]:
def get_install_count() -> int | None:
"""
Print a dialog for selecting the amount of Klipper instances
to set up with an option to navigate back. Returns None if the
@@ -143,64 +75,10 @@ def assign_custom_name(key: int, name_dict: Dict[int, str]) -> None:
existing_names.extend(name_dict[n] for n in name_dict)
pattern = r"^[a-zA-Z0-9]+$"
question = f"Enter name for instance {key + 1}"
question = f"Enter name for instance {key}"
name_dict[key] = get_string_input(question, exclude=existing_names, regex=pattern)
def handle_to_multi_instance_conversion(new_name: str) -> None:
Logger.print_status("Converting single instance to multi instances ...")
klipper_to_multi_conversion(new_name)
moonraker_to_multi_conversion(new_name)
def klipper_to_multi_conversion(new_name: str) -> None:
Logger.print_status("Convert Klipper single to multi instance ...")
im = InstanceManager(Klipper)
im.current_instance = im.instances[0]
# temporarily store the data dir path
old_data_dir = im.instances[0].data_dir
old_data_dir_name = im.instances[0].data_dir_name
# backup the old data_dir
bm = BackupManager()
name = f"config-{old_data_dir_name}"
bm.backup_directory(
name,
source=im.current_instance.cfg_dir,
target=PRINTER_CFG_BACKUP_DIR,
)
# remove the old single instance
im.stop_instance()
im.disable_instance()
im.delete_instance()
# create a new klipper instance with the new name
new_instance = Klipper(suffix=new_name)
im.current_instance = new_instance
if not new_instance.data_dir.is_dir():
# rename the old data dir and use it for the new instance
Logger.print_status(f"Rename '{old_data_dir}' to '{new_instance.data_dir}' ...")
old_data_dir.rename(new_instance.data_dir)
else:
Logger.print_info(f"Existing '{new_instance.data_dir}' found ...")
# patch the virtual_sdcard sections path
# value to match the new printer_data foldername
scp = SimpleConfigParser()
scp.read(new_instance.cfg_file)
if scp.has_section("virtual_sdcard"):
scp.set("virtual_sdcard", "path", str(new_instance.gcodes_dir))
scp.write(new_instance.cfg_file)
# finalize creating the new instance
im.create_instance()
im.enable_instance()
im.start_instance()
def check_user_groups():
user_groups = [grp.getgrgid(gid).gr_name for gid in os.getgroups()]
missing_groups = [g for g in ["tty", "dialout"] if g not in user_groups]
@@ -277,21 +155,8 @@ def handle_disruptive_system_packages() -> None:
)
def detect_name_scheme(instance_list: List[BaseInstance]) -> NameScheme:
pattern = re.compile("^\d+$")
for instance in instance_list:
if not pattern.match(instance.suffix):
return NameScheme.CUSTOM
return NameScheme.INDEX
def get_highest_index(instance_list: List[Klipper]) -> int:
return max([int(instance.suffix.split("-")[-1]) for instance in instance_list])
def create_example_printer_cfg(
instance: Klipper, clients: Optional[List[BaseWebClient]] = None
instance: Klipper, clients: List[BaseWebClient] | None = None
) -> None:
Logger.print_status(f"Creating example printer.cfg in '{instance.cfg_dir}'")
if instance.cfg_file.is_file():

View File

@@ -10,7 +10,6 @@ from __future__ import annotations
from pathlib import Path
from subprocess import CalledProcessError, run
from typing import List
from components.moonraker import (
MOONRAKER_CFG_NAME,
@@ -30,10 +29,6 @@ from utils.logger import Logger
# noinspection PyMethodMayBeStatic
class Moonraker(BaseInstance):
@classmethod
def blacklist(cls) -> List[str]:
return ["None", "mcu", "obico"]
def __init__(self, suffix: str = ""):
super().__init__(instance_type=self, suffix=suffix)
self.moonraker_dir: Path = MOONRAKER_DIR
@@ -42,25 +37,16 @@ class Moonraker(BaseInstance):
self.port = self._get_port()
self.backup_dir = self.data_dir.joinpath("backup")
self.certs_dir = self.data_dir.joinpath("certs")
self._db_dir = self.data_dir.joinpath("database")
self._comms_dir = self.data_dir.joinpath("comms")
self.db_dir = self.data_dir.joinpath("database")
self.log = self.log_dir.joinpath(MOONRAKER_LOG_NAME)
@property
def db_dir(self) -> Path:
return self._db_dir
@property
def comms_dir(self) -> Path:
return self._comms_dir
def create(self, create_example_cfg: bool = False) -> None:
from utils.sys_utils import create_env_file, create_service_file
Logger.print_status("Creating new Moonraker Instance ...")
try:
self.create_folders([self.backup_dir, self.certs_dir, self._db_dir])
self.create_folders([self.backup_dir, self.certs_dir, self.db_dir])
create_service_file(
name=self.get_service_file_name(extension=True),
content=self._prep_service_file_content(),

View File

@@ -20,8 +20,6 @@ from components.moonraker import (
)
from components.moonraker.moonraker import Moonraker
from components.webui_client.base_data import BaseWebClient
from components.webui_client.client_utils import enable_mainsail_remotemode
from components.webui_client.mainsail_data import MainsailData
from core.backup_manager.backup_manager import BackupManager
from core.instance_manager.instance_manager import InstanceManager
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
@@ -128,58 +126,6 @@ def create_example_moonraker_conf(
Logger.print_ok(f"Example moonraker.conf created in '{instance.cfg_dir}'")
def moonraker_to_multi_conversion(new_name: str) -> None:
"""
Converts the first instance in the List of Moonraker instances to an instance
with a new name. This method will be called when converting from a single Klipper
instance install to a multi instance install when Moonraker is also already
installed with a single instance.
:param new_name: new name the previous single instance is renamed to
:return: None
"""
im = InstanceManager(Moonraker)
instances: List[Moonraker] = im.instances
if not instances:
return
# in case there are multiple Moonraker instances, we don't want to do anything
if len(instances) > 1:
Logger.print_info("More than a single Moonraker instance found. Skipped ...")
return
Logger.print_status("Convert Moonraker single to multi instance ...")
# remove the old single instance
im.current_instance = im.instances[0]
im.stop_instance()
im.disable_instance()
im.delete_instance()
# create a new moonraker instance with the new name
new_instance = Moonraker(suffix=new_name)
im.current_instance = new_instance
# patch the server sections klippy_uds_address value to match the new printer_data foldername
scp = SimpleConfigParser()
scp.read(new_instance.cfg_file)
if scp.has_section("server"):
scp.set(
"server",
"klippy_uds_address",
str(new_instance.comms_dir.joinpath("klippy.sock")),
)
scp.write(new_instance.cfg_file)
# create, enable and start the new moonraker instance
im.create_instance()
im.enable_instance()
im.start_instance()
# if mainsail is installed, we enable mainsails remote mode
if MainsailData().client_dir.exists() and len(im.instances) > 1:
enable_mainsail_remotemode()
def backup_moonraker_dir():
bm = BackupManager()
bm.backup_directory("moonraker", source=MOONRAKER_DIR, target=MOONRAKER_BACKUP_DIR)

View File

@@ -9,7 +9,9 @@
from __future__ import annotations
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
@@ -17,106 +19,32 @@ from utils.constants import CURRENT_USER, SYSTEMD
from utils.logger import Logger
@dataclass
class BaseInstance(ABC):
instance_type: BaseInstance
suffix: str
user: str = field(default=CURRENT_USER, init=False)
data_dir: Path = None
data_dir_name: str = ""
is_legacy_instance: bool = False
cfg_dir: Path = None
log_dir: Path = None
comms_dir: Path = None
sysd_dir: Path = None
gcodes_dir: Path = None
def __post_init__(self) -> None:
self._set_data_dir()
self._set_is_legacy_instance()
self.cfg_dir = self.data_dir.joinpath("config")
self.log_dir = self.data_dir.joinpath("logs")
self.comms_dir = self.data_dir.joinpath("comms")
self.sysd_dir = self.data_dir.joinpath("systemd")
self.gcodes_dir = self.data_dir.joinpath("gcodes")
@classmethod
def blacklist(cls) -> List[str]:
return []
def __init__(
self,
suffix: str,
instance_type: BaseInstance,
):
self._instance_type = instance_type
self._suffix = suffix
self._user = CURRENT_USER
self._data_dir_name = self.get_data_dir_name_from_suffix()
self._data_dir = Path.home().joinpath(f"{self._data_dir_name}_data")
self._cfg_dir = self.data_dir.joinpath("config")
self._log_dir = self.data_dir.joinpath("logs")
self._comms_dir = self.data_dir.joinpath("comms")
self._sysd_dir = self.data_dir.joinpath("systemd")
self._gcodes_dir = self.data_dir.joinpath("gcodes")
@property
def instance_type(self) -> BaseInstance:
return self._instance_type
@instance_type.setter
def instance_type(self, value: BaseInstance) -> None:
self._instance_type = value
@property
def suffix(self) -> str:
return self._suffix
@suffix.setter
def suffix(self, value: str) -> None:
self._suffix = value
@property
def user(self) -> str:
return self._user
@user.setter
def user(self, value: str) -> None:
self._user = value
@property
def data_dir_name(self) -> str:
return self._data_dir_name
@data_dir_name.setter
def data_dir_name(self, value: str) -> None:
self._data_dir_name = value
@property
def data_dir(self) -> Path:
return self._data_dir
@data_dir.setter
def data_dir(self, value: Path) -> None:
self._data_dir = value
@property
def cfg_dir(self) -> Path:
return self._cfg_dir
@cfg_dir.setter
def cfg_dir(self, value: Path) -> None:
self._cfg_dir = value
@property
def log_dir(self) -> Path:
return self._log_dir
@log_dir.setter
def log_dir(self, value: Path) -> None:
self._log_dir = value
@property
def comms_dir(self) -> Path:
return self._comms_dir
@comms_dir.setter
def comms_dir(self, value: Path) -> None:
self._comms_dir = value
@property
def sysd_dir(self) -> Path:
return self._sysd_dir
@sysd_dir.setter
def sysd_dir(self, value: Path) -> None:
self._sysd_dir = value
@property
def gcodes_dir(self) -> Path:
return self._gcodes_dir
@gcodes_dir.setter
def gcodes_dir(self, value: Path) -> None:
self._gcodes_dir = value
return ["None", "mcu", "obico", "bambu", "companion"]
@abstractmethod
def create(self) -> None:
@@ -133,6 +61,7 @@ class BaseInstance(ABC):
self.log_dir,
self.comms_dir,
self.sysd_dir,
self.gcodes_dir,
]
if add_dirs:
@@ -141,6 +70,7 @@ class BaseInstance(ABC):
for _dir in dirs:
_dir.mkdir(exist_ok=True)
# todo: refactor into a set method and access the value by accessing the property
def get_service_file_name(self, extension: bool = False) -> str:
from utils.common import convert_camelcase_to_kebabcase
@@ -150,17 +80,10 @@ class BaseInstance(ABC):
return name if not extension else f"{name}.service"
# todo: refactor into a set method and access the value by accessing the property
def get_service_file_path(self) -> Path:
return SYSTEMD.joinpath(self.get_service_file_name(extension=True))
def get_data_dir_name_from_suffix(self) -> str:
if self._suffix == "":
return "printer"
elif self._suffix.isdigit():
return f"printer_{self._suffix}"
else:
return self._suffix
def delete_logfiles(self, log_name: str) -> None:
from utils.fs_utils import run_remove_routines
@@ -172,3 +95,27 @@ class BaseInstance(ABC):
for log in logs:
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)
def _set_data_dir(self) -> None:
if self.suffix == "":
self.data_dir = Path.home().joinpath("printer_data")
else:
self.data_dir = Path.home().joinpath(f"printer_{self.suffix}_data")
if self.get_service_file_path().exists():
with open(self.get_service_file_path(), "r") as service_file:
service_content = service_file.read()
pattern = re.compile("^EnvironmentFile=(.+)(/systemd/.+\.env)")
match = re.search(pattern, service_content)
if match:
self.data_dir = Path(match.group(1))
def _set_is_legacy_instance(self) -> None:
if (
self.suffix != ""
and not self.data_dir_name.startswith("printer_")
and not self.data_dir_name.endswith("_data")
):
self.is_legacy_instance = True
else:
self.is_legacy_instance = False

View File

@@ -6,6 +6,8 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
from typing import List, Union
@@ -14,9 +16,7 @@ from utils.constants import COLOR_CYAN, RESET_FORMAT
from utils.logger import Logger
def get_confirm(
question: str, default_choice=True, allow_go_back=False
) -> Union[bool, None]:
def get_confirm(question: str, default_choice=True, allow_go_back=False) -> bool | None:
"""
Helper method for validating confirmation (yes/no) user input. |
:param question: The question to display
@@ -56,7 +56,7 @@ def get_number_input(
max_count=None,
default=None,
allow_go_back=False,
) -> Union[int, None]:
) -> int | None:
"""
Helper method to get a number input from the user
:param question: The question to display

View File

@@ -93,6 +93,20 @@ class Logger:
padding_top: int = 1,
padding_bottom: int = 1,
) -> None:
"""
Prints a dialog with the given title and content.
Those dialogs should be used to display verbose messages to the user which
require simple interaction like confirmation or input. Do not use this for
navigating through the application.
:param title: The type of the dialog.
:param content: The content of the dialog.
:param center_content: Whether to center the content or not.
:param custom_title: A custom title for the dialog.
:param custom_color: A custom color for the dialog.
:param padding_top: The number of empty lines to print before the dialog.
:param padding_bottom: The number of empty lines to print after the dialog.
"""
dialog_color = Logger._get_dialog_color(title, custom_color)
dialog_title = Logger._get_dialog_title(title, custom_title)
dialog_title_formatted = Logger._format_dialog_title(dialog_title)