From 2ea2f006fce82fdb1b4db8c1a97fe157a280a477 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Sat, 11 Jan 2025 12:52:55 +0100 Subject: [PATCH] Refactor threaded job interactions. --- law/contrib/arc/job.py | 19 ++++----- law/contrib/cms/job.py | 31 ++++++++++----- law/contrib/glite/job.py | 19 ++++----- law/contrib/htcondor/job.py | 26 +++++++------ law/contrib/lsf/job.py | 16 ++++---- law/contrib/slurm/job.py | 18 +++++---- law/job/base.py | 22 ++++++++--- law/util.py | 77 +++++++++++++++++++++++++------------ 8 files changed, 144 insertions(+), 84 deletions(-) diff --git a/law/contrib/arc/job.py b/law/contrib/arc/job.py index f3ca5d20..ff1592fe 100644 --- a/law/contrib/arc/job.py +++ b/law/contrib/arc/job.py @@ -18,7 +18,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles from law.target.file import get_path -from law.util import interruptable_popen, make_list, make_unique, quote_cmd +from law.util import make_list, make_unique, quote_cmd, interruptable_popen from law.logger import get_logger @@ -48,7 +48,8 @@ def __init__(self, job_list=None, ce=None, threads=1): self.ce = ce self.threads = threads - def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, silent=False): + def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, silent=False, + _processes=None): # default arguments if job_list is None: job_list = self.job_list @@ -81,7 +82,7 @@ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, sil # run the command logger.debug("submit arc job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, cwd=job_file_dir) + stdout=subprocess.PIPE, cwd=job_file_dir, kill_timeout=2, processes=_processes) # in some cases, the return code is 0 but the ce did not respond valid job ids job_ids = [] @@ -117,7 +118,7 @@ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, sil raise Exception("submission of arc job(s) '{}' failed:\n{}".format(job_files, out)) - def cancel(self, job_id, job_list=None, silent=False): + def cancel(self, job_id, job_list=None, silent=False, _processes=None): # default arguments if job_list is None: job_list = self.job_list @@ -135,7 +136,7 @@ def cancel(self, job_id, job_list=None, silent=False): # run it logger.debug("cancel arc job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, kill_timeout=2, processes=_processes) # check success if code != 0 and not silent: @@ -145,7 +146,7 @@ def cancel(self, job_id, job_list=None, silent=False): return {job_id: None for job_id in job_ids} if chunking else None - def cleanup(self, job_id, job_list=None, silent=False): + def cleanup(self, job_id, job_list=None, silent=False, _processes=None): # default arguments if job_list is None: job_list = self.job_list @@ -163,7 +164,7 @@ def cleanup(self, job_id, job_list=None, silent=False): # run it logger.debug("cleanup arc job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, kill_timeout=2, processes=_processes) # check success if code != 0 and not silent: @@ -173,7 +174,7 @@ def cleanup(self, job_id, job_list=None, silent=False): return {job_id: None for job_id in job_ids} if chunking else None - def query(self, 
job_id, job_list=None, silent=False): + def query(self, job_id, job_list=None, silent=False, _processes=None): # default arguments if job_list is None: job_list = self.job_list @@ -191,7 +192,7 @@ def query(self, job_id, job_list=None, silent=False): # run it logger.debug("query arc job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, kill_timeout=2, processes=_processes) # handle errors if code != 0: diff --git a/law/contrib/cms/job.py b/law/contrib/cms/job.py index 8fbb56d1..d3e8b153 100644 --- a/law/contrib/cms/job.py +++ b/law/contrib/cms/job.py @@ -27,7 +27,7 @@ from law.job.dashboard import BaseJobDashboard from law.target.file import get_path from law.util import ( - DotDict, interruptable_popen, make_list, make_unique, quote_cmd, no_value, rel_path, + DotDict, make_list, make_unique, quote_cmd, no_value, rel_path, interruptable_popen, ) from law.logger import get_logger @@ -138,8 +138,12 @@ def _apply_group(self, func, result_type, group_func, job_objs, *args, **kwargs) **kwargs # noqa ) + def _check_proj_dir(self, proj_dir): + if not os.path.isdir(str(proj_dir)): + raise Exception("project directory '{}' does not exist".format(proj_dir)) + def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_username=None, - retries=0, retry_delay=3, silent=False): + retries=0, retry_delay=3, silent=False, _processes=None): # default arguments if proxy is None: proxy = self.proxy @@ -163,7 +167,8 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us # crab prints everything to stdout logger.debug("submit crab jobs with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, cwd=job_file_dir, env=self.cmssw_env) + stdout=subprocess.PIPE, cwd=job_file_dir, env=self.cmssw_env, kill_timeout=2, + processes=_processes) # handle errors if code != 0: @@ -171,7 +176,8 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us job_file, code, out)) # remove the project directory - proj_dir = self._proj_dir_from_job_file(job_file, self.cmssw_env) + proj_dir = self._proj_dir_from_job_file(job_file, self.cmssw_env, + _processes=_processes) if proj_dir and os.path.isdir(proj_dir): logger.debug("removing crab project '{}' from previous attempt".format( proj_dir)) @@ -225,7 +231,9 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us return job_ids def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None, - silent=False): + silent=False, _processes=None): + self._check_proj_dir(proj_dir) + if job_ids is None: job_ids = self._job_ids_from_proj_dir(proj_dir) @@ -240,7 +248,7 @@ def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_user # run it logger.debug("cancel crab job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, env=self.cmssw_env) + stdout=subprocess.PIPE, env=self.cmssw_env, kill_timeout=2, processes=_processes) # check success if code != 0 and not silent: @@ -265,7 +273,9 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use return {job_id: None for job_id in job_ids} def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None, - skip_transfers=None, 
silent=False): + skip_transfers=None, silent=False, _processes=None): + self._check_proj_dir(proj_dir) + proj_dir = str(proj_dir) log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log")) if job_ids is None: @@ -286,7 +296,8 @@ def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_usern # run it logger.debug("query crab job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.cmssw_env) + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.cmssw_env, kill_timeout=2, + processes=_processes) # handle errors if code != 0: @@ -414,7 +425,7 @@ def extra(job_id, job_data=None): return query_data @classmethod - def _proj_dir_from_job_file(cls, job_file, cmssw_env): + def _proj_dir_from_job_file(cls, job_file, cmssw_env, _processes=None): work_area = None request_name = None @@ -454,7 +465,7 @@ def _proj_dir_from_job_file(cls, job_file, cmssw_env): cfg = mod["cfg"] print(join(cfg.General.workArea, "crab_" + cfg.General.requestName))'""".format(py_exec, job_file) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, env=cmssw_env) + stdout=subprocess.PIPE, env=cmssw_env, kill_timeout=2, processes=_processes) if code == 0: path = out.strip().replace("\r\n", "\n").split("\n")[-1] path = os.path.expandvars(os.path.expanduser(path)) diff --git a/law/contrib/glite/job.py b/law/contrib/glite/job.py index 262fdc90..9776549e 100644 --- a/law/contrib/glite/job.py +++ b/law/contrib/glite/job.py @@ -17,7 +17,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles from law.target.file import get_path -from law.util import interruptable_popen, make_list, make_unique, quote_cmd +from law.util import make_list, make_unique, quote_cmd, interruptable_popen from law.logger import get_logger @@ -44,7 +44,8 @@ def __init__(self, ce=None, delegation_id=None, threads=1): self.delegation_id = delegation_id self.threads = threads - def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3, silent=False): + def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3, silent=False, + _processes=None): # default arguments if ce is None: ce = self.ce @@ -80,7 +81,7 @@ def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3 # glite prints everything to stdout logger.debug("submit glite job with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, cwd=job_file_dir) + stdout=subprocess.PIPE, cwd=job_file_dir, kill_timeout=2, processes=_processes) # in some cases, the return code is 0 but the ce did not respond with a valid id if code == 0: @@ -106,7 +107,7 @@ def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3 raise Exception("submission of glite job '{}' failed:\n{}".format(job_file, out)) - def cancel(self, job_id, silent=False): + def cancel(self, job_id, silent=False, _processes=None): chunking = isinstance(job_id, (list, tuple)) job_ids = make_list(job_id) @@ -117,7 +118,7 @@ def cancel(self, job_id, silent=False): # run it logger.debug("cancel glite job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, kill_timeout=2, processes=_processes) # check success if code != 
0 and not silent: @@ -127,7 +128,7 @@ def cancel(self, job_id, silent=False): return {job_id: None for job_id in job_ids} if chunking else None - def cleanup(self, job_id, silent=False): + def cleanup(self, job_id, silent=False, _processes=None): chunking = isinstance(job_id, (list, tuple)) job_ids = make_list(job_id) @@ -138,7 +139,7 @@ def cleanup(self, job_id, silent=False): # run it logger.debug("cleanup glite job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, kill_timeout=2, processes=_processes) # check success if code != 0 and not silent: @@ -148,7 +149,7 @@ def cleanup(self, job_id, silent=False): return {job_id: None for job_id in job_ids} if chunking else None - def query(self, job_id, silent=False): + def query(self, job_id, silent=False, _processes=None): chunking = isinstance(job_id, (list, tuple)) job_ids = make_list(job_id) @@ -159,7 +160,7 @@ def query(self, job_id, silent=False): # run it logger.debug("query glite job(s) with command '{}'".format(cmd)) code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, kill_timeout=2, processes=_processes) # handle errors if code != 0: diff --git a/law/contrib/htcondor/job.py b/law/contrib/htcondor/job.py index 4d991676..81ac693e 100644 --- a/law/contrib/htcondor/job.py +++ b/law/contrib/htcondor/job.py @@ -17,7 +17,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles from law.target.file import get_path -from law.util import interruptable_popen, make_list, make_unique, quote_cmd +from law.util import make_list, make_unique, quote_cmd, interruptable_popen from law.logger import get_logger from law.contrib.htcondor.util import get_htcondor_version @@ -76,7 +76,7 @@ def cleanup_batch(self, *args, **kwargs): raise NotImplementedError("HTCondorJobManager.cleanup_batch is not implemented") def submit(self, job_file, job_files=None, pool=None, scheduler=None, spool=False, retries=0, - retry_delay=3, silent=False): + retry_delay=3, silent=False, _processes=None): # signature is the superset for both grouped and batched submission, and the dispatching to # the actual submission implementation is based on the presence of job_files kwargs = { @@ -86,6 +86,7 @@ def submit(self, job_file, job_files=None, pool=None, scheduler=None, spool=Fals "retries": retries, "retry_delay": retry_delay, "silent": silent, + "_processes": _processes, } if job_files is None: @@ -97,7 +98,7 @@ def submit(self, job_file, job_files=None, pool=None, scheduler=None, spool=Fals return func(job_file, **kwargs) def _submit_impl_batched(self, job_file, pool=None, scheduler=None, spool=False, retries=0, - retry_delay=3, silent=False): + retry_delay=3, silent=False, _processes=None): # default arguments if pool is None: pool = self.pool @@ -156,7 +157,8 @@ def has_initialdir(job_file): # run the command logger.debug("submit htcondor job with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_files[0])) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_files[0]), + kill_timeout=2, processes=_processes) # get the job id(s) if code == 0: @@ -193,7 +195,7 @@ def has_initialdir(job_file): job_files_repr, err)) def _submit_impl_grouped(self, job_file, job_files=None, 
pool=None, scheduler=None, spool=False, - retries=0, retry_delay=3, silent=False): + retries=0, retry_delay=3, silent=False, _processes=None): # default arguments if pool is None: pool = self.pool @@ -216,7 +218,8 @@ def _submit_impl_grouped(self, job_file, job_files=None, pool=None, scheduler=No # run the command logger.debug("submit htcondor job with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_file)) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_file), + kill_timeout=2, processes=_processes) # get the job id(s) if code == 0: @@ -250,7 +253,7 @@ def _submit_impl_grouped(self, job_file, job_files=None, pool=None, scheduler=No raise Exception("submission of htcondor job(s) '{}' failed:\n{}".format(job_file, err)) - def cancel(self, job_id, pool=None, scheduler=None, silent=False): + def cancel(self, job_id, pool=None, scheduler=None, silent=False, _processes=None): # default arguments if pool is None: pool = self.pool @@ -272,7 +275,7 @@ def cancel(self, job_id, pool=None, scheduler=None, silent=False): # run it logger.debug("cancel htcondor job(s) with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes) # check success if code != 0 and not silent: @@ -281,7 +284,7 @@ def cancel(self, job_id, pool=None, scheduler=None, silent=False): return {job_id: None for job_id in job_ids} if chunking else None - def query(self, job_id, pool=None, scheduler=None, user=None, silent=False): + def query(self, job_id, pool=None, scheduler=None, user=None, silent=False, _processes=None): # default arguments if pool is None: pool = self.pool @@ -313,7 +316,7 @@ def query(self, job_id, pool=None, scheduler=None, user=None, silent=False): logger.debug("query htcondor job(s) with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes) # handle errors if code != 0: @@ -345,7 +348,8 @@ def query(self, job_id, pool=None, scheduler=None, user=None, silent=False): logger.debug("query htcondor job history with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, + processes=_processes) # handle errors if code != 0: diff --git a/law/contrib/lsf/job.py b/law/contrib/lsf/job.py index eceec96f..9a07a601 100644 --- a/law/contrib/lsf/job.py +++ b/law/contrib/lsf/job.py @@ -18,7 +18,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles from law.target.file import get_path -from law.util import interruptable_popen, make_list, make_unique, quote_cmd +from law.util import make_list, make_unique, quote_cmd, interruptable_popen from law.logger import get_logger from law.contrib.lsf.util import get_lsf_version @@ -57,7 +57,8 @@ def cleanup(self, *args, **kwargs): def cleanup_batch(self, *args, **kwargs): raise NotImplementedError("LSFJobManager.cleanup_batch is not implemented") - def submit(self, job_file, queue=None, emails=None, retries=0, retry_delay=3, 
silent=False): + def submit(self, job_file, queue=None, emails=None, retries=0, retry_delay=3, silent=False, + _processes=None): # default arguments if queue is None: queue = self.queue @@ -78,7 +79,8 @@ def submit(self, job_file, queue=None, emails=None, retries=0, retry_delay=3, si # run the command logger.debug("submit lsf job with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=job_file_dir) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=job_file_dir, kill_timeout=2, + processes=_processes) # get the job id if code == 0: @@ -106,7 +108,7 @@ def submit(self, job_file, queue=None, emails=None, retries=0, retry_delay=3, si raise Exception("submission of lsf job '{}' failed: \n{}".format(job_file, err)) - def cancel(self, job_id, queue=None, silent=False): + def cancel(self, job_id, queue=None, silent=False, _processes=None): # default arguments if queue is None: queue = self.queue @@ -124,7 +126,7 @@ def cancel(self, job_id, queue=None, silent=False): # run it logger.debug("cancel lsf job(s) with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes) # check success if code != 0 and not silent: @@ -133,7 +135,7 @@ def cancel(self, job_id, queue=None, silent=False): return {job_id: None for job_id in job_ids} if chunking else None - def query(self, job_id, queue=None, silent=False): + def query(self, job_id, queue=None, silent=False, _processes=None): # default arguments if queue is None: queue = self.queue @@ -153,7 +155,7 @@ def query(self, job_id, queue=None, silent=False): # run it logger.debug("query lsf job(s) with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes) # handle errors if code != 0: diff --git a/law/contrib/slurm/job.py b/law/contrib/slurm/job.py index 7ccb1743..4f1735b3 100644 --- a/law/contrib/slurm/job.py +++ b/law/contrib/slurm/job.py @@ -16,7 +16,7 @@ from law.config import Config from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile from law.target.file import get_path -from law.util import interruptable_popen, make_list, quote_cmd +from law.util import make_list, quote_cmd, interruptable_popen from law.logger import get_logger @@ -52,7 +52,8 @@ def cleanup(self, *args, **kwargs): def cleanup_batch(self, *args, **kwargs): raise NotImplementedError("SlurmJobManager.cleanup_batch is not implemented") - def submit(self, job_file, partition=None, retries=0, retry_delay=3, silent=False): + def submit(self, job_file, partition=None, retries=0, retry_delay=3, silent=False, + _processes=None): # default arguments if partition is None: partition = self.partition @@ -72,7 +73,8 @@ def submit(self, job_file, partition=None, retries=0, retry_delay=3, silent=Fals # run the command logger.debug("submit slurm job with command '{}'".format(cmd)) code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash", - stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=job_file_dir) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=job_file_dir, kill_timeout=2, + processes=_processes) # get the job id(s) if code == 0: @@ -104,7 +106,7 @@ def submit(self, job_file, 
partition=None, retries=0, retry_delay=3, silent=Fals
             raise Exception("submission of slurm job '{}' failed:\n{}".format(
                 job_file, err))
 
-    def cancel(self, job_id, partition=None, silent=False):
+    def cancel(self, job_id, partition=None, silent=False, _processes=None):
         # default arguments
         if partition is None:
             partition = self.partition
@@ -122,7 +124,7 @@ def cancel(self, job_id, partition=None, silent=False):
         # run it
         logger.debug("cancel slurm job(s) with command '{}'".format(cmd))
         code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes)
 
         # check success
         if code != 0 and not silent:
@@ -131,7 +133,7 @@ def cancel(self, job_id, partition=None, silent=False):
 
         return {job_id: None for job_id in job_ids} if chunking else None
 
-    def query(self, job_id, partition=None, silent=False):
+    def query(self, job_id, partition=None, silent=False, _processes=None):
         # default arguments
         if partition is None:
             partition = self.partition
@@ -148,7 +150,7 @@ def query(self, job_id, partition=None, silent=False):
 
         logger.debug("query slurm job(s) with command '{}'".format(cmd))
         code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes)
 
         # special case: when the id of a single yet expired job is queried, squeue responds with an
         # error (exit code != 0), so as a workaround, consider these cases as an empty result
@@ -180,7 +182,7 @@ def query(self, job_id, partition=None, silent=False):
 
         logger.debug("query slurm accounting history with command '{}'".format(cmd))
         code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2)
 
         # handle errors
         if code != 0:
diff --git a/law/job/base.py b/law/job/base.py
index a55d2caa..913ac78a 100644
--- a/law/job/base.py
+++ b/law/job/base.py
@@ -28,7 +28,7 @@
 from law.target.remote.base import RemoteTarget
 from law.util import (
     colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, create_random_string,
-    increment_path,
+    increment_path, kill_process,
 )
 from law.logger import get_logger
 
@@ -314,12 +314,18 @@ def wrapper(data):
 
         # threaded processing
         pool = ThreadPool(threads)
+        kwargs["_processes"] = []
         results = [
             pool.apply_async(func, (arg,), kwargs, callback=cb_factory(i))
             for i, arg in enumerate(job_objs)
         ]
-        pool.close()
-        pool.join()
+        try:
+            pool.close()
+            pool.join()
+        except KeyboardInterrupt:
+            for p in kwargs["_processes"]:
+                kill_process(p, kill_group=True, kill_timeout=2)
+            raise
 
         # store result data or an exception
         result_data = result_type()
@@ -481,12 +487,18 @@ def wrapper(result_data):
 
         # threaded processing
         pool = ThreadPool(threads)
+        kwargs["_processes"] = []
         results = [
             pool.apply_async(func, make_tuple(arg), kwargs, callback=cb_factory(i))
             for i, arg in enumerate(job_objs.items())
         ]
-        pool.close()
-        pool.join()
+        try:
+            pool.close()
+            pool.join()
+        except KeyboardInterrupt:
+            for p in kwargs["_processes"]:
+                kill_process(p, kill_group=True, kill_timeout=2)
+            raise
 
         # store result data or an exception
         result_data = result_type()
diff --git a/law/util.py b/law/util.py
index 4aa80567..5217a3b2 100644
--- a/law/util.py
+++ b/law/util.py
@@ -12,12 +12,12 @@
"range_join", "multi_match", "is_iterable", "is_lazy_iterable", "make_list", "make_tuple", "make_set", "make_unique", "is_nested", "flatten", "merge_dicts", "unzip", "which", "map_verbose", "map_struct", "mask_struct", - "tmp_file", "perf_counter", "interruptable_popen", "readable_popen", "create_hash", - "create_random_string", "copy_no_perm", "makedirs", "user_owns_file", "increment_path", - "iter_chunks", "human_bytes", "parse_bytes", "human_duration", "parse_duration", - "is_file_exists_error", "send_mail", "DotDict", "ShorthandDict", "open_compat", "patch_object", - "join_generators", "quote_cmd", "escape_markdown", "classproperty", "BaseStream", "TeeStream", - "FilteredStream", + "tmp_file", "perf_counter", "interruptable_popen", "kill_process", "readable_popen", + "create_hash", "create_random_string", "copy_no_perm", "makedirs", "user_owns_file", + "increment_path", "iter_chunks", "human_bytes", "parse_bytes", "human_duration", + "parse_duration", "is_file_exists_error", "send_mail", "DotDict", "ShorthandDict", + "open_compat", "patch_object", "join_generators", "quote_cmd", "escape_markdown", + "classproperty", "BaseStream", "TeeStream", "FilteredStream", ] @@ -1327,7 +1327,7 @@ def perf_counter(): def interruptable_popen(*args, **kwargs): - """ interruptable_popen(*args, stdin_callback=None, stdin_delay=0, interrupt_callback=None, kill_timeout=None, **kwargs) # noqa + """ interruptable_popen(*args, stdin_callback=None, stdin_delay=0, interrupt_callback=None, kill_timeout=None, processes=None, **kwargs) # noqa Shorthand to :py:class:`Popen` followed by :py:meth:`Popen.communicate` which can be interrupted by *KeyboardInterrupt*. The return code, standard output and standard error are returned in a 3-tuple. @@ -1342,6 +1342,10 @@ def interruptable_popen(*args, **kwargs): When *kill_timeout* is set, and the process is still alive after that period (in seconds), a SIGKILL signal is sent to force the process termination. + In case a *processes* list is defined, the process is appended to it right after it was created. + This can be useful to keep track of multiple processes and sending signals to them from an + outer context. + All other *args* and *kwargs* are forwarded to the :py:class:`Popen` constructor. 
""" # get kwargs not being passed to Popen @@ -1349,11 +1353,16 @@ def interruptable_popen(*args, **kwargs): stdin_delay = kwargs.pop("stdin_delay", 0) interrupt_callback = kwargs.pop("interrupt_callback", None) kill_timeout = kwargs.pop("kill_timeout", None) + processes = kwargs.pop("processes", None) # start the subprocess in a new process group kwargs["preexec_fn"] = os.setsid p = subprocess.Popen(*args, **kwargs) + # add to processes list + if processes is not None: + processes.append(p) + # get stdin stdin_data = None if callable(stdin_callback): @@ -1371,24 +1380,8 @@ def interruptable_popen(*args, **kwargs): if callable(interrupt_callback): interrupt_callback(p) - # when the process is still alive, send SIGTERM to gracefully terminate it - pgid = os.getpgid(p.pid) - if p.poll() is None: - os.killpg(pgid, signal.SIGTERM) - - # when a kill_timeout is set, and the process is still running after that period, - # send SIGKILL to force its termination - if kill_timeout is not None: - target_time = perf_counter() + kill_timeout - while target_time > perf_counter(): - time.sleep(0.05) - if p.poll() is not None: - # the process terminated, exit the loop - break - else: - # check the status again to avoid race conditions - if p.poll() is None: - os.killpg(pgid, signal.SIGKILL) + # kill the process + kill_process(p, use_group=True, kill_timeout=kill_timeout) # transparently reraise raise @@ -1402,6 +1395,40 @@ def interruptable_popen(*args, **kwargs): return p.returncode, out, err +def kill_process(p, kill_group=False, kill_timeout=None): + """ + Kills a running process with SIGTERM. When *kill_group* is *True*, the process group is killed. + When *kill_timeout* is set, and the process is still running after that period (in seconds), a + SIGKILL signal is sent to force the process termination. + """ + # get the pid + pid = p.pid + if kill_group: + pid = os.getpgid(pid) + + # when the process is still alive, send SIGTERM to gracefully terminate it + if p.poll() is None: + os.killpg(pid, signal.SIGTERM) + + # when a kill_timeout is set, and the process is still running after that period, + # send SIGKILL to force its termination + if p.poll() is None and kill_timeout is not None: + print("kill timeout") + target_time = perf_counter() + kill_timeout + while target_time > perf_counter(): + time.sleep(0.05) + if p.poll() is not None: + # the process terminated, exit the loop + break + else: + print("here", 1) + # check the status again to avoid race conditions + if p.poll() is None: + print("here", 2) + os.killpg(pid, signal.SIGKILL) + print("here", 3) + + def readable_popen(*args, **kwargs): """ Creates a :py:class:`Popen` object and a generator function yielding the output line-by-line as