feat(style): use black code style / formatter

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
dw-0
2023-10-29 00:31:34 +02:00
parent 84a530be7d
commit 358c666da9
17 changed files with 261 additions and 170 deletions

View File

@@ -19,8 +19,13 @@ class BaseInstance(ABC):
def blacklist(cls) -> List[str]:
return []
def __init__(self, prefix: Optional[str], name: Optional[str],
user: Optional[str], data_dir_name: Optional[str]):
def __init__(
self,
prefix: Optional[str],
name: Optional[str],
user: Optional[str],
data_dir_name: Optional[str],
):
self._prefix = prefix
self._name = name
self._user = user
@@ -82,4 +87,5 @@ class BaseInstance(ABC):
@abstractmethod
def get_service_file_name(self) -> str:
raise NotImplementedError(
"Subclasses must implement the get_service_file_name method")
"Subclasses must implement the get_service_file_name method"
)

View File

@@ -22,11 +22,16 @@ from kiauh.utils.logger import Logger
# noinspection PyMethodMayBeStatic
class InstanceManager:
def __init__(self, instance_type: Type[BaseInstance],
current_instance: Optional[BaseInstance] = None) -> None:
def __init__(
self,
instance_type: Type[BaseInstance],
current_instance: Optional[BaseInstance] = None,
) -> None:
self.instance_type = instance_type
self.current_instance = current_instance
self.instance_name = current_instance.name if current_instance is not None else None
self.instance_name = (
current_instance.name if current_instance is not None else None
)
self.instances = []
def get_current_instance(self) -> BaseInstance:
@@ -34,7 +39,9 @@ class InstanceManager:
def set_current_instance(self, instance: BaseInstance) -> None:
self.current_instance = instance
self.instance_name = f"{instance.prefix}-{instance.name}" if instance.name else instance.prefix
self.instance_name = (
f"{instance.prefix}-{instance.name}" if instance.name else instance.prefix
)
def create_instance(self) -> None:
if self.current_instance is not None:
@@ -59,49 +66,41 @@ class InstanceManager:
def enable_instance(self) -> None:
Logger.print_info(f"Enabling {self.instance_name}.service ...")
try:
command = ["sudo", "systemctl", "enable",
f"{self.instance_name}.service"]
command = ["sudo", "systemctl", "enable", f"{self.instance_name}.service"]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_name}.service enabled.")
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error enabling service {self.instance_name}.service:")
Logger.print_error(f"Error enabling service {self.instance_name}.service:")
Logger.print_error(f"{e}")
def disable_instance(self) -> None:
Logger.print_info(f"Disabling {self.instance_name}.service ...")
try:
command = ["sudo", "systemctl", "disable",
f"{self.instance_name}.service"]
command = ["sudo", "systemctl", "disable", f"{self.instance_name}.service"]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_name}.service disabled.")
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error disabling service {self.instance_name}.service:")
Logger.print_error(f"Error disabling service {self.instance_name}.service:")
Logger.print_error(f"{e}")
def start_instance(self) -> None:
Logger.print_info(f"Starting {self.instance_name}.service ...")
try:
command = ["sudo", "systemctl", "start",
f"{self.instance_name}.service"]
command = ["sudo", "systemctl", "start", f"{self.instance_name}.service"]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_name}.service started.")
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error starting service {self.instance_name}.service:")
Logger.print_error(f"Error starting service {self.instance_name}.service:")
Logger.print_error(f"{e}")
def stop_instance(self) -> None:
Logger.print_info(f"Stopping {self.instance_name}.service ...")
try:
command = ["sudo", "systemctl", "stop",
f"{self.instance_name}.service"]
command = ["sudo", "systemctl", "stop", f"{self.instance_name}.service"]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_name}.service stopped.")
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error stopping service {self.instance_name}.service:")
Logger.print_error(f"Error stopping service {self.instance_name}.service:")
Logger.print_error(f"{e}")
raise
@@ -120,8 +119,7 @@ class InstanceManager:
if not self.instances:
self._find_instances()
return sorted(self.instances,
key=lambda x: self._sort_instance_list(x.name))
return sorted(self.instances, key=lambda x: self._sort_instance_list(x.name))
def _find_instances(self) -> None:
prefix = self.instance_type.__name__.lower()
@@ -131,12 +129,13 @@ class InstanceManager:
service_list = [
os.path.join(SYSTEMD, service)
for service in os.listdir(SYSTEMD)
if pattern.search(service)
and not any(s in service for s in excluded)]
if pattern.search(service) and not any(s in service for s in excluded)
]
instance_list = [
self.instance_type(name=self._get_instance_name(Path(service)))
for service in service_list]
for service in service_list
]
self.instances = instance_list

View File

@@ -17,14 +17,11 @@ from kiauh.utils.constants import COLOR_YELLOW, RESET_FORMAT
class AdvancedMenu(BaseMenu):
def __init__(self):
super().__init__(
header=True,
options={},
footer_type="back"
)
super().__init__(header=True, options={}, footer_type="back")
def print_menu(self):
menu = textwrap.dedent(f"""
menu = textwrap.dedent(
f"""
/=======================================================\\
| {COLOR_YELLOW}~~~~~~~~~~~~~ [ Advanced Menu ] ~~~~~~~~~~~~~{RESET_FORMAT} |
|-------------------------------------------------------|
@@ -36,5 +33,6 @@ class AdvancedMenu(BaseMenu):
| 2) [Flash only] | |
| 3) [Build + Flash] | Extras: |
| 4) [Get MCU ID] | 7) [G-Code Shell Command] |
""")[1:]
"""
)[1:]
print(menu, end="")

View File

@@ -15,8 +15,13 @@ import textwrap
from abc import abstractmethod, ABC
from typing import Dict, Any
from kiauh.utils.constants import COLOR_GREEN, COLOR_YELLOW, COLOR_RED, \
COLOR_CYAN, RESET_FORMAT
from kiauh.utils.constants import (
COLOR_GREEN,
COLOR_YELLOW,
COLOR_RED,
COLOR_CYAN,
RESET_FORMAT,
)
def clear():
@@ -24,58 +29,70 @@ def clear():
def print_header():
header = textwrap.dedent(f"""
header = textwrap.dedent(
f"""
/=======================================================\\
| {COLOR_CYAN}~~~~~~~~~~~~~~~~~ [ KIAUH ] ~~~~~~~~~~~~~~~~~{RESET_FORMAT} |
| {COLOR_CYAN} Klipper Installation And Update Helper {RESET_FORMAT} |
| {COLOR_CYAN}~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~{RESET_FORMAT} |
\=======================================================/
""")[1:]
"""
)[1:]
print(header, end="")
def print_quit_footer():
footer = textwrap.dedent(f"""
footer = textwrap.dedent(
f"""
|-------------------------------------------------------|
| {COLOR_RED}Q) Quit{RESET_FORMAT} |
\=======================================================/
""")[1:]
"""
)[1:]
print(footer, end="")
def print_back_footer():
footer = textwrap.dedent(f"""
footer = textwrap.dedent(
f"""
|-------------------------------------------------------|
| {COLOR_GREEN}B) « Back{RESET_FORMAT} |
\=======================================================/
""")[1:]
"""
)[1:]
print(footer, end="")
def print_back_help_footer():
footer = textwrap.dedent(f"""
footer = textwrap.dedent(
f"""
|-------------------------------------------------------|
| {COLOR_GREEN}B) « Back{RESET_FORMAT} | {COLOR_RED}Q) Quit{RESET_FORMAT} |
\=======================================================/
""")[1:]
"""
)[1:]
print(footer, end="")
def print_back_quit_footer():
footer = textwrap.dedent(f"""
footer = textwrap.dedent(
f"""
|-------------------------------------------------------|
| {COLOR_GREEN}B) « Back{RESET_FORMAT} | {COLOR_YELLOW}H) Help [?]{RESET_FORMAT} |
\=======================================================/
""")[1:]
"""
)[1:]
print(footer, end="")
def print_back_quit_help_footer():
footer = textwrap.dedent(f"""
footer = textwrap.dedent(
f"""
|-------------------------------------------------------|
| {COLOR_GREEN}B) « Back{RESET_FORMAT} | {COLOR_RED}Q) Quit{RESET_FORMAT} | {COLOR_YELLOW}H) Help [?]{RESET_FORMAT} |
\=======================================================/
""")[1:]
"""
)[1:]
print(footer, end="")
@@ -86,8 +103,9 @@ class BaseMenu(ABC):
BACK_QUIT_FOOTER = "back_quit"
BACK_QUIT_HELP_FOOTER = "back_quit_help"
def __init__(self, options: Dict[int, Any], options_offset=0, header=True,
footer_type="quit"):
def __init__(
self, options: Dict[int, Any], options_offset=0, header=True, footer_type="quit"
):
self.options = options
self.options_offset = options_offset
self.header = header
@@ -95,8 +113,7 @@ class BaseMenu(ABC):
@abstractmethod
def print_menu(self):
raise NotImplementedError(
"Subclasses must implement the print_menu method")
raise NotImplementedError("Subclasses must implement the print_menu method")
def print_footer(self):
footer_type_map = {
@@ -104,10 +121,9 @@ class BaseMenu(ABC):
self.BACK_FOOTER: print_back_footer,
self.BACK_HELP_FOOTER: print_back_help_footer,
self.BACK_QUIT_FOOTER: print_back_quit_footer,
self.BACK_QUIT_HELP_FOOTER: print_back_quit_help_footer
self.BACK_QUIT_HELP_FOOTER: print_back_quit_help_footer,
}
footer_function = footer_type_map.get(self.footer_type,
print_quit_footer)
footer_function = footer_type_map.get(self.footer_type, print_quit_footer)
footer_function()
def display(self):
@@ -121,9 +137,11 @@ class BaseMenu(ABC):
while True:
choice = input(f"{COLOR_CYAN}###### Perform action: {RESET_FORMAT}")
error_msg = f"{COLOR_RED}Invalid input.{RESET_FORMAT}" \
if choice.isalpha() \
error_msg = (
f"{COLOR_RED}Invalid input.{RESET_FORMAT}"
if choice.isalpha()
else f"{COLOR_RED}Invalid input. Select a number between {min(self.options)} and {max(self.options)}.{RESET_FORMAT}"
)
if choice.isdigit() and 0 <= int(choice) < len(self.options):
return choice
@@ -133,10 +151,12 @@ class BaseMenu(ABC):
"back": ["b"],
"back_help": ["b", "h"],
"back_quit": ["b", "q"],
"back_quit_help": ["b", "q", "h"]
"back_quit_help": ["b", "q", "h"],
}
if self.footer_type in allowed_input and choice.lower() in \
allowed_input[self.footer_type]:
if (
self.footer_type in allowed_input
and choice.lower() in allowed_input[self.footer_type]
):
return choice
else:
print(error_msg)
@@ -169,7 +189,8 @@ class BaseMenu(ABC):
raise NotImplementedError(f"No implementation for option {choice}")
else:
raise TypeError(
f"Type {type(option)} of option {choice} not of type BaseMenu or Method")
f"Type {type(option)} of option {choice} not of type BaseMenu or Method"
)
def navigate_to_submenu(self, submenu_class):
submenu = submenu_class()

View File

@@ -32,13 +32,14 @@ class InstallMenu(BaseMenu):
8: self.install_obico,
9: self.install_octoeverywhere,
10: self.install_mobileraker,
11: self.install_crowsnest
11: self.install_crowsnest,
},
footer_type="back"
footer_type="back",
)
def print_menu(self):
menu = textwrap.dedent(f"""
menu = textwrap.dedent(
f"""
/=======================================================\\
| {COLOR_GREEN}~~~~~~~~~~~ [ Installation Menu ] ~~~~~~~~~~~{RESET_FORMAT} |
|-------------------------------------------------------|
@@ -56,7 +57,8 @@ class InstallMenu(BaseMenu):
| | Webcam Streamer: |
| Touchscreen GUI: | 11) [Crowsnest] |
| 5) [KlipperScreen] | |
""")[1:]
"""
)[1:]
print(menu, end="")
def install_klipper(self):

View File

@@ -31,13 +31,14 @@ class MainMenu(BaseMenu):
3: RemoveMenu,
4: AdvancedMenu,
5: None,
6: SettingsMenu
6: SettingsMenu,
},
footer_type="quit"
footer_type="quit",
)
def print_menu(self):
menu = textwrap.dedent(f"""
menu = textwrap.dedent(
f"""
/=======================================================\\
| {COLOR_CYAN}~~~~~~~~~~~~~~~ [ Main Menu ] ~~~~~~~~~~~~~~~{RESET_FORMAT} |
|-------------------------------------------------------|
@@ -58,7 +59,8 @@ class MainMenu(BaseMenu):
| | OctoEverywhere: <TODO> |
|-------------------------------------------------------|
| {COLOR_CYAN}KIAUH v6.0.0{RESET_FORMAT} | Changelog: {COLOR_MAGENTA}https://git.io/JnmlX{RESET_FORMAT} |
""")[1:]
"""
)[1:]
print(menu, end="")
def test(self):

View File

@@ -38,11 +38,12 @@ class RemoveMenu(BaseMenu):
14: self.remove_mobileraker,
15: self.remove_nginx,
},
footer_type="back"
footer_type="back",
)
def print_menu(self):
menu = textwrap.dedent(f"""
menu = textwrap.dedent(
f"""
/=======================================================\\
| {COLOR_RED}~~~~~~~~~~~~~~ [ Remove Menu ] ~~~~~~~~~~~~~~{RESET_FORMAT} |
|-------------------------------------------------------|
@@ -60,7 +61,8 @@ class RemoveMenu(BaseMenu):
| | 14) [Mobileraker] |
| Touchscreen GUI: | 15) [NGINX] |
| 7) [KlipperScreen] | |
""")[1:]
"""
)[1:]
print(menu, end="")
def remove_klipper(self):

View File

@@ -15,10 +15,7 @@ from kiauh.menus.base_menu import BaseMenu
# noinspection PyMethodMayBeStatic
class SettingsMenu(BaseMenu):
def __init__(self):
super().__init__(
header=True,
options={}
)
super().__init__(header=True, options={})
def print_menu(self):
print("self")

View File

@@ -35,11 +35,12 @@ class UpdateMenu(BaseMenu):
11: self.update_crowsnest,
12: self.upgrade_system_packages,
},
footer_type="back"
footer_type="back",
)
def print_menu(self):
menu = textwrap.dedent(f"""
menu = textwrap.dedent(
f"""
/=======================================================\\
| {COLOR_GREEN}~~~~~~~~~~~~~~ [ Update Menu ] ~~~~~~~~~~~~~~{RESET_FORMAT} |
|-------------------------------------------------------|
@@ -65,7 +66,8 @@ class UpdateMenu(BaseMenu):
| 11) [Crowsnest] | | |
| |-----------------------------|
| 12) [System] | | |
""")[1:]
"""
)[1:]
print(menu, end="")
def update_all(self):

View File

@@ -10,7 +10,6 @@
# ======================================================================= #
import os
import pwd
import shutil
import subprocess
from pathlib import Path
@@ -29,10 +28,12 @@ class Klipper(BaseInstance):
return ["None", "mcu"]
def __init__(self, name: str):
super().__init__(name=name,
super().__init__(
name=name,
prefix="klipper",
user=CURRENT_USER,
data_dir_name=self._get_data_dir_from_name(name))
data_dir_name=self._get_data_dir_from_name(name),
)
self.klipper_dir = KLIPPER_DIR
self.env_dir = KLIPPER_ENV_DIR
self.cfg_file = f"{self.cfg_dir}/printer.cfg"
@@ -43,26 +44,35 @@ class Klipper(BaseInstance):
def create(self) -> None:
Logger.print_info("Creating Klipper Instance")
module_path = os.path.dirname(os.path.abspath(__file__))
service_template_path = os.path.join(module_path, "res",
"klipper.service")
service_template_path = os.path.join(module_path, "res", "klipper.service")
env_template_file_path = os.path.join(module_path, "res", "klipper.env")
service_file_name = self.get_service_file_name(extension=True)
service_file_target = f"{SYSTEMD}/{service_file_name}"
env_file_target = os.path.abspath(f"{self.sysd_dir}/klipper.env")
# create folder structure
dirs = [self.data_dir, self.cfg_dir, self.log_dir,
self.comms_dir, self.sysd_dir]
dirs = [
self.data_dir,
self.cfg_dir,
self.log_dir,
self.comms_dir,
self.sysd_dir,
]
for _dir in dirs:
create_directory(Path(_dir))
try:
# writing the klipper service file (requires sudo!)
service_content = self._prep_service_file(service_template_path,
env_file_target)
service_content = self._prep_service_file(
service_template_path, env_file_target
)
command = ["sudo", "tee", service_file_target]
subprocess.run(command, input=service_content.encode(),
stdout=subprocess.DEVNULL, check=True)
subprocess.run(
command,
input=service_content.encode(),
stdout=subprocess.DEVNULL,
check=True,
)
Logger.print_ok(f"Service file created: {service_file_target}")
# writing the klipper.env file
@@ -73,11 +83,11 @@ class Klipper(BaseInstance):
except subprocess.CalledProcessError as e:
Logger.print_error(
f"Error creating service file {service_file_target}: {e}")
f"Error creating service file {service_file_target}: {e}"
)
raise
except OSError as e:
Logger.print_error(
f"Error creating env file {env_file_target}: {e}")
Logger.print_error(f"Error creating env file {env_file_target}: {e}")
raise
def read(self) -> None:
@@ -118,7 +128,7 @@ class Klipper(BaseInstance):
Logger.print_ok("Directories successfully deleted.")
def get_service_file_name(self, extension=False) -> str:
name = self.prefix if self.name is None else self.prefix + '-' + self.name
name = self.prefix if self.name is None else self.prefix + "-" + self.name
return name if not extension else f"{name}.service"
def _get_service_file_path(self):
@@ -138,11 +148,11 @@ class Klipper(BaseInstance):
template_content = template_file.read()
except FileNotFoundError:
Logger.print_error(
f"Unable to open {service_template_path} - File not found")
f"Unable to open {service_template_path} - File not found"
)
raise
service_content = template_content.replace("%USER%", self.user)
service_content = service_content.replace("%KLIPPER_DIR%",
self.klipper_dir)
service_content = service_content.replace("%KLIPPER_DIR%", self.klipper_dir)
service_content = service_content.replace("%ENV%", self.env_dir)
service_content = service_content.replace("%ENV_FILE%", env_file_path)
return service_content
@@ -153,10 +163,12 @@ class Klipper(BaseInstance):
env_template_file_content = env_file.read()
except FileNotFoundError:
Logger.print_error(
f"Unable to open {env_template_file_path} - File not found")
f"Unable to open {env_template_file_path} - File not found"
)
raise
env_file_content = env_template_file_content.replace("%KLIPPER_DIR%",
self.klipper_dir)
env_file_content = env_template_file_content.replace(
"%KLIPPER_DIR%", self.klipper_dir
)
env_file_content = env_file_content.replace("%CFG%", self.cfg_file)
env_file_content = env_file_content.replace("%SERIAL%", self.serial)
env_file_content = env_file_content.replace("%LOG%", self.log)

View File

@@ -19,15 +19,27 @@ from typing import Optional, List, Union
from kiauh.instance_manager.instance_manager import InstanceManager
from kiauh.modules.klipper.klipper import Klipper
from kiauh.modules.klipper.klipper_utils import print_instance_overview, print_missing_usergroup_dialog
from kiauh.modules.klipper.klipper_utils import (
print_instance_overview,
print_missing_usergroup_dialog,
)
from kiauh.utils.constants import CURRENT_USER, KLIPPER_DIR, KLIPPER_ENV_DIR
from kiauh.utils.input_utils import get_user_confirm, get_user_number_input, \
get_user_string_input, get_user_selection_input
from kiauh.utils.input_utils import (
get_user_confirm,
get_user_number_input,
get_user_string_input,
get_user_selection_input,
)
from kiauh.utils.logger import Logger
from kiauh.utils.system_utils import parse_packages_from_file, \
clone_repo, create_python_venv, \
install_python_requirements, update_system_package_lists, \
install_system_packages, mask_system_service
from kiauh.utils.system_utils import (
parse_packages_from_file,
clone_repo,
create_python_venv,
install_python_requirements,
update_system_package_lists,
install_system_packages,
mask_system_service,
)
def run_klipper_setup(install: bool) -> None:
@@ -81,8 +93,8 @@ def install_klipper(instance_manager: InstanceManager) -> None:
instance_list = instance_manager.get_instances()
if_adding = " additional" if len(instance_list) > 0 else ""
install_count = get_user_number_input(
f"Number of{if_adding} Klipper instances to set up",
1, default=1)
f"Number of{if_adding} Klipper instances to set up", 1, default=1
)
instance_names = set_instance_names(instance_list, install_count)
@@ -132,8 +144,7 @@ def install_klipper_packages(klipper_dir: Path) -> None:
install_system_packages(packages)
def set_instance_names(instance_list, install_count: int) -> List[
Union[str, None]]:
def set_instance_names(instance_list, install_count: int) -> List[Union[str, None]]:
instance_count = len(instance_list)
# default single instance install
@@ -141,9 +152,11 @@ def set_instance_names(instance_list, install_count: int) -> List[
return [None]
# new multi instance install
elif ((instance_count == 0 and install_count > 1)
elif (
(instance_count == 0 and install_count > 1)
# or convert single instance install to multi instance install
or (instance_count == 1 and install_count >= 1)):
or (instance_count == 1 and install_count >= 1)
):
if get_user_confirm("Assign custom names?", False):
return assign_custom_names(instance_count, install_count, None)
else:
@@ -153,8 +166,7 @@ def set_instance_names(instance_list, install_count: int) -> List[
# existing multi instance install
elif instance_count > 1:
if has_custom_names(instance_list):
return assign_custom_names(instance_count, install_count,
instance_list)
return assign_custom_names(instance_count, install_count, instance_list)
else:
start = get_highest_index(instance_list) + 1
_range = range(start, start + install_count)
@@ -170,8 +182,9 @@ def has_custom_names(instance_list: List[Klipper]) -> bool:
return False
def assign_custom_names(instance_count: int, install_count: int,
instance_list: Optional[List[Klipper]]) -> List[str]:
def assign_custom_names(
instance_count: int, install_count: int, instance_list: Optional[List[Klipper]]
) -> List[str]:
instance_names = []
exclude = Klipper.blacklist()
@@ -190,7 +203,7 @@ def assign_custom_names(instance_count: int, install_count: int,
def get_highest_index(instance_list: List[Klipper]) -> int:
indices = [int(instance.name.split('-')[-1]) for instance in instance_list]
indices = [int(instance.name.split("-")[-1]) for instance in instance_list]
return max(indices)
@@ -209,14 +222,12 @@ def remove_single_instance(instance_manager: InstanceManager) -> None:
def remove_multi_instance(instance_manager: InstanceManager) -> None:
instance_list = instance_manager.get_instances()
print_instance_overview(instance_list, show_index=True,
show_select_all=True)
print_instance_overview(instance_list, show_index=True, show_select_all=True)
options = [str(i) for i in range(len(instance_list))]
options.extend(["a", "A", "b", "B"])
selection = get_user_selection_input(
"Select Klipper instance to remove", options)
selection = get_user_selection_input("Select Klipper instance to remove", options)
print(selection)
if selection == "b".lower():
@@ -231,7 +242,8 @@ def remove_multi_instance(instance_manager: InstanceManager) -> None:
else:
instance = instance_list[int(selection)]
Logger.print_info(
f"Removing Klipper instance: {instance.get_service_file_name()}")
f"Removing Klipper instance: {instance.get_service_file_name()}"
)
instance_manager.set_current_instance(instance)
instance_manager.stop_instance()
instance_manager.disable_instance()
@@ -254,7 +266,9 @@ def check_user_groups():
print_missing_usergroup_dialog(missing_groups)
if not get_user_confirm(f"Add user '{CURRENT_USER}' to group(s) now?"):
Logger.warn("Skipped adding user to required groups. You might encounter issues.")
Logger.warn(
"Skipped adding user to required groups. You might encounter issues."
)
return
try:
@@ -267,13 +281,19 @@ def check_user_groups():
Logger.print_error(f"Unable to add user to usergroups: {e}")
raise
Logger.print_warn("Remember to relog/restart this machine for the group(s) to be applied!")
Logger.print_warn(
"Remember to relog/restart this machine for the group(s) to be applied!"
)
def handle_disruptive_system_packages() -> None:
services = []
brltty_status = subprocess.run(["systemctl", "is-enabled", "brltty"], capture_output=True, text=True)
modem_manager_status = subprocess.run(["systemctl", "is-enabled", "ModemManager"], capture_output=True, text=True)
brltty_status = subprocess.run(
["systemctl", "is-enabled", "brltty"], capture_output=True, text=True
)
modem_manager_status = subprocess.run(
["systemctl", "is-enabled", "ModemManager"], capture_output=True, text=True
)
if "enabled" in brltty_status.stdout:
services.append("brltty")
@@ -282,13 +302,17 @@ def handle_disruptive_system_packages() -> None:
for service in services if services else []:
try:
Logger.print_info(f"{service} service detected! Masking {service} service ...")
Logger.print_info(
f"{service} service detected! Masking {service} service ..."
)
mask_system_service(service)
Logger.print_ok(f"{service} service masked!")
except subprocess.CalledProcessError:
warn_msg = textwrap.dedent(f"""
warn_msg = textwrap.dedent(
f"""
KIAUH was unable to mask the {service} system service.
Please fix the problem manually. Otherwise, this may have
undesirable effects on the operation of Klipper.
""")[1:]
"""
)[1:]
Logger.print_warn(warn_msg)

View File

@@ -13,12 +13,12 @@ from typing import List
from kiauh.instance_manager.base_instance import BaseInstance
from kiauh.menus.base_menu import print_back_footer
from kiauh.utils.constants import COLOR_GREEN, COLOR_CYAN, COLOR_YELLOW, \
RESET_FORMAT
from kiauh.utils.constants import COLOR_GREEN, COLOR_CYAN, COLOR_YELLOW, RESET_FORMAT
def print_instance_overview(instances: List[BaseInstance], show_index=False,
show_select_all=False):
def print_instance_overview(
instances: List[BaseInstance], show_index=False, show_select_all=False
):
headline = f"{COLOR_GREEN}The following Klipper instances were found:{RESET_FORMAT}"
print("/=======================================================\\")
@@ -38,13 +38,20 @@ def print_instance_overview(instances: List[BaseInstance], show_index=False,
print_back_footer()
def print_missing_usergroup_dialog(missing_groups) -> None:
print("/=======================================================\\")
print(f"| {COLOR_YELLOW}WARNING: Your current user is not in group:{RESET_FORMAT} |")
print(
f"| {COLOR_YELLOW}WARNING: Your current user is not in group:{RESET_FORMAT} |"
)
if "tty" in missing_groups:
print(f"| {COLOR_CYAN}● tty{RESET_FORMAT} |")
print(
f"| {COLOR_CYAN}● tty{RESET_FORMAT} |"
)
if "dialout" in missing_groups:
print(f"| {COLOR_CYAN}● dialout{RESET_FORMAT} |")
print(
f"| {COLOR_CYAN}● dialout{RESET_FORMAT} |"
)
print("| |")
print("| It is possible that you won't be able to successfully |")
print("| connect and/or flash the controller board without |")
@@ -52,6 +59,10 @@ def print_missing_usergroup_dialog(missing_groups) -> None:
print("| If you want to add the current user to the group(s) |")
print("| listed above, answer with 'Y'. Else skip with 'n'. |")
print("| |")
print(f"| {COLOR_YELLOW}INFO:{RESET_FORMAT} |")
print(f"| {COLOR_YELLOW}Relog required for group assignments to take effect!{RESET_FORMAT} |")
print(
f"| {COLOR_YELLOW}INFO:{RESET_FORMAT} |"
)
print(
f"| {COLOR_YELLOW}Relog required for group assignments to take effect!{RESET_FORMAT} |"
)
print("\\=======================================================/")

View File

@@ -11,8 +11,8 @@
from typing import Optional, List
from kiauh.utils.logger import Logger
from kiauh.utils.constants import COLOR_CYAN, RESET_FORMAT
from kiauh.utils.logger import Logger
def get_user_confirm(question: str, default_choice=True) -> bool:
@@ -30,7 +30,8 @@ def get_user_confirm(question: str, default_choice=True) -> bool:
choice = (
input(f"{COLOR_CYAN}###### {question} {def_choice} {RESET_FORMAT}")
.strip()
.lower())
.lower()
)
if choice in options_confirm:
return True
@@ -40,8 +41,9 @@ def get_user_confirm(question: str, default_choice=True) -> bool:
Logger.print_error("Invalid choice. Please select 'y' or 'n'.")
def get_user_number_input(question: str, min_count: int, max_count=None,
default=None) -> int:
def get_user_number_input(
question: str, min_count: int, max_count=None, default=None
) -> int:
_question = question + f" (default={default})" if default else question
_question = f"{COLOR_CYAN}###### {_question}: {RESET_FORMAT}"
while True:
@@ -65,8 +67,7 @@ def get_user_number_input(question: str, min_count: int, max_count=None,
def get_user_string_input(question: str, exclude=Optional[List]) -> str:
while True:
_input = (input(f"{COLOR_CYAN}###### {question}: {RESET_FORMAT}")
.strip())
_input = input(f"{COLOR_CYAN}###### {question}: {RESET_FORMAT}").strip()
if _input.isalnum() and _input not in exclude:
return _input
@@ -78,8 +79,7 @@ def get_user_string_input(question: str, exclude=Optional[List]) -> str:
def get_user_selection_input(question: str, option_list: List) -> str:
while True:
_input = (input(f"{COLOR_CYAN}###### {question}: {RESET_FORMAT}")
.strip())
_input = input(f"{COLOR_CYAN}###### {question}: {RESET_FORMAT}").strip()
if _input in option_list:
return _input

View File

@@ -9,12 +9,16 @@
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from kiauh.utils.constants import COLOR_GREEN, COLOR_YELLOW, COLOR_RED, \
COLOR_MAGENTA, RESET_FORMAT
from kiauh.utils.constants import (
COLOR_GREEN,
COLOR_YELLOW,
COLOR_RED,
COLOR_MAGENTA,
RESET_FORMAT,
)
class Logger:
@staticmethod
def info(msg):
# log to kiauh.log

View File

@@ -57,7 +57,8 @@ def clone_repo(target_dir: Path, url: str, branch: str) -> None:
print("Error cloning repository:", e.output.decode())
else:
overwrite_target = get_user_confirm(
"Target directory already exists. Overwrite?")
"Target directory already exists. Overwrite?"
)
if overwrite_target:
try:
shutil.rmtree(target_dir)
@@ -71,13 +72,13 @@ def clone_repo(target_dir: Path, url: str, branch: str) -> None:
def parse_packages_from_file(source_file) -> List[str]:
packages = []
print("Reading dependencies...")
with open(source_file, 'r') as file:
with open(source_file, "r") as file:
for line in file:
line = line.strip()
if line.startswith("PKGLIST="):
line = line.replace("\"", "")
line = line.replace('"', "")
line = line.replace("PKGLIST=", "")
line = line.replace('${PKGLIST}', '')
line = line.replace("${PKGLIST}", "")
packages.extend(line.split())
return packages
@@ -97,16 +98,15 @@ def create_python_venv(target: Path) -> None:
except subprocess.CalledProcessError as e:
print("Error setting up virtualenv:", e.output.decode())
else:
overwrite_venv = get_user_confirm(
"Virtualenv already exists. Re-create?")
overwrite_venv = get_user_confirm("Virtualenv already exists. Re-create?")
if overwrite_venv:
try:
shutil.rmtree(target)
create_python_venv(target)
except OSError as e:
Logger.print_error(
f"Error removing existing virtualenv: {e.strerror}",
False)
f"Error removing existing virtualenv: {e.strerror}", False
)
else:
print("Skipping re-creation of virtualenv ...")
@@ -144,10 +144,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
cache_mtime = 0
cache_files = [
"/var/lib/apt/periodic/update-success-stamp",
"/var/lib/apt/lists"
]
cache_files = ["/var/lib/apt/periodic/update-success-stamp", "/var/lib/apt/lists"]
for cache_file in cache_files:
if Path(cache_file).exists():
cache_mtime = max(cache_mtime, os.path.getmtime(cache_file))
@@ -196,8 +193,7 @@ def create_directory(_dir: Path) -> None:
os.makedirs(_dir, exist_ok=True)
Logger.print_ok("Directory created!")
else:
Logger.print_info(
f"Directory already exists: {_dir}\nSkip creation ...")
Logger.print_info(f"Directory already exists: {_dir}\nSkip creation ...")
except OSError as e:
Logger.print_error(f"Error creating folder: {e}")
raise
@@ -208,5 +204,7 @@ def mask_system_service(service_name: str) -> None:
command = ["sudo", "systemctl", "mask", service_name]
subprocess.run(command, stderr=subprocess.PIPE, check=True)
except subprocess.CalledProcessError as e:
Logger.print_error(f"Unable to mask system service {service_name}: {e.stderr.decode()}")
Logger.print_error(
f"Unable to mask system service {service_name}: {e.stderr.decode()}"
)
raise

13
pyproject.toml Normal file
View File

@@ -0,0 +1,13 @@
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
exclude = '''
(
\.git/
| \.github/
| docs/
| resources/
| scripts/
)
'''