Source code for koapy.utils.subprocess

import inspect
import subprocess
import sys
import textwrap

from koapy.utils.logging import get_logger

# Module-level logger, obtained from koapy's logging helper under this module's name.
logger = get_logger(__name__)
def function_to_script(func):
    """Render ``func`` as the text of a standalone Python script that calls it.

    The script consists of the (dedented) source code of ``func`` followed by
    an ``if __name__ == '__main__':`` guard that invokes the function with no
    arguments.

    Args:
        func: A function whose source can be retrieved with
            ``inspect.getsource`` and which can be called without arguments.

    Returns:
        The script source as a string.

    Raises:
        AssertionError: If ``func`` has a parameter that must be supplied by
            the caller (no default value and not ``*args`` / ``**kwargs``).
    """
    signature = inspect.signature(func)
    # ``signature.parameters`` maps names to Parameter objects; iterate over
    # the values, because the keys are plain strings with no ``default``.
    requires_argument = any(
        param.default is param.empty
        and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
        for param in signature.parameters.values()
    )
    if requires_argument:
        # Raised explicitly (instead of ``assert``) so that the check is not
        # stripped when Python runs with ``-O``.
        raise AssertionError("Function should not require any parameters")

    function_name = func.__name__
    # Dedent so that nested functions and methods become valid top-level code.
    function_impl = textwrap.dedent(inspect.getsource(func))
    template = textwrap.dedent(
        """
        %s
        if __name__ == '__main__':
            %s()
        """
    )
    return template % (function_impl, function_name)
def function_to_subprocess_args(func, executable=None):
    """Build the argument list that runs ``func`` in a separate interpreter.

    Args:
        func: Function to convert into a script via ``function_to_script``.
        executable: Interpreter to launch; ``sys.executable`` when ``None``.

    Returns:
        A list ``[executable, "-c", script]``.
    """
    interpreter = sys.executable if executable is None else executable
    return [interpreter, "-c", function_to_script(func)]
def run_file(filename, *args, executable=None, **kwargs):
    """Run a Python file in a child interpreter and check its exit status.

    Args:
        filename: Path of the script passed to the interpreter.
        *args: Extra positional arguments forwarded to ``subprocess.check_call``.
        executable: Interpreter to launch; ``sys.executable`` when ``None``.
        **kwargs: Extra keyword arguments forwarded to ``subprocess.check_call``.

    Returns:
        The value returned by ``subprocess.check_call``.
    """
    interpreter = sys.executable if executable is None else executable
    return subprocess.check_call([interpreter, filename], *args, **kwargs)
def run_script(script, *args, executable=None, **kwargs):
    """Run Python source text with ``-c`` in a child interpreter.

    Args:
        script: Python source code passed to the interpreter's ``-c`` option.
        *args: Extra positional arguments forwarded to ``subprocess.check_call``.
        executable: Interpreter to launch; ``sys.executable`` when ``None``.
        **kwargs: Extra keyword arguments forwarded to ``subprocess.check_call``.

    Returns:
        The value returned by ``subprocess.check_call``.
    """
    interpreter = sys.executable if executable is None else executable
    return subprocess.check_call([interpreter, "-c", script], *args, **kwargs)
def run_function(function, *args, executable=None, **kwargs):
    """Run ``function`` in a child interpreter.

    The function is converted to script text with ``function_to_script`` and
    handed to ``run_script``; the remaining arguments are passed through
    unchanged.

    Returns:
        Whatever ``run_script`` returns.
    """
    return run_script(
        function_to_script(function),
        *args,
        executable=executable,
        **kwargs,
    )
def quote(s):
    """Wrap ``s`` in double quotes, prefixing each embedded ``"`` with a backtick."""
    escaped = s.replace('"', '`"')
    return f'"{escaped}"'
def run_as_admin(cmd, cwd=None, check=True, wait=True):
    """Launch ``cmd`` through ``ShellExecuteEx`` using the ``runas`` verb.

    Args:
        cmd: Sequence of strings; ``cmd[0]`` is the file to execute and the
            remaining items are joined with single spaces to form the
            parameter string (no additional quoting is applied here).
        cwd: Working directory for the new process, or ``None`` to leave the
            ``lpDirectory`` option unset.
        check: When true, wait for the process and raise if its exit code is
            negative.
        wait: When true, wait for the process to finish.

    Returns:
        The process exit code when ``check`` or ``wait`` is true, otherwise
        ``None``.

    Raises:
        subprocess.CalledProcessError: If ``check`` is true and the exit code
            is negative.
    """
    import win32con
    import win32event
    import win32process

    # pyright: reportMissingImports=false
    # pylint: disable=import-error,no-name-in-module
    from win32com.shell import shellcon
    from win32com.shell.shell import ShellExecuteEx

    shell_options = {
        "nShow": win32con.SW_SHOWNORMAL,
        "fMask": shellcon.SEE_MASK_NOCLOSEPROCESS,
        "lpVerb": "runas",
        "lpFile": cmd[0],
        "lpParameters": " ".join(cmd[1:]),
    }
    if cwd is not None:
        shell_options["lpDirectory"] = cwd

    logger.info("Running command: %s", " ".join(cmd))
    launched = ShellExecuteEx(**shell_options)

    # Fire-and-forget: the caller asked neither to wait nor to check.
    if not (check or wait):
        return None

    process = launched["hProcess"]
    win32event.WaitForSingleObject(process, win32event.INFINITE)
    exit_code = win32process.GetExitCodeProcess(process)
    logger.info("Process handle %s returned code %d", process, exit_code)

    # NOTE(review): only negative exit codes are treated as failure here;
    # positive non-zero codes are returned to the caller - confirm intended.
    if check and exit_code < 0:
        raise subprocess.CalledProcessError(exit_code, cmd)
    return exit_code
def create_job_object_for_cleanup():
    """Create an unnamed job object flagged with ``JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE``.

    Returns:
        The job handle returned by ``win32job.CreateJobObject``, after its
        extended limit information has been updated with the new flag.
    """
    # https://stackoverflow.com/questions/23434842/python-how-to-kill-child-processes-when-parent-dies/23587108#23587108
    import win32job

    # No security attributes and an empty name.
    job = win32job.CreateJobObject(None, "")

    info_class = win32job.JobObjectExtendedLimitInformation
    limits = win32job.QueryInformationJobObject(job, info_class)
    # Overwrite the limit flags inside the queried structure, then write the
    # whole structure back to the job.
    limits["BasicLimitInformation"][
        "LimitFlags"
    ] = win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
    win32job.SetInformationJobObject(job, info_class, limits)
    return job
# Job object created once at import time; make_process_die_when_parent_dies
# assigns processes to it.
job_handle = create_job_object_for_cleanup()
def make_process_die_when_parent_dies(pid):
    """Assign the process identified by ``pid`` to the module-level job object.

    Args:
        pid: Identifier of the process to assign; must not be ``0``.
    """
    assert pid != 0

    # https://stackoverflow.com/questions/23434842/python-how-to-kill-child-processes-when-parent-dies/23587108#23587108
    import win32api
    import win32con
    import win32job

    access_rights = win32con.PROCESS_TERMINATE | win32con.PROCESS_SET_QUOTA
    # The opened handle is not marked inheritable.
    process = win32api.OpenProcess(access_rights, False, pid)

    # The process will be terminated when the job is destroyed.
    # The job will be destroyed when its last handle is closed.
    # The job handle will be closed when it loses its last reference, which is
    # the module-level ``job_handle`` in this case.
    win32job.AssignProcessToJobObject(job_handle, process)
class Popen(subprocess.Popen):
    """``subprocess.Popen`` subclass that registers the child for cleanup.

    Construction arguments are identical to ``subprocess.Popen``; after the
    child has been started, its pid is passed to
    ``make_process_die_when_parent_dies``.
    """

    def __init__(self, *args, **kwargs):
        """Start the child process, then register its pid for cleanup."""
        super().__init__(*args, **kwargs)
        # self.pid is only available once the base class has started the child.
        make_process_die_when_parent_dies(self.pid)