Refactor threaded job interactions.
riga committed Jan 11, 2025
1 parent 0a4901a commit 2ea2f00
Showing 8 changed files with 144 additions and 84 deletions.
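Across all four backends shown below, the pattern is the same: submit, cancel, cleanup and query gain a private _processes argument that is forwarded to interruptable_popen as processes=_processes, together with kill_timeout=2. The likely intent (an interpretation, not stated in the commit) is that interruptable_popen registers each spawned subprocess in the given list, so that a caller driving these methods from worker threads can still terminate in-flight commands from the main thread, the only place a KeyboardInterrupt is delivered. A minimal sketch of such a threaded caller, assuming law's ARCJobManager from the first file below; the CE endpoint and job file names are made up:

import threading

from law.contrib.arc.job import ARCJobManager

# hypothetical inputs, for illustration only
job_files = ["job_0.xrsl", "job_1.xrsl"]
manager = ARCJobManager(ce="ce.example.org")

# shared registry: each interruptable_popen call is assumed to append its
# subprocess.Popen object here via the new processes=_processes argument
processes = []

def submit_one(job_file):
    # every worker thread forwards the same registry to the manager
    manager.submit(job_file, retries=1, _processes=processes)

threads = [threading.Thread(target=submit_one, args=(f,)) for f in job_files]
for t in threads:
    t.start()

try:
    for t in threads:
        t.join()
except KeyboardInterrupt:
    # the interrupt reaches only the main thread; terminate whatever the worker
    # threads registered (interruptable_popen itself escalates to a kill after
    # kill_timeout=2 seconds when it is interrupted directly)
    for p in processes:
        if p.poll() is None:
            p.terminate()
    raise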
19 changes: 10 additions & 9 deletions law/contrib/arc/job.py
@@ -18,7 +18,7 @@
from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles
from law.target.file import get_path
- from law.util import interruptable_popen, make_list, make_unique, quote_cmd
+ from law.util import make_list, make_unique, quote_cmd, interruptable_popen
from law.logger import get_logger


@@ -48,7 +48,8 @@ def __init__(self, job_list=None, ce=None, threads=1):
self.ce = ce
self.threads = threads

- def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, silent=False):
+ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, silent=False,
+ _processes=None):
# default arguments
if job_list is None:
job_list = self.job_list
@@ -81,7 +82,7 @@ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, sil
# run the command
logger.debug("submit arc job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, cwd=job_file_dir)
+ stdout=subprocess.PIPE, cwd=job_file_dir, kill_timeout=2, processes=_processes)

# in some cases, the return code is 0 but the ce did not respond with valid job ids
job_ids = []
@@ -117,7 +118,7 @@ def submit(self, job_file, job_list=None, ce=None, retries=0, retry_delay=3, sil

raise Exception("submission of arc job(s) '{}' failed:\n{}".format(job_files, out))

- def cancel(self, job_id, job_list=None, silent=False):
+ def cancel(self, job_id, job_list=None, silent=False, _processes=None):
# default arguments
if job_list is None:
job_list = self.job_list
@@ -135,7 +136,7 @@ def cancel(self, job_id, job_list=None, silent=False):
# run it
logger.debug("cancel arc job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, kill_timeout=2, processes=_processes)

# check success
if code != 0 and not silent:
@@ -145,7 +146,7 @@

return {job_id: None for job_id in job_ids} if chunking else None

- def cleanup(self, job_id, job_list=None, silent=False):
+ def cleanup(self, job_id, job_list=None, silent=False, _processes=None):
# default arguments
if job_list is None:
job_list = self.job_list
@@ -163,7 +164,7 @@
# run it
logger.debug("cleanup arc job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, kill_timeout=2, processes=_processes)

# check success
if code != 0 and not silent:
@@ -173,7 +174,7 @@

return {job_id: None for job_id in job_ids} if chunking else None

- def query(self, job_id, job_list=None, silent=False):
+ def query(self, job_id, job_list=None, silent=False, _processes=None):
# default arguments
if job_list is None:
job_list = self.job_list
@@ -191,7 +192,7 @@
# run it
logger.debug("query arc job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, kill_timeout=2, processes=_processes)

# handle errors
if code != 0:
31 changes: 21 additions & 10 deletions law/contrib/cms/job.py
@@ -27,7 +27,7 @@
from law.job.dashboard import BaseJobDashboard
from law.target.file import get_path
from law.util import (
- DotDict, interruptable_popen, make_list, make_unique, quote_cmd, no_value, rel_path,
+ DotDict, make_list, make_unique, quote_cmd, no_value, rel_path, interruptable_popen,
)
from law.logger import get_logger

@@ -138,8 +138,12 @@ def _apply_group(self, func, result_type, group_func, job_objs, *args, **kwargs)
**kwargs # noqa
)

+ def _check_proj_dir(self, proj_dir):
+ if not os.path.isdir(str(proj_dir)):
+ raise Exception("project directory '{}' does not exist".format(proj_dir))
+
def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_username=None,
- retries=0, retry_delay=3, silent=False):
+ retries=0, retry_delay=3, silent=False, _processes=None):
# default arguments
if proxy is None:
proxy = self.proxy
@@ -163,15 +167,17 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us
# crab prints everything to stdout
logger.debug("submit crab jobs with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, cwd=job_file_dir, env=self.cmssw_env)
+ stdout=subprocess.PIPE, cwd=job_file_dir, env=self.cmssw_env, kill_timeout=2,
+ processes=_processes)

# handle errors
if code != 0:
logger.debug("submission of crab job '{}' failed with code {}:\n{}".format(
job_file, code, out))

# remove the project directory
- proj_dir = self._proj_dir_from_job_file(job_file, self.cmssw_env)
+ proj_dir = self._proj_dir_from_job_file(job_file, self.cmssw_env,
+ _processes=_processes)
if proj_dir and os.path.isdir(proj_dir):
logger.debug("removing crab project '{}' from previous attempt".format(
proj_dir))
@@ -225,7 +231,9 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us
return job_ids

def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
- silent=False):
+ silent=False, _processes=None):
+ self._check_proj_dir(proj_dir)
+
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir)

@@ -240,7 +248,7 @@ def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_user
# run it
logger.debug("cancel crab job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, env=self.cmssw_env)
+ stdout=subprocess.PIPE, env=self.cmssw_env, kill_timeout=2, processes=_processes)

# check success
if code != 0 and not silent:
@@ -265,7 +273,9 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use
return {job_id: None for job_id in job_ids}

def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
- skip_transfers=None, silent=False):
+ skip_transfers=None, silent=False, _processes=None):
+ self._check_proj_dir(proj_dir)
+
proj_dir = str(proj_dir)
log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log"))
if job_ids is None:
@@ -286,7 +296,8 @@ def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_usern
# run it
logger.debug("query crab job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.cmssw_env)
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.cmssw_env, kill_timeout=2,
+ processes=_processes)

# handle errors
if code != 0:
@@ -414,7 +425,7 @@ def extra(job_id, job_data=None):
return query_data

@classmethod
- def _proj_dir_from_job_file(cls, job_file, cmssw_env):
+ def _proj_dir_from_job_file(cls, job_file, cmssw_env, _processes=None):
work_area = None
request_name = None

@@ -454,7 +465,7 @@ def _proj_dir_from_job_file(cls, job_file, cmssw_env):
cfg = mod["cfg"]
print(join(cfg.General.workArea, "crab_" + cfg.General.requestName))'""".format(py_exec, job_file)
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, env=cmssw_env)
+ stdout=subprocess.PIPE, env=cmssw_env, kill_timeout=2, processes=_processes)
if code == 0:
path = out.strip().replace("\r\n", "\n").split("\n")[-1]
path = os.path.expandvars(os.path.expanduser(path))
19 changes: 10 additions & 9 deletions law/contrib/glite/job.py
@@ -17,7 +17,7 @@
from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles
from law.target.file import get_path
- from law.util import interruptable_popen, make_list, make_unique, quote_cmd
+ from law.util import make_list, make_unique, quote_cmd, interruptable_popen
from law.logger import get_logger


@@ -44,7 +44,8 @@ def __init__(self, ce=None, delegation_id=None, threads=1):
self.delegation_id = delegation_id
self.threads = threads

- def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3, silent=False):
+ def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3, silent=False,
+ _processes=None):
# default arguments
if ce is None:
ce = self.ce
@@ -80,7 +81,7 @@ def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3
# glite prints everything to stdout
logger.debug("submit glite job with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, cwd=job_file_dir)
+ stdout=subprocess.PIPE, cwd=job_file_dir, kill_timeout=2, processes=_processes)

# in some cases, the return code is 0 but the ce did not respond with a valid id
if code == 0:
@@ -106,7 +107,7 @@ def submit(self, job_file, ce=None, delegation_id=None, retries=0, retry_delay=3

raise Exception("submission of glite job '{}' failed:\n{}".format(job_file, out))

- def cancel(self, job_id, silent=False):
+ def cancel(self, job_id, silent=False, _processes=None):
chunking = isinstance(job_id, (list, tuple))
job_ids = make_list(job_id)

@@ -117,7 +118,7 @@ def cancel(self, job_id, silent=False):
# run it
logger.debug("cancel glite job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, kill_timeout=2, processes=_processes)

# check success
if code != 0 and not silent:
@@ -127,7 +128,7 @@ def cancel(self, job_id, silent=False):

return {job_id: None for job_id in job_ids} if chunking else None

- def cleanup(self, job_id, silent=False):
+ def cleanup(self, job_id, silent=False, _processes=None):
chunking = isinstance(job_id, (list, tuple))
job_ids = make_list(job_id)

@@ -138,7 +139,7 @@ def cleanup(self, job_id, silent=False):
# run it
logger.debug("cleanup glite job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, kill_timeout=2, processes=_processes)

# check success
if code != 0 and not silent:
@@ -148,7 +149,7 @@ def cleanup(self, job_id, silent=False):

return {job_id: None for job_id in job_ids} if chunking else None

- def query(self, job_id, silent=False):
+ def query(self, job_id, silent=False, _processes=None):
chunking = isinstance(job_id, (list, tuple))
job_ids = make_list(job_id)

@@ -159,7 +160,7 @@ def query(self, job_id, silent=False):
# run it
logger.debug("query glite job(s) with command '{}'".format(cmd))
code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, kill_timeout=2, processes=_processes)

# handle errors
if code != 0:
26 changes: 15 additions & 11 deletions law/contrib/htcondor/job.py
@@ -17,7 +17,7 @@
from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile, DeprecatedInputFiles
from law.target.file import get_path
- from law.util import interruptable_popen, make_list, make_unique, quote_cmd
+ from law.util import make_list, make_unique, quote_cmd, interruptable_popen
from law.logger import get_logger

from law.contrib.htcondor.util import get_htcondor_version
@@ -76,7 +76,7 @@ def cleanup_batch(self, *args, **kwargs):
raise NotImplementedError("HTCondorJobManager.cleanup_batch is not implemented")

def submit(self, job_file, job_files=None, pool=None, scheduler=None, spool=False, retries=0,
- retry_delay=3, silent=False):
+ retry_delay=3, silent=False, _processes=None):
# signature is the superset for both grouped and batched submission, and the dispatching to
# the actual submission implementation is based on the presence of job_files
kwargs = {
@@ -86,6 +86,7 @@ def submit(self, job_file, job_files=None, pool=None, scheduler=None, spool=Fals
"retries": retries,
"retry_delay": retry_delay,
"silent": silent,
"_processes": _processes,
}

if job_files is None:
@@ -97,7 +98,7 @@ def submit(self, job_file, job_files=None, pool=None, scheduler=None, spool=Fals
return func(job_file, **kwargs)

def _submit_impl_batched(self, job_file, pool=None, scheduler=None, spool=False, retries=0,
- retry_delay=3, silent=False):
+ retry_delay=3, silent=False, _processes=None):
# default arguments
if pool is None:
pool = self.pool
@@ -156,7 +157,8 @@ def has_initialdir(job_file):
# run the command
logger.debug("submit htcondor job with command '{}'".format(cmd))
code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_files[0]))
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_files[0]),
+ kill_timeout=2, processes=_processes)

# get the job id(s)
if code == 0:
@@ -193,7 +195,7 @@ def has_initialdir(job_file):
job_files_repr, err))

def _submit_impl_grouped(self, job_file, job_files=None, pool=None, scheduler=None, spool=False,
- retries=0, retry_delay=3, silent=False):
+ retries=0, retry_delay=3, silent=False, _processes=None):
# default arguments
if pool is None:
pool = self.pool
@@ -216,7 +218,8 @@ def _submit_impl_grouped(self, job_file, job_files=None, pool=None, scheduler=No
# run the command
logger.debug("submit htcondor job with command '{}'".format(cmd))
code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_file))
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(job_file),
+ kill_timeout=2, processes=_processes)

# get the job id(s)
if code == 0:
@@ -250,7 +253,7 @@ def _submit_impl_grouped(self, job_file, job_files=None, pool=None, scheduler=No

raise Exception("submission of htcondor job(s) '{}' failed:\n{}".format(job_file, err))

- def cancel(self, job_id, pool=None, scheduler=None, silent=False):
+ def cancel(self, job_id, pool=None, scheduler=None, silent=False, _processes=None):
# default arguments
if pool is None:
pool = self.pool
@@ -272,7 +275,7 @@ def cancel(self, job_id, pool=None, scheduler=None, silent=False):
# run it
logger.debug("cancel htcondor job(s) with command '{}'".format(cmd))
code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes)

# check success
if code != 0 and not silent:
@@ -281,7 +284,7 @@ def cancel(self, job_id, pool=None, scheduler=None, silent=False):

return {job_id: None for job_id in job_ids} if chunking else None

- def query(self, job_id, pool=None, scheduler=None, user=None, silent=False):
+ def query(self, job_id, pool=None, scheduler=None, user=None, silent=False, _processes=None):
# default arguments
if pool is None:
pool = self.pool
@@ -313,7 +316,7 @@ def query(self, job_id, pool=None, scheduler=None, user=None, silent=False):

logger.debug("query htcondor job(s) with command '{}'".format(cmd))
code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2, processes=_processes)

# handle errors
if code != 0:
@@ -345,7 +348,8 @@ def query(self, job_id, pool=None, scheduler=None, user=None, silent=False):

logger.debug("query htcondor job history with command '{}'".format(cmd))
code, out, err = interruptable_popen(cmd, shell=True, executable="/bin/bash",
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, kill_timeout=2,
+ processes=_processes)

# handle errors
if code != 0:
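As the inline comment in law/contrib/htcondor/job.py notes, submit takes the superset of arguments for grouped and batched submission and dispatches on whether job_files is given; the new _processes argument is simply threaded through the shared kwargs. A simplified, self-contained sketch of that dispatch idiom, with stub implementations standing in for law's actual ones:

class JobManagerSketch:
    # stand-ins for HTCondorJobManager's two submission implementations
    def _submit_impl_batched(self, job_file, _processes=None, **kwargs):
        return ["batched:{}".format(job_file)]

    def _submit_impl_grouped(self, job_file, job_files=None, _processes=None, **kwargs):
        return ["grouped:{}".format(f) for f in job_files]

    def submit(self, job_file, job_files=None, _processes=None, **kwargs):
        # dispatch on the presence of job_files, forwarding _processes either way
        kwargs["_processes"] = _processes
        if job_files is None:
            return self._submit_impl_batched(job_file, **kwargs)
        return self._submit_impl_grouped(job_file, job_files=job_files, **kwargs)

print(JobManagerSketch().submit("job.jdl"))                                # batched
print(JobManagerSketch().submit("job.jdl", job_files=["a.jdl", "b.jdl"]))  # grouped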
(diff truncated: 4 more changed files not shown)