mirror of
https://github.com/dw-0/kiauh.git
synced 2025-12-23 15:53:36 +05:00
feat: first implementation of firmware flashing via usb and regular flash command
Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
0
kiauh/components/klipper_firmware/__init__.py
Normal file
0
kiauh/components/klipper_firmware/__init__.py
Normal file
48
kiauh/components/klipper_firmware/flash_options.py
Normal file
48
kiauh/components/klipper_firmware/flash_options.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
# ======================================================================= #
|
||||||
|
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||||
|
# https://github.com/dw-0/kiauh #
|
||||||
|
# #
|
||||||
|
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||||
|
# ======================================================================= #
|
||||||
|
|
||||||
|
from dataclasses import field, dataclass
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Union, List
|
||||||
|
|
||||||
|
|
||||||
|
class FlashMethod(Enum):
|
||||||
|
REGULAR = "REGULAR"
|
||||||
|
SD_CARD = "SD_CARD"
|
||||||
|
|
||||||
|
|
||||||
|
class FlashCommand(Enum):
|
||||||
|
FLASH = "flash"
|
||||||
|
SERIAL_FLASH = "serialflash"
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionType(Enum):
|
||||||
|
USB = "USB"
|
||||||
|
USB_DFU = "USB_DFU"
|
||||||
|
UART = "UART"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FlashOptions:
|
||||||
|
_instance = None
|
||||||
|
flash_method: Union[FlashMethod, None] = None
|
||||||
|
flash_command: Union[FlashCommand, None] = None
|
||||||
|
connection_type: Union[ConnectionType, None] = None
|
||||||
|
mcu_list: List[str] = field(default_factory=list)
|
||||||
|
selected_mcu: str = ""
|
||||||
|
selected_board: str = ""
|
||||||
|
|
||||||
|
def __new__(cls, *args, **kwargs):
|
||||||
|
if not cls._instance:
|
||||||
|
cls._instance = super(FlashOptions, cls).__new__(cls, *args, **kwargs)
|
||||||
|
return cls._instance
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def destroy(cls):
|
||||||
|
cls._instance = None
|
||||||
77
kiauh/components/klipper_firmware/flash_utils.py
Normal file
77
kiauh/components/klipper_firmware/flash_utils.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
# ======================================================================= #
|
||||||
|
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||||
|
# https://github.com/dw-0/kiauh #
|
||||||
|
# #
|
||||||
|
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||||
|
# ======================================================================= #
|
||||||
|
|
||||||
|
from subprocess import CalledProcessError, check_output, Popen, PIPE, STDOUT
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from components.klipper import KLIPPER_DIR
|
||||||
|
from components.klipper_firmware.flash_options import FlashOptions, FlashCommand
|
||||||
|
from utils.logger import Logger
|
||||||
|
from utils.system_utils import log_process
|
||||||
|
|
||||||
|
|
||||||
|
def find_usb_device_by_id() -> List[str]:
|
||||||
|
try:
|
||||||
|
command = "find /dev/serial/by-id/* 2>/dev/null"
|
||||||
|
output = check_output(command, shell=True, text=True)
|
||||||
|
return output.splitlines()
|
||||||
|
except CalledProcessError as e:
|
||||||
|
Logger.print_error("Unable to find a USB device!")
|
||||||
|
Logger.print_error(e, prefix=False)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def find_uart_device() -> List[str]:
|
||||||
|
try:
|
||||||
|
command = '"find /dev -maxdepth 1 -regextype posix-extended -regex "^\/dev\/tty(AMA0|S0)$" 2>/dev/null"'
|
||||||
|
output = check_output(command, shell=True, text=True)
|
||||||
|
return output.splitlines()
|
||||||
|
except CalledProcessError as e:
|
||||||
|
Logger.print_error("Unable to find a UART device!")
|
||||||
|
Logger.print_error(e, prefix=False)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def find_usb_dfu_device() -> List[str]:
|
||||||
|
try:
|
||||||
|
command = '"lsusb | grep "DFU" | cut -d " " -f 6 2>/dev/null"'
|
||||||
|
output = check_output(command, shell=True, text=True)
|
||||||
|
return output.splitlines()
|
||||||
|
except CalledProcessError as e:
|
||||||
|
Logger.print_error("Unable to find a USB DFU device!")
|
||||||
|
Logger.print_error(e, prefix=False)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def flash_device(flash_options: FlashOptions) -> None:
|
||||||
|
try:
|
||||||
|
if not flash_options.selected_mcu:
|
||||||
|
raise Exception("Missing value for selected_mcu!")
|
||||||
|
|
||||||
|
if flash_options.flash_command is FlashCommand.FLASH:
|
||||||
|
command = [
|
||||||
|
"make",
|
||||||
|
flash_options.flash_command.value,
|
||||||
|
f"FLASH_DEVICE={flash_options.selected_mcu}",
|
||||||
|
]
|
||||||
|
process = Popen(
|
||||||
|
command, cwd=KLIPPER_DIR, stdout=PIPE, stderr=STDOUT, text=True
|
||||||
|
)
|
||||||
|
|
||||||
|
log_process(process)
|
||||||
|
|
||||||
|
rc = process.returncode
|
||||||
|
if rc != 0:
|
||||||
|
raise Exception(f"Flashing failed with returncode: {rc}")
|
||||||
|
else:
|
||||||
|
Logger.print_ok("Flashing successfull!", start="\n", end="\n\n")
|
||||||
|
|
||||||
|
except (Exception, CalledProcessError):
|
||||||
|
Logger.print_error("Flashing failed!", start="\n")
|
||||||
|
Logger.print_error("See the console output above!", end="\n\n")
|
||||||
381
kiauh/components/klipper_firmware/menus/klipper_flash_menu.py
Normal file
381
kiauh/components/klipper_firmware/menus/klipper_flash_menu.py
Normal file
@@ -0,0 +1,381 @@
|
|||||||
|
# ======================================================================= #
|
||||||
|
# Copyright (C) 2020 - 2024 Dominik Willner <th33xitus@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This file is part of KIAUH - Klipper Installation And Update Helper #
|
||||||
|
# https://github.com/dw-0/kiauh #
|
||||||
|
# #
|
||||||
|
# This file may be distributed under the terms of the GNU GPLv3 license #
|
||||||
|
# ======================================================================= #
|
||||||
|
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
from components.klipper_firmware.flash_options import (
|
||||||
|
FlashOptions,
|
||||||
|
FlashMethod,
|
||||||
|
FlashCommand,
|
||||||
|
ConnectionType,
|
||||||
|
)
|
||||||
|
from components.klipper_firmware.flash_utils import (
|
||||||
|
find_usb_device_by_id,
|
||||||
|
find_uart_device,
|
||||||
|
find_usb_dfu_device,
|
||||||
|
flash_device,
|
||||||
|
)
|
||||||
|
from core.menus import BACK_HELP_FOOTER, BACK_FOOTER
|
||||||
|
|
||||||
|
from core.menus.base_menu import BaseMenu
|
||||||
|
from utils.constants import COLOR_CYAN, RESET_FORMAT, COLOR_YELLOW, COLOR_RED
|
||||||
|
from utils.input_utils import get_confirm
|
||||||
|
from utils.logger import Logger
|
||||||
|
|
||||||
|
|
||||||
|
# noinspection PyUnusedLocal
|
||||||
|
# noinspection PyMethodMayBeStatic
|
||||||
|
class KlipperFlashMethodMenu(BaseMenu):
|
||||||
|
def __init__(self):
|
||||||
|
self.flash_options = FlashOptions()
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options={
|
||||||
|
"1": self.select_regular,
|
||||||
|
"2": self.select_sdcard,
|
||||||
|
"h": KlipperFlashMethodHelpMenu,
|
||||||
|
},
|
||||||
|
input_label_txt="Select flash method",
|
||||||
|
footer_type=BACK_HELP_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
header = " [ Flash MCU ] "
|
||||||
|
color = COLOR_CYAN
|
||||||
|
count = 62 - len(color) - len(RESET_FORMAT)
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
f"""
|
||||||
|
/=======================================================\\
|
||||||
|
| {color}{header:~^{count}}{RESET_FORMAT} |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| Please select the flashing method to flash your MCU. |
|
||||||
|
| Make sure to only select a method your MCU supports. |
|
||||||
|
| Not all MCUs support both methods! |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| |
|
||||||
|
| 1) Regular flashing method |
|
||||||
|
| 2) Updating via SD-Card Update |
|
||||||
|
| |
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
print(menu, end="")
|
||||||
|
|
||||||
|
def select_regular(self, **kwargs):
|
||||||
|
self.flash_options.flash_method = FlashMethod.REGULAR
|
||||||
|
self.goto_next_menu()
|
||||||
|
|
||||||
|
def select_sdcard(self, **kwargs):
|
||||||
|
self.flash_options.flash_method = FlashMethod.SD_CARD
|
||||||
|
self.goto_next_menu()
|
||||||
|
|
||||||
|
def goto_next_menu(self, **kwargs):
|
||||||
|
next_menu = KlipperFlashCommandMenu(previous_menu=self)
|
||||||
|
next_menu.start()
|
||||||
|
|
||||||
|
|
||||||
|
# noinspection PyUnusedLocal
|
||||||
|
# noinspection PyMethodMayBeStatic
|
||||||
|
class KlipperFlashCommandMenu(BaseMenu):
|
||||||
|
def __init__(self, previous_menu: BaseMenu):
|
||||||
|
self.flash_options = FlashOptions()
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options={
|
||||||
|
"1": self.select_flash,
|
||||||
|
"2": self.select_serialflash,
|
||||||
|
"h": KlipperFlashCommandHelpMenu,
|
||||||
|
},
|
||||||
|
default_option="1",
|
||||||
|
input_label_txt="Select flash command",
|
||||||
|
previous_menu=previous_menu,
|
||||||
|
footer_type=BACK_HELP_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
"""
|
||||||
|
/=======================================================\\
|
||||||
|
| |
|
||||||
|
| Which flash command to use for flashing the MCU? |
|
||||||
|
| 1) make flash (default) |
|
||||||
|
| 2) make serialflash (stm32flash) |
|
||||||
|
| |
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
print(menu, end="")
|
||||||
|
|
||||||
|
def select_flash(self, **kwargs):
|
||||||
|
self.flash_options.flash_command = FlashCommand.FLASH
|
||||||
|
self.goto_next_menu()
|
||||||
|
|
||||||
|
def select_serialflash(self, **kwargs):
|
||||||
|
self.flash_options.flash_command = FlashCommand.SERIAL_FLASH
|
||||||
|
self.goto_next_menu()
|
||||||
|
|
||||||
|
def goto_next_menu(self, **kwargs):
|
||||||
|
next_menu = KlipperSelectMcuConnectionMenu(previous_menu=self)
|
||||||
|
next_menu.start()
|
||||||
|
|
||||||
|
|
||||||
|
# noinspection PyUnusedLocal
|
||||||
|
# noinspection PyMethodMayBeStatic
|
||||||
|
class KlipperSelectMcuConnectionMenu(BaseMenu):
|
||||||
|
def __init__(self, previous_menu: BaseMenu):
|
||||||
|
self.flash_options = FlashOptions()
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options={
|
||||||
|
"1": self.select_usb,
|
||||||
|
"2": self.select_dfu,
|
||||||
|
"3": self.select_usb_dfu,
|
||||||
|
"h": KlipperMcuConnectionHelpMenu,
|
||||||
|
},
|
||||||
|
input_label_txt="Select connection type",
|
||||||
|
previous_menu=previous_menu,
|
||||||
|
footer_type=BACK_HELP_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
header = "Make sure that the controller board is connected now!"
|
||||||
|
color = COLOR_YELLOW
|
||||||
|
count = 62 - len(color) - len(RESET_FORMAT)
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
f"""
|
||||||
|
/=======================================================\\
|
||||||
|
| {color}{header:^{count}}{RESET_FORMAT} |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| |
|
||||||
|
| How is the controller board connected to the host? |
|
||||||
|
| 1) USB |
|
||||||
|
| 2) UART |
|
||||||
|
| 3) USB (DFU mode) |
|
||||||
|
| |
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
print(menu, end="")
|
||||||
|
|
||||||
|
def select_usb(self, **kwargs):
|
||||||
|
self.flash_options.connection_type = ConnectionType.USB
|
||||||
|
self.get_mcu_list()
|
||||||
|
|
||||||
|
def select_dfu(self, **kwargs):
|
||||||
|
self.flash_options.connection_type = ConnectionType.UART
|
||||||
|
self.get_mcu_list()
|
||||||
|
|
||||||
|
def select_usb_dfu(self, **kwargs):
|
||||||
|
self.flash_options.connection_type = ConnectionType.USB_DFU
|
||||||
|
self.get_mcu_list()
|
||||||
|
|
||||||
|
def get_mcu_list(self, **kwargs):
|
||||||
|
conn_type = self.flash_options.connection_type
|
||||||
|
|
||||||
|
if conn_type is ConnectionType.USB:
|
||||||
|
Logger.print_status("Identifying MCU connected via USB ...")
|
||||||
|
self.flash_options.mcu_list = find_usb_device_by_id()
|
||||||
|
elif conn_type is ConnectionType.UART:
|
||||||
|
Logger.print_status("Identifying MCU possibly connected via UART ...")
|
||||||
|
self.flash_options.mcu_list = find_uart_device()
|
||||||
|
elif conn_type is ConnectionType.USB_DFU:
|
||||||
|
Logger.print_status("Identifying MCU connected via USB in DFU mode ...")
|
||||||
|
self.flash_options.mcu_list = find_usb_dfu_device()
|
||||||
|
|
||||||
|
if len(self.flash_options.mcu_list) < 1:
|
||||||
|
Logger.print_warn("No MCUs found!")
|
||||||
|
Logger.print_warn("Make sure they are connected and repeat this step.")
|
||||||
|
else:
|
||||||
|
self.goto_next_menu()
|
||||||
|
|
||||||
|
def goto_next_menu(self, **kwargs):
|
||||||
|
next_menu = KlipperSelectMcuIdMenu(previous_menu=self)
|
||||||
|
next_menu.start()
|
||||||
|
|
||||||
|
|
||||||
|
# noinspection PyUnusedLocal
|
||||||
|
# noinspection PyMethodMayBeStatic
|
||||||
|
class KlipperSelectMcuIdMenu(BaseMenu):
|
||||||
|
def __init__(self, previous_menu: BaseMenu):
|
||||||
|
self.flash_options = FlashOptions()
|
||||||
|
self.mcu_list = self.flash_options.mcu_list
|
||||||
|
options = {f"{index}": self.flash_mcu for index in range(len(self.mcu_list))}
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options=options,
|
||||||
|
input_label_txt="Select MCU to flash",
|
||||||
|
previous_menu=previous_menu,
|
||||||
|
footer_type=BACK_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
header = "!!! ATTENTION !!!"
|
||||||
|
header2 = f"[{COLOR_CYAN}List of available MCUs{RESET_FORMAT}]"
|
||||||
|
color = COLOR_RED
|
||||||
|
count = 62 - len(color) - len(RESET_FORMAT)
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
f"""
|
||||||
|
/=======================================================\\
|
||||||
|
| {color}{header:^{count}}{RESET_FORMAT} |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| Make sure, to select the correct MCU! |
|
||||||
|
| ONLY flash a firmware created for the respective MCU! |
|
||||||
|
| |
|
||||||
|
|{header2:-^64}|
|
||||||
|
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
|
||||||
|
for i, mcu in enumerate(self.mcu_list):
|
||||||
|
mcu = mcu.split("/")[-1]
|
||||||
|
menu += f" ● MCU #{i}: {COLOR_CYAN}{mcu}{RESET_FORMAT}\n"
|
||||||
|
|
||||||
|
print(menu, end="\n")
|
||||||
|
|
||||||
|
def flash_mcu(self, **kwargs):
|
||||||
|
index = int(kwargs.get("opt_index"))
|
||||||
|
selected_mcu = self.mcu_list[index]
|
||||||
|
self.flash_options.selected_mcu = selected_mcu
|
||||||
|
|
||||||
|
print(f"{COLOR_CYAN}###### You selected:{RESET_FORMAT}")
|
||||||
|
print(f"● MCU #{index}: {selected_mcu}\n")
|
||||||
|
|
||||||
|
if get_confirm("Continue", allow_go_back=True):
|
||||||
|
Logger.print_status(f"Flashing '{selected_mcu}' ...")
|
||||||
|
flash_device(self.flash_options)
|
||||||
|
|
||||||
|
self.goto_next_menu()
|
||||||
|
|
||||||
|
def goto_next_menu(self, **kwargs):
|
||||||
|
pass
|
||||||
|
# TODO: navigate back to advanced menu after flashing
|
||||||
|
|
||||||
|
# from core.menus.main_menu import MainMenu
|
||||||
|
# from core.menus.advanced_menu import AdvancedMenu
|
||||||
|
#
|
||||||
|
# next_menu = AdvancedMenu()
|
||||||
|
# next_menu.start()
|
||||||
|
|
||||||
|
|
||||||
|
class KlipperFlashMethodHelpMenu(BaseMenu):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options={},
|
||||||
|
footer_type=BACK_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
header = " < ? > Help: Flash MCU < ? > "
|
||||||
|
color = COLOR_YELLOW
|
||||||
|
count = 62 - len(color) - len(RESET_FORMAT)
|
||||||
|
subheader1 = f"{COLOR_CYAN}Regular flashing method:{RESET_FORMAT}"
|
||||||
|
subheader2 = f"{COLOR_CYAN}Updating via SD-Card Update:{RESET_FORMAT}"
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
f"""
|
||||||
|
/=======================================================\\
|
||||||
|
| {color}{header:~^{count}}{RESET_FORMAT} |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| {subheader1:<62} |
|
||||||
|
| The default method to flash controller boards which |
|
||||||
|
| are connected and updated over USB and not by placing |
|
||||||
|
| a compiled firmware file onto an internal SD-Card. |
|
||||||
|
| |
|
||||||
|
| Common controllers that get flashed that way are: |
|
||||||
|
| - Arduino Mega 2560 |
|
||||||
|
| - Fysetc F6 / S6 (used without a Display + SD-Slot) |
|
||||||
|
| |
|
||||||
|
| {subheader2:<62} |
|
||||||
|
| Many popular controller boards ship with a bootloader |
|
||||||
|
| capable of updating the firmware via SD-Card. |
|
||||||
|
| Choose this method if your controller board supports |
|
||||||
|
| this way of updating. This method ONLY works for up- |
|
||||||
|
| grading firmware. The initial flashing procedure must |
|
||||||
|
| be done manually per the instructions that apply to |
|
||||||
|
| your controller board. |
|
||||||
|
| |
|
||||||
|
| Common controllers that can be flashed that way are: |
|
||||||
|
| - BigTreeTech SKR 1.3 / 1.4 (Turbo) / E3 / Mini E3 |
|
||||||
|
| - Fysetc F6 / S6 (used with a Display + SD-Slot) |
|
||||||
|
| - Fysetc Spider |
|
||||||
|
| |
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
print(menu, end="")
|
||||||
|
|
||||||
|
|
||||||
|
class KlipperFlashCommandHelpMenu(BaseMenu):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options={},
|
||||||
|
footer_type=BACK_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
header = " < ? > Help: Flash MCU < ? > "
|
||||||
|
color = COLOR_YELLOW
|
||||||
|
count = 62 - len(color) - len(RESET_FORMAT)
|
||||||
|
subheader1 = f"{COLOR_CYAN}make flash:{RESET_FORMAT}"
|
||||||
|
subheader2 = f"{COLOR_CYAN}make serialflash:{RESET_FORMAT}"
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
f"""
|
||||||
|
/=======================================================\\
|
||||||
|
| {color}{header:~^{count}}{RESET_FORMAT} |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| {subheader1:<62} |
|
||||||
|
| The default command to flash controller board, it |
|
||||||
|
| will detect selected microcontroller and use suitable |
|
||||||
|
| tool for flashing it. |
|
||||||
|
| |
|
||||||
|
| {subheader2:<62} |
|
||||||
|
| Special command to flash STM32 microcontrollers in |
|
||||||
|
| DFU mode but connected via serial. stm32flash command |
|
||||||
|
| will be used internally. |
|
||||||
|
| |
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
print(menu, end="")
|
||||||
|
|
||||||
|
|
||||||
|
class KlipperMcuConnectionHelpMenu(BaseMenu):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(
|
||||||
|
header=False,
|
||||||
|
options={},
|
||||||
|
footer_type=BACK_FOOTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
def print_menu(self) -> None:
|
||||||
|
header = " < ? > Help: Flash MCU < ? > "
|
||||||
|
color = COLOR_YELLOW
|
||||||
|
count = 62 - len(color) - len(RESET_FORMAT)
|
||||||
|
subheader1 = f"{COLOR_CYAN}USB:{RESET_FORMAT}"
|
||||||
|
subheader2 = f"{COLOR_CYAN}UART:{RESET_FORMAT}"
|
||||||
|
menu = textwrap.dedent(
|
||||||
|
f"""
|
||||||
|
/=======================================================\\
|
||||||
|
| {color}{header:~^{count}}{RESET_FORMAT} |
|
||||||
|
|-------------------------------------------------------|
|
||||||
|
| {subheader1:<62} |
|
||||||
|
| Selecting USB as the connection method will scan the |
|
||||||
|
| USB ports for connected controller boards. This will |
|
||||||
|
| be similar to the 'ls /dev/serial/by-id/*' command |
|
||||||
|
| suggested by the official Klipper documentation for |
|
||||||
|
| determining successfull USB connections! |
|
||||||
|
| |
|
||||||
|
| {subheader2:<62} |
|
||||||
|
| Selecting UART as the connection method will list all |
|
||||||
|
| possible UART serial ports. Note: This method ALWAYS |
|
||||||
|
| returns something as it seems impossible to determine |
|
||||||
|
| if a valid Klipper controller board is connected or |
|
||||||
|
| not. Because of that, you MUST know which UART serial |
|
||||||
|
| port your controller board is connected to when using |
|
||||||
|
| this connection method. |
|
||||||
|
| |
|
||||||
|
"""
|
||||||
|
)[1:]
|
||||||
|
print(menu, end="")
|
||||||
@@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
import textwrap
|
import textwrap
|
||||||
|
|
||||||
from components.klipper_firmware.menus.klipper_flash_menu import KlipperFlashMenu
|
from components.klipper_firmware.menus.klipper_flash_menu import KlipperFlashMethodMenu
|
||||||
from core.menus import BACK_FOOTER
|
from core.menus import BACK_FOOTER
|
||||||
from core.menus.base_menu import BaseMenu
|
from core.menus.base_menu import BaseMenu
|
||||||
from utils.constants import COLOR_YELLOW, RESET_FORMAT
|
from utils.constants import COLOR_YELLOW, RESET_FORMAT
|
||||||
@@ -23,7 +23,7 @@ class AdvancedMenu(BaseMenu):
|
|||||||
"1": None,
|
"1": None,
|
||||||
"2": None,
|
"2": None,
|
||||||
"3": None,
|
"3": None,
|
||||||
"4": KlipperFlashMenu,
|
"4": KlipperFlashMethodMenu,
|
||||||
"5": None,
|
"5": None,
|
||||||
"6": None,
|
"6": None,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -10,7 +10,7 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import socket
|
import socket
|
||||||
import subprocess
|
from subprocess import Popen, PIPE, CalledProcessError, run, DEVNULL
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import urllib.error
|
import urllib.error
|
||||||
@@ -19,6 +19,8 @@ import venv
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Literal
|
from typing import List, Literal
|
||||||
|
|
||||||
|
import select
|
||||||
|
|
||||||
from utils.input_utils import get_confirm
|
from utils.input_utils import get_confirm
|
||||||
from utils.logger import Logger
|
from utils.logger import Logger
|
||||||
from utils.filesystem_utils import check_file_exist
|
from utils.filesystem_utils import check_file_exist
|
||||||
@@ -73,7 +75,7 @@ def create_python_venv(target: Path) -> None:
|
|||||||
except OSError as e:
|
except OSError as e:
|
||||||
Logger.print_error(f"Error setting up virtualenv:\n{e}")
|
Logger.print_error(f"Error setting up virtualenv:\n{e}")
|
||||||
raise
|
raise
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
Logger.print_error(f"Error setting up virtualenv:\n{e.output.decode()}")
|
Logger.print_error(f"Error setting up virtualenv:\n{e.output.decode()}")
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
@@ -103,7 +105,7 @@ def update_python_pip(target: Path) -> None:
|
|||||||
raise FileNotFoundError("Error updating pip! Not found.")
|
raise FileNotFoundError("Error updating pip! Not found.")
|
||||||
|
|
||||||
command = [pip_location, "install", "-U", "pip"]
|
command = [pip_location, "install", "-U", "pip"]
|
||||||
result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
|
result = run(command, stderr=PIPE, text=True)
|
||||||
if result.returncode != 0 or result.stderr:
|
if result.returncode != 0 or result.stderr:
|
||||||
Logger.print_error(f"{result.stderr}", False)
|
Logger.print_error(f"{result.stderr}", False)
|
||||||
Logger.print_error("Updating pip failed!")
|
Logger.print_error("Updating pip failed!")
|
||||||
@@ -113,7 +115,7 @@ def update_python_pip(target: Path) -> None:
|
|||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
Logger.print_error(e)
|
Logger.print_error(e)
|
||||||
raise
|
raise
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
Logger.print_error(f"Error updating pip:\n{e.output.decode()}")
|
Logger.print_error(f"Error updating pip:\n{e.output.decode()}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@@ -136,7 +138,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
|
|||||||
"-r",
|
"-r",
|
||||||
f"{requirements}",
|
f"{requirements}",
|
||||||
]
|
]
|
||||||
result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
|
result = run(command, stderr=PIPE, text=True)
|
||||||
|
|
||||||
if result.returncode != 0 or result.stderr:
|
if result.returncode != 0 or result.stderr:
|
||||||
Logger.print_error(f"{result.stderr}", False)
|
Logger.print_error(f"{result.stderr}", False)
|
||||||
@@ -144,7 +146,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
Logger.print_ok("Installing Python requirements successful!")
|
Logger.print_ok("Installing Python requirements successful!")
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
log = f"Error installing Python requirements:\n{e.output.decode()}"
|
log = f"Error installing Python requirements:\n{e.output.decode()}"
|
||||||
Logger.print_error(log)
|
Logger.print_error(log)
|
||||||
raise
|
raise
|
||||||
@@ -180,14 +182,14 @@ def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
|
|||||||
if rls_info_change:
|
if rls_info_change:
|
||||||
command.append("--allow-releaseinfo-change")
|
command.append("--allow-releaseinfo-change")
|
||||||
|
|
||||||
result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
|
result = run(command, stderr=PIPE, text=True)
|
||||||
if result.returncode != 0 or result.stderr:
|
if result.returncode != 0 or result.stderr:
|
||||||
Logger.print_error(f"{result.stderr}", False)
|
Logger.print_error(f"{result.stderr}", False)
|
||||||
Logger.print_error("Updating system package list failed!")
|
Logger.print_error("Updating system package list failed!")
|
||||||
return
|
return
|
||||||
|
|
||||||
Logger.print_ok("System package list update successful!")
|
Logger.print_ok("System package list update successful!")
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
kill(f"Error updating system package list:\n{e.stderr.decode()}")
|
kill(f"Error updating system package list:\n{e.stderr.decode()}")
|
||||||
|
|
||||||
|
|
||||||
@@ -200,10 +202,10 @@ def check_package_install(packages: List[str]) -> List[str]:
|
|||||||
not_installed = []
|
not_installed = []
|
||||||
for package in packages:
|
for package in packages:
|
||||||
command = ["dpkg-query", "-f'${Status}'", "--show", package]
|
command = ["dpkg-query", "-f'${Status}'", "--show", package]
|
||||||
result = subprocess.run(
|
result = run(
|
||||||
command,
|
command,
|
||||||
stdout=subprocess.PIPE,
|
stdout=PIPE,
|
||||||
stderr=subprocess.DEVNULL,
|
stderr=DEVNULL,
|
||||||
text=True,
|
text=True,
|
||||||
)
|
)
|
||||||
if "installed" not in result.stdout.strip("'").split():
|
if "installed" not in result.stdout.strip("'").split():
|
||||||
@@ -224,10 +226,10 @@ def install_system_packages(packages: List[str]) -> None:
|
|||||||
command = ["sudo", "apt-get", "install", "-y"]
|
command = ["sudo", "apt-get", "install", "-y"]
|
||||||
for pkg in packages:
|
for pkg in packages:
|
||||||
command.append(pkg)
|
command.append(pkg)
|
||||||
subprocess.run(command, stderr=subprocess.PIPE, check=True)
|
run(command, stderr=PIPE, check=True)
|
||||||
|
|
||||||
Logger.print_ok("Packages installed successfully.")
|
Logger.print_ok("Packages installed successfully.")
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
kill(f"Error installing packages:\n{e.stderr.decode()}")
|
kill(f"Error installing packages:\n{e.stderr.decode()}")
|
||||||
|
|
||||||
|
|
||||||
@@ -239,8 +241,8 @@ def mask_system_service(service_name: str) -> None:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
command = ["sudo", "systemctl", "mask", service_name]
|
command = ["sudo", "systemctl", "mask", service_name]
|
||||||
subprocess.run(command, stderr=subprocess.PIPE, check=True)
|
run(command, stderr=PIPE, check=True)
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
log = f"Unable to mask system service {service_name}: {e.stderr.decode()}"
|
log = f"Unable to mask system service {service_name}: {e.stderr.decode()}"
|
||||||
Logger.print_error(log)
|
Logger.print_error(log)
|
||||||
raise
|
raise
|
||||||
@@ -318,12 +320,12 @@ def set_nginx_permissions() -> None:
|
|||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
cmd = f"ls -ld {Path.home()} | cut -d' ' -f1"
|
cmd = f"ls -ld {Path.home()} | cut -d' ' -f1"
|
||||||
homedir_perm = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True)
|
homedir_perm = run(cmd, shell=True, stdout=PIPE, text=True)
|
||||||
homedir_perm = homedir_perm.stdout
|
homedir_perm = homedir_perm.stdout
|
||||||
|
|
||||||
if homedir_perm.count("x") < 3:
|
if homedir_perm.count("x") < 3:
|
||||||
Logger.print_status("Granting NGINX the required permissions ...")
|
Logger.print_status("Granting NGINX the required permissions ...")
|
||||||
subprocess.run(["chmod", "og+x", Path.home()])
|
run(["chmod", "og+x", Path.home()])
|
||||||
Logger.print_ok("Permissions granted.")
|
Logger.print_ok("Permissions granted.")
|
||||||
|
|
||||||
|
|
||||||
@@ -339,9 +341,30 @@ def control_systemd_service(
|
|||||||
try:
|
try:
|
||||||
Logger.print_status(f"{action.capitalize()} {name}.service ...")
|
Logger.print_status(f"{action.capitalize()} {name}.service ...")
|
||||||
command = ["sudo", "systemctl", action, f"{name}.service"]
|
command = ["sudo", "systemctl", action, f"{name}.service"]
|
||||||
subprocess.run(command, stderr=subprocess.PIPE, check=True)
|
run(command, stderr=PIPE, check=True)
|
||||||
Logger.print_ok("OK!")
|
Logger.print_ok("OK!")
|
||||||
except subprocess.CalledProcessError as e:
|
except CalledProcessError as e:
|
||||||
log = f"Failed to {action} {name}.service: {e.stderr.decode()}"
|
log = f"Failed to {action} {name}.service: {e.stderr.decode()}"
|
||||||
Logger.print_error(log)
|
Logger.print_error(log)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def log_process(process: Popen) -> None:
|
||||||
|
"""
|
||||||
|
Helper method to print stdout of a process in near realtime to the console.
|
||||||
|
:param process: Process to log the output from
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
reads = [process.stdout.fileno()]
|
||||||
|
ret = select.select(reads, [], [])
|
||||||
|
for fd in ret[0]:
|
||||||
|
if fd == process.stdout.fileno():
|
||||||
|
line = process.stdout.readline()
|
||||||
|
if line:
|
||||||
|
print(line.strip(), flush=True)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
if process.poll() is not None:
|
||||||
|
break
|
||||||
|
|||||||
Reference in New Issue
Block a user