diff --git a/kiauh/components/klipper_firmware/__init__.py b/kiauh/components/klipper_firmware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kiauh/components/klipper_firmware/flash_options.py b/kiauh/components/klipper_firmware/flash_options.py new file mode 100644 index 0000000..814e41b --- /dev/null +++ b/kiauh/components/klipper_firmware/flash_options.py @@ -0,0 +1,48 @@ +# ======================================================================= # +# Copyright (C) 2020 - 2024 Dominik Willner # +# # +# This file is part of KIAUH - Klipper Installation And Update Helper # +# https://github.com/dw-0/kiauh # +# # +# This file may be distributed under the terms of the GNU GPLv3 license # +# ======================================================================= # + +from dataclasses import field, dataclass +from enum import Enum +from typing import Union, List + + +class FlashMethod(Enum): + REGULAR = "REGULAR" + SD_CARD = "SD_CARD" + + +class FlashCommand(Enum): + FLASH = "flash" + SERIAL_FLASH = "serialflash" + + +class ConnectionType(Enum): + USB = "USB" + USB_DFU = "USB_DFU" + UART = "UART" + + +@dataclass +class FlashOptions: + _instance = None + flash_method: Union[FlashMethod, None] = None + flash_command: Union[FlashCommand, None] = None + connection_type: Union[ConnectionType, None] = None + mcu_list: List[str] = field(default_factory=list) + selected_mcu: str = "" + selected_board: str = "" + + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(FlashOptions, cls).__new__(cls, *args, **kwargs) + return cls._instance + + @classmethod + def destroy(cls): + cls._instance = None diff --git a/kiauh/components/klipper_firmware/flash_utils.py b/kiauh/components/klipper_firmware/flash_utils.py new file mode 100644 index 0000000..75b6610 --- /dev/null +++ b/kiauh/components/klipper_firmware/flash_utils.py @@ -0,0 +1,77 @@ +# ======================================================================= # +# Copyright (C) 2020 - 2024 Dominik Willner # +# # +# This file is part of KIAUH - Klipper Installation And Update Helper # +# https://github.com/dw-0/kiauh # +# # +# This file may be distributed under the terms of the GNU GPLv3 license # +# ======================================================================= # + +from subprocess import CalledProcessError, check_output, Popen, PIPE, STDOUT +from typing import List + +from components.klipper import KLIPPER_DIR +from components.klipper_firmware.flash_options import FlashOptions, FlashCommand +from utils.logger import Logger +from utils.system_utils import log_process + + +def find_usb_device_by_id() -> List[str]: + try: + command = "find /dev/serial/by-id/* 2>/dev/null" + output = check_output(command, shell=True, text=True) + return output.splitlines() + except CalledProcessError as e: + Logger.print_error("Unable to find a USB device!") + Logger.print_error(e, prefix=False) + return [] + + +def find_uart_device() -> List[str]: + try: + command = '"find /dev -maxdepth 1 -regextype posix-extended -regex "^\/dev\/tty(AMA0|S0)$" 2>/dev/null"' + output = check_output(command, shell=True, text=True) + return output.splitlines() + except CalledProcessError as e: + Logger.print_error("Unable to find a UART device!") + Logger.print_error(e, prefix=False) + return [] + + +def find_usb_dfu_device() -> List[str]: + try: + command = '"lsusb | grep "DFU" | cut -d " " -f 6 2>/dev/null"' + output = check_output(command, shell=True, text=True) + return output.splitlines() + except CalledProcessError as e: + Logger.print_error("Unable to find a USB DFU device!") + Logger.print_error(e, prefix=False) + return [] + + +def flash_device(flash_options: FlashOptions) -> None: + try: + if not flash_options.selected_mcu: + raise Exception("Missing value for selected_mcu!") + + if flash_options.flash_command is FlashCommand.FLASH: + command = [ + "make", + flash_options.flash_command.value, + f"FLASH_DEVICE={flash_options.selected_mcu}", + ] + process = Popen( + command, cwd=KLIPPER_DIR, stdout=PIPE, stderr=STDOUT, text=True + ) + + log_process(process) + + rc = process.returncode + if rc != 0: + raise Exception(f"Flashing failed with returncode: {rc}") + else: + Logger.print_ok("Flashing successfull!", start="\n", end="\n\n") + + except (Exception, CalledProcessError): + Logger.print_error("Flashing failed!", start="\n") + Logger.print_error("See the console output above!", end="\n\n") diff --git a/kiauh/components/klipper_firmware/menus/klipper_flash_menu.py b/kiauh/components/klipper_firmware/menus/klipper_flash_menu.py new file mode 100644 index 0000000..afad1b6 --- /dev/null +++ b/kiauh/components/klipper_firmware/menus/klipper_flash_menu.py @@ -0,0 +1,381 @@ +# ======================================================================= # +# Copyright (C) 2020 - 2024 Dominik Willner # +# # +# This file is part of KIAUH - Klipper Installation And Update Helper # +# https://github.com/dw-0/kiauh # +# # +# This file may be distributed under the terms of the GNU GPLv3 license # +# ======================================================================= # + +import textwrap + +from components.klipper_firmware.flash_options import ( + FlashOptions, + FlashMethod, + FlashCommand, + ConnectionType, +) +from components.klipper_firmware.flash_utils import ( + find_usb_device_by_id, + find_uart_device, + find_usb_dfu_device, + flash_device, +) +from core.menus import BACK_HELP_FOOTER, BACK_FOOTER + +from core.menus.base_menu import BaseMenu +from utils.constants import COLOR_CYAN, RESET_FORMAT, COLOR_YELLOW, COLOR_RED +from utils.input_utils import get_confirm +from utils.logger import Logger + + +# noinspection PyUnusedLocal +# noinspection PyMethodMayBeStatic +class KlipperFlashMethodMenu(BaseMenu): + def __init__(self): + self.flash_options = FlashOptions() + super().__init__( + header=False, + options={ + "1": self.select_regular, + "2": self.select_sdcard, + "h": KlipperFlashMethodHelpMenu, + }, + input_label_txt="Select flash method", + footer_type=BACK_HELP_FOOTER, + ) + + def print_menu(self) -> None: + header = " [ Flash MCU ] " + color = COLOR_CYAN + count = 62 - len(color) - len(RESET_FORMAT) + menu = textwrap.dedent( + f""" + /=======================================================\\ + | {color}{header:~^{count}}{RESET_FORMAT} | + |-------------------------------------------------------| + | Please select the flashing method to flash your MCU. | + | Make sure to only select a method your MCU supports. | + | Not all MCUs support both methods! | + |-------------------------------------------------------| + | | + | 1) Regular flashing method | + | 2) Updating via SD-Card Update | + | | + """ + )[1:] + print(menu, end="") + + def select_regular(self, **kwargs): + self.flash_options.flash_method = FlashMethod.REGULAR + self.goto_next_menu() + + def select_sdcard(self, **kwargs): + self.flash_options.flash_method = FlashMethod.SD_CARD + self.goto_next_menu() + + def goto_next_menu(self, **kwargs): + next_menu = KlipperFlashCommandMenu(previous_menu=self) + next_menu.start() + + +# noinspection PyUnusedLocal +# noinspection PyMethodMayBeStatic +class KlipperFlashCommandMenu(BaseMenu): + def __init__(self, previous_menu: BaseMenu): + self.flash_options = FlashOptions() + super().__init__( + header=False, + options={ + "1": self.select_flash, + "2": self.select_serialflash, + "h": KlipperFlashCommandHelpMenu, + }, + default_option="1", + input_label_txt="Select flash command", + previous_menu=previous_menu, + footer_type=BACK_HELP_FOOTER, + ) + + def print_menu(self) -> None: + menu = textwrap.dedent( + """ + /=======================================================\\ + | | + | Which flash command to use for flashing the MCU? | + | 1) make flash (default) | + | 2) make serialflash (stm32flash) | + | | + """ + )[1:] + print(menu, end="") + + def select_flash(self, **kwargs): + self.flash_options.flash_command = FlashCommand.FLASH + self.goto_next_menu() + + def select_serialflash(self, **kwargs): + self.flash_options.flash_command = FlashCommand.SERIAL_FLASH + self.goto_next_menu() + + def goto_next_menu(self, **kwargs): + next_menu = KlipperSelectMcuConnectionMenu(previous_menu=self) + next_menu.start() + + +# noinspection PyUnusedLocal +# noinspection PyMethodMayBeStatic +class KlipperSelectMcuConnectionMenu(BaseMenu): + def __init__(self, previous_menu: BaseMenu): + self.flash_options = FlashOptions() + super().__init__( + header=False, + options={ + "1": self.select_usb, + "2": self.select_dfu, + "3": self.select_usb_dfu, + "h": KlipperMcuConnectionHelpMenu, + }, + input_label_txt="Select connection type", + previous_menu=previous_menu, + footer_type=BACK_HELP_FOOTER, + ) + + def print_menu(self) -> None: + header = "Make sure that the controller board is connected now!" + color = COLOR_YELLOW + count = 62 - len(color) - len(RESET_FORMAT) + menu = textwrap.dedent( + f""" + /=======================================================\\ + | {color}{header:^{count}}{RESET_FORMAT} | + |-------------------------------------------------------| + | | + | How is the controller board connected to the host? | + | 1) USB | + | 2) UART | + | 3) USB (DFU mode) | + | | + """ + )[1:] + print(menu, end="") + + def select_usb(self, **kwargs): + self.flash_options.connection_type = ConnectionType.USB + self.get_mcu_list() + + def select_dfu(self, **kwargs): + self.flash_options.connection_type = ConnectionType.UART + self.get_mcu_list() + + def select_usb_dfu(self, **kwargs): + self.flash_options.connection_type = ConnectionType.USB_DFU + self.get_mcu_list() + + def get_mcu_list(self, **kwargs): + conn_type = self.flash_options.connection_type + + if conn_type is ConnectionType.USB: + Logger.print_status("Identifying MCU connected via USB ...") + self.flash_options.mcu_list = find_usb_device_by_id() + elif conn_type is ConnectionType.UART: + Logger.print_status("Identifying MCU possibly connected via UART ...") + self.flash_options.mcu_list = find_uart_device() + elif conn_type is ConnectionType.USB_DFU: + Logger.print_status("Identifying MCU connected via USB in DFU mode ...") + self.flash_options.mcu_list = find_usb_dfu_device() + + if len(self.flash_options.mcu_list) < 1: + Logger.print_warn("No MCUs found!") + Logger.print_warn("Make sure they are connected and repeat this step.") + else: + self.goto_next_menu() + + def goto_next_menu(self, **kwargs): + next_menu = KlipperSelectMcuIdMenu(previous_menu=self) + next_menu.start() + + +# noinspection PyUnusedLocal +# noinspection PyMethodMayBeStatic +class KlipperSelectMcuIdMenu(BaseMenu): + def __init__(self, previous_menu: BaseMenu): + self.flash_options = FlashOptions() + self.mcu_list = self.flash_options.mcu_list + options = {f"{index}": self.flash_mcu for index in range(len(self.mcu_list))} + super().__init__( + header=False, + options=options, + input_label_txt="Select MCU to flash", + previous_menu=previous_menu, + footer_type=BACK_FOOTER, + ) + + def print_menu(self) -> None: + header = "!!! ATTENTION !!!" + header2 = f"[{COLOR_CYAN}List of available MCUs{RESET_FORMAT}]" + color = COLOR_RED + count = 62 - len(color) - len(RESET_FORMAT) + menu = textwrap.dedent( + f""" + /=======================================================\\ + | {color}{header:^{count}}{RESET_FORMAT} | + |-------------------------------------------------------| + | Make sure, to select the correct MCU! | + | ONLY flash a firmware created for the respective MCU! | + | | + |{header2:-^64}| + + """ + )[1:] + + for i, mcu in enumerate(self.mcu_list): + mcu = mcu.split("/")[-1] + menu += f" ● MCU #{i}: {COLOR_CYAN}{mcu}{RESET_FORMAT}\n" + + print(menu, end="\n") + + def flash_mcu(self, **kwargs): + index = int(kwargs.get("opt_index")) + selected_mcu = self.mcu_list[index] + self.flash_options.selected_mcu = selected_mcu + + print(f"{COLOR_CYAN}###### You selected:{RESET_FORMAT}") + print(f"● MCU #{index}: {selected_mcu}\n") + + if get_confirm("Continue", allow_go_back=True): + Logger.print_status(f"Flashing '{selected_mcu}' ...") + flash_device(self.flash_options) + + self.goto_next_menu() + + def goto_next_menu(self, **kwargs): + pass + # TODO: navigate back to advanced menu after flashing + + # from core.menus.main_menu import MainMenu + # from core.menus.advanced_menu import AdvancedMenu + # + # next_menu = AdvancedMenu() + # next_menu.start() + + +class KlipperFlashMethodHelpMenu(BaseMenu): + def __init__(self): + super().__init__( + header=False, + options={}, + footer_type=BACK_FOOTER, + ) + + def print_menu(self) -> None: + header = " < ? > Help: Flash MCU < ? > " + color = COLOR_YELLOW + count = 62 - len(color) - len(RESET_FORMAT) + subheader1 = f"{COLOR_CYAN}Regular flashing method:{RESET_FORMAT}" + subheader2 = f"{COLOR_CYAN}Updating via SD-Card Update:{RESET_FORMAT}" + menu = textwrap.dedent( + f""" + /=======================================================\\ + | {color}{header:~^{count}}{RESET_FORMAT} | + |-------------------------------------------------------| + | {subheader1:<62} | + | The default method to flash controller boards which | + | are connected and updated over USB and not by placing | + | a compiled firmware file onto an internal SD-Card. | + | | + | Common controllers that get flashed that way are: | + | - Arduino Mega 2560 | + | - Fysetc F6 / S6 (used without a Display + SD-Slot) | + | | + | {subheader2:<62} | + | Many popular controller boards ship with a bootloader | + | capable of updating the firmware via SD-Card. | + | Choose this method if your controller board supports | + | this way of updating. This method ONLY works for up- | + | grading firmware. The initial flashing procedure must | + | be done manually per the instructions that apply to | + | your controller board. | + | | + | Common controllers that can be flashed that way are: | + | - BigTreeTech SKR 1.3 / 1.4 (Turbo) / E3 / Mini E3 | + | - Fysetc F6 / S6 (used with a Display + SD-Slot) | + | - Fysetc Spider | + | | + """ + )[1:] + print(menu, end="") + + +class KlipperFlashCommandHelpMenu(BaseMenu): + def __init__(self): + super().__init__( + header=False, + options={}, + footer_type=BACK_FOOTER, + ) + + def print_menu(self) -> None: + header = " < ? > Help: Flash MCU < ? > " + color = COLOR_YELLOW + count = 62 - len(color) - len(RESET_FORMAT) + subheader1 = f"{COLOR_CYAN}make flash:{RESET_FORMAT}" + subheader2 = f"{COLOR_CYAN}make serialflash:{RESET_FORMAT}" + menu = textwrap.dedent( + f""" + /=======================================================\\ + | {color}{header:~^{count}}{RESET_FORMAT} | + |-------------------------------------------------------| + | {subheader1:<62} | + | The default command to flash controller board, it | + | will detect selected microcontroller and use suitable | + | tool for flashing it. | + | | + | {subheader2:<62} | + | Special command to flash STM32 microcontrollers in | + | DFU mode but connected via serial. stm32flash command | + | will be used internally. | + | | + """ + )[1:] + print(menu, end="") + + +class KlipperMcuConnectionHelpMenu(BaseMenu): + def __init__(self): + super().__init__( + header=False, + options={}, + footer_type=BACK_FOOTER, + ) + + def print_menu(self) -> None: + header = " < ? > Help: Flash MCU < ? > " + color = COLOR_YELLOW + count = 62 - len(color) - len(RESET_FORMAT) + subheader1 = f"{COLOR_CYAN}USB:{RESET_FORMAT}" + subheader2 = f"{COLOR_CYAN}UART:{RESET_FORMAT}" + menu = textwrap.dedent( + f""" + /=======================================================\\ + | {color}{header:~^{count}}{RESET_FORMAT} | + |-------------------------------------------------------| + | {subheader1:<62} | + | Selecting USB as the connection method will scan the | + | USB ports for connected controller boards. This will | + | be similar to the 'ls /dev/serial/by-id/*' command | + | suggested by the official Klipper documentation for | + | determining successfull USB connections! | + | | + | {subheader2:<62} | + | Selecting UART as the connection method will list all | + | possible UART serial ports. Note: This method ALWAYS | + | returns something as it seems impossible to determine | + | if a valid Klipper controller board is connected or | + | not. Because of that, you MUST know which UART serial | + | port your controller board is connected to when using | + | this connection method. | + | | + """ + )[1:] + print(menu, end="") diff --git a/kiauh/core/menus/advanced_menu.py b/kiauh/core/menus/advanced_menu.py index b76a930..66d9be1 100644 --- a/kiauh/core/menus/advanced_menu.py +++ b/kiauh/core/menus/advanced_menu.py @@ -9,7 +9,7 @@ import textwrap -from components.klipper_firmware.menus.klipper_flash_menu import KlipperFlashMenu +from components.klipper_firmware.menus.klipper_flash_menu import KlipperFlashMethodMenu from core.menus import BACK_FOOTER from core.menus.base_menu import BaseMenu from utils.constants import COLOR_YELLOW, RESET_FORMAT @@ -23,7 +23,7 @@ class AdvancedMenu(BaseMenu): "1": None, "2": None, "3": None, - "4": KlipperFlashMenu, + "4": KlipperFlashMethodMenu, "5": None, "6": None, }, diff --git a/kiauh/utils/system_utils.py b/kiauh/utils/system_utils.py index 991e65e..6ac932f 100644 --- a/kiauh/utils/system_utils.py +++ b/kiauh/utils/system_utils.py @@ -10,7 +10,7 @@ import os import shutil import socket -import subprocess +from subprocess import Popen, PIPE, CalledProcessError, run, DEVNULL import sys import time import urllib.error @@ -19,6 +19,8 @@ import venv from pathlib import Path from typing import List, Literal +import select + from utils.input_utils import get_confirm from utils.logger import Logger from utils.filesystem_utils import check_file_exist @@ -73,7 +75,7 @@ def create_python_venv(target: Path) -> None: except OSError as e: Logger.print_error(f"Error setting up virtualenv:\n{e}") raise - except subprocess.CalledProcessError as e: + except CalledProcessError as e: Logger.print_error(f"Error setting up virtualenv:\n{e.output.decode()}") raise else: @@ -103,7 +105,7 @@ def update_python_pip(target: Path) -> None: raise FileNotFoundError("Error updating pip! Not found.") command = [pip_location, "install", "-U", "pip"] - result = subprocess.run(command, stderr=subprocess.PIPE, text=True) + result = run(command, stderr=PIPE, text=True) if result.returncode != 0 or result.stderr: Logger.print_error(f"{result.stderr}", False) Logger.print_error("Updating pip failed!") @@ -113,7 +115,7 @@ def update_python_pip(target: Path) -> None: except FileNotFoundError as e: Logger.print_error(e) raise - except subprocess.CalledProcessError as e: + except CalledProcessError as e: Logger.print_error(f"Error updating pip:\n{e.output.decode()}") raise @@ -136,7 +138,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None: "-r", f"{requirements}", ] - result = subprocess.run(command, stderr=subprocess.PIPE, text=True) + result = run(command, stderr=PIPE, text=True) if result.returncode != 0 or result.stderr: Logger.print_error(f"{result.stderr}", False) @@ -144,7 +146,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None: return Logger.print_ok("Installing Python requirements successful!") - except subprocess.CalledProcessError as e: + except CalledProcessError as e: log = f"Error installing Python requirements:\n{e.output.decode()}" Logger.print_error(log) raise @@ -180,14 +182,14 @@ def update_system_package_lists(silent: bool, rls_info_change=False) -> None: if rls_info_change: command.append("--allow-releaseinfo-change") - result = subprocess.run(command, stderr=subprocess.PIPE, text=True) + result = run(command, stderr=PIPE, text=True) if result.returncode != 0 or result.stderr: Logger.print_error(f"{result.stderr}", False) Logger.print_error("Updating system package list failed!") return Logger.print_ok("System package list update successful!") - except subprocess.CalledProcessError as e: + except CalledProcessError as e: kill(f"Error updating system package list:\n{e.stderr.decode()}") @@ -200,10 +202,10 @@ def check_package_install(packages: List[str]) -> List[str]: not_installed = [] for package in packages: command = ["dpkg-query", "-f'${Status}'", "--show", package] - result = subprocess.run( + result = run( command, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, + stdout=PIPE, + stderr=DEVNULL, text=True, ) if "installed" not in result.stdout.strip("'").split(): @@ -224,10 +226,10 @@ def install_system_packages(packages: List[str]) -> None: command = ["sudo", "apt-get", "install", "-y"] for pkg in packages: command.append(pkg) - subprocess.run(command, stderr=subprocess.PIPE, check=True) + run(command, stderr=PIPE, check=True) Logger.print_ok("Packages installed successfully.") - except subprocess.CalledProcessError as e: + except CalledProcessError as e: kill(f"Error installing packages:\n{e.stderr.decode()}") @@ -239,8 +241,8 @@ def mask_system_service(service_name: str) -> None: """ try: command = ["sudo", "systemctl", "mask", service_name] - subprocess.run(command, stderr=subprocess.PIPE, check=True) - except subprocess.CalledProcessError as e: + run(command, stderr=PIPE, check=True) + except CalledProcessError as e: log = f"Unable to mask system service {service_name}: {e.stderr.decode()}" Logger.print_error(log) raise @@ -318,12 +320,12 @@ def set_nginx_permissions() -> None: :return: None """ cmd = f"ls -ld {Path.home()} | cut -d' ' -f1" - homedir_perm = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True) + homedir_perm = run(cmd, shell=True, stdout=PIPE, text=True) homedir_perm = homedir_perm.stdout if homedir_perm.count("x") < 3: Logger.print_status("Granting NGINX the required permissions ...") - subprocess.run(["chmod", "og+x", Path.home()]) + run(["chmod", "og+x", Path.home()]) Logger.print_ok("Permissions granted.") @@ -339,9 +341,30 @@ def control_systemd_service( try: Logger.print_status(f"{action.capitalize()} {name}.service ...") command = ["sudo", "systemctl", action, f"{name}.service"] - subprocess.run(command, stderr=subprocess.PIPE, check=True) + run(command, stderr=PIPE, check=True) Logger.print_ok("OK!") - except subprocess.CalledProcessError as e: + except CalledProcessError as e: log = f"Failed to {action} {name}.service: {e.stderr.decode()}" Logger.print_error(log) raise + + +def log_process(process: Popen) -> None: + """ + Helper method to print stdout of a process in near realtime to the console. + :param process: Process to log the output from + :return: None + """ + while True: + reads = [process.stdout.fileno()] + ret = select.select(reads, [], []) + for fd in ret[0]: + if fd == process.stdout.fileno(): + line = process.stdout.readline() + if line: + print(line.strip(), flush=True) + else: + break + + if process.poll() is not None: + break