diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py new file mode 100644 index 00000000000..c82e10a9de3 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py @@ -0,0 +1,39 @@ +from DIRAC import S_OK + + +class JobExecutionCoordinator: + """ + Abstract class for job execution coordinators. + + This class is responsible for pre-processing and post-processing jobs before and after execution. + It should be implemented by the community job execution coordinator. + It is used by the JobWrapper to handle the execution of jobs. + """ + + def __init__(self, jobArgs: dict, ceArgs: dict) -> None: + """ + Initialize the job execution coordinator. + + :param jobArgs: The job arguments + :param ceArgs: The environment arguments + """ + self.jobArgs = jobArgs + self.ceArgs = ceArgs + + def preProcess(self, command: str, exeEnv: dict): + """ + Pre-process a job before executing it. + This should handle tasks like downloading inputs, preparing commands, etc. + + :param job: The job to be pre-processed + """ + return S_OK({"command": command, "env": exeEnv}) + + def postProcess(self): + """ + Post-process a job after executing it. + This should handle tasks like uploading outputs, checking results, etc. + + :param job: The job to be post-processed + """ + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 0a003b10ad6..c17f2e3284d 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -97,8 +97,8 @@ def __init__(self, jobID=None, jobReport=None): if self.maxPeekLines < 0: self.maxPeekLines = 0 self.defaultCPUTime = gConfig.getValue(self.section + "/DefaultCPUTime", 600) - self.defaultOutputFile = gConfig.getValue(self.section + "/DefaultOutputFile", "std.out") - self.defaultErrorFile = gConfig.getValue(self.section + "/DefaultErrorFile", "std.err") + self.outputFile = gConfig.getValue(self.section + "/DefaultOutputFile", "std.out") + self.errorFile = gConfig.getValue(self.section + "/DefaultErrorFile", "std.err") self.diskSE = gConfig.getValue(self.section + "/DiskSE", ["-disk", "-DST", "-USER"]) self.tapeSE = gConfig.getValue(self.section + "/TapeSE", ["-tape", "-RDST", "-RAW"]) self.failoverRequestDelay = gConfig.getValue(self.section + "/FailoverRequestDelay", 45) @@ -175,6 +175,8 @@ def __init__(self, jobID=None, jobReport=None): self.optArgs = {} self.ceArgs = {} + self.jobExecutionCoordinator = None + # Store the result of the payload execution self.executionResults = {} @@ -194,6 +196,10 @@ def initialize(self, arguments): self.jobGroup = self.jobArgs.get("JobGroup", self.jobGroup) self.jobName = self.jobArgs.get("JobName", self.jobName) self.jobType = self.jobArgs.get("JobType", self.jobType) + # Prepare outputs + self.errorFile = self.jobArgs.get("StdError", self.errorFile) + self.outputFile = self.jobArgs.get("StdOutput", self.outputFile) + dataParam = self.jobArgs.get("InputData", []) if dataParam and not isinstance(dataParam, list): dataParam = [dataParam] @@ -232,6 +238,14 @@ def initialize(self, arguments): self.log.debug("================") self.log.debug(json.dumps(dict(os.environ), indent=4)) + # Load the Job Execution Coordinator: can be overriden by a specific implementation + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator", "JobExecutionCoordinator" + ) + if not result["OK"]: + return result + self.jobExecutionCoordinator = result["Value"](jobArgs=self.jobArgs, ceArgs=self.ceArgs) + ############################################################################# def __setInitialJobParameters(self): """Sets some initial job parameters""" @@ -358,26 +372,19 @@ def preProcess(self): command = result["Value"] self.log.verbose(f"Execution command: {command}") - # Prepare outputs - errorFile = self.jobArgs.get("StdError", self.defaultErrorFile) - outputFile = self.jobArgs.get("StdOutput", self.defaultOutputFile) - result = self.__prepareEnvironment() if not result["OK"]: return result exeEnv = result["Value"] - return S_OK( - { - "command": command, - "error": errorFile, - "output": outputFile, - "env": exeEnv, - } - ) + if not (result := self.jobExecutionCoordinator.preProcess(command, exeEnv))["OK"]: + self.log.error("Failed to pre-process the job", result["Message"]) + return result + + return result ############################################################################# - def process(self, command: str, output: str, error: str, env: dict): + def process(self, command: str, env: dict): """This method calls the payload.""" self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}") @@ -398,7 +405,9 @@ def process(self, command: str, output: str, error: str, env: dict): spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit)) with contextlib.chdir(self.jobIDPath): - exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults) + exeThread = ExecutionThread( + spObject, command, self.maxPeekLines, self.outputFile, self.errorFile, env, self.executionResults + ) exeThread.start() payloadPID = None for seconds in range(5, 40, 5): @@ -565,7 +574,11 @@ def postProcess( self.failedFlag = False self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True) - return S_OK() + if not (result := self.jobExecutionCoordinator.postProcess())["OK"]: + self.log.error("Failed to post-process the job", result["Message"]) + return result + + return result ############################################################################# def execute(self): @@ -580,8 +593,6 @@ def execute(self): result = self.process( command=payloadParams["command"], - output=payloadParams["output"], - error=payloadParams["error"], env=payloadParams["env"], ) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py index f999baf6826..0dc4d899e4b 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py @@ -15,6 +15,7 @@ from DIRAC.DataManagementSystem.Client.test.mock_DM import dm_mock from DIRAC.Resources.Catalog.test.mock_FC import fc_mock from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator import JobExecutionCoordinator from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper getSystemSectionMock = MagicMock() @@ -35,48 +36,44 @@ def test_preProcess(mocker): # Test a simple command without argument jw = JobWrapper() jw.jobArgs = {"Executable": echoLocation} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == echoLocation - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" # Test a command with arguments jw = JobWrapper() jw.jobArgs = {"Executable": echoLocation, "Arguments": "hello"} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" # Test a command that is included in the PATH jw = JobWrapper() jw.jobArgs = {"Executable": "echo", "Arguments": "hello"} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" # Test a command and specify outputs jw = JobWrapper() jw.jobArgs = {"Executable": "echo", "Arguments": "hello", "StdError": "error.log", "StdOutput": "output.log"} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "error.log" - assert result["Value"]["output"] == "output.log" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" @@ -84,32 +81,32 @@ def test_preProcess(mocker): jw = JobWrapper() jw.jobArgs = {"Executable": "echo", "Arguments": "hello"} jw.ceArgs = {"Processors": 2} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "2" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" # Test a command with environment variable in the executable jw = JobWrapper() jw.jobArgs = {"Executable": "${CMD}", "Arguments": "hello"} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) os.environ["CMD"] = echoLocation result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" # Test a command with an empty executable jw = JobWrapper() jw.jobArgs = {} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) + result = jw.preProcess() assert not result["OK"] assert result["Message"] == "Job 0 has no specified executable" @@ -117,6 +114,8 @@ def test_preProcess(mocker): # Test a command with an executable that does not exist jw = JobWrapper() jw.jobArgs = {"Executable": "pippo"} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) + result = jw.preProcess() assert not result["OK"] assert result["Message"] == f"Path to executable {os.getcwd()}/pippo not found" @@ -124,6 +123,7 @@ def test_preProcess(mocker): # Test dirac-jobexec jw = JobWrapper() jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) result = jw.preProcess() assert result["OK"] @@ -136,8 +136,6 @@ def test_preProcess(mocker): assert ( result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" ) - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" @@ -145,8 +143,9 @@ def test_preProcess(mocker): jw = JobWrapper() jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} jw.ceArgs = {"GridCE": "CE", "Queue": "Queue", "SubmissionPolicy": "Application"} - result = jw.preProcess() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) + result = jw.preProcess() assert result["OK"] expectedOptions = [ "-o /LocalSite/CPUNormalizationFactor=0.0", @@ -158,8 +157,6 @@ def test_preProcess(mocker): assert ( result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" ) - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" @@ -179,10 +176,10 @@ def test_processSuccessfulCommand(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + jw.outputFile = std_out.name + jw.errorFile = std_err.name result = jw.process( command=f"{os.path.dirname(os.path.abspath(__file__))}/script-long.sh", - output=std_out.name, - error=std_err.name, env={}, ) @@ -209,10 +206,10 @@ def test_processSuccessfulDiracJobExec(mocker): with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: executable = shutil.which("dirac-jobexec") + jw.outputFile = std_out.name + jw.errorFile = std_err.name result = jw.process( command=f"{executable} {os.path.dirname(os.path.abspath(__file__))}/jobDescription.xml --o /DIRAC/Setup=Test", - output=std_out.name, - error=std_err.name, env={}, ) @@ -234,10 +231,10 @@ def test_processFailedCommand(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + jw.outputFile = std_out.name + jw.errorFile = std_err.name result = jw.process( command=f"{os.path.dirname(os.path.abspath(__file__))}/script-fail.sh", - output=std_out.name, - error=std_err.name, env={}, ) @@ -271,7 +268,9 @@ def test_processFailedSubprocess(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: - result = jw.process("mock_command", std_out.name, std_err.name, {}) + jw.outputFile = std_out.name + jw.errorFile = std_err.name + result = jw.process("mock_command", {}) assert result["OK"] assert not result["Value"]["payloadStatus"] @@ -294,7 +293,9 @@ def test_processQuickExecutionNoWatchdog(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: - result = jw.process(command=f"echo hello", output=std_out.name, error=std_err.name, env={}) + jw.outputFile = std_out.name + jw.errorFile = std_err.name + result = jw.process(command=f"echo hello", env={}) assert result["OK"] assert result["Value"]["payloadStatus"] == 0 @@ -321,7 +322,9 @@ def test_processSubprocessFailureNoPid(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ExecutionThread", return_value=mock_exeThread) with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: - result = jw.process(command=f"mock_command", output=std_out.name, error=std_err.name, env={}) + jw.outputFile = std_out.name + jw.errorFile = std_err.name + result = jw.process(command=f"mock_command", env={}) assert not result["OK"] assert "Payload process could not start after 140 seconds" in result["Message"] @@ -344,6 +347,7 @@ def set_param_side_effect(*args, **kwargs): # Test when the payload finished successfully jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -365,6 +369,7 @@ def set_param_side_effect(*args, **kwargs): # Test when the payload failed jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -386,6 +391,7 @@ def set_param_side_effect(*args, **kwargs): # Test when the payload failed: should be rescheduled jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -409,6 +415,7 @@ def set_param_side_effect(*args, **kwargs): # Test when there is no output jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -430,6 +437,7 @@ def set_param_side_effect(*args, **kwargs): # Test when there is a watchdog error jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -451,6 +459,7 @@ def set_param_side_effect(*args, **kwargs): # Test when the executor failed: no status defined jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -472,6 +481,7 @@ def set_param_side_effect(*args, **kwargs): # Test when the executor failed: status defined jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -494,6 +504,7 @@ def set_param_side_effect(*args, **kwargs): # Test when the subprocess did not complete jw = JobWrapper() + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -552,9 +563,11 @@ def test_execute(mocker, executable, args, src, expectedResult): jw = JobWrapper() jw.jobArgs = {"Executable": executable} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) + if args: jw.jobArgs["Arguments"] = args - res = jw.execute() + jw.execute() assert expectedResult in jw.jobReport.jobStatusInfo[-1] if src: