diff --git a/law/contrib/cms/job.py b/law/contrib/cms/job.py
index b7b586cf..d907d9a9 100644
--- a/law/contrib/cms/job.py
+++ b/law/contrib/cms/job.py
@@ -44,6 +44,7 @@ class CrabJobManager(BaseJobManager):
     submission_task_name_cre = re.compile(r"^Task\s+name\s*\:\s+([^\s]+)\s*$")
     submission_log_file_cre = re.compile(r"^Log\s+file\s+is\s+([^\s]+\.log)\s*$")
     query_server_status_cre = re.compile(r"^Status\s+on\s+the\s+CRAB\s+server\s*\:\s+([^\s].*)$")
+    query_server_failure_cre = re.compile(r"^Failure\s+message\s+from\s+server\s*\:\s+([^\s].*)$")
     query_user_cre = re.compile(r"^Task\s+name\s*:\s+\d+_\d+\:([^_]+)_.+$")
     query_scheduler_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+([^\s]+).+$")
     query_scheduler_id_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+crab.*\@.+[^\d](\d+)\..+$")  # noqa
@@ -63,7 +64,8 @@ class CrabJobManager(BaseJobManager):
 
     JobId = namedtuple("JobId", ["crab_num", "task_name", "proj_dir"])
 
-    def __init__(self, sandbox_name=None, proxy=None, instance=None, threads=1):
+    def __init__(self, sandbox_name=None, proxy_file=None, myproxy_username=None, instance=None,
+        threads=1):
         super(CrabJobManager, self).__init__()
 
         # default sandbox name
@@ -79,7 +81,8 @@ def __init__(self, sandbox_name=None, proxy=None, instance=None, threads=1):
         )
 
         # store attributes
-        self.proxy = proxy
+        self.proxy_file = proxy_file
+        self.myproxy_username = myproxy_username
         self.instance = instance
         self.threads = threads
 
@@ -142,11 +145,13 @@ def _check_proj_dir(self, proj_dir):
         if not os.path.isdir(str(proj_dir)):
             raise Exception("project directory '{}' does not exist".format(proj_dir))
 
-    def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_username=None,
-        retries=0, retry_delay=3, silent=False, _processes=None):
+    def submit(self, job_file, job_files=None, proxy_file=None, myproxy_username=None,
+        instance=None, retries=0, retry_delay=3, silent=False, _processes=None):
         # default arguments
-        if proxy is None:
-            proxy = self.proxy
+        if proxy_file is None:
+            proxy_file = self.proxy_file
+        if myproxy_username is None:
+            myproxy_username = self.myproxy_username
         if instance is None:
             instance = self.instance
 
@@ -157,8 +162,8 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us
         while True:
             # build the command
             cmd = ["crab", "submit", "--config", job_file_name]
-            if proxy:
-                cmd += ["--proxy", proxy]
+            if proxy_file:
+                cmd += ["--proxy", proxy_file]
             if instance:
                 cmd += ["--instance", instance]
             cmd = quote_cmd(cmd)
@@ -230,17 +235,24 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us
 
         return job_ids
 
-    def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
+    def cancel(self, proj_dir, job_ids=None, proxy_file=None, myproxy_username=None, instance=None,
         silent=False, _processes=None):
         self._check_proj_dir(proj_dir)
 
+        # default arguments
         if job_ids is None:
             job_ids = self._job_ids_from_proj_dir(proj_dir)
+        if proxy_file is None:
+            proxy_file = self.proxy_file
+        if myproxy_username is None:
+            myproxy_username = self.myproxy_username
+        if instance is None:
+            instance = self.instance
 
         # build the command
         cmd = ["crab", "kill", "--dir", str(proj_dir)]
-        if proxy:
-            cmd += ["--proxy", proxy]
+        if proxy_file:
+            cmd += ["--proxy", proxy_file]
         if instance:
             cmd += ["--instance", instance]
         cmd = quote_cmd(cmd)
@@ -260,7 +272,7 @@ def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_user
 
         return {job_id: None for job_id in job_ids}
 
-    def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
+    def cleanup(self, proj_dir, job_ids=None, proxy_file=None, myproxy_username=None, instance=None,
         silent=False, _processes=None):
         if job_ids is None:
             job_ids = self._job_ids_from_proj_dir(proj_dir)
@@ -272,7 +284,7 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use
 
         return {job_id: None for job_id in job_ids}
 
-    def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
+    def query(self, proj_dir, job_ids=None, proxy_file=None, myproxy_username=None, instance=None,
         skip_transfers=None, silent=False, _processes=None):
         self._check_proj_dir(proj_dir)
 
@@ -281,14 +293,22 @@ def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_usern
         if job_ids is None:
             job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data)
 
+        # default arguments
+        if proxy_file is None:
+            proxy_file = self.proxy_file
+        if myproxy_username is None:
+            myproxy_username = self.myproxy_username
+        if instance is None:
+            instance = self.instance
+
         # when output collection is disabled, we can consider all "transferring" states as finished
         if skip_transfers is None:
             skip_transfers = str(log_data.get("disable_output_collection")).lower() == "true"
 
         # build the command
         cmd = ["crab", "status", "--dir", proj_dir, "--json"]
-        if proxy:
-            cmd += ["--proxy", proxy]
+        if proxy_file:
+            cmd += ["--proxy", proxy_file]
         if instance:
             cmd += ["--instance", instance]
         cmd = quote_cmd(cmd)
@@ -333,6 +353,7 @@ def parse_query_output(cls, out, proj_dir, job_ids, skip_transfers=False):
             cls.query_scheduler_status_cre,
             cls.query_json_line_cre,
             cls.query_monitoring_url_cre,
+            cls.query_server_failure_cre,
         ]
         values = len(cres) * [None]
         for line in out.replace("\r", "").split("\n"):
@@ -346,7 +367,15 @@ def parse_query_output(cls, out, proj_dir, job_ids, skip_transfers=False):
                 break
 
         # unpack
-        username, server_status, scheduler_id, scheduler_status, json_line, monitoring_url = values
+        (
+            username,
+            server_status,
+            scheduler_id,
+            scheduler_status,
+            json_line,
+            monitoring_url,
+            server_failure,
+        ) = values
 
         # helper to build extra info
         def extra(job_id, job_data=None):
@@ -379,11 +408,11 @@ def extra(job_id, job_data=None):
                 status = cls.PENDING
             elif server_status in failed_server_states:
                 status = cls.FAILED
-                error = "submission failed"
+                error = server_failure or "submission failed"
             else:
                 s = ",".join(map("'{}'".format, pending_server_states | failed_server_states))
                 raise Exception(
-                    "no per-job information available (yet?), which is only accepted if the crab " +
+                    "no per-job information available (yet?), which is only accepted if the crab "
                     "server status is any of {}, but got '{}'".format(s, server_status),
                 )
             return {
@@ -395,7 +424,7 @@ def extra(job_id, job_data=None):
         # parse json data
         if not json_line:
             raise Exception(
-                "no per-job information available in status response, crab server " +
+                "no per-job information available in status response, crab server "
                 "status '{}', scheduler status '{}'".format(server_status, scheduler_status),
             )
 
diff --git a/law/contrib/cms/scripts/delegate_myproxy.py b/law/contrib/cms/scripts/delegate_myproxy.py
index dfe3b623..25e4b55f 100755
--- a/law/contrib/cms/scripts/delegate_myproxy.py
+++ b/law/contrib/cms/scripts/delegate_myproxy.py
@@ -5,16 +5,27 @@
 """
 
 
-def delegate(renew, endpoint, username, password_file, vo, voms_proxy):
+def delegate(renew, endpoint, username, password_file, vo, voms_proxy, crab):
     import law
 
     law.contrib.load("cms", "wlcg")
 
+    # settings
+    encode_username = False
+    retrievers = None
+
+    # crab mode
+    if crab:
+        encode_username = True
+        voms_proxy = True
+        retrievers = law.contrib.cms.util._default_crab_receivers
+
     # when not renewing, check if a previous delegation exists
     if not renew:
         info = law.wlcg.get_myproxy_info(
             endpoint=endpoint,
             username=username,
+            encode_username=encode_username,
             silent=True,
         )
         if info:
@@ -30,8 +41,10 @@ def delegate(renew, endpoint, username, password_file, vo, voms_proxy):
     law.cms.delegate_myproxy(
         endpoint=endpoint,
         username=username,
+        encode_username=encode_username,
         password_file=password_file,
         vo=vo,
+        retrievers=retrievers,
     )
 
 
@@ -56,7 +69,7 @@ def main():
         "--endpoint",
         "-e",
         default="myproxy.cern.ch",
-        help="the server endpoint; default: myproxy.cern.ch",
+        help="the server endpoint; default: %(default)s",
    )
     parser.add_argument(
         "--password-file",
@@ -74,8 +87,8 @@ def main():
     parser.add_argument(
         "--vo",
         "-m",
-        default="cms",
-        help="virtual organization to use; default: cms",
+        default=law.contrib.cms.util._default_vo(),
+        help="virtual organization to use; default: %(default)s",
     )
     parser.add_argument(
         "--voms-proxy",
@@ -83,6 +96,11 @@ def main():
         action="store_true",
         help="create a voms-proxy prior to the delegation",
     )
+    parser.add_argument(
+        "--crab",
+        action="store_true",
+        help="adds crab-specific defaults",
+    )
     args = parser.parse_args()
 
     delegate(
@@ -92,6 +110,7 @@ def main():
         args.password_file,
         args.vo,
         args.voms_proxy,
+        args.crab,
     )
 
 
diff --git a/law/contrib/cms/util.py b/law/contrib/cms/util.py
index 3fb56b97..abd6a804 100644
--- a/law/contrib/cms/util.py
+++ b/law/contrib/cms/util.py
@@ -14,6 +14,12 @@
 law.contrib.load("wlcg")
 
 
+# obtained via _get_crab_receivers below
+_default_crab_receivers = [
+    "/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod)-tw(01|02).cern.ch|/DC=ch/DC=cern/OU=computers/CN=crab-dev-tw(01|02|03|04).cern.ch|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=cmscrab/CN=(817881|373708)/CN=Robot: cms crab|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=crabint1/CN=373708/CN=Robot: CMS CRAB Integration 1",  # noqa
+]
+
+
 class Site(object):
     """
     Helper class that provides site-related data, mostly via simple properties. When *name* is
@@ -118,24 +124,35 @@ def lfn_to_pfn(lfn, redirector="global"):
     return "root://{}/{}".format(Site.redirectors[redirector], lfn)
 
 
+def _default_vo():
+    return os.getenv("LAW_CMS_VO", "cms")
+
+
 def renew_vomsproxy(**kwargs):
     """
     Renews a VOMS proxy in the exact same way that :py:func:`law.wlcg.renew_vomsproxy` does, but
-    with the *vo* attribute set to ``"cms"`` by default.
+    with the *vo* argument defaulting to the environment variable LAW_CMS_VO or ``"cms"`` when empty.
     """
-    kwargs.setdefault("vo", "cms")
+    if "vo" not in kwargs:
+        kwargs["vo"] = _default_vo()
     return law.wlcg.renew_vomsproxy(**kwargs)
 
 
 def delegate_myproxy(**kwargs):
     """
     Delegates a X509 proxy to a myproxy server in the exact same way that
-    :py:func:`law.wlcg.delegate_myproxy` does, but with the *retrievers* argument set to a value
-    that is usually expected for crab submissions and the vo set to "cms".
+    :py:func:`law.wlcg.delegate_myproxy` does, but with the *vo* argument defaulting to the
+    environment variable LAW_CMS_VO or ``"cms"`` when empty.
""" - kwargs.setdefault( - "retrievers", - "/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod|dev)-tw(01|02|03).cern.ch|/DC=ch/DC=cern/OU=computers/CN=stefanov(m|m2).cern.ch|/DC=ch/DC=cern/OU=computers/CN=dciangot-tw.cern.ch", # noqa - ) - kwargs.setdefault("vo", "cms") + if "vo" not in kwargs: + kwargs["vo"] = _default_vo() return law.wlcg.delegate_myproxy(**kwargs) + + +def _get_crab_receivers(): + from CRABClient.ClientUtilities import initLoggers, server_info + from CRABClient.Commands.createmyproxy import createmyproxy + + cmd = createmyproxy(logger=initLoggers()[1]) + alldns = server_info(crabserver=cmd.crabserver, subresource="delegatedn") + print(alldns.get("services")) diff --git a/law/contrib/cms/workflow.py b/law/contrib/cms/workflow.py index c169424b..efcc2c59 100644 --- a/law/contrib/cms/workflow.py +++ b/law/contrib/cms/workflow.py @@ -56,7 +56,7 @@ def setup_job_manager(self): renew_vomsproxy(proxy_file=proxy_file, password_file=password_file) # ensure that it has been delegated to the myproxy server - info = law.wlcg.get_myproxy_info(proxy_file=proxy_file, silent=True) + info = law.wlcg.get_myproxy_info(proxy_file=proxy_file, encode_username=True, silent=True) delegate = False if not info: delegate = True @@ -66,19 +66,20 @@ def setup_job_manager(self): elif "timeleft" not in info: logger.warning("field 'timeleft' not in myproxy info") delegate = True - elif info["timeleft"] < 86400: - logger.warning("myproxy lifetime below 24h ({})".format( + elif info["timeleft"] < 5 * 86400: + logger.warning("myproxy lifetime below 5 days ({})".format( human_duration(seconds=info["timeleft"]), )) delegate = True # actual delegation if delegate: - myproxy_username = delegate_myproxy(proxy_file=proxy_file, password_file=password_file) + myproxy_username = delegate_myproxy(proxy_file=proxy_file, password_file=password_file, + encode_username=True) else: myproxy_username = info["username"] - return {"proxy": proxy_file, "myproxy_username": myproxy_username} + return {"proxy_file": proxy_file, "myproxy_username": myproxy_username} def create_job_file_factory(self, **kwargs): return self.task.crab_create_job_file_factory(**kwargs) diff --git a/law/contrib/wlcg/util.py b/law/contrib/wlcg/util.py index c65564f7..5a201b78 100644 --- a/law/contrib/wlcg/util.py +++ b/law/contrib/wlcg/util.py @@ -25,7 +25,7 @@ import six from law.util import ( - interruptable_popen, create_hash, human_duration, parse_duration, quote_cmd, + interruptable_popen, create_hash, human_duration, parse_duration, quote_cmd, make_list, ) from law.logger import get_logger @@ -314,7 +314,7 @@ def delegate_myproxy( usercert=None, username=None, proxy_file=None, - encode_username=True, + encode_username=False, cred_lifetime=720, proxy_lifetime=168, retrievers=None, @@ -334,7 +334,7 @@ def delegate_myproxy( *encode_username* is set, *username* is the sha1 encoded. The credential and proxy lifetimes can be defined in hours by *cred_lifetime* and - *proxy_lifetime*. When *retrievers* is given, it is passed as both ``--renewable_by`` and + *proxy_lifetime*. When *retrievers* is given, they are passed as both ``--renewable_by`` and ``--retrievable_by_cert`` to the underlying ``myproxy-init`` command. When *rfc* is *True*, the delegated proxy will be RFC compliant. 
     To pass VOMS attributes to the
@@ -362,7 +362,7 @@ def delegate_myproxy(
     if encode_username:
         username = hashlib.sha1(username.encode("utf-8")).hexdigest()
 
-    # build the command
+    # build the init command
     cmd = [
         "myproxy-init",
         "-s", endpoint,
@@ -373,10 +373,8 @@ def delegate_myproxy(
         "-c", str(cred_lifetime),
     ]
     if retrievers:
-        cmd.extend([
-            "-x", "-R", retrievers,
-            "-x", "-Z", retrievers,
-        ])
+        for r in make_list(retrievers):
+            cmd.extend(["-x", "-R", r, "-x", "-Z", r])
     if vo:
         cmd.extend(["-m", vo])
     if create_local:
@@ -397,17 +395,49 @@ def delegate_myproxy(
             stderr=silent_pipe, stdin=subprocess.PIPE, stdin_callback=stdin_callback,
             stdin_delay=0.2)[0]
 
-    if code == 0:
-        return username
-
-    if silent:
-        return None
-
-    raise Exception("myproxy-init failed with code {}".format(code))
-
-
-def get_myproxy_info(endpoint="myproxy.cern.ch", username=None, encode_username=True,
-        proxy_file=None, silent=False):
+    # stop in case of an error
+    if code != 0:
+        if silent:
+            return None
+        raise Exception("myproxy-init failed with code {}".format(code))
+
+    # optional myproxy-get-delegation call, currently disabled
+    # # build the delegation command
+    # cmd = ["myproxy-get-delegation"]
+    # if vo:
+    #     cmd.extend(["-m", vo])
+    # if proxy_file:
+    #     cmd.extend(["-o", proxy_file])
+
+    # # run it, depending on whether a password file is given
+    # silent_pipe = subprocess.PIPE if silent else None
+    # if password_file:
+    #     cmd = "cat \"{}\" | {} -S".format(password_file, quote_cmd(cmd))
+    #     code = interruptable_popen(cmd, shell=True, executable="/bin/bash", stdout=silent_pipe,
+    #         stderr=silent_pipe)[0]
+    # else:
+    #     cmd = quote_cmd(cmd)
+    #     stdin_callback = (lambda: password) if password else functools.partial(getpass.getpass, "")
+    #     code = interruptable_popen(cmd, shell=True, executable="/bin/bash", stdout=silent_pipe,
+    #         stderr=silent_pipe, stdin=subprocess.PIPE, stdin_callback=stdin_callback,
+    #         stdin_delay=0.2)[0]
+
+    # # stop in case of an error
+    # if code != 0:
+    #     if silent:
+    #         return None
+    #     raise Exception("myproxy-get-delegation failed with code {}".format(code))
+
+    return username
+
+
+def get_myproxy_info(
+    endpoint="myproxy.cern.ch",
+    username=None,
+    encode_username=False,
+    proxy_file=None,
+    silent=False,
+):
     """
     Returns information about a previous myproxy delegation to a server *endpoint*. When
     *username* is *None*, the subject string of the certificate is used instead, and sha1 encoded if