diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py index ef82bb02b22..9a75c971461 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py @@ -14,7 +14,6 @@ import hashlib import sys import json -import ast import os from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import getJobWrapper @@ -40,14 +39,14 @@ def execute(jobID: str, arguments: dict, jobReport: JobReport): if not payloadParams: return 1 + if not "PayloadResults" in arguments["Job"] or not "Checksum" in arguments["Job"]: + return 1 + job = getJobWrapper(jobID, arguments, jobReport) payloadResult = job.process(**payloadParams) if not payloadResult["OK"]: return 1 - if not "PayloadResults" in arguments["Job"] or not "Checksum" in arguments["Job"]: - return 1 - # Store the payload result with open(arguments["Job"]["PayloadResults"], "w") as f: json.dump(payloadResult, f) @@ -58,7 +57,7 @@ def execute(jobID: str, arguments: dict, jobReport: JobReport): if os.path.isfile(file): hash_md5 = hashlib.md5() with open(file, "rb") as f: - while chunk := f.read(128 * hash.block_size): + while chunk := f.read(128 * hash_md5.block_size): hash_md5.update(chunk) checksums[file] = hash_md5.hexdigest() @@ -75,8 +74,7 @@ def execute(jobID: str, arguments: dict, jobReport: JobReport): try: jsonFileName = os.path.realpath(__file__) + ".json" with open(jsonFileName) as f: - jobArgsFromJSON = json.loads(f.readlines()[0]) - jobArgs = ast.literal_eval(jobArgsFromJSON) + jobArgs = json.load(f) if not isinstance(jobArgs, dict): raise TypeError(f"jobArgs is of type {type(jobArgs)}") if "Job" not in jobArgs: diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py index 636960a7720..921e5d94fd3 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py @@ -13,7 +13,6 @@ """ import sys import json -import ast import os from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( @@ -92,8 +91,7 @@ def execute(jobID: str, arguments: dict, jobReport: JobReport): try: jsonFileName = os.path.realpath(__file__) + ".json" with open(jsonFileName) as f: - jobArgsFromJSON = json.loads(f.readlines()[0]) - jobArgs = ast.literal_eval(jobArgsFromJSON) + jobArgs = json.load(f) if not isinstance(jobArgs, dict): raise TypeError(f"jobArgs is of type {type(jobArgs)}") if "Job" not in jobArgs: diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py index 9e19e8d0d1d..7df5de80d3e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py @@ -80,7 +80,7 @@ def createJobWrapper( jobWrapperJsonFile = jobWrapperFile + ".json" with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile: - json.dump(str(arguments), jsonFile, ensure_ascii=False) + json.dump(arguments, jsonFile, ensure_ascii=False) with open(jobWrapperFile, "w") as wrapper: wrapper.write(wrapperTemplate) @@ -93,7 +93,7 @@ def createJobWrapper( """.format( pythonPath, jobWrapperDirect, - extraOptions, + extraOptions if extraOptions else "", logLevel, ) with open(jobExeFile, "w") as jobFile: @@ -105,5 +105,5 @@ def createJobWrapper( "JobWrapperPath": jobWrapperFile, } if rootLocation != wrapperPath: - generatedFiles["JobExecutableRelocatedPath"] = jobWrapperDirect + generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile)) return S_OK(generatedFiles) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobWrapperTemplate.py new file mode 100644 index 00000000000..7d256d4a984 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobWrapperTemplate.py @@ -0,0 +1,546 @@ +""" JobWrapper test + +Here we test the creation of a job wrapper and make sure it can be executed without crashing. +We don't test the actual execution of the wrapper or its payload. +""" +import json +import os +import shutil +import sys + +from diraccfg import CFG +import pytest + +from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper +import subprocess + + +jobParams = { + "JobID": "1", + "JobType": "Merge", + "JobGroup": "Group1", + "JobName": "TestJob", + "CPUTime": "1000000", + "Executable": "dirac-jobexec", + "Arguments": "helloWorld.xml -o LogLevel=DEBUG", + "InputSandbox": ["helloWorld.xml", "exe-script.py"], +} +resourceParams = { + "MaxTotalJobs": 4300, + "MaxWaitingJobs": 200, + "Site": "LCG.CERN.cern", + "CEType": "AREX", + "architecture": "x86_64", + "OS": "linux_CentOS_7.9.2009", + "SI00": 2775, + "MaxRAM": 128534, + "CPUTime": 3836159, + "maxCPUTime": 5760, + "Queue": "nordugrid-SLURM-grid", + "GridCE": "ce1.cern.ch", + "RequiredTag": [], + "DIRACVersion": "9.0.0a20", + "NumberOfProcessors": None, +} +optimizerParams = { + "JobID": 1, + "OptimizerChain": "JobPath,JobSanity,JobScheduling", + "matchTime": 0.15743756294250488, + "CEDict": {}, +} +payloadParams = { + "command": "dirac-jobexec helloworld.xml -o LogLevel=DEBUG", + "error": "std.err", + "output": "std.out", + "env": {}, +} + + +@pytest.fixture +def extraOptions(): + """Create a temporary cfg file""" + extraOptions = "test.cfg" + + localCfg = CFG() + localCfg.createNewSection("DIRAC") + localCfg.setOption("/DIRAC/Setup", "Test") + + localCfg.createNewSection("/DIRAC/Setups") + localCfg.createNewSection("/DIRAC/Setups/Test") + localCfg.setOption("/DIRAC/Setups/Test/WorkloadManagement", "Test") + localCfg.setOption("/DIRAC/Setups/Test/RequestManagement", "Test") + + localCfg.createNewSection("Systems") + localCfg.createNewSection("/Systems/Test") + localCfg.createNewSection("/Systems/Test/WorkloadManagement") + localCfg.createNewSection("/Systems/Test/WorkloadManagement/JobWrapper") + + localCfg.writeToFile(extraOptions) + yield extraOptions + os.remove(extraOptions) + + +def test_createAndExecuteJobWrapperTemplate_success(extraOptions): + """Test the creation of a classical job wrapper and its execution: + There is an extra option cfg file to be passed to the job wrapper. + """ + # Create job wrapper + res = createJobWrapper( + jobID=1, + jobParams=jobParams, + resourceParams=resourceParams, + optimizerParams=optimizerParams, + extraOptions=extraOptions + ) + assert res["OK"], res.get("Message") + + # Test job wrapper content + jobWrapperPath = res["Value"].get("JobWrapperPath") + assert jobWrapperPath + assert os.path.exists(jobWrapperPath) + + with open(jobWrapperPath) as f: + jobWrapperContent = f.read() + + assert "@SITEPYTHON@" not in jobWrapperContent + assert f"{os.path.join(os.getcwd(), 'job/Wrapper')}" in jobWrapperContent + + # Test job wrapper configuration path + jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") + assert jobWrapperConfigPath + assert os.path.exists(jobWrapperConfigPath) + + with open(jobWrapperConfigPath) as f: + jobWrapperConfigContent = json.load(f) + + assert jobWrapperConfigContent["Job"] == jobParams + assert jobWrapperConfigContent["CE"] == resourceParams + assert jobWrapperConfigContent["Optimizer"] == optimizerParams + assert "Payload" not in jobWrapperConfigContent + + # Test job executable path + jobExecutablePath = res["Value"].get("JobExecutablePath") + assert jobExecutablePath + assert os.path.exists(jobExecutablePath) + + with open(jobExecutablePath) as f: + jobExecutableContent = f.read() + + assert os.path.realpath(sys.executable) in jobExecutableContent + assert jobWrapperPath in jobExecutableContent + assert extraOptions in jobExecutableContent + assert "-o LogLevel=INFO" in jobExecutableContent + assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent + + # Test job executable relocated path + assert not res["Value"].get("JobExecutableRelocatedPath") + + # Execute wrapperFile in a subprocess + os.chmod(jobExecutablePath, 0o755) + result = subprocess.run(jobExecutablePath, shell=True, capture_output=True) + + assert result.returncode == 1, result.stderr + assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout + assert b"Downloading InputSandbox for job 1: helloWorld.xml, exe-script.py" in result.stdout, result.stdout + assert b"Job Wrapper is starting the pre processing phase for job" in result.stdout, result.stdout + assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout + assert b"Final job status Failed" in result.stdout, result.stdout + assert result.stderr == b"", result.stderr + + # This is the default wrapper path + assert os.path.exists(os.path.join(os.getcwd(), "job/Wrapper")) + shutil.rmtree(os.path.join(os.getcwd(), "job/Wrapper")) + + +def test_createAndExecuteJobWrapperTemplate_missingExtraOptions(): + """Test the creation of a classical job wrapper and its execution: + There is no extra options to be passed to the job wrapper. + + This might happen when the pilot.cfg does not contain any extra options. + """ + # Create job wrapper + res = createJobWrapper( + jobID=1, + jobParams=jobParams, + resourceParams=resourceParams, + optimizerParams=optimizerParams + ) + assert res["OK"], res.get("Message") + + # Test job wrapper content + jobWrapperPath = res["Value"].get("JobWrapperPath") + assert jobWrapperPath + assert os.path.exists(jobWrapperPath) + + with open(jobWrapperPath) as f: + jobWrapperContent = f.read() + + assert "@SITEPYTHON@" not in jobWrapperContent + assert f"{os.path.join(os.getcwd(), 'job/Wrapper')}" in jobWrapperContent + + # Test job wrapper configuration path + jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") + assert jobWrapperConfigPath + assert os.path.exists(jobWrapperConfigPath) + + with open(jobWrapperConfigPath) as f: + jobWrapperConfigContent = json.load(f) + + assert jobWrapperConfigContent["Job"] == jobParams + assert jobWrapperConfigContent["CE"] == resourceParams + assert jobWrapperConfigContent["Optimizer"] == optimizerParams + assert "Payload" not in jobWrapperConfigContent + + # Test job executable path + jobExecutablePath = res["Value"].get("JobExecutablePath") + assert jobExecutablePath + assert os.path.exists(jobExecutablePath) + + with open(jobExecutablePath) as f: + jobExecutableContent = f.read() + + assert os.path.realpath(sys.executable) in jobExecutableContent + assert jobWrapperPath in jobExecutableContent + assert "-o LogLevel=INFO" in jobExecutableContent + assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent + + # Test job executable relocated path + assert not res["Value"].get("JobExecutableRelocatedPath") + + # Execute wrapperFile in a subprocess + os.chmod(jobExecutablePath, 0o755) + result = subprocess.run(jobExecutablePath, shell=True, capture_output=True) + assert result.returncode == 1, result.stderr + assert b"Missing mandatory local configuration option /DIRAC/Setup" in result.stdout, result.stdout + assert result.stderr == b"", result.stderr + + # This is the default wrapper path + assert os.path.exists(os.path.join(os.getcwd(), "job/Wrapper")) + shutil.rmtree(os.path.join(os.getcwd(), "job/Wrapper")) + + +def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions): + """Test the creation of a relocated job wrapper and its execution: + This is generally used when containers are involved (SingularityCE). + """ + # Create a specific wrapper path + wrapperPath = os.path.join(os.getcwd(), "DIRAC_containers/job") + os.makedirs(wrapperPath, exist_ok=True) + # Working directory within the container + rootLocation = os.path.join(os.getcwd(), "tmp/Wrapper") + os.makedirs(rootLocation, exist_ok=True) + + # Create relocated job wrapper + res = createJobWrapper( + jobID=1, + jobParams=jobParams, + resourceParams=resourceParams, + optimizerParams=optimizerParams, + # This is the interesting part + pythonPath="python", + wrapperPath=wrapperPath, + rootLocation=rootLocation, + extraOptions=extraOptions, + ) + assert res["OK"], res.get("Message") + + # Test job wrapper content + jobWrapperPath = res["Value"].get("JobWrapperPath") + assert jobWrapperPath + assert os.path.exists(jobWrapperPath) + assert os.path.exists(os.path.join(wrapperPath, os.path.basename(jobWrapperPath))) + assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperPath))) + + with open(jobWrapperPath) as f: + jobWrapperContent = f.read() + + assert "@SITEPYTHON@" not in jobWrapperContent + assert rootLocation in jobWrapperContent + + # Test job wrapper configuration path + jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") + assert jobWrapperConfigPath + assert os.path.exists(jobWrapperConfigPath) + assert os.path.exists(os.path.join(wrapperPath, os.path.basename(jobWrapperConfigPath))) + assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath))) + + with open(jobWrapperConfigPath) as f: + jobWrapperConfigContent = json.load(f) + + assert jobWrapperConfigContent["Job"] == jobParams + assert jobWrapperConfigContent["CE"] == resourceParams + assert jobWrapperConfigContent["Optimizer"] == optimizerParams + assert "Payload" not in jobWrapperConfigContent + + # Test job executable path + jobExecutablePath = res["Value"].get("JobExecutablePath") + assert jobExecutablePath + assert os.path.exists(jobExecutablePath) + assert os.path.exists(os.path.join(wrapperPath, os.path.basename(jobExecutablePath))) + assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobExecutablePath))) + + with open(jobExecutablePath) as f: + jobExecutableContent = f.read() + + assert os.path.realpath(sys.executable) not in jobExecutableContent + assert "python" in jobExecutableContent + + assert jobWrapperPath not in jobExecutableContent + assert os.path.join(rootLocation, os.path.basename(jobWrapperPath)) in jobExecutableContent + assert extraOptions in jobExecutableContent + assert "-o LogLevel=INFO" in jobExecutableContent + assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent + + # Test job executable relocated path + jobExecutableRelocatedPath = res["Value"].get("JobExecutableRelocatedPath") + assert jobExecutableRelocatedPath + assert jobExecutablePath != jobExecutableRelocatedPath + assert os.path.basename(jobExecutablePath) == os.path.basename(jobExecutableRelocatedPath) + assert not os.path.exists(jobExecutableRelocatedPath) + + # 1. Execute the executable file in a subprocess without relocating the files as a container bind mount would do + # We expect it to fail because the job wrapper is not in the expected location + os.chmod(jobExecutablePath, 0o755) + result = subprocess.run(jobExecutablePath, shell=True, capture_output=True) + + assert result.returncode == 2, result.stderr + assert result.stdout == b"", result.stdout + assert f"can't open file '{os.path.join(rootLocation, os.path.basename(jobWrapperPath))}'".encode() in result.stderr, result.stderr + + # 2. Execute the relocated executable file in a subprocess without relocating the files as a container bind mount would do + # We expect it to fail because the relocated executable should not exist + os.chmod(jobExecutablePath, 0o755) + result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) + + assert result.returncode == 127, result.stderr + assert result.stdout == b"", result.stdout + assert f"{jobExecutableRelocatedPath}: not found".encode() in result.stderr, result.stderr + + # 3. Now we relocate the files as a container bind mount would do and execute the relocated executable file in a subprocess + # We expect it to work + shutil.copy(jobWrapperPath, rootLocation) + shutil.copy(jobWrapperConfigPath, rootLocation) + shutil.copy(jobExecutablePath, rootLocation) + os.chmod(jobExecutablePath, 0o755) + + result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) + + assert result.returncode == 1, result.stderr + assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout + assert b"Downloading InputSandbox for job 1: helloWorld.xml, exe-script.py" in result.stdout, result.stdout + assert b"Job Wrapper is starting the pre processing phase for job" in result.stdout, result.stdout + assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout + assert b"Final job status Failed" in result.stdout, result.stdout + assert result.stderr == b"", result.stderr + + shutil.rmtree(rootLocation) + shutil.rmtree(wrapperPath) + + +def test_createAndExecuteJobWrapperLightTemplate_success(extraOptions): + """Test the creation of a light job wrapper and its execution: + This is generally used when pre/post processing operations are executed locally, + while the workflow itself is executed on a remote computing resource (PushJobAgent). + """ + # Working directory on the remote resource + rootLocation = "." + numberOfFiles = len(os.listdir(rootLocation)) + + # Create relocated job wrapper + res = createJobWrapper( + jobID=1, + jobParams=jobParams, + resourceParams=resourceParams, + optimizerParams=optimizerParams, + # This is the interesting part + defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py", + pythonPath="python", + rootLocation=rootLocation, + extraOptions=extraOptions, + ) + assert res["OK"], res.get("Message") + + # Test job wrapper content + jobWrapperPath = res["Value"].get("JobWrapperPath") + assert jobWrapperPath + assert os.path.exists(jobWrapperPath) + assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperPath))) + + with open(jobWrapperPath) as f: + jobWrapperContent = f.read() + + assert "@SITEPYTHON@" not in jobWrapperContent + assert rootLocation in jobWrapperContent + + # Test job wrapper configuration path + jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") + assert jobWrapperConfigPath + assert os.path.exists(jobWrapperConfigPath) + assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath))) + + with open(jobWrapperConfigPath) as f: + jobWrapperConfigContent = json.load(f) + + assert jobWrapperConfigContent["Job"] == jobParams + assert jobWrapperConfigContent["CE"] == resourceParams + assert jobWrapperConfigContent["Optimizer"] == optimizerParams + assert "Payload" not in jobWrapperConfigContent + + # Test job executable path + jobExecutablePath = res["Value"].get("JobExecutablePath") + assert jobExecutablePath + assert os.path.exists(jobExecutablePath) + assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobExecutablePath))) + + with open(jobExecutablePath) as f: + jobExecutableContent = f.read() + + assert os.path.realpath(sys.executable) not in jobExecutableContent + assert "python" in jobExecutableContent + + assert jobWrapperPath not in jobExecutableContent + assert os.path.join(rootLocation, os.path.basename(jobWrapperPath)) in jobExecutableContent + assert extraOptions in jobExecutableContent + assert "-o LogLevel=INFO" in jobExecutableContent + assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent + + # Test job executable relocated path + jobExecutableRelocatedPath = res["Value"].get("JobExecutableRelocatedPath") + assert jobExecutableRelocatedPath + assert jobExecutablePath != jobExecutableRelocatedPath + assert os.path.basename(jobExecutablePath) == os.path.basename(jobExecutableRelocatedPath) + assert not os.path.exists(jobExecutableRelocatedPath) + + # 1. Execute the executable file in a subprocess without relocating the files as if they were on the remote resource + # We expect it to fail because the job wrapper is not in the expected location + os.chmod(jobExecutablePath, 0o755) + result = subprocess.run(jobExecutablePath, shell=True, capture_output=True) + + assert result.returncode == 2, result.stderr + assert result.stdout == b"", result.stdout + assert b"can't open file" in result.stderr, result.stderr + + # 2. Execute the relocated executable file in a subprocess without relocating the files as they would be on the remote resource + # We expect it to fail because the relocated executable should not exist + os.chmod(jobExecutablePath, 0o755) + result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) + + assert result.returncode == 127, result.stderr + assert result.stdout == b"", result.stdout + assert f"{jobExecutableRelocatedPath}: not found".encode() in result.stderr, result.stderr + + # 3. Now we relocate the files as if they were on a remote resource and execute the relocated executable file in a subprocess + # We expect it to fail because the payload parameters are not available + shutil.copy(jobWrapperPath, rootLocation) + shutil.copy(jobWrapperConfigPath, rootLocation) + shutil.copy(jobExecutablePath, rootLocation) + os.chmod(jobExecutablePath, 0o755) + + result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) + + assert result.returncode == 1, result.stderr + assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout + assert result.stderr == b"", result.stderr + + # 4. We recreate the job wrapper light template with the payload params now + # We did not specify where the results and checksum should be stored, so we expect it to fail + res = createJobWrapper( + jobID=1, + jobParams=jobParams, + resourceParams=resourceParams, + optimizerParams=optimizerParams, + # This is the interesting part + defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py", + pythonPath="python", + rootLocation=rootLocation, + extraOptions=extraOptions, + payloadParams=payloadParams, + ) + assert res["OK"], res.get("Message") + jobWrapperPath = res["Value"].get("JobWrapperPath") + jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") + jobExecutablePath = res["Value"].get("JobExecutablePath") + + shutil.copy(jobWrapperPath, rootLocation) + shutil.copy(jobWrapperConfigPath, rootLocation) + shutil.copy(jobExecutablePath, rootLocation) + os.chmod(jobExecutablePath, 0o755) + + result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) + + assert result.returncode == 1, result.stderr + assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout + assert result.stderr == b"", result.stderr + + # The root location should contain: + # - the job wrapper + # - the job wrapper configuration + # - the job executable + # - the job/Wrapper directory + assert len(os.listdir(rootLocation)) == numberOfFiles + 4 + + # 5. We recreate the job wrapper light template with the payload params and the additional job params + # It should work fine now + jobParams["PayloadResults"] = "payloadResults.json" + jobParams["Checksum"] = "checksum.json" + + res = createJobWrapper( + jobID=1, + jobParams=jobParams, + resourceParams=resourceParams, + optimizerParams=optimizerParams, + # This is the interesting part + defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLightTemplate.py", + pythonPath="python", + rootLocation=rootLocation, + extraOptions=extraOptions, + payloadParams=payloadParams, + ) + assert res["OK"], res.get("Message") + jobWrapperPath = res["Value"].get("JobWrapperPath") + jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") + jobExecutablePath = res["Value"].get("JobExecutablePath") + + shutil.copy(jobWrapperPath, rootLocation) + shutil.copy(jobWrapperConfigPath, rootLocation) + shutil.copy(jobExecutablePath, rootLocation) + os.chmod(jobExecutablePath, 0o755) + + result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) + + assert result.returncode == 0, result.stderr + assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout + assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout + assert result.stderr == b"", result.stderr + + # The root location should contain: + # - the job wrapper + # - the job wrapper configuration + # - the job executable + # - the job/Wrapper directory + # - the directory + print(result.stdout.decode()) + assert len(os.listdir(rootLocation)) == numberOfFiles + 5 + assert os.path.exists(os.path.join(rootLocation, "1")) + assert os.path.exists(os.path.join(rootLocation, "1", "payloadResults.json")) + assert os.path.exists(os.path.join(rootLocation, "1", "checksum.json")) + + with open(os.path.join(rootLocation, "1", "payloadResults.json")) as f: + payloadResults = json.load(f) + + assert payloadResults["OK"] + assert "cpuTimeConsumed" in payloadResults["Value"] + assert "payloadExecutorError" in payloadResults["Value"] + assert "payloadOutput" in payloadResults["Value"] + assert "payloadStatus" in payloadResults["Value"] + + with open(os.path.join(rootLocation, "1", "checksum.json")) as f: + checksums = json.load(f) + + assert jobParams["PayloadResults"] in checksums + + os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperPath))) + os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath))) + os.unlink(os.path.join(rootLocation, os.path.basename(jobExecutablePath))) + shutil.rmtree(os.path.join(rootLocation, "1")) + shutil.rmtree(os.path.join(os.getcwd(), "job/Wrapper")) diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobWrapper.py b/tests/Integration/WorkloadManagementSystem/Test_JobWrapper.py deleted file mode 100644 index ab75b736618..00000000000 --- a/tests/Integration/WorkloadManagementSystem/Test_JobWrapper.py +++ /dev/null @@ -1,55 +0,0 @@ -""" JobWrapper test -""" -import os -import pytest - -from DIRAC import gLogger - -from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory -from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper -from DIRAC.Core.Security.ProxyInfo import getProxyInfo - - -@pytest.fixture -def setup(): - gLogger.setLevel("DEBUG") - - # get proxy - proxyInfo = getProxyInfo(disableVOMS=True) - proxyChain = proxyInfo["Value"]["chain"] - proxyDumped = proxyChain.dumpAllToString() - payloadProxy = proxyDumped["Value"] - - yield payloadProxy - - -def test_CreateAndSubmit(setup): - jobParams = { - "JobID": "1", - "JobType": "Merge", - "CPUTime": "1000000", - "Executable": "dirac-jobexec", - "Arguments": "helloWorld.xml -o LogLevel=DEBUG --cfg pilot.cfg", - "InputSandbox": ["helloWorld.xml", "exe-script.py"], - } - resourceParams = {} - optimizerParams = {} - - ceFactory = ComputingElementFactory() - ceInstance = ceFactory.getCE("InProcess") - assert ceInstance["OK"] - computingElement = ceInstance["Value"] - - if "pilot.cfg" in os.listdir("."): - jobParams.setdefault("ExtraOptions", "pilot.cfg") - res = createJobWrapper( - 2, jobParams, resourceParams, optimizerParams, extraOptions="pilot.cfg", logLevel="DEBUG" - ) - else: - res = createJobWrapper(2, jobParams, resourceParams, optimizerParams, logLevel="DEBUG") - assert res["OK"], res.get("Message") - wrapperFile = res["Value"]["JobExecutablePath"] - - res = computingElement.submitJob(wrapperFile, setup) - assert res["OK"], res.get("Message") - assert res["Value"] == 0, res.get("Value") diff --git a/tests/Integration/all_integration_client_tests.sh b/tests/Integration/all_integration_client_tests.sh index d79aec5b13a..2c08595b88c 100644 --- a/tests/Integration/all_integration_client_tests.sh +++ b/tests/Integration/all_integration_client_tests.sh @@ -53,7 +53,6 @@ pytest --no-check-dirac-environment "${THIS_DIR}/ResourceStatusSystem/Test_Email echo -e "*** $(date -u) **** WMS TESTS ****\n" pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_SandboxStoreClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) -pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobWrapper.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_WMSAdministratorClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_Client_WMS.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" ))