Compare commits

..

1 Commits

Author SHA1 Message Date
dw-0
23c6bd47ac Merge 7444ae8cea into a929c6983d 2024-06-29 00:08:12 +02:00
22 changed files with 406 additions and 391 deletions

View File

@@ -7,8 +7,8 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import subprocess
from pathlib import Path
from subprocess import DEVNULL, CalledProcessError, run
from typing import List
from components.klipper import KLIPPER_DIR, KLIPPER_ENV_DIR, MODULE_PATH
@@ -50,46 +50,43 @@ class Klipper(BaseInstance):
def create(self) -> None:
Logger.print_status("Creating new Klipper Instance ...")
service_template_path = MODULE_PATH.joinpath("assets/klipper.service")
service_file_name = self.get_service_file_name(extension=True)
service_file_target = SYSTEMD.joinpath(service_file_name)
env_template_file_path = MODULE_PATH.joinpath("assets/klipper.env")
env_file_target = self.sysd_dir.joinpath("klipper.env")
try:
self.create_folders()
self._write_service_file(
service_template_path,
service_file_target,
env_file_target,
self.write_service_file(
service_template_path, service_file_target, env_file_target
)
self._write_env_file(env_template_file_path, env_file_target)
self.write_env_file(env_template_file_path, env_file_target)
except CalledProcessError as e:
Logger.print_error(f"Error creating instance: {e}")
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error creating service file {service_file_target}: {e}"
)
raise
except OSError as e:
Logger.print_error(f"Error creating env file: {e}")
Logger.print_error(f"Error creating env file {env_file_target}: {e}")
raise
def delete(self) -> None:
service_file = self.get_service_file_name(extension=True)
service_file_path = self.get_service_file_path()
Logger.print_status(f"Removing Klipper Instance: {service_file}")
Logger.print_status(f"Deleting Klipper Instance: {service_file}")
try:
command = ["sudo", "rm", "-f", service_file_path]
run(command, check=True)
self._delete_logfiles()
Logger.print_ok("Instance successfully removed!")
except CalledProcessError as e:
Logger.print_error(f"Error removing instance: {e}")
subprocess.run(command, check=True)
Logger.print_ok(f"Service file deleted: {service_file_path}")
except subprocess.CalledProcessError as e:
Logger.print_error(f"Error deleting service file: {e}")
raise
def _write_service_file(
def write_service_file(
self,
service_template_path: Path,
service_file_target: Path,
@@ -99,15 +96,15 @@ class Klipper(BaseInstance):
service_template_path, env_file_target
)
command = ["sudo", "tee", service_file_target]
run(
subprocess.run(
command,
input=service_content.encode(),
stdout=DEVNULL,
stdout=subprocess.DEVNULL,
check=True,
)
Logger.print_ok(f"Service file created: {service_file_target}")
def _write_env_file(
def write_env_file(
self, env_template_file_path: Path, env_file_target: Path
) -> None:
env_file_content = self._prep_env_file(env_template_file_path)
@@ -153,10 +150,3 @@ class Klipper(BaseInstance):
env_file_content = env_file_content.replace("%LOG%", str(self.log))
env_file_content = env_file_content.replace("%UDS%", str(self.uds))
return env_file_content
def _delete_logfiles(self) -> None:
from utils.fs_utils import run_remove_routines
for log in list(self.log_dir.glob("klippy.log*")):
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)

View File

@@ -32,7 +32,6 @@ def print_instance_overview(
display_type: DisplayType = DisplayType.SERVICE_NAME,
show_headline=True,
show_index=False,
start_index=0,
show_select_all=False,
):
dialog = "╔═══════════════════════════════════════════════════════╗\n"
@@ -56,7 +55,7 @@ def print_instance_overview(
name = s.get_service_file_name()
else:
name = s.data_dir
line = f"{COLOR_CYAN}{f'{i + start_index})' if show_index else ''} {name}{RESET_FORMAT}"
line = f"{COLOR_CYAN}{f'{i})' if show_index else ''} {name}{RESET_FORMAT}"
dialog += f"{line:<63}\n"
dialog += "╟───────────────────────────────────────────────────────╢\n"

View File

@@ -7,13 +7,14 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import shutil
from typing import List, Union
from components.klipper import KLIPPER_DIR, KLIPPER_ENV_DIR
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import print_instance_overview
from core.instance_manager.instance_manager import InstanceManager
from utils.fs_utils import run_remove_routines
from utils.fs_utils import remove_file
from utils.input_utils import get_selection_input
from utils.logger import Logger
from utils.sys_utils import cmd_sysctl_manage
@@ -23,6 +24,7 @@ def run_klipper_removal(
remove_service: bool,
remove_dir: bool,
remove_env: bool,
delete_logs: bool,
) -> None:
im = InstanceManager(Klipper)
@@ -35,32 +37,37 @@ def run_klipper_removal(
Logger.print_info("No Klipper Services installed! Skipped ...")
if (remove_dir or remove_env) and im.instances:
Logger.print_info("There are still other Klipper services installed:")
Logger.print_info(f"'{KLIPPER_DIR}' was not removed.", prefix=False)
Logger.print_info(f"'{KLIPPER_ENV_DIR}' was not removed.", prefix=False)
Logger.print_warn("There are still other Klipper services installed!")
Logger.print_warn("Therefor the following parts cannot be removed:")
Logger.print_warn(
"""
● Klipper local repository
● Klipper Python environment
""",
False,
)
else:
if remove_dir:
Logger.print_status("Removing Klipper local repository ...")
run_remove_routines(KLIPPER_DIR)
remove_klipper_dir()
if remove_env:
Logger.print_status("Removing Klipper Python environment ...")
run_remove_routines(KLIPPER_ENV_DIR)
remove_klipper_env()
# delete klipper logs of all instances
if delete_logs:
Logger.print_status("Removing all Klipper logs ...")
delete_klipper_logs(im.instances)
def select_instances_to_remove(
instances: List[Klipper],
) -> Union[List[Klipper], None]:
start_index = 1
options = [str(i + start_index) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
instance_map = {options[i]: instances[i] for i in range(len(instances))}
print_instance_overview(instances, show_index=True, show_select_all=True)
options = [str(i) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
print_instance_overview(
instances,
start_index=start_index,
show_index=True,
show_select_all=True,
)
selection = get_selection_input("Select Klipper instance to remove", options)
instances_to_remove = []
@@ -69,7 +76,8 @@ def select_instances_to_remove(
elif selection == "a".lower():
instances_to_remove.extend(instances)
else:
instances_to_remove.append(instance_map[selection])
instance = instances[int(selection)]
instances_to_remove.append(instance)
return instances_to_remove
@@ -88,6 +96,28 @@ def remove_instances(
cmd_sysctl_manage("daemon-reload")
def remove_klipper_dir() -> None:
if not KLIPPER_DIR.exists():
Logger.print_info(f"'{KLIPPER_DIR}' does not exist. Skipped ...")
return
try:
shutil.rmtree(KLIPPER_DIR)
except OSError as e:
Logger.print_error(f"Unable to delete '{KLIPPER_DIR}':\n{e}")
def remove_klipper_env() -> None:
if not KLIPPER_ENV_DIR.exists():
Logger.print_info(f"'{KLIPPER_ENV_DIR}' does not exist. Skipped ...")
return
try:
shutil.rmtree(KLIPPER_ENV_DIR)
except OSError as e:
Logger.print_error(f"Unable to delete '{KLIPPER_ENV_DIR}':\n{e}")
def delete_klipper_logs(instances: List[Klipper]) -> None:
all_logfiles = []
for instance in instances:
@@ -98,4 +128,4 @@ def delete_klipper_logs(instances: List[Klipper]) -> None:
for log in all_logfiles:
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)
remove_file(log)

View File

@@ -220,7 +220,6 @@ def check_user_groups():
"INFO:",
"Relog required for group assignments to take effect!",
],
padding_bottom=0,
)
if not get_confirm(f"Add user '{CURRENT_USER}' to group(s) now?"):

View File

@@ -21,11 +21,11 @@ class KlipperRemoveMenu(BaseMenu):
def __init__(self, previous_menu: Optional[Type[BaseMenu]] = None):
super().__init__()
self.previous_menu = previous_menu
self.footer_type = FooterType.BACK
self.footer_type = FooterType.BACK_HELP
self.remove_klipper_service = False
self.remove_klipper_dir = False
self.remove_klipper_env = False
self.selection_state = False
self.delete_klipper_logs = False
def set_previous_menu(self, previous_menu: Optional[Type[BaseMenu]]) -> None:
from core.menus.remove_menu import RemoveMenu
@@ -36,10 +36,11 @@ class KlipperRemoveMenu(BaseMenu):
def set_options(self) -> None:
self.options = {
"a": Option(method=self.toggle_all, menu=False),
"0": Option(method=self.toggle_all, menu=False),
"1": Option(method=self.toggle_remove_klipper_service, menu=False),
"2": Option(method=self.toggle_remove_klipper_dir, menu=False),
"3": Option(method=self.toggle_remove_klipper_env, menu=False),
"4": Option(method=self.toggle_delete_klipper_logs, menu=False),
"c": Option(method=self.run_removal_process, menu=False),
}
@@ -52,6 +53,7 @@ class KlipperRemoveMenu(BaseMenu):
o1 = checked if self.remove_klipper_service else unchecked
o2 = checked if self.remove_klipper_dir else unchecked
o3 = checked if self.remove_klipper_env else unchecked
o4 = checked if self.delete_klipper_logs else unchecked
menu = textwrap.dedent(
f"""
╔═══════════════════════════════════════════════════════╗
@@ -60,11 +62,12 @@ class KlipperRemoveMenu(BaseMenu):
║ Enter a number and hit enter to select / deselect ║
║ the specific option for removal. ║
╟───────────────────────────────────────────────────────╢
a) {self._get_selection_state_str():37}
0) Select everything
╟───────────────────────────────────────────────────────╢
║ 1) {o1} Remove Service ║
║ 2) {o2} Remove Local Repository ║
║ 3) {o3} Remove Python Environment ║
║ 4) {o4} Delete all Log-Files ║
╟───────────────────────────────────────────────────────╢
║ C) Continue ║
╟───────────────────────────────────────────────────────╢
@@ -73,10 +76,10 @@ class KlipperRemoveMenu(BaseMenu):
print(menu, end="")
def toggle_all(self, **kwargs) -> None:
self.remove_klipper_service = not self.remove_klipper_service
self.remove_klipper_dir = not self.remove_klipper_dir
self.remove_klipper_env = not self.remove_klipper_env
self.selection_state = not self.selection_state
self.remove_klipper_service = True
self.remove_klipper_dir = True
self.remove_klipper_env = True
self.delete_klipper_logs = True
def toggle_remove_klipper_service(self, **kwargs) -> None:
self.remove_klipper_service = not self.remove_klipper_service
@@ -87,11 +90,15 @@ class KlipperRemoveMenu(BaseMenu):
def toggle_remove_klipper_env(self, **kwargs) -> None:
self.remove_klipper_env = not self.remove_klipper_env
def toggle_delete_klipper_logs(self, **kwargs) -> None:
self.delete_klipper_logs = not self.delete_klipper_logs
def run_removal_process(self, **kwargs) -> None:
if (
not self.remove_klipper_service
and not self.remove_klipper_dir
and not self.remove_klipper_env
and not self.delete_klipper_logs
):
error = f"{COLOR_RED}Nothing selected! Select options to remove first.{RESET_FORMAT}"
print(error)
@@ -101,19 +108,10 @@ class KlipperRemoveMenu(BaseMenu):
self.remove_klipper_service,
self.remove_klipper_dir,
self.remove_klipper_env,
self.delete_klipper_logs,
)
self.remove_klipper_service = False
self.remove_klipper_dir = False
self.remove_klipper_env = False
self._go_back()
def _get_selection_state_str(self) -> str:
return (
"Select everything" if not self.selection_state else "Deselect everything"
)
def _go_back(self, **kwargs) -> None:
if self.previous_menu is not None:
self.previous_menu().run()
self.delete_klipper_logs = False

View File

@@ -62,8 +62,6 @@ def install_klipperscreen() -> None:
"KlipperScreens update manager configuration for Moonraker "
"will not be added to any moonraker.conf.",
],
padding_top=0,
padding_bottom=0,
)
if not get_confirm(
"Continue KlipperScreen installation?",

View File

@@ -25,7 +25,7 @@ class MoonrakerRemoveMenu(BaseMenu):
self.remove_moonraker_dir = False
self.remove_moonraker_env = False
self.remove_moonraker_polkit = False
self.selection_state = False
self.delete_moonraker_logs = False
def set_previous_menu(self, previous_menu: Optional[Type[BaseMenu]]) -> None:
from core.menus.remove_menu import RemoveMenu
@@ -36,11 +36,12 @@ class MoonrakerRemoveMenu(BaseMenu):
def set_options(self) -> None:
self.options = {
"a": Option(method=self.toggle_all, menu=False),
"0": Option(method=self.toggle_all, menu=False),
"1": Option(method=self.toggle_remove_moonraker_service, menu=False),
"2": Option(method=self.toggle_remove_moonraker_dir, menu=False),
"3": Option(method=self.toggle_remove_moonraker_env, menu=False),
"4": Option(method=self.toggle_remove_moonraker_polkit, menu=False),
"5": Option(method=self.toggle_delete_moonraker_logs, menu=False),
"c": Option(method=self.run_removal_process, menu=False),
}
@@ -54,6 +55,7 @@ class MoonrakerRemoveMenu(BaseMenu):
o2 = checked if self.remove_moonraker_dir else unchecked
o3 = checked if self.remove_moonraker_env else unchecked
o4 = checked if self.remove_moonraker_polkit else unchecked
o5 = checked if self.delete_moonraker_logs else unchecked
menu = textwrap.dedent(
f"""
╔═══════════════════════════════════════════════════════╗
@@ -62,12 +64,13 @@ class MoonrakerRemoveMenu(BaseMenu):
║ Enter a number and hit enter to select / deselect ║
║ the specific option for removal. ║
╟───────────────────────────────────────────────────────╢
a) {self._get_selection_state_str():37}
0) Select everything
╟───────────────────────────────────────────────────────╢
║ 1) {o1} Remove Service ║
║ 2) {o2} Remove Local Repository ║
║ 3) {o3} Remove Python Environment ║
║ 4) {o4} Remove Policy Kit Rules ║
║ 5) {o5} Delete all Log-Files ║
╟───────────────────────────────────────────────────────╢
║ C) Continue ║
╟───────────────────────────────────────────────────────╢
@@ -76,11 +79,11 @@ class MoonrakerRemoveMenu(BaseMenu):
print(menu, end="")
def toggle_all(self, **kwargs) -> None:
self.remove_moonraker_service = not self.remove_moonraker_service
self.remove_moonraker_dir = not self.remove_moonraker_dir
self.remove_moonraker_env = not self.remove_moonraker_env
self.remove_moonraker_polkit = not self.remove_moonraker_polkit
self.selection_state = not self.selection_state
self.remove_moonraker_service = True
self.remove_moonraker_dir = True
self.remove_moonraker_env = True
self.remove_moonraker_polkit = True
self.delete_moonraker_logs = True
def toggle_remove_moonraker_service(self, **kwargs) -> None:
self.remove_moonraker_service = not self.remove_moonraker_service
@@ -94,12 +97,16 @@ class MoonrakerRemoveMenu(BaseMenu):
def toggle_remove_moonraker_polkit(self, **kwargs) -> None:
self.remove_moonraker_polkit = not self.remove_moonraker_polkit
def toggle_delete_moonraker_logs(self, **kwargs) -> None:
self.delete_moonraker_logs = not self.delete_moonraker_logs
def run_removal_process(self, **kwargs) -> None:
if (
not self.remove_moonraker_service
and not self.remove_moonraker_dir
and not self.remove_moonraker_env
and not self.remove_moonraker_polkit
and not self.delete_moonraker_logs
):
error = f"{COLOR_RED}Nothing selected! Select options to remove first.{RESET_FORMAT}"
print(error)
@@ -110,20 +117,11 @@ class MoonrakerRemoveMenu(BaseMenu):
self.remove_moonraker_dir,
self.remove_moonraker_env,
self.remove_moonraker_polkit,
self.delete_moonraker_logs,
)
self.remove_moonraker_service = False
self.remove_moonraker_dir = False
self.remove_moonraker_env = False
self.remove_moonraker_polkit = False
self._go_back()
def _get_selection_state_str(self) -> str:
return (
"Select everything" if not self.selection_state else "Deselect everything"
)
def _go_back(self, **kwargs) -> None:
if self.previous_menu is not None:
self.previous_menu().run()
self.delete_moonraker_logs = False

View File

@@ -8,8 +8,8 @@
# ======================================================================= #
from __future__ import annotations
import subprocess
from pathlib import Path
from subprocess import DEVNULL, CalledProcessError, run
from typing import List
from components.moonraker import MODULE_PATH, MOONRAKER_DIR, MOONRAKER_ENV_DIR
@@ -49,46 +49,43 @@ class Moonraker(BaseInstance):
def create(self, create_example_cfg: bool = False) -> None:
Logger.print_status("Creating new Moonraker Instance ...")
service_template_path = MODULE_PATH.joinpath("assets/moonraker.service")
env_template_file_path = MODULE_PATH.joinpath("assets/moonraker.env")
service_file_name = self.get_service_file_name(extension=True)
service_file_target = SYSTEMD.joinpath(service_file_name)
env_template_file_path = MODULE_PATH.joinpath("assets/moonraker.env")
env_file_target = self.sysd_dir.joinpath("moonraker.env")
try:
self.create_folders([self.backup_dir, self.certs_dir, self._db_dir])
self._write_service_file(
service_template_path,
service_file_target,
env_file_target,
self.write_service_file(
service_template_path, service_file_target, env_file_target
)
self._write_env_file(env_template_file_path, env_file_target)
self.write_env_file(env_template_file_path, env_file_target)
except CalledProcessError as e:
Logger.print_error(f"Error creating instance: {e}")
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error creating service file {service_file_target}: {e}"
)
raise
except OSError as e:
Logger.print_error(f"Error creating env file: {e}")
Logger.print_error(f"Error writing file: {e}")
raise
def delete(self) -> None:
service_file = self.get_service_file_name(extension=True)
service_file_path = self.get_service_file_path()
Logger.print_status(f"Removing Moonraker Instance: {service_file}")
Logger.print_status(f"Deleting Moonraker Instance: {service_file}")
try:
command = ["sudo", "rm", "-f", service_file_path]
run(command, check=True)
self._delete_logfiles()
Logger.print_ok("Instance successfully removed!")
except CalledProcessError as e:
Logger.print_error(f"Error removing instance: {e}")
subprocess.run(command, check=True)
Logger.print_ok(f"Service file deleted: {service_file_path}")
except subprocess.CalledProcessError as e:
Logger.print_error(f"Error deleting service file: {e}")
raise
def _write_service_file(
def write_service_file(
self,
service_template_path: Path,
service_file_target: Path,
@@ -98,15 +95,15 @@ class Moonraker(BaseInstance):
service_template_path, env_file_target
)
command = ["sudo", "tee", service_file_target]
run(
subprocess.run(
command,
input=service_content.encode(),
stdout=DEVNULL,
stdout=subprocess.DEVNULL,
check=True,
)
Logger.print_ok(f"Service file created: {service_file_target}")
def _write_env_file(
def write_env_file(
self, env_template_file_path: Path, env_file_target: Path
) -> None:
env_file_content = self._prep_env_file(env_template_file_path)
@@ -159,10 +156,3 @@ class Moonraker(BaseInstance):
port = scp.getint("server", "port", fallback=None)
return port
def _delete_logfiles(self) -> None:
from utils.fs_utils import run_remove_routines
for log in list(self.log_dir.glob("moonraker.log*")):
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)

View File

@@ -7,14 +7,15 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from subprocess import DEVNULL, PIPE, CalledProcessError, run
import shutil
import subprocess
from typing import List, Union
from components.klipper.klipper_dialogs import print_instance_overview
from components.moonraker import MOONRAKER_DIR, MOONRAKER_ENV_DIR
from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager
from utils.fs_utils import run_remove_routines
from utils.fs_utils import remove_file
from utils.input_utils import get_selection_input
from utils.logger import Logger
from utils.sys_utils import cmd_sysctl_manage
@@ -25,6 +26,7 @@ def run_moonraker_removal(
remove_dir: bool,
remove_env: bool,
remove_polkit: bool,
delete_logs: bool,
) -> None:
im = InstanceManager(Moonraker)
@@ -37,38 +39,41 @@ def run_moonraker_removal(
Logger.print_info("No Moonraker Services installed! Skipped ...")
if (remove_polkit or remove_dir or remove_env) and im.instances:
Logger.print_info("There are still other Moonraker services installed")
Logger.print_info(
"● Moonraker PolicyKit rules were not removed.", prefix=False
Logger.print_warn("There are still other Moonraker services installed!")
Logger.print_warn("Therefor the following parts cannot be removed:")
Logger.print_warn(
"""
● Moonraker PolicyKit rules
● Moonraker local repository
● Moonraker Python environment
""",
False,
)
Logger.print_info(f"'{MOONRAKER_DIR}' was not removed.", prefix=False)
Logger.print_info(f"'{MOONRAKER_ENV_DIR}' was not removed.", prefix=False)
else:
if remove_polkit:
Logger.print_status("Removing all Moonraker policykit rules ...")
remove_polkit_rules()
if remove_dir:
Logger.print_status("Removing Moonraker local repository ...")
run_remove_routines(MOONRAKER_DIR)
remove_moonraker_dir()
if remove_env:
Logger.print_status("Removing Moonraker Python environment ...")
run_remove_routines(MOONRAKER_ENV_DIR)
remove_moonraker_env()
# delete moonraker logs of all instances
if delete_logs:
Logger.print_status("Removing all Moonraker logs ...")
delete_moonraker_logs(im.instances)
def select_instances_to_remove(
instances: List[Moonraker],
) -> Union[List[Moonraker], None]:
start_index = 1
options = [str(i + start_index) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
instance_map = {options[i]: instances[i] for i in range(len(instances))}
print_instance_overview(instances, show_index=True, show_select_all=True)
options = [str(i) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
print_instance_overview(
instances,
start_index=start_index,
show_index=True,
show_select_all=True,
)
selection = get_selection_input("Select Moonraker instance to remove", options)
instances_to_remove = []
@@ -77,7 +82,8 @@ def select_instances_to_remove(
elif selection == "a".lower():
instances_to_remove.extend(instances)
else:
instances_to_remove.append(instance_map[selection])
instance = instances[int(selection)]
instances_to_remove.append(instance)
return instances_to_remove
@@ -96,6 +102,28 @@ def remove_instances(
cmd_sysctl_manage("daemon-reload")
def remove_moonraker_dir() -> None:
if not MOONRAKER_DIR.exists():
Logger.print_info(f"'{MOONRAKER_DIR}' does not exist. Skipped ...")
return
try:
shutil.rmtree(MOONRAKER_DIR)
except OSError as e:
Logger.print_error(f"Unable to delete '{MOONRAKER_DIR}':\n{e}")
def remove_moonraker_env() -> None:
if not MOONRAKER_ENV_DIR.exists():
Logger.print_info(f"'{MOONRAKER_ENV_DIR}' does not exist. Skipped ...")
return
try:
shutil.rmtree(MOONRAKER_ENV_DIR)
except OSError as e:
Logger.print_error(f"Unable to delete '{MOONRAKER_ENV_DIR}':\n{e}")
def remove_polkit_rules() -> None:
if not MOONRAKER_DIR.exists():
log = "Cannot remove policykit rules. Moonraker directory not found."
@@ -103,9 +131,17 @@ def remove_polkit_rules() -> None:
return
try:
cmd = [f"{MOONRAKER_DIR}/scripts/set-policykit-rules.sh", "--clear"]
run(cmd, stderr=PIPE, stdout=DEVNULL, check=True)
except CalledProcessError as e:
command = [
f"{MOONRAKER_DIR}/scripts/set-policykit-rules.sh",
"--clear",
]
subprocess.run(
command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
check=True,
)
except subprocess.CalledProcessError as e:
Logger.print_error(f"Error while removing policykit rules: {e}")
Logger.print_ok("Policykit rules successfully removed!")
@@ -121,4 +157,4 @@ def delete_moonraker_logs(instances: List[Moonraker]) -> None:
for log in all_logfiles:
Logger.print_status(f"Remove '{log}'")
run_remove_routines(log)
remove_file(log)

View File

@@ -8,6 +8,8 @@
# ======================================================================= #
import shutil
import subprocess
from typing import List
from components.klipper.klipper import Klipper
@@ -15,7 +17,7 @@ from components.moonraker.moonraker import Moonraker
from components.webui_client.base_data import BaseWebClientConfig
from core.instance_manager.instance_manager import InstanceManager
from utils.config_utils import remove_config_section
from utils.fs_utils import run_remove_routines
from utils.fs_utils import remove_file
from utils.logger import Logger
@@ -31,12 +33,29 @@ def run_client_config_removal(
def remove_client_config_dir(client_config: BaseWebClientConfig) -> None:
Logger.print_status(f"Removing {client_config.display_name} ...")
run_remove_routines(client_config.config_dir)
Logger.print_status(f"Removing {client_config.name} ...")
client_config_dir = client_config.config_dir
if not client_config_dir.exists():
Logger.print_info(f"'{client_config_dir}' does not exist. Skipping ...")
return
try:
shutil.rmtree(client_config_dir)
except OSError as e:
Logger.print_error(f"Unable to delete '{client_config_dir}':\n{e}")
def remove_client_config_symlink(client_config: BaseWebClientConfig) -> None:
im = InstanceManager(Klipper)
instances: List[Klipper] = im.instances
for instance in instances:
run_remove_routines(instance.cfg_dir.joinpath(client_config.config_filename))
Logger.print_status(f"Removing symlink from '{instance.cfg_dir}' ...")
symlink = instance.cfg_dir.joinpath(client_config.config_filename)
if not symlink.is_symlink():
Logger.print_info(f"'{symlink}' does not exist. Skipping ...")
continue
try:
remove_file(symlink)
except subprocess.CalledProcessError:
Logger.print_error("Failed to remove symlink!")

View File

@@ -8,6 +8,7 @@
# ======================================================================= #
import shutil
from typing import List
from components.klipper.klipper import Klipper
@@ -25,15 +26,14 @@ from utils.config_utils import remove_config_section
from utils.fs_utils import (
remove_nginx_config,
remove_nginx_logs,
run_remove_routines,
)
from utils.logger import Logger
def run_client_removal(
client: BaseWebClient,
remove_client: bool,
remove_client_cfg: bool,
rm_client: bool,
rm_client_config: bool,
backup_ms_config_json: bool,
) -> None:
mr_im = InstanceManager(Moonraker)
@@ -44,7 +44,7 @@ def run_client_removal(
if backup_ms_config_json and client.client == WebClientType.MAINSAIL:
backup_mainsail_config_json()
if remove_client:
if rm_client:
client_name = client.name
remove_client_dir(client)
remove_nginx_config(client_name)
@@ -53,7 +53,7 @@ def run_client_removal(
section = f"update_manager {client_name}"
remove_config_section(section, mr_instances)
if remove_client_cfg:
if rm_client_config:
run_client_config_removal(
client.client_config,
kl_instances,
@@ -63,4 +63,12 @@ def run_client_removal(
def remove_client_dir(client: BaseWebClient) -> None:
Logger.print_status(f"Removing {client.display_name} ...")
run_remove_routines(client.client_dir)
client_dir = client.client_dir
if not client.client_dir.exists():
Logger.print_info(f"'{client_dir}' does not exist. Skipping ...")
return
try:
shutil.rmtree(client_dir)
except OSError as e:
Logger.print_error(f"Unable to delete '{client_dir}':\n{e}")

View File

@@ -24,6 +24,7 @@ from components.webui_client.client_config.client_config_setup import (
from components.webui_client.client_dialogs import (
print_client_port_select_dialog,
print_install_client_config_dialog,
print_ipv6_warning_dialog,
print_moonraker_not_found_dialog,
)
from components.webui_client.client_utils import (
@@ -49,6 +50,7 @@ from utils.fs_utils import (
from utils.input_utils import get_confirm, get_number_input
from utils.logger import Logger
from utils.sys_utils import (
check_ipv6,
cmd_sysctl_service,
download_file,
get_ipv4_addr,
@@ -112,6 +114,13 @@ def install_client(client: BaseWebClient) -> None:
)
valid_port = is_valid_port(port, ports_in_use)
# check if ipv6 is enabled, as this may cause issues with nginx
if check_ipv6():
print_ipv6_warning_dialog()
if not get_confirm(f"Continue with {client.display_name} installation?"):
Logger.print_info(f"Exiting {client.display_name} installation ...")
return
check_install_dependencies(["nginx", "unzip"])
try:

View File

@@ -6,7 +6,6 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import json # noqa: I001
import shutil
@@ -30,7 +29,7 @@ from utils.git_utils import (
get_latest_unstable_tag,
)
from utils.logger import Logger
from utils.types import ComponentStatus
from utils.types import ComponentStatus, InstallStatus
def get_client_status(
@@ -41,16 +40,16 @@ def get_client_status(
NGINX_CONFD.joinpath("upstreams.conf"),
NGINX_CONFD.joinpath("common_vars.conf"),
]
comp_status: ComponentStatus = get_install_status(client.client_dir, files=files)
status = get_install_status(client.client_dir, files=files)
# if the client dir does not exist, set the status to not
# installed even if the other files are present
if not client.client_dir.exists():
comp_status.status = 0
status["status"] = InstallStatus.NOT_INSTALLED
comp_status.local = get_local_client_version(client)
comp_status.remote = get_remote_client_version(client) if fetch_remote else None
return comp_status
status["local"] = get_local_client_version(client)
status["remote"] = get_remote_client_version(client) if fetch_remote else None
return status
def get_client_config_status(client: BaseWebClient) -> ComponentStatus:
@@ -75,6 +74,7 @@ def get_current_client_config(clients: List[BaseWebClient]) -> str:
def backup_mainsail_config_json(is_temp=False) -> None:
c_json = MainsailData().client_dir.joinpath("config.json")
Logger.print_status(f"Backup '{c_json}' ...")
bm = BackupManager()
if is_temp:
fn = Path.home().joinpath("config.json.kiauh.bak")
@@ -126,12 +126,12 @@ def symlink_webui_nginx_log(klipper_instances: List[Klipper]) -> None:
desti_error.symlink_to(error_log)
def get_local_client_version(client: BaseWebClient) -> str | None:
def get_local_client_version(client: BaseWebClient) -> str:
relinfo_file = client.client_dir.joinpath("release_info.json")
version_file = client.client_dir.joinpath(".version")
if not client.client_dir.exists():
return None
return "-"
if not relinfo_file.is_file() and not version_file.is_file():
return "n/a"
@@ -143,13 +143,13 @@ def get_local_client_version(client: BaseWebClient) -> str | None:
return f.readlines()[0]
def get_remote_client_version(client: BaseWebClient) -> str | None:
def get_remote_client_version(client: BaseWebClient) -> str:
try:
if (tag := get_latest_tag(client.repo_path)) != "":
return tag
return None
return "ERROR"
except Exception:
return None
return "ERROR"
def backup_client_data(client: BaseWebClient) -> None:

View File

@@ -25,10 +25,9 @@ class ClientRemoveMenu(BaseMenu):
super().__init__()
self.previous_menu = previous_menu
self.client = client
self.remove_client = False
self.remove_client_cfg = False
self.rm_client = False
self.rm_client_config = False
self.backup_mainsail_config_json = False
self.selection_state = False
def set_previous_menu(self, previous_menu: Optional[Type[BaseMenu]]) -> None:
from core.menus.remove_menu import RemoveMenu
@@ -39,7 +38,7 @@ class ClientRemoveMenu(BaseMenu):
def set_options(self) -> None:
self.options = {
"a": Option(method=self.toggle_all, menu=False),
"0": Option(method=self.toggle_all, menu=False),
"1": Option(method=self.toggle_rm_client, menu=False),
"2": Option(method=self.toggle_rm_client_config, menu=False),
"c": Option(method=self.run_removal_process, menu=False),
@@ -57,8 +56,8 @@ class ClientRemoveMenu(BaseMenu):
count = 62 - len(color) - len(RESET_FORMAT)
checked = f"[{COLOR_CYAN}x{RESET_FORMAT}]"
unchecked = "[ ]"
o1 = checked if self.remove_client else unchecked
o2 = checked if self.remove_client_cfg else unchecked
o1 = checked if self.rm_client else unchecked
o2 = checked if self.rm_client_config else unchecked
menu = textwrap.dedent(
f"""
╔═══════════════════════════════════════════════════════╗
@@ -67,7 +66,7 @@ class ClientRemoveMenu(BaseMenu):
║ Enter a number and hit enter to select / deselect ║
║ the specific option for removal. ║
╟───────────────────────────────────────────────────────╢
a) {self._get_selection_state_str():37}
0) Select everything
╟───────────────────────────────────────────────────────╢
║ 1) {o1} Remove {client_name:16}
║ 2) {o2} Remove {client_config_name:24}
@@ -92,24 +91,23 @@ class ClientRemoveMenu(BaseMenu):
print(menu, end="")
def toggle_all(self, **kwargs) -> None:
self.remove_client = not self.remove_client
self.remove_client_cfg = not self.remove_client_cfg
self.backup_mainsail_config_json = not self.backup_mainsail_config_json
self.selection_state = not self.selection_state
self.rm_client = True
self.rm_client_config = True
self.backup_mainsail_config_json = True
def toggle_rm_client(self, **kwargs) -> None:
self.remove_client = not self.remove_client
self.rm_client = not self.rm_client
def toggle_rm_client_config(self, **kwargs) -> None:
self.remove_client_cfg = not self.remove_client_cfg
self.rm_client_config = not self.rm_client_config
def toggle_backup_mainsail_config_json(self, **kwargs) -> None:
self.backup_mainsail_config_json = not self.backup_mainsail_config_json
def run_removal_process(self, **kwargs) -> None:
if (
not self.remove_client
and not self.remove_client_cfg
not self.rm_client
and not self.rm_client_config
and not self.backup_mainsail_config_json
):
error = f"{COLOR_RED}Nothing selected ...{RESET_FORMAT}"
@@ -118,22 +116,11 @@ class ClientRemoveMenu(BaseMenu):
client_remove.run_client_removal(
client=self.client,
remove_client=self.remove_client,
remove_client_cfg=self.remove_client_cfg,
rm_client=self.rm_client,
rm_client_config=self.rm_client_config,
backup_ms_config_json=self.backup_mainsail_config_json,
)
self.remove_client = False
self.remove_client_cfg = False
self.rm_client = False
self.rm_client_config = False
self.backup_mainsail_config_json = False
self._go_back()
def _get_selection_state_str(self) -> str:
return (
"Select everything" if not self.selection_state else "Deselect everything"
)
def _go_back(self, **kwargs) -> None:
if self.previous_menu is not None:
self.previous_menu().run()

View File

@@ -41,7 +41,7 @@ from utils.constants import (
RESET_FORMAT,
)
from utils.logger import Logger
from utils.types import ComponentStatus, StatusMap, StatusText
from utils.types import ComponentStatus
# noinspection PyUnusedLocal
@@ -96,10 +96,10 @@ class MainMenu(BaseMenu):
def _get_component_status(self, name: str, status_fn: callable, *args) -> None:
status_data: ComponentStatus = status_fn(*args)
code: int = status_data.status
status: StatusText = StatusMap[code]
repo: str = status_data.repo
instance_count: int = status_data.instances
code: int = status_data.get("status").value.code
status: str = status_data.get("status").value.txt
repo: str = status_data.get("repo")
instance_count: int = status_data.get("instances")
count_txt: str = ""
if instance_count > 0 and code == 1:
@@ -109,15 +109,12 @@ class MainMenu(BaseMenu):
setattr(self, f"{name}_repo", f"{COLOR_CYAN}{repo}{RESET_FORMAT}")
def _format_by_code(self, code: int, status: str, count: str) -> str:
color = COLOR_RED
if code == 0:
color = COLOR_RED
elif code == 1:
color = COLOR_YELLOW
if code == 1:
return f"{COLOR_GREEN}{status}{count}{RESET_FORMAT}"
elif code == 2:
color = COLOR_GREEN
return f"{COLOR_RED}{status}{count}{RESET_FORMAT}"
return f"{color}{status}{count}{RESET_FORMAT}"
return f"{COLOR_YELLOW}{status}{count}{RESET_FORMAT}"
def print_menu(self):
self.fetch_status()

View File

@@ -6,6 +6,7 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import textwrap
from typing import Optional, Type
@@ -46,8 +47,6 @@ from utils.constants import (
COLOR_YELLOW,
RESET_FORMAT,
)
from utils.logger import Logger
from utils.spinner import Spinner
from utils.types import ComponentStatus
@@ -58,31 +57,15 @@ class UpdateMenu(BaseMenu):
super().__init__()
self.previous_menu = previous_menu
self.klipper_local = self.klipper_remote = ""
self.moonraker_local = self.moonraker_remote = ""
self.mainsail_local = self.mainsail_remote = ""
self.mainsail_config_local = self.mainsail_config_remote = ""
self.fluidd_local = self.fluidd_remote = ""
self.fluidd_config_local = self.fluidd_config_remote = ""
self.klipperscreen_local = self.klipperscreen_remote = ""
self.mobileraker_local = self.mobileraker_remote = ""
self.crowsnest_local = self.crowsnest_remote = ""
self.octoeverywhere_local = self.octoeverywhere_remote = ""
self.kl_local = self.kl_remote = self.mr_local = self.mr_remote = ""
self.ms_local = self.ms_remote = self.fl_local = self.fl_remote = ""
self.mc_local = self.mc_remote = self.fc_local = self.fc_remote = ""
self.ks_local = self.ks_remote = self.mb_local = self.mb_remote = ""
self.cn_local = self.cn_remote = self.oe_local = self.oe_remote = ""
self.mainsail_data = MainsailData()
self.fluidd_data = FluiddData()
self.status_data = {
"klipper": {"installed": False, "local": None, "remote": None},
"moonraker": {"installed": False, "local": None, "remote": None},
"mainsail": {"installed": False, "local": None, "remote": None},
"mainsail_config": {"installed": False, "local": None, "remote": None},
"fluidd": {"installed": False, "local": None, "remote": None},
"fluidd_config": {"installed": False, "local": None, "remote": None},
"mobileraker": {"installed": False, "local": None, "remote": None},
"klipperscreen": {"installed": False, "local": None, "remote": None},
"crowsnest": {"installed": False, "local": None, "remote": None},
"octoeverywhere": {"installed": False, "local": None, "remote": None},
}
self._fetch_update_status()
def set_previous_menu(self, previous_menu: Optional[Type[BaseMenu]]) -> None:
from core.menus.main_menu import MainMenu
@@ -93,7 +76,7 @@ class UpdateMenu(BaseMenu):
def set_options(self) -> None:
self.options = {
"a": Option(self.update_all, menu=False),
"0": Option(self.update_all, menu=False),
"1": Option(self.update_klipper, menu=False),
"2": Option(self.update_moonraker, menu=False),
"3": Option(self.update_mainsail, menu=False),
@@ -108,13 +91,8 @@ class UpdateMenu(BaseMenu):
}
def print_menu(self):
spinner = Spinner()
spinner.start()
self._fetch_update_status()
spinner.stop()
header = " [ Update Menu ] "
color = COLOR_GREEN
count = 62 - len(color) - len(RESET_FORMAT)
@@ -123,25 +101,25 @@ class UpdateMenu(BaseMenu):
╔═══════════════════════════════════════════════════════╗
{color}{header:~^{count}}{RESET_FORMAT}
╟───────────────────────┬───────────────┬───────────────╢
a) Update all │ │ ║
0) Update all │ │ ║
║ │ Current: │ Latest: ║
║ Klipper & API: ├───────────────┼───────────────╢
║ 1) Klipper │ {self.klipper_local:<22}{self.klipper_remote:<22}
║ 2) Moonraker │ {self.moonraker_local:<22}{self.moonraker_remote:<22}
║ 1) Klipper │ {self.kl_local:<22}{self.kl_remote:<22}
║ 2) Moonraker │ {self.mr_local:<22}{self.mr_remote:<22}
║ │ │ ║
║ Webinterface: ├───────────────┼───────────────╢
║ 3) Mainsail │ {self.mainsail_local:<22}{self.mainsail_remote:<22}
║ 4) Fluidd │ {self.fluidd_local:<22}{self.fluidd_remote:<22}
║ 3) Mainsail │ {self.ms_local:<22}{self.ms_remote:<22}
║ 4) Fluidd │ {self.fl_local:<22}{self.fl_remote:<22}
║ │ │ ║
║ Client-Config: ├───────────────┼───────────────╢
║ 5) Mainsail-Config │ {self.mainsail_config_local:<22}{self.mainsail_config_remote:<22}
║ 6) Fluidd-Config │ {self.fluidd_config_local:<22}{self.fluidd_config_remote:<22}
║ 5) Mainsail-Config │ {self.mc_local:<22}{self.mc_remote:<22}
║ 6) Fluidd-Config │ {self.fc_local:<22}{self.fc_remote:<22}
║ │ │ ║
║ Other: ├───────────────┼───────────────╢
║ 7) KlipperScreen │ {self.klipperscreen_local:<22}{self.klipperscreen_remote:<22}
║ 8) Mobileraker │ {self.mobileraker_local:<22}{self.mobileraker_remote:<22}
║ 9) Crowsnest │ {self.crowsnest_local:<22}{self.crowsnest_remote:<22}
║ 10) OctoEverywhere │ {self.octoeverywhere_local:<22}{self.octoeverywhere_remote:<22}
║ 7) KlipperScreen │ {self.ks_local:<22}{self.ks_remote:<22}
║ 8) Mobileraker │ {self.mb_local:<22}{self.mb_remote:<22}
║ 9) Crowsnest │ {self.cn_local:<22}{self.cn_remote:<22}
║ 10) OctoEverywhere │ {self.oe_local:<22}{self.oe_remote:<22}
║ ├───────────────┴───────────────╢
║ 11) System │ ║
╟───────────────────────┴───────────────────────────────╢
@@ -153,96 +131,68 @@ class UpdateMenu(BaseMenu):
print("update_all")
def update_klipper(self, **kwargs):
if self._check_is_installed("klipper"):
update_klipper()
update_klipper()
def update_moonraker(self, **kwargs):
if self._check_is_installed("moonraker"):
update_moonraker()
update_moonraker()
def update_mainsail(self, **kwargs):
if self._check_is_installed("mainsail"):
update_client(self.mainsail_data)
update_client(self.mainsail_data)
def update_mainsail_config(self, **kwargs):
if self._check_is_installed("mainsail_config"):
update_client_config(self.mainsail_data)
update_client_config(self.mainsail_data)
def update_fluidd(self, **kwargs):
if self._check_is_installed("fluidd"):
update_client(self.fluidd_data)
update_client(self.fluidd_data)
def update_fluidd_config(self, **kwargs):
if self._check_is_installed("fluidd_config"):
update_client_config(self.fluidd_data)
update_client_config(self.fluidd_data)
def update_klipperscreen(self, **kwargs):
if self._check_is_installed("klipperscreen"):
update_klipperscreen()
update_klipperscreen()
def update_mobileraker(self, **kwargs):
if self._check_is_installed("mobileraker"):
update_mobileraker()
update_mobileraker()
def update_crowsnest(self, **kwargs):
if self._check_is_installed("crowsnest"):
update_crowsnest()
update_crowsnest()
def update_octoeverywhere(self, **kwargs):
if self._check_is_installed("octoeverywhere"):
update_octoeverywhere()
update_octoeverywhere()
def upgrade_system_packages(self, **kwargs): ...
def _fetch_update_status(self):
self._set_status_data("klipper", get_klipper_status)
self._set_status_data("moonraker", get_moonraker_status)
self._set_status_data("mainsail", get_client_status, self.mainsail_data, True)
self._set_status_data(
"mainsail_config", get_client_config_status, self.mainsail_data
)
self._set_status_data("fluidd", get_client_status, self.fluidd_data, True)
self._set_status_data(
"fluidd_config", get_client_config_status, self.fluidd_data
)
self._set_status_data("klipperscreen", get_klipperscreen_status)
self._set_status_data("mobileraker", get_mobileraker_status)
self._set_status_data("crowsnest", get_crowsnest_status)
self._set_status_data("octoeverywhere", get_octoeverywhere_status)
# klipper
self._get_update_status("kl", get_klipper_status)
# moonraker
self._get_update_status("mr", get_moonraker_status)
# mainsail
self._get_update_status("ms", get_client_status, self.mainsail_data, True)
# mainsail-config
self._get_update_status("mc", get_client_config_status, self.mainsail_data)
# fluidd
self._get_update_status("fl", get_client_status, self.fluidd_data, True)
# fluidd-config
self._get_update_status("fc", get_client_config_status, self.fluidd_data)
# klipperscreen
self._get_update_status("ks", get_klipperscreen_status)
# mobileraker
self._get_update_status("mb", get_mobileraker_status)
# crowsnest
self._get_update_status("cn", get_crowsnest_status)
# octoeverywhere
self._get_update_status("oe", get_octoeverywhere_status)
def _format_local_status(self, local_version, remote_version) -> str:
color = COLOR_RED
if not local_version:
color = COLOR_RED
elif local_version == remote_version:
color = COLOR_GREEN
elif local_version != remote_version:
color = COLOR_YELLOW
if local_version == remote_version:
return f"{COLOR_GREEN}{local_version}{RESET_FORMAT}"
return f"{COLOR_YELLOW}{local_version}{RESET_FORMAT}"
return f"{color}{local_version or '-'}{RESET_FORMAT}"
def _set_status_data(self, name: str, status_fn: callable, *args) -> None:
comp_status: ComponentStatus = status_fn(*args)
self.status_data[name]["installed"] = True if comp_status.status == 2 else False
self.status_data[name]["local"] = comp_status.local
self.status_data[name]["remote"] = comp_status.remote
self._set_status_string(name)
def _set_status_string(self, name: str) -> None:
local_status = self.status_data[name].get("local", None)
remote_status = self.status_data[name].get("remote", None)
color = COLOR_GREEN if remote_status else COLOR_RED
local_txt = self._format_local_status(local_status, remote_status)
remote_txt = f"{color}{remote_status or '-'}{RESET_FORMAT}"
setattr(self, f"{name}_local", local_txt)
setattr(self, f"{name}_remote", remote_txt)
def _check_is_installed(self, name: str) -> bool:
if not self.status_data[name]["installed"]:
Logger.print_info(f"{name.capitalize()} is not installed! Skipped ...")
return False
return True
def _get_update_status(self, name: str, status_fn: callable, *args) -> None:
status_data: ComponentStatus = status_fn(*args)
local_ver = status_data.get("local")
remote_ver = status_data.get("remote")
color = COLOR_GREEN if remote_ver != "ERROR" else COLOR_RED
setattr(self, f"{name}_local", self._format_local_status(local_ver, remote_ver))
setattr(self, f"{name}_remote", f"{color}{remote_ver}{RESET_FORMAT}")

View File

@@ -26,7 +26,7 @@ from utils.sys_utils import (
install_system_packages,
update_system_package_lists,
)
from utils.types import ComponentStatus, StatusCode
from utils.types import ComponentStatus, InstallStatus
def convert_camelcase_to_kebabcase(name: str) -> str:
@@ -92,11 +92,13 @@ def get_install_status(
checks.append(f.exists())
if all(checks):
status: StatusCode = 2 # installed
status = InstallStatus.INSTALLED
elif not any(checks):
status: StatusCode = 0 # not installed
status = InstallStatus.NOT_INSTALLED
else:
status: StatusCode = 1 # incomplete
status = InstallStatus.INCOMPLETE
return ComponentStatus(
status=status,

View File

@@ -237,20 +237,27 @@ def get_next_free_port(ports_in_use: List[int]) -> int:
def remove_nginx_config(name: str) -> None:
Logger.print_status(f"Removing NGINX config for {name.capitalize()} ...")
try:
remove_file(NGINX_SITES_AVAILABLE.joinpath(name), True)
remove_file(NGINX_SITES_ENABLED.joinpath(name), True)
run_remove_routines(NGINX_SITES_AVAILABLE.joinpath(name))
run_remove_routines(NGINX_SITES_ENABLED.joinpath(name))
except CalledProcessError as e:
log = f"Unable to remove NGINX config '{name}':\n{e.stderr.decode()}"
Logger.print_error(log)
def remove_nginx_logs(name: str, instances: List[Klipper]) -> None:
Logger.print_status(f"Removing NGINX logs for {name.capitalize()} ...")
try:
remove_file(Path(f"/var/log/nginx/{name}-access.log"), True)
remove_file(Path(f"/var/log/nginx/{name}-error.log"), True)
run_remove_routines(Path(f"/var/log/nginx/{name}-access.log"))
run_remove_routines(Path(f"/var/log/nginx/{name}-error.log"))
if not instances:
return
if not instances:
return
for instance in instances:
remove_file(instance.log_dir.joinpath(f"{name}-access.log"))
remove_file(instance.log_dir.joinpath(f"{name}-error.log"))
for instance in instances:
run_remove_routines(instance.log_dir.joinpath(f"{name}-access.log"))
run_remove_routines(instance.log_dir.joinpath(f"{name}-error.log"))
except (OSError, CalledProcessError) as e:
Logger.print_error(f"Unable to remove NGINX logs:\n{e}")

View File

@@ -1,5 +1,3 @@
from __future__ import annotations
import json
import shutil
import urllib.request
@@ -65,11 +63,11 @@ def git_pull_wrapper(repo: str, target_dir: Path) -> None:
return
def get_repo_name(repo: Path) -> str | None:
def get_repo_name(repo: Path) -> str:
"""
Helper method to extract the organisation and name of a repository |
:param repo: repository to extract the values from
:return: String in form of "<orga>/<name>" or None
:return: String in form of "<orga>/<name>"
"""
if not repo.exists() or not repo.joinpath(".git").exists():
return "-"
@@ -79,7 +77,7 @@ def get_repo_name(repo: Path) -> str | None:
result = check_output(cmd, stderr=DEVNULL)
return "/".join(result.decode().strip().split("/")[-2:])
except CalledProcessError:
return None
return "-"
def get_tags(repo_path: str) -> List[str]:
@@ -112,6 +110,7 @@ def get_latest_tag(repo_path: str) -> str:
else:
return ""
except Exception:
Logger.print_error("Error while getting the latest tag")
raise
@@ -131,20 +130,20 @@ def get_latest_unstable_tag(repo_path: str) -> str:
raise
def get_local_commit(repo: Path) -> str | None:
def get_local_commit(repo: Path) -> str:
if not repo.exists() or not repo.joinpath(".git").exists():
return None
return "-"
try:
cmd = f"cd {repo} && git describe HEAD --always --tags | cut -d '-' -f 1,2"
return check_output(cmd, shell=True, text=True).strip()
except CalledProcessError:
return None
return "-"
def get_remote_commit(repo: Path) -> str | None:
def get_remote_commit(repo: Path) -> str:
if not repo.exists() or not repo.joinpath(".git").exists():
return None
return "-"
try:
# get locally checked out branch
@@ -154,7 +153,7 @@ def get_remote_commit(repo: Path) -> str | None:
cmd = f"cd {repo} && git describe 'origin/{branch}' --always --tags | cut -d '-' -f 1,2"
return check_output(cmd, shell=True, text=True).strip()
except CalledProcessError:
return None
return "-"
def git_cmd_clone(repo: str, target_dir: Path) -> None:

View File

@@ -1,33 +0,0 @@
import sys
import threading
import time
class Spinner:
def __init__(self, message="Loading", delay=0.2):
self.message = f"{message} ..."
self.delay = delay
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._animate)
def _animate(self):
animation = ["", "", "", ""]
while not self._stop_event.is_set():
for char in animation:
sys.stdout.write(f"\r{char} {self.message}")
sys.stdout.flush()
time.sleep(self.delay)
if self._stop_event.is_set():
break
sys.stdout.write("\r" + " " * (len(self.message) + 1) + "\r")
sys.stdout.flush()
def start(self):
self._stop_event.clear()
if not self._thread.is_alive():
self._thread = threading.Thread(target=self._animate)
self._thread.start()
def stop(self):
self._stop_event.set()
self._thread.join()

View File

@@ -402,3 +402,34 @@ def log_process(process: Popen) -> None:
if process.poll() is not None:
break
def check_ipv6() -> bool:
"""
Check if IPv6 is enabled
:return: True if IPv6 is enabled, False otherwise
"""
try:
file1 = Path("/proc/sys/net/ipv6/conf/all/disable_ipv6")
if file1.exists():
with open(file1, "r") as file:
if file.read().strip() == "0":
return True
elif file.read().strip() == "1":
return False
file3 = Path("/etc/sysctl.conf")
if file3.exists():
with open(file3, "r") as file:
for line in file.readlines():
if (
"net.ipv6.conf.all.disable_ipv6" in line
and not line.startswith("#")
and line.split("=")[1].strip() == "0"
):
return True
return False
except Exception as e:
Logger.print_error(f"Error checking IPv6: {e}")
return True

View File

@@ -6,24 +6,25 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Literal
StatusText = Literal["Installed", "Not installed", "Incomplete"]
StatusCode = Literal[0, 1, 2]
StatusMap: Dict[StatusCode, StatusText] = {
0: "Not installed",
1: "Incomplete",
2: "Installed",
}
from enum import Enum
from typing import Optional, TypedDict
@dataclass
class ComponentStatus:
status: StatusCode
repo: str | None = None
local: str | None = None
remote: str | None = None
instances: int | None = None
class StatusInfo:
def __init__(self, txt: str, code: int):
self.txt: str = txt
self.code: int = code
class InstallStatus(Enum):
INSTALLED = StatusInfo("Installed", 1)
NOT_INSTALLED = StatusInfo("Not installed", 2)
INCOMPLETE = StatusInfo("Incomplete", 3)
class ComponentStatus(TypedDict):
status: InstallStatus
repo: Optional[str]
local: Optional[str]
remote: Optional[str]
instances: Optional[int]