Compare commits

..

1 Commits

Author SHA1 Message Date
dw-0
a96153ef34 Merge 1f2d724189 into f2691f33d3 2024-04-28 19:43:31 +02:00
11 changed files with 242 additions and 197 deletions

View File

@@ -36,7 +36,7 @@ from components.klipper.klipper_utils import (
)
from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from core.repo_manager.repo_manager import RepoManager
from utils.input_utils import get_confirm
from utils.logger import Logger
from utils.system_utils import (
@@ -108,10 +108,12 @@ def install_klipper() -> None:
def setup_klipper_prerequesites() -> None:
settings = KiauhSettings()
repo = settings.get("klipper", "repo_url")
branch = settings.get("klipper", "branch")
git_clone_wrapper(repo, branch, KLIPPER_DIR)
repo_manager = RepoManager(
repo=settings.get("klipper", "repo_url"),
branch=settings.get("klipper", "branch"),
target_dir=KLIPPER_DIR,
)
repo_manager.clone_repo()
# install klipper dependencies and create python virtualenv
try:
@@ -150,7 +152,12 @@ def update_klipper() -> None:
instance_manager = InstanceManager(Klipper)
instance_manager.stop_all_instance()
git_pull_wrapper(repo=settings.get("klipper", "repo_url"), target_dir=KLIPPER_DIR)
repo_manager = RepoManager(
repo=settings.get("klipper", "repo_url"),
branch=settings.get("klipper", "branch"),
target_dir=KLIPPER_DIR,
)
repo_manager.pull_repo()
# install possible new system packages
install_klipper_packages(KLIPPER_DIR)

View File

@@ -39,10 +39,10 @@ from core.config_manager.config_manager import ConfigManager
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
from core.instance_manager.name_scheme import NameScheme
from core.repo_manager.repo_manager import RepoManager
from utils import PRINTER_CFG_BACKUP_DIR
from utils.common import get_install_status_common
from utils.constants import CURRENT_USER
from utils.git_utils import get_repo_name, get_remote_commit, get_local_commit
from utils.input_utils import get_confirm, get_string_input, get_number_input
from utils.logger import Logger
from utils.system_utils import mask_system_service
@@ -59,9 +59,9 @@ def get_klipper_status() -> (
"status": status.get("status"),
"status_code": status.get("status_code"),
"instances": status.get("instances"),
"repo": get_repo_name(KLIPPER_DIR),
"local": get_local_commit(KLIPPER_DIR),
"remote": get_remote_commit(KLIPPER_DIR),
"repo": RepoManager.get_repo_name(KLIPPER_DIR),
"local": RepoManager.get_local_commit(KLIPPER_DIR),
"remote": RepoManager.get_remote_commit(KLIPPER_DIR),
}

View File

@@ -35,8 +35,8 @@ from components.moonraker.moonraker_utils import (
backup_moonraker_dir,
)
from core.instance_manager.instance_manager import InstanceManager
from core.repo_manager.repo_manager import RepoManager
from utils.filesystem_utils import check_file_exist
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import (
get_confirm,
get_selection_input,
@@ -135,7 +135,12 @@ def setup_moonraker_prerequesites() -> None:
repo = settings.get("moonraker", "repo_url")
branch = settings.get("moonraker", "branch")
git_clone_wrapper(repo, branch, MOONRAKER_DIR)
repo_manager = RepoManager(
repo=repo,
branch=branch,
target_dir=MOONRAKER_DIR,
)
repo_manager.clone_repo()
# install moonraker dependencies and create python virtualenv
install_moonraker_packages(MOONRAKER_DIR)
@@ -191,9 +196,12 @@ def update_moonraker() -> None:
instance_manager = InstanceManager(Moonraker)
instance_manager.stop_all_instance()
git_pull_wrapper(
repo=settings.get("moonraker", "repo_url"), target_dir=MOONRAKER_DIR
repo_manager = RepoManager(
repo=settings.get("moonraker", "repo_url"),
branch=settings.get("moonraker", "branch"),
target_dir=MOONRAKER_DIR,
)
repo_manager.pull_repo()
# install possible new system packages
install_moonraker_packages(MOONRAKER_DIR)

View File

@@ -25,8 +25,8 @@ from components.webui_client.mainsail_data import MainsailData
from core.backup_manager.backup_manager import BackupManager
from core.config_manager.config_manager import ConfigManager
from core.instance_manager.instance_manager import InstanceManager
from core.repo_manager.repo_manager import RepoManager
from utils.common import get_install_status_common
from utils.git_utils import get_repo_name, get_local_commit, get_remote_commit
from utils.logger import Logger
from utils.system_utils import (
get_ipv4_addr,
@@ -44,9 +44,9 @@ def get_moonraker_status() -> (
"status": status.get("status"),
"status_code": status.get("status_code"),
"instances": status.get("instances"),
"repo": get_repo_name(MOONRAKER_DIR),
"local": get_local_commit(MOONRAKER_DIR),
"remote": get_remote_commit(MOONRAKER_DIR),
"repo": RepoManager.get_repo_name(MOONRAKER_DIR),
"local": RepoManager.get_local_commit(MOONRAKER_DIR),
"remote": RepoManager.get_remote_commit(MOONRAKER_DIR),
}

View File

@@ -25,13 +25,13 @@ from components.webui_client.client_utils import (
)
from core.instance_manager.instance_manager import InstanceManager
from core.repo_manager.repo_manager import RepoManager
from utils.common import backup_printer_config_dir
from utils.filesystem_utils import (
create_symlink,
add_config_section,
add_config_section_at_top,
)
from utils.git_utils import git_clone_wrapper, git_pull_wrapper
from utils.input_utils import get_confirm
from utils.logger import Logger
@@ -86,9 +86,11 @@ def install_client_config(client_data: BaseWebClient) -> None:
def download_client_config(client_config: BaseWebClientConfig) -> None:
try:
Logger.print_status(f"Downloading {client_config.display_name} ...")
repo = client_config.repo_url
target_dir = client_config.config_dir
git_clone_wrapper(repo, None, target_dir)
rm = RepoManager(
client_config.repo_url,
target_dir=str(client_config.config_dir),
)
rm.clone_repo()
except Exception:
Logger.print_error(f"Downloading {client_config.display_name} failed!")
raise
@@ -109,7 +111,12 @@ def update_client_config(client: BaseWebClient) -> None:
if settings.get("kiauh", "backup_before_update"):
backup_client_config_data(client)
git_pull_wrapper(client_config.repo_url, client_config.config_dir)
repo_manager = RepoManager(
repo=client_config.repo_url,
branch="master",
target_dir=str(client_config.config_dir),
)
repo_manager.pull_repo()
Logger.print_ok(f"Successfully updated {client_config.display_name}.")
Logger.print_info("Restart Klipper to reload the configuration!")

View File

@@ -21,17 +21,12 @@ from components.webui_client.base_data import (
)
from components.webui_client.mainsail_data import MainsailData
from core.backup_manager.backup_manager import BackupManager
from core.repo_manager.repo_manager import RepoManager
from core.settings.kiauh_settings import KiauhSettings
from utils import NGINX_SITES_AVAILABLE, NGINX_CONFD
from utils.common import get_install_status_webui
from utils.constants import COLOR_CYAN, RESET_FORMAT, COLOR_YELLOW
from utils.git_utils import (
get_latest_tag,
get_latest_unstable_tag,
get_repo_name,
get_local_commit,
get_remote_commit,
)
from utils.git_utils import get_latest_tag, get_latest_unstable_tag
from utils.logger import Logger
@@ -50,12 +45,13 @@ def get_client_config_status(
Literal["repo", "local", "remote"],
Union[str, int],
]:
client_config = client.client_config.config_dir
client_config = client.client_config
client_config = client_config.config_dir
return {
"repo": get_repo_name(client_config),
"local": get_local_commit(client_config),
"remote": get_remote_commit(client_config),
"repo": RepoManager.get_repo_name(client_config),
"local": RepoManager.get_local_commit(client_config),
"remote": RepoManager.get_remote_commit(client_config),
}
@@ -131,19 +127,11 @@ def symlink_webui_nginx_log(klipper_instances: List[Klipper]) -> None:
def get_local_client_version(client: BaseWebClient) -> str:
relinfo_file = client.client_dir.joinpath("release_info.json")
version_file = client.client_dir.joinpath(".version")
if not client.client_dir.exists():
if not relinfo_file.is_file():
return "-"
if not relinfo_file.is_file() and not version_file.is_file():
return "n/a"
if relinfo_file.is_file():
with open(relinfo_file, "r") as f:
return json.load(f)["version"]
else:
with open(version_file, "r") as f:
return f.readlines()[0]
with open(relinfo_file, "r") as f:
return json.load(f)["version"]
def get_remote_client_version(client: BaseWebClient) -> str:

View File

@@ -18,9 +18,9 @@ from components.moonraker.moonraker import Moonraker
from core.instance_manager.instance_manager import InstanceManager
from core.menus import Option
from core.menus.base_menu import BaseMenu
from core.repo_manager.repo_manager import RepoManager
from core.settings.kiauh_settings import KiauhSettings
from utils.constants import COLOR_CYAN, RESET_FORMAT, COLOR_GREEN, COLOR_YELLOW
from utils.git_utils import git_clone_wrapper
from utils.input_utils import get_string_input, get_confirm
from utils.logger import Logger
@@ -199,7 +199,8 @@ class SettingsMenu(BaseMenu):
repo = self.kiauh_settings.get(name, "repo_url")
branch = self.kiauh_settings.get(name, "branch")
git_clone_wrapper(repo, branch, target_dir)
repman = RepoManager(repo, str(target_dir), branch)
repman.clone_repo()
im.start_all_instance()

View File

View File

@@ -0,0 +1,171 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import shutil
import subprocess
from pathlib import Path
from utils.input_utils import get_confirm
from utils.logger import Logger
# noinspection PyMethodMayBeStatic
class RepoManager:
def __init__(
self,
repo: str,
target_dir: str,
branch: str = None,
):
self._repo = repo
self._branch = branch
self._method = self._get_method()
self._target_dir = target_dir
@property
def repo(self) -> str:
return self._repo
@repo.setter
def repo(self, value) -> None:
self._repo = value
@property
def branch(self) -> str:
return self._branch
@branch.setter
def branch(self, value) -> None:
self._branch = value
@property
def method(self) -> str:
return self._method
@method.setter
def method(self, value) -> None:
self._method = value
@property
def target_dir(self) -> str:
return self._target_dir
@target_dir.setter
def target_dir(self, value) -> None:
self._target_dir = value
@staticmethod
def get_repo_name(repo: Path) -> str:
"""
Helper method to extract the organisation and name of a repository |
:param repo: repository to extract the values from
:return: String in form of "<orga>/<name>"
"""
if not repo.exists() and not repo.joinpath(".git").exists():
return "-"
try:
cmd = ["git", "-C", repo, "config", "--get", "remote.origin.url"]
result = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
return "/".join(result.decode().strip().split("/")[-2:])
except subprocess.CalledProcessError:
return "-"
@staticmethod
def get_local_commit(repo: Path) -> str:
if not repo.exists() and not repo.joinpath(".git").exists():
return "-"
try:
cmd = f"cd {repo} && git describe HEAD --always --tags | cut -d '-' -f 1,2"
return subprocess.check_output(cmd, shell=True, text=True).strip()
except subprocess.CalledProcessError:
return "-"
@staticmethod
def get_remote_commit(repo: Path) -> str:
if not repo.exists() and not repo.joinpath(".git").exists():
return "-"
try:
# get locally checked out branch
branch_cmd = f"cd {repo} && git branch | grep -E '\*'"
branch = subprocess.check_output(branch_cmd, shell=True, text=True)
branch = branch.split("*")[-1].strip()
cmd = f"cd {repo} && git describe 'origin/{branch}' --always --tags | cut -d '-' -f 1,2"
return subprocess.check_output(cmd, shell=True, text=True).strip()
except subprocess.CalledProcessError:
return "-"
def clone_repo(self):
log = f"Cloning repository from '{self.repo}' with method '{self.method}'"
Logger.print_status(log)
try:
if Path(self.target_dir).exists():
question = f"'{self.target_dir}' already exists. Overwrite?"
if not get_confirm(question, default_choice=False):
Logger.print_info("Skip cloning of repository ...")
return
shutil.rmtree(self.target_dir)
self._clone()
self._checkout()
except subprocess.CalledProcessError:
log = "An unexpected error occured during cloning of the repository."
Logger.print_error(log)
return
except OSError as e:
Logger.print_error(f"Error removing existing repository: {e.strerror}")
return
def pull_repo(self) -> None:
Logger.print_status(f"Updating repository '{self.repo}' ...")
try:
self._pull()
except subprocess.CalledProcessError:
log = "An unexpected error occured during updating the repository."
Logger.print_error(log)
return
def _clone(self):
try:
command = ["git", "clone", self.repo, self.target_dir]
subprocess.run(command, check=True)
Logger.print_ok("Clone successful!")
except subprocess.CalledProcessError as e:
log = f"Error cloning repository {self.repo}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def _checkout(self):
if self.branch is None:
return
try:
command = ["git", "checkout", f"{self.branch}"]
subprocess.run(command, cwd=self.target_dir, check=True)
Logger.print_ok("Checkout successful!")
except subprocess.CalledProcessError as e:
log = f"Error checking out branch {self.branch}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def _pull(self) -> None:
try:
command = ["git", "pull"]
subprocess.run(command, cwd=self.target_dir, check=True)
except subprocess.CalledProcessError as e:
log = f"Error on git pull: {e.stderr.decode()}"
Logger.print_error(log)
raise
def _get_method(self) -> str:
return "ssh" if self.repo.startswith("git") else "https"

View File

@@ -11,7 +11,7 @@ import csv
import shutil
import textwrap
import urllib.request
from typing import List, Union, Optional, Type
from typing import List, Union
from typing import TypedDict
from components.klipper.klipper import Klipper
@@ -19,13 +19,12 @@ from components.klipper.klipper_dialogs import (
print_instance_overview,
DisplayType,
)
from core.menus import Option
from extensions.base_extension import BaseExtension
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
from core.menus.base_menu import BaseMenu
from core.repo_manager.repo_manager import RepoManager
from utils.constants import COLOR_YELLOW, COLOR_CYAN, RESET_FORMAT
from utils.git_utils import git_clone_wrapper
from utils.input_utils import get_selection_input
from utils.logger import Logger
@@ -43,7 +42,8 @@ class MainsailThemeInstallerExtension(BaseExtension):
instances: List[Klipper] = im.instances
def install_extension(self, **kwargs) -> None:
MainsailThemeInstallMenu(self.instances).run()
install_menu = MainsailThemeInstallMenu(self.instances)
install_menu.run()
def remove_extension(self, **kwargs) -> None:
print_instance_overview(
@@ -80,21 +80,11 @@ class MainsailThemeInstallMenu(BaseMenu):
def __init__(self, instances: List[Klipper]):
super().__init__()
self.themes: List[ThemeData] = self.load_themes()
options = {f"{index}": self.install_theme for index in range(len(self.themes))}
self.options = options
self.instances = instances
def set_previous_menu(self, previous_menu: Optional[Type[BaseMenu]]) -> None:
from extensions.extensions_menu import ExtensionsMenu
self.previous_menu: Type[BaseMenu] = (
previous_menu if previous_menu is not None else ExtensionsMenu
)
def set_options(self) -> None:
self.options = {
f"{index}": Option(self.install_theme, False, opt_index=f"{index}")
for index in range(len(self.themes))
}
def print_menu(self) -> None:
header = " [ Mainsail Theme Installer ] "
color = COLOR_YELLOW
@@ -146,8 +136,10 @@ class MainsailThemeInstallMenu(BaseMenu):
if printer_list is None:
return
repo_manager = RepoManager(theme_repo_url, "")
for printer in printer_list:
git_clone_wrapper(theme_repo_url, None, printer.cfg_dir.joinpath(".theme"))
repo_manager.target_dir = printer.cfg_dir.joinpath(".theme")
repo_manager.clone_repo()
if len(theme_data.get("short_note", "")) > 1:
Logger.print_warn("Info from the creator:", prefix=False, start="\n")

View File

@@ -1,11 +1,9 @@
import json
import shutil
import urllib.request
from http.client import HTTPResponse
from json import JSONDecodeError
from pathlib import Path
from subprocess import CalledProcessError, PIPE, run, check_output, DEVNULL
from typing import List, Type, Optional
from subprocess import CalledProcessError, PIPE, run
from typing import List, Type
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
@@ -13,70 +11,6 @@ from utils.input_utils import get_number_input, get_confirm
from utils.logger import Logger
def git_clone_wrapper(repo: str, branch: Optional[str], target_dir: Path) -> None:
"""
Clones a repository from the given URL and checks out the specified branch if given.
:param repo: The URL of the repository to clone.
:param branch: The branch to check out. If None, the default branch will be checked out.
:param target_dir: The directory where the repository will be cloned.
:return: None
"""
log = f"Cloning repository from '{repo}'"
Logger.print_status(log)
try:
if Path(target_dir).exists():
question = f"'{target_dir}' already exists. Overwrite?"
if not get_confirm(question, default_choice=False):
Logger.print_info("Skip cloning of repository ...")
return
shutil.rmtree(target_dir)
git_cmd_clone(repo, target_dir)
git_cmd_checkout(branch, target_dir)
except CalledProcessError:
log = "An unexpected error occured during cloning of the repository."
Logger.print_error(log)
return
except OSError as e:
Logger.print_error(f"Error removing existing repository: {e.strerror}")
return
def git_pull_wrapper(repo: str, target_dir: Path) -> None:
"""
A function that updates a repository using git pull.
:param repo: The repository to update.
:param target_dir: The directory of the repository.
:return: None
"""
Logger.print_status(f"Updating repository '{repo}' ...")
try:
git_cmd_pull(target_dir)
except CalledProcessError:
log = "An unexpected error occured during updating the repository."
Logger.print_error(log)
return
def get_repo_name(repo: Path) -> str:
"""
Helper method to extract the organisation and name of a repository |
:param repo: repository to extract the values from
:return: String in form of "<orga>/<name>"
"""
if not repo.exists() and not repo.joinpath(".git").exists():
return "-"
try:
cmd = ["git", "-C", repo, "config", "--get", "remote.origin.url"]
result = check_output(cmd, stderr=DEVNULL)
return "/".join(result.decode().strip().split("/")[-2:])
except CalledProcessError:
return "-"
def get_tags(repo_path: str) -> List[str]:
try:
url = f"https://api.github.com/repos/{repo_path}/tags"
@@ -127,70 +61,7 @@ def get_latest_unstable_tag(repo_path: str) -> str:
raise
def get_local_commit(repo: Path) -> str:
if not repo.exists() and not repo.joinpath(".git").exists():
return "-"
try:
cmd = f"cd {repo} && git describe HEAD --always --tags | cut -d '-' -f 1,2"
return check_output(cmd, shell=True, text=True).strip()
except CalledProcessError:
return "-"
def get_remote_commit(repo: Path) -> str:
if not repo.exists() and not repo.joinpath(".git").exists():
return "-"
try:
# get locally checked out branch
branch_cmd = f"cd {repo} && git branch | grep -E '\*'"
branch = check_output(branch_cmd, shell=True, text=True)
branch = branch.split("*")[-1].strip()
cmd = f"cd {repo} && git describe 'origin/{branch}' --always --tags | cut -d '-' -f 1,2"
return check_output(cmd, shell=True, text=True).strip()
except CalledProcessError:
return "-"
def git_cmd_clone(repo: str, target_dir: Path) -> None:
try:
command = ["git", "clone", repo, target_dir]
run(command, check=True)
Logger.print_ok("Clone successful!")
except CalledProcessError as e:
log = f"Error cloning repository {repo}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def git_cmd_checkout(branch: str, target_dir: Path) -> None:
if branch is None:
return
try:
command = ["git", "checkout", f"{branch}"]
run(command, cwd=target_dir, check=True)
Logger.print_ok("Checkout successful!")
except CalledProcessError as e:
log = f"Error checking out branch {branch}: {e.stderr.decode()}"
Logger.print_error(log)
raise
def git_cmd_pull(target_dir: Path) -> None:
try:
command = ["git", "pull"]
run(command, cwd=target_dir, check=True)
except CalledProcessError as e:
log = f"Error on git pull: {e.stderr.decode()}"
Logger.print_error(log)
raise
def rollback_repository(repo_dir: Path, instance: Type[BaseInstance]) -> None:
def rollback_repository(repo_dir: str, instance: Type[BaseInstance]) -> None:
q1 = "How many commits do you want to roll back"
amount = get_number_input(q1, 1, allow_go_back=True)