Compare commits

..

18 Commits

Author SHA1 Message Date
dw-0
b640aa37ab Merge f00d41b1bf into a929c6983d 2024-07-27 23:21:06 +02:00
dw-0
f00d41b1bf fix: fix logic bug in handle_instance_names
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-27 23:18:32 +02:00
dw-0
f578247b74 fix: fix logic bug in conditional
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-27 23:02:08 +02:00
dw-0
a7c67721b6 refactor: make Moonraker to dataclass
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-27 22:30:53 +02:00
dw-0
32742943a0 refactor: start at index 1 in moonraker setup dialog if multi instance
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-27 22:23:06 +02:00
dw-0
871bedb76b refactor: overhaul of the klipper setup process
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-27 21:13:56 +02:00
dw-0
fee2dd0bda refactor: use | instead of Union
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-14 14:44:08 +02:00
dw-0
e5bcab5d85 fix: return if instance_list is empty
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-13 13:34:55 +02:00
dw-0
31ea6c2e5a refactor: add moonraker speedup dependencies
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:29:49 +02:00
dw-0
1384f7328a refactor: use global deps list to check for generally required dependencies
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
6bf55b5f69 refactor: use virtualenv instead of venv
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
398705b176 fix: prevent exception when trying to remove log files from non-existing directory
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
ed2e318d0e refactor: add __repr__ to Klipper class
This commit adds a __repr__ method to the Klipper class. This method returns a JSON string representation of the instance, which can be used for debugging purposes.

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
75ac8a22d5 refactor: add regex pattern to assign custom names
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
005e2d3339 refactor: improve robustness of instance sorting
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
bdb2c85e9b fix: fix usage of wrong status code
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
7e251eb31e refactor: more extraction into constant
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-07 22:16:37 +02:00
dw-0
64ea337e7e refactor: create service removal helper function
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-07-02 22:07:52 +02:00
32 changed files with 478 additions and 588 deletions

View File

@@ -10,7 +10,21 @@
from pathlib import Path
from core.backup_manager import BACKUP_ROOT_DIR
from utils.constants import SYSTEMD
CROWSNEST_DIR = Path.home().joinpath("crowsnest")
# repo
CROWSNEST_REPO = "https://github.com/mainsail-crew/crowsnest.git"
# names
CROWSNEST_SERVICE_NAME = "crowsnest.service"
# directories
CROWSNEST_DIR = Path.home().joinpath("crowsnest")
CROWSNEST_BACKUP_DIR = BACKUP_ROOT_DIR.joinpath("crowsnest-backups")
# files
CROWSNEST_MULTI_CONFIG = CROWSNEST_DIR.joinpath("tools/.config")
CROWSNEST_INSTALL_SCRIPT = CROWSNEST_DIR.joinpath("tools/install.sh")
CROWSNEST_BIN_FILE = Path("/usr/local/bin/crowsnest")
CROWSNEST_LOGROTATE_FILE = Path("/etc/logrotate.d/crowsnest")
CROWSNEST_SERVICE_FILE = SYSTEMD.joinpath(CROWSNEST_SERVICE_NAME)

View File

@@ -14,7 +14,17 @@ from pathlib import Path
from subprocess import CalledProcessError, run
from typing import List
from components.crowsnest import CROWSNEST_BACKUP_DIR, CROWSNEST_DIR, CROWSNEST_REPO
from components.crowsnest import (
CROWSNEST_BACKUP_DIR,
CROWSNEST_BIN_FILE,
CROWSNEST_DIR,
CROWSNEST_INSTALL_SCRIPT,
CROWSNEST_LOGROTATE_FILE,
CROWSNEST_MULTI_CONFIG,
CROWSNEST_REPO,
CROWSNEST_SERVICE_FILE,
CROWSNEST_SERVICE_NAME,
)
from components.klipper.klipper import Klipper
from core.backup_manager.backup_manager import BackupManager
from core.instance_manager.instance_manager import InstanceManager
@@ -75,7 +85,6 @@ def install_crowsnest() -> None:
def print_multi_instance_warning(instances: List[Klipper]) -> None:
_instances = [f"{instance.data_dir_name}" for instance in instances]
Logger.print_dialog(
DialogType.WARNING,
[
@@ -86,13 +95,12 @@ def print_multi_instance_warning(instances: List[Klipper]) -> None:
"this instance to set up your 'crowsnest.conf' and steering it's service.",
"\n\n",
"The following instances were found:",
*_instances,
*[f"{instance.data_dir_name}" for instance in instances],
],
)
def configure_multi_instance() -> None:
config = Path(CROWSNEST_DIR).joinpath("tools/.config")
try:
run(
"make config",
@@ -102,17 +110,17 @@ def configure_multi_instance() -> None:
)
except CalledProcessError as e:
Logger.print_error(f"Something went wrong! Please try again...\n{e}")
if config.exists():
Path.unlink(config)
if CROWSNEST_MULTI_CONFIG.exists():
Path.unlink(CROWSNEST_MULTI_CONFIG)
return
if not config.exists():
if not CROWSNEST_MULTI_CONFIG.exists():
Logger.print_error("Generating .config failed, installation aborted")
def update_crowsnest() -> None:
try:
cmd_sysctl_service("crowsnest", "stop")
cmd_sysctl_service(CROWSNEST_SERVICE_NAME, "stop")
if not CROWSNEST_DIR.exists():
git_clone_wrapper(CROWSNEST_REPO, CROWSNEST_DIR, "master")
@@ -123,18 +131,17 @@ def update_crowsnest() -> None:
if settings.kiauh.backup_before_update:
bm = BackupManager()
bm.backup_directory(
"crowsnest",
CROWSNEST_DIR.name,
source=CROWSNEST_DIR,
target=CROWSNEST_BACKUP_DIR,
)
git_pull_wrapper(CROWSNEST_REPO, CROWSNEST_DIR)
script = CROWSNEST_DIR.joinpath("tools/install.sh")
deps = parse_packages_from_file(script)
deps = parse_packages_from_file(CROWSNEST_INSTALL_SCRIPT)
check_install_dependencies(deps)
cmd_sysctl_service("crowsnest", "restart")
cmd_sysctl_service(CROWSNEST_SERVICE_NAME, "restart")
Logger.print_ok("Crowsnest updated successfully.", end="\n\n")
except CalledProcessError as e:
@@ -144,9 +151,9 @@ def update_crowsnest() -> None:
def get_crowsnest_status() -> ComponentStatus:
files = [
Path("/usr/local/bin/crowsnest"),
Path("/etc/logrotate.d/crowsnest"),
Path("/etc/systemd/system/crowsnest.service"),
CROWSNEST_BIN_FILE,
CROWSNEST_LOGROTATE_FILE,
CROWSNEST_SERVICE_FILE,
]
return get_install_status(CROWSNEST_DIR, files=files)

View File

@@ -6,10 +6,9 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from dataclasses import dataclass
from pathlib import Path
from subprocess import CalledProcessError, run
from typing import List
from components.klipper import (
KLIPPER_CFG_NAME,
@@ -27,35 +26,24 @@ from utils.logger import Logger
# noinspection PyMethodMayBeStatic
@dataclass
class Klipper(BaseInstance):
@classmethod
def blacklist(cls) -> List[str]:
return ["None", "mcu"]
klipper_dir: Path = KLIPPER_DIR
env_dir: Path = KLIPPER_ENV_DIR
cfg_file: Path = None
log: Path = None
serial: Path = None
uds: Path = None
def __init__(self, suffix: str = ""):
def __init__(self, suffix: str = "") -> None:
super().__init__(instance_type=self, suffix=suffix)
self.klipper_dir: Path = KLIPPER_DIR
self.env_dir: Path = KLIPPER_ENV_DIR
self._cfg_file = self.cfg_dir.joinpath(KLIPPER_CFG_NAME)
self._log = self.log_dir.joinpath(KLIPPER_LOG_NAME)
self._serial = self.comms_dir.joinpath(KLIPPER_SERIAL_NAME)
self._uds = self.comms_dir.joinpath(KLIPPER_UDS_NAME)
@property
def cfg_file(self) -> Path:
return self._cfg_file
@property
def log(self) -> Path:
return self._log
@property
def serial(self) -> Path:
return self._serial
@property
def uds(self) -> Path:
return self._uds
def __post_init__(self) -> None:
super().__post_init__()
self.cfg_file = self.cfg_dir.joinpath(KLIPPER_CFG_NAME)
self.log = self.log_dir.joinpath(KLIPPER_LOG_NAME)
self.serial = self.comms_dir.joinpath(KLIPPER_SERIAL_NAME)
self.uds = self.comms_dir.joinpath(KLIPPER_UDS_NAME)
def create(self) -> None:
from utils.sys_utils import create_env_file, create_service_file

View File

@@ -90,9 +90,19 @@ def print_select_custom_name_dialog():
dialog = textwrap.dedent(
f"""
╔═══════════════════════════════════════════════════════╗
You can now assign a custom name to each instance.
Do you want to assign a custom name to each instance?
║ ║
║ Assigning a custom name will create a Klipper service ║
║ and a printer directory with the chosen name. ║
║ ║
║ Example for custom name 'kiauh': ║
║ ● Klipper service: klipper-kiauh.service ║
║ ● Printer directory: printer_kiauh_data ║
║ ║
║ If skipped, each instance will get an index assigned ║
║ in ascending order, starting at index '1'.
║ in ascending order, starting at '1' in case of a new
║ installation. Otherwise, the index will be derived ║
║ from amount of already existing instances. ║
║ ║
{line1:<63}
{line2:<63}

View File

@@ -52,7 +52,7 @@ def select_instances_to_remove(
) -> Union[List[Klipper], None]:
start_index = 1
options = [str(i + start_index) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
options.extend(["a", "b"])
instance_map = {options[i]: instances[i] for i in range(len(instances))}
print_instance_overview(
@@ -64,9 +64,9 @@ def select_instances_to_remove(
selection = get_selection_input("Select Klipper instance to remove", options)
instances_to_remove = []
if selection == "b".lower():
if selection == "b":
return None
elif selection == "a".lower():
elif selection == "a":
instances_to_remove.extend(instances)
else:
instances_to_remove.append(instance_map[selection])
@@ -78,6 +78,9 @@ def remove_instances(
instance_manager: InstanceManager,
instance_list: List[Klipper],
) -> None:
if not instance_list:
return
for instance in instance_list:
Logger.print_status(f"Removing instance {instance.get_service_file_name()} ...")
instance_manager.current_instance = instance

View File

@@ -6,8 +6,10 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Tuple
from components.klipper import (
EXIT_KLIPPER_SETUP,
@@ -17,18 +19,16 @@ from components.klipper import (
KLIPPER_REQ_FILE,
)
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_select_custom_name_dialog,
)
from components.klipper.klipper_utils import (
add_to_existing,
assign_custom_name,
backup_klipper_dir,
check_is_single_to_multi_conversion,
check_user_groups,
create_example_printer_cfg,
get_install_count,
handle_disruptive_system_packages,
handle_instance_naming,
handle_to_multi_instance_conversion,
init_name_scheme,
update_name_scheme,
)
from components.moonraker.moonraker import Moonraker
from components.webui_client.client_utils import (
@@ -49,57 +49,65 @@ from utils.sys_utils import (
def install_klipper() -> None:
kl_im = InstanceManager(Klipper)
Logger.print_status("Installing Klipper ...")
# ask to add new instances, if there are existing ones
if kl_im.instances and not add_to_existing():
Logger.print_status(EXIT_KLIPPER_SETUP)
return
klipper_list: List[Klipper] = InstanceManager(Klipper).instances
moonraker_list: List[Moonraker] = InstanceManager(Moonraker).instances
match_moonraker: bool = False
install_count = get_install_count()
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
# if there are more moonraker instances than klipper instances, ask the user to
# match the klipper instance count to the count of moonraker instances with the same suffix
if len(moonraker_list) > len(klipper_list):
is_confirmed = display_moonraker_info(moonraker_list)
if not is_confirmed:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
match_moonraker = True
# create a dict of the size of the existing instances + install count
name_dict = {c: "" for c in range(len(kl_im.instances) + install_count)}
name_scheme = init_name_scheme(kl_im.instances, install_count)
mr_im = InstanceManager(Moonraker)
name_scheme = update_name_scheme(
name_scheme, name_dict, kl_im.instances, mr_im.instances
install_count, name_dict = get_install_count_and_name_dict(
klipper_list, moonraker_list
)
handle_instance_naming(name_dict, name_scheme)
if install_count == 0:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
is_multi_install = install_count > 1 or (len(name_dict) >= 1 and install_count >= 1)
if not name_dict and install_count == 1:
name_dict = {0: ""}
elif is_multi_install and not match_moonraker:
custom_names = use_custom_names_or_go_back()
if custom_names is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
handle_instance_names(install_count, name_dict, custom_names)
create_example_cfg = get_confirm("Create example printer.cfg?")
# run the actual installation
try:
if not kl_im.instances:
check_install_dependencies(["git"])
setup_klipper_prerequesites()
count = 0
for name in name_dict:
if name_dict[name] in [n.suffix for n in kl_im.instances]:
continue
if check_is_single_to_multi_conversion(kl_im.instances):
handle_to_multi_instance_conversion(name_dict[name])
continue
count += 1
create_klipper_instance(name_dict[name], create_example_cfg)
if count == install_count:
break
cmd_sysctl_manage("daemon-reload")
run_klipper_setup(klipper_list, name_dict, create_example_cfg)
except Exception as e:
Logger.print_error(e)
Logger.print_error("Klipper installation failed!")
return
def run_klipper_setup(
klipper_list: List[Klipper], name_dict: Dict[int, str], example_cfg: bool
) -> None:
if not klipper_list:
setup_klipper_prerequesites()
for i in name_dict:
# skip this iteration if there is already an instance with the name
if name_dict[i] in [n.suffix for n in klipper_list]:
continue
create_klipper_instance(name_dict[i], example_cfg)
cmd_sysctl_manage("daemon-reload")
# step 4: check/handle conflicting packages/services
handle_disruptive_system_packages()
@@ -107,6 +115,35 @@ def install_klipper() -> None:
check_user_groups()
def handle_instance_names(
install_count: int, name_dict: Dict[int, str], custom_names: bool
) -> None:
for i in range(install_count): # 3
key = max(name_dict.keys()) + 1
if custom_names:
assign_custom_name(key, name_dict)
else:
name_dict[key] = str(len(name_dict) + 1)
def get_install_count_and_name_dict(
klipper_list: List[Klipper], moonraker_list: List[Moonraker]
) -> Tuple[int, Dict[int, str]]:
if len(moonraker_list) > len(klipper_list):
install_count = len(moonraker_list)
name_dict = {i: moonraker.suffix for i, moonraker in enumerate(moonraker_list)}
else:
install_count = get_install_count()
name_dict = {i: klipper.suffix for i, klipper in enumerate(klipper_list)}
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return 0, {}
return install_count, name_dict
def setup_klipper_prerequesites() -> None:
settings = KiauhSettings()
repo = settings.klipper.repo_url
@@ -127,7 +164,6 @@ def setup_klipper_prerequesites() -> None:
def install_klipper_packages() -> None:
script = KLIPPER_INSTALL_SCRIPT
packages = parse_packages_from_file(script)
packages.append("python3-venv") # todo: remove once switched to virtualenv
# Add dbus requirement for DietPi distro
if Path("/boot/dietpi/.version").exists():
@@ -177,3 +213,29 @@ def create_klipper_instance(name: str, create_example_cfg: bool) -> None:
clients = get_existing_clients()
create_example_printer_cfg(new_instance, clients)
kl_im.start_instance()
def use_custom_names_or_go_back() -> bool | None:
print_select_custom_name_dialog()
return get_confirm(
"Assign custom names?",
False,
allow_go_back=True,
)
def display_moonraker_info(moonraker_list: List[Moonraker]) -> bool:
# todo: only show the klipper instances that are not already installed
Logger.print_dialog(
DialogType.INFO,
[
"Existing Moonraker instances detected:",
*[f"{m.get_service_file_name()}" for m in moonraker_list],
"\n\n",
"The following Klipper instances will be installed:",
*[f"● klipper-{m.suffix}" for m in moonraker_list],
],
padding_top=0,
padding_bottom=0,
)
return get_confirm("Proceed with installation?")

View File

@@ -6,13 +6,13 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import grp
import os
import re
import shutil
from subprocess import CalledProcessError, run
from typing import Dict, List, Optional, Union
from typing import Dict, List
from components.klipper import (
KLIPPER_BACKUP_DIR,
@@ -23,23 +23,17 @@ from components.klipper import (
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_instance_overview,
print_select_custom_name_dialog,
print_select_instance_count_dialog,
)
from components.moonraker.moonraker import Moonraker
from components.moonraker.moonraker_utils import moonraker_to_multi_conversion
from components.webui_client.base_data import BaseWebClient
from components.webui_client.client_config.client_config_setup import (
create_client_config_symlink,
)
from core.backup_manager.backup_manager import BackupManager
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
from core.instance_manager.name_scheme import NameScheme
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
SimpleConfigParser,
)
from utils import PRINTER_CFG_BACKUP_DIR
from utils.common import get_install_status
from utils.constants import CURRENT_USER
from utils.input_utils import get_confirm, get_number_input, get_string_input
@@ -52,75 +46,13 @@ def get_klipper_status() -> ComponentStatus:
return get_install_status(KLIPPER_DIR, KLIPPER_ENV_DIR, Klipper)
def check_is_multi_install(
existing_instances: List[Klipper], install_count: int
) -> bool:
return not existing_instances and install_count > 1
def check_is_single_to_multi_conversion(
existing_instances: List[Klipper],
) -> bool:
return len(existing_instances) == 1 and existing_instances[0].suffix == ""
def init_name_scheme(
existing_instances: List[Klipper], install_count: int
) -> NameScheme:
if check_is_multi_install(
existing_instances, install_count
) or check_is_single_to_multi_conversion(existing_instances):
print_select_custom_name_dialog()
if get_confirm("Assign custom names?", False, allow_go_back=True):
return NameScheme.CUSTOM
else:
return NameScheme.INDEX
else:
return NameScheme.SINGLE
def update_name_scheme(
name_scheme: NameScheme,
name_dict: Dict[int, str],
klipper_instances: List[Klipper],
moonraker_instances: List[Moonraker],
) -> NameScheme:
# if there are more moonraker instances installed
# than klipper, we load their names into the name_dict,
# as we will detect and enforce that naming scheme
if len(moonraker_instances) > len(klipper_instances):
update_name_dict(name_dict, moonraker_instances)
return detect_name_scheme(moonraker_instances)
elif len(klipper_instances) > 1:
update_name_dict(name_dict, klipper_instances)
return detect_name_scheme(klipper_instances)
else:
return name_scheme
def update_name_dict(name_dict: Dict[int, str], instances: List[BaseInstance]) -> None:
for k, v in enumerate(instances):
name_dict[k] = v.suffix
def handle_instance_naming(name_dict: Dict[int, str], name_scheme: NameScheme) -> None:
if name_scheme == NameScheme.SINGLE:
return
for k in name_dict:
if name_dict[k] == "" and name_scheme == NameScheme.INDEX:
name_dict[k] = str(k + 1)
elif name_dict[k] == "" and name_scheme == NameScheme.CUSTOM:
assign_custom_name(k, name_dict)
def add_to_existing() -> bool:
kl_instances = InstanceManager(Klipper).instances
print_instance_overview(kl_instances)
return get_confirm("Add new instances?", allow_go_back=True)
def get_install_count() -> Union[int, None]:
def get_install_count() -> int | None:
"""
Print a dialog for selecting the amount of Klipper instances
to set up with an option to navigate back. Returns None if the
@@ -141,62 +73,10 @@ def assign_custom_name(key: int, name_dict: Dict[int, str]) -> None:
existing_names = []
existing_names.extend(Klipper.blacklist())
existing_names.extend(name_dict[n] for n in name_dict)
question = f"Enter name for instance {key + 1}"
name_dict[key] = get_string_input(question, exclude=existing_names)
pattern = r"^[a-zA-Z0-9]+$"
def handle_to_multi_instance_conversion(new_name: str) -> None:
Logger.print_status("Converting single instance to multi instances ...")
klipper_to_multi_conversion(new_name)
moonraker_to_multi_conversion(new_name)
def klipper_to_multi_conversion(new_name: str) -> None:
Logger.print_status("Convert Klipper single to multi instance ...")
im = InstanceManager(Klipper)
im.current_instance = im.instances[0]
# temporarily store the data dir path
old_data_dir = im.instances[0].data_dir
old_data_dir_name = im.instances[0].data_dir_name
# backup the old data_dir
bm = BackupManager()
name = f"config-{old_data_dir_name}"
bm.backup_directory(
name,
source=im.current_instance.cfg_dir,
target=PRINTER_CFG_BACKUP_DIR,
)
# remove the old single instance
im.stop_instance()
im.disable_instance()
im.delete_instance()
# create a new klipper instance with the new name
new_instance = Klipper(suffix=new_name)
im.current_instance = new_instance
if not new_instance.data_dir.is_dir():
# rename the old data dir and use it for the new instance
Logger.print_status(f"Rename '{old_data_dir}' to '{new_instance.data_dir}' ...")
old_data_dir.rename(new_instance.data_dir)
else:
Logger.print_info(f"Existing '{new_instance.data_dir}' found ...")
# patch the virtual_sdcard sections path
# value to match the new printer_data foldername
scp = SimpleConfigParser()
scp.read(new_instance.cfg_file)
if scp.has_section("virtual_sdcard"):
scp.set("virtual_sdcard", "path", str(new_instance.gcodes_dir))
scp.write(new_instance.cfg_file)
# finalize creating the new instance
im.create_instance()
im.enable_instance()
im.start_instance()
question = f"Enter name for instance {key}"
name_dict[key] = get_string_input(question, exclude=existing_names, regex=pattern)
def check_user_groups():
@@ -275,21 +155,8 @@ def handle_disruptive_system_packages() -> None:
)
def detect_name_scheme(instance_list: List[BaseInstance]) -> NameScheme:
pattern = re.compile("^\d+$")
for instance in instance_list:
if not pattern.match(instance.suffix):
return NameScheme.CUSTOM
return NameScheme.INDEX
def get_highest_index(instance_list: List[Klipper]) -> int:
return max([int(instance.suffix.split("-")[-1]) for instance in instance_list])
def create_example_printer_cfg(
instance: Klipper, clients: Optional[List[BaseWebClient]] = None
instance: Klipper, clients: List[BaseWebClient] | None = None
) -> None:
Logger.print_status(f"Creating example printer.cfg in '{instance.cfg_dir}'")
if instance.cfg_file.is_file():

View File

@@ -6,10 +6,11 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from dataclasses import field
from enum import Enum
from typing import List, Union
from typing import List
class FlashMethod(Enum):
@@ -30,9 +31,9 @@ class ConnectionType(Enum):
class FlashOptions:
_instance = None
_flash_method: Union[FlashMethod, None] = None
_flash_command: Union[FlashCommand, None] = None
_connection_type: Union[ConnectionType, None] = None
_flash_method: FlashMethod | None = None
_flash_command: FlashCommand | None = None
_connection_type: ConnectionType | None = None
_mcu_list: List[str] = field(default_factory=list)
_selected_mcu: str = ""
_selected_board: str = ""
@@ -48,27 +49,27 @@ class FlashOptions:
cls._instance = None
@property
def flash_method(self) -> Union[FlashMethod, None]:
def flash_method(self) -> FlashMethod | None:
return self._flash_method
@flash_method.setter
def flash_method(self, value: Union[FlashMethod, None]):
def flash_method(self, value: FlashMethod | None):
self._flash_method = value
@property
def flash_command(self) -> Union[FlashCommand, None]:
def flash_command(self) -> FlashCommand | None:
return self._flash_command
@flash_command.setter
def flash_command(self, value: Union[FlashCommand, None]):
def flash_command(self, value: FlashCommand | None):
self._flash_command = value
@property
def connection_type(self) -> Union[ConnectionType, None]:
def connection_type(self) -> ConnectionType | None:
return self._connection_type
@connection_type.setter
def connection_type(self, value: Union[ConnectionType, None]):
def connection_type(self, value: ConnectionType | None):
self._connection_type = value
@property

View File

@@ -9,8 +9,26 @@
from pathlib import Path
from core.backup_manager import BACKUP_ROOT_DIR
from utils.constants import SYSTEMD
# repo
KLIPPERSCREEN_REPO = "https://github.com/KlipperScreen/KlipperScreen.git"
# names
KLIPPERSCREEN_SERVICE_NAME = "KlipperScreen.service"
KLIPPERSCREEN_UPDATER_SECTION_NAME = "update_manager KlipperScreen"
KLIPPERSCREEN_LOG_NAME = "KlipperScreen.log"
# directories
KLIPPERSCREEN_DIR = Path.home().joinpath("KlipperScreen")
KLIPPERSCREEN_ENV = Path.home().joinpath(".KlipperScreen-env")
KLIPPERSCREEN_ENV_DIR = Path.home().joinpath(".KlipperScreen-env")
KLIPPERSCREEN_BACKUP_DIR = BACKUP_ROOT_DIR.joinpath("klipperscreen-backups")
# files
KLIPPERSCREEN_REQ_FILE = KLIPPERSCREEN_DIR.joinpath(
"scripts/KlipperScreen-requirements.txt"
)
KLIPPERSCREEN_INSTALL_SCRIPT = KLIPPERSCREEN_DIR.joinpath(
"scripts/KlipperScreen-install.sh"
)
KLIPPERSCREEN_SERVICE_FILE = SYSTEMD.joinpath(KLIPPERSCREEN_SERVICE_NAME)

View File

@@ -15,8 +15,14 @@ from components.klipper.klipper import Klipper
from components.klipperscreen import (
KLIPPERSCREEN_BACKUP_DIR,
KLIPPERSCREEN_DIR,
KLIPPERSCREEN_ENV,
KLIPPERSCREEN_ENV_DIR,
KLIPPERSCREEN_INSTALL_SCRIPT,
KLIPPERSCREEN_LOG_NAME,
KLIPPERSCREEN_REPO,
KLIPPERSCREEN_REQ_FILE,
KLIPPERSCREEN_SERVICE_FILE,
KLIPPERSCREEN_SERVICE_NAME,
KLIPPERSCREEN_UPDATER_SECTION_NAME,
)
from components.moonraker.moonraker import Moonraker
from core.backup_manager.backup_manager import BackupManager
@@ -37,9 +43,9 @@ from utils.input_utils import get_confirm
from utils.logger import DialogType, Logger
from utils.sys_utils import (
check_python_version,
cmd_sysctl_manage,
cmd_sysctl_service,
install_python_requirements,
remove_service_file,
)
from utils.types import ComponentStatus
@@ -78,8 +84,7 @@ def install_klipperscreen() -> None:
git_clone_wrapper(KLIPPERSCREEN_REPO, KLIPPERSCREEN_DIR)
try:
script = f"{KLIPPERSCREEN_DIR}/scripts/KlipperScreen-install.sh"
run(script, shell=True, check=True)
run(KLIPPERSCREEN_INSTALL_SCRIPT.as_posix(), shell=True, check=True)
if mr_instances:
patch_klipperscreen_update_manager(mr_instances)
mr_im.restart_all_instance()
@@ -95,34 +100,30 @@ def install_klipperscreen() -> None:
def patch_klipperscreen_update_manager(instances: List[Moonraker]) -> None:
env_py = f"{KLIPPERSCREEN_ENV}/bin/python"
add_config_section(
section="update_manager KlipperScreen",
section=KLIPPERSCREEN_UPDATER_SECTION_NAME,
instances=instances,
options=[
("type", "git_repo"),
("path", str(KLIPPERSCREEN_DIR)),
("path", KLIPPERSCREEN_DIR.as_posix()),
("orgin", KLIPPERSCREEN_REPO),
("env", env_py),
("requirements", "scripts/KlipperScreen-requirements.txt"),
("install_script", "scripts/KlipperScreen-install.sh"),
("manages_servcies", "KlipperScreen"),
("env", f"{KLIPPERSCREEN_ENV_DIR}/bin/python"),
("requirements", KLIPPERSCREEN_REQ_FILE.as_posix()),
("install_script", KLIPPERSCREEN_INSTALL_SCRIPT.as_posix()),
],
)
def update_klipperscreen() -> None:
if not KLIPPERSCREEN_DIR.exists():
Logger.print_info("KlipperScreen does not seem to be installed! Skipping ...")
return
try:
cmd_sysctl_service("KlipperScreen", "stop")
if not KLIPPERSCREEN_DIR.exists():
Logger.print_info(
"KlipperScreen does not seem to be installed! Skipping ..."
)
return
Logger.print_status("Updating KlipperScreen ...")
cmd_sysctl_service("KlipperScreen", "stop")
cmd_sysctl_service(KLIPPERSCREEN_SERVICE_NAME, "stop")
settings = KiauhSettings()
if settings.kiauh.backup_before_update:
@@ -130,12 +131,9 @@ def update_klipperscreen() -> None:
git_pull_wrapper(KLIPPERSCREEN_REPO, KLIPPERSCREEN_DIR)
requirements = KLIPPERSCREEN_DIR.joinpath(
"/scripts/KlipperScreen-requirements.txt"
)
install_python_requirements(KLIPPERSCREEN_ENV, requirements)
install_python_requirements(KLIPPERSCREEN_ENV_DIR, KLIPPERSCREEN_REQ_FILE)
cmd_sysctl_service("KlipperScreen", "start")
cmd_sysctl_service(KLIPPERSCREEN_SERVICE_NAME, "start")
Logger.print_ok("KlipperScreen updated successfully.", end="\n\n")
except CalledProcessError as e:
@@ -146,8 +144,8 @@ def update_klipperscreen() -> None:
def get_klipperscreen_status() -> ComponentStatus:
return get_install_status(
KLIPPERSCREEN_DIR,
KLIPPERSCREEN_ENV,
files=[SYSTEMD.joinpath("KlipperScreen.service")],
KLIPPERSCREEN_ENV_DIR,
files=[SYSTEMD.joinpath(KLIPPERSCREEN_SERVICE_NAME)],
)
@@ -161,24 +159,20 @@ def remove_klipperscreen() -> None:
else:
Logger.print_warn("KlipperScreen directory not found!")
if KLIPPERSCREEN_ENV.exists():
if KLIPPERSCREEN_ENV_DIR.exists():
Logger.print_status("Removing KlipperScreen environment ...")
shutil.rmtree(KLIPPERSCREEN_ENV)
shutil.rmtree(KLIPPERSCREEN_ENV_DIR)
Logger.print_ok("KlipperScreen environment successfully removed!")
else:
Logger.print_warn("KlipperScreen environment not found!")
service = SYSTEMD.joinpath("KlipperScreen.service")
if service.exists():
Logger.print_status("Removing KlipperScreen service ...")
cmd_sysctl_service(service, "stop")
cmd_sysctl_service(service, "disable")
remove_with_sudo(service)
cmd_sysctl_manage("daemon-reload")
cmd_sysctl_manage("reset-failed")
Logger.print_ok("KlipperScreen service successfully removed!")
if KLIPPERSCREEN_SERVICE_FILE.exists():
remove_service_file(
KLIPPERSCREEN_SERVICE_NAME,
KLIPPERSCREEN_SERVICE_FILE,
)
logfile = Path("/tmp/KlipperScreen.log")
logfile = Path(f"/tmp/{KLIPPERSCREEN_LOG_NAME}")
if logfile.exists():
Logger.print_status("Removing KlipperScreen log file ...")
remove_with_sudo(logfile)
@@ -187,7 +181,7 @@ def remove_klipperscreen() -> None:
kl_im = InstanceManager(Klipper)
kl_instances: List[Klipper] = kl_im.instances
for instance in kl_instances:
logfile = instance.log_dir.joinpath("KlipperScreen.log")
logfile = instance.log_dir.joinpath(KLIPPERSCREEN_LOG_NAME)
if logfile.exists():
Logger.print_status(f"Removing {logfile} ...")
Path(logfile).unlink()
@@ -209,12 +203,12 @@ def remove_klipperscreen() -> None:
def backup_klipperscreen_dir() -> None:
bm = BackupManager()
bm.backup_directory(
"KlipperScreen",
KLIPPERSCREEN_DIR.name,
source=KLIPPERSCREEN_DIR,
target=KLIPPERSCREEN_BACKUP_DIR,
)
bm.backup_directory(
"KlipperScreen-env",
source=KLIPPERSCREEN_ENV,
KLIPPERSCREEN_ENV_DIR.name,
source=KLIPPERSCREEN_ENV_DIR,
target=KLIPPERSCREEN_BACKUP_DIR,
)

View File

@@ -11,17 +11,20 @@ from pathlib import Path
from core.backup_manager import BACKUP_ROOT_DIR
from utils.constants import SYSTEMD
# names
# repo
MOBILERAKER_REPO = "https://github.com/Clon1998/mobileraker_companion.git"
# names
MOBILERAKER_SERVICE_NAME = "mobileraker.service"
MOBILERAKER_UPDATER_SECTION_NAME = "update_manager mobileraker"
MOBILERAKER_LOG_NAME = "mobileraker.log"
# directories
MOBILERAKER_DIR = Path.home().joinpath("mobileraker_companion")
MOBILERAKER_ENV_DIR = Path.home().joinpath("mobileraker-env")
MOBILERAKER_BACKUP_DIR = BACKUP_ROOT_DIR.joinpath("mobileraker-backups")
# files
MOBILERAKER_ENV = Path.home().joinpath("mobileraker-env")
MOBILERAKER_INSTALL_SCRIPT = MOBILERAKER_DIR.joinpath("scripts/install.sh")
MOBILERAKER_REQ_FILE = MOBILERAKER_DIR.joinpath("scripts/mobileraker-requirements.txt")
MOBILERAKER_SERVICE_FILE = SYSTEMD.joinpath("mobileraker.service")
MOBILERAKER_SERVICE_FILE = SYSTEMD.joinpath(MOBILERAKER_SERVICE_NAME)

View File

@@ -15,12 +15,13 @@ from components.klipper.klipper import Klipper
from components.mobileraker import (
MOBILERAKER_BACKUP_DIR,
MOBILERAKER_DIR,
MOBILERAKER_ENV,
MOBILERAKER_ENV_DIR,
MOBILERAKER_INSTALL_SCRIPT,
MOBILERAKER_LOG_NAME,
MOBILERAKER_REPO,
MOBILERAKER_REQ_FILE,
MOBILERAKER_SERVICE_FILE,
MOBILERAKER_SERVICE_NAME,
MOBILERAKER_UPDATER_SECTION_NAME,
)
from components.moonraker.moonraker import Moonraker
@@ -29,7 +30,6 @@ from core.instance_manager.instance_manager import InstanceManager
from core.settings.kiauh_settings import KiauhSettings
from utils.common import check_install_dependencies, get_install_status
from utils.config_utils import add_config_section, remove_config_section
from utils.fs_utils import remove_with_sudo
from utils.git_utils import (
git_clone_wrapper,
git_pull_wrapper,
@@ -38,9 +38,9 @@ from utils.input_utils import get_confirm
from utils.logger import DialogType, Logger
from utils.sys_utils import (
check_python_version,
cmd_sysctl_manage,
cmd_sysctl_service,
install_python_requirements,
remove_service_file,
)
from utils.types import ComponentStatus
@@ -70,8 +70,7 @@ def install_mobileraker() -> None:
):
return
package_list = ["git", "wget", "curl", "unzip", "dfu-util"]
check_install_dependencies(package_list)
check_install_dependencies()
git_clone_wrapper(MOBILERAKER_REPO, MOBILERAKER_DIR)
@@ -101,7 +100,7 @@ def patch_mobileraker_update_manager(instances: List[Moonraker]) -> None:
("origin", MOBILERAKER_REPO),
("primary_branch", "main"),
("managed_services", "mobileraker"),
("env", f"{MOBILERAKER_ENV}/bin/python"),
("env", f"{MOBILERAKER_ENV_DIR}/bin/python"),
("requirements", MOBILERAKER_REQ_FILE.as_posix()),
("install_script", MOBILERAKER_INSTALL_SCRIPT.as_posix()),
],
@@ -118,7 +117,7 @@ def update_mobileraker() -> None:
Logger.print_status("Updating Mobileraker's companion ...")
cmd_sysctl_service("mobileraker", "stop")
cmd_sysctl_service(MOBILERAKER_SERVICE_NAME, "stop")
settings = KiauhSettings()
if settings.kiauh.backup_before_update:
@@ -126,9 +125,9 @@ def update_mobileraker() -> None:
git_pull_wrapper(MOBILERAKER_REPO, MOBILERAKER_DIR)
install_python_requirements(MOBILERAKER_ENV, MOBILERAKER_REQ_FILE)
install_python_requirements(MOBILERAKER_ENV_DIR, MOBILERAKER_REQ_FILE)
cmd_sysctl_service("mobileraker", "start")
cmd_sysctl_service(MOBILERAKER_SERVICE_NAME, "start")
Logger.print_ok("Mobileraker's companion updated successfully.", end="\n\n")
except CalledProcessError as e:
@@ -139,7 +138,7 @@ def update_mobileraker() -> None:
def get_mobileraker_status() -> ComponentStatus:
return get_install_status(
MOBILERAKER_DIR,
MOBILERAKER_ENV,
MOBILERAKER_ENV_DIR,
files=[MOBILERAKER_SERVICE_FILE],
)
@@ -154,21 +153,18 @@ def remove_mobileraker() -> None:
else:
Logger.print_warn("Mobileraker's companion directory not found!")
if MOBILERAKER_ENV.exists():
if MOBILERAKER_ENV_DIR.exists():
Logger.print_status("Removing Mobileraker's companion environment ...")
shutil.rmtree(MOBILERAKER_ENV)
shutil.rmtree(MOBILERAKER_ENV_DIR)
Logger.print_ok("Mobileraker's companion environment successfully removed!")
else:
Logger.print_warn("Mobileraker's companion environment not found!")
if MOBILERAKER_SERVICE_FILE.exists():
Logger.print_status("Removing mobileraker service ...")
cmd_sysctl_service(MOBILERAKER_SERVICE_FILE, "stop")
cmd_sysctl_service(MOBILERAKER_SERVICE_FILE, "disable")
remove_with_sudo(MOBILERAKER_SERVICE_FILE)
cmd_sysctl_manage("daemon-reload")
cmd_sysctl_manage("reset-failed")
Logger.print_ok("Mobileraker's companion service successfully removed!")
remove_service_file(
MOBILERAKER_SERVICE_NAME,
MOBILERAKER_SERVICE_FILE,
)
kl_im = InstanceManager(Klipper)
kl_instances: List[Klipper] = kl_im.instances
@@ -204,7 +200,7 @@ def backup_mobileraker_dir() -> None:
target=MOBILERAKER_BACKUP_DIR,
)
bm.backup_directory(
MOBILERAKER_ENV.name,
source=MOBILERAKER_ENV,
MOBILERAKER_ENV_DIR.name,
source=MOBILERAKER_ENV_DIR,
target=MOBILERAKER_BACKUP_DIR,
)

View File

@@ -27,14 +27,17 @@ MOONRAKER_BACKUP_DIR = BACKUP_ROOT_DIR.joinpath("moonraker-backups")
MOONRAKER_DB_BACKUP_DIR = BACKUP_ROOT_DIR.joinpath("moonraker-db-backups")
# files
MOONRAKER_INSTALL_SCRIPT = MOONRAKER_DIR.joinpath("scripts/install-moonraker.sh")
MOONRAKER_REQ_FILE = MOONRAKER_DIR.joinpath("scripts/moonraker-requirements.txt")
MOONRAKER_SPEEDUPS_REQ_FILE = MOONRAKER_DIR.joinpath("scripts/moonraker-speedups.txt")
MOONRAKER_DEPS_JSON_FILE = MOONRAKER_DIR.joinpath("scripts/system-dependencies.json")
# introduced due to
# https://github.com/Arksine/moonraker/issues/349
# https://github.com/Arksine/moonraker/pull/346
POLKIT_LEGACY_FILE = Path("/etc/polkit-1/localauthority/50-local.d/10-moonraker.pkla")
POLKIT_FILE = Path("/etc/polkit-1/rules.d/moonraker.rules")
POLKIT_USR_FILE = Path("/usr/share/polkit-1/rules.d/moonraker.rules")
POLKIT_SCRIPT = Path.home().joinpath("moonraker/scripts/set-policykit-rules.sh")
POLKIT_SCRIPT = MOONRAKER_DIR.joinpath("scripts/set-policykit-rules.sh")
MOONRAKER_SERVICE_TEMPLATE = MODULE_PATH.joinpath(f"assets/{MOONRAKER_SERVICE_NAME}")
MOONRAKER_ENV_FILE_TEMPLATE = MODULE_PATH.joinpath(f"assets/{MOONRAKER_ENV_FILE_NAME}")

View File

@@ -8,9 +8,9 @@
# ======================================================================= #
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from subprocess import CalledProcessError, run
from typing import List
from components.moonraker import (
MOONRAKER_CFG_NAME,
@@ -29,38 +29,36 @@ from utils.logger import Logger
# noinspection PyMethodMayBeStatic
@dataclass
class Moonraker(BaseInstance):
@classmethod
def blacklist(cls) -> List[str]:
return ["None", "mcu", "obico"]
moonraker_dir: Path = MOONRAKER_DIR
env_dir: Path = MOONRAKER_ENV_DIR
cfg_file: Path = None
port: int = None
backup_dir: Path = None
certs_dir: Path = None
db_dir: Path = None
log: Path = None
def __init__(self, suffix: str = ""):
super().__init__(instance_type=self, suffix=suffix)
self.moonraker_dir: Path = MOONRAKER_DIR
self.env_dir: Path = MOONRAKER_ENV_DIR
def __post_init__(self) -> None:
super().__post_init__()
self.cfg_file = self.cfg_dir.joinpath(MOONRAKER_CFG_NAME)
self.port = self._get_port()
self.backup_dir = self.data_dir.joinpath("backup")
self.certs_dir = self.data_dir.joinpath("certs")
self._db_dir = self.data_dir.joinpath("database")
self._comms_dir = self.data_dir.joinpath("comms")
self.db_dir = self.data_dir.joinpath("database")
self.log = self.log_dir.joinpath(MOONRAKER_LOG_NAME)
@property
def db_dir(self) -> Path:
return self._db_dir
@property
def comms_dir(self) -> Path:
return self._comms_dir
def create(self, create_example_cfg: bool = False) -> None:
from utils.sys_utils import create_env_file, create_service_file
Logger.print_status("Creating new Moonraker Instance ...")
try:
self.create_folders([self.backup_dir, self.certs_dir, self._db_dir])
self.create_folders([self.backup_dir, self.certs_dir, self.db_dir])
create_service_file(
name=self.get_service_file_name(extension=True),
content=self._prep_service_file_content(),

View File

@@ -48,7 +48,7 @@ def print_moonraker_overview(
for i, k in enumerate(instance_map):
mr_name = instance_map.get(k)
m = f"<-> {mr_name}" if mr_name != "" else ""
line = f"{COLOR_CYAN}{f'{i})' if show_index else ''} {k} {m} {RESET_FORMAT}"
line = f"{COLOR_CYAN}{f'{i+1})' if show_index else ''} {k} {m} {RESET_FORMAT}"
dialog += f"{line:<63}\n"
warn_l1 = f"{COLOR_YELLOW}PLEASE NOTE: {RESET_FORMAT}"

View File

@@ -60,7 +60,7 @@ def select_instances_to_remove(
) -> Union[List[Moonraker], None]:
start_index = 1
options = [str(i + start_index) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
options.extend(["a", "b"])
instance_map = {options[i]: instances[i] for i in range(len(instances))}
print_instance_overview(
@@ -72,9 +72,9 @@ def select_instances_to_remove(
selection = get_selection_input("Select Moonraker instance to remove", options)
instances_to_remove = []
if selection == "b".lower():
if selection == "b":
return None
elif selection == "a".lower():
elif selection == "a":
instances_to_remove.extend(instances)
else:
instances_to_remove.append(instance_map[selection])

View File

@@ -6,16 +6,21 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import List
from components.klipper.klipper import Klipper
from components.moonraker import (
EXIT_MOONRAKER_SETUP,
MOONRAKER_DEPS_JSON_FILE,
MOONRAKER_DIR,
MOONRAKER_ENV_DIR,
MOONRAKER_INSTALL_SCRIPT,
MOONRAKER_REQ_FILE,
MOONRAKER_SPEEDUPS_REQ_FILE,
POLKIT_FILE,
POLKIT_LEGACY_FILE,
POLKIT_SCRIPT,
@@ -55,47 +60,45 @@ def install_moonraker() -> None:
if not check_moonraker_install_requirements():
return
kl_im = InstanceManager(Klipper)
klipper_instances = kl_im.instances
klipper_list: List[Klipper] = InstanceManager(Klipper).instances
mr_im = InstanceManager(Moonraker)
moonraker_instances = mr_im.instances
moonraker_list: List[Moonraker] = mr_im.instances
selected_klipper_instance = 0
if len(klipper_instances) > 1:
instance_names = []
selected_option: str | Klipper
if len(klipper_list) == 0:
instance_names.append(klipper_list[0].suffix)
else:
print_moonraker_overview(
klipper_instances,
moonraker_instances,
klipper_list,
moonraker_list,
show_index=True,
show_select_all=True,
)
options = [str(i) for i in range(len(klipper_instances))]
options.extend(["a", "A", "b", "B"])
options = {str(i + 1): k for i, k in enumerate(klipper_list)}
additional_options = {"a": None, "b": None}
options = {**options, **additional_options}
question = "Select Klipper instance to setup Moonraker for"
selected_klipper_instance = get_selection_input(question, options).lower()
selected_option = get_selection_input(question, options)
instance_names = []
if selected_klipper_instance == "b":
Logger.print_status(EXIT_MOONRAKER_SETUP)
return
if selected_option == "b":
Logger.print_status(EXIT_MOONRAKER_SETUP)
return
elif selected_klipper_instance == "a":
for instance in klipper_instances:
instance_names.append(instance.suffix)
else:
index = int(selected_klipper_instance)
instance_names.append(klipper_instances[index].suffix)
if selected_option == "a":
instance_names.extend([k.suffix for k in klipper_list])
else:
instance_names.append(options.get(selected_option).suffix)
create_example_cfg = get_confirm("Create example moonraker.conf?")
try:
check_install_dependencies(["git"])
check_install_dependencies()
setup_moonraker_prerequesites()
install_moonraker_polkit()
used_ports_map = {
instance.suffix: instance.port for instance in moonraker_instances
}
used_ports_map = {m.suffix: m.port for m in moonraker_list}
for name in instance_names:
current_instance = Moonraker(suffix=name)
@@ -143,21 +146,20 @@ def setup_moonraker_prerequesites() -> None:
git_clone_wrapper(repo, MOONRAKER_DIR, branch)
# install moonraker dependencies and create python virtualenv
install_moonraker_packages(MOONRAKER_DIR)
install_moonraker_packages()
create_python_venv(MOONRAKER_ENV_DIR)
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_SPEEDUPS_REQ_FILE)
def install_moonraker_packages(moonraker_dir: Path) -> None:
install_script = moonraker_dir.joinpath("scripts/install-moonraker.sh")
deps_json = MOONRAKER_DIR.joinpath("scripts/system-dependencies.json")
def install_moonraker_packages() -> None:
moonraker_deps = []
if deps_json.exists():
with open(deps_json, "r") as deps:
if MOONRAKER_DEPS_JSON_FILE.exists():
with open(MOONRAKER_DEPS_JSON_FILE, "r") as deps:
moonraker_deps = json.load(deps).get("debian", [])
elif install_script.exists():
moonraker_deps = parse_packages_from_file(install_script)
elif MOONRAKER_INSTALL_SCRIPT.exists():
moonraker_deps = parse_packages_from_file(MOONRAKER_INSTALL_SCRIPT)
if not moonraker_deps:
raise ValueError("Error reading Moonraker dependencies!")
@@ -209,7 +211,7 @@ def update_moonraker() -> None:
git_pull_wrapper(repo=settings.moonraker.repo_url, target_dir=MOONRAKER_DIR)
# install possible new system packages
install_moonraker_packages(MOONRAKER_DIR)
install_moonraker_packages()
# install possible new python dependencies
install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQ_FILE)

View File

@@ -20,8 +20,6 @@ from components.moonraker import (
)
from components.moonraker.moonraker import Moonraker
from components.webui_client.base_data import BaseWebClient
from components.webui_client.client_utils import enable_mainsail_remotemode
from components.webui_client.mainsail_data import MainsailData
from core.backup_manager.backup_manager import BackupManager
from core.instance_manager.instance_manager import InstanceManager
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
@@ -128,58 +126,6 @@ def create_example_moonraker_conf(
Logger.print_ok(f"Example moonraker.conf created in '{instance.cfg_dir}'")
def moonraker_to_multi_conversion(new_name: str) -> None:
"""
Converts the first instance in the List of Moonraker instances to an instance
with a new name. This method will be called when converting from a single Klipper
instance install to a multi instance install when Moonraker is also already
installed with a single instance.
:param new_name: new name the previous single instance is renamed to
:return: None
"""
im = InstanceManager(Moonraker)
instances: List[Moonraker] = im.instances
if not instances:
return
# in case there are multiple Moonraker instances, we don't want to do anything
if len(instances) > 1:
Logger.print_info("More than a single Moonraker instance found. Skipped ...")
return
Logger.print_status("Convert Moonraker single to multi instance ...")
# remove the old single instance
im.current_instance = im.instances[0]
im.stop_instance()
im.disable_instance()
im.delete_instance()
# create a new moonraker instance with the new name
new_instance = Moonraker(suffix=new_name)
im.current_instance = new_instance
# patch the server sections klippy_uds_address value to match the new printer_data foldername
scp = SimpleConfigParser()
scp.read(new_instance.cfg_file)
if scp.has_section("server"):
scp.set(
"server",
"klippy_uds_address",
str(new_instance.comms_dir.joinpath("klippy.sock")),
)
scp.write(new_instance.cfg_file)
# create, enable and start the new moonraker instance
im.create_instance()
im.enable_instance()
im.start_instance()
# if mainsail is installed, we enable mainsails remote mode
if MainsailData().client_dir.exists() and len(im.instances) > 1:
enable_mainsail_remotemode()
def backup_moonraker_dir():
bm = BackupManager()
bm.backup_directory("moonraker", source=MOONRAKER_DIR, target=MOONRAKER_BACKUP_DIR)

View File

@@ -21,6 +21,7 @@ OE_REQ_FILE = OE_DIR.joinpath("requirements.txt")
OE_DEPS_JSON_FILE = OE_DIR.joinpath("moonraker-system-dependencies.json")
OE_INSTALL_SCRIPT = OE_DIR.joinpath("install.sh")
OE_UPDATE_SCRIPT = OE_DIR.joinpath("update.sh")
OE_INSTALLER_LOG_FILE = Path.home().joinpath("octoeverywhere-installer.log")
# filenames
OE_CFG_NAME = "octoeverywhere.conf"

View File

@@ -11,6 +11,7 @@ from pathlib import Path
from subprocess import CalledProcessError, run
from typing import List
from components.moonraker import MOONRAKER_CFG_NAME
from components.octoeverywhere import (
OE_CFG_NAME,
OE_DIR,
@@ -55,7 +56,7 @@ class Octoeverywhere(BaseInstance):
Logger.print_status("Creating OctoEverywhere for Klipper Instance ...")
try:
cmd = f"{OE_INSTALL_SCRIPT} {self.cfg_dir}/moonraker.conf"
cmd = f"{OE_INSTALL_SCRIPT} {self.cfg_dir}/{MOONRAKER_CFG_NAME}"
run(cmd, check=True, shell=True)
except CalledProcessError as e:
@@ -65,7 +66,7 @@ class Octoeverywhere(BaseInstance):
@staticmethod
def update():
try:
run(str(OE_UPDATE_SCRIPT), check=True, shell=True, cwd=OE_DIR)
run(OE_UPDATE_SCRIPT.as_posix(), check=True, shell=True, cwd=OE_DIR)
except CalledProcessError as e:
Logger.print_error(f"Error updating OctoEverywhere for Klipper: {e}")
@@ -82,6 +83,7 @@ class Octoeverywhere(BaseInstance):
try:
command = ["sudo", "rm", "-f", service_file_path]
run(command, check=True)
self.delete_logfiles(OE_LOG_NAME)
Logger.print_ok(f"Service file deleted: {service_file_path}")
except CalledProcessError as e:
Logger.print_error(f"Error deleting service file: {e}")

View File

@@ -7,7 +7,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import json
from pathlib import Path
from typing import List
from components.moonraker.moonraker import Moonraker
@@ -16,7 +15,7 @@ from components.octoeverywhere import (
OE_DIR,
OE_ENV_DIR,
OE_INSTALL_SCRIPT,
OE_LOG_NAME,
OE_INSTALLER_LOG_FILE,
OE_REPO,
OE_REQ_FILE,
OE_SYS_CFG_NAME,
@@ -147,7 +146,7 @@ def remove_octoeverywhere() -> None:
remove_oe_dir()
remove_oe_env()
remove_config_section(f"include {OE_SYS_CFG_NAME}", mr_instances)
delete_oe_logs(ob_instances)
run_remove_routines(OE_INSTALLER_LOG_FILE)
Logger.print_dialog(
DialogType.SUCCESS,
["OctoEverywhere for Klipper successfully removed!"],
@@ -209,23 +208,3 @@ def remove_oe_env() -> None:
return
run_remove_routines(OE_ENV_DIR)
def delete_oe_logs(instances: List[Octoeverywhere]) -> None:
Logger.print_status("Removing OctoEverywhere logs ...")
all_logfiles = []
for instance in instances:
all_logfiles = list(instance.log_dir.glob(f"{OE_LOG_NAME}*"))
install_log = Path.home().joinpath("octoeverywhere-installer.log")
if install_log.exists():
all_logfiles.append(install_log)
if not all_logfiles:
Logger.print_info("No OctoEverywhere logs found. Skipped ...")
return
for log in all_logfiles:
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)

View File

@@ -112,7 +112,7 @@ def install_client(client: BaseWebClient) -> None:
)
valid_port = is_valid_port(port, ports_in_use)
check_install_dependencies(["nginx", "unzip"])
check_install_dependencies(["nginx"])
try:
download_client(client)

View File

@@ -9,7 +9,9 @@
from __future__ import annotations
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
@@ -17,106 +19,32 @@ from utils.constants import CURRENT_USER, SYSTEMD
from utils.logger import Logger
@dataclass
class BaseInstance(ABC):
instance_type: BaseInstance
suffix: str
user: str = field(default=CURRENT_USER, init=False)
data_dir: Path = None
data_dir_name: str = ""
is_legacy_instance: bool = False
cfg_dir: Path = None
log_dir: Path = None
comms_dir: Path = None
sysd_dir: Path = None
gcodes_dir: Path = None
def __post_init__(self) -> None:
self._set_data_dir()
self._set_is_legacy_instance()
self.cfg_dir = self.data_dir.joinpath("config")
self.log_dir = self.data_dir.joinpath("logs")
self.comms_dir = self.data_dir.joinpath("comms")
self.sysd_dir = self.data_dir.joinpath("systemd")
self.gcodes_dir = self.data_dir.joinpath("gcodes")
@classmethod
def blacklist(cls) -> List[str]:
return []
def __init__(
self,
suffix: str,
instance_type: BaseInstance,
):
self._instance_type = instance_type
self._suffix = suffix
self._user = CURRENT_USER
self._data_dir_name = self.get_data_dir_name_from_suffix()
self._data_dir = Path.home().joinpath(f"{self._data_dir_name}_data")
self._cfg_dir = self.data_dir.joinpath("config")
self._log_dir = self.data_dir.joinpath("logs")
self._comms_dir = self.data_dir.joinpath("comms")
self._sysd_dir = self.data_dir.joinpath("systemd")
self._gcodes_dir = self.data_dir.joinpath("gcodes")
@property
def instance_type(self) -> BaseInstance:
return self._instance_type
@instance_type.setter
def instance_type(self, value: BaseInstance) -> None:
self._instance_type = value
@property
def suffix(self) -> str:
return self._suffix
@suffix.setter
def suffix(self, value: str) -> None:
self._suffix = value
@property
def user(self) -> str:
return self._user
@user.setter
def user(self, value: str) -> None:
self._user = value
@property
def data_dir_name(self) -> str:
return self._data_dir_name
@data_dir_name.setter
def data_dir_name(self, value: str) -> None:
self._data_dir_name = value
@property
def data_dir(self) -> Path:
return self._data_dir
@data_dir.setter
def data_dir(self, value: Path) -> None:
self._data_dir = value
@property
def cfg_dir(self) -> Path:
return self._cfg_dir
@cfg_dir.setter
def cfg_dir(self, value: Path) -> None:
self._cfg_dir = value
@property
def log_dir(self) -> Path:
return self._log_dir
@log_dir.setter
def log_dir(self, value: Path) -> None:
self._log_dir = value
@property
def comms_dir(self) -> Path:
return self._comms_dir
@comms_dir.setter
def comms_dir(self, value: Path) -> None:
self._comms_dir = value
@property
def sysd_dir(self) -> Path:
return self._sysd_dir
@sysd_dir.setter
def sysd_dir(self, value: Path) -> None:
self._sysd_dir = value
@property
def gcodes_dir(self) -> Path:
return self._gcodes_dir
@gcodes_dir.setter
def gcodes_dir(self, value: Path) -> None:
self._gcodes_dir = value
return ["None", "mcu", "obico", "bambu", "companion"]
@abstractmethod
def create(self) -> None:
@@ -133,6 +61,7 @@ class BaseInstance(ABC):
self.log_dir,
self.comms_dir,
self.sysd_dir,
self.gcodes_dir,
]
if add_dirs:
@@ -141,6 +70,7 @@ class BaseInstance(ABC):
for _dir in dirs:
_dir.mkdir(exist_ok=True)
# todo: refactor into a set method and access the value by accessing the property
def get_service_file_name(self, extension: bool = False) -> str:
from utils.common import convert_camelcase_to_kebabcase
@@ -150,22 +80,42 @@ class BaseInstance(ABC):
return name if not extension else f"{name}.service"
# todo: refactor into a set method and access the value by accessing the property
def get_service_file_path(self) -> Path:
return SYSTEMD.joinpath(self.get_service_file_name(extension=True))
def get_data_dir_name_from_suffix(self) -> str:
if self._suffix == "":
return "printer"
elif self._suffix.isdigit():
return f"printer_{self._suffix}"
else:
return self._suffix
def delete_logfiles(self, log_name: str) -> None:
from utils.fs_utils import run_remove_routines
if not self.log_dir.exists():
return
files = self.log_dir.iterdir()
logs = [f for f in files if f.name.startswith(log_name)]
for log in logs:
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)
def _set_data_dir(self) -> None:
if self.suffix == "":
self.data_dir = Path.home().joinpath("printer_data")
else:
self.data_dir = Path.home().joinpath(f"printer_{self.suffix}_data")
if self.get_service_file_path().exists():
with open(self.get_service_file_path(), "r") as service_file:
service_content = service_file.read()
pattern = re.compile("^EnvironmentFile=(.+)(/systemd/.+\.env)")
match = re.search(pattern, service_content)
if match:
self.data_dir = Path(match.group(1))
def _set_is_legacy_instance(self) -> None:
if (
self.suffix != ""
and not self.data_dir_name.startswith("printer_")
and not self.data_dir_name.endswith("_data")
):
self.is_legacy_instance = True
else:
self.is_legacy_instance = False

View File

@@ -185,8 +185,10 @@ class InstanceManager:
suffix = file_path.stem[len(name) :]
return suffix[1:] if suffix else ""
def _sort_instance_list(self, s: Union[int, str, None]):
if s is None:
def _sort_instance_list(self, suffix: Union[int, str, None]):
if suffix is None:
return
return int(s) if s.isdigit() else s
elif suffix.isdigit():
return f"{int(suffix):04}"
else:
return suffix

View File

@@ -56,7 +56,7 @@ class MainMenu(BaseMenu):
self.kl_status = self.kl_repo = self.mr_status = self.mr_repo = ""
self.ms_status = self.fl_status = self.ks_status = self.mb_status = ""
self.cn_status = self.cc_status = self.oe_status = ""
self.init_status()
self._init_status()
def set_previous_menu(self, previous_menu: Optional[Type[BaseMenu]]) -> None:
"""MainMenu does not have a previous menu"""
@@ -74,7 +74,7 @@ class MainMenu(BaseMenu):
"s": Option(method=self.settings_menu, menu=True),
}
def init_status(self) -> None:
def _init_status(self) -> None:
status_vars = ["kl", "mr", "ms", "fl", "ks", "mb", "cn", "oe"]
for var in status_vars:
setattr(
@@ -83,7 +83,7 @@ class MainMenu(BaseMenu):
f"{COLOR_RED}Not installed{RESET_FORMAT}",
)
def fetch_status(self) -> None:
def _fetch_status(self) -> None:
self._get_component_status("kl", get_klipper_status)
self._get_component_status("mr", get_moonraker_status)
self._get_component_status("ms", get_client_status, MainsailData())
@@ -102,7 +102,7 @@ class MainMenu(BaseMenu):
instance_count: int = status_data.instances
count_txt: str = ""
if instance_count > 0 and code == 1:
if instance_count > 0 and code == 2:
count_txt = f": {instance_count}"
setattr(self, f"{name}_status", self._format_by_code(code, status, count_txt))
@@ -120,7 +120,7 @@ class MainMenu(BaseMenu):
return f"{color}{status}{count}{RESET_FORMAT}"
def print_menu(self):
self.fetch_status()
self._fetch_status()
header = " [ Main Menu ] "
footer1 = f"{COLOR_CYAN}KIAUH v6.0.0{RESET_FORMAT}"

View File

@@ -157,7 +157,7 @@ def get_printer_selection(
instances: List[BaseInstance], is_install: bool
) -> Union[List[BaseInstance], None]:
options = [str(i) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
options.extend(["a", "b"])
if is_install:
q = "Select the printer to install the theme for"
@@ -166,9 +166,9 @@ def get_printer_selection(
selection = get_selection_input(q, options)
install_for = []
if selection == "b".lower():
if selection == "b":
return None
elif selection == "a".lower():
elif selection == "a":
install_for.extend(instances)
else:
instance = instances[int(selection)]

View File

@@ -62,7 +62,7 @@ class ObicoExtension(BaseExtension):
obico_instances: List[MoonrakerObico] = obico_im.instances
if obico_instances:
self._print_is_already_installed()
options = ["l", "L", "r", "R", "b", "B"]
options = ["l", "r", "b"]
action = get_selection_input("Perform action", option_list=options)
if action.lower() == "b":
Logger.print_info("Exiting Obico for Klipper installation ...")

View File

@@ -15,6 +15,8 @@ MODULE_PATH = Path(__file__).resolve().parent
INVALID_CHOICE = "Invalid choice. Please select a valid value."
PRINTER_CFG_BACKUP_DIR = BACKUP_ROOT_DIR.joinpath("printer-cfg-backups")
GLOBAL_DEPS = ["git", "wget", "curl", "unzip", "dfu-util", "python3-virtualenv"]
# ================== NGINX =====================#
NGINX_SITES_AVAILABLE = Path("/etc/nginx/sites-available")
NGINX_SITES_ENABLED = Path("/etc/nginx/sites-enabled")

View File

@@ -6,6 +6,8 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
from datetime import datetime
from pathlib import Path
@@ -14,7 +16,7 @@ from typing import Dict, List, Literal, Optional, Type
from components.klipper.klipper import Klipper
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
from utils import PRINTER_CFG_BACKUP_DIR
from utils import GLOBAL_DEPS, PRINTER_CFG_BACKUP_DIR
from utils.constants import (
COLOR_CYAN,
RESET_FORMAT,
@@ -45,19 +47,22 @@ def get_current_date() -> Dict[Literal["date", "time"], str]:
return {"date": date, "time": time}
def check_install_dependencies(deps: List[str]) -> None:
def check_install_dependencies(deps: List[str] | None = None) -> None:
"""
Common helper method to check if dependencies are installed
and if not, install them automatically |
:param deps: List of strings of package names to check if installed
:return: None
"""
requirements = check_package_install(deps)
if deps is None:
deps = []
requirements = check_package_install({*GLOBAL_DEPS, *deps})
if requirements:
Logger.print_status("Installing dependencies ...")
Logger.print_info("The following packages need installation:")
for _ in requirements:
print(f"{COLOR_CYAN}{_}{RESET_FORMAT}")
for r in requirements:
print(f"{COLOR_CYAN}{r}{RESET_FORMAT}")
update_system_package_lists(silent=False)
install_system_packages(requirements)

View File

@@ -6,17 +6,17 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
from typing import List, Union
from typing import Dict, List, Union
from utils import INVALID_CHOICE
from utils.constants import COLOR_CYAN, RESET_FORMAT
from utils.logger import Logger
def get_confirm(
question: str, default_choice=True, allow_go_back=False
) -> Union[bool, None]:
def get_confirm(question: str, default_choice=True, allow_go_back=False) -> bool | None:
"""
Helper method for validating confirmation (yes/no) user input. |
:param question: The question to display
@@ -56,7 +56,7 @@ def get_number_input(
max_count=None,
default=None,
allow_go_back=False,
) -> Union[int, None]:
) -> int | None:
"""
Helper method to get a number input from the user
:param question: The question to display
@@ -120,7 +120,7 @@ def get_string_input(
Logger.print_error(INVALID_CHOICE)
def get_selection_input(question: str, option_list: List, default=None) -> str:
def get_selection_input(question: str, option_list: List | Dict, default=None) -> str:
"""
Helper method to get a selection from a list of options from the user
:param question: The question to display
@@ -129,10 +129,16 @@ def get_selection_input(question: str, option_list: List, default=None) -> str:
:return: The option that was selected by the user
"""
while True:
_input = input(format_question(question, default)).strip()
_input = input(format_question(question, default)).strip().lower()
if _input in option_list:
return _input
if isinstance(option_list, list):
if _input in option_list:
return _input
elif isinstance(option_list, dict):
if _input in option_list.keys():
return _input
else:
raise ValueError("Invalid option_list type")
Logger.print_error(INVALID_CHOICE)

View File

@@ -93,6 +93,20 @@ class Logger:
padding_top: int = 1,
padding_bottom: int = 1,
) -> None:
"""
Prints a dialog with the given title and content.
Those dialogs should be used to display verbose messages to the user which
require simple interaction like confirmation or input. Do not use this for
navigating through the application.
:param title: The type of the dialog.
:param content: The content of the dialog.
:param center_content: Whether to center the content or not.
:param custom_title: A custom title for the dialog.
:param custom_color: A custom color for the dialog.
:param padding_top: The number of empty lines to print before the dialog.
:param padding_bottom: The number of empty lines to print after the dialog.
"""
dialog_color = Logger._get_dialog_color(title, custom_color)
dialog_title = Logger._get_dialog_title(title, custom_title)
dialog_title_formatted = Logger._format_dialog_title(dialog_title)

View File

@@ -16,13 +16,12 @@ import sys
import time
import urllib.error
import urllib.request
import venv
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError, Popen, run
from typing import List, Literal
from typing import List, Literal, Set
from utils.constants import SYSTEMD
from utils.fs_utils import check_file_exist
from utils.fs_utils import check_file_exist, remove_with_sudo
from utils.input_utils import get_confirm
from utils.logger import Logger
@@ -96,13 +95,11 @@ def create_python_venv(target: Path) -> None:
Logger.print_status("Set up Python virtual environment ...")
if not target.exists():
try:
venv.create(target, with_pip=True)
cmd = ["virtualenv", "-p", "/usr/bin/python3", target.as_posix()]
run(cmd, check=True)
Logger.print_ok("Setup of virtualenv successful!")
except OSError as e:
Logger.print_error(f"Error setting up virtualenv:\n{e}")
raise
except CalledProcessError as e:
Logger.print_error(f"Error setting up virtualenv:\n{e.output.decode()}")
Logger.print_error(f"Error setting up virtualenv:\n{e}")
raise
else:
if get_confirm("Virtualenv already exists. Re-create?", default_choice=False):
@@ -220,7 +217,7 @@ def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
raise
def check_package_install(packages: List[str]) -> List[str]:
def check_package_install(packages: Set[str]) -> List[str]:
"""
Checks the system for installed packages |
:param packages: List of strings of package names
@@ -438,3 +435,23 @@ def create_env_file(path: Path, content: str) -> None:
except OSError as e:
Logger.print_error(f"Error creating env file: {e}")
raise
def remove_service_file(service_name: str, service_file: Path) -> None:
"""
Removes a systemd service file at the provided path with the provided name.
:param service_name: the name of the service
:param service_file: the path of the service file
:return: None
"""
try:
Logger.print_status(f"Removing {service_name} ...")
cmd_sysctl_service(service_name, "stop")
cmd_sysctl_service(service_name, "disable")
remove_with_sudo(service_file)
cmd_sysctl_manage("daemon-reload")
cmd_sysctl_manage("reset-failed")
Logger.print_ok(f"{service_name} successfully removed!")
except Exception as e:
Logger.print_error(f"Error removing {service_name}:\n{e}")
raise