Compare commits

..

8 Commits

Author SHA1 Message Date
dw-0
f0aa0f16ab Merge 72e3a56e4f into b6c6edb622 2024-03-24 01:07:02 +01:00
dw-0
72e3a56e4f chore: replace black with ruff
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-24 01:06:59 +01:00
dw-0
e64aa94df4 chore: format
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-24 01:06:59 +01:00
dw-0
58719a4ca0 chore: fix lint issues
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-24 00:32:27 +01:00
dw-0
59a83aee12 feat(Mainsail): implement Mainsail Theme-Installer
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-24 00:04:29 +01:00
dw-0
7104eb078f refactor(RepoManager): if no branch is given, no checkout is done
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-24 00:02:10 +01:00
dw-0
341ecb325c refactor(klipper): instance overview dialog can now show printer folder and not only services
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-24 00:01:36 +01:00
dw-0
e3a6d8a0ab README.md: add contributor section
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
2024-03-23 21:18:11 +01:00
24 changed files with 369 additions and 86 deletions

View File

@@ -158,7 +158,7 @@ prompt and confirm by hitting ENTER.
</tr>
<tr>
<th><a href="https://github.com/Clon1998/mobileraker_companion"><img src="https://raw.githubusercontent.com/Clon1998/mobileraker/master/assets/icon/mr_appicon.png" alt="OctoEverywhere Logo" height="64"></th>
<th><a href="https://github.com/Clon1998/mobileraker_companion"><img src="https://raw.githubusercontent.com/Clon1998/mobileraker/master/assets/icon/mr_appicon.png" alt="OctoEverywhere Logo" height="64"></a></th>
<th><a href="https://octoeverywhere.com/?source=kiauh_readme"><img src="https://octoeverywhere.com/img/logo.svg" alt="OctoEverywhere Logo" height="64"></a></th>
<th></th>
</tr>
@@ -174,6 +174,16 @@ prompt and confirm by hitting ENTER.
<hr>
<h2 align="center">🎖️ Contributors 🎖️</h2>
<div align="center">
<a href="https://github.com/dw-0/kiauh/graphs/contributors">
<img src="https://contrib.rocks/image?repo=dw-0/kiauh" alt=""/>
</a>
</div>
<hr>
<h2 align="center">✨ Credits ✨</h2>
* A big thank you to [lixxbox](https://github.com/lixxbox) for that awesome KIAUH-Logo!

View File

@@ -8,6 +8,7 @@
# ======================================================================= #
import textwrap
from enum import Enum, unique
from typing import List
from core.instance_manager.base_instance import BaseInstance
@@ -15,17 +16,29 @@ from core.menus.base_menu import print_back_footer
from utils.constants import COLOR_GREEN, RESET_FORMAT, COLOR_YELLOW, COLOR_CYAN
@unique
class DisplayType(Enum):
SERVICE_NAME = "SERVICE_NAME"
PRINTER_NAME = "PRINTER_NAME"
def print_instance_overview(
instances: List[BaseInstance], show_index=False, show_select_all=False
instances: List[BaseInstance],
display_type: DisplayType = DisplayType.SERVICE_NAME,
show_headline=True,
show_index=False,
show_select_all=False,
):
headline = f"{COLOR_GREEN}The following Klipper instances were found:{RESET_FORMAT}"
dialog = textwrap.dedent(
f"""
/=======================================================\\
|{headline:^64}|
|-------------------------------------------------------|
"""
)[1:]
dialog = "/=======================================================\\\n"
if show_headline:
d_type = (
"Klipper instances"
if display_type is DisplayType.SERVICE_NAME
else "printer directories"
)
headline = f"{COLOR_GREEN}The following {d_type} were found:{RESET_FORMAT}"
dialog += f"|{headline:^64}|\n"
dialog += "|-------------------------------------------------------|\n"
if show_select_all:
select_all = f"{COLOR_YELLOW}a) Select all{RESET_FORMAT}"
@@ -33,7 +46,11 @@ def print_instance_overview(
dialog += "| |\n"
for i, s in enumerate(instances):
line = f"{COLOR_CYAN}{f'{i})' if show_index else ''} {s.get_service_file_name()}{RESET_FORMAT}"
if display_type is DisplayType.SERVICE_NAME:
name = s.get_service_file_name()
else:
name = s.data_dir
line = f"{COLOR_CYAN}{f'{i})' if show_index else ''} {name}{RESET_FORMAT}"
dialog += f"| {line:<63}|\n"
print(dialog, end="")

View File

@@ -62,7 +62,7 @@ def run_klipper_removal(
def select_instances_to_remove(
instances: List[Klipper],
) -> Union[List[Klipper], None]:
print_instance_overview(instances, True, True)
print_instance_overview(instances, show_index=True, show_select_all=True)
options = [str(i) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])

View File

@@ -48,10 +48,12 @@ from utils.logger import Logger
from utils.system_utils import mask_system_service
def get_klipper_status() -> Dict[
Literal["status", "status_code", "instances", "repo", "local", "remote"],
Union[str, int],
]:
def get_klipper_status() -> (
Dict[
Literal["status", "status_code", "instances", "repo", "local", "remote"],
Union[str, int],
]
):
status = get_install_status_common(Klipper, KLIPPER_DIR, KLIPPER_ENV_DIR)
return {
"status": status.get("status"),
@@ -69,7 +71,9 @@ def check_is_multi_install(
return not existing_instances and install_count > 1
def check_is_single_to_multi_conversion(existing_instances: List[Klipper]) -> bool:
def check_is_single_to_multi_conversion(
existing_instances: List[Klipper],
) -> bool:
return len(existing_instances) == 1 and existing_instances[0].suffix == ""

View File

@@ -50,5 +50,5 @@ def upload_logfile(logfile: LogFile) -> None:
Logger.print_ok("Upload successful! Access it via the following link:")
Logger.print_ok(f">>>> {link}", False)
except Exception as e:
Logger.print_error(f"Uploading logfile failed!")
Logger.print_error("Uploading logfile failed!")
Logger.print_error(str(e))

View File

@@ -68,7 +68,7 @@ def run_moonraker_removal(
def select_instances_to_remove(
instances: List[Moonraker],
) -> Union[List[Moonraker], None]:
print_instance_overview(instances, True, True)
print_instance_overview(instances, show_index=True, show_select_all=True)
options = [str(i) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
@@ -130,9 +130,15 @@ def remove_polkit_rules() -> None:
return
try:
command = [f"{MOONRAKER_DIR}/scripts/set-policykit-rules.sh", "--clear"]
command = [
f"{MOONRAKER_DIR}/scripts/set-policykit-rules.sh",
"--clear",
]
subprocess.run(
command, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, check=True
command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
check=True,
)
except subprocess.CalledProcessError as e:
Logger.print_error(f"Error while removing policykit rules: {e}")

View File

@@ -12,7 +12,10 @@ import sys
from pathlib import Path
from components.webui_client import MAINSAIL_DIR
from components.webui_client.client_utils import enable_mainsail_remotemode, get_existing_clients
from components.webui_client.client_utils import (
enable_mainsail_remotemode,
get_existing_clients,
)
from kiauh import KIAUH_CFG
from components.klipper.klipper import Klipper
from components.moonraker import (
@@ -170,7 +173,10 @@ def install_moonraker_polkit() -> None:
try:
command = [POLKIT_SCRIPT, "--disable-systemctl"]
result = subprocess.run(
command, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, text=True
command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
text=True,
)
if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False)

View File

@@ -32,10 +32,12 @@ from utils.system_utils import (
)
def get_moonraker_status() -> Dict[
Literal["status", "status_code", "instances", "repo", "local", "remote"],
Union[str, int],
]:
def get_moonraker_status() -> (
Dict[
Literal["status", "status_code", "instances", "repo", "local", "remote"],
Union[str, int],
]
):
status = get_install_status_common(Moonraker, MOONRAKER_DIR, MOONRAKER_ENV_DIR)
return {
"status": status.get("status"),

View File

@@ -8,7 +8,7 @@
# ======================================================================= #
from pathlib import Path
from typing import Literal, TypedDict, Set
from typing import Literal, TypedDict
from core.backup_manager import BACKUP_ROOT_DIR

View File

@@ -16,7 +16,9 @@ from kiauh import KIAUH_CFG
from components.klipper.klipper import Klipper
from components.moonraker.moonraker import Moonraker
from components.webui_client import ClientConfigData, ClientName, ClientData
from components.webui_client.client_dialogs import print_client_already_installed_dialog
from components.webui_client.client_dialogs import (
print_client_already_installed_dialog,
)
from components.webui_client.client_utils import (
load_client_data,
backup_client_config_data,
@@ -29,8 +31,9 @@ from core.repo_manager.repo_manager import RepoManager
from utils.common import backup_printer_config_dir
from utils.filesystem_utils import (
create_symlink,
add_config_section, add_config_section_at_top,
)
add_config_section,
add_config_section_at_top,
)
from utils.input_utils import get_confirm
from utils.logger import Logger

View File

@@ -112,7 +112,9 @@ def get_client_status(client: ClientData) -> str:
)
def get_client_config_status(client: ClientData) -> Dict[
def get_client_config_status(
client: ClientData,
) -> Dict[
Literal["repo", "local", "remote"],
Union[str, int],
]:

View File

@@ -16,34 +16,34 @@ from core.instance_manager.base_instance import BaseInstance
from utils.constants import SYSTEMD
from utils.logger import Logger
I = TypeVar(name="I", bound=BaseInstance, covariant=True)
T = TypeVar(name="T", bound=BaseInstance, covariant=True)
# noinspection PyMethodMayBeStatic
class InstanceManager:
def __init__(self, instance_type: I) -> None:
def __init__(self, instance_type: T) -> None:
self._instance_type = instance_type
self._current_instance: Optional[I] = None
self._current_instance: Optional[T] = None
self._instance_suffix: Optional[str] = None
self._instance_service: Optional[str] = None
self._instance_service_full: Optional[str] = None
self._instance_service_path: Optional[str] = None
self._instances: List[I] = []
self._instances: List[T] = []
@property
def instance_type(self) -> I:
def instance_type(self) -> T:
return self._instance_type
@instance_type.setter
def instance_type(self, value: I):
def instance_type(self, value: T):
self._instance_type = value
@property
def current_instance(self) -> I:
def current_instance(self) -> T:
return self._current_instance
@current_instance.setter
def current_instance(self, value: I) -> None:
def current_instance(self, value: T) -> None:
self._current_instance = value
self.instance_suffix = value.suffix
self.instance_service = value.get_service_file_name()
@@ -78,11 +78,11 @@ class InstanceManager:
self._instance_service_path = value
@property
def instances(self) -> List[I]:
def instances(self) -> List[T]:
return self.find_instances()
@instances.setter
def instances(self, value: List[I]):
def instances(self, value: List[T]):
self._instances = value
def create_instance(self) -> None:
@@ -108,7 +108,12 @@ class InstanceManager:
def enable_instance(self) -> None:
Logger.print_status(f"Enabling {self.instance_service_full} ...")
try:
command = ["sudo", "systemctl", "enable", self.instance_service_full]
command = [
"sudo",
"systemctl",
"enable",
self.instance_service_full,
]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service_full} enabled.")
except subprocess.CalledProcessError as e:
@@ -118,7 +123,12 @@ class InstanceManager:
def disable_instance(self) -> None:
Logger.print_status(f"Disabling {self.instance_service_full} ...")
try:
command = ["sudo", "systemctl", "disable", self.instance_service_full]
command = [
"sudo",
"systemctl",
"disable",
self.instance_service_full,
]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service_full} disabled.")
except subprocess.CalledProcessError as e:
@@ -128,7 +138,12 @@ class InstanceManager:
def start_instance(self) -> None:
Logger.print_status(f"Starting {self.instance_service_full} ...")
try:
command = ["sudo", "systemctl", "start", self.instance_service_full]
command = [
"sudo",
"systemctl",
"start",
self.instance_service_full,
]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service_full} started.")
except subprocess.CalledProcessError as e:
@@ -138,7 +153,12 @@ class InstanceManager:
def restart_instance(self) -> None:
Logger.print_status(f"Restarting {self.instance_service_full} ...")
try:
command = ["sudo", "systemctl", "restart", self.instance_service_full]
command = [
"sudo",
"systemctl",
"restart",
self.instance_service_full,
]
if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service_full} restarted.")
except subprocess.CalledProcessError as e:
@@ -182,7 +202,7 @@ class InstanceManager:
Logger.print_error(f"{e}")
raise
def find_instances(self) -> List[I]:
def find_instances(self) -> List[T]:
name = self.instance_type.__name__.lower()
pattern = re.compile(f"^{name}(-[0-9a-zA-Z]+)?.service$")
excluded = self.instance_type.blacklist()

View File

@@ -68,7 +68,11 @@ class MainMenu(BaseMenu):
def init_status(self) -> None:
status_vars = ["kl", "mr", "ms", "fl", "ks", "mb", "cn"]
for var in status_vars:
setattr(self, f"{var}_status", f"{COLOR_RED}Not installed!{RESET_FORMAT}")
setattr(
self,
f"{var}_status",
f"{COLOR_RED}Not installed!{RESET_FORMAT}",
)
def fetch_status(self) -> None:
# klipper

View File

@@ -10,7 +10,9 @@
import textwrap
from components.klipper.menus.klipper_remove_menu import KlipperRemoveMenu
from components.moonraker.menus.moonraker_remove_menu import MoonrakerRemoveMenu
from components.moonraker.menus.moonraker_remove_menu import (
MoonrakerRemoveMenu,
)
from components.webui_client.client_utils import load_client_data
from components.webui_client.menus.client_remove_menu import ClientRemoveMenu
from core.menus import BACK_FOOTER

View File

@@ -24,7 +24,7 @@ class RepoManager:
branch: str = None,
):
self._repo = repo
self._branch = branch if branch is not None else "master"
self._branch = branch
self._method = self._get_method()
self._target_dir = target_dir
@@ -110,7 +110,7 @@ class RepoManager:
if Path(self.target_dir).exists():
question = f"'{self.target_dir}' already exists. Overwrite?"
if not get_confirm(question, default_choice=False):
Logger.print_info("Skipping re-clone of repository.")
Logger.print_info("Skip cloning of repository ...")
return
shutil.rmtree(self.target_dir)
@@ -145,6 +145,9 @@ class RepoManager:
raise
def _checkout(self):
if self.branch is None:
return
try:
command = ["git", "checkout", f"{self.branch}"]
subprocess.run(command, cwd=self.target_dir, check=True)

View File

@@ -44,7 +44,9 @@ class GcodeShellCmdExtension(BaseExtension):
overwrite = True
if extension_installed:
overwrite = get_confirm(
"Extension seems to be installed already. Overwrite?", True, False
"Extension seems to be installed already. Overwrite?",
True,
False,
)
if not overwrite:

View File

@@ -0,0 +1,173 @@
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
# #
# This file is part of KIAUH - Klipper Installation And Update Helper #
# https://github.com/dw-0/kiauh #
# #
# This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= #
import csv
import shutil
import textwrap
import urllib.request
from typing import List, Union
from typing import TypedDict
from components.klipper.klipper import Klipper
from components.klipper.klipper_dialogs import (
print_instance_overview,
DisplayType,
)
from core.base_extension import BaseExtension
from core.instance_manager.base_instance import BaseInstance
from core.instance_manager.instance_manager import InstanceManager
from core.menus import BACK_FOOTER
from core.menus.base_menu import BaseMenu
from core.repo_manager.repo_manager import RepoManager
from utils.constants import COLOR_YELLOW, COLOR_CYAN, RESET_FORMAT
from utils.input_utils import get_selection_input
from utils.logger import Logger
class ThemeData(TypedDict):
name: str
short_note: str
author: str
repo: str
# noinspection PyMethodMayBeStatic
class MainsailThemeInstallerExtension(BaseExtension):
im = InstanceManager(Klipper)
instances: List[Klipper] = im.instances
def install_extension(self, **kwargs) -> None:
install_menu = MainsailThemeInstallMenu(self.instances)
install_menu.start()
def remove_extension(self, **kwargs) -> None:
print_instance_overview(
self.instances,
display_type=DisplayType.PRINTER_NAME,
show_headline=True,
show_index=True,
show_select_all=True,
)
printer_list = get_printer_selection(self.instances, True)
if printer_list is None:
return
for printer in printer_list:
Logger.print_status(f"Uninstalling theme from {printer.cfg_dir} ...")
theme_dir = printer.cfg_dir.joinpath(".theme")
if not theme_dir.exists():
Logger.print_info(f"{theme_dir} not found. Skipping ...")
continue
try:
shutil.rmtree(theme_dir)
Logger.print_ok("Theme successfully uninstalled!")
except OSError as e:
Logger.print_error("Unable to uninstall theme")
Logger.print_error(e)
# noinspection PyMethodMayBeStatic
class MainsailThemeInstallMenu(BaseMenu):
THEMES_URL: str = (
"https://raw.githubusercontent.com/mainsail-crew/gb-docs/main/_data/themes.csv"
)
def __init__(self, instances: List[Klipper]):
self.instances = instances
self.themes: List[ThemeData] = self.load_themes()
options = {f"{index}": self.install_theme for index in range(len(self.themes))}
super().__init__(
header=False,
options=options,
footer_type=BACK_FOOTER,
)
def print_menu(self) -> None:
header = " [ Mainsail Theme Installer ] "
color = COLOR_YELLOW
line1 = f"{COLOR_CYAN}A preview of each Mainsail theme can be found here:{RESET_FORMAT}"
count = 62 - len(color) - len(RESET_FORMAT)
menu = textwrap.dedent(
f"""
/=======================================================\\
| {color}{header:~^{count}}{RESET_FORMAT} |
|-------------------------------------------------------|
| {line1:<62} |
| https://docs.mainsail.xyz/theming/themes |
|-------------------------------------------------------|
"""
)[1:]
for i, theme in enumerate(self.themes):
i = f" {i}" if i < 10 else f"{i}"
row = f"{i}) [{theme.get('name')}]"
menu += f"| {row:<53} |\n"
print(menu, end="")
def load_themes(self) -> List[ThemeData]:
with urllib.request.urlopen(self.THEMES_URL) as response:
themes: List[ThemeData] = []
csv_data: str = response.read().decode().splitlines()
csv_reader = csv.DictReader(csv_data, delimiter=",")
for row in csv_reader:
row: ThemeData = row
themes.append(row)
return themes
def install_theme(self, **kwargs):
index = int(kwargs.get("opt_index"))
theme_data: ThemeData = self.themes[index]
theme_author: str = theme_data.get("author")
theme_repo: str = theme_data.get("repo")
theme_repo_url: str = f"https://github.com/{theme_author}/{theme_repo}"
print_instance_overview(
self.instances,
display_type=DisplayType.PRINTER_NAME,
show_headline=True,
show_index=True,
show_select_all=True,
)
printer_list = get_printer_selection(self.instances, True)
if printer_list is None:
return
repo_manager = RepoManager(theme_repo_url, "")
for printer in printer_list:
repo_manager.target_dir = printer.cfg_dir.joinpath(".theme")
repo_manager.clone_repo()
if len(theme_data.get("short_note", "")) > 1:
Logger.print_warn("Info from the creator:", prefix=False, start="\n")
Logger.print_info(theme_data.get("short_note"), prefix=False, end="\n\n")
def get_printer_selection(
instances: List[BaseInstance], is_install: bool
) -> Union[List[BaseInstance], None]:
options = [str(i) for i in range(len(instances))]
options.extend(["a", "A", "b", "B"])
if is_install:
q = "Select the printer to install the theme for"
else:
q = "Select the printer to remove the theme from"
selection = get_selection_input(q, options)
install_for = []
if selection == "b".lower():
return None
elif selection == "a".lower():
install_for.extend(instances)
else:
instance = instances[int(selection)]
install_for.append(instance)
return install_for

View File

@@ -0,0 +1,9 @@
{
"metadata": {
"index": 2,
"module": "mainsail_theme_installer_extension",
"maintained_by": "dw-0",
"display_name": "Mainsail Theme Installer",
"description": "Install Mainsail Themes maintained by the community."
}
}

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3
import os
# ======================================================================= #
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
@@ -209,11 +208,10 @@ def add_config_section(
cm.write_config()
def add_config_section_at_top(
section: str,
instances: List[B]):
def add_config_section_at_top(section: str, instances: List[B]):
for instance in instances:
tmp_cfg = tempfile.NamedTemporaryFile(mode="w" ,delete=False)
tmp_cfg = tempfile.NamedTemporaryFile(mode="w", delete=False)
tmp_cfg_path = Path(tmp_cfg.name)
cmt = ConfigManager(tmp_cfg_path)
cmt.config.add_section(section)

View File

@@ -51,7 +51,11 @@ def get_confirm(
def get_number_input(
question: str, min_count: int, max_count=None, default=None, allow_go_back=False
question: str,
min_count: int,
max_count=None,
default=None,
allow_go_back=False,
) -> Union[int, None]:
"""
Helper method to get a number input from the user

View File

@@ -130,7 +130,12 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
update_python_pip(target)
Logger.print_status("Installing Python requirements ...")
command = [target.joinpath("bin/pip"), "install", "-r", f"{requirements}"]
command = [
target.joinpath("bin/pip"),
"install",
"-r",
f"{requirements}",
]
result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
if result.returncode != 0 or result.stderr:
@@ -196,7 +201,10 @@ def check_package_install(packages: List[str]) -> List[str]:
for package in packages:
command = ["dpkg-query", "-f'${Status}'", "--show", package]
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True
command,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
if "installed" not in result.stdout.strip("'").split():
not_installed.append(package)

View File

@@ -1,13 +1,16 @@
[tool.black]
[project]
requires-python = ">=3.8"
[tool.ruff]
required-version = ">=0.3.4"
respect-gitignore = true
exclude = [".git",".github", "./docs"]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
exclude = '''
(
\.git/
| \.github/
| docs/
| resources/
| scripts/
)
'''
indent-width = 4
output-format = "full"
[tool.ruff.format]
indent-style = "space"
line-ending = "lf"
quote-style = "double"

View File

@@ -8,22 +8,26 @@ import shlex
import subprocess
import logging
class ShellCommand:
def __init__(self, config):
self.name = config.get_name().split()[-1]
self.printer = config.get_printer()
self.gcode = self.printer.lookup_object('gcode')
cmd = config.get('command')
self.gcode = self.printer.lookup_object("gcode")
cmd = config.get("command")
cmd = os.path.expanduser(cmd)
self.command = shlex.split(cmd)
self.timeout = config.getfloat('timeout', 2., above=0.)
self.verbose = config.getboolean('verbose', True)
self.timeout = config.getfloat("timeout", 2.0, above=0.0)
self.verbose = config.getboolean("verbose", True)
self.proc_fd = None
self.partial_output = ""
self.gcode.register_mux_command(
"RUN_SHELL_COMMAND", "CMD", self.name,
"RUN_SHELL_COMMAND",
"CMD",
self.name,
self.cmd_RUN_SHELL_COMMAND,
desc=self.cmd_RUN_SHELL_COMMAND_help)
desc=self.cmd_RUN_SHELL_COMMAND_help,
)
def _process_output(self, eventime):
if self.proc_fd is None:
@@ -33,11 +37,11 @@ class ShellCommand:
except Exception:
pass
data = self.partial_output + data.decode()
if '\n' not in data:
if "\n" not in data:
self.partial_output = data
return
elif data[-1] != '\n':
split = data.rfind('\n') + 1
elif data[-1] != "\n":
split = data.rfind("\n") + 1
self.partial_output = data[split:]
data = data[:split]
else:
@@ -45,16 +49,19 @@ class ShellCommand:
self.gcode.respond_info(data)
cmd_RUN_SHELL_COMMAND_help = "Run a linux shell command"
def cmd_RUN_SHELL_COMMAND(self, params):
gcode_params = params.get('PARAMS','')
gcode_params = params.get("PARAMS", "")
gcode_params = shlex.split(gcode_params)
reactor = self.printer.get_reactor()
try:
proc = subprocess.Popen(
self.command + gcode_params, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self.command + gcode_params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except Exception:
logging.exception(
"shell_command: Command {%s} failed" % (self.name))
logging.exception("shell_command: Command {%s} failed" % (self.name))
raise self.gcode.error("Error running command {%s}" % (self.name))
if self.verbose:
self.proc_fd = proc.stdout.fileno()
@@ -64,7 +71,7 @@ class ShellCommand:
endtime = eventtime + self.timeout
complete = False
while eventtime < endtime:
eventtime = reactor.pause(eventtime + .05)
eventtime = reactor.pause(eventtime + 0.05)
if proc.poll() is not None:
complete = True
break