feat: add JobWrapper tests
aldbr committed Aug 19, 2024
1 parent 3f63c7f commit ff08839
Showing 5 changed files with 447 additions and 127 deletions.
4 changes: 2 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -509,7 +509,7 @@ def _submitJobWrapper(
self.log.verbose("Getting a JobWrapper")
arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams}

job = getJobWrapper(jobID, arguments, jobReport)
job = getJobWrapper(int(jobID), arguments, jobReport)
if not job:
return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}")

@@ -729,7 +729,7 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
# Get the job wrapper
jobReport = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}")
try:
job = JobWrapper(jobID, jobReport)
job = JobWrapper(int(jobID), jobReport)
job.initialize(arguments)
except Exception:
self.log.exception("JobWrapper failed the initialization phase", jobID)
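Both PushJobAgent changes cast the job ID to int before building the wrapper, matching the new jobID: int | None annotation further down; IDs read from the task queue usually arrive as strings, while downstream code (for instance the int(self.jobID / 1000) sub-directory used for output LFNs) expects an integer. A minimal sketch of the pattern, using hypothetical stand-in names rather than the real DIRAC classes:

# Sketch only: JobWrapperStub stands in for DIRAC's JobWrapper.
class JobWrapperStub:
    def __init__(self, jobID: int | None = None):
        self.jobID = jobID


def submit_job_wrapper(job_id) -> JobWrapperStub:
    # Cast once at the boundary so every consumer (working-directory names,
    # LFN sub-paths) can rely on an integer job ID.
    return JobWrapperStub(int(job_id))


assert submit_job_wrapper("123").jobID == 123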
51 changes: 25 additions & 26 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
@@ -60,7 +60,7 @@ class JobWrapper:
"""The only user of the JobWrapper is the JobWrapperTemplate"""

#############################################################################
def __init__(self, jobID=None, jobReport=None):
def __init__(self, jobID: int | None = None, jobReport: JobReport | None = None):
"""Standard constructor"""
self.initialTiming = os.times()
self.section = "/Systems/WorkloadManagement/JobWrapper"
@@ -684,11 +684,11 @@ def resolveInputData(self):
msg = "Job Wrapper cannot resolve local replicas of input data with null job input data parameter "
self.log.error(msg)
return S_ERROR(msg)
else:
if isinstance(inputData, str):
inputData = [inputData]
lfns = [fname.replace("LFN:", "") for fname in inputData]
self.log.verbose("Job input data requirement is \n%s" % ",\n".join(lfns))

if isinstance(inputData, str):
inputData = [inputData]
lfns = [fname.replace("LFN:", "") for fname in inputData]
self.log.verbose("Job input data requirement is \n%s" % ",\n".join(lfns))

# Does this site have local SEs? - not failing if it doesn't
if "LocalSE" in self.ceArgs:
@@ -874,23 +874,23 @@ def processJobOutputs(self):
outputSandbox = [outputSandbox]
if outputSandbox:
self.log.verbose(f"OutputSandbox files are: {', '.join(outputSandbox)}")
outputSandboxFiles = [str(self.jobIDPath / output) for output in outputSandbox]

outputData = self.jobArgs.get("OutputData", [])
if outputData and isinstance(outputData, str):
outputData = outputData.split(";")
if outputData:
self.log.verbose(f"OutputData files are: {', '.join(outputData)}")

# First resolve any wildcards for output files and work out if any files are missing
resolvedSandbox = self.__resolveOutputSandboxFiles(outputSandbox)
if not resolvedSandbox["OK"]:
self.log.warn("Output sandbox file resolution failed:")
self.log.warn(resolvedSandbox["Message"])
self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.RESOLVING_OUTPUT_SANDBOX)

fileList = resolvedSandbox["Value"]["Files"]
missingFiles = resolvedSandbox["Value"]["Missing"]
resolvedSandbox = self.__resolveOutputSandboxFiles(outputSandboxFiles)

fileList = resolvedSandbox["Files"]
missingFiles = resolvedSandbox["Missing"]
if missingFiles:
self.jobReport.setJobParameter("OutputSandboxMissingFiles", ", ".join(missingFiles), sendFlag=False)
self.jobReport.setJobParameter(
"OutputSandboxMissingFiles", ", ".join([Path(output).name for output in missingFiles]), sendFlag=False
)

if "Owner" not in self.jobArgs:
msg = "Job has no owner specified"
@@ -902,6 +902,7 @@ def processJobOutputs(self):
self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.UPLOADING_OUTPUT_SANDBOX)

uploadOutputDataInAnyCase = False
result_sbUpload = None

if fileList and self.jobID:
self.outputSandboxSize = getGlobbedTotalSize(fileList)
@@ -954,7 +955,7 @@ def processJobOutputs(self):
# now that we (tried to) transfer the output files,
# including possibly oversized Output Sandboxes,
# we delete the local output sandbox tarfile in case it's still there.
if not result_sbUpload["OK"]:
if result_sbUpload and not result_sbUpload["OK"]:
outputSandboxData = result_sbUpload.get("SandboxFileName")
if outputSandboxData:
try:
@@ -976,7 +977,7 @@ def __resolveOutputSandboxFiles(self, outputSandbox):
okFiles = []
for i in outputSandbox:
self.log.verbose(f"Looking at OutputSandbox file/directory/wildcard: {i}")
globList = glob.glob(self.jobIDPath / i)
globList = glob.glob(i)
for check in globList:
if os.path.isfile(check):
self.log.verbose(f"Found locally existing OutputSandbox file: {check}")
@@ -1003,8 +1004,7 @@ def __resolveOutputSandboxFiles(self, outputSandbox):
if i not in missing:
missing.append(i)

result = {"Missing": missing, "Files": okFiles}
return S_OK(result)
return {"Missing": missing, "Files": okFiles}

#############################################################################
def __transferOutputDataFiles(self, outputData, outputSE, outputPath):
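The sandbox entries handed to __resolveOutputSandboxFiles are now absolute, prefixed with the per-job directory, and the method returns a plain {"Files", "Missing"} dict rather than an S_OK structure, so callers index the result directly. A stdlib-only sketch of the resolution step under those assumptions (the real method also tars matched directories, which is omitted here):

import glob
import os
from pathlib import Path


def resolve_output_sandbox(job_dir: Path, patterns: list[str]) -> dict:
    """Glob each requested sandbox entry underneath the per-job directory."""
    found, missing = [], []
    for pattern in patterns:
        # Only plain files are collected in this sketch; directories are skipped.
        matches = [m for m in glob.glob(str(job_dir / pattern)) if os.path.isfile(m)]
        if matches:
            found.extend(matches)
        else:
            missing.append(pattern)
    # Same shape as the new return value: no S_OK wrapper.
    return {"Files": found, "Missing": missing}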
@@ -1025,7 +1025,7 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath):
nonlfnList.append(out)

# Check whether list of outputData has a globbable pattern
nonlfnList = [self.jobIDPath / x for x in nonlfnList]
nonlfnList = [str(self.jobIDPath / x) for x in nonlfnList]
globbedOutputList = List.uniqueElements(getGlobbedFiles(nonlfnList))
if globbedOutputList != nonlfnList and globbedOutputList:
self.log.info(
@@ -1035,7 +1035,7 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath):
outputData = lfnList + nonlfnList

pfnGUID = {}
result = getGUID(outputData)
result = getGUID(outputData, str(self.jobIDPath.absolute()))
if not result["OK"]:
self.log.warn(
"Failed to determine POOL GUID(s) for output file list (OK if not POOL files)", result["Message"]
@@ -1172,7 +1172,6 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""):
"""Provides a generic convention for VO output data
files if no path is specified.
"""

if not re.search("^LFN:", outputFile):
localfile = outputFile
initial = self.owner[:1]
@@ -1182,7 +1181,7 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""):

ops = Operations(vo=vo)
user_prefix = ops.getValue("LFNUserPrefix", "user")
basePath = "/" + vo + "/" + user_prefix + "/" + initial + "/" + self.owner
basePath = Path(f"/{vo}") / user_prefix / initial / self.owner
if outputPath:
# If output path is given, append it to the user path and put output files in this directory
if outputPath.startswith("/"):
@@ -1191,10 +1190,10 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""):
# By default the output path is constructed from the job id
subdir = str(int(self.jobID / 1000))
outputPath = subdir + "/" + str(self.jobID)
lfn = os.path.join(basePath, outputPath, os.path.basename(localfile))
lfn = str(basePath / outputPath / os.path.basename(localfile))
else:
# if LFN is given, take it as it is
localfile = self.jobIDPath / outputFile.replace("LFN:", "")
localfile = str(self.jobIDPath / outputFile.replace("LFN:", ""))
lfn = outputFile.replace("LFN:", "")

return (lfn, localfile)
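__getLFNfromOutputFile now assembles the destination LFN with pathlib: /<vo>/<LFNUserPrefix>/<owner initial>/<owner>/, followed by either the caller-supplied output path or the default <jobID // 1000>/<jobID> sub-directories, and finally the base file name. A small sketch of that convention (pure stdlib, illustrative values):

from pathlib import PurePosixPath


def user_output_lfn(
    vo: str, owner: str, job_id: int, filename: str, user_prefix: str = "user", output_path: str = ""
) -> str:
    """Rebuild the default user-output LFN convention shown above."""
    base = PurePosixPath(f"/{vo}") / user_prefix / owner[:1] / owner
    sub = output_path.lstrip("/") if output_path else f"{job_id // 1000}/{job_id}"
    return str(base / sub / PurePosixPath(filename).name)


assert user_output_lfn("myvo", "alice", 12345, "hist.root") == "/myvo/user/a/alice/12/12345/hist.root"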
@@ -1256,7 +1255,7 @@ def transferInputSandbox(self, inputSandbox):
sandboxFiles.append(os.path.basename(download["Value"]["Successful"][lfn]))

userFiles = [str(self.jobIDPath / file) for file in sandboxFiles] + [
self.jobIDPath / os.path.basename(lfn) for lfn in lfns
str(self.jobIDPath / os.path.basename(lfn)) for lfn in lfns
]
for possibleTarFile in userFiles:
if not os.path.exists(possibleTarFile):
src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
@@ -40,7 +40,7 @@
os.umask(0o22)


def execute(jobID: str, arguments: dict, jobReport: JobReport):
def execute(jobID: int, arguments: dict, jobReport: JobReport):
"""The only real function executed here"""

if "WorkingDirectory" in arguments:
@@ -103,12 +103,12 @@ def execute(jobID: str, arguments: dict, jobReport: JobReport):

ret = execute(jobID, jobArgs, jobReport)
jobReport.commit()
except Exception as exc: # pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
gLogger.exception("JobWrapperTemplate exception")
try:
jobReport.commit()
ret = -1
except Exception as exc: # pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
gLogger.exception("Could not commit the job report")
ret = -2
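Apart from dropping the unused exc bindings, the template's control flow is unchanged: run the wrapper, commit the job report, and on any exception log it and still attempt the commit, falling back to distinct negative return codes. A compact sketch of that pattern with generic callables (not the DIRAC API):

import logging

log = logging.getLogger("JobWrapperTemplateSketch")


def run_and_report(execute, commit_report) -> int:
    """Run the payload and always try to commit the job report.

    Returns the payload's code on success, -1 if execution failed but the
    report could still be committed, and -2 if even the commit failed.
    """
    try:
        ret = execute()
        commit_report()
    except Exception:  # broad catch, mirroring the template
        log.exception("JobWrapperTemplate exception")
        try:
            commit_report()
            ret = -1
        except Exception:
            log.exception("Could not commit the job report")
            ret = -2
    return ret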

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py
@@ -103,7 +103,7 @@ def createAndEnterWorkingDirectory(jobID: str, workingDirectory: str, jobReport:
return True


def getJobWrapper(jobID: str, arguments: dict, jobReport: JobReport) -> JobWrapper:
def getJobWrapper(jobID: int, arguments: dict, jobReport: JobReport) -> JobWrapper:
"""Create a JobWrapper instance"""
try:
job = JobWrapper(jobID, jobReport)
(Diff for the fifth changed file was not loaded in this view.)
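Given the commit title, that file presumably holds the new JobWrapper tests. A hedged pytest sketch of what such a test might look like; the module path matches the diff above, but the fixtures and assertions are assumptions, not the commit's actual content:

# Hypothetical sketch only; the real tests added by this commit did not load
# in this view and may differ substantially. Assumes a pytest environment in
# which DIRAC is importable and minimally configured.
import pytest

pytest.importorskip("DIRAC")

from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper


def test_jobwrapper_constructs_with_integer_job_id(tmp_path, monkeypatch):
    """The agents now pass an int jobID; construction should accept it."""
    monkeypatch.chdir(tmp_path)  # keep any per-job working directory out of the repo
    wrapper = JobWrapper(jobID=1)
    assert isinstance(wrapper, JobWrapper)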
