mirror of
https://github.com/dw-0/kiauh.git
synced 2025-12-25 08:43:36 +05:00
refactor: composition > inheritance
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
@@ -28,7 +28,6 @@ from components.crowsnest import (
|
||||
from components.klipper.klipper import Klipper
|
||||
from core.backup_manager.backup_manager import BackupManager
|
||||
from core.constants import CURRENT_USER
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import DialogType, Logger
|
||||
from core.settings.kiauh_settings import KiauhSettings
|
||||
from core.types import ComponentStatus
|
||||
@@ -41,6 +40,7 @@ from utils.git_utils import (
|
||||
git_pull_wrapper,
|
||||
)
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
cmd_sysctl_service,
|
||||
parse_packages_from_file,
|
||||
@@ -55,8 +55,7 @@ def install_crowsnest() -> None:
|
||||
check_install_dependencies({"make"})
|
||||
|
||||
# Step 3: Check for Multi Instance
|
||||
im = InstanceManager(Klipper)
|
||||
instances: List[Klipper] = im.instances
|
||||
instances: List[Klipper] = get_instances(Klipper)
|
||||
|
||||
if len(instances) > 1:
|
||||
print_multi_instance_warning(instances)
|
||||
@@ -95,7 +94,7 @@ def print_multi_instance_warning(instances: List[Klipper]) -> None:
|
||||
"this instance to set up your 'crowsnest.conf' and steering it's service.",
|
||||
"\n\n",
|
||||
"The following instances were found:",
|
||||
*[f"● {instance.data_dir_name}" for instance in instances],
|
||||
*[f"● {instance.data_dir.name}" for instance in instances],
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
@@ -23,28 +23,36 @@ from components.klipper import (
|
||||
KLIPPER_SERVICE_TEMPLATE,
|
||||
KLIPPER_UDS_NAME,
|
||||
)
|
||||
from core.constants import CURRENT_USER
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.logger import Logger
|
||||
from utils.fs_utils import create_folders, get_data_dir
|
||||
from utils.sys_utils import get_service_file_path
|
||||
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
@dataclass
|
||||
class Klipper(BaseInstance):
|
||||
@dataclass(repr=True)
|
||||
class Klipper:
|
||||
suffix: str
|
||||
base: BaseInstance = field(init=False, repr=False)
|
||||
service_file_path: Path = field(init=False)
|
||||
log_file_name: str = KLIPPER_LOG_NAME
|
||||
klipper_dir: Path = KLIPPER_DIR
|
||||
env_dir: Path = KLIPPER_ENV_DIR
|
||||
cfg_file: Path | None = None
|
||||
serial: Path | None = None
|
||||
uds: Path | None = None
|
||||
data_dir: Path = field(init=False)
|
||||
cfg_file: Path = field(init=False)
|
||||
serial: Path = field(init=False)
|
||||
uds: Path = field(init=False)
|
||||
|
||||
def __init__(self, suffix: str = "") -> None:
|
||||
super().__init__(suffix=suffix)
|
||||
def __post_init__(self):
|
||||
self.base: BaseInstance = BaseInstance(Klipper, self.suffix)
|
||||
self.base.log_file_name = self.log_file_name
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
super().__post_init__()
|
||||
self.log_file_name = KLIPPER_LOG_NAME
|
||||
self.cfg_file = self.cfg_dir.joinpath(KLIPPER_CFG_NAME)
|
||||
self.serial = self.comms_dir.joinpath(KLIPPER_SERIAL_NAME)
|
||||
self.uds = self.comms_dir.joinpath(KLIPPER_UDS_NAME)
|
||||
self.service_file_path: Path = get_service_file_path(Klipper, self.suffix)
|
||||
self.data_dir: Path = get_data_dir(Klipper, self.suffix)
|
||||
self.cfg_file: Path = self.base.cfg_dir.joinpath(KLIPPER_CFG_NAME)
|
||||
self.serial: Path = self.base.comms_dir.joinpath(KLIPPER_SERIAL_NAME)
|
||||
self.uds: Path = self.base.comms_dir.joinpath(KLIPPER_UDS_NAME)
|
||||
|
||||
def create(self) -> None:
|
||||
from utils.sys_utils import create_env_file, create_service_file
|
||||
@@ -52,7 +60,7 @@ class Klipper(BaseInstance):
|
||||
Logger.print_status("Creating new Klipper Instance ...")
|
||||
|
||||
try:
|
||||
self.create_folders()
|
||||
create_folders(self.base.base_folders)
|
||||
|
||||
create_service_file(
|
||||
name=self.service_file_path.name,
|
||||
@@ -60,7 +68,7 @@ class Klipper(BaseInstance):
|
||||
)
|
||||
|
||||
create_env_file(
|
||||
path=self.sysd_dir.joinpath(KLIPPER_ENV_FILE_NAME),
|
||||
path=self.base.sysd_dir.joinpath(KLIPPER_ENV_FILE_NAME),
|
||||
content=self._prep_env_file_content(),
|
||||
)
|
||||
|
||||
@@ -83,7 +91,7 @@ class Klipper(BaseInstance):
|
||||
|
||||
service_content = template_content.replace(
|
||||
"%USER%",
|
||||
self.user,
|
||||
CURRENT_USER,
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%KLIPPER_DIR%",
|
||||
@@ -95,7 +103,7 @@ class Klipper(BaseInstance):
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%ENV_FILE%",
|
||||
self.sysd_dir.joinpath(KLIPPER_ENV_FILE_NAME).as_posix(),
|
||||
self.base.sysd_dir.joinpath(KLIPPER_ENV_FILE_NAME).as_posix(),
|
||||
)
|
||||
return service_content
|
||||
|
||||
@@ -114,7 +122,7 @@ class Klipper(BaseInstance):
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%CFG%",
|
||||
f"{self.cfg_dir}/{KLIPPER_CFG_NAME}",
|
||||
f"{self.base.cfg_dir}/{KLIPPER_CFG_NAME}",
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%SERIAL%",
|
||||
@@ -122,7 +130,7 @@ class Klipper(BaseInstance):
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%LOG%",
|
||||
self.log_dir.joinpath(self.log_file_name).as_posix(),
|
||||
self.base.log_dir.joinpath(self.log_file_name).as_posix(),
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%UDS%",
|
||||
|
||||
@@ -17,7 +17,7 @@ from core.constants import (
|
||||
COLOR_YELLOW,
|
||||
RESET_FORMAT,
|
||||
)
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.instance_type import InstanceType
|
||||
from core.menus.base_menu import print_back_footer
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ class DisplayType(Enum):
|
||||
|
||||
|
||||
def print_instance_overview(
|
||||
instances: List[BaseInstance],
|
||||
instances: List[InstanceType],
|
||||
display_type: DisplayType = DisplayType.SERVICE_NAME,
|
||||
show_headline=True,
|
||||
show_index=False,
|
||||
|
||||
@@ -17,6 +17,7 @@ from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.input_utils import get_selection_input
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def run_klipper_removal(
|
||||
@@ -24,7 +25,7 @@ def run_klipper_removal(
|
||||
remove_dir: bool,
|
||||
remove_env: bool,
|
||||
) -> None:
|
||||
klipper_instances = InstanceManager(Klipper).instances
|
||||
klipper_instances: List[Klipper] = get_instances(Klipper)
|
||||
|
||||
if remove_service:
|
||||
Logger.print_status("Removing Klipper instances ...")
|
||||
@@ -80,13 +81,13 @@ def remove_instances(
|
||||
|
||||
for instance in instance_list:
|
||||
Logger.print_status(f"Removing instance {instance.service_file_path.stem} ...")
|
||||
instance.remove()
|
||||
InstanceManager.remove(instance)
|
||||
|
||||
|
||||
def delete_klipper_logs(instances: List[Klipper]) -> None:
|
||||
all_logfiles = []
|
||||
for instance in instances:
|
||||
all_logfiles = list(instance.log_dir.glob("klippy.log*"))
|
||||
all_logfiles = list(instance.base.log_dir.glob("klippy.log*"))
|
||||
if not all_logfiles:
|
||||
Logger.print_info("No Klipper logs found. Skipped ...")
|
||||
return
|
||||
|
||||
@@ -40,8 +40,10 @@ from core.settings.kiauh_settings import KiauhSettings
|
||||
from utils.common import check_install_dependencies
|
||||
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
cmd_sysctl_manage,
|
||||
cmd_sysctl_service,
|
||||
create_python_venv,
|
||||
install_python_requirements,
|
||||
parse_packages_from_file,
|
||||
@@ -51,8 +53,8 @@ from utils.sys_utils import (
|
||||
def install_klipper() -> None:
|
||||
Logger.print_status("Installing Klipper ...")
|
||||
|
||||
klipper_list: List[Klipper] = InstanceManager(Klipper).instances
|
||||
moonraker_list: List[Moonraker] = InstanceManager(Moonraker).instances
|
||||
klipper_list: List[Klipper] = get_instances(Klipper)
|
||||
moonraker_list: List[Moonraker] = get_instances(Moonraker)
|
||||
match_moonraker: bool = False
|
||||
|
||||
# if there are more moonraker instances than klipper instances, ask the user to
|
||||
@@ -94,7 +96,7 @@ def install_klipper() -> None:
|
||||
|
||||
|
||||
def run_klipper_setup(
|
||||
klipper_list: List[Klipper], name_dict: Dict[int, str], example_cfg: bool
|
||||
klipper_list: List[Klipper], name_dict: Dict[int, str], create_example_cfg: bool
|
||||
) -> None:
|
||||
if not klipper_list:
|
||||
setup_klipper_prerequesites()
|
||||
@@ -104,7 +106,16 @@ def run_klipper_setup(
|
||||
if name_dict[i] in [n.suffix for n in klipper_list]:
|
||||
continue
|
||||
|
||||
create_klipper_instance(name_dict[i], example_cfg)
|
||||
instance = Klipper(suffix=name_dict[i])
|
||||
instance.create()
|
||||
cmd_sysctl_service(instance.service_file_path.name, "enable")
|
||||
|
||||
if create_example_cfg:
|
||||
# if a client-config is installed, include it in the new example cfg
|
||||
clients = get_existing_clients()
|
||||
create_example_printer_cfg(instance, clients)
|
||||
|
||||
cmd_sysctl_service(instance.service_file_path.name, "start")
|
||||
|
||||
cmd_sysctl_manage("daemon-reload")
|
||||
|
||||
@@ -189,8 +200,8 @@ def update_klipper() -> None:
|
||||
if settings.kiauh.backup_before_update:
|
||||
backup_klipper_dir()
|
||||
|
||||
instance_manager = InstanceManager(Klipper)
|
||||
instance_manager.stop_all_instance()
|
||||
instances = get_instances(Klipper)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
git_pull_wrapper(repo=settings.klipper.repo_url, target_dir=KLIPPER_DIR)
|
||||
|
||||
@@ -199,20 +210,7 @@ def update_klipper() -> None:
|
||||
# install possible new python dependencies
|
||||
install_python_requirements(KLIPPER_ENV_DIR, KLIPPER_REQ_FILE)
|
||||
|
||||
instance_manager.start_all_instance()
|
||||
|
||||
|
||||
def create_klipper_instance(name: str, create_example_cfg: bool) -> None:
|
||||
kl_im = InstanceManager(Klipper)
|
||||
new_instance = Klipper(suffix=name)
|
||||
kl_im.current_instance = new_instance
|
||||
kl_im.create_instance()
|
||||
kl_im.enable_instance()
|
||||
if create_example_cfg:
|
||||
# if a client-config is installed, include it in the new example cfg
|
||||
clients = get_existing_clients()
|
||||
create_example_printer_cfg(new_instance, clients)
|
||||
kl_im.start_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
|
||||
def use_custom_names_or_go_back() -> bool | None:
|
||||
|
||||
@@ -31,15 +31,16 @@ from components.webui_client.client_config.client_config_setup import (
|
||||
)
|
||||
from core.backup_manager.backup_manager import BackupManager
|
||||
from core.constants import CURRENT_USER
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.instance_manager.base_instance import SUFFIX_BLACKLIST
|
||||
from core.logger import DialogType, Logger
|
||||
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
|
||||
SimpleConfigParser,
|
||||
)
|
||||
from core.types import ComponentStatus
|
||||
from utils.common import get_install_status
|
||||
from utils.input_utils import get_confirm, get_number_input, get_string_input
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import cmd_sysctl_service
|
||||
from core.types import ComponentStatus
|
||||
|
||||
|
||||
def get_klipper_status() -> ComponentStatus:
|
||||
@@ -47,7 +48,7 @@ def get_klipper_status() -> ComponentStatus:
|
||||
|
||||
|
||||
def add_to_existing() -> bool | None:
|
||||
kl_instances: List[Klipper] = InstanceManager(Klipper).instances
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
print_instance_overview(kl_instances)
|
||||
_input: bool | None = get_confirm("Add new instances?", allow_go_back=True)
|
||||
return _input
|
||||
@@ -60,7 +61,7 @@ def get_install_count() -> int | None:
|
||||
user selected to go back, otherwise an integer greater or equal than 1 |
|
||||
:return: Integer >= 1 or None
|
||||
"""
|
||||
kl_instances = InstanceManager(Klipper).instances
|
||||
kl_instances = get_instances(Klipper)
|
||||
print_select_instance_count_dialog()
|
||||
question = (
|
||||
f"Number of"
|
||||
@@ -73,7 +74,7 @@ def get_install_count() -> int | None:
|
||||
|
||||
def assign_custom_name(key: int, name_dict: Dict[int, str]) -> None:
|
||||
existing_names = []
|
||||
existing_names.extend(Klipper.blacklist())
|
||||
existing_names.extend(SUFFIX_BLACKLIST)
|
||||
existing_names.extend(name_dict[n] for n in name_dict)
|
||||
pattern = r"^[a-zA-Z0-9]+$"
|
||||
|
||||
@@ -160,7 +161,7 @@ def handle_disruptive_system_packages() -> None:
|
||||
def create_example_printer_cfg(
|
||||
instance: Klipper, clients: List[BaseWebClient] | None = None
|
||||
) -> None:
|
||||
Logger.print_status(f"Creating example printer.cfg in '{instance.cfg_dir}'")
|
||||
Logger.print_status(f"Creating example printer.cfg in '{instance.base.cfg_dir}'")
|
||||
if instance.cfg_file.is_file():
|
||||
Logger.print_info(f"'{instance.cfg_file}' already exists.")
|
||||
return
|
||||
@@ -175,7 +176,7 @@ def create_example_printer_cfg(
|
||||
|
||||
scp = SimpleConfigParser()
|
||||
scp.read(target)
|
||||
scp.set("virtual_sdcard", "path", str(instance.gcodes_dir))
|
||||
scp.set("virtual_sdcard", "path", str(instance.base.gcodes_dir))
|
||||
|
||||
# include existing client configs in the example config
|
||||
if clients is not None and len(clients) > 0:
|
||||
@@ -187,7 +188,7 @@ def create_example_printer_cfg(
|
||||
|
||||
scp.write(target)
|
||||
|
||||
Logger.print_ok(f"Example printer.cfg created in '{instance.cfg_dir}'")
|
||||
Logger.print_ok(f"Example printer.cfg created in '{instance.base.cfg_dir}'")
|
||||
|
||||
|
||||
def backup_klipper_dir() -> None:
|
||||
|
||||
@@ -19,6 +19,7 @@ from components.klipper_firmware.flash_options import (
|
||||
)
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import log_process
|
||||
|
||||
|
||||
@@ -117,13 +118,13 @@ def start_flash_process(flash_options: FlashOptions) -> None:
|
||||
else:
|
||||
raise Exception("Invalid value for flash_method!")
|
||||
|
||||
instance_manager = InstanceManager(Klipper)
|
||||
instance_manager.stop_all_instance()
|
||||
instances = get_instances(Klipper)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
process = Popen(cmd, cwd=KLIPPER_DIR, stdout=PIPE, stderr=STDOUT, text=True)
|
||||
log_process(process)
|
||||
|
||||
instance_manager.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
rc = process.returncode
|
||||
if rc != 0:
|
||||
|
||||
@@ -42,6 +42,7 @@ from utils.git_utils import (
|
||||
git_pull_wrapper,
|
||||
)
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
check_python_version,
|
||||
cmd_sysctl_service,
|
||||
@@ -56,8 +57,7 @@ def install_klipperscreen() -> None:
|
||||
if not check_python_version(3, 7):
|
||||
return
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances = mr_im.instances
|
||||
mr_instances = get_instances(Moonraker)
|
||||
if not mr_instances:
|
||||
Logger.print_dialog(
|
||||
DialogType.WARNING,
|
||||
@@ -86,7 +86,7 @@ def install_klipperscreen() -> None:
|
||||
run(KLIPPERSCREEN_INSTALL_SCRIPT.as_posix(), shell=True, check=True)
|
||||
if mr_instances:
|
||||
patch_klipperscreen_update_manager(mr_instances)
|
||||
mr_im.restart_all_instance()
|
||||
InstanceManager.restart_all(mr_instances)
|
||||
else:
|
||||
Logger.print_info(
|
||||
"Moonraker is not installed! Cannot add "
|
||||
@@ -174,17 +174,15 @@ def remove_klipperscreen() -> None:
|
||||
remove_with_sudo(logfile)
|
||||
Logger.print_ok("KlipperScreen log file successfully removed!")
|
||||
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances: List[Klipper] = kl_im.instances
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
for instance in kl_instances:
|
||||
logfile = instance.log_dir.joinpath(KLIPPERSCREEN_LOG_NAME)
|
||||
logfile = instance.base.log_dir.joinpath(KLIPPERSCREEN_LOG_NAME)
|
||||
if logfile.exists():
|
||||
Logger.print_status(f"Removing {logfile} ...")
|
||||
Path(logfile).unlink()
|
||||
Logger.print_ok(f"{logfile} successfully removed!")
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
if mr_instances:
|
||||
Logger.print_status("Removing KlipperScreen from update manager ...")
|
||||
remove_config_section("update_manager KlipperScreen", mr_instances)
|
||||
|
||||
@@ -13,13 +13,14 @@ from typing import List
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.log_uploads import LogFile
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def get_logfile_list() -> List[LogFile]:
|
||||
cm = InstanceManager(Klipper)
|
||||
log_dirs: List[Path] = [instance.log_dir for instance in cm.instances]
|
||||
log_dirs: List[Path] = [
|
||||
instance.base.log_dir for instance in get_instances(Klipper)
|
||||
]
|
||||
|
||||
logfiles: List[LogFile] = []
|
||||
for _dir in log_dirs:
|
||||
|
||||
@@ -37,6 +37,7 @@ from utils.git_utils import (
|
||||
git_pull_wrapper,
|
||||
)
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
check_python_version,
|
||||
cmd_sysctl_service,
|
||||
@@ -51,8 +52,7 @@ def install_mobileraker() -> None:
|
||||
if not check_python_version(3, 7):
|
||||
return
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances = mr_im.instances
|
||||
mr_instances = get_instances(Moonraker)
|
||||
if not mr_instances:
|
||||
Logger.print_dialog(
|
||||
DialogType.WARNING,
|
||||
@@ -78,7 +78,7 @@ def install_mobileraker() -> None:
|
||||
run(MOBILERAKER_INSTALL_SCRIPT.as_posix(), shell=True, check=True)
|
||||
if mr_instances:
|
||||
patch_mobileraker_update_manager(mr_instances)
|
||||
mr_im.restart_all_instance()
|
||||
InstanceManager.restart_all(mr_instances)
|
||||
else:
|
||||
Logger.print_info(
|
||||
"Moonraker is not installed! Cannot add Mobileraker's "
|
||||
@@ -163,17 +163,15 @@ def remove_mobileraker() -> None:
|
||||
if MOBILERAKER_SERVICE_FILE.exists():
|
||||
remove_system_service(MOBILERAKER_SERVICE_NAME)
|
||||
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances: List[Klipper] = kl_im.instances
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
for instance in kl_instances:
|
||||
logfile = instance.log_dir.joinpath(MOBILERAKER_LOG_NAME)
|
||||
logfile = instance.base.log_dir.joinpath(MOBILERAKER_LOG_NAME)
|
||||
if logfile.exists():
|
||||
Logger.print_status(f"Removing {logfile} ...")
|
||||
Path(logfile).unlink()
|
||||
Logger.print_ok(f"{logfile} successfully removed!")
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
if mr_instances:
|
||||
Logger.print_status(
|
||||
"Removing Mobileraker's companion from update manager ..."
|
||||
|
||||
@@ -8,10 +8,11 @@
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.moonraker import (
|
||||
MOONRAKER_CFG_NAME,
|
||||
MOONRAKER_DIR,
|
||||
@@ -21,49 +22,58 @@ from components.moonraker import (
|
||||
MOONRAKER_LOG_NAME,
|
||||
MOONRAKER_SERVICE_TEMPLATE,
|
||||
)
|
||||
from core.constants import CURRENT_USER
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.logger import Logger
|
||||
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
|
||||
SimpleConfigParser,
|
||||
)
|
||||
from utils.fs_utils import create_folders
|
||||
from utils.sys_utils import get_service_file_path
|
||||
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
@dataclass
|
||||
class Moonraker(BaseInstance):
|
||||
class Moonraker:
|
||||
suffix: str
|
||||
base: BaseInstance = field(init=False, repr=False)
|
||||
service_file_path: Path = field(init=False)
|
||||
log_file_name: str = MOONRAKER_LOG_NAME
|
||||
moonraker_dir: Path = MOONRAKER_DIR
|
||||
env_dir: Path = MOONRAKER_ENV_DIR
|
||||
cfg_file: Path | None = None
|
||||
port: int | None = None
|
||||
backup_dir: Path | None = None
|
||||
certs_dir: Path | None = None
|
||||
db_dir: Path | None = None
|
||||
data_dir: Path = field(init=False)
|
||||
cfg_file: Path = field(init=False)
|
||||
backup_dir: Path = field(init=False)
|
||||
certs_dir: Path = field(init=False)
|
||||
db_dir: Path = field(init=False)
|
||||
port: int | None = field(init=False)
|
||||
|
||||
def __init__(self, suffix: str = ""):
|
||||
super().__init__(suffix=suffix)
|
||||
def __post_init__(self):
|
||||
self.base: BaseInstance = BaseInstance(Klipper, self.suffix)
|
||||
self.base.log_file_name = self.log_file_name
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
super().__post_init__()
|
||||
self.log_file_name = MOONRAKER_LOG_NAME
|
||||
self.cfg_file = self.cfg_dir.joinpath(MOONRAKER_CFG_NAME)
|
||||
self.port = self._get_port()
|
||||
self.backup_dir = self.data_dir.joinpath("backup")
|
||||
self.certs_dir = self.data_dir.joinpath("certs")
|
||||
self.db_dir = self.data_dir.joinpath("database")
|
||||
self.service_file_path: Path = get_service_file_path(Moonraker, self.suffix)
|
||||
self.data_dir: Path = self.base.data_dir
|
||||
self.cfg_file: Path = self.base.cfg_dir.joinpath(MOONRAKER_CFG_NAME)
|
||||
self.backup_dir: Path = self.base.data_dir.joinpath("backup")
|
||||
self.certs_dir: Path = self.base.data_dir.joinpath("certs")
|
||||
self.db_dir: Path = self.base.data_dir.joinpath("database")
|
||||
self.port: int | None = self._get_port()
|
||||
|
||||
def create(self, create_example_cfg: bool = False) -> None:
|
||||
def create(self) -> None:
|
||||
from utils.sys_utils import create_env_file, create_service_file
|
||||
|
||||
Logger.print_status("Creating new Moonraker Instance ...")
|
||||
|
||||
try:
|
||||
self.create_folders([self.backup_dir, self.certs_dir, self.db_dir])
|
||||
create_folders(self.base.base_folders)
|
||||
|
||||
create_service_file(
|
||||
name=self.service_file_path.name,
|
||||
content=self._prep_service_file_content(),
|
||||
)
|
||||
create_env_file(
|
||||
path=self.sysd_dir.joinpath(MOONRAKER_ENV_FILE_NAME),
|
||||
path=self.base.sysd_dir.joinpath(MOONRAKER_ENV_FILE_NAME),
|
||||
content=self._prep_env_file_content(),
|
||||
)
|
||||
|
||||
@@ -86,7 +96,7 @@ class Moonraker(BaseInstance):
|
||||
|
||||
service_content = template_content.replace(
|
||||
"%USER%",
|
||||
self.user,
|
||||
CURRENT_USER,
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%MOONRAKER_DIR%",
|
||||
@@ -98,7 +108,7 @@ class Moonraker(BaseInstance):
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%ENV_FILE%",
|
||||
self.sysd_dir.joinpath(MOONRAKER_ENV_FILE_NAME).as_posix(),
|
||||
self.base.sysd_dir.joinpath(MOONRAKER_ENV_FILE_NAME).as_posix(),
|
||||
)
|
||||
return service_content
|
||||
|
||||
@@ -118,7 +128,7 @@ class Moonraker(BaseInstance):
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%PRINTER_DATA%",
|
||||
self.data_dir.as_posix(),
|
||||
self.base.data_dir.as_posix(),
|
||||
)
|
||||
|
||||
return env_file_content
|
||||
|
||||
@@ -18,6 +18,7 @@ from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.input_utils import get_selection_input
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def run_moonraker_removal(
|
||||
@@ -26,17 +27,17 @@ def run_moonraker_removal(
|
||||
remove_env: bool,
|
||||
remove_polkit: bool,
|
||||
) -> None:
|
||||
moonraker_instances = InstanceManager(Moonraker).instances
|
||||
instances = get_instances(Moonraker)
|
||||
|
||||
if remove_service:
|
||||
Logger.print_status("Removing Moonraker instances ...")
|
||||
if moonraker_instances:
|
||||
instances_to_remove = select_instances_to_remove(moonraker_instances)
|
||||
if instances:
|
||||
instances_to_remove = select_instances_to_remove(instances)
|
||||
remove_instances(instances_to_remove)
|
||||
else:
|
||||
Logger.print_info("No Moonraker Services installed! Skipped ...")
|
||||
|
||||
if (remove_polkit or remove_dir or remove_env) and moonraker_instances:
|
||||
if (remove_polkit or remove_dir or remove_env) and instances:
|
||||
Logger.print_info("There are still other Moonraker services installed")
|
||||
Logger.print_info(
|
||||
"● Moonraker PolicyKit rules were not removed.", prefix=False
|
||||
@@ -90,7 +91,7 @@ def remove_instances(
|
||||
return
|
||||
for instance in instance_list:
|
||||
Logger.print_status(f"Removing instance {instance.service_file_path.stem} ...")
|
||||
instance.remove()
|
||||
InstanceManager.remove(instance)
|
||||
|
||||
|
||||
def remove_polkit_rules() -> None:
|
||||
@@ -111,7 +112,7 @@ def remove_polkit_rules() -> None:
|
||||
def delete_moonraker_logs(instances: List[Moonraker]) -> None:
|
||||
all_logfiles = []
|
||||
for instance in instances:
|
||||
all_logfiles = list(instance.log_dir.glob("moonraker.log*"))
|
||||
all_logfiles = list(instance.base.log_dir.glob("moonraker.log*"))
|
||||
if not all_logfiles:
|
||||
Logger.print_info("No Moonraker logs found. Skipped ...")
|
||||
return
|
||||
|
||||
@@ -31,7 +31,6 @@ from components.moonraker.moonraker_dialogs import print_moonraker_overview
|
||||
from components.moonraker.moonraker_utils import (
|
||||
backup_moonraker_dir,
|
||||
create_example_moonraker_conf,
|
||||
moonraker_factory,
|
||||
)
|
||||
from components.webui_client.client_utils import (
|
||||
enable_mainsail_remotemode,
|
||||
@@ -48,6 +47,7 @@ from utils.input_utils import (
|
||||
get_confirm,
|
||||
get_selection_input,
|
||||
)
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
check_python_version,
|
||||
cmd_sysctl_manage,
|
||||
@@ -59,16 +59,17 @@ from utils.sys_utils import (
|
||||
|
||||
|
||||
def install_moonraker() -> None:
|
||||
if not check_moonraker_install_requirements():
|
||||
klipper_list: List[Klipper] = get_instances(Klipper)
|
||||
|
||||
if not check_moonraker_install_requirements(klipper_list):
|
||||
return
|
||||
|
||||
klipper_list: List[Klipper] = InstanceManager(Klipper).instances
|
||||
moonraker_list: List[Moonraker] = InstanceManager(Moonraker).instances
|
||||
moonraker_list: List[Moonraker] = get_instances(Moonraker)
|
||||
instances: List[Moonraker] = []
|
||||
selected_option: str | Klipper
|
||||
|
||||
if len(klipper_list) == 1:
|
||||
instances.append(moonraker_factory(klipper_list[0]))
|
||||
instances.append(Moonraker(klipper_list[0].suffix))
|
||||
else:
|
||||
print_moonraker_overview(
|
||||
klipper_list,
|
||||
@@ -87,12 +88,12 @@ def install_moonraker() -> None:
|
||||
return
|
||||
|
||||
if selected_option == "a":
|
||||
instances.extend([moonraker_factory(k) for k in klipper_list])
|
||||
instances.extend([Moonraker(k.suffix) for k in klipper_list])
|
||||
else:
|
||||
klipper_instance: Klipper | None = options.get(selected_option)
|
||||
if klipper_instance is None:
|
||||
raise Exception("Error selecting instance!")
|
||||
instances.append(moonraker_factory(klipper_instance))
|
||||
instances.append(Moonraker(klipper_instance.suffix))
|
||||
|
||||
create_example_cfg = get_confirm("Create example moonraker.conf?")
|
||||
|
||||
@@ -126,9 +127,9 @@ def install_moonraker() -> None:
|
||||
return
|
||||
|
||||
|
||||
def check_moonraker_install_requirements() -> bool:
|
||||
def check_moonraker_install_requirements(klipper_list: List[Klipper]) -> bool:
|
||||
def check_klipper_instances() -> bool:
|
||||
if len(InstanceManager(Klipper).instances) >= 1:
|
||||
if len(klipper_list) >= 1:
|
||||
return True
|
||||
|
||||
Logger.print_warn("Klipper not installed!")
|
||||
@@ -205,8 +206,8 @@ def update_moonraker() -> None:
|
||||
if settings.kiauh.backup_before_update:
|
||||
backup_moonraker_dir()
|
||||
|
||||
instance_manager = InstanceManager(Moonraker)
|
||||
instance_manager.stop_all_instance()
|
||||
instances = get_instances(Moonraker)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
git_pull_wrapper(repo=settings.moonraker.repo_url, target_dir=MOONRAKER_DIR)
|
||||
|
||||
@@ -215,4 +216,4 @@ def update_moonraker() -> None:
|
||||
# install possible new python dependencies
|
||||
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)
|
||||
|
||||
instance_manager.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
@@ -10,11 +10,9 @@
|
||||
import shutil
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.moonraker import (
|
||||
MODULE_PATH,
|
||||
MOONRAKER_BACKUP_DIR,
|
||||
MOONRAKER_CFG_NAME,
|
||||
MOONRAKER_DB_BACKUP_DIR,
|
||||
MOONRAKER_DEFAULT_PORT,
|
||||
MOONRAKER_DIR,
|
||||
@@ -23,37 +21,18 @@ from components.moonraker import (
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from components.webui_client.base_data import BaseWebClient
|
||||
from core.backup_manager.backup_manager import BackupManager
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
|
||||
SimpleConfigParser,
|
||||
)
|
||||
from core.types import ComponentStatus
|
||||
from utils.common import get_install_status
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
get_ipv4_addr,
|
||||
)
|
||||
|
||||
|
||||
def moonraker_factory(klipper_instance: Klipper) -> Moonraker:
|
||||
"""Create a new Moonraker instance from a Klipper instance."""
|
||||
|
||||
instance: Moonraker = Moonraker(suffix=klipper_instance.suffix)
|
||||
instance.is_legacy_instance = klipper_instance.is_legacy_instance
|
||||
instance.data_dir = klipper_instance.data_dir
|
||||
instance.data_dir_name = klipper_instance.data_dir_name
|
||||
instance.cfg_dir = klipper_instance.cfg_dir
|
||||
instance.cfg_file = instance.cfg_dir.joinpath(MOONRAKER_CFG_NAME)
|
||||
instance.log_dir = klipper_instance.log_dir
|
||||
instance.sysd_dir = klipper_instance.sysd_dir
|
||||
instance.comms_dir = klipper_instance.comms_dir
|
||||
instance.gcodes_dir = klipper_instance.gcodes_dir
|
||||
instance.db_dir = instance.data_dir.joinpath("database")
|
||||
instance.backup_dir = instance.data_dir.joinpath("backup")
|
||||
instance.certs_dir = instance.data_dir.joinpath("certs")
|
||||
return instance
|
||||
|
||||
|
||||
def get_moonraker_status() -> ComponentStatus:
|
||||
return get_install_status(MOONRAKER_DIR, MOONRAKER_ENV_DIR, Moonraker)
|
||||
|
||||
@@ -63,7 +42,7 @@ def create_example_moonraker_conf(
|
||||
ports_map: Dict[str, int],
|
||||
clients: Optional[List[BaseWebClient]] = None,
|
||||
) -> None:
|
||||
Logger.print_status(f"Creating example moonraker.conf in '{instance.cfg_dir}'")
|
||||
Logger.print_status(f"Creating example moonraker.conf in '{instance.base.cfg_dir}'")
|
||||
if instance.cfg_file.is_file():
|
||||
Logger.print_info(f"'{instance.cfg_file}' already exists.")
|
||||
return
|
||||
@@ -95,7 +74,7 @@ def create_example_moonraker_conf(
|
||||
|
||||
ip = get_ipv4_addr().split(".")[:2]
|
||||
ip.extend(["0", "0/16"])
|
||||
uds = instance.comms_dir.joinpath("klippy.sock")
|
||||
uds = instance.base.comms_dir.joinpath("klippy.sock")
|
||||
|
||||
scp = SimpleConfigParser()
|
||||
scp.read(target)
|
||||
@@ -144,7 +123,7 @@ def create_example_moonraker_conf(
|
||||
scp.set(c_config_section, option[0], option[1])
|
||||
|
||||
scp.write(target)
|
||||
Logger.print_ok(f"Example moonraker.conf created in '{instance.cfg_dir}'")
|
||||
Logger.print_ok(f"Example moonraker.conf created in '{instance.base.cfg_dir}'")
|
||||
|
||||
|
||||
def backup_moonraker_dir() -> None:
|
||||
@@ -156,12 +135,11 @@ def backup_moonraker_dir() -> None:
|
||||
|
||||
|
||||
def backup_moonraker_db_dir() -> None:
|
||||
im = InstanceManager(Moonraker)
|
||||
instances: List[Moonraker] = im.instances
|
||||
instances: List[Moonraker] = get_instances(Moonraker)
|
||||
bm = BackupManager()
|
||||
|
||||
for instance in instances:
|
||||
name = f"database-{instance.data_dir_name}"
|
||||
name = f"database-{instance.data_dir.name}"
|
||||
bm.backup_directory(
|
||||
name, source=instance.db_dir, target=MOONRAKER_DB_BACKUP_DIR
|
||||
)
|
||||
|
||||
@@ -8,47 +8,57 @@
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError, run
|
||||
|
||||
from components.moonraker import MOONRAKER_CFG_NAME
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from components.octoeverywhere import (
|
||||
OE_CFG_NAME,
|
||||
OE_DIR,
|
||||
OE_ENV_DIR,
|
||||
OE_INSTALL_SCRIPT,
|
||||
OE_LOG_NAME,
|
||||
OE_STORE_DIR,
|
||||
OE_SYS_CFG_NAME,
|
||||
OE_UPDATE_SCRIPT,
|
||||
)
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.logger import Logger
|
||||
from utils.sys_utils import get_service_file_path
|
||||
|
||||
|
||||
@dataclass
|
||||
class Octoeverywhere(BaseInstance):
|
||||
class Octoeverywhere:
|
||||
suffix: str
|
||||
base: BaseInstance = field(init=False, repr=False)
|
||||
service_file_path: Path = field(init=False)
|
||||
log_file_name = OE_LOG_NAME
|
||||
dir: Path = OE_DIR
|
||||
env_dir: Path = OE_ENV_DIR
|
||||
log_file_name = OE_LOG_NAME
|
||||
store_dir: Path = OE_STORE_DIR
|
||||
cfg_file: Path | None = None
|
||||
sys_cfg_file: Path | None = None
|
||||
|
||||
def __init__(self, suffix: str = ""):
|
||||
super().__init__(suffix=suffix)
|
||||
data_dir: Path = field(init=False)
|
||||
store_dir: Path = field(init=False)
|
||||
cfg_file: Path = field(init=False)
|
||||
sys_cfg_file: Path = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
self.cfg_file = self.cfg_dir.joinpath(OE_CFG_NAME)
|
||||
self.sys_cfg_file = self.cfg_dir.joinpath(OE_SYS_CFG_NAME)
|
||||
self.base: BaseInstance = BaseInstance(Moonraker, self.suffix)
|
||||
self.base.log_file_name = self.log_file_name
|
||||
|
||||
self.service_file_path: Path = get_service_file_path(
|
||||
Octoeverywhere, self.suffix
|
||||
)
|
||||
self.store_dir = self.base.data_dir.joinpath("store")
|
||||
self.cfg_file = self.base.cfg_dir.joinpath(OE_CFG_NAME)
|
||||
self.sys_cfg_file = self.base.cfg_dir.joinpath(OE_SYS_CFG_NAME)
|
||||
self.data_dir = self.base.data_dir
|
||||
self.sys_cfg_file = self.base.cfg_dir.joinpath(OE_SYS_CFG_NAME)
|
||||
|
||||
def create(self) -> None:
|
||||
Logger.print_status("Creating OctoEverywhere for Klipper Instance ...")
|
||||
|
||||
try:
|
||||
cmd = f"{OE_INSTALL_SCRIPT} {self.cfg_dir}/{MOONRAKER_CFG_NAME}"
|
||||
cmd = f"{OE_INSTALL_SCRIPT} {self.base.cfg_dir}/{MOONRAKER_CFG_NAME}"
|
||||
run(cmd, check=True, shell=True)
|
||||
|
||||
except CalledProcessError as e:
|
||||
|
||||
@@ -35,6 +35,7 @@ from utils.config_utils import (
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.git_utils import git_clone_wrapper
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
install_python_requirements,
|
||||
parse_packages_from_file,
|
||||
@@ -53,8 +54,7 @@ def install_octoeverywhere() -> None:
|
||||
return
|
||||
|
||||
force_clone = False
|
||||
oe_im = InstanceManager(Octoeverywhere)
|
||||
oe_instances: List[Octoeverywhere] = oe_im.instances
|
||||
oe_instances: List[Octoeverywhere] = get_instances(Octoeverywhere)
|
||||
if oe_instances:
|
||||
Logger.print_dialog(
|
||||
DialogType.INFO,
|
||||
@@ -73,10 +73,9 @@ def install_octoeverywhere() -> None:
|
||||
Logger.print_status("Re-Installing OctoEverywhere for Klipper ...")
|
||||
force_clone = True
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
|
||||
mr_names = [f"● {moonraker.data_dir_name}" for moonraker in mr_instances]
|
||||
mr_names = [f"● {moonraker.data_dir.name}" for moonraker in mr_instances]
|
||||
if len(mr_names) > 1:
|
||||
Logger.print_dialog(
|
||||
DialogType.INFO,
|
||||
@@ -102,10 +101,10 @@ def install_octoeverywhere() -> None:
|
||||
git_clone_wrapper(OE_REPO, OE_DIR, force=force_clone)
|
||||
|
||||
for moonraker in mr_instances:
|
||||
oe_im.current_instance = Octoeverywhere(suffix=moonraker.suffix)
|
||||
oe_im.create_instance()
|
||||
instance = Octoeverywhere(suffix=moonraker.suffix)
|
||||
instance.create()
|
||||
|
||||
mr_im.restart_all_instance()
|
||||
InstanceManager.restart_all(mr_instances)
|
||||
|
||||
Logger.print_dialog(
|
||||
DialogType.SUCCESS,
|
||||
@@ -135,10 +134,9 @@ def update_octoeverywhere() -> None:
|
||||
|
||||
def remove_octoeverywhere() -> None:
|
||||
Logger.print_status("Removing OctoEverywhere for Klipper ...")
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
ob_im = InstanceManager(Octoeverywhere)
|
||||
ob_instances: List[Octoeverywhere] = ob_im.instances
|
||||
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
ob_instances: List[Octoeverywhere] = get_instances(Octoeverywhere)
|
||||
|
||||
try:
|
||||
remove_oe_instances(ob_instances)
|
||||
@@ -180,7 +178,7 @@ def remove_oe_instances(
|
||||
|
||||
for instance in instance_list:
|
||||
Logger.print_status(f"Removing instance {instance.service_file_path.stem} ...")
|
||||
instance.remove()
|
||||
InstanceManager.remove(instance)
|
||||
|
||||
|
||||
def remove_oe_dir() -> None:
|
||||
|
||||
@@ -13,10 +13,10 @@ from typing import List
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from components.webui_client.base_data import BaseWebClientConfig
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from utils.config_utils import remove_config_section
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def run_client_config_removal(
|
||||
@@ -36,7 +36,8 @@ def remove_client_config_dir(client_config: BaseWebClientConfig) -> None:
|
||||
|
||||
|
||||
def remove_client_config_symlink(client_config: BaseWebClientConfig) -> None:
|
||||
im = InstanceManager(Klipper)
|
||||
instances: List[Klipper] = im.instances
|
||||
instances: List[Klipper] = get_instances(Klipper)
|
||||
for instance in instances:
|
||||
run_remove_routines(instance.cfg_dir.joinpath(client_config.config_filename))
|
||||
run_remove_routines(
|
||||
instance.base.cfg_dir.joinpath(client_config.config_filename)
|
||||
)
|
||||
|
||||
@@ -31,6 +31,7 @@ from utils.config_utils import add_config_section, add_config_section_at_top
|
||||
from utils.fs_utils import create_symlink
|
||||
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def install_client_config(client_data: BaseWebClient) -> None:
|
||||
@@ -48,10 +49,8 @@ def install_client_config(client_data: BaseWebClient) -> None:
|
||||
else:
|
||||
return
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances = kl_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
kl_instances = get_instances(Klipper)
|
||||
|
||||
try:
|
||||
download_client_config(client_config)
|
||||
@@ -71,7 +70,7 @@ def install_client_config(client_data: BaseWebClient) -> None:
|
||||
],
|
||||
)
|
||||
add_config_section_at_top(client_config.config_section, kl_instances)
|
||||
kl_im.restart_all_instance()
|
||||
InstanceManager.restart_all(kl_instances)
|
||||
|
||||
except Exception as e:
|
||||
Logger.print_error(f"{display_name} installation failed!\n{e}")
|
||||
@@ -113,16 +112,12 @@ def update_client_config(client: BaseWebClient) -> None:
|
||||
|
||||
|
||||
def create_client_config_symlink(
|
||||
client_config: BaseWebClientConfig, klipper_instances: List[Klipper] | None = None
|
||||
client_config: BaseWebClientConfig, klipper_instances: List[Klipper]
|
||||
) -> None:
|
||||
if klipper_instances is None:
|
||||
kl_im = InstanceManager(Klipper)
|
||||
klipper_instances = kl_im.instances
|
||||
|
||||
Logger.print_status(f"Create symlink for {client_config.config_filename} ...")
|
||||
source = Path(client_config.config_dir, client_config.config_filename)
|
||||
for instance in klipper_instances:
|
||||
target = instance.cfg_dir
|
||||
Logger.print_status(f"Create symlink for {client_config.config_filename} ...")
|
||||
source = Path(client_config.config_dir, client_config.config_filename)
|
||||
target = instance.base.cfg_dir
|
||||
Logger.print_status(f"Linking {source} to {target}")
|
||||
try:
|
||||
create_symlink(source, target)
|
||||
|
||||
@@ -18,13 +18,13 @@ from components.webui_client.client_config.client_config_remove import (
|
||||
)
|
||||
from core.backup_manager.backup_manager import BackupManager
|
||||
from core.constants import NGINX_SITES_AVAILABLE, NGINX_SITES_ENABLED
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import Logger
|
||||
from utils.config_utils import remove_config_section
|
||||
from utils.fs_utils import (
|
||||
remove_with_sudo,
|
||||
run_remove_routines,
|
||||
)
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def run_client_removal(
|
||||
@@ -33,10 +33,8 @@ def run_client_removal(
|
||||
remove_client_cfg: bool,
|
||||
backup_config: bool,
|
||||
) -> None:
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances: List[Klipper] = kl_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
|
||||
if backup_config:
|
||||
bm = BackupManager()
|
||||
@@ -81,5 +79,7 @@ def remove_client_nginx_logs(client: BaseWebClient, instances: List[Klipper]) ->
|
||||
return
|
||||
|
||||
for instance in instances:
|
||||
run_remove_routines(instance.log_dir.joinpath(client.nginx_access_log.name))
|
||||
run_remove_routines(instance.log_dir.joinpath(client.nginx_error_log.name))
|
||||
run_remove_routines(
|
||||
instance.base.log_dir.joinpath(client.nginx_access_log.name)
|
||||
)
|
||||
run_remove_routines(instance.base.log_dir.joinpath(client.nginx_error_log.name))
|
||||
|
||||
@@ -45,6 +45,7 @@ from utils.common import check_install_dependencies
|
||||
from utils.config_utils import add_config_section
|
||||
from utils.fs_utils import unzip
|
||||
from utils.input_utils import get_confirm, get_number_input
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
cmd_sysctl_service,
|
||||
download_file,
|
||||
@@ -62,8 +63,7 @@ def install_client(client: BaseWebClient) -> None:
|
||||
)
|
||||
return
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
|
||||
enable_remotemode = False
|
||||
if not mr_instances:
|
||||
@@ -80,8 +80,7 @@ def install_client(client: BaseWebClient) -> None:
|
||||
):
|
||||
enable_remotemode = True
|
||||
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances = kl_im.instances
|
||||
kl_instances = get_instances(Klipper)
|
||||
install_client_cfg = False
|
||||
client_config: BaseWebClientConfig = client.client_config
|
||||
if (
|
||||
@@ -126,7 +125,7 @@ def install_client(client: BaseWebClient) -> None:
|
||||
("path", str(client.client_dir)),
|
||||
],
|
||||
)
|
||||
mr_im.restart_all_instance()
|
||||
InstanceManager.restart_all(mr_instances)
|
||||
if install_client_cfg and kl_instances:
|
||||
install_client_config(client)
|
||||
|
||||
|
||||
@@ -109,11 +109,11 @@ def symlink_webui_nginx_log(
|
||||
error_log = client.nginx_error_log
|
||||
|
||||
for instance in klipper_instances:
|
||||
desti_access = instance.log_dir.joinpath(access_log.name)
|
||||
desti_access = instance.base.log_dir.joinpath(access_log.name)
|
||||
if not desti_access.exists():
|
||||
desti_access.symlink_to(access_log)
|
||||
|
||||
desti_error = instance.log_dir.joinpath(error_log.name)
|
||||
desti_error = instance.base.log_dir.joinpath(error_log.name)
|
||||
if not desti_error.exists():
|
||||
desti_error.symlink_to(error_log)
|
||||
|
||||
|
||||
@@ -10,131 +10,48 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from core.constants import CURRENT_USER, SYSTEMD
|
||||
from core.logger import Logger
|
||||
from utils.fs_utils import get_data_dir
|
||||
|
||||
SUFFIX_BLACKLIST: List[str] = ["None", "mcu", "obico", "bambu", "companion"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseInstance(ABC):
|
||||
@dataclass(repr=True)
|
||||
class BaseInstance:
|
||||
instance_type: type
|
||||
suffix: str
|
||||
user: str = field(default=CURRENT_USER, init=False)
|
||||
service_file_path: Path | None = None
|
||||
is_legacy_instance: bool = False
|
||||
data_dir: Path | None = None
|
||||
data_dir_name: str = ""
|
||||
cfg_dir: Path | None = None
|
||||
sysd_dir: Path | None = None # NOT to be confused with /etc/systemd/system
|
||||
comms_dir: Path | None = None
|
||||
gcodes_dir: Path | None = None
|
||||
log_dir: Path | None = None
|
||||
log_file_name: str = ""
|
||||
log_file_name: str | None = None
|
||||
data_dir: Path = field(init=False)
|
||||
base_folders: List[Path] = field(init=False)
|
||||
cfg_dir: Path = field(init=False)
|
||||
log_dir: Path = field(init=False)
|
||||
gcodes_dir: Path = field(init=False)
|
||||
comms_dir: Path = field(init=False)
|
||||
sysd_dir: Path = field(init=False)
|
||||
is_legacy_instance: bool = field(init=False)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self._set_service_file_path()
|
||||
self._set_data_dir()
|
||||
|
||||
if self.data_dir is not None:
|
||||
self.data_dir_name = self.data_dir.name
|
||||
self._set_is_legacy_instance()
|
||||
self.cfg_dir = self.data_dir.joinpath("config")
|
||||
self.log_dir = self.data_dir.joinpath("logs")
|
||||
self.comms_dir = self.data_dir.joinpath("comms")
|
||||
self.sysd_dir = self.data_dir.joinpath("systemd")
|
||||
self.gcodes_dir = self.data_dir.joinpath("gcodes")
|
||||
|
||||
@classmethod
|
||||
def blacklist(cls) -> List[str]:
|
||||
return ["None", "mcu", "obico", "bambu", "companion"]
|
||||
|
||||
@abstractmethod
|
||||
def create(self) -> None:
|
||||
raise NotImplementedError("Subclasses must implement the create method")
|
||||
|
||||
def remove(self) -> None:
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.sys_utils import remove_system_service
|
||||
|
||||
try:
|
||||
# remove the service file
|
||||
if self.service_file_path is not None:
|
||||
remove_system_service(self.service_file_path.name)
|
||||
|
||||
# then remove all the log files
|
||||
if not self.log_file_name or not self.log_dir or not self.log_dir.exists():
|
||||
return
|
||||
|
||||
files = self.log_dir.iterdir()
|
||||
logs = [f for f in files if f.name.startswith(self.log_file_name)]
|
||||
for log in logs:
|
||||
Logger.print_status(f"Remove '{log}'")
|
||||
run_remove_routines(log)
|
||||
|
||||
except Exception as e:
|
||||
Logger.print_error(f"Error removing service: {e}")
|
||||
raise
|
||||
|
||||
def create_folders(self, add_dirs: List[Path] | None = None) -> None:
|
||||
dirs: List[Path | None] = [
|
||||
def __post_init__(self):
|
||||
self.data_dir = get_data_dir(self.instance_type, self.suffix)
|
||||
# the following attributes require the data_dir to be set
|
||||
self.cfg_dir = self.data_dir.joinpath("config")
|
||||
self.log_dir = self.data_dir.joinpath("logs")
|
||||
self.gcodes_dir = self.data_dir.joinpath("gcodes")
|
||||
self.comms_dir = self.data_dir.joinpath("comms")
|
||||
self.sysd_dir = self.data_dir.joinpath("systemd")
|
||||
self.is_legacy_instance = self._set_is_legacy_instance()
|
||||
self.base_folders = [
|
||||
self.data_dir,
|
||||
self.cfg_dir,
|
||||
self.log_dir,
|
||||
self.comms_dir,
|
||||
self.sysd_dir,
|
||||
self.gcodes_dir,
|
||||
self.comms_dir,
|
||||
]
|
||||
|
||||
if add_dirs:
|
||||
dirs.extend(add_dirs)
|
||||
|
||||
for _dir in dirs:
|
||||
if _dir is None:
|
||||
continue
|
||||
_dir.mkdir(exist_ok=True)
|
||||
|
||||
def remove_logfiles(self, log_name: str) -> None:
|
||||
from utils.fs_utils import run_remove_routines
|
||||
|
||||
if not self.log_dir or not self.log_dir.exists():
|
||||
return
|
||||
|
||||
files = self.log_dir.iterdir()
|
||||
logs = [f for f in files if f.name.startswith(log_name)]
|
||||
for log in logs:
|
||||
Logger.print_status(f"Remove '{log}'")
|
||||
run_remove_routines(log)
|
||||
|
||||
def _set_data_dir(self) -> None:
|
||||
if self.suffix == "":
|
||||
self.data_dir = Path.home().joinpath("printer_data")
|
||||
else:
|
||||
self.data_dir = Path.home().joinpath(f"printer_{self.suffix}_data")
|
||||
|
||||
if self.service_file_path and self.service_file_path.exists():
|
||||
with open(self.service_file_path, "r") as service_file:
|
||||
lines = service_file.readlines()
|
||||
for line in lines:
|
||||
pattern = r"^EnvironmentFile=(.+)(/systemd/.+\.env)"
|
||||
match = re.search(pattern, line)
|
||||
if match:
|
||||
self.data_dir = Path(match.group(1))
|
||||
break
|
||||
|
||||
def _set_service_file_path(self) -> None:
|
||||
from utils.common import convert_camelcase_to_kebabcase
|
||||
|
||||
name: str = convert_camelcase_to_kebabcase(self.__class__.__name__)
|
||||
if self.suffix != "":
|
||||
name += f"-{self.suffix}"
|
||||
|
||||
self.service_file_path = SYSTEMD.joinpath(f"{name}.service")
|
||||
|
||||
def _set_is_legacy_instance(self) -> None:
|
||||
def _set_is_legacy_instance(self) -> bool:
|
||||
legacy_pattern = r"^(?!printer)(.+)_data"
|
||||
match = re.search(legacy_pattern, self.data_dir_name)
|
||||
if match and self.suffix != "":
|
||||
self.is_legacy_instance = True
|
||||
match = re.search(legacy_pattern, self.data_dir.name)
|
||||
|
||||
return True if (match and self.suffix != "") else False
|
||||
|
||||
@@ -8,179 +8,101 @@
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import List, Type, TypeVar
|
||||
from subprocess import CalledProcessError
|
||||
from typing import List
|
||||
|
||||
from core.constants import SYSTEMD
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.instance_type import InstanceType
|
||||
from core.logger import Logger
|
||||
from utils.sys_utils import cmd_sysctl_service
|
||||
|
||||
T = TypeVar("T", bound=BaseInstance, covariant=True)
|
||||
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
class InstanceManager:
|
||||
def __init__(self, instance_type: Type[T]) -> None:
|
||||
self._instance_type = instance_type
|
||||
self._current_instance: Type[T] | None = None
|
||||
self._instance_suffix: str | None = None
|
||||
self._instance_service: str | None = None
|
||||
self._instance_service_full: str | None = None
|
||||
self._instance_service_path: str | None = None
|
||||
self._instances: List[T] = []
|
||||
|
||||
@property
|
||||
def instance_type(self) -> Type[T]:
|
||||
return self._instance_type
|
||||
|
||||
@instance_type.setter
|
||||
def instance_type(self, value: Type[T]):
|
||||
self._instance_type = value
|
||||
|
||||
@property
|
||||
def current_instance(self) -> Type[T] | None:
|
||||
return self._current_instance
|
||||
|
||||
@current_instance.setter
|
||||
def current_instance(self, value: Type[T] | None) -> None:
|
||||
self._current_instance = value
|
||||
if value is not None:
|
||||
self.instance_suffix = value.suffix
|
||||
self.instance_service = value.service_file_path.stem
|
||||
self.instance_service_path = value.service_file_path
|
||||
|
||||
@property
|
||||
def instance_suffix(self) -> str | None:
|
||||
return self._instance_suffix
|
||||
|
||||
@instance_suffix.setter
|
||||
def instance_suffix(self, value: str | None):
|
||||
self._instance_suffix = value
|
||||
|
||||
@property
|
||||
def instance_service(self) -> str | None:
|
||||
return self._instance_service
|
||||
|
||||
@instance_service.setter
|
||||
def instance_service(self, value: str | None) -> None:
|
||||
self._instance_service = value
|
||||
|
||||
@property
|
||||
def instance_service_full(self) -> str:
|
||||
return f"{self._instance_service}.service"
|
||||
|
||||
@property
|
||||
def instance_service_path(self) -> str | None:
|
||||
return self._instance_service_path
|
||||
|
||||
@instance_service_path.setter
|
||||
def instance_service_path(self, value: str | None) -> None:
|
||||
self._instance_service_path = value
|
||||
|
||||
@property
|
||||
def instances(self) -> List[Type[T]]:
|
||||
return self.find_instances()
|
||||
|
||||
@instances.setter
|
||||
def instances(self, value: List[T]) -> None:
|
||||
self._instances = value
|
||||
|
||||
def create_instance(self) -> None:
|
||||
if self.current_instance is not None:
|
||||
try:
|
||||
self.current_instance.create()
|
||||
except (OSError, subprocess.CalledProcessError) as e:
|
||||
Logger.print_error(f"Creating instance failed: {e}")
|
||||
raise
|
||||
else:
|
||||
raise ValueError("current_instance cannot be None")
|
||||
|
||||
def enable_instance(self) -> None:
|
||||
@staticmethod
|
||||
def enable(instance: InstanceType) -> None:
|
||||
service_name: str = instance.service_file_path.name
|
||||
try:
|
||||
cmd_sysctl_service(self.instance_service_full, "enable")
|
||||
except subprocess.CalledProcessError as e:
|
||||
Logger.print_error(f"Error enabling service {self.instance_service_full}:")
|
||||
cmd_sysctl_service(service_name, "enable")
|
||||
except CalledProcessError as e:
|
||||
Logger.print_error(f"Error enabling service {service_name}:")
|
||||
Logger.print_error(f"{e}")
|
||||
|
||||
def disable_instance(self) -> None:
|
||||
@staticmethod
|
||||
def disable(instance: InstanceType) -> None:
|
||||
service_name: str = instance.service_file_path.name
|
||||
try:
|
||||
cmd_sysctl_service(self.instance_service_full, "disable")
|
||||
except subprocess.CalledProcessError as e:
|
||||
Logger.print_error(f"Error disabling {self.instance_service_full}:")
|
||||
Logger.print_error(f"{e}")
|
||||
|
||||
def start_instance(self) -> None:
|
||||
try:
|
||||
cmd_sysctl_service(self.instance_service_full, "start")
|
||||
except subprocess.CalledProcessError as e:
|
||||
Logger.print_error(f"Error starting {self.instance_service_full}:")
|
||||
Logger.print_error(f"{e}")
|
||||
|
||||
def restart_instance(self) -> None:
|
||||
try:
|
||||
cmd_sysctl_service(self.instance_service_full, "restart")
|
||||
except subprocess.CalledProcessError as e:
|
||||
Logger.print_error(f"Error restarting {self.instance_service_full}:")
|
||||
Logger.print_error(f"{e}")
|
||||
|
||||
def start_all_instance(self) -> None:
|
||||
for instance in self.instances:
|
||||
self.current_instance = instance
|
||||
self.start_instance()
|
||||
|
||||
def restart_all_instance(self) -> None:
|
||||
for instance in self.instances:
|
||||
self.current_instance = instance
|
||||
self.restart_instance()
|
||||
|
||||
def stop_instance(self) -> None:
|
||||
try:
|
||||
cmd_sysctl_service(self.instance_service_full, "stop")
|
||||
except subprocess.CalledProcessError as e:
|
||||
Logger.print_error(f"Error stopping {self.instance_service_full}:")
|
||||
Logger.print_error(f"{e}")
|
||||
cmd_sysctl_service(service_name, "disable")
|
||||
except CalledProcessError as e:
|
||||
Logger.print_error(f"Error disabling {service_name}: {e}")
|
||||
raise
|
||||
|
||||
def stop_all_instance(self) -> None:
|
||||
for instance in self.instances:
|
||||
self.current_instance = instance
|
||||
self.stop_instance()
|
||||
@staticmethod
|
||||
def start(instance: InstanceType) -> None:
|
||||
service_name: str = instance.service_file_path.name
|
||||
try:
|
||||
cmd_sysctl_service(service_name, "start")
|
||||
except CalledProcessError as e:
|
||||
Logger.print_error(f"Error starting {service_name}: {e}")
|
||||
raise
|
||||
|
||||
def find_instances(self) -> List[Type[T]]:
|
||||
from utils.common import convert_camelcase_to_kebabcase
|
||||
@staticmethod
|
||||
def stop(instance: InstanceType) -> None:
|
||||
name: str = instance.service_file_path.name
|
||||
try:
|
||||
cmd_sysctl_service(name, "stop")
|
||||
except CalledProcessError as e:
|
||||
Logger.print_error(f"Error stopping {name}: {e}")
|
||||
raise
|
||||
|
||||
name = convert_camelcase_to_kebabcase(self.instance_type.__name__)
|
||||
pattern = re.compile(f"^{name}(-[0-9a-zA-Z]+)?.service$")
|
||||
excluded = self.instance_type.blacklist()
|
||||
@staticmethod
|
||||
def restart(instance: InstanceType) -> None:
|
||||
name: str = instance.service_file_path.name
|
||||
try:
|
||||
cmd_sysctl_service(name, "restart")
|
||||
except CalledProcessError as e:
|
||||
Logger.print_error(f"Error restarting {name}: {e}")
|
||||
raise
|
||||
|
||||
service_list = [
|
||||
Path(SYSTEMD, service)
|
||||
for service in SYSTEMD.iterdir()
|
||||
if pattern.search(service.name)
|
||||
and not any(s in service.name for s in excluded)
|
||||
]
|
||||
@staticmethod
|
||||
def start_all(instances: List[InstanceType]) -> None:
|
||||
for instance in instances:
|
||||
InstanceManager.start(instance)
|
||||
|
||||
instance_list = [
|
||||
self.instance_type(suffix=self._get_instance_suffix(name, service))
|
||||
for service in service_list
|
||||
]
|
||||
@staticmethod
|
||||
def stop_all(instances: List[InstanceType]) -> None:
|
||||
for instance in instances:
|
||||
InstanceManager.stop(instance)
|
||||
|
||||
return sorted(instance_list, key=lambda x: self._sort_instance_list(x.suffix))
|
||||
@staticmethod
|
||||
def restart_all(instances: List[InstanceType]) -> None:
|
||||
for instance in instances:
|
||||
InstanceManager.restart(instance)
|
||||
|
||||
def _get_instance_suffix(self, name: str, file_path: Path) -> str:
|
||||
# to get the suffix of the instance, we remove the name of the instance from
|
||||
# the file name, if the remaining part an empty string we return it
|
||||
# otherwise there is and hyphen left, and we return the part after the hyphen
|
||||
suffix = file_path.stem[len(name) :]
|
||||
return suffix[1:] if suffix else ""
|
||||
@staticmethod
|
||||
def remove(instance: InstanceType) -> None:
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.sys_utils import remove_system_service
|
||||
|
||||
def _sort_instance_list(self, suffix: int | str | None):
|
||||
if suffix is None:
|
||||
return
|
||||
elif isinstance(suffix, str) and suffix.isdigit():
|
||||
return f"{int(suffix):04}"
|
||||
else:
|
||||
return suffix
|
||||
try:
|
||||
# remove the service file
|
||||
service_file_path: Path = instance.service_file_path
|
||||
if service_file_path is not None:
|
||||
remove_system_service(service_file_path.name)
|
||||
|
||||
# then remove all the log files
|
||||
if (
|
||||
not instance.log_file_name
|
||||
or not instance.base.log_dir
|
||||
or not instance.base.log_dir.exists()
|
||||
):
|
||||
return
|
||||
|
||||
files = instance.base.log_dir.iterdir()
|
||||
logs = [f for f in files if f.name.startswith(instance.log_file_name)]
|
||||
for log in logs:
|
||||
Logger.print_status(f"Remove '{log}'")
|
||||
run_remove_routines(log)
|
||||
|
||||
except Exception as e:
|
||||
Logger.print_error(f"Error removing service: {e}")
|
||||
raise
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
from enum import Enum, unique
|
||||
|
||||
|
||||
@unique
|
||||
class NameScheme(Enum):
|
||||
SINGLE = "SINGLE"
|
||||
INDEX = "INDEX"
|
||||
CUSTOM = "CUSTOM"
|
||||
25
kiauh/core/instance_type.py
Normal file
25
kiauh/core/instance_type.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# ======================================================================= #
|
||||
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
|
||||
# #
|
||||
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||
# https://github.com/dw-0/kiauh #
|
||||
# #
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||
# ======================================================================= #
|
||||
|
||||
from typing import TypeVar
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from components.octoeverywhere.octoeverywhere import Octoeverywhere
|
||||
from extensions.obico.moonraker_obico import MoonrakerObico
|
||||
from extensions.telegram_bot.moonraker_telegram_bot import MoonrakerTelegramBot
|
||||
|
||||
InstanceType = TypeVar(
|
||||
"InstanceType",
|
||||
Klipper,
|
||||
Moonraker,
|
||||
MoonrakerTelegramBot,
|
||||
MoonrakerObico,
|
||||
Octoeverywhere,
|
||||
)
|
||||
@@ -25,6 +25,7 @@ from core.menus.base_menu import BaseMenu
|
||||
from core.settings.kiauh_settings import KiauhSettings
|
||||
from utils.git_utils import git_clone_wrapper
|
||||
from utils.input_utils import get_confirm, get_string_input
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
@@ -177,14 +178,14 @@ class SettingsMenu(BaseMenu):
|
||||
if target_dir.exists():
|
||||
shutil.rmtree(target_dir)
|
||||
|
||||
im = InstanceManager(_type)
|
||||
im.stop_all_instance()
|
||||
instances = get_instances(_type)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
repo = self.settings.get(name, "repo_url")
|
||||
branch = self.settings.get(name, "branch")
|
||||
git_clone_wrapper(repo, target_dir, branch)
|
||||
|
||||
im.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
def set_klipper_repo(self, **kwargs) -> None:
|
||||
self._set_repo("klipper")
|
||||
|
||||
@@ -28,6 +28,7 @@ from extensions.gcode_shell_cmd import (
|
||||
)
|
||||
from utils.fs_utils import check_file_exist
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
@@ -55,8 +56,8 @@ class GcodeShellCmdExtension(BaseExtension):
|
||||
Logger.print_warn("Installation aborted due to user request.")
|
||||
return
|
||||
|
||||
im = InstanceManager(Klipper)
|
||||
im.stop_all_instance()
|
||||
instances = get_instances(Klipper)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
try:
|
||||
Logger.print_status(f"Copy extension to '{KLIPPER_EXTRAS}' ...")
|
||||
@@ -66,9 +67,9 @@ class GcodeShellCmdExtension(BaseExtension):
|
||||
return
|
||||
|
||||
if install_example:
|
||||
self.install_example_cfg(im.instances)
|
||||
self.install_example_cfg(instances)
|
||||
|
||||
im.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
Logger.print_ok("Installing G-Code Shell Command extension successful!")
|
||||
|
||||
@@ -94,7 +95,7 @@ class GcodeShellCmdExtension(BaseExtension):
|
||||
Logger.print_warn("Make sure to remove them from the printer.cfg!")
|
||||
|
||||
def install_example_cfg(self, instances: List[Klipper]):
|
||||
cfg_dirs = [instance.cfg_dir for instance in instances]
|
||||
cfg_dirs = [instance.base.cfg_dir for instance in instances]
|
||||
# copy extension to klippy/extras
|
||||
for cfg_dir in cfg_dirs:
|
||||
Logger.print_status(f"Create shell_command.cfg in '{cfg_dir}' ...")
|
||||
|
||||
@@ -21,13 +21,14 @@ from components.klipper.klipper_dialogs import (
|
||||
)
|
||||
from core.constants import COLOR_CYAN, COLOR_YELLOW, RESET_FORMAT
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.instance_type import InstanceType
|
||||
from core.logger import Logger
|
||||
from core.menus import Option
|
||||
from core.menus.base_menu import BaseMenu
|
||||
from extensions.base_extension import BaseExtension
|
||||
from utils.git_utils import git_clone_wrapper
|
||||
from utils.input_utils import get_selection_input
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
class ThemeData(TypedDict):
|
||||
@@ -39,8 +40,7 @@ class ThemeData(TypedDict):
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
class MainsailThemeInstallerExtension(BaseExtension):
|
||||
im = InstanceManager(Klipper)
|
||||
instances: List[Klipper] = im.instances
|
||||
instances: List[Klipper] = get_instances(Klipper)
|
||||
|
||||
def install_extension(self, **kwargs) -> None:
|
||||
MainsailThemeInstallMenu(self.instances).run()
|
||||
@@ -155,7 +155,7 @@ class MainsailThemeInstallMenu(BaseMenu):
|
||||
|
||||
|
||||
def get_printer_selection(
|
||||
instances: List[BaseInstance], is_install: bool
|
||||
instances: List[InstanceType], is_install: bool
|
||||
) -> Union[List[BaseInstance], None]:
|
||||
options = [str(i) for i in range(len(instances))]
|
||||
options.extend(["a", "b"])
|
||||
|
||||
@@ -8,10 +8,12 @@
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError, run
|
||||
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from core.constants import CURRENT_USER
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.logger import Logger
|
||||
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
|
||||
@@ -27,23 +29,32 @@ from extensions.obico import (
|
||||
OBICO_LOG_NAME,
|
||||
OBICO_SERVICE_TEMPLATE,
|
||||
)
|
||||
from utils.fs_utils import create_folders
|
||||
from utils.sys_utils import get_service_file_path
|
||||
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
@dataclass
|
||||
class MoonrakerObico(BaseInstance):
|
||||
@dataclass(repr=True)
|
||||
class MoonrakerObico:
|
||||
suffix: str
|
||||
base: BaseInstance = field(init=False, repr=False)
|
||||
service_file_path: Path = field(init=False)
|
||||
log_file_name: str = OBICO_LOG_NAME
|
||||
dir: Path = OBICO_DIR
|
||||
env_dir: Path = OBICO_ENV_DIR
|
||||
log_file_name = OBICO_LOG_NAME
|
||||
cfg_file: Path | None = None
|
||||
data_dir: Path = field(init=False)
|
||||
cfg_file: Path = field(init=False)
|
||||
is_linked: bool = False
|
||||
|
||||
def __init__(self, suffix: str = ""):
|
||||
super().__init__(suffix=suffix)
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
self.cfg_file = self.cfg_dir.joinpath(OBICO_CFG_NAME)
|
||||
self.base: BaseInstance = BaseInstance(Moonraker, self.suffix)
|
||||
self.base.log_file_name = self.log_file_name
|
||||
|
||||
self.service_file_path: Path = get_service_file_path(
|
||||
MoonrakerObico, self.suffix
|
||||
)
|
||||
self.data_dir: Path = self.base.data_dir
|
||||
self.cfg_file = self.base.cfg_dir.joinpath(OBICO_CFG_NAME)
|
||||
self.is_linked: bool = self._check_link_status()
|
||||
|
||||
def create(self) -> None:
|
||||
@@ -52,13 +63,13 @@ class MoonrakerObico(BaseInstance):
|
||||
Logger.print_status("Creating new Obico for Klipper Instance ...")
|
||||
|
||||
try:
|
||||
self.create_folders()
|
||||
create_folders(self.base.base_folders)
|
||||
create_service_file(
|
||||
name=self.service_file_path.name,
|
||||
content=self._prep_service_file_content(),
|
||||
)
|
||||
create_env_file(
|
||||
path=self.sysd_dir.joinpath(OBICO_ENV_FILE_NAME),
|
||||
path=self.base.sysd_dir.joinpath(OBICO_ENV_FILE_NAME),
|
||||
content=self._prep_env_file_content(),
|
||||
)
|
||||
|
||||
@@ -71,7 +82,7 @@ class MoonrakerObico(BaseInstance):
|
||||
|
||||
def link(self) -> None:
|
||||
Logger.print_status(
|
||||
f"Linking instance for printer {self.data_dir_name} to the Obico server ..."
|
||||
f"Linking instance for printer {self.data_dir.name} to the Obico server ..."
|
||||
)
|
||||
try:
|
||||
cmd = [f"{OBICO_LINK_SCRIPT} -q -c {self.cfg_file}"]
|
||||
@@ -94,7 +105,7 @@ class MoonrakerObico(BaseInstance):
|
||||
|
||||
service_content = template_content.replace(
|
||||
"%USER%",
|
||||
self.user,
|
||||
CURRENT_USER,
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%OBICO_DIR%",
|
||||
@@ -106,7 +117,7 @@ class MoonrakerObico(BaseInstance):
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%ENV_FILE%",
|
||||
self.sysd_dir.joinpath(OBICO_ENV_FILE_NAME).as_posix(),
|
||||
self.base.sysd_dir.joinpath(OBICO_ENV_FILE_NAME).as_posix(),
|
||||
)
|
||||
return service_content
|
||||
|
||||
@@ -121,7 +132,7 @@ class MoonrakerObico(BaseInstance):
|
||||
raise
|
||||
env_file_content = env_template_file_content.replace(
|
||||
"%CFG%",
|
||||
f"{self.cfg_dir}/{self.cfg_file}",
|
||||
f"{self.base.cfg_dir}/{self.cfg_file}",
|
||||
)
|
||||
return env_file_content
|
||||
|
||||
|
||||
@@ -38,8 +38,10 @@ from utils.config_utils import (
|
||||
from utils.fs_utils import run_remove_routines
|
||||
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
|
||||
from utils.input_utils import get_confirm, get_selection_input, get_string_input
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
cmd_sysctl_manage,
|
||||
cmd_sysctl_service,
|
||||
create_python_venv,
|
||||
install_python_requirements,
|
||||
parse_packages_from_file,
|
||||
@@ -60,8 +62,7 @@ class ObicoExtension(BaseExtension):
|
||||
# if obico is already installed, ask if the user wants to repair an
|
||||
# incomplete installation or link to the obico server
|
||||
force_clone = False
|
||||
obico_im = InstanceManager(MoonrakerObico)
|
||||
obico_instances: List[MoonrakerObico] = obico_im.instances
|
||||
obico_instances: List[MoonrakerObico] = get_instances(MoonrakerObico)
|
||||
if obico_instances:
|
||||
self._print_is_already_installed()
|
||||
options = ["l", "r", "b"]
|
||||
@@ -80,10 +81,8 @@ class ObicoExtension(BaseExtension):
|
||||
force_clone = True
|
||||
|
||||
# let the user confirm installation
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances: List[Klipper] = kl_im.instances
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
self._print_moonraker_instances(mr_instances)
|
||||
if not get_confirm(
|
||||
"Continue Obico for Klipper installation?",
|
||||
@@ -101,14 +100,13 @@ class ObicoExtension(BaseExtension):
|
||||
|
||||
# create obico instances
|
||||
for moonraker in mr_instances:
|
||||
current_instance = MoonrakerObico(suffix=moonraker.suffix)
|
||||
instance = MoonrakerObico(suffix=moonraker.suffix)
|
||||
instance.create()
|
||||
|
||||
obico_im.current_instance = current_instance
|
||||
obico_im.create_instance()
|
||||
obico_im.enable_instance()
|
||||
cmd_sysctl_service(instance.service_file_path.name, "enable")
|
||||
|
||||
# create obico config
|
||||
self._create_obico_cfg(current_instance, moonraker)
|
||||
self._create_obico_cfg(instance, moonraker)
|
||||
|
||||
# create obico macros
|
||||
self._create_obico_macros_cfg(moonraker)
|
||||
@@ -116,17 +114,17 @@ class ObicoExtension(BaseExtension):
|
||||
# create obico update manager
|
||||
self._create_obico_update_manager_cfg(moonraker)
|
||||
|
||||
obico_im.start_instance()
|
||||
cmd_sysctl_service(instance.service_file_path.name, "start")
|
||||
|
||||
cmd_sysctl_manage("daemon-reload")
|
||||
|
||||
# add to klippers config
|
||||
self._patch_printer_cfg(kl_instances)
|
||||
kl_im.restart_all_instance()
|
||||
InstanceManager.restart_all(kl_instances)
|
||||
|
||||
# add to moonraker update manager
|
||||
self._patch_moonraker_conf(mr_instances)
|
||||
mr_im.restart_all_instance()
|
||||
InstanceManager.restart_all(mr_instances)
|
||||
|
||||
# check linking of / ask for linking instances
|
||||
self._check_and_opt_link_instances()
|
||||
@@ -143,13 +141,13 @@ class ObicoExtension(BaseExtension):
|
||||
def update_extension(self, **kwargs) -> None:
|
||||
Logger.print_status("Updating Obico for Klipper ...")
|
||||
try:
|
||||
tb_im = InstanceManager(MoonrakerObico)
|
||||
tb_im.stop_all_instance()
|
||||
instances = get_instances(MoonrakerObico)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
git_pull_wrapper(OBICO_REPO, OBICO_DIR)
|
||||
self._install_dependencies()
|
||||
|
||||
tb_im.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
Logger.print_ok("Obico for Klipper successfully updated!")
|
||||
|
||||
except Exception as e:
|
||||
@@ -157,12 +155,10 @@ class ObicoExtension(BaseExtension):
|
||||
|
||||
def remove_extension(self, **kwargs) -> None:
|
||||
Logger.print_status("Removing Obico for Klipper ...")
|
||||
kl_im = InstanceManager(Klipper)
|
||||
kl_instances: List[Klipper] = kl_im.instances
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
ob_im = InstanceManager(MoonrakerObico)
|
||||
ob_instances: List[MoonrakerObico] = ob_im.instances
|
||||
|
||||
kl_instances: List[Klipper] = get_instances(Klipper)
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
ob_instances: List[MoonrakerObico] = get_instances(MoonrakerObico)
|
||||
|
||||
try:
|
||||
self._remove_obico_instances(ob_instances)
|
||||
@@ -197,8 +193,8 @@ class ObicoExtension(BaseExtension):
|
||||
],
|
||||
)
|
||||
|
||||
def _print_moonraker_instances(self, mr_instances) -> None:
|
||||
mr_names = [f"● {moonraker.data_dir_name}" for moonraker in mr_instances]
|
||||
def _print_moonraker_instances(self, mr_instances: List[Moonraker]) -> None:
|
||||
mr_names = [f"● {moonraker.data_dir.name}" for moonraker in mr_instances]
|
||||
if len(mr_names) > 1:
|
||||
Logger.print_dialog(
|
||||
DialogType.INFO,
|
||||
@@ -243,24 +239,24 @@ class ObicoExtension(BaseExtension):
|
||||
if create_python_venv(OBICO_ENV_DIR):
|
||||
install_python_requirements(OBICO_ENV_DIR, OBICO_REQ_FILE)
|
||||
|
||||
def _create_obico_macros_cfg(self, moonraker) -> None:
|
||||
def _create_obico_macros_cfg(self, moonraker: Moonraker) -> None:
|
||||
macros_cfg = OBICO_DIR.joinpath(f"include_cfgs/{OBICO_MACROS_CFG_NAME}")
|
||||
macros_target = moonraker.cfg_dir.joinpath(OBICO_MACROS_CFG_NAME)
|
||||
macros_target = moonraker.base.cfg_dir.joinpath(OBICO_MACROS_CFG_NAME)
|
||||
if not macros_target.exists():
|
||||
shutil.copy(macros_cfg, macros_target)
|
||||
else:
|
||||
Logger.print_info(
|
||||
f"Obico's '{OBICO_MACROS_CFG_NAME}' in {moonraker.cfg_dir} already exists! Skipped ..."
|
||||
f"Obico's '{OBICO_MACROS_CFG_NAME}' in {moonraker.base.cfg_dir} already exists! Skipped ..."
|
||||
)
|
||||
|
||||
def _create_obico_update_manager_cfg(self, moonraker) -> None:
|
||||
def _create_obico_update_manager_cfg(self, moonraker: Moonraker) -> None:
|
||||
update_cfg = OBICO_DIR.joinpath(OBICO_UPDATE_CFG_SAMPLE_NAME)
|
||||
update_cfg_target = moonraker.cfg_dir.joinpath(OBICO_UPDATE_CFG_NAME)
|
||||
update_cfg_target = moonraker.base.cfg_dir.joinpath(OBICO_UPDATE_CFG_NAME)
|
||||
if not update_cfg_target.exists():
|
||||
shutil.copy(update_cfg, update_cfg_target)
|
||||
else:
|
||||
Logger.print_info(
|
||||
f"Obico's '{OBICO_UPDATE_CFG_NAME}' in {moonraker.cfg_dir} already exists! Skipped ..."
|
||||
f"Obico's '{OBICO_UPDATE_CFG_NAME}' in {moonraker.base.cfg_dir} already exists! Skipped ..."
|
||||
)
|
||||
|
||||
def _create_obico_cfg(
|
||||
@@ -280,7 +276,7 @@ class ObicoExtension(BaseExtension):
|
||||
self._patch_obico_cfg(moonraker, current_instance)
|
||||
else:
|
||||
Logger.print_info(
|
||||
f"Obico config in {current_instance.cfg_dir} already exists! Skipped ..."
|
||||
f"Obico config in {current_instance.base.cfg_dir} already exists! Skipped ..."
|
||||
)
|
||||
|
||||
def _patch_obico_cfg(self, moonraker: Moonraker, obico: MoonrakerObico) -> None:
|
||||
@@ -291,7 +287,7 @@ class ObicoExtension(BaseExtension):
|
||||
scp.set(
|
||||
"logging",
|
||||
"path",
|
||||
obico.log_dir.joinpath(obico.log_file_name).as_posix(),
|
||||
obico.base.log_dir.joinpath(obico.log_file_name).as_posix(),
|
||||
)
|
||||
scp.write(obico.cfg_file)
|
||||
|
||||
@@ -311,8 +307,8 @@ class ObicoExtension(BaseExtension):
|
||||
|
||||
def _check_and_opt_link_instances(self) -> None:
|
||||
Logger.print_status("Checking link status of Obico instances ...")
|
||||
ob_im = InstanceManager(MoonrakerObico)
|
||||
ob_instances: List[MoonrakerObico] = ob_im.instances
|
||||
|
||||
ob_instances: List[MoonrakerObico] = get_instances(MoonrakerObico)
|
||||
unlinked_instances: List[MoonrakerObico] = [
|
||||
obico for obico in ob_instances if not obico.is_linked
|
||||
]
|
||||
@@ -322,7 +318,7 @@ class ObicoExtension(BaseExtension):
|
||||
[
|
||||
"The Obico instances for the following printers are not "
|
||||
"linked to the server:",
|
||||
*[f"● {obico.data_dir_name}" for obico in unlinked_instances],
|
||||
*[f"● {obico.data_dir.name}" for obico in unlinked_instances],
|
||||
"\n\n",
|
||||
"It will take only 10 seconds to link the printer to the Obico server.",
|
||||
"For more information visit:",
|
||||
@@ -350,7 +346,7 @@ class ObicoExtension(BaseExtension):
|
||||
Logger.print_status(
|
||||
f"Removing instance {instance.service_file_path.stem} ..."
|
||||
)
|
||||
instance.remove()
|
||||
InstanceManager.remove(instance)
|
||||
|
||||
def _remove_obico_dir(self) -> None:
|
||||
Logger.print_status("Removing Obico for Klipper directory ...")
|
||||
|
||||
@@ -8,10 +8,12 @@
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from core.constants import CURRENT_USER
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.logger import Logger
|
||||
from extensions.telegram_bot import (
|
||||
@@ -23,22 +25,31 @@ from extensions.telegram_bot import (
|
||||
TG_BOT_LOG_NAME,
|
||||
TG_BOT_SERVICE_TEMPLATE,
|
||||
)
|
||||
from utils.fs_utils import create_folders
|
||||
from utils.sys_utils import get_service_file_path
|
||||
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
@dataclass
|
||||
class MoonrakerTelegramBot(BaseInstance):
|
||||
@dataclass(repr=True)
|
||||
class MoonrakerTelegramBot:
|
||||
suffix: str
|
||||
base: BaseInstance = field(init=False, repr=False)
|
||||
service_file_path: Path = field(init=False)
|
||||
log_file_name: str = TG_BOT_LOG_NAME
|
||||
bot_dir: Path = TG_BOT_DIR
|
||||
env_dir: Path = TG_BOT_ENV
|
||||
log_file_name = TG_BOT_LOG_NAME
|
||||
cfg_file: Path | None = None
|
||||
|
||||
def __init__(self, suffix: str = ""):
|
||||
super().__init__(suffix=suffix)
|
||||
data_dir: Path = field(init=False)
|
||||
cfg_file: Path = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
self.cfg_file = self.cfg_dir.joinpath(TG_BOT_CFG_NAME)
|
||||
self.base: BaseInstance = BaseInstance(Moonraker, self.suffix)
|
||||
self.base.log_file_name = self.log_file_name
|
||||
|
||||
self.service_file_path: Path = get_service_file_path(
|
||||
MoonrakerTelegramBot, self.suffix
|
||||
)
|
||||
self.data_dir: Path = self.base.data_dir
|
||||
self.cfg_file = self.base.cfg_dir.joinpath(TG_BOT_CFG_NAME)
|
||||
|
||||
def create(self) -> None:
|
||||
from utils.sys_utils import create_env_file, create_service_file
|
||||
@@ -46,13 +57,13 @@ class MoonrakerTelegramBot(BaseInstance):
|
||||
Logger.print_status("Creating new Moonraker Telegram Bot Instance ...")
|
||||
|
||||
try:
|
||||
self.create_folders()
|
||||
create_folders(self.base.base_folders)
|
||||
create_service_file(
|
||||
name=self.service_file_path.name,
|
||||
content=self._prep_service_file_content(),
|
||||
)
|
||||
create_env_file(
|
||||
path=self.sysd_dir.joinpath(TG_BOT_ENV_FILE_NAME),
|
||||
path=self.base.sysd_dir.joinpath(TG_BOT_ENV_FILE_NAME),
|
||||
content=self._prep_env_file_content(),
|
||||
)
|
||||
|
||||
@@ -75,7 +86,7 @@ class MoonrakerTelegramBot(BaseInstance):
|
||||
|
||||
service_content = template_content.replace(
|
||||
"%USER%",
|
||||
self.user,
|
||||
CURRENT_USER,
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%TELEGRAM_BOT_DIR%",
|
||||
@@ -87,7 +98,7 @@ class MoonrakerTelegramBot(BaseInstance):
|
||||
)
|
||||
service_content = service_content.replace(
|
||||
"%ENV_FILE%",
|
||||
self.sysd_dir.joinpath(TG_BOT_ENV_FILE_NAME).as_posix(),
|
||||
self.base.sysd_dir.joinpath(TG_BOT_ENV_FILE_NAME).as_posix(),
|
||||
)
|
||||
return service_content
|
||||
|
||||
@@ -107,10 +118,10 @@ class MoonrakerTelegramBot(BaseInstance):
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%CFG%",
|
||||
f"{self.cfg_dir}/printer.cfg",
|
||||
f"{self.base.cfg_dir}/printer.cfg",
|
||||
)
|
||||
env_file_content = env_file_content.replace(
|
||||
"%LOG%",
|
||||
self.log_dir.joinpath(self.log_file_name).as_posix(),
|
||||
self.base.log_dir.joinpath(self.log_file_name).as_posix(),
|
||||
)
|
||||
return env_file_content
|
||||
|
||||
@@ -25,8 +25,10 @@ from utils.config_utils import add_config_section, remove_config_section
|
||||
from utils.fs_utils import remove_file
|
||||
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
|
||||
from utils.input_utils import get_confirm
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
cmd_sysctl_manage,
|
||||
cmd_sysctl_service,
|
||||
create_python_venv,
|
||||
install_python_requirements,
|
||||
parse_packages_from_file,
|
||||
@@ -37,8 +39,8 @@ from utils.sys_utils import (
|
||||
class TelegramBotExtension(BaseExtension):
|
||||
def install_extension(self, **kwargs) -> None:
|
||||
Logger.print_status("Installing Moonraker Telegram Bot ...")
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
if not mr_instances:
|
||||
Logger.print_dialog(
|
||||
DialogType.WARNING,
|
||||
@@ -47,10 +49,14 @@ class TelegramBotExtension(BaseExtension):
|
||||
"Moonraker Telegram Bot requires Moonraker to be installed. "
|
||||
"Please install Moonraker first!",
|
||||
],
|
||||
padding_top=0,
|
||||
padding_bottom=0,
|
||||
)
|
||||
return
|
||||
|
||||
instance_names = [f"● {instance.data_dir_name}" for instance in mr_instances]
|
||||
instance_names = [
|
||||
f"● {instance.service_file_path.name}" for instance in mr_instances
|
||||
]
|
||||
Logger.print_dialog(
|
||||
DialogType.INFO,
|
||||
[
|
||||
@@ -59,6 +65,8 @@ class TelegramBotExtension(BaseExtension):
|
||||
"\n\n",
|
||||
"The setup will apply the same names to Telegram Bot!",
|
||||
],
|
||||
padding_top=0,
|
||||
padding_bottom=0,
|
||||
)
|
||||
if not get_confirm(
|
||||
"Continue Moonraker Telegram Bot installation?",
|
||||
@@ -75,30 +83,30 @@ class TelegramBotExtension(BaseExtension):
|
||||
|
||||
# create and start services / create bot configs
|
||||
show_config_dialog = False
|
||||
tb_im = InstanceManager(MoonrakerTelegramBot)
|
||||
tb_names = [mr_i.suffix for mr_i in mr_instances]
|
||||
for name in tb_names:
|
||||
current_instance = MoonrakerTelegramBot(suffix=name)
|
||||
instance = MoonrakerTelegramBot(suffix=name)
|
||||
instance.create()
|
||||
|
||||
tb_im.current_instance = current_instance
|
||||
tb_im.create_instance()
|
||||
tb_im.enable_instance()
|
||||
print(instance)
|
||||
|
||||
cmd_sysctl_service(instance.service_file_path.name, "enable")
|
||||
|
||||
if create_example_cfg:
|
||||
Logger.print_status(
|
||||
f"Creating Telegram Bot config in {current_instance.cfg_dir} ..."
|
||||
f"Creating Telegram Bot config in {instance.base.cfg_dir} ..."
|
||||
)
|
||||
template = TG_BOT_DIR.joinpath("scripts/base_install_template")
|
||||
target_file = current_instance.cfg_file
|
||||
target_file = instance.cfg_file
|
||||
if not target_file.exists():
|
||||
show_config_dialog = True
|
||||
run(["cp", template, target_file], check=True)
|
||||
else:
|
||||
Logger.print_info(
|
||||
f"Telegram Bot config in {current_instance.cfg_dir} already exists! Skipped ..."
|
||||
f"Telegram Bot config in {instance.base.cfg_dir} already exists! Skipped ..."
|
||||
)
|
||||
|
||||
tb_im.start_instance()
|
||||
cmd_sysctl_service(instance.service_file_path.name, "start")
|
||||
|
||||
cmd_sysctl_manage("daemon-reload")
|
||||
|
||||
@@ -106,7 +114,7 @@ class TelegramBotExtension(BaseExtension):
|
||||
self._patch_bot_update_manager(mr_instances)
|
||||
|
||||
# restart moonraker
|
||||
mr_im.restart_all_instance()
|
||||
InstanceManager.restart_all(mr_instances)
|
||||
|
||||
if show_config_dialog:
|
||||
Logger.print_dialog(
|
||||
@@ -128,20 +136,20 @@ class TelegramBotExtension(BaseExtension):
|
||||
|
||||
def update_extension(self, **kwargs) -> None:
|
||||
Logger.print_status("Updating Moonraker Telegram Bot ...")
|
||||
tb_im = InstanceManager(MoonrakerTelegramBot)
|
||||
tb_im.stop_all_instance()
|
||||
|
||||
instances = get_instances(MoonrakerTelegramBot)
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
git_pull_wrapper(TG_BOT_REPO, TG_BOT_DIR)
|
||||
self._install_dependencies()
|
||||
|
||||
tb_im.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
def remove_extension(self, **kwargs) -> None:
|
||||
Logger.print_status("Removing Moonraker Telegram Bot ...")
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
tb_im = InstanceManager(MoonrakerTelegramBot)
|
||||
tb_instances: List[MoonrakerTelegramBot] = tb_im.instances
|
||||
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
tb_instances: List[MoonrakerTelegramBot] = get_instances(MoonrakerTelegramBot)
|
||||
|
||||
try:
|
||||
self._remove_bot_instances(tb_instances)
|
||||
@@ -187,7 +195,7 @@ class TelegramBotExtension(BaseExtension):
|
||||
Logger.print_status(
|
||||
f"Removing instance {instance.service_file_path.stem} ..."
|
||||
)
|
||||
instance.remove()
|
||||
InstanceManager.remove(instance)
|
||||
|
||||
def _remove_bot_dir(self) -> None:
|
||||
if not TG_BOT_DIR.exists():
|
||||
@@ -212,7 +220,7 @@ class TelegramBotExtension(BaseExtension):
|
||||
def _delete_bot_logs(self, instances: List[MoonrakerTelegramBot]) -> None:
|
||||
all_logfiles = []
|
||||
for instance in instances:
|
||||
all_logfiles = list(instance.log_dir.glob("telegram_bot.log*"))
|
||||
all_logfiles = list(instance.base.log_dir.glob("telegram_bot.log*"))
|
||||
if not all_logfiles:
|
||||
Logger.print_info("No Moonraker Telegram Bot logs found. Skipped ...")
|
||||
return
|
||||
|
||||
@@ -11,7 +11,7 @@ from __future__ import annotations
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Literal, Optional, Set, Type
|
||||
from typing import Dict, List, Literal, Optional, Set
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from core.constants import (
|
||||
@@ -20,11 +20,10 @@ from core.constants import (
|
||||
PRINTER_CFG_BACKUP_DIR,
|
||||
RESET_FORMAT,
|
||||
)
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.logger import DialogType, Logger
|
||||
from core.types import ComponentStatus, StatusCode
|
||||
from utils.git_utils import get_local_commit, get_remote_commit, get_repo_name
|
||||
from utils.instance_utils import get_instances
|
||||
from utils.sys_utils import (
|
||||
check_package_install,
|
||||
install_system_packages,
|
||||
@@ -77,7 +76,7 @@ def check_install_dependencies(
|
||||
def get_install_status(
|
||||
repo_dir: Path,
|
||||
env_dir: Optional[Path] = None,
|
||||
instance_type: Optional[Type[BaseInstance]] = None,
|
||||
instance_type: type | None = None,
|
||||
files: Optional[List[Path]] = None,
|
||||
) -> ComponentStatus:
|
||||
"""
|
||||
@@ -88,15 +87,16 @@ def get_install_status(
|
||||
:param files: List of optional files to check for existence
|
||||
:return: Dictionary with status string, statuscode and instance count
|
||||
"""
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
checks = [repo_dir.exists()]
|
||||
|
||||
if env_dir is not None:
|
||||
checks.append(env_dir.exists())
|
||||
|
||||
im = InstanceManager(instance_type)
|
||||
instances = 0
|
||||
if instance_type is not None:
|
||||
instances = len(im.instances)
|
||||
instances = len(get_instances(instance_type))
|
||||
checks.append(instances > 0)
|
||||
|
||||
if files is not None:
|
||||
@@ -124,15 +124,14 @@ def backup_printer_config_dir() -> None:
|
||||
# local import to prevent circular import
|
||||
from core.backup_manager.backup_manager import BackupManager
|
||||
|
||||
im = InstanceManager(Klipper)
|
||||
instances: List[Klipper] = im.instances
|
||||
instances: List[Klipper] = get_instances(Klipper)
|
||||
bm = BackupManager()
|
||||
|
||||
for instance in instances:
|
||||
name = f"config-{instance.data_dir_name}"
|
||||
name = f"config-{instance.data_dir.name}"
|
||||
bm.backup_directory(
|
||||
name,
|
||||
source=instance.cfg_dir,
|
||||
source=instance.base.cfg_dir,
|
||||
target=PRINTER_CFG_BACKUP_DIR,
|
||||
)
|
||||
|
||||
@@ -145,8 +144,7 @@ def moonraker_exists(name: str = "") -> bool:
|
||||
"""
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
|
||||
mr_im = InstanceManager(Moonraker)
|
||||
mr_instances: List[Moonraker] = mr_im.instances
|
||||
mr_instances: List[Moonraker] = get_instances(Moonraker)
|
||||
|
||||
info = (
|
||||
f"{name} requires Moonraker to be installed"
|
||||
|
||||
@@ -6,25 +6,25 @@
|
||||
# #
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple, TypeVar
|
||||
from typing import List, Tuple
|
||||
|
||||
from components.klipper.klipper import Klipper
|
||||
from components.moonraker.moonraker import Moonraker
|
||||
from core.instance_type import InstanceType
|
||||
from core.logger import Logger
|
||||
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
|
||||
SimpleConfigParser,
|
||||
)
|
||||
|
||||
B = TypeVar("B", Klipper, Moonraker)
|
||||
ConfigOption = Tuple[str, str]
|
||||
|
||||
|
||||
def add_config_section(
|
||||
section: str,
|
||||
instances: List[B],
|
||||
options: Optional[List[ConfigOption]] = None,
|
||||
instances: List[InstanceType],
|
||||
options: List[ConfigOption] | None = None,
|
||||
) -> None:
|
||||
for instance in instances:
|
||||
cfg_file = instance.cfg_file
|
||||
@@ -49,7 +49,7 @@ def add_config_section(
|
||||
scp.write(cfg_file)
|
||||
|
||||
|
||||
def add_config_section_at_top(section: str, instances: List[B]) -> None:
|
||||
def add_config_section_at_top(section: str, instances: List[InstanceType]) -> None:
|
||||
# TODO: this could be implemented natively in SimpleConfigParser
|
||||
for instance in instances:
|
||||
tmp_cfg = tempfile.NamedTemporaryFile(mode="w", delete=False)
|
||||
@@ -70,7 +70,7 @@ def add_config_section_at_top(section: str, instances: List[B]) -> None:
|
||||
tmp_cfg_path.rename(cfg_file)
|
||||
|
||||
|
||||
def remove_config_section(section: str, instances: List[B]) -> None:
|
||||
def remove_config_section(section: str, instances: List[InstanceType]) -> None:
|
||||
for instance in instances:
|
||||
cfg_file = instance.cfg_file
|
||||
Logger.print_status(f"Remove section '[{section}]' from '{cfg_file}' ...")
|
||||
|
||||
@@ -8,10 +8,13 @@
|
||||
# #
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from subprocess import DEVNULL, PIPE, CalledProcessError, check_output, run
|
||||
from typing import List
|
||||
from zipfile import ZipFile
|
||||
|
||||
from core.decorators import deprecated
|
||||
@@ -103,3 +106,37 @@ def unzip(filepath: Path, target_dir: Path) -> None:
|
||||
"""
|
||||
with ZipFile(filepath, "r") as _zip:
|
||||
_zip.extractall(target_dir)
|
||||
|
||||
|
||||
def create_folders(dirs: List[Path]) -> None:
|
||||
try:
|
||||
for _dir in dirs:
|
||||
if _dir.exists():
|
||||
continue
|
||||
_dir.mkdir(exist_ok=True)
|
||||
Logger.print_ok(f"Created directory '{_dir}'!")
|
||||
except OSError as e:
|
||||
Logger.print_error(f"Error creating directories: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def get_data_dir(instance_type: type, suffix: str) -> Path:
|
||||
from utils.sys_utils import get_service_file_path
|
||||
|
||||
# if the service file exists, we read the data dir path from it
|
||||
# this also ensures compatibility with pre v6.0.0 instances
|
||||
service_file_path: Path = get_service_file_path(instance_type, suffix)
|
||||
if service_file_path and service_file_path.exists():
|
||||
with open(service_file_path, "r") as service_file:
|
||||
lines = service_file.readlines()
|
||||
for line in lines:
|
||||
pattern = r"^EnvironmentFile=(.+)(/systemd/.+\.env)"
|
||||
match = re.search(pattern, line)
|
||||
if match:
|
||||
return Path(match.group(1))
|
||||
|
||||
if suffix != "":
|
||||
# this is the new data dir naming scheme introduced in v6.0.0
|
||||
return Path.home().joinpath(f"printer_{suffix}_data")
|
||||
|
||||
return Path.home().joinpath("printer_data")
|
||||
|
||||
@@ -9,10 +9,11 @@ from pathlib import Path
|
||||
from subprocess import DEVNULL, PIPE, CalledProcessError, check_output, run
|
||||
from typing import List, Type
|
||||
|
||||
from core.instance_manager.base_instance import BaseInstance
|
||||
from core.instance_manager.instance_manager import InstanceManager
|
||||
from core.instance_type import InstanceType
|
||||
from core.logger import Logger
|
||||
from utils.input_utils import get_confirm, get_number_input
|
||||
from utils.instance_utils import get_instances
|
||||
|
||||
|
||||
def git_clone_wrapper(
|
||||
@@ -196,15 +197,15 @@ def git_cmd_pull(target_dir: Path) -> None:
|
||||
raise
|
||||
|
||||
|
||||
def rollback_repository(repo_dir: Path, instance: Type[BaseInstance]) -> None:
|
||||
def rollback_repository(repo_dir: Path, instance: Type[InstanceType]) -> None:
|
||||
q1 = "How many commits do you want to roll back"
|
||||
amount = get_number_input(q1, 1, allow_go_back=True)
|
||||
|
||||
im = InstanceManager(instance)
|
||||
instances = get_instances(instance)
|
||||
|
||||
Logger.print_warn("Do not continue if you have ongoing prints!", start="\n")
|
||||
Logger.print_warn(
|
||||
f"All currently running {im.instance_type.__name__} services will be stopped!"
|
||||
f"All currently running {instance.__name__} services will be stopped!"
|
||||
)
|
||||
if not get_confirm(
|
||||
f"Roll back {amount} commit{'s' if amount > 1 else ''}",
|
||||
@@ -214,7 +215,7 @@ def rollback_repository(repo_dir: Path, instance: Type[BaseInstance]) -> None:
|
||||
Logger.print_info("Aborting roll back ...")
|
||||
return
|
||||
|
||||
im.stop_all_instance()
|
||||
InstanceManager.stop_all(instances)
|
||||
|
||||
try:
|
||||
cmd = ["git", "reset", "--hard", f"HEAD~{amount}"]
|
||||
@@ -223,4 +224,4 @@ def rollback_repository(repo_dir: Path, instance: Type[BaseInstance]) -> None:
|
||||
except CalledProcessError as e:
|
||||
Logger.print_error(f"An error occured during repo rollback:\n{e}")
|
||||
|
||||
im.start_all_instance()
|
||||
InstanceManager.start_all(instances)
|
||||
|
||||
56
kiauh/utils/instance_utils.py
Normal file
56
kiauh/utils/instance_utils.py
Normal file
@@ -0,0 +1,56 @@
|
||||
# ======================================================================= #
|
||||
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
|
||||
# #
|
||||
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||
# https://github.com/dw-0/kiauh #
|
||||
# #
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||
# ======================================================================= #
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from core.constants import SYSTEMD
|
||||
from core.instance_manager.base_instance import SUFFIX_BLACKLIST
|
||||
from core.instance_type import InstanceType
|
||||
|
||||
|
||||
def get_instances(instance_type: type) -> List[InstanceType]:
|
||||
from utils.common import convert_camelcase_to_kebabcase
|
||||
|
||||
if not isinstance(instance_type, type):
|
||||
raise ValueError("instance_type must be a class")
|
||||
|
||||
name = convert_camelcase_to_kebabcase(instance_type.__name__)
|
||||
pattern = re.compile(f"^{name}(-[0-9a-zA-Z]+)?.service$")
|
||||
|
||||
service_list = [
|
||||
Path(SYSTEMD, service)
|
||||
for service in SYSTEMD.iterdir()
|
||||
if pattern.search(service.name)
|
||||
and not any(s in service.name for s in SUFFIX_BLACKLIST)
|
||||
]
|
||||
|
||||
instance_list = [
|
||||
instance_type(get_instance_suffix(name, service)) for service in service_list
|
||||
]
|
||||
|
||||
def _sort_instance_list(suffix: int | str | None):
|
||||
if suffix is None:
|
||||
return
|
||||
elif isinstance(suffix, str) and suffix.isdigit():
|
||||
return f"{int(suffix):04}"
|
||||
else:
|
||||
return suffix
|
||||
|
||||
return sorted(instance_list, key=lambda x: _sort_instance_list(x.suffix))
|
||||
|
||||
|
||||
def get_instance_suffix(name: str, file_path: Path) -> str:
|
||||
# to get the suffix of the instance, we remove the name of the instance from
|
||||
# the file name, if the remaining part an empty string we return it
|
||||
# otherwise there is and hyphen left, and we return the part after the hyphen
|
||||
suffix = file_path.stem[len(name) :]
|
||||
return suffix[1:] if suffix else ""
|
||||
@@ -511,3 +511,18 @@ def remove_system_service(service_name: str) -> None:
|
||||
except Exception as e:
|
||||
Logger.print_error(f"Error removing {service_name}: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def get_service_file_path(instance_type: type, suffix: str) -> Path:
|
||||
from utils.common import convert_camelcase_to_kebabcase
|
||||
|
||||
if not isinstance(instance_type, type):
|
||||
raise ValueError("instance_type must be a class")
|
||||
|
||||
name: str = convert_camelcase_to_kebabcase(instance_type.__name__)
|
||||
if suffix != "":
|
||||
name += f"-{suffix}"
|
||||
|
||||
file_path: Path = SYSTEMD.joinpath(f"{name}.service")
|
||||
|
||||
return file_path
|
||||
|
||||
Reference in New Issue
Block a user