From 34822614d3319d42962ebbffd4d042c96865c0be Mon Sep 17 00:00:00 2001
From: aldbr
Date: Thu, 21 Dec 2023 17:31:21 +0100
Subject: [PATCH] feat: PushJobAgent can handle special JobWrappers

---
 .../Agent/PushJobAgent.py                     | 268 ++++++++++++++++--
 .../Agent/test/Test_Agent_PushJobAgent.py     |   3 +
 .../JobWrapper/JobWrapper.py                  |  49 +---
 .../JobWrapper/JobWrapperLightTemplate.py     |  94 ++++++
 .../JobWrapper/JobWrapperTemplate.py          | 213 +++-----------
 .../JobWrapper/JobWrapperUtilities.py         | 237 ++++++++++++++++
 .../Utilities/Utils.py                        |  71 ++++-
 7 files changed, 689 insertions(+), 246 deletions(-)
 create mode 100644 src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py
 create mode 100644 src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py

diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
index 89ce0a1dc9d..a026a62654e 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -9,21 +9,35 @@
 """

+import hashlib
+import json
+import os
 import random
 import sys
 from collections import defaultdict
+import time

-from DIRAC import S_OK, gConfig
+from DIRAC import S_ERROR, S_OK, gConfig
+from DIRAC.Core.Security.X509Chain import X509Chain
 from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
 from DIRAC.Core.Utilities import DErrno
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
 from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
 from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
 from DIRAC.RequestManagementSystem.Client.Request import Request
-from DIRAC.WorkloadManagementSystem.Client import JobStatus
+from DIRAC.Resources.Computing.ComputingElement import ComputingElement
+from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus
+from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
+    getJobWrapper,
+    resolveInputData,
+    transferInputSandbox,
+)
 from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
 from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv
 from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
+from DIRAC.WorkloadManagementSystem.Utilities.Utils import createLightJobWrapper
 from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials

 MAX_JOBS_MANAGED = 100
@@ -50,10 +64,24 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
         self.failedQueueCycleFactor = 10
         self.failedQueues = defaultdict(int)

+        # Choose the submission policy
+        # - Workflow: the agent submits a workflow to a PoolCE; the workflow is responsible for interacting with the remote site
+        # - JobWrapper: the agent submits a JobWrapper directly to the remote site and is responsible for the remote execution
+        self.submissionPolicy = "Workflow"
+
+        # cleanTask is used to clean the task on the remote site
+        self.cleanTask = True
+
+        # Tasks submitted to remote sites and still being monitored
+        self.submissionDict = {}
+
     def initialize(self):
         """Sets default parameters and creates CE instance"""
         super().initialize()

+        # Get the submission policy
+        # Initialized here because it cannot be dynamically modified during the execution
+        self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy)
+        if self.submissionPolicy not in ["Workflow", "JobWrapper"]:
+            return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper")
JobWrapper") + result = self._initializeComputingElement("Pool") if not result["OK"]: return result @@ -87,6 +115,7 @@ def beginExecution(self): self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit}) self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor) + self.cleanTask = self.am_getOption("CleanTask", self.cleanTask) # Get target queues from the configuration siteNames = None @@ -137,11 +166,12 @@ def execute(self): if not result["OK"] or result["Value"]: return result - # Check errors that could have occurred during job submission and/or execution - # Status are handled internally, and therefore, not checked outside of the method - result = self._checkSubmittedJobs() - if not result["OK"]: - return result + if self.submissionPolicy == "Workflow": + # Check errors that could have occurred during job submission and/or execution + # Status are handled internally, and therefore, not checked outside of the method + result = self._checkSubmittedJobs() + if not result["OK"]: + return result for queueName, queueDictionary in queueDictItems: # Make sure there is no problem with the queue before trying to submit @@ -251,21 +281,41 @@ def execute(self): # Submit the job to the CE self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") - resultSubmission = self._submitJob( - jobID=jobID, - jobParams=params, - resourceParams=ceDict, - optimizerParams=optimizerParams, - proxyChain=proxyChain, - processors=submissionParams["processors"], - wholeNode=submissionParams["wholeNode"], - maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], - mpTag=submissionParams["mpTag"], - ) - if not result["OK"]: - result = self._rescheduleFailedJob(jobID, resultSubmission["Message"]) - self.failedQueues[queueName] += 1 - break + if self.submissionPolicy == "Workflow": + resultSubmission = self._submitJob( + jobID=jobID, + jobParams=params, + resourceParams=ceDict, + optimizerParams=optimizerParams, + proxyChain=proxyChain, + processors=submissionParams["processors"], + wholeNode=submissionParams["wholeNode"], + maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], + mpTag=submissionParams["mpTag"], + ) + if not result["OK"]: + result = self._rescheduleFailedJob(jobID, resultSubmission["Message"]) + self.failedQueues[queueName] += 1 + break + else: + resultSubmission = self._submitJobWrapper( + jobID=jobID, + ce=ce, + jobParams=params, + resourceParams=ceDict, + optimizerParams=optimizerParams, + proxyChain=proxyChain, + processors=submissionParams["processors"], + ) + if not result["OK"]: + self.failedQueues[queueName] += 1 + break + + # Check status of the submitted jobs + result = self._checkSubmittedJobWrapper(ce) + if not result["OK"]: + self.failedQueues[queueName] += 1 + break self.log.debug(f"After {self.ceName}CE submitJob()") # Committing the JobReport before evaluating the result of job submission @@ -368,7 +418,8 @@ def _setCEDict(self, ceDict): ceDict["ReleaseProject"] = project # Add a RemoteExecution entry, which can be used in the next stages - ceDict["RemoteExecution"] = True + if self.submissionPolicy == "Workflow": + ceDict["RemoteExecution"] = True def _checkMatchingIssues(self, jobRequest): """Check the source of the matching issue @@ -384,3 +435,176 @@ def _checkMatchingIssues(self, jobRequest): self.log.notice("Failed to get jobs", jobRequest["Message"]) return S_OK() + + def _submitJobWrapper( + self, + jobID: str, + ce: ComputingElement, + jobParams: dict, + resourceParams: dict, + 
+
+    def _submitJobWrapper(
+        self,
+        jobID: str,
+        ce: ComputingElement,
+        jobParams: dict,
+        resourceParams: dict,
+        optimizerParams: dict,
+        proxyChain: X509Chain,
+        processors: int,
+    ):
+        """Submit a JobWrapper to the remote site
+
+        :param jobID: job ID
+        :param ce: ComputingElement instance
+        :param jobParams: job parameters
+        :param resourceParams: resource parameters
+        :param optimizerParams: optimizer parameters
+        :param proxyChain: proxy chain
+        :param processors: number of processors requested
+
+        :return: S_OK/S_ERROR
+        """
+        # Add the number of requested processors to the job environment
+        if "ExecutionEnvironment" in jobParams:
+            if isinstance(jobParams["ExecutionEnvironment"], str):
+                jobParams["ExecutionEnvironment"] = jobParams["ExecutionEnvironment"].split(";")
+        jobParams.setdefault("ExecutionEnvironment", []).append("DIRAC_JOB_PROCESSORS=%d" % processors)
+
+        # Prepare the job for submission
+        self.log.verbose("Getting a JobWrapper")
+        arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams}
+        job = getJobWrapper(jobID, arguments, self.jobReport)
+        if not job:
+            return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}")
+
+        if "InputSandbox" in jobParams:
+            self.log.verbose("Getting the inputSandbox of the job")
+            if not transferInputSandbox(job, jobParams["InputSandbox"], self.jobReport):
+                return S_ERROR(f"Cannot get input sandbox of job {jobID}")
+            self.jobReport.commit()
+
+        if "InputData" in jobParams and jobParams["InputData"]:
+            self.log.verbose("Getting the inputData of the job")
+            if not resolveInputData(job, self.jobReport):
+                return S_ERROR(f"Cannot get input data of job {jobID}")
+            self.jobReport.commit()
+
+        # Preprocess the payload
+        payloadParams = job.preProcess()
+        self.jobReport.commit()
+
+        # Generate a light JobWrapper executor script
+        jobDesc = {
+            "jobID": jobID,
+            "jobParams": jobParams,
+            "resourceParams": resourceParams,
+            "optimizerParams": optimizerParams,
+            "payloadParams": payloadParams,
+            "extraOptions": self.extraOptions,
+        }
+        result = createLightJobWrapper(log=self.log, logLevel=self.logLevel, **jobDesc)
+        if not result["OK"]:
+            return result
+        wrapperFile = result["Value"][0]
+
+        # Get inputs from the current working directory
+        inputs = os.listdir(".")
+        if os.path.basename(wrapperFile) in inputs:
+            inputs.remove(os.path.basename(wrapperFile))
+        self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs))
+
+        # Request the whole directory as output
+        outputs = ["/"]
+
+        self.jobReport.setJobStatus(minorStatus="Submitting To CE")
+        self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")
+
+        # Pass the proxy to the CE
+        proxy = proxyChain.dumpAllToString()
+        if not proxy["OK"]:
+            self.log.error("Invalid proxy", proxy)
+            return S_ERROR("Payload Proxy Not Found")
+        ce.setProxy(proxy["Value"])
+
+        result = ce.submitJob(
+            executableFile=wrapperFile,
+            proxy=None,
+            inputs=inputs,
+            outputs=outputs,
+        )
+        if not result["OK"]:
+            self._rescheduleFailedJob(jobID, result["Message"])
+            return result
+
+        taskID = result["Value"][0]
+        stamp = result["PilotStampDict"][taskID]
+        self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})")
+
+        self.submissionDict[f"{taskID}:::{stamp}"] = job
+        time.sleep(self.jobSubmissionDelay)
+        return S_OK()
+
+    def _checkOutputIntegrity(self, workingDirectory: str):
+        """Make sure that the output files are not corrupted.
+
+        :param workingDirectory: path of the outputs
+
+        :return: S_OK/S_ERROR
+        """
+        checkSumOutput = os.path.join(workingDirectory, "checksums.json")
+        if not os.path.exists(checkSumOutput):
+            return S_ERROR(f"Cannot guarantee the integrity of the outputs: {checkSumOutput} unavailable")
+
+        with open(checkSumOutput) as f:
+            checksums = json.load(f)
+
+        # For each output file, compute the md5 checksum
+        for output, checksum in checksums.items():
+            digest = hashlib.md5()
+            localOutput = os.path.join(workingDirectory, output)
+            if not os.path.exists(localOutput):
+                return S_ERROR(f"{localOutput} was expected but not found")
+
+            with open(localOutput, "rb") as f:
+                while chunk := f.read(128 * digest.block_size):
+                    digest.update(chunk)
+            if checksum != digest.hexdigest():
+                return S_ERROR(f"{localOutput} is corrupted")
+
+        return S_OK()
+
+    def _checkSubmittedJobWrapper(self, ce: ComputingElement):
+        """Check the status of the submitted tasks.
+        If a task is finished, get its output, post-process the job and
+        remove it from the submission dictionary.
+
+        :return: S_OK/S_ERROR
+        """
+        if not (result := ce.getJobStatus(list(self.submissionDict.keys())))["OK"]:
+            self.log.error("Failed to get job status", result["Message"])
+            return result
+
+        for taskID, status in result["Value"].items():
+            if status not in PilotStatus.PILOT_FINAL_STATES:
+                continue
+
+            self.log.info("Job execution finished", f"(DIRAC taskID: {taskID}; Status: {status})")
+
+            # Get the output of the job
+            self.log.info(f"Getting the outputs of taskID {taskID}")
+            if not (result := ce.getJobOutput(taskID, os.path.abspath(".")))["OK"]:
+                self.log.error("Failed to get the output of taskID", f"{taskID}: {result['Message']}")
+                return result
+
+            # Make sure the output is correct
+            self.log.info(f"Checking the integrity of the outputs of {taskID}")
+            if not (result := self._checkOutputIntegrity("."))["OK"]:
+                return result
+            self.log.info("The output has been retrieved and declared complete")
+
+            job = self.submissionDict[taskID]
+
+            with open("payloadResults.json") as f:
+                payloadResults = json.load(f)
+            job.postProcess(**payloadResults)
+
+            # Clean the job on the remote resource
+            if self.cleanTask:
+                if not (result := ce.cleanJob(taskID))["OK"]:
+                    self.log.warn("Failed to clean the output remotely", result["Message"])
+                else:
+                    self.log.info(f"TaskID {taskID} has been remotely removed")
+
+            # Remove the job from the submission dictionary
+            del self.submissionDict[taskID]
+        return S_OK()
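
The two options introduced above are read from the agent's configuration. A minimal sketch of the relevant CS section follows (the section path is illustrative and depends on the installation; "Workflow" remains the default policy, and FailedQueueCycleFactor is the pre-existing option shown in beginExecution):

    PushJobAgent
    {
      SubmissionPolicy = JobWrapper
      CleanTask = True
      FailedQueueCycleFactor = 10
    }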

diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py
index e1de80a21cd..990c7f9a8eb 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py
@@ -7,6 +7,9 @@

 # DIRAC Components
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
+from DIRAC.Core.Utilities.ReturnValues import S_OK
+from DIRAC.Resources.Computing import ComputingElement
+from DIRAC.WorkloadManagementSystem import JobWrapper
 from DIRAC.WorkloadManagementSystem.Agent.PushJobAgent import PushJobAgent
 from DIRAC import gLogger, S_ERROR

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
index c554778a8fd..98f75cfec58 100755
--- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
@@ -47,7 +47,6 @@
 from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
 from DIRAC.Resources.Catalog.PoolXMLFile import getGUID
 from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
-from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
 from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
 from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
 from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
@@ -322,6 +321,7 @@ def __prepareCommand(self):

     def __prepareEnvironment(self):
         """Prepare the environment to be used by the payload."""
+        os.environ["JOBID"] = str(self.jobID)
         os.environ["DIRACJOBID"] = str(self.jobID)

         diracSite = DIRAC.siteName()
@@ -379,7 +379,7 @@ def preProcess(self):
         )

     #############################################################################
-    def process(self, command, output, error, env):
+    def process(self, command: str, output: str, error: str, env: dict[str, str]):
         """This method calls the payload."""
         self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}")

@@ -496,7 +496,13 @@ def process(self, command, output, error, env):

     #############################################################################
     def postProcess(
-        self, payloadStatus, payloadOutput, payloadExecutorError, cpuTimeConsumed, watchdogError, watchdogStats
+        self,
+        payloadStatus: int | None,
+        payloadOutput: str,
+        payloadExecutorError: str,
+        cpuTimeConsumed: list[int],
+        watchdogError: str | None,
+        watchdogStats: dict,
     ):
         """This method is called after the payload has finished running."""
         self.log.info(f"Job Wrapper is starting the post processing phase for job {self.jobID}")
@@ -1596,40 +1602,3 @@ def getOutput(self, lines=0):
             self.outputLines = self.outputLines[cut:]
             return S_OK(self.outputLines)
         return S_ERROR("No Job output found")
-
-
-def rescheduleFailedJob(jobID, minorStatus, jobReport=None):
-    """Function for rescheduling a jobID, setting a minorStatus"""
-
-    rescheduleResult = JobStatus.RESCHEDULED
-
-    try:
-        gLogger.warn("Failure during", minorStatus)
-
-        # Setting a job parameter does not help since the job will be rescheduled,
-        # instead set the status with the cause and then another status showing the
-        # reschedule operation.
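
The typed process()/postProcess() signatures above define the hand-off contract between the two halves of the split wrapper: whatever process() reports on the remote side must be replayable into postProcess(**payloadResults) on the DIRAC side. A sketch of that dictionary, with keys taken from the postProcess() signature and purely illustrative values:

    # Illustrative payload results; the keys mirror JobWrapper.postProcess(),
    # the values are made up for the example
    payloadResults = {
        "payloadStatus": 0,  # payload exit code, or None if it could not be determined
        "payloadOutput": "payload stdout ...",
        "payloadExecutorError": "",  # error text from the payload executor
        "cpuTimeConsumed": [0, 0, 0, 0, 0],  # CPU time figures collected while the payload ran
        "watchdogError": None,  # error reported by the Watchdog, if any
        "watchdogStats": {},  # monitoring counters gathered by the Watchdog
    }
    # 'job' is an initialized JobWrapper instance on the agent side
    job.postProcess(**payloadResults)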
-
-        if not jobReport:
-            gLogger.info("Creating a new JobReport Object")
-            jobReport = JobReport(int(jobID), "JobWrapper")
-
-        jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False)
-        jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False)
-
-        # We must send Job States and Parameters before it gets reschedule
-        jobReport.sendStoredStatusInfo()
-        jobReport.sendStoredJobParameters()
-
-        gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper")
-
-        result = JobManagerClient().rescheduleJob(int(jobID))
-        if not result["OK"]:
-            gLogger.warn(result["Message"])
-            if "Maximum number of reschedulings is reached" in result["Message"]:
-                rescheduleResult = JobStatus.FAILED
-
-        return rescheduleResult
-    except Exception:
-        gLogger.exception("JobWrapperTemplate failed to reschedule Job")
-        return JobStatus.FAILED

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py
new file mode 100644
index 00000000000..5493ec87b0b
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+""" This template will become the job wrapper that's actually executed.
+
+    The JobWrapperLightTemplate is completed and invoked by the PushJobAgent and uses functionalities from the JobWrapper module.
+    It is executed in an environment where external connections are not allowed.
+
+    Unlike the JobWrapperTemplate, it only runs the processing phase of the JobWrapper:
+    input/output transfers and rescheduling are handled by the PushJobAgent on the DIRAC side.
+"""
+import ast
+import hashlib
+import json
+import os
+import sys
+
+sitePython = "@SITEPYTHON@"
+if sitePython:
+    sys.path.insert(0, "@SITEPYTHON@")
+
+from DIRAC.Core.Base.Script import Script
+
+Script.parseCommandLine()
+
+from DIRAC import gLogger
+from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
+from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import getJobWrapper
+
+
+os.umask(0o22)
+
+
+def execute(jobID: str, arguments: dict, jobReport: JobReport):
+    """The only real function executed here"""
+    job = getJobWrapper(jobID, arguments, jobReport)
+    if not job:
+        return 1
+
+    payloadResult = job.process(**arguments["Payload"])
+    if not payloadResult["OK"]:
+        return 1
+
+    # Store the payload result so that the agent can post-process it
+    with open("payloadResults.json", "w") as f:
+        json.dump(payloadResult["Value"], f)
+
+    # Generate the checksums of the files present in the current directory
+    checksums = {}
+    for file in os.listdir("."):
+        if os.path.isfile(file):
+            hash_md5 = hashlib.md5()
+            with open(file, "rb") as f:
+                while chunk := f.read(128 * hash_md5.block_size):
+                    hash_md5.update(chunk)
+            checksums[file] = hash_md5.hexdigest()
+
+    with open("checksums.json", "w") as f:
+        json.dump(checksums, f)
+
+    return 0
jobArgs["Job"].get("JobID", 0) + jobID = int(jobID) + jobReport = JobReport(jobID, "JobWrapper") + + ret = execute(jobID, jobArgs, jobReport) + jobReport.commit() +except Exception as exc: # pylint: disable=broad-except + gLogger.exception("JobWrapperTemplate exception") + try: + jobReport.commit() + ret = -1 + except Exception as exc: # pylint: disable=broad-except + gLogger.exception("Could not commit the job report") + ret = -2 + +sys.exit(ret) diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py index 5e893ac77e5..18e141352ab 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py @@ -15,9 +15,17 @@ import json import ast import os -import errno -import time -import signal +from typing import Dict + +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( + createAndEnterWorkingDirectory, + executePayload, + finalize, + getJobWrapper, + processJobOutputs, + resolveInputData, + transferInputSandbox, +) sitePython = "@SITEPYTHON@" if sitePython: @@ -28,140 +36,35 @@ Script.parseCommandLine() from DIRAC import gLogger -from DIRAC.Core.Utilities import DErrno - -from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport -from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus - - -gJobReport = None os.umask(0o22) -class JobWrapperError(Exception): - """Custom exception for handling JobWrapper "genuine" errors""" - - def __init__(self, value): - self.value = value - super().__init__() - - def __str__(self): - return str(self.value) - - -def killJobWrapper(job): - """Function that stops and ultimately kills the JobWrapper""" - # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process - time.sleep(60) - os.kill(job.currentPID, signal.SIGTERM) - # wait for half a minute and if worker is still alive use REAL silencer - time.sleep(30) - # now you're dead - os.kill(job.currentPID, signal.SIGKILL) - return 1 - - -def sendJobAccounting(job, status, minorStatus): - """safe sending job accounting (always catching exceptions)""" - try: - job.sendJobAccounting(status, minorStatus) - except Exception as exc: # pylint: disable=broad-except - gLogger.exception( - f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]", - lException=exc, - ) - - -def execute(arguments): +def execute(jobID: str, arguments: dict, jobReport: JobReport): """The only real function executed here""" - global gJobReport - - jobID = arguments["Job"].get("JobID", 0) - os.environ["JOBID"] = str(jobID) - jobID = int(jobID) - if "WorkingDirectory" in arguments: - wdir = os.path.expandvars(arguments["WorkingDirectory"]) - if os.path.isdir(wdir): - os.chdir(wdir) - else: - try: - os.makedirs(wdir) # this will raise an exception if wdir already exists (which is ~OK) - if os.path.isdir(wdir): - os.chdir(wdir) - except OSError as osError: - if osError.errno == errno.EEXIST and os.path.isdir(wdir): - gLogger.exception("JobWrapperTemplate found that the working directory already exists") - rescheduleResult = rescheduleFailedJob(jobID, "Working Directory already exists") - else: - gLogger.exception("JobWrapperTemplate could not create working directory") - rescheduleResult = 
rescheduleFailedJob(jobID, "Could Not Create Working Directory") - return 1 - - gJobReport = JobReport(jobID, "JobWrapper") + if not createAndEnterWorkingDirectory(jobID, arguments["WorkingDirectory"], jobReport): + return 1 - try: - job = JobWrapper(jobID, gJobReport) - job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper failed the initialization phase", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION) + job = getJobWrapper(jobID, arguments, jobReport) + if not job: return 1 if "InputSandbox" in arguments["Job"]: - gJobReport.commit() - try: - result = job.transferInputSandbox(arguments["Job"]["InputSandbox"]) - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError: - gLogger.exception("JobWrapper failed to download input sandbox") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while downloading input sandbox", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + jobReport.commit() + if not transferInputSandbox(job, arguments["Job"]["InputSandbox"], jobReport): return 1 else: gLogger.verbose("Job has no InputSandbox requirement") - gJobReport.commit() + jobReport.commit() if "InputData" in arguments["Job"]: if arguments["Job"]["InputData"]: - try: - result = job.resolveInputData() - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError: - gLogger.exception("JobWrapper failed to resolve input data") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + if not resolveInputData(job, jobReport): return 1 else: gLogger.verbose("Job has a null InputData requirement:") @@ -169,75 +72,18 @@ def execute(arguments): else: gLogger.verbose("Job has no InputData requirement") - gJobReport.commit() + jobReport.commit() - try: - result = job.execute() - if not result["OK"]: - gLogger.error("Failed to execute job", result["Message"]) - raise JobWrapperError((result["Message"], result["Errno"])) - except JobWrapperError as exc: - if exc.value[1] == 0 or str(exc.value[0]) == "0": - gLogger.verbose("JobWrapper exited with status=0 after execution") - if exc.value[1] == DErrno.EWMSRESC: - 
gLogger.warn("Asked to reschedule job") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) - return 1 - gLogger.exception("Job failed in execution phase") - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("Job raised exception during execution phase", lException=exc) - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + if not executePayload(job, jobReport): return 1 if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]: - try: - result = job.processJobOutputs() - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError as exc: - gLogger.exception("JobWrapper failed to process output files") - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) - - return 2 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while processing output files", lException=exc) - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + if not processJobOutputs(job, jobReport): return 2 else: gLogger.verbose("Job has no OutputData or OutputSandbox requirement") - try: - # Failed jobs will return !=0 / successful jobs will return 0 - return job.finalize() - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception during the finalization phase", lException=exc) - return 2 + return finalize(job) ########################################################## @@ -253,12 +99,17 @@ def execute(arguments): raise TypeError(f"jobArgs is of type {type(jobArgs)}") if "Job" not in jobArgs: raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}") - ret = execute(jobArgs) - gJobReport.commit() + + jobID = jobArgs["Job"].get("JobID", 0) + jobID = int(jobID) + jobReport = JobReport(jobID, "JobWrapper") + + ret = execute(jobID, jobArgs, jobReport) + jobReport.commit() except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapperTemplate exception") try: - gJobReport.commit() + jobReport.commit() ret = -1 except Exception as exc: # pylint: disable=broad-except gLogger.exception("Could not commit the job report") diff --git 

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py
new file mode 100644
index 00000000000..84cc4d13eb3
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py
@@ -0,0 +1,237 @@
+"""JobWrapperUtilities
+
+This module contains the functions that are used by the JobWrapperTemplate to execute the JobWrapper.
+"""
+import errno
+import os
+import time
+import signal
+
+from DIRAC import gLogger
+from DIRAC.Core.Utilities import DErrno
+from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
+from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
+from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
+from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper
+
+
+class JobWrapperError(Exception):
+    """Custom exception for handling JobWrapper "genuine" errors"""
+
+    def __init__(self, value):
+        self.value = value
+        super().__init__()
+
+    def __str__(self):
+        return str(self.value)
+
+
+def killJobWrapper(job: JobWrapper) -> int:
+    """Function that stops and ultimately kills the JobWrapper"""
+    # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process
+    time.sleep(60)
+    os.kill(job.currentPID, signal.SIGTERM)
+    # wait for half a minute and, if the worker is still alive, use the REAL silencer
+    time.sleep(30)
+    # now you're dead
+    os.kill(job.currentPID, signal.SIGKILL)
+    return 1
+
+
+def rescheduleFailedJob(jobID, minorStatus, jobReport: JobReport):
+    """Function for rescheduling a jobID, setting a minorStatus"""
+
+    rescheduleResult = JobStatus.RESCHEDULED
+
+    try:
+        gLogger.warn("Failure during", minorStatus)
+
+        # Setting a job parameter does not help since the job will be rescheduled;
+        # instead, set the status with the cause and then another status showing the
+        # reschedule operation.
+
+        jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False)
+        jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False)
+
+        # We must send Job States and Parameters before the job gets rescheduled
+        jobReport.sendStoredStatusInfo()
+        jobReport.sendStoredJobParameters()
+
+        gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper")
+
+        result = JobManagerClient().rescheduleJob(int(jobID))
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            if "Maximum number of reschedulings is reached" in result["Message"]:
+                rescheduleResult = JobStatus.FAILED
+
+        return rescheduleResult
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapperTemplate failed to reschedule Job")
+        return JobStatus.FAILED
+
+
+def sendJobAccounting(job: JobWrapper, status: str, minorStatus: str):
+    """Safely send job accounting (always catching exceptions)"""
+    try:
+        job.sendJobAccounting(status, minorStatus)
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception(
+            f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]",
+        )
+
+
+def createAndEnterWorkingDirectory(jobID: str, workingDirectory: str, jobReport: JobReport) -> bool:
+    """Create the working directory and change to it"""
+    wdir = os.path.expandvars(workingDirectory)
+    if os.path.isdir(wdir):
+        os.chdir(wdir)
+        return True
+
+    try:
+        os.makedirs(wdir)  # this will raise an exception if wdir already exists (which is ~OK)
+        if os.path.isdir(wdir):
+            os.chdir(wdir)
+    except OSError as osError:
+        if osError.errno == errno.EEXIST and os.path.isdir(wdir):
+            gLogger.exception("JobWrapperTemplate found that the working directory already exists")
+            rescheduleFailedJob(jobID, "Working Directory already exists", jobReport)
+        else:
+            gLogger.exception("JobWrapperTemplate could not create working directory")
+            rescheduleFailedJob(jobID, "Could Not Create Working Directory", jobReport)
+        return False
+    return True
+
+
+def getJobWrapper(jobID: str, arguments: dict, jobReport: JobReport) -> JobWrapper:
+    """Create a JobWrapper instance"""
+    job = None
+    try:
+        job = JobWrapper(jobID, jobReport)
+        job.initialize(arguments)  # initialize doesn't return S_OK/S_ERROR
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper failed the initialization phase")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=jobReport
+        )
+        # job may still be None if the constructor itself failed
+        if job:
+            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION)
+        return None
+    return job
+
+
+def transferInputSandbox(job: JobWrapper, inputSandbox: list, jobReport: JobReport) -> bool:
+    """Transfer the input sandbox"""
+    try:
+        result = job.transferInputSandbox(inputSandbox)
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            raise JobWrapperError(result["Message"])
+    except JobWrapperError:
+        gLogger.exception("JobWrapper failed to download input sandbox")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+        return False
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception while downloading input sandbox")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+        return False
+    return True
+
+
+def resolveInputData(job: JobWrapper, jobReport: JobReport) -> bool:
+    """Resolve the input data"""
+    try:
+        result = job.resolveInputData()
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            raise JobWrapperError(result["Message"])
+    except JobWrapperError:
+        gLogger.exception("JobWrapper failed to resolve input data")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
+        return False
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception while resolving input data")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
+        return False
+    return True
+
+
+def processJobOutputs(job: JobWrapper, jobReport: JobReport) -> bool:
+    """Process the job outputs"""
+    try:
+        result = job.processJobOutputs()
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            raise JobWrapperError(result["Message"])
+    except JobWrapperError:
+        gLogger.exception("JobWrapper failed to process output files")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
+        return False
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception while processing output files")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
+        return False
+    return True
+
+
+def finalize(job: JobWrapper) -> int:
+    """Finalize the job"""
+    try:
+        # Failed jobs will return !=0 / successful jobs will return 0
+        return job.finalize()
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception during the finalization phase")
+        return 2
+
+
+def executePayload(job: JobWrapper, jobReport: JobReport) -> bool:
+    """Execute the payload"""
+    try:
+        result = job.execute()
+        if not result["OK"]:
+            gLogger.error("Failed to execute job", result["Message"])
+            raise JobWrapperError((result["Message"], result["Errno"]))
+    except JobWrapperError as exc:
+        if exc.value[1] == 0 or str(exc.value[0]) == "0":
+            gLogger.verbose("JobWrapper exited with status=0 after execution")
+        if exc.value[1] == DErrno.EWMSRESC:
+            gLogger.warn("Asked to reschedule job")
+            rescheduleResult = rescheduleFailedJob(
+                jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
+            )
+            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
+            return False
+        gLogger.exception("Job failed in execution phase")
+        jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+        jobReport.setJobStatus(
+            status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
+        )
+        job.sendFailoverRequest()
+        job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+        return False
+    except Exception as exc:  # pylint: disable=broad-except
+        gLogger.exception("Job raised exception during execution phase")
+        jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+        jobReport.setJobStatus(
+            status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
+        )
+        job.sendFailoverRequest()
+        job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+        return False
+    return True

diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
index 21cfd0a8cd9..9ae732b5b4d 100644
--- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
+++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
@@ -4,10 +4,8 @@
 import sys
 import json

-from DIRAC import gLogger, S_OK, S_ERROR
+from DIRAC import gLogger, S_OK
 from DIRAC.Core.Utilities.File import mkDir
-from DIRAC.ConfigurationSystem.Client.Helpers import Registry
-from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager


 def createJobWrapper(
@@ -138,3 +136,70 @@ def createRelocatedJobWrapper(
     jobExeDirect = os.path.join(rootLocation, f"Job{jobID}")

     return S_OK(jobExeDirect)
+
+
+def createLightJobWrapper(
+    jobID,
+    jobParams,
+    resourceParams,
+    optimizerParams,
+    payloadParams,
+    extraOptions="",
+    defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py",
+    log=gLogger,
+    logLevel="INFO",
+):
+    """This method creates a light job wrapper filled with the CE and Job parameters to execute the job.
+    Its main user is the PushJobAgent.
+    """
+    if isinstance(extraOptions, str) and extraOptions.endswith(".cfg"):
+        extraOptions = f"--cfg {extraOptions}"
+
+    arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams, "Payload": payloadParams}
+    log.verbose(f"Job arguments are: \n {arguments}")
+
+    mkDir(os.path.join(os.getcwd(), "job/Wrapper"))
+    diracRoot = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+
+    jobWrapperFile = f"{os.getcwd()}/job/Wrapper/Wrapper_{jobID}"
+    if os.path.exists(jobWrapperFile):
+        log.verbose("Removing existing Job Wrapper for", jobID)
+        os.remove(jobWrapperFile)
+    with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd:
+        wrapperTemplate = fd.read()
+
+    if "LogLevel" in jobParams:
+        logLevel = jobParams["LogLevel"]
+        log.info("Found Job LogLevel JDL parameter with value", logLevel)
+    else:
+        log.info("Applying default LogLevel JDL parameter with value", logLevel)
+
+    dPython = sys.executable
+    realPythonPath = os.path.realpath(dPython)
+    log.debug("Real python path after resolving links is: ", realPythonPath)
+    dPython = realPythonPath
+
+    # Making real substitutions
+    # wrapperTemplate = wrapperTemplate.replace( "@JOBARGS@", str( arguments ) )
+    wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", os.getcwd())
+
+    jobWrapperJsonFile = jobWrapperFile + ".json"
+    with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile:
+        json.dump(str(arguments), jsonFile, ensure_ascii=False)
+
+    with open(jobWrapperFile, "w") as wrapper:
+        wrapper.write(wrapperTemplate)
+
+    jobExeFile = f"{os.getcwd()}/job/Wrapper/Job{jobID}"
+    jobFileContents = """#!/bin/sh
+{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no
+""".format(
+        dPython,
+        jobWrapperFile,
+        extraOptions,
+        logLevel,
+    )
+    with open(jobExeFile, "w") as jobFile:
+        jobFile.write(jobFileContents)
+
+    return S_OK((jobExeFile, jobWrapperJsonFile, jobWrapperFile))
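
For reference, the launcher script that createLightJobWrapper() writes (the jobExeFile returned first in the tuple, and what PushJobAgent ultimately submits to the remote CE) ends up looking roughly like this; the interpreter path, job ID and extra options below are illustrative:

    #!/bin/sh
    /usr/bin/python3 /pilot/job/Wrapper/Wrapper_123 --cfg extra.cfg -o LogLevel=INFO -o /DIRAC/Security/UseServerCertificate=no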