feat: KIAUH v6 - full rewrite of KIAUH in Python (#428)

This commit is contained in:
dw-0
2024-08-31 19:16:52 +02:00
committed by GitHub
parent 8547942986
commit 0ee0fa3325
159 changed files with 13461 additions and 54 deletions

12
kiauh/utils/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parent

177
kiauh/utils/common.py Normal file
View File

@@ -0,0 +1,177 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional, Set
from components.klipper.klipper import Klipper
from core.constants import (
COLOR_CYAN,
GLOBAL_DEPS,
PRINTER_CFG_BACKUP_DIR,
RESET_FORMAT,
)
from core.logger import DialogType, Logger
from core.types import ComponentStatus, StatusCode
from utils.git_utils import (
get_local_commit,
get_local_tags,
get_remote_commit,
get_repo_name,
)
from utils.instance_utils import get_instances
from utils.sys_utils import (
check_package_install,
install_system_packages,
update_system_package_lists,
)
def get_kiauh_version() -> str:
"""
Helper method to get the current KIAUH version by reading the latest tag
:return: string of the latest tag
"""
return get_local_tags(Path(__file__).parent.parent)[-1]
def convert_camelcase_to_kebabcase(name: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "-", name).lower()
def get_current_date() -> Dict[Literal["date", "time"], str]:
"""
Get the current date |
:return: Dict holding a date and time key:value pair
"""
now: datetime = datetime.today()
date: str = now.strftime("%Y%m%d")
time: str = now.strftime("%H%M%S")
return {"date": date, "time": time}
def check_install_dependencies(
deps: Set[str] | None = None, include_global: bool = True
) -> None:
"""
Common helper method to check if dependencies are installed
and if not, install them automatically |
:param include_global: Wether to include the global dependencies or not
:param deps: List of strings of package names to check if installed
:return: None
"""
if deps is None:
deps = set()
if include_global:
deps.update(GLOBAL_DEPS)
requirements = check_package_install(deps)
if requirements:
Logger.print_status("Installing dependencies ...")
Logger.print_info("The following packages need installation:")
for r in requirements:
print(f"{COLOR_CYAN}{r}{RESET_FORMAT}")
update_system_package_lists(silent=False)
install_system_packages(requirements)
def get_install_status(
repo_dir: Path,
env_dir: Optional[Path] = None,
instance_type: type | None = None,
files: Optional[List[Path]] = None,
) -> ComponentStatus:
"""
Helper method to get the installation status of software components
:param repo_dir: the repository directory
:param env_dir: the python environment directory
:param instance_type: The component type
:param files: List of optional files to check for existence
:return: Dictionary with status string, statuscode and instance count
"""
from utils.instance_utils import get_instances
checks = [repo_dir.exists()]
if env_dir is not None:
checks.append(env_dir.exists())
instances = 0
if instance_type is not None:
instances = len(get_instances(instance_type))
checks.append(instances > 0)
if files is not None:
for f in files:
checks.append(f.exists())
status: StatusCode
if all(checks):
status = 2 # installed
elif not any(checks):
status = 0 # not installed
else:
status = 1 # incomplete
return ComponentStatus(
status=status,
instances=instances,
repo=get_repo_name(repo_dir),
local=get_local_commit(repo_dir),
remote=get_remote_commit(repo_dir),
)
def backup_printer_config_dir() -> None:
# local import to prevent circular import
from core.backup_manager.backup_manager import BackupManager
instances: List[Klipper] = get_instances(Klipper)
bm = BackupManager()
for instance in instances:
name = f"config-{instance.data_dir.name}"
bm.backup_directory(
name,
source=instance.base.cfg_dir,
target=PRINTER_CFG_BACKUP_DIR,
)
def moonraker_exists(name: str = "") -> bool:
"""
Helper method to check if a Moonraker instance exists
:param name: Optional name of an installer where the check is performed
:return: True if at least one Moonraker instance exists, False otherwise
"""
from components.moonraker.moonraker import Moonraker
mr_instances: List[Moonraker] = get_instances(Moonraker)
info = (
f"{name} requires Moonraker to be installed"
if name
else "A Moonraker installation is required"
)
if not mr_instances:
Logger.print_dialog(
DialogType.WARNING,
[
"No Moonraker instances found!",
f"{info}. Please install Moonraker first!",
],
)
return False
return True

View File

@@ -0,0 +1,89 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import List, Tuple
from core.instance_type import InstanceType
from core.logger import Logger
from core.submodules.simple_config_parser.src.simple_config_parser.simple_config_parser import (
SimpleConfigParser,
)
ConfigOption = Tuple[str, str]
def add_config_section(
section: str,
instances: List[InstanceType],
options: List[ConfigOption] | None = None,
) -> None:
for instance in instances:
cfg_file = instance.cfg_file
Logger.print_status(f"Add section '[{section}]' to '{cfg_file}' ...")
if not Path(cfg_file).exists():
Logger.print_warn(f"'{cfg_file}' not found!")
continue
scp = SimpleConfigParser()
scp.read(cfg_file)
if scp.has_section(section):
Logger.print_info("Section already exist. Skipped ...")
continue
scp.add_section(section)
if options is not None:
for option in reversed(options):
scp.set(section, option[0], option[1])
scp.write(cfg_file)
def add_config_section_at_top(section: str, instances: List[InstanceType]) -> None:
# TODO: this could be implemented natively in SimpleConfigParser
for instance in instances:
tmp_cfg = tempfile.NamedTemporaryFile(mode="w", delete=False)
tmp_cfg_path = Path(tmp_cfg.name)
scp = SimpleConfigParser()
scp.read(tmp_cfg_path)
scp.add_section(section)
scp.write(tmp_cfg_path)
tmp_cfg.close()
cfg_file = instance.cfg_file
with open(cfg_file, "r") as org:
org_content = org.readlines()
with open(tmp_cfg_path, "a") as tmp:
tmp.writelines(org_content)
cfg_file.unlink()
tmp_cfg_path.rename(cfg_file)
def remove_config_section(section: str, instances: List[InstanceType]) -> None:
for instance in instances:
cfg_file = instance.cfg_file
Logger.print_status(f"Remove section '[{section}]' from '{cfg_file}' ...")
if not Path(cfg_file).exists():
Logger.print_warn(f"'{cfg_file}' not found!")
continue
scp = SimpleConfigParser()
scp.read(cfg_file)
if not scp.has_section(section):
Logger.print_info("Section does not exist. Skipped ...")
continue
scp.remove_section(section)
scp.write(cfg_file)

142
kiauh/utils/fs_utils.py Normal file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
import shutil
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError, check_output, run
from typing import List
from zipfile import ZipFile
from core.decorators import deprecated
from core.logger import Logger
def check_file_exist(file_path: Path, sudo=False) -> bool:
"""
Helper function for checking the existence of a file |
:param file_path: the absolute path of the file to check
:param sudo: use sudo if required
:return: True, if file exists, otherwise False
"""
if sudo:
try:
command = ["sudo", "find", file_path.as_posix()]
check_output(command, stderr=DEVNULL)
return True
except CalledProcessError:
return False
else:
if file_path.exists():
return True
else:
return False
def create_symlink(source: Path, target: Path, sudo=False) -> None:
try:
cmd = ["ln", "-sf", source.as_posix(), target.as_posix()]
if sudo:
cmd.insert(0, "sudo")
run(cmd, stderr=PIPE, check=True)
except CalledProcessError as e:
Logger.print_error(f"Failed to create symlink: {e}")
raise
def remove_with_sudo(file: Path) -> None:
try:
cmd = ["sudo", "rm", "-rf", file.as_posix()]
run(cmd, stderr=PIPE, check=True)
except CalledProcessError as e:
Logger.print_error(f"Failed to remove {file}: {e}")
raise
@deprecated(info="Use remove_with_sudo instead", replaced_by=remove_with_sudo)
def remove_file(file_path: Path, sudo=False) -> None:
try:
cmd = f"{'sudo ' if sudo else ''}rm -f {file_path}"
run(cmd, stderr=PIPE, check=True, shell=True)
except CalledProcessError as e:
log = f"Cannot remove file {file_path}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def run_remove_routines(file: Path) -> None:
try:
if not file.is_symlink() and not file.exists():
Logger.print_info(f"File '{file}' does not exist. Skipped ...")
return
if file.is_dir():
shutil.rmtree(file)
elif file.is_file() or file.is_symlink():
file.unlink()
else:
raise OSError(f"File '{file}' is neither a file nor a directory!")
Logger.print_ok(f"File '{file}' was successfully removed!")
except OSError as e:
Logger.print_error(f"Unable to delete '{file}':\n{e}")
try:
Logger.print_info("Trying to remove with sudo ...")
remove_with_sudo(file)
Logger.print_ok(f"File '{file}' was successfully removed!")
except CalledProcessError as e:
Logger.print_error(f"Error deleting '{file}' with sudo:\n{e}")
Logger.print_error("Remove this directory manually!")
def unzip(filepath: Path, target_dir: Path) -> None:
"""
Helper function to unzip a zip-archive into a target directory |
:param filepath: the path to the zip-file to unzip
:param target_dir: the target directory to extract the files into
:return: None
"""
with ZipFile(filepath, "r") as _zip:
_zip.extractall(target_dir)
def create_folders(dirs: List[Path]) -> None:
try:
for _dir in dirs:
if _dir.exists():
continue
_dir.mkdir(exist_ok=True)
Logger.print_ok(f"Created directory '{_dir}'!")
except OSError as e:
Logger.print_error(f"Error creating directories: {e}")
raise
def get_data_dir(instance_type: type, suffix: str) -> Path:
from utils.sys_utils import get_service_file_path
# if the service file exists, we read the data dir path from it
# this also ensures compatibility with pre v6.0.0 instances
service_file_path: Path = get_service_file_path(instance_type, suffix)
if service_file_path and service_file_path.exists():
with open(service_file_path, "r") as service_file:
lines = service_file.readlines()
for line in lines:
pattern = r"^EnvironmentFile=(.+)(/systemd/.+\.env)"
match = re.search(pattern, line)
if match:
return Path(match.group(1))
if suffix != "":
# this is the new data dir naming scheme introduced in v6.0.0
return Path.home().joinpath(f"printer_{suffix}_data")
return Path.home().joinpath("printer_data")

289
kiauh/utils/git_utils.py Normal file
View File

@@ -0,0 +1,289 @@
from __future__ import annotations
import json
import shutil
import urllib.request
from http.client import HTTPResponse
from json import JSONDecodeError
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError, check_output, run
from typing import List, Type
from core.instance_manager.instance_manager import InstanceManager
from core.instance_type import InstanceType
from core.logger import Logger
from utils.input_utils import get_confirm, get_number_input
from utils.instance_utils import get_instances
def git_clone_wrapper(
repo: str, target_dir: Path, branch: str | None = None, force: bool = False
) -> None:
"""
Clones a repository from the given URL and checks out the specified branch if given.
:param repo: The URL of the repository to clone.
:param branch: The branch to check out. If None, the default branch will be checked out.
:param target_dir: The directory where the repository will be cloned.
:param force: Force the cloning of the repository even if it already exists.
:return: None
"""
log = f"Cloning repository from '{repo}'"
Logger.print_status(log)
try:
if Path(target_dir).exists():
question = f"'{target_dir}' already exists. Overwrite?"
if not force and not get_confirm(question, default_choice=False):
Logger.print_info("Skip cloning of repository ...")
return
shutil.rmtree(target_dir)
git_cmd_clone(repo, target_dir)
git_cmd_checkout(branch, target_dir)
except CalledProcessError:
log = "An unexpected error occured during cloning of the repository."
Logger.print_error(log)
return
except OSError as e:
Logger.print_error(f"Error removing existing repository: {e.strerror}")
return
def git_pull_wrapper(repo: str, target_dir: Path) -> None:
"""
A function that updates a repository using git pull.
:param repo: The repository to update.
:param target_dir: The directory of the repository.
:return: None
"""
Logger.print_status(f"Updating repository '{repo}' ...")
try:
git_cmd_pull(target_dir)
except CalledProcessError:
log = "An unexpected error occured during updating the repository."
Logger.print_error(log)
return
def get_repo_name(repo: Path) -> str | None:
"""
Helper method to extract the organisation and name of a repository |
:param repo: repository to extract the values from
:return: String in form of "<orga>/<name>" or None
"""
if not repo.exists() or not repo.joinpath(".git").exists():
return "-"
try:
cmd = ["git", "-C", repo.as_posix(), "config", "--get", "remote.origin.url"]
result: str = check_output(cmd, stderr=DEVNULL).decode(encoding="utf-8")
substrings: List[str] = result.strip().split("/")[-2:]
return "/".join(substrings).replace(".git", "")
except CalledProcessError:
return None
def get_local_tags(repo_path: Path, _filter: str | None = None) -> List[str]:
"""
Get all tags of a local Git repository
:param repo_path: Path to the local Git repository
:param _filter: Optional filter to filter the tags by
:return: List of tags
"""
try:
cmd = ["git", "tag", "-l"]
if _filter is not None:
cmd.append(f"'${_filter}'")
result: str = check_output(
cmd,
stderr=DEVNULL,
cwd=repo_path.as_posix(),
).decode(encoding="utf-8")
tags = result.split("\n")
return tags[:-1]
except CalledProcessError:
return []
def get_remote_tags(repo_path: str) -> List[str]:
"""
Gets the tags of a GitHub repostiory
:param repo_path: path of the GitHub repository - e.g. `<owner>/<name>`
:return: List of tags
"""
try:
url = f"https://api.github.com/repos/{repo_path}/tags"
with urllib.request.urlopen(url) as r:
response: HTTPResponse = r
if response.getcode() != 200:
Logger.print_error(
f"Error retrieving tags: HTTP status code {response.getcode()}"
)
return []
data = json.loads(response.read())
return [item["name"] for item in data]
except (JSONDecodeError, TypeError) as e:
Logger.print_error(f"Error while processing the response: {e}")
raise
def get_latest_remote_tag(repo_path: str) -> str:
"""
Gets the latest stable tag of a GitHub repostiory
:param repo_path: path of the GitHub repository - e.g. `<owner>/<name>`
:return: tag or empty string
"""
try:
if len(latest_tag := get_remote_tags(repo_path)) > 0:
return latest_tag[0]
else:
return ""
except Exception:
raise
def get_latest_unstable_tag(repo_path: str) -> str:
"""
Gets the latest unstable (alpha, beta, rc) tag of a GitHub repository
:param repo_path: path of the GitHub repository - e.g. `<owner>/<name>`
:return: tag or empty string
"""
try:
if (
len(unstable_tags := [t for t in get_remote_tags(repo_path) if "-" in t])
> 0
):
return unstable_tags[0]
else:
return ""
except Exception:
Logger.print_error("Error while getting the latest unstable tag")
raise
def compare_semver_tags(tag1: str, tag2: str) -> bool:
"""
Compare two semver version strings.
Does not support comparing pre-release versions (e.g. 1.0.0-rc.1, 1.0.0-beta.1)
:param tag1: First version string
:param tag2: Second version string
:return: True if tag1 is greater than tag2, False otherwise
"""
if tag1 == tag2:
return False
def parse_version(v):
return list(map(int, v[1:].split(".")))
tag1_parts = parse_version(tag1)
tag2_parts = parse_version(tag2)
max_len = max(len(tag1_parts), len(tag2_parts))
tag1_parts += [0] * (max_len - len(tag1_parts))
tag2_parts += [0] * (max_len - len(tag2_parts))
for part1, part2 in zip(tag1_parts, tag2_parts):
if part1 != part2:
return part1 > part2
return False
def get_local_commit(repo: Path) -> str | None:
if not repo.exists() or not repo.joinpath(".git").exists():
return None
try:
cmd = f"cd {repo} && git describe HEAD --always --tags | cut -d '-' -f 1,2"
return check_output(cmd, shell=True, text=True).strip()
except CalledProcessError:
return None
def get_remote_commit(repo: Path) -> str | None:
if not repo.exists() or not repo.joinpath(".git").exists():
return None
try:
# get locally checked out branch
branch_cmd = f"cd {repo} && git branch | grep -E '\*'"
branch = check_output(branch_cmd, shell=True, text=True)
branch = branch.split("*")[-1].strip()
cmd = f"cd {repo} && git describe 'origin/{branch}' --always --tags | cut -d '-' -f 1,2"
return check_output(cmd, shell=True, text=True).strip()
except CalledProcessError:
return None
def git_cmd_clone(repo: str, target_dir: Path) -> None:
try:
command = ["git", "clone", repo, target_dir.as_posix()]
run(command, check=True)
Logger.print_ok("Clone successful!")
except CalledProcessError as e:
error = e.stderr.decode() if e.stderr else "Unknown error"
log = f"Error cloning repository {repo}: {error}"
Logger.print_error(log)
raise
def git_cmd_checkout(branch: str | None, target_dir: Path) -> None:
if branch is None:
return
try:
command = ["git", "checkout", f"{branch}"]
run(command, cwd=target_dir, check=True)
Logger.print_ok("Checkout successful!")
except CalledProcessError as e:
log = f"Error checking out branch {branch}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def git_cmd_pull(target_dir: Path) -> None:
try:
command = ["git", "pull"]
run(command, cwd=target_dir, check=True)
except CalledProcessError as e:
log = f"Error on git pull: {e.stderr.decode()}"
Logger.print_error(log)
raise
def rollback_repository(repo_dir: Path, instance: Type[InstanceType]) -> None:
q1 = "How many commits do you want to roll back"
amount = get_number_input(q1, 1, allow_go_back=True)
instances = get_instances(instance)
Logger.print_warn("Do not continue if you have ongoing prints!", start="\n")
Logger.print_warn(
f"All currently running {instance.__name__} services will be stopped!"
)
if not get_confirm(
f"Roll back {amount} commit{'s' if amount > 1 else ''}",
default_choice=False,
allow_go_back=True,
):
Logger.print_info("Aborting roll back ...")
return
InstanceManager.stop_all(instances)
try:
cmd = ["git", "reset", "--hard", f"HEAD~{amount}"]
run(cmd, cwd=repo_dir, check=True, stdout=PIPE, stderr=PIPE)
Logger.print_ok(f"Rolled back {amount} commits!", start="\n")
except CalledProcessError as e:
Logger.print_error(f"An error occured during repo rollback:\n{e}")
InstanceManager.start_all(instances)

172
kiauh/utils/input_utils.py Normal file
View File

@@ -0,0 +1,172 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
from typing import Dict, List
from core.constants import COLOR_CYAN, INVALID_CHOICE, RESET_FORMAT
from core.logger import Logger
def get_confirm(question: str, default_choice=True, allow_go_back=False) -> bool | None:
"""
Helper method for validating confirmation (yes/no) user input. |
:param question: The question to display
:param default_choice: A default if input was submitted without input
:param allow_go_back: Navigate back to a previous dialog
:return: Either True or False, or None on go_back
"""
options_confirm = ["y", "yes"]
options_decline = ["n", "no"]
options_go_back = ["b", "B"]
if default_choice:
def_choice = "(Y/n)"
options_confirm.append("")
else:
def_choice = "(y/N)"
options_decline.append("")
while True:
choice = (
input(format_question(question + f" {def_choice}", None)).strip().lower()
)
if choice in options_confirm:
return True
elif choice in options_decline:
return False
elif allow_go_back and choice in options_go_back:
return None
else:
Logger.print_error(INVALID_CHOICE)
def get_number_input(
question: str,
min_count: int,
max_count: int | None = None,
default: int | None = None,
allow_go_back: bool = False,
) -> int | None:
"""
Helper method to get a number input from the user
:param question: The question to display
:param min_count: The lowest allowed value
:param max_count: The highest allowed value (or None)
:param default: Optional default value
:param allow_go_back: Navigate back to a previous dialog
:return: Either the validated number input, or None on go_back
"""
options_go_back = ["b", "B"]
_question = format_question(question, default)
while True:
_input = input(_question)
if allow_go_back and _input in options_go_back:
return None
if _input == "" and default is not None:
return default
try:
return validate_number_input(_input, min_count, max_count)
except ValueError:
Logger.print_error(INVALID_CHOICE)
def get_string_input(
question: str,
regex: str | None = None,
exclude: List[str] | None = None,
allow_special_chars: bool = False,
default: str | None = None,
) -> str:
"""
Helper method to get a string input from the user
:param question: The question to display
:param regex: An optional regex pattern to validate the input against
:param exclude: List of strings which are not allowed
:param allow_special_chars: Wheter to allow special characters in the input
:param default: Optional default value
:return: The validated string value
"""
_exclude = [] if exclude is None else exclude
_question = format_question(question, default)
_pattern = re.compile(regex) if regex is not None else None
while True:
_input = input(_question)
if _input.lower() in _exclude:
Logger.print_error("This value is already in use/reserved.")
elif default is not None and _input == "":
return default
elif _pattern is not None and _pattern.match(_input):
return _input
elif allow_special_chars:
return _input
elif not allow_special_chars and _input.isalnum():
return _input
else:
Logger.print_error(INVALID_CHOICE)
def get_selection_input(question: str, option_list: List | Dict, default=None) -> str:
"""
Helper method to get a selection from a list of options from the user
:param question: The question to display
:param option_list: The list of options the user can select from
:param default: Optional default value
:return: The option that was selected by the user
"""
while True:
_input = input(format_question(question, default)).strip().lower()
if isinstance(option_list, list):
if _input in option_list:
return _input
elif isinstance(option_list, dict):
if _input in option_list.keys():
return _input
else:
raise ValueError("Invalid option_list type")
Logger.print_error(INVALID_CHOICE)
def format_question(question: str, default=None) -> str:
"""
Helper method to have a standardized formatting of questions |
:param question: The question to display
:param default: If defined, the default option will be displayed to the user
:return: The formatted question string
"""
formatted_q = question
if default is not None:
formatted_q += f" (default={default})"
return f"{COLOR_CYAN}###### {formatted_q}: {RESET_FORMAT}"
def validate_number_input(value: str, min_count: int, max_count: int | None) -> int:
"""
Helper method for a simple number input validation. |
:param value: The value to validate
:param min_count: The lowest allowed value
:param max_count: The highest allowed value (or None)
:return: The validated value as Integer
:raises: ValueError if value is invalid
"""
if max_count is not None:
if min_count <= int(value) <= max_count:
return int(value)
elif int(value) >= min_count:
return int(value)
raise ValueError

View File

@@ -0,0 +1,56 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import re
from pathlib import Path
from typing import List
from core.constants import SYSTEMD
from core.instance_manager.base_instance import SUFFIX_BLACKLIST
from core.instance_type import InstanceType
def get_instances(instance_type: type) -> List[InstanceType]:
from utils.common import convert_camelcase_to_kebabcase
if not isinstance(instance_type, type):
raise ValueError("instance_type must be a class")
name = convert_camelcase_to_kebabcase(instance_type.__name__)
pattern = re.compile(f"^{name}(-[0-9a-zA-Z]+)?.service$")
service_list = [
Path(SYSTEMD, service)
for service in SYSTEMD.iterdir()
if pattern.search(service.name)
and not any(s in service.name for s in SUFFIX_BLACKLIST)
]
instance_list = [
instance_type(get_instance_suffix(name, service)) for service in service_list
]
def _sort_instance_list(suffix: int | str | None):
if suffix is None:
return
elif isinstance(suffix, str) and suffix.isdigit():
return f"{int(suffix):04}"
else:
return suffix
return sorted(instance_list, key=lambda x: _sort_instance_list(x.suffix))
def get_instance_suffix(name: str, file_path: Path) -> str:
# to get the suffix of the instance, we remove the name of the instance from
# the file name, if the remaining part an empty string we return it
# otherwise there is and hyphen left, and we return the part after the hyphen
suffix = file_path.stem[len(name) :]
return suffix[1:] if suffix else ""

528
kiauh/utils/sys_utils.py Normal file
View File

@@ -0,0 +1,528 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
from __future__ import annotations
import os
import re
import select
import shutil
import socket
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from subprocess import DEVNULL, PIPE, CalledProcessError, Popen, check_output, run
from typing import List, Literal, Set
from core.constants import SYSTEMD
from core.logger import Logger
from utils.fs_utils import check_file_exist, remove_with_sudo
from utils.input_utils import get_confirm
SysCtlServiceAction = Literal[
"start",
"stop",
"restart",
"reload",
"enable",
"disable",
"mask",
"unmask",
]
SysCtlManageAction = Literal["daemon-reload", "reset-failed"]
def kill(opt_err_msg: str = "") -> None:
"""
Kills the application |
:param opt_err_msg: an optional, additional error message
:return: None
"""
if opt_err_msg:
Logger.print_error(opt_err_msg)
Logger.print_error("A critical error has occured. KIAUH was terminated.")
sys.exit(1)
def check_python_version(major: int, minor: int) -> bool:
"""
Checks the python version and returns True if it's at least the given version
:param major: the major version to check
:param minor: the minor version to check
:return: bool
"""
if not (sys.version_info.major >= major and sys.version_info.minor >= minor):
Logger.print_error("Versioncheck failed!")
Logger.print_error(f"Python {major}.{minor} or newer required.")
return False
return True
def parse_packages_from_file(source_file: Path) -> List[str]:
"""
Read the package names from bash scripts, when defined like:
PKGLIST="package1 package2 package3" |
:param source_file: path of the sourcefile to read from
:return: A list of package names
"""
packages = []
with open(source_file, "r") as file:
for line in file:
line = line.strip()
if line.startswith("PKGLIST="):
line = line.replace('"', "")
line = line.replace("PKGLIST=", "")
line = line.replace("${PKGLIST}", "")
packages.extend(line.split())
return packages
def create_python_venv(target: Path) -> bool:
"""
Create a python 3 virtualenv at the provided target destination.
Returns True if the virtualenv was created successfully.
Returns False if the virtualenv already exists, recreation was declined or creation failed.
:param target: Path where to create the virtualenv at
:return: bool
"""
Logger.print_status("Set up Python virtual environment ...")
if not target.exists():
try:
cmd = ["virtualenv", "-p", "/usr/bin/python3", target.as_posix()]
run(cmd, check=True)
Logger.print_ok("Setup of virtualenv successful!")
return True
except CalledProcessError as e:
Logger.print_error(f"Error setting up virtualenv:\n{e}")
return False
else:
if not get_confirm(
"Virtualenv already exists. Re-create?", default_choice=False
):
Logger.print_info("Skipping re-creation of virtualenv ...")
return False
try:
shutil.rmtree(target)
create_python_venv(target)
return True
except OSError as e:
log = f"Error removing existing virtualenv: {e.strerror}"
Logger.print_error(log, False)
return False
def update_python_pip(target: Path) -> None:
"""
Updates pip in the provided target destination |
:param target: Path of the virtualenv
:return: None
"""
Logger.print_status("Updating pip ...")
try:
pip_location: Path = target.joinpath("bin/pip")
pip_exists: bool = check_file_exist(pip_location)
if not pip_exists:
raise FileNotFoundError("Error updating pip! Not found.")
command = [pip_location.as_posix(), "install", "-U", "pip"]
result = run(command, stderr=PIPE, text=True)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)
Logger.print_error("Updating pip failed!")
return
Logger.print_ok("Updating pip successful!")
except FileNotFoundError as e:
Logger.print_error(e)
raise
except CalledProcessError as e:
Logger.print_error(f"Error updating pip:\n{e.output.decode()}")
raise
def install_python_requirements(target: Path, requirements: Path) -> None:
"""
Installs the python packages based on a provided requirements.txt |
:param target: Path of the virtualenv
:param requirements: Path to the requirements.txt file
:return: None
"""
try:
# always update pip before installing requirements
update_python_pip(target)
Logger.print_status("Installing Python requirements ...")
command = [
target.joinpath("bin/pip").as_posix(),
"install",
"-r",
f"{requirements}",
]
result = run(command, stderr=PIPE, text=True)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)
Logger.print_error("Installing Python requirements failed!")
return
Logger.print_ok("Installing Python requirements successful!")
except CalledProcessError as e:
log = f"Error installing Python requirements:\n{e.output.decode()}"
Logger.print_error(log)
raise
def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
"""
Updates the systems package list |
:param silent: Log info to the console or not
:param rls_info_change: Flag for "--allow-releaseinfo-change"
:return: None
"""
cache_mtime: float = 0
cache_files: List[Path] = [
Path("/var/lib/apt/periodic/update-success-stamp"),
Path("/var/lib/apt/lists"),
]
for cache_file in cache_files:
if cache_file.exists():
cache_mtime = max(cache_mtime, os.path.getmtime(cache_file))
update_age = int(time.time() - cache_mtime)
update_interval = 6 * 3600 # 48hrs
if update_age <= update_interval:
return
if not silent:
Logger.print_status("Updating package list...")
try:
command = ["sudo", "apt-get", "update"]
if rls_info_change:
command.append("--allow-releaseinfo-change")
result = run(command, stderr=PIPE, text=True)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)
Logger.print_error("Updating system package list failed!")
return
Logger.print_ok("System package list update successful!")
except CalledProcessError as e:
Logger.print_error(f"Error updating system package list:\n{e.stderr.decode()}")
raise
def get_upgradable_packages() -> List[str]:
"""
Reads all system packages that can be upgraded.
:return: A list of package names available for upgrade
"""
try:
command = ["apt", "list", "--upgradable"]
output: str = check_output(command, stderr=DEVNULL, text=True, encoding="utf-8")
pkglist = []
for line in output.split("\n"):
if "/" not in line:
continue
pkg = line.split("/")[0]
pkglist.append(pkg)
return pkglist
except CalledProcessError as e:
raise Exception(f"Error reading upgradable packages: {e}")
def check_package_install(packages: Set[str]) -> List[str]:
"""
Checks the system for installed packages |
:param packages: List of strings of package names
:return: A list containing the names of packages that are not installed
"""
not_installed = []
for package in packages:
command = ["dpkg-query", "-f'${Status}'", "--show", package]
result = run(
command,
stdout=PIPE,
stderr=DEVNULL,
text=True,
)
if "installed" not in result.stdout.strip("'").split():
not_installed.append(package)
return not_installed
def install_system_packages(packages: List[str]) -> None:
"""
Installs a list of system packages |
:param packages: List of system package names
:return: None
"""
try:
command = ["sudo", "apt-get", "install", "-y"]
for pkg in packages:
command.append(pkg)
run(command, stderr=PIPE, check=True)
Logger.print_ok("Packages successfully installed.")
except CalledProcessError as e:
Logger.print_error(f"Error installing packages:\n{e.stderr.decode()}")
raise
def upgrade_system_packages(packages: List[str]) -> None:
"""
Updates a list of system packages |
:param packages: List of system package names
:return: None
"""
try:
command = ["sudo", "apt-get", "upgrade", "-y"]
for pkg in packages:
command.append(pkg)
run(command, stderr=PIPE, check=True)
Logger.print_ok("Packages successfully upgraded.")
except CalledProcessError as e:
raise Exception(f"Error upgrading packages:\n{e.stderr.decode()}")
# this feels hacky and not quite right, but for now it works
# see: https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
def get_ipv4_addr() -> str:
"""
Helper function that returns the IPv4 of the current machine
by opening a socket and sending a package to an arbitrary IP. |
:return: Local IPv4 of the current machine
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(("192.255.255.255", 1))
return str(s.getsockname()[0])
except Exception:
return "127.0.0.1"
finally:
s.close()
def download_file(url: str, target: Path, show_progress=True) -> None:
"""
Helper method for downloading files from a provided URL |
:param url: the url to the file
:param target: the target path incl filename
:param show_progress: show download progress or not
:return: None
"""
try:
if show_progress:
urllib.request.urlretrieve(url, target, download_progress)
sys.stdout.write("\n")
else:
urllib.request.urlretrieve(url, target)
except urllib.error.HTTPError as e:
Logger.print_error(f"Download failed! HTTP error occured: {e}")
raise
except urllib.error.URLError as e:
Logger.print_error(f"Download failed! URL error occured: {e}")
raise
except Exception as e:
Logger.print_error(f"Download failed! An error occured: {e}")
raise
def download_progress(block_num, block_size, total_size) -> None:
"""
Reporthook method for urllib.request.urlretrieve() method call in download_file() |
:param block_num:
:param block_size:
:param total_size: total filesize in bytes
:return: None
"""
downloaded = block_num * block_size
percent = 100 if downloaded >= total_size else downloaded / total_size * 100
mb = 1024 * 1024
progress = int(percent / 5)
remaining = "-" * (20 - progress)
dl = f"\rDownloading: [{'#' * progress}{remaining}]{percent:.2f}% ({downloaded / mb:.2f}/{total_size / mb:.2f}MB)"
sys.stdout.write(dl)
sys.stdout.flush()
def set_nginx_permissions() -> None:
"""
Check if permissions of the users home directory
grant execution rights to group and other and set them if not set.
Required permissions for NGINX to be able to serve Mainsail/Fluidd.
This seems to have become necessary with Ubuntu 21+. |
:return: None
"""
cmd = f"ls -ld {Path.home()} | cut -d' ' -f1"
homedir_perm = run(cmd, shell=True, stdout=PIPE, text=True)
permissions = homedir_perm.stdout
if permissions.count("x") < 3:
Logger.print_status("Granting NGINX the required permissions ...")
run(["chmod", "og+x", Path.home()])
Logger.print_ok("Permissions granted.")
def cmd_sysctl_service(name: str, action: SysCtlServiceAction) -> None:
"""
Helper method to execute several actions for a specific systemd service. |
:param name: the service name
:param action: Either "start", "stop", "restart" or "disable"
:return: None
"""
try:
Logger.print_status(f"{action.capitalize()} {name} ...")
run(["sudo", "systemctl", action, name], stderr=PIPE, check=True)
Logger.print_ok("OK!")
except CalledProcessError as e:
log = f"Failed to {action} {name}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def cmd_sysctl_manage(action: SysCtlManageAction) -> None:
try:
run(["sudo", "systemctl", action], stderr=PIPE, check=True)
except CalledProcessError as e:
log = f"Failed to run {action}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def unit_file_exists(
name: str, suffix: Literal["service", "timer"], exclude: List[str] | None = None
) -> bool:
"""
Checks if a systemd unit file of the provided suffix exists.
:param name: the name of the unit file
:param suffix: suffix of the unit file, either "service" or "timer"
:param exclude: List of strings of names to exclude
:return: True if the unit file exists, False otherwise
"""
exclude = exclude or []
pattern = re.compile(f"^{name}(-[0-9a-zA-Z]+)?.{suffix}$")
service_list = [
Path(SYSTEMD, service)
for service in SYSTEMD.iterdir()
if pattern.search(service.name) and not any(s in service.name for s in exclude)
]
return any(service_list)
def log_process(process: Popen) -> None:
"""
Helper method to print stdout of a process in near realtime to the console.
:param process: Process to log the output from
:return: None
"""
while True:
if process.stdout is not None:
reads = [process.stdout.fileno()]
ret = select.select(reads, [], [])
for fd in ret[0]:
if fd == process.stdout.fileno():
line = process.stdout.readline()
if line:
print(line.strip(), flush=True)
else:
break
if process.poll() is not None:
break
def create_service_file(name: str, content: str) -> None:
"""
Creates a service file at the provided path with the provided content.
:param name: the name of the service file
:param content: the content of the service file
:return: None
"""
try:
run(
["sudo", "tee", SYSTEMD.joinpath(name)],
input=content.encode(),
stdout=DEVNULL,
check=True,
)
Logger.print_ok(f"Service file created: {SYSTEMD.joinpath(name)}")
except CalledProcessError as e:
Logger.print_error(f"Error creating service file: {e}")
raise
def create_env_file(path: Path, content: str) -> None:
"""
Creates an env file at the provided path with the provided content.
:param path: the path of the env file
:param content: the content of the env file
:return: None
"""
try:
with open(path, "w") as env_file:
env_file.write(content)
Logger.print_ok(f"Env file created: {path}")
except OSError as e:
Logger.print_error(f"Error creating env file: {e}")
raise
def remove_system_service(service_name: str) -> None:
"""
Disables and removes a systemd service
:param service_name: name of the service unit file - must end with '.service'
:return: None
"""
try:
if not service_name.endswith(".service"):
raise ValueError(f"service_name '{service_name}' must end with '.service'")
file: Path = SYSTEMD.joinpath(service_name)
if not file.exists() or not file.is_file():
Logger.print_info(f"Service '{service_name}' does not exist! Skipped ...")
return
Logger.print_status(f"Removing {service_name} ...")
cmd_sysctl_service(service_name, "stop")
cmd_sysctl_service(service_name, "disable")
remove_with_sudo(file)
cmd_sysctl_manage("daemon-reload")
cmd_sysctl_manage("reset-failed")
Logger.print_ok(f"{service_name} successfully removed!")
except Exception as e:
Logger.print_error(f"Error removing {service_name}: {e}")
raise
def get_service_file_path(instance_type: type, suffix: str) -> Path:
from utils.common import convert_camelcase_to_kebabcase
if not isinstance(instance_type, type):
raise ValueError("instance_type must be a class")
name: str = convert_camelcase_to_kebabcase(instance_type.__name__)
if suffix != "":
name += f"-{suffix}"
file_path: Path = SYSTEMD.joinpath(f"{name}.service")
return file_path