diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 0e77bc1e321..248b8d17fab 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -54,8 +54,6 @@ from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient from DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog import Watchdog -EXECUTION_RESULT = {} - class JobWrapper: """The only user of the JobWrapper is the JobWrapperTemplate""" @@ -158,6 +156,7 @@ def __init__(self, jobID=None, jobReport=None): # Set defaults for some global parameters to be defined for the accounting report self.owner = "unknown" self.jobGroup = "unknown" + self.jobName = "unknown" self.jobType = "unknown" self.processingType = "unknown" self.userGroup = "unknown" @@ -175,6 +174,9 @@ def __init__(self, jobID=None, jobReport=None): self.optArgs = {} self.ceArgs = {} + # Store the result of the payload execution + self.executionResults = {} + ############################################################################# def initialize(self, arguments): """Initializes parameters and environment for job.""" @@ -189,6 +191,7 @@ def initialize(self, arguments): # Fill some parameters for the accounting report self.owner = self.jobArgs.get("Owner", self.owner) self.jobGroup = self.jobArgs.get("JobGroup", self.jobGroup) + self.jobName = self.jobArgs.get("JobName", self.jobName) self.jobType = self.jobArgs.get("JobType", self.jobType) dataParam = self.jobArgs.get("InputData", []) if dataParam and not isinstance(dataParam, list): @@ -260,55 +263,39 @@ def __dictAsInfoString(self, dData, infoString="", currentBase=""): return infoString ############################################################################# - def execute(self): - """The main execution method of the Job Wrapper""" - self.log.info(f"Job Wrapper is starting execution phase for job {self.jobID}") - os.environ["DIRACJOBID"] = str(self.jobID) - os.environ["DIRACSITE"] = DIRAC.siteName() - self.log.verbose(f"DIRACSITE = {DIRAC.siteName()}") - - os.environ["DIRAC_PROCESSORS"] = str(self.ceArgs.get("Processors", 1)) - self.log.verbose(f"DIRAC_PROCESSORS = {self.ceArgs.get('Processors', 1)}") - - os.environ["DIRAC_WHOLENODE"] = str(self.ceArgs.get("WholeNode", False)) - self.log.verbose(f"DIRAC_WHOLENODE = {self.ceArgs.get('WholeNode', False)}") + def __prepareCommand(self): + """Prepare the command to be executed.""" + if not "Executable" in self.jobArgs: + self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_NOT_FOUND, sendFlag=True) + return S_ERROR(f"Job {self.jobID} has no specified executable") - errorFile = self.jobArgs.get("StdError", self.defaultErrorFile) - outputFile = self.jobArgs.get("StdOutput", self.defaultOutputFile) + executable = self.jobArgs["Executable"].strip() + executable = os.path.expandvars(executable) - if "CPUTime" in self.jobArgs: - jobCPUTime = int(self.jobArgs["CPUTime"]) - else: - self.log.info( - "Job has no CPU time limit specified, ", - f"applying default of {self.defaultCPUTime} to {self.jobID}", - ) - jobCPUTime = self.defaultCPUTime - self.numberOfProcessors = int(self.jobArgs.get("NumberOfProcessors", 1)) + # Try to find the executable on PATH + if "/" not in executable: + # Returns None if the executable is not found so use "or" to leave it unchanged + executable = shutil.which(executable) or executable - jobMemory = 0.0 - if "Memory" in self.jobArgs: - # Job specifies memory in GB, internally use KB - jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0 + # Make the full path since . is not always in the PATH + executable = os.path.abspath(executable) + if not os.path.exists(executable): + self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_NOT_FOUND, sendFlag=True) + return S_ERROR(f"Path to executable {executable} not found") - if "Executable" in self.jobArgs: - executable = self.jobArgs[ - "Executable" - ].strip() # This is normally dirac-jobexec script, but not necessarily - else: - msg = f"Job {self.jobID} has no specified executable" - self.log.warn(msg) - return S_ERROR(msg) + if not os.access(executable, os.X_OK): + try: + os.chmod(executable, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH) + except OSError: + self.log.warn("Failed to change mode to 775 for the executable", executable) - # In case the executable is dirac-jobexec, - # the argument should include the jobDescription.xml file jobArguments = self.jobArgs.get("Arguments", "") # In case the executable is dirac-jobexec, # the configuration should include essential parameters related to the CE (which can be found in ceArgs) # we consider information from ceArgs more accurate than from LocalSite (especially when jobs are pushed) configOptions = "" - if executable == "dirac-jobexec": + if "dirac-jobexec" in executable: configOptions = f"-o /LocalSite/CPUNormalizationFactor={self.cpuNormalizationFactor} " configOptions += f"-o /LocalSite/Site={self.siteName} " configOptions += "-o /LocalSite/GridCE=%s " % self.ceArgs.get( @@ -321,22 +308,29 @@ def execute(self): "RemoteExecution", gConfig.getValue("/LocalSite/RemoteExecution", False) ) - executable = os.path.expandvars(executable) - exeThread = None - spObject = None + command = executable + if jobArguments: + command += " " + str(jobArguments) + if configOptions: + command += " " + configOptions - # Try to find the executable on PATH - if "/" not in executable: - # Returns None if the executable is not found so use "or" to leave it unchanged - executable = shutil.which(executable) or executable + return S_OK(command) - # Make the full path since . is not always in the PATH - executable = os.path.abspath(executable) - if not os.access(executable, os.X_OK): - try: - os.chmod(executable, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH) - except OSError: - self.log.warn("Failed to change mode to 775 for the executable", executable) + def __prepareEnvironment(self): + """Prepare the environment to be used by the payload.""" + os.environ["DIRACJOBID"] = str(self.jobID) + + diracSite = DIRAC.siteName() + os.environ["DIRACSITE"] = diracSite + self.log.verbose(f"DIRACSITE = {diracSite}") + + diracProcessors = self.ceArgs.get("Processors", 1) + os.environ["DIRAC_PROCESSORS"] = str(diracProcessors) + self.log.verbose(f"DIRAC_PROCESSORS = {diracProcessors}") + + diracWholeNode = self.ceArgs.get("WholeNode", False) + os.environ["DIRAC_WHOLENODE"] = str(diracWholeNode) + self.log.verbose(f"DIRAC_WHOLENODE = {diracWholeNode}") exeEnv = dict(os.environ) if "ExecutionEnvironment" in self.jobArgs: @@ -350,31 +344,70 @@ def execute(self): exeEnv[nameEnv] = valEnv self.log.verbose(f"{nameEnv} = {valEnv}") - if os.path.exists(executable): - # the actual executable is not yet running: it will be in few lines - self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True) - spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit)) - command = executable - if jobArguments: - command += " " + str(jobArguments) - if configOptions: - command += " " + configOptions - self.log.verbose(f"Execution command: {command}") - maxPeekLines = self.maxPeekLines - exeThread = ExecutionThread(spObject, command, maxPeekLines, outputFile, errorFile, exeEnv) - exeThread.start() - payloadPID = None - for seconds in range(5, 40, 5): - time.sleep(seconds) - payloadPID = spObject.getChildPID() - if payloadPID: - self.__setJobParam("PayloadPID", payloadPID) - break - if not payloadPID: - return S_ERROR("Payload process could not start after 140 seconds") + return S_OK(exeEnv) + + def preProcess(self): + """This method is called before the payload starts.""" + self.log.info(f"Job Wrapper is starting the pre processing phase for job {self.jobID}") + + result = self.__prepareCommand() + if not result["OK"]: + return result + command = result["Value"] + self.log.verbose(f"Execution command: {command}") + + # Prepare outputs + errorFile = self.jobArgs.get("StdError", self.defaultErrorFile) + outputFile = self.jobArgs.get("StdOutput", self.defaultOutputFile) + + result = self.__prepareEnvironment() + if not result["OK"]: + return result + exeEnv = result["Value"] + + return S_OK( + { + "command": command, + "error": errorFile, + "output": outputFile, + "env": exeEnv, + } + ) + + ############################################################################# + def process(self, command: str, output: str, error: str, env: dict): + """This method calls the payload.""" + self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}") + + if "CPUTime" in self.jobArgs: + jobCPUTime = int(self.jobArgs["CPUTime"]) else: - self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_NOT_FOUND, sendFlag=True) - return S_ERROR(f"Path to executable {executable} not found") + self.log.info( + "Job has no CPU time limit specified, ", + f"applying default of {self.defaultCPUTime} to {self.jobID}", + ) + jobCPUTime = self.defaultCPUTime + self.numberOfProcessors = int(self.jobArgs.get("NumberOfProcessors", 1)) + + jobMemory = 0.0 + if "Memory" in self.jobArgs: + # Job specifies memory in GB, internally use KB + jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0 + + # The actual executable is not yet running: it will be in few lines + self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True) + spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit)) + exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults) + exeThread.start() + payloadPID = None + for seconds in range(5, 40, 5): + time.sleep(seconds) + payloadPID = spObject.getChildPID() + if payloadPID: + self.__setJobParam("PayloadPID", payloadPID) + break + if not payloadPID: + return S_ERROR("Payload process could not start after 140 seconds") watchdog = Watchdog( pid=self.currentPID, @@ -390,14 +423,14 @@ def execute(self): watchdog.initialize() self.log.verbose("Calibrating Watchdog instance") watchdog.calibrate() - # do not kill Test jobs by CPU time + # Do not kill Test jobs by CPU time if self.jobArgs.get("JobType", "") == "Test": watchdog.testCPUConsumed = False if "DisableCPUCheck" in self.jobArgs: watchdog.testCPUConsumed = False - # disable checks if remote execution: do not need it as pre/post processing occurs locally + # Disable checks if remote execution: do not need it as pre/post processing occurs locally if self.ceArgs.get("RemoteExecution", False): watchdog.testWallClock = False watchdog.testDiskSpace = False @@ -418,65 +451,94 @@ def execute(self): while exeThread.is_alive(): time.sleep(5) - outputs = None - if "Thread" in EXECUTION_RESULT: - threadResult = EXECUTION_RESULT["Thread"] - if not threadResult["OK"]: - self.log.error("Failed to execute the payload", threadResult["Message"]) - - self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_FAILED, sendFlag=True) - if "Value" in threadResult: - outs = threadResult["Value"] - if outs: - self.__setJobParam("ApplicationError", outs[0], sendFlag=True) - else: - self.__setJobParam("ApplicationError", "None reported", sendFlag=True) - else: - outputs = threadResult["Value"] - else: # if the execution thread didn't complete + payloadResult = { + "payloadStatus": None, + "payloadOutput": None, + "payloadExecutorError": None, + "cpuTimeConsumed": None, + "watchdogError": watchdog.checkError, + "watchdogStats": watchdog.currentStats, + } + + # Get CPU time consumed + if watchdog.checkError: + # In this case, the Watchdog has killed the Payload and the ExecutionThread can not get the CPU statistics + # os.times only reports for waited children + # Take the CPU from the last value recorded by the Watchdog + if "CPU" in self.executionResults and "LastUpdateCPU(s)" in watchdog.currentStats: + self.executionResults["CPU"][0] = watchdog.currentStats["LastUpdateCPU(s)"] + payloadResult["cpuTimeConsumed"] = self.executionResults.get("CPU") + + # Get payload status or error if an issue occurred + result = self.executionResults.get("Thread", {}) + if not result: + return S_OK(payloadResult) + if not result["OK"]: + payloadResult["payloadExecutorError"] = result["Message"] + if "Value" in result: + payloadResult["payloadStatus"] = result["Value"][0] + return S_OK(payloadResult) + + payloadResult["payloadStatus"] = result["Value"][0] + + # Get payload output + result = exeThread.getOutput(self.maxPeekLines) + if not result["OK"]: + return S_OK(payloadResult) + + payloadResult["payloadOutput"] = "\n".join(result["Value"]) + return S_OK(payloadResult) + + ############################################################################# + def postProcess( + self, + payloadStatus: int, + payloadOutput: str, + payloadExecutorError: str, + cpuTimeConsumed: list, + watchdogError: str, + watchdogStats: dict, + ): + """This method is called after the payload has finished running.""" + self.log.info(f"Job Wrapper is starting the post processing phase for job {self.jobID}") + + # if the execution thread didn't complete + if payloadStatus is None and not payloadExecutorError: self.log.error("Application thread did not complete") self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_NOT_COMPLETE, sendFlag=True) self.__setJobParam("ApplicationError", JobMinorStatus.APP_THREAD_NOT_COMPLETE, sendFlag=True) return S_ERROR("No outputs generated from job execution") - if "CPU" in EXECUTION_RESULT: - cpuString = " ".join([f"{x:.2f}" for x in EXECUTION_RESULT["CPU"]]) - self.log.info("EXECUTION_RESULT[CPU] in JobWrapper execute", cpuString) + # If the execution thread got an error (not a payload error) + if payloadExecutorError: + self.log.error("Failed to execute the payload", payloadExecutorError) + self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_FAILED, sendFlag=True) + applicationErrorStatus = "None reported" + if payloadStatus: + applicationErrorStatus = payloadStatus + self.__setJobParam("ApplicationError", applicationErrorStatus, sendFlag=True) - if watchdog.checkError: - # In this case, the Watchdog has killed the Payload and the ExecutionThread can not get the CPU statistics - # os.times only reports for waited children - # Take the CPU from the last value recorded by the Watchdog - self.__report(status=JobStatus.FAILED, minorStatus=watchdog.checkError, sendFlag=True) - if "CPU" in EXECUTION_RESULT: - if "LastUpdateCPU(s)" in watchdog.currentStats: - EXECUTION_RESULT["CPU"][0] = watchdog.currentStats["LastUpdateCPU(s)"] + if cpuTimeConsumed: + cpuString = " ".join([f"{x:.2f}" for x in cpuTimeConsumed]) + self.log.info("CPU time consumed in JobWrapper process", cpuString) - if watchdog.currentStats: + if watchdogError: + self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True) + + if watchdogStats: self.log.info( "Statistics collected by the Watchdog:\n ", - "\n ".join(["%s: %s" % items for items in watchdog.currentStats.items()]), + "\n ".join(["%s: %s" % items for items in watchdogStats.items()]), ) # can be an iterator - if outputs: - status = threadResult["Value"][0] # the status of the payload execution - # Send final heartbeat of a configurable number of lines here - self.log.verbose("Sending final application standard output heartbeat") - self.__sendFinalStdOut(exeThread) - self.log.verbose(f"Execution thread status = {status}") - - if not watchdog.checkError and not status: - self.failedFlag = False - self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True) - elif not watchdog.checkError: - self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_ERRORS, sendFlag=True) - if status in (DErrno.EWMSRESC, DErrno.EWMSRESC & 255): # the status will be truncated to 0xDE (222) - self.log.verbose("job will be rescheduled") - self.__report(minorStatus=JobMinorStatus.GOING_RESCHEDULE, sendFlag=True) - return S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled") - else: + if payloadStatus is None: return S_ERROR("No outputs generated from job execution") + # Send final heartbeat of a configurable number of lines here + self.log.verbose("Sending final application standard output heartbeat") + self.__sendFinalStdOut(payloadOutput) + self.log.verbose(f"Execution thread status = {payloadStatus}") + self.log.info("Checking directory contents after execution:") res = systemCall(5, ["ls", "-al"]) if not res["OK"]: @@ -487,10 +549,55 @@ def execute(self): # no timeout and exit code is 0 self.log.info(res["Value"][1]) + if not watchdogError and payloadStatus != 0: + self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_ERRORS, sendFlag=True) + + if not watchdogError and payloadStatus in ( + DErrno.EWMSRESC, + DErrno.EWMSRESC & 255, + ): # the status will be truncated to 0xDE (222) + self.log.verbose("job will be rescheduled") + self.__report(minorStatus=JobMinorStatus.GOING_RESCHEDULE, sendFlag=True) + return S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled") + + if not watchdogError and payloadStatus == 0: + self.failedFlag = False + self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True) + return S_OK() ############################################################################# - def __sendFinalStdOut(self, exeThread): + def execute(self): + """Main execution method of the Job Wrapper""" + result = self.preProcess() + if not result["OK"]: + return result + payloadParams = result["Value"] + + result = self.process( + command=payloadParams["command"], + output=payloadParams["output"], + error=payloadParams["error"], + env=payloadParams["env"], + ) + if not result["OK"]: + return result + payloadResult = result["Value"] + + result = self.postProcess( + payloadStatus=payloadResult["payloadStatus"], + payloadOutput=payloadResult["payloadOutput"], + payloadExecutorError=payloadResult["payloadExecutorError"], + cpuTimeConsumed=payloadResult["cpuTimeConsumed"], + watchdogError=payloadResult["watchdogError"], + watchdogStats=payloadResult["watchdogStats"], + ) + if not result["OK"]: + return result + return S_OK() + + ############################################################################# + def __sendFinalStdOut(self, payloadOutput): """After the Watchdog process has finished, this function sends a final report to be presented in the StdOut in the web page via the heartbeat mechanism. @@ -503,13 +610,11 @@ def __sendFinalStdOut(self, exeThread): if self.cpuNormalizationFactor: self.log.info("Normalized CPU Consumed is:", normCPU) - result = exeThread.getOutput(self.maxPeekLines) - if not result["OK"]: - lines = 0 - appStdOut = "" - else: - lines = len(result["Value"]) - appStdOut = "\n".join(result["Value"]) + lines = 0 + appStdOut = "" + if payloadOutput: + lines = len(payloadOutput.split("\n")) + appStdOut = payloadOutput header = "Last {} lines of application output from JobWrapper on {} :".format( lines, @@ -535,9 +640,9 @@ def __sendFinalStdOut(self, exeThread): def __getCPU(self): """Uses os.times() to get CPU time and returns HH:MM:SS after conversion.""" # TODO: normalize CPU consumed via scale factor - cpuString = " ".join([f"{x:.2f}" for x in EXECUTION_RESULT["CPU"]]) - self.log.info("EXECUTION_RESULT[CPU] in __getCPU", cpuString) - utime, stime, cutime, cstime, _elapsed = EXECUTION_RESULT["CPU"] + cpuString = " ".join([f"{x:.2f}" for x in self.executionResults["CPU"]]) + self.log.info("CPU time left in __getCPU", cpuString) + utime, stime, cutime, cstime, _elapsed = self.executionResults["CPU"] cpuTime = utime + stime + cutime + cstime self.log.verbose(f"Total CPU time consumed = {cpuTime}") result = self.__getCPUHMS(cpuTime) @@ -1227,19 +1332,19 @@ def sendJobAccounting(self, status="", minorStatus=""): self.accountingReport.setEndTime() # CPUTime and ExecTime - if "CPU" not in EXECUTION_RESULT: + if "CPU" not in self.executionResults: # If the payload has not started execution (error with input data, SW, SB,...) # Execution result is not filled use self.initialTiming - self.log.info("EXECUTION_RESULT[CPU] missing in sendJobAccounting") + self.log.info("CPU time left missing in sendJobAccounting") finalStat = os.times() - EXECUTION_RESULT["CPU"] = [] + self.executionResults["CPU"] = [] for i, _ in enumerate(finalStat): - EXECUTION_RESULT["CPU"].append(finalStat[i] - self.initialTiming[i]) + self.executionResults["CPU"].append(finalStat[i] - self.initialTiming[i]) - cpuString = " ".join([f"{x:.2f}" for x in EXECUTION_RESULT["CPU"]]) - self.log.info("EXECUTION_RESULT[CPU] in sendJobAccounting", cpuString) + cpuString = " ".join([f"{x:.2f}" for x in self.executionResults["CPU"]]) + self.log.info("CPU time left in sendJobAccounting", cpuString) - utime, stime, cutime, cstime, elapsed = EXECUTION_RESULT["CPU"] + utime, stime, cutime, cstime, elapsed = self.executionResults["CPU"] try: cpuTime = int(utime + stime + cutime + cstime) except ValueError: @@ -1291,11 +1396,10 @@ def sendFailoverRequest(self): request.delayNextExecution(self.failoverRequestDelay) requestName = f"job_{self.jobID}" - if "JobName" in self.jobArgs: + if self.jobName != "unknown": # To make the request names more appealing for users - jobName = self.jobArgs["JobName"] - if isinstance(jobName, str) and jobName: - jobName = jobName.replace(" ", "").replace("(", "").replace(")", "").replace('"', "") + if isinstance(self.jobName, str): + jobName = self.jobName.replace(" ", "").replace("(", "").replace(")", "").replace('"', "") jobName = jobName.replace(".", "").replace("{", "").replace("}", "").replace(":", "") requestName = f"{jobName}_{requestName}" @@ -1429,7 +1533,7 @@ def __setJobParamList(self, value, sendFlag=False): class ExecutionThread(threading.Thread): ############################################################################# - def __init__(self, spObject, cmd, maxPeekLines, stdoutFile, stderrFile, exeEnv): + def __init__(self, spObject, cmd, maxPeekLines, stdoutFile, stderrFile, exeEnv, executionResults): threading.Thread.__init__(self) self.cmd = cmd self.spObject = spObject @@ -1438,6 +1542,7 @@ def __init__(self, spObject, cmd, maxPeekLines, stdoutFile, stderrFile, exeEnv): self.stdout = stdoutFile self.stderr = stderrFile self.exeEnv = exeEnv + self.executionResults = executionResults ############################################################################# def run(self): @@ -1446,24 +1551,21 @@ def run(self): """ log = gLogger.getSubLogger(self.__class__.__name__) - # FIXME: why local instances of object variables are created? - cmd = self.cmd - spObject = self.spObject start = time.time() initialStat = os.times() - log.verbose("Cmd called", cmd) - output = spObject.systemCall(cmd, env=self.exeEnv, callbackFunction=self.sendOutput, shell=True) + log.verbose("Cmd called", self.cmd) + output = self.spObject.systemCall(self.cmd, env=self.exeEnv, callbackFunction=self.sendOutput, shell=True) log.verbose(f"Output of system call within execution thread: {output}") - EXECUTION_RESULT["Thread"] = output + self.executionResults["Thread"] = output timing = time.time() - start - EXECUTION_RESULT["Timing"] = timing + self.executionResults["Timing"] = timing finalStat = os.times() - EXECUTION_RESULT["CPU"] = [] + self.executionResults["CPU"] = [] for i, _ in enumerate(finalStat): - EXECUTION_RESULT["CPU"].append(finalStat[i] - initialStat[i]) - cpuString = " ".join([f"{x:.2f}" for x in EXECUTION_RESULT["CPU"]]) - log.info("EXECUTION_RESULT[CPU] after Execution of spObject.systemCall", cpuString) - log.info("EXECUTION_RESULT[Thread] after Execution of spObject.systemCall", str(EXECUTION_RESULT["Thread"])) + self.executionResults["CPU"].append(finalStat[i] - initialStat[i]) + cpuString = " ".join([f"{x:.2f}" for x in self.executionResults["CPU"]]) + log.info("CPU time consumed after Execution of spObject.systemCall", cpuString) + log.info("Thread result after Execution of spObject.systemCall", str(self.executionResults["Thread"])) ############################################################################# def getCurrentPID(self): diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py index 1284cbf5815..5e893ac77e5 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py @@ -256,12 +256,12 @@ def execute(arguments): ret = execute(jobArgs) gJobReport.commit() except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapperTemplate exception", lException=exc) + gLogger.exception("JobWrapperTemplate exception") try: gJobReport.commit() ret = -1 except Exception as exc: # pylint: disable=broad-except - gLogger.exception("Could not commit the job report", lException=exc) + gLogger.exception("Could not commit the job report") ret = -2 sys.exit(ret) diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py index fb8ac3ce233..4c00cb1e788 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py @@ -2,10 +2,14 @@ """ import os import shutil +import tempfile +import time import pytest from unittest.mock import MagicMock from DIRAC import gLogger +from DIRAC.Core.Utilities import DErrno +from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK from DIRAC.DataManagementSystem.Client.test.mock_DM import dm_mock from DIRAC.Resources.Catalog.test.mock_FC import fc_mock @@ -19,46 +23,500 @@ gLogger.setLevel("DEBUG") +# PreProcess method -def test_InputData(mocker): - mocker.patch( - "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + +def test_preProcess(mocker): + """Test the pre process method of the JobWrapper class.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + + echoLocation = shutil.which("echo") + diracJobExecLocation = shutil.which("dirac-jobexec") + + # Test a simple command without argument + jw = JobWrapper() + jw.jobArgs = {"Executable": echoLocation} + + result = jw.preProcess() + assert result["OK"] + assert result["Value"]["command"] == echoLocation + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + + # Test a command with arguments + jw = JobWrapper() + jw.jobArgs = {"Executable": echoLocation, "Arguments": "hello"} + + result = jw.preProcess() + assert result["OK"] + assert result["Value"]["command"] == f"{echoLocation} hello" + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + + # Test a command that is included in the PATH + jw = JobWrapper() + jw.jobArgs = {"Executable": "echo", "Arguments": "hello"} + + result = jw.preProcess() + assert result["OK"] + assert result["Value"]["command"] == f"{echoLocation} hello" + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + + # Test a command and specify outputs + jw = JobWrapper() + jw.jobArgs = {"Executable": "echo", "Arguments": "hello", "StdError": "error.log", "StdOutput": "output.log"} + + result = jw.preProcess() + assert result["OK"] + assert result["Value"]["command"] == f"{echoLocation} hello" + assert result["Value"]["error"] == "error.log" + assert result["Value"]["output"] == "output.log" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + + # Test a command and specify number of processors + jw = JobWrapper() + jw.jobArgs = {"Executable": "echo", "Arguments": "hello"} + jw.ceArgs = {"Processors": 2} + + result = jw.preProcess() + assert result["OK"] + assert result["Value"]["command"] == f"{echoLocation} hello" + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "2" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + + # Test a command with environment variable in the executable + jw = JobWrapper() + jw.jobArgs = {"Executable": "${CMD}", "Arguments": "hello"} + + os.environ["CMD"] = echoLocation + + result = jw.preProcess() + assert result["OK"] + assert result["Value"]["command"] == f"{echoLocation} hello" + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + + # Test a command with an empty executable + jw = JobWrapper() + jw.jobArgs = {} + result = jw.preProcess() + assert not result["OK"] + assert result["Message"] == "Job 0 has no specified executable" + + # Test a command with an executable that does not exist + jw = JobWrapper() + jw.jobArgs = {"Executable": "pippo"} + result = jw.preProcess() + assert not result["OK"] + assert result["Message"] == f"Path to executable {os.getcwd()}/pippo not found" + + # Test dirac-jobexec + jw = JobWrapper() + jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} + + result = jw.preProcess() + assert result["OK"] + expectedOptions = [ + "-o /LocalSite/CPUNormalizationFactor=0.0", + "-o /LocalSite/Site=DIRAC.Client.local", + "-o /LocalSite/GridCE=", + "-o /LocalSite/CEQueue=", + "-o /LocalSite/RemoteExecution=False", + ] + assert ( + result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" ) - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock()) + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + # Test dirac-jobexec with arguments jw = JobWrapper() - jw.jobArgs["InputData"] = "" - res = jw.resolveInputData() - assert res["OK"] is False + jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} + jw.ceArgs = {"GridCE": "CE", "Queue": "Queue", "RemoteExecution": True} + result = jw.preProcess() + + assert result["OK"] + expectedOptions = [ + "-o /LocalSite/CPUNormalizationFactor=0.0", + "-o /LocalSite/Site=DIRAC.Client.local", + "-o /LocalSite/GridCE=CE", + "-o /LocalSite/CEQueue=Queue", + "-o /LocalSite/RemoteExecution=True", + ] + assert ( + result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" + ) + assert result["Value"]["error"] == "std.err" + assert result["Value"]["output"] == "std.out" + assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" + assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" + +# Process method + + +@pytest.mark.slow +def test_processSuccessfulCommand(mocker): + """Test the process method of the JobWrapper class: most common scenario.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") jw = JobWrapper() - jw.jobArgs["InputData"] = "pippo" - jw.dm = dm_mock - jw.fc = fc_mock - res = jw.resolveInputData() - assert res["OK"] + jw.jobArgs = {"CPUTime": 100, "Memory": 1} + + mocker.patch.object(jw, "_JobWrapper__report") + mocker.patch.object(jw, "_JobWrapper__setJobParam") + + with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + result = jw.process( + command=f"{os.path.dirname(os.path.abspath(__file__))}/script-long.sh", + output=std_out.name, + error=std_err.name, + env={}, + ) + + assert result["OK"] + assert result["Value"]["payloadStatus"] == 0 + assert result["Value"]["payloadOutput"] == "Hello World" + assert not result["Value"]["payloadExecutorError"] + assert result["Value"]["cpuTimeConsumed"][0] > 0 + assert not result["Value"]["watchdogError"] + assert "LastUpdateCPU(s)" in result["Value"]["watchdogStats"] + assert "MemoryUsed(kb)" in result["Value"]["watchdogStats"] + +@pytest.mark.slow +def test_processSuccessfulDiracJobExec(mocker): + """Test the process method of the JobWrapper class: most common scenario with dirac-jobexec.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") jw = JobWrapper() - jw.jobArgs["InputData"] = "pippo" - jw.jobArgs["LocalSE"] = "mySE" - jw.jobArgs["InputDataModule"] = "aa.bb" - jw.dm = dm_mock - jw.fc = fc_mock - res = jw.resolveInputData() - assert res["OK"] + jw.jobArgs = {"CPUTime": 100, "Memory": 1} + + mocker.patch.object(jw, "_JobWrapper__report") + mocker.patch.object(jw, "_JobWrapper__setJobParam") + + with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + executable = shutil.which("dirac-jobexec") + result = jw.process( + command=f"{executable} {os.path.dirname(os.path.abspath(__file__))}/jobDescription.xml --o /DIRAC/Setup=Test", + output=std_out.name, + error=std_err.name, + env={}, + ) + + assert result["OK"] + assert result["Value"]["payloadStatus"] == 0 + assert "ls successful" in result["Value"]["payloadOutput"] + assert not result["Value"]["payloadExecutorError"] + + +@pytest.mark.slow +def test_processFailedCommand(mocker): + """Test the process method of the JobWrapper class: the command fails.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + jw = JobWrapper() + jw.jobArgs = {"CPUTime": 100, "Memory": 1} + + mocker.patch.object(jw, "_JobWrapper__report") + mocker.patch.object(jw, "_JobWrapper__setJobParam") + with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + result = jw.process( + command=f"{os.path.dirname(os.path.abspath(__file__))}/script-fail.sh", + output=std_out.name, + error=std_err.name, + env={}, + ) -def test_performChecks(): - wd = Watchdog( - pid=os.getpid(), - exeThread=MagicMock(), - spObject=MagicMock(), - jobCPUTime=1000, - memoryLimit=1024 * 1024, - jobArgs={"StopSigNumber": 10}, + assert result["OK"] + assert result["Value"]["payloadStatus"] == 127 + assert ( + result["Value"]["payloadOutput"] + == f"Hello World\n{os.path.dirname(os.path.abspath(__file__))}/script-fail.sh: line 5: command_not_found: command not found" ) - res = wd._performChecks() - assert res["OK"] + assert not result["Value"]["payloadExecutorError"] + assert result["Value"]["cpuTimeConsumed"][0] > 0 + assert not result["Value"]["watchdogError"] + assert "LastUpdateCPU(s)" in result["Value"]["watchdogStats"] + assert "MemoryUsed(kb)" in result["Value"]["watchdogStats"] + + +@pytest.mark.slow +def test_processFailedSubprocess(mocker): + """Test the process method of the JobWrapper class: the subprocess fails.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.systemCall") + mock_system_call.return_value = S_ERROR("Any problem") + mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.getChildPID") + mock_system_call.return_value = 123 + + jw = JobWrapper() + jw.jobArgs = {"CPUTime": 100, "Memory": 1} + + mocker.patch.object(jw, "_JobWrapper__report") + mocker.patch.object(jw, "_JobWrapper__setJobParam") + + with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + result = jw.process("mock_command", std_out.name, std_err.name, {}) + + assert result["OK"] + assert not result["Value"]["payloadStatus"] + assert not result["Value"]["payloadOutput"] + assert result["Value"]["payloadExecutorError"] == "Any problem" + assert result["Value"]["cpuTimeConsumed"][0] == 0.0 + assert not result["Value"]["watchdogError"] + assert not result["Value"]["watchdogStats"] + + +@pytest.mark.slow +def test_processQuickExecutionNoWatchdog(mocker): + """Test the process method of the JobWrapper class: the payload is too fast to start the watchdog.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + jw = JobWrapper() + jw.jobArgs = {"CPUTime": 100, "Memory": 1} + + mocker.patch.object(jw, "_JobWrapper__report") + mocker.patch.object(jw, "_JobWrapper__setJobParam") + + with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + result = jw.process(command=f"echo hello", output=std_out.name, error=std_err.name, env={}) + + assert result["OK"] + assert result["Value"]["payloadStatus"] == 0 + assert result["Value"]["payloadOutput"] == "hello" + assert not result["Value"]["payloadExecutorError"] + assert result["Value"]["cpuTimeConsumed"][0] == 0.0 + assert not result["Value"]["watchdogError"] + assert not result["Value"]["watchdogStats"] + + +@pytest.mark.slow +def test_processSubprocessFailureNoPid(mocker): + """Test the process method of the JobWrapper class: the subprocess fails and no PID is returned.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + # Test failure in starting the payload process + jw = JobWrapper() + jw.jobArgs = {} + + mocker.patch.object(jw, "_JobWrapper__report") + mocker.patch.object(jw, "_JobWrapper__setJobParam") + mock_exeThread = mocker.Mock() + mock_exeThread.start.side_effect = lambda: time.sleep(0.1) + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ExecutionThread", return_value=mock_exeThread) + + with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + result = jw.process(command=f"mock_command", output=std_out.name, error=std_err.name, env={}) + assert not result["OK"] + assert "Payload process could not start after 140 seconds" in result["Message"] + + +# PostProcess method + + +def test_postProcess(mocker): + """Test the post process method of the JobWrapper class.""" + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + # Mimic the behaviour of __report and __setJobParam to get the arguments passed to them + report_args = [] + set_param_args = [] + + def report_side_effect(*args, **kwargs): + report_args.append(kwargs) + + def set_param_side_effect(*args, **kwargs): + set_param_args.append((args, kwargs)) + + # Test when the payload finished successfully + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": 0, + "payloadOutput": "Hello World", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": None, + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(kb)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert result["OK"] + assert report_args[-1]["status"] == JobStatus.COMPLETING + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_SUCCESS + + # Test when the payload failed + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": 126, + "payloadOutput": "Hello World\nscript.sh: line 5: command_not_found: command not found", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": None, + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(kb)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert result["OK"] + assert report_args[-1]["status"] == JobStatus.COMPLETING + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_ERRORS + + # Test when the payload failed: should be rescheduled + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": DErrno.EWMSRESC, + "payloadOutput": "Hello World\nscript.sh: line 5: command_not_found: command not found", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": None, + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(kb)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert not result["OK"] + assert result["Errno"] == DErrno.EWMSRESC + assert report_args[-2]["status"] == JobStatus.COMPLETING + assert report_args[-2]["minorStatus"] == JobMinorStatus.APP_ERRORS + assert report_args[-1]["minorStatus"] == JobMinorStatus.GOING_RESCHEDULE + + # Test when there is no output + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": 0, + "payloadOutput": "", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": None, + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(kb)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert result["OK"] + assert report_args[-1]["status"] == JobStatus.COMPLETING + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_SUCCESS + + # Test when there is a watchdog error + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": 1, + "payloadOutput": "Error output", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": "Watchdog error", + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(kb)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert result["OK"] + assert report_args[-1]["status"] == JobStatus.FAILED + assert report_args[-1]["minorStatus"] == payloadResult["watchdogError"] + + # Test when the executor failed: no status defined + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": None, + "payloadOutput": None, + "payloadExecutorError": "Execution failed", + "cpuTimeConsumed": None, + "watchdogError": None, + "watchdogStats": None, + } + + result = jw.postProcess(**payloadResult) + assert not result["OK"] + assert report_args[-1]["status"] == JobStatus.FAILED + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_THREAD_FAILED + assert set_param_args[-1][0][1] == "None reported" + + # Test when the executor failed: status defined + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": 126, + "payloadOutput": None, + "payloadExecutorError": "Execution failed", + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": None, + "watchdogStats": None, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert result["OK"] + assert report_args[-1]["status"] == JobStatus.COMPLETING + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_ERRORS + assert set_param_args[-3][0][1] == 126 + + # Test when the subprocess did not complete + jw = JobWrapper() + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": None, + "payloadOutput": None, + "payloadExecutorError": None, + "cpuTimeConsumed": None, + "watchdogError": None, + "watchdogStats": None, + } + + result = jw.postProcess(**payloadResult) + assert not result["OK"] + assert "No outputs generated from job execution" in result["Message"] + assert report_args[-1]["status"] == JobStatus.FAILED + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_THREAD_NOT_COMPLETE + + +# Execute method @pytest.mark.slow @@ -88,12 +546,8 @@ def test_execute(mocker, executable, args, src, expectedResult): The returned value of JobWrapper.execute() is not checked as it can apparently be wrong depending on the shell used. """ - mocker.patch( - "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock - ) - mocker.patch( - "DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", side_effect=getSystemSectionMock - ) + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") if src: shutil.copy(os.path.join(src, executable), executable) @@ -112,6 +566,34 @@ def test_execute(mocker, executable, args, src, expectedResult): os.remove("std.out") +def test_InputData(mocker): + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock()) + + jw = JobWrapper() + jw.jobArgs["InputData"] = "" + res = jw.resolveInputData() + assert res["OK"] is False + + jw = JobWrapper() + jw.jobArgs["InputData"] = "pippo" + jw.dm = dm_mock + jw.fc = fc_mock + res = jw.resolveInputData() + assert res["OK"] + + jw = JobWrapper() + jw.jobArgs["InputData"] = "pippo" + jw.jobArgs["LocalSE"] = "mySE" + jw.jobArgs["InputDataModule"] = "aa.bb" + jw.dm = dm_mock + jw.fc = fc_mock + res = jw.resolveInputData() + assert res["OK"] + + @pytest.mark.parametrize( "failedFlag, expectedRes, finalStates", [ diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-fail.sh b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-fail.sh new file mode 100755 index 00000000000..52fd2006b9b --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-fail.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +sleep 10 +echo "Hello World" +command_not_found 123 diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-long.sh b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-long.sh new file mode 100755 index 00000000000..00c8572c40b --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-long.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +sleep 10 +echo "Hello World"