Skip to content

Commit

Permalink
[cms] Adapt crab workflow and proxy delegation.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Jan 13, 2025
1 parent e0655e9 commit 1d82bd4
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 56 deletions.
67 changes: 48 additions & 19 deletions law/contrib/cms/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CrabJobManager(BaseJobManager):
submission_task_name_cre = re.compile(r"^Task\s+name\s*\:\s+([^\s]+)\s*$")
submission_log_file_cre = re.compile(r"^Log\s+file\s+is\s+([^\s]+\.log)\s*$")
query_server_status_cre = re.compile(r"^Status\s+on\s+the\s+CRAB\s+server\s*\:\s+([^\s].*)$")
query_server_failure_cre = re.compile(r"^Failure\s+message\s+from\s+server\s*\:\s+([^\s].*)$")
query_user_cre = re.compile(r"^Task\s+name\s*:\s+\d+_\d+\:([^_]+)_.+$")
query_scheduler_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+([^\s]+).+$")
query_scheduler_id_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+crab.*\@.+[^\d](\d+)\..+$") # noqa
Expand All @@ -63,7 +64,8 @@ class CrabJobManager(BaseJobManager):

JobId = namedtuple("JobId", ["crab_num", "task_name", "proj_dir"])

def __init__(self, sandbox_name=None, proxy=None, instance=None, threads=1):
def __init__(self, sandbox_name=None, proxy_file=None, myproxy_username=None, instance=None,
threads=1):
super(CrabJobManager, self).__init__()

# default sandbox name
Expand All @@ -79,7 +81,8 @@ def __init__(self, sandbox_name=None, proxy=None, instance=None, threads=1):
)

# store attributes
self.proxy = proxy
self.proxy_file = proxy_file
self.myproxy_username = myproxy_username
self.instance = instance
self.threads = threads

Expand Down Expand Up @@ -142,11 +145,13 @@ def _check_proj_dir(self, proj_dir):
if not os.path.isdir(str(proj_dir)):
raise Exception("project directory '{}' does not exist".format(proj_dir))

def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_username=None,
retries=0, retry_delay=3, silent=False, _processes=None):
def submit(self, job_file, job_files=None, proxy_file=None, myproxy_username=None,
instance=None, retries=0, retry_delay=3, silent=False, _processes=None):
# default arguments
if proxy is None:
proxy = self.proxy
if proxy_file is None:
proxy_file = self.proxy_file
if myproxy_username is None:
myproxy_username = self.myproxy_username
if instance is None:
instance = self.instance

Expand All @@ -157,8 +162,8 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us
while True:
# build the command
cmd = ["crab", "submit", "--config", job_file_name]
if proxy:
cmd += ["--proxy", proxy]
if proxy_file:
cmd += ["--proxy", proxy_file]
if instance:
cmd += ["--instance", instance]
cmd = quote_cmd(cmd)
Expand Down Expand Up @@ -230,17 +235,24 @@ def submit(self, job_file, job_files=None, proxy=None, instance=None, myproxy_us

return job_ids

def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
def cancel(self, proj_dir, job_ids=None, proxy_file=None, myproxy_username=None, instance=None,
silent=False, _processes=None):
self._check_proj_dir(proj_dir)

# default arguments
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir)
if proxy_file is None:
proxy_file = self.proxy_file
if myproxy_username is None:
myproxy_username = self.myproxy_username
if instance is None:
instance = self.instance

# build the command
cmd = ["crab", "kill", "--dir", str(proj_dir)]
if proxy:
cmd += ["--proxy", proxy]
if proxy_file:
cmd += ["--proxy", proxy_file]
if instance:
cmd += ["--instance", instance]
cmd = quote_cmd(cmd)
Expand All @@ -260,7 +272,7 @@ def cancel(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_user

return {job_id: None for job_id in job_ids}

def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
def cleanup(self, proj_dir, job_ids=None, proxy_file=None, myproxy_username=None, instance=None,
silent=False, _processes=None):
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir)
Expand All @@ -272,7 +284,7 @@ def cleanup(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_use

return {job_id: None for job_id in job_ids}

def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_username=None,
def query(self, proj_dir, job_ids=None, proxy_file=None, myproxy_username=None, instance=None,
skip_transfers=None, silent=False, _processes=None):
self._check_proj_dir(proj_dir)

Expand All @@ -281,14 +293,22 @@ def query(self, proj_dir, job_ids=None, proxy=None, instance=None, myproxy_usern
if job_ids is None:
job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data)

# default arguments
if proxy_file is None:
proxy_file = self.proxy_file
if myproxy_username is None:
myproxy_username = self.myproxy_username
if instance is None:
instance = self.instance

# when output collection is disabled, we can consider all "transferring" states as finished
if skip_transfers is None:
skip_transfers = str(log_data.get("disable_output_collection")).lower() == "true"

# build the command
cmd = ["crab", "status", "--dir", proj_dir, "--json"]
if proxy:
cmd += ["--proxy", proxy]
if proxy_file:
cmd += ["--proxy", proxy_file]
if instance:
cmd += ["--instance", instance]
cmd = quote_cmd(cmd)
Expand Down Expand Up @@ -333,6 +353,7 @@ def parse_query_output(cls, out, proj_dir, job_ids, skip_transfers=False):
cls.query_scheduler_status_cre,
cls.query_json_line_cre,
cls.query_monitoring_url_cre,
cls.query_server_failure_cre,
]
values = len(cres) * [None]
for line in out.replace("\r", "").split("\n"):
Expand All @@ -346,7 +367,15 @@ def parse_query_output(cls, out, proj_dir, job_ids, skip_transfers=False):
break

# unpack
username, server_status, scheduler_id, scheduler_status, json_line, monitoring_url = values
(
username,
server_status,
scheduler_id,
scheduler_status,
json_line,
monitoring_url,
server_failure,
) = values

# helper to build extra info
def extra(job_id, job_data=None):
Expand Down Expand Up @@ -379,11 +408,11 @@ def extra(job_id, job_data=None):
status = cls.PENDING
elif server_status in failed_server_states:
status = cls.FAILED
error = "submission failed"
error = server_failure or "submission failed"
else:
s = ",".join(map("'{}'".format, pending_server_states | failed_server_states))
raise Exception(
"no per-job information available (yet?), which is only accepted if the crab " +
"no per-job information available (yet?), which is only accepted if the crab "
"server status is any of {}, but got '{}'".format(s, server_status),
)
return {
Expand All @@ -395,7 +424,7 @@ def extra(job_id, job_data=None):
# parse json data
if not json_line:
raise Exception(
"no per-job information available in status response, crab server " +
"no per-job information available in status response, crab server "
"status '{}', scheduler status '{}'".format(server_status, scheduler_status),
)

Expand Down
27 changes: 23 additions & 4 deletions law/contrib/cms/scripts/delegate_myproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,27 @@
"""


def delegate(renew, endpoint, username, password_file, vo, voms_proxy):
def delegate(renew, endpoint, username, password_file, vo, voms_proxy, crab):
import law

law.contrib.load("cms", "wlcg")

# settings
encode_username = False
retrievers = None

# crab mode
if crab:
encode_username = True
voms_proxy = True
retrievers = law.contrib.cms.util._default_crab_receivers

# when not renewing, check if a previous delegation exists
if not renew:
info = law.wlcg.get_myproxy_info(
endpoint=endpoint,
username=username,
encode_username=encode_username,
silent=True,
)
if info:
Expand All @@ -30,8 +41,10 @@ def delegate(renew, endpoint, username, password_file, vo, voms_proxy):
law.cms.delegate_myproxy(
endpoint=endpoint,
username=username,
encode_username=encode_username,
password_file=password_file,
vo=vo,
retrievers=retrievers,
)


Expand All @@ -56,7 +69,7 @@ def main():
"--endpoint",
"-e",
default="myproxy.cern.ch",
help="the server endpoint; default: myproxy.cern.ch",
help="the server endpoint; default: %(default)s",
)
parser.add_argument(
"--password-file",
Expand All @@ -74,15 +87,20 @@ def main():
parser.add_argument(
"--vo",
"-m",
default="cms",
help="virtual organization to use; default: cms",
default=law.contrib.cms.util._default_vo(),
help="virtual organization to use; default: %(default)s",
)
parser.add_argument(
"--voms-proxy",
"-v",
action="store_true",
help="create a voms-proxy prior to the delegation",
)
parser.add_argument(
"--crab",
action="store_true",
help="adds crab-specific defaults",
)
args = parser.parse_args()

delegate(
Expand All @@ -92,6 +110,7 @@ def main():
args.password_file,
args.vo,
args.voms_proxy,
args.crab,
)


Expand Down
35 changes: 26 additions & 9 deletions law/contrib/cms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
law.contrib.load("wlcg")


# obtained via _get_crab_receivers below
_default_crab_receivers = [
"/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod)-tw(01|02).cern.ch|/DC=ch/DC=cern/OU=computers/CN=crab-dev-tw(01|02|03|04).cern.ch|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=cmscrab/CN=(817881|373708)/CN=Robot: cms crab|/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=crabint1/CN=373708/CN=Robot: CMS CRAB Integration 1", # noqa
]


class Site(object):
"""
Helper class that provides site-related data, mostly via simple properties. When *name* is
Expand Down Expand Up @@ -118,24 +124,35 @@ def lfn_to_pfn(lfn, redirector="global"):
return "root://{}/{}".format(Site.redirectors[redirector], lfn)


def _default_vo():
    """
    Returns the name of the default virtual organization, which is the value of the environment
    variable LAW_CMS_VO when it is set, and ``"cms"`` otherwise.
    """
    return os.environ.get("LAW_CMS_VO", "cms")


def renew_vomsproxy(**kwargs):
"""
Renews a VOMS proxy in the exact same way that :py:func:`law.wlcg.renew_vomsproxy` does, but
with the *vo* attribute set to ``"cms"`` by default.
with the *vo* argument default to the environment variable LAW_CMS_VO or ``"cms"`` when empty.
"""
kwargs.setdefault("vo", "cms")
if "vo" not in kwargs:
kwargs["vo"] = _default_vo()
return law.wlcg.renew_vomsproxy(**kwargs)


def delegate_myproxy(**kwargs):
"""
Delegates a X509 proxy to a myproxy server in the exact same way that
:py:func:`law.wlcg.delegate_myproxy` does, but with the *retrievers* argument set to a value
that is usually expected for crab submissions and the vo set to "cms".
:py:func:`law.wlcg.delegate_myproxy` does, but with the *vo* argument default to the environment
variable LAW_CMS_VO or ``"cms"`` when empty.
"""
kwargs.setdefault(
"retrievers",
"/DC=ch/DC=cern/OU=computers/CN=crab-(preprod|prod|dev)-tw(01|02|03).cern.ch|/DC=ch/DC=cern/OU=computers/CN=stefanov(m|m2).cern.ch|/DC=ch/DC=cern/OU=computers/CN=dciangot-tw.cern.ch", # noqa
)
kwargs.setdefault("vo", "cms")
if "vo" not in kwargs:
kwargs["vo"] = _default_vo()
return law.wlcg.delegate_myproxy(**kwargs)


def _get_crab_receivers():
    """
    Helper that prints the ``"services"`` entry of the ``"delegatedn"`` server info, queried from
    the crab server attached to a ``createmyproxy`` command instance.
    """
    # crab client imports are done locally inside this helper
    from CRABClient.ClientUtilities import initLoggers, server_info
    from CRABClient.Commands.createmyproxy import createmyproxy

    # the command is created with the second object returned by initLoggers() as its logger
    crab_logger = initLoggers()[1]
    crab_cmd = createmyproxy(logger=crab_logger)

    # query the "delegatedn" subresource and print its "services" entry
    dn_info = server_info(crabserver=crab_cmd.crabserver, subresource="delegatedn")
    print(dn_info.get("services"))
11 changes: 6 additions & 5 deletions law/contrib/cms/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def setup_job_manager(self):
renew_vomsproxy(proxy_file=proxy_file, password_file=password_file)

# ensure that it has been delegated to the myproxy server
info = law.wlcg.get_myproxy_info(proxy_file=proxy_file, silent=True)
info = law.wlcg.get_myproxy_info(proxy_file=proxy_file, encode_username=True, silent=True)
delegate = False
if not info:
delegate = True
Expand All @@ -66,19 +66,20 @@ def setup_job_manager(self):
elif "timeleft" not in info:
logger.warning("field 'timeleft' not in myproxy info")
delegate = True
elif info["timeleft"] < 86400:
logger.warning("myproxy lifetime below 24h ({})".format(
elif info["timeleft"] < 5 * 86400:
logger.warning("myproxy lifetime below 5 days ({})".format(
human_duration(seconds=info["timeleft"]),
))
delegate = True

# actual delegation
if delegate:
myproxy_username = delegate_myproxy(proxy_file=proxy_file, password_file=password_file)
myproxy_username = delegate_myproxy(proxy_file=proxy_file, password_file=password_file,
encode_username=True)
else:
myproxy_username = info["username"]

return {"proxy": proxy_file, "myproxy_username": myproxy_username}
return {"proxy_file": proxy_file, "myproxy_username": myproxy_username}

def create_job_file_factory(self, **kwargs):
return self.task.crab_create_job_file_factory(**kwargs)
Expand Down
Loading

0 comments on commit 1d82bd4

Please sign in to comment.