mirror of
https://github.com/dw-0/kiauh.git
synced 2025-12-13 02:24:27 +05:00
95 lines
3.2 KiB
Python
Executable File
95 lines
3.2 KiB
Python
Executable File
# Run a shell command via gcode
|
|
#
|
|
# Copyright (C) 2019 Eric Callahan <arksine.code@gmail.com>
|
|
#
|
|
# This file may be distributed under the terms of the GNU GPLv3 license.
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import logging
|
|
|
|
|
|
class ShellCommand:
|
|
def __init__(self, config):
|
|
self.name = config.get_name().split()[-1]
|
|
self.printer = config.get_printer()
|
|
self.gcode = self.printer.lookup_object("gcode")
|
|
cmd = config.get("command")
|
|
cmd = os.path.expanduser(cmd)
|
|
self.command = shlex.split(cmd)
|
|
self.timeout = config.getfloat("timeout", 2.0, above=0.0)
|
|
self.verbose = config.getboolean("verbose", True)
|
|
self.proc_fd = None
|
|
self.partial_output = ""
|
|
self.gcode.register_mux_command(
|
|
"RUN_SHELL_COMMAND",
|
|
"CMD",
|
|
self.name,
|
|
self.cmd_RUN_SHELL_COMMAND,
|
|
desc=self.cmd_RUN_SHELL_COMMAND_help,
|
|
)
|
|
|
|
def _process_output(self, eventime):
|
|
if self.proc_fd is None:
|
|
return
|
|
try:
|
|
data = os.read(self.proc_fd, 4096)
|
|
except Exception:
|
|
pass
|
|
data = self.partial_output + data.decode()
|
|
if "\n" not in data:
|
|
self.partial_output = data
|
|
return
|
|
elif data[-1] != "\n":
|
|
split = data.rfind("\n") + 1
|
|
self.partial_output = data[split:]
|
|
data = data[:split]
|
|
else:
|
|
self.partial_output = ""
|
|
self.gcode.respond_info(data)
|
|
|
|
cmd_RUN_SHELL_COMMAND_help = "Run a linux shell command"
|
|
|
|
def cmd_RUN_SHELL_COMMAND(self, params):
|
|
gcode_params = params.get("PARAMS", "")
|
|
gcode_params = shlex.split(gcode_params)
|
|
reactor = self.printer.get_reactor()
|
|
try:
|
|
proc = subprocess.Popen(
|
|
self.command + gcode_params,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
except Exception:
|
|
logging.exception("shell_command: Command {%s} failed" % (self.name))
|
|
raise self.gcode.error("Error running command {%s}" % (self.name))
|
|
if self.verbose:
|
|
self.proc_fd = proc.stdout.fileno()
|
|
self.gcode.respond_info("Running Command {%s}...:" % (self.name))
|
|
hdl = reactor.register_fd(self.proc_fd, self._process_output)
|
|
eventtime = reactor.monotonic()
|
|
endtime = eventtime + self.timeout
|
|
complete = False
|
|
while eventtime < endtime:
|
|
eventtime = reactor.pause(eventtime + 0.05)
|
|
if proc.poll() is not None:
|
|
complete = True
|
|
break
|
|
if not complete:
|
|
proc.terminate()
|
|
if self.verbose:
|
|
if self.partial_output:
|
|
self.gcode.respond_info(self.partial_output)
|
|
self.partial_output = ""
|
|
if complete:
|
|
msg = "Command {%s} finished\n" % (self.name)
|
|
else:
|
|
msg = "Command {%s} timed out" % (self.name)
|
|
self.gcode.respond_info(msg)
|
|
reactor.unregister_fd(hdl)
|
|
self.proc_fd = None
|
|
|
|
|
|
def load_config_prefix(config):
|
|
return ShellCommand(config)
|