From d6e13b98fe5b45d00cd43a490a77b3ae76fe228a Mon Sep 17 00:00:00 2001
From: aldbr
Date: Thu, 21 Dec 2023 17:31:21 +0100
Subject: [PATCH] feat: PushJobAgent can handle special JobWrappers

---
 .../Agent/PushJobAgent.py                     | 192 ++++++++++++--
 .../JobWrapper/JobWrapper.py                  |  49 +---
 .../JobWrapper/JobWrapperLighTemplate.py      |  87 +++++++
 .../JobWrapper/JobWrapperTemplate.py          | 213 +++-------------
 .../JobWrapper/JobWrapperUtilities.py         | 237 ++++++++++++++++++
 .../Utilities/Utils.py                        |   4 +-
 6 files changed, 539 insertions(+), 243 deletions(-)
 create mode 100644 src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLighTemplate.py
 create mode 100644 src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py

diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
index 89ce0a1dc9d..8502391fa8d 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -9,21 +9,34 @@
 """

+import os
 import random
 import sys
 from collections import defaultdict
+import time
+from typing import Dict

 from DIRAC import S_OK, gConfig
+from DIRAC.Core.Security.X509Chain import X509Chain
 from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
 from DIRAC.Core.Utilities import DErrno
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
 from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
+from DIRAC import S_ERROR
 from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
+from DIRAC.FrameworkSystem.private.standardLogging.LogLevels import LogLevel
 from DIRAC.RequestManagementSystem.Client.Request import Request
+from DIRAC.Resources.Computing.ComputingElement import ComputingElement
 from DIRAC.WorkloadManagementSystem.Client import JobStatus
+from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
+    getJobWrapper,
+    resolveInputData,
+    transferInputSandbox,
+)
 from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
 from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv
 from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
+from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
 from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials

 MAX_JOBS_MANAGED = 100

@@ -50,10 +63,21 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):

         self.failedQueueCycleFactor = 10
         self.failedQueues = defaultdict(int)

+        # Choose the submission policy
+        # - Workflow: the agent submits a workflow to a PoolCE; the workflow is responsible for interacting with the remote site
+        # - JobWrapper: the agent submits a JobWrapper directly to the remote site and is responsible for the remote execution
+        self.submissionPolicy = "Workflow"
+        # Maps the DIRAC jobID to the taskID returned by the CE for jobs submitted with the JobWrapper policy
+        self.submissionDict = {}
+
     def initialize(self):
         """Sets default parameters and creates CE instance"""
         super().initialize()

+        # Get the submission policy
+        # Initialized here because it cannot be dynamically modified during the execution
+        self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy)
+        if self.submissionPolicy not in ["Workflow", "JobWrapper"]:
+            return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper")
+
         result = self._initializeComputingElement("Pool")
         if not result["OK"]:
             return result

@@ -137,11 +161,12 @@ def execute(self):
         if not result["OK"] or result["Value"]:
             return result

-        # Check errors that could have occurred during job submission and/or execution
-        # Status are handled internally, and therefore, not checked outside of the method
-        result = self._checkSubmittedJobs()
-        if not result["OK"]:
-            return result
+        if self.submissionPolicy == "Workflow":
+            # Check errors that could have occurred during job submission and/or execution
+            # Statuses are handled internally and, therefore, not checked outside of the method
+            result = self._checkSubmittedJobs()
+            if not result["OK"]:
+                return result

         for queueName, queueDictionary in queueDictItems:
             # Make sure there is no problem with the queue before trying to submit
@@ -251,21 +276,41 @@ def execute(self):

                 # Submit the job to the CE
                 self.log.debug(f"Before self._submitJob() ({self.ceName}CE)")
-                resultSubmission = self._submitJob(
-                    jobID=jobID,
-                    jobParams=params,
-                    resourceParams=ceDict,
-                    optimizerParams=optimizerParams,
-                    proxyChain=proxyChain,
-                    processors=submissionParams["processors"],
-                    wholeNode=submissionParams["wholeNode"],
-                    maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"],
-                    mpTag=submissionParams["mpTag"],
-                )
-                if not result["OK"]:
-                    result = self._rescheduleFailedJob(jobID, resultSubmission["Message"])
-                    self.failedQueues[queueName] += 1
-                    break
+                if self.submissionPolicy == "Workflow":
+                    resultSubmission = self._submitJob(
+                        jobID=jobID,
+                        jobParams=params,
+                        resourceParams=ceDict,
+                        optimizerParams=optimizerParams,
+                        proxyChain=proxyChain,
+                        processors=submissionParams["processors"],
+                        wholeNode=submissionParams["wholeNode"],
+                        maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"],
+                        mpTag=submissionParams["mpTag"],
+                    )
+                    if not resultSubmission["OK"]:
+                        result = self._rescheduleFailedJob(jobID, resultSubmission["Message"])
+                        self.failedQueues[queueName] += 1
+                        break
+                else:
+                    resultSubmission = self._submitJobWrapper(
+                        jobID=jobID,
+                        ce=ce,
+                        jobParams=params,
+                        resourceParams=ceDict,
+                        optimizerParams=optimizerParams,
+                        proxyChain=proxyChain,
+                        processors=submissionParams["processors"],
+                    )
+                    if not resultSubmission["OK"]:
+                        self.failedQueues[queueName] += 1
+                        break
+
+                    # Check status of the submitted jobs
+                    result = self._checkSubmittedJobWrapper(ce)
+                    if not result["OK"]:
+                        self.failedQueues[queueName] += 1
+                        break

                 self.log.debug(f"After {self.ceName}CE submitJob()")
                 # Committing the JobReport before evaluating the result of job submission
@@ -384,3 +429,110 @@ def _checkMatchingIssues(self, jobRequest):
             self.log.notice("Failed to get jobs", jobRequest["Message"])

         return S_OK()
+
+    def _submitJobWrapper(
+        self,
+        jobID: str,
+        ce: ComputingElement,
+        jobParams: dict,
+        resourceParams: dict,
+        optimizerParams: dict,
+        proxyChain: X509Chain,
+        processors: int,
+    ):
+        """Submit a JobWrapper to the remote site
+
+        :param jobID: job ID
+        :param ce: ComputingElement instance
+        :param jobParams: job parameters
+        :param resourceParams: resource parameters
+        :param optimizerParams: optimizer parameters
+        :param proxyChain: proxy chain
+        :param processors: number of processors
+
+        :return: S_OK / S_ERROR
+        """
+        # Add the number of requested processors to the job environment
+        if "ExecutionEnvironment" in jobParams:
+            if isinstance(jobParams["ExecutionEnvironment"], str):
+                jobParams["ExecutionEnvironment"] = jobParams["ExecutionEnvironment"].split(";")
+        jobParams.setdefault("ExecutionEnvironment", []).append("DIRAC_JOB_PROCESSORS=%d" % processors)
+
+        # Prepare the job for submission
+        self.log.verbose("Getting a JobWrapper")
+        arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams}
+        job = getJobWrapper(jobID, arguments, self.jobReport)
+        if not job:
+            return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}")
+
+        if "InputSandbox" in jobParams:
+            self.log.verbose("Getting the inputSandbox of the job")
+            if not transferInputSandbox(job, jobParams["InputSandbox"], self.jobReport):
+                return S_ERROR(f"Cannot get input sandbox of job {jobID}")
+            self.jobReport.commit()
+
+        if "InputData" in jobParams and jobParams["InputData"]:
+            self.log.verbose("Getting the inputData of the job")
+            if not resolveInputData(job, self.jobReport):
+                return S_ERROR(f"Cannot get input data of job {jobID}")
+            self.jobReport.commit()
+
+        # Preprocess the payload
+        payloadParams = job.preProcess()
+        self.jobReport.commit()
+
+        # Generate a light JobWrapper executor script
+        result = createJobWrapper(log=self.log, logLevel=LogLevel.DEBUG, **payloadParams)
+        if not result["OK"]:
+            return result
+
+        wrapperFile = result["Value"][0]
+        inputs = os.listdir(".")
+        inputs.remove(os.path.basename(wrapperFile))
+        self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs))
+        # Request the whole directory as output
+        outputs = ["/"]
+
+        self.jobReport.setJobStatus(minorStatus="Submitting To CE")
+        self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")
+
+        # Pass proxy to the CE
+        proxy = proxyChain.dumpAllToString()
+        if not proxy["OK"]:
+            self.log.error("Invalid proxy", proxy)
+            return S_ERROR("Payload Proxy Not Found")
+        ce.setProxy(proxy["Value"])
+
+        result = ce.submitJob(
+            wrapperFile,
+            None,
+            inputs,
+            outputs,
+        )
+        if not result["OK"]:
+            self._rescheduleFailedJob(jobID, result["Message"])
+            return result
+
+        taskID = result.get("Value")
+        self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})")
+
+        self.submissionDict[jobID] = taskID
+        time.sleep(self.jobSubmissionDelay)
+        return S_OK()
+
+    def _checkSubmittedJobWrapper(self, ce: ComputingElement):
+        """Check the status of the submitted jobs and forget the finished ones
+
+        :return: S_OK / S_ERROR
+        """
+        result = ce.getJobStatus(list(self.submissionDict.values()))
+        if not result["OK"]:
+            self.log.error("Failed to get job status", result["Message"])
+            return result
+
+        # getJobStatus returns a dictionary mapping each CE taskID to its status:
+        # map the taskID back to the corresponding DIRAC jobID before cleaning up
+        taskIDToJobID = {taskID: jobID for jobID, taskID in self.submissionDict.items()}
+        for taskID, status in result["Value"].items():
+            if status in ["Done", "Failed"]:
+                jobID = taskIDToJobID[taskID]
+                self.log.info("Job execution finished", f"(DIRAC JobID: {jobID}; Status: {status})")
+                # jobWrapper.postProcess()
+                del self.submissionDict[jobID]
+        return S_OK()

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
index c554778a8fd..e4da7c55a20 100755
--- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
@@ -21,6 +21,7 @@
 import tarfile
 import threading
 import time
+from typing import Dict
 from urllib.parse import unquote

 import DIRAC
@@ -322,6 +323,7 @@ def __prepareCommand(self):

     def __prepareEnvironment(self):
         """Prepare the environment to be used by the payload."""
+        os.environ["JOBID"] = str(self.jobID)
         os.environ["DIRACJOBID"] = str(self.jobID)

         diracSite = DIRAC.siteName()
@@ -379,7 +381,7 @@ def preProcess(self):
         )

     #############################################################################
-    def process(self, command, output, error, env):
+    def process(self, command: str, output: str, error: str, env: dict[str, str]):
         """This method calls the payload."""

         self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}")

@@ -496,7 +498,13 @@ def process(self, command, output, error, env):
     #############################################################################
     def postProcess(
-        self, payloadStatus, payloadOutput, payloadExecutorError, cpuTimeConsumed, watchdogError, watchdogStats
+        self,
+        payloadStatus: int | None,
+        payloadOutput: str,
+        payloadExecutorError: str,
+        cpuTimeConsumed: list[int],
+        watchdogError: str | None,
+        watchdogStats: dict,
     ):
         """This method is called after the payload has finished running."""

         self.log.info(f"Job Wrapper is starting the post processing phase for job {self.jobID}")
@@ -1596,40 +1604,3 @@ def getOutput(self, lines=0):
                 self.outputLines = self.outputLines[cut:]
             return S_OK(self.outputLines)
         return S_ERROR("No Job output found")
-
-
-def rescheduleFailedJob(jobID, minorStatus, jobReport=None):
-    """Function for rescheduling a jobID, setting a minorStatus"""
-
-    rescheduleResult = JobStatus.RESCHEDULED
-
-    try:
-        gLogger.warn("Failure during", minorStatus)
-
-        # Setting a job parameter does not help since the job will be rescheduled,
-        # instead set the status with the cause and then another status showing the
-        # reschedule operation.
-
-        if not jobReport:
-            gLogger.info("Creating a new JobReport Object")
-            jobReport = JobReport(int(jobID), "JobWrapper")
-
-        jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False)
-        jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False)
-
-        # We must send Job States and Parameters before it gets reschedule
-        jobReport.sendStoredStatusInfo()
-        jobReport.sendStoredJobParameters()
-
-        gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper")
-
-        result = JobManagerClient().rescheduleJob(int(jobID))
-        if not result["OK"]:
-            gLogger.warn(result["Message"])
-            if "Maximum number of reschedulings is reached" in result["Message"]:
-                rescheduleResult = JobStatus.FAILED
-
-        return rescheduleResult
-    except Exception:
-        gLogger.exception("JobWrapperTemplate failed to reschedule Job")
-        return JobStatus.FAILED

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLighTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLighTemplate.py
new file mode 100644
index 00000000000..1f86eef6623
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperLighTemplate.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+""" This template will become the job wrapper that's actually executed.
+
+    The JobWrapperLighTemplate is completed and invoked by the PushJobAgent and uses functionality from the JobWrapper module.
+    It is executed in an environment where external connections are not allowed.
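+
+    A sketch of the companion JSON file the template reads at startup (values are
+    illustrative; the "Payload" key is an assumption of this template and is expected
+    to carry the pre-processed parameters returned by JobWrapper.preProcess()):
+
+        {"Job": {"JobID": "123", ...}, "CE": {...}, "Optimizer": {...},
+         "Payload": {"command": "...", "output": "std.out", "error": "std.err", "env": {...}}}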
+
+    The JobWrapperTemplate will reschedule the job according to certain criteria:
+    - the working directory could not be created
+    - the jobWrapper initialization phase failed
+    - the inputSandbox download failed
+    - the resolution of the input data failed
+    - the JobWrapper ended with the status DErrno.EWMSRESC
+"""
+import sys
+import json
+import ast
+import os
+from typing import Dict
+
+from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
+    createAndEnterWorkingDirectory,
+    executePayload,
+    finalize,
+    getJobWrapper,
+    processJobOutputs,
+    resolveInputData,
+    transferInputSandbox,
+)
+
+sitePython = "@SITEPYTHON@"
+if sitePython:
+    sys.path.insert(0, "@SITEPYTHON@")
+
+from DIRAC.Core.Base.Script import Script
+
+Script.parseCommandLine()
+
+from DIRAC import gLogger
+from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
+
+
+os.umask(0o22)
+
+
+def execute(jobID: str, jobWrapperArguments: dict, payloadParams: dict, jobReport: JobReport):
+    """The only real function executed here"""
+    job = getJobWrapper(jobID, jobWrapperArguments, jobReport)
+    if not job:
+        return 1
+
+    payloadResult = job.process(**payloadParams)
+    if not payloadResult["OK"]:
+        return 1
+
+    with open("payloadResult.json", "w") as f:
+        json.dump(payloadResult, f)
+
+    return 0
+
+
+##########################################################
+
+
+ret = -3
+try:
+    jsonFileName = os.path.realpath(__file__) + ".json"
+    with open(jsonFileName) as f:
+        jobArgsFromJSON = json.loads(f.readlines()[0])
+    jobArgs = ast.literal_eval(jobArgsFromJSON)
+    if not isinstance(jobArgs, dict):
+        raise TypeError(f"jobArgs is of type {type(jobArgs)}")
+    if "Job" not in jobArgs:
+        raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}")
+
+    jobID = jobArgs["Job"].get("JobID", 0)
+    jobID = int(jobID)
+    jobReport = JobReport(jobID, "JobWrapper")
+
+    # Assumption: the pre-processed payload parameters (command, output, error, env)
+    # are shipped in the same JSON file under a "Payload" key
+    payloadParams = jobArgs.get("Payload", {})
+
+    ret = execute(jobID, jobArgs, payloadParams, jobReport)
+    jobReport.commit()
+except Exception as exc:  # pylint: disable=broad-except
+    gLogger.exception("JobWrapperLighTemplate exception")
+    try:
+        jobReport.commit()
+        ret = -1
+    except Exception as exc:  # pylint: disable=broad-except
+        gLogger.exception("Could not commit the job report")
+        ret = -2
+
+sys.exit(ret)

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
index 5e893ac77e5..18e141352ab 100644
--- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
@@ -15,9 +15,17 @@
 import json
 import ast
 import os
-import errno
-import time
-import signal
+from typing import Dict
+
+from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
+    createAndEnterWorkingDirectory,
+    executePayload,
+    finalize,
+    getJobWrapper,
+    processJobOutputs,
+    resolveInputData,
+    transferInputSandbox,
+)

 sitePython = "@SITEPYTHON@"
 if sitePython:
@@ -28,140 +36,35 @@
 Script.parseCommandLine()

 from DIRAC import gLogger
-from DIRAC.Core.Utilities import DErrno
-
-from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob
 from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
-from DIRAC.WorkloadManagementSystem.Client import JobStatus
-from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
-
-
-gJobReport = None

 os.umask(0o22)


-class JobWrapperError(Exception):
-    """Custom exception for handling JobWrapper "genuine" errors"""
-
-    def __init__(self, value):
-        self.value = value
-        super().__init__()
-
-    def __str__(self):
-        return str(self.value)
-
-
-def killJobWrapper(job):
-    """Function that stops and ultimately kills the JobWrapper"""
-    # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process
-    time.sleep(60)
-    os.kill(job.currentPID, signal.SIGTERM)
-    # wait for half a minute and if worker is still alive use REAL silencer
-    time.sleep(30)
-    # now you're dead
-    os.kill(job.currentPID, signal.SIGKILL)
-    return 1
-
-
-def sendJobAccounting(job, status, minorStatus):
-    """safe sending job accounting (always catching exceptions)"""
-    try:
-        job.sendJobAccounting(status, minorStatus)
-    except Exception as exc:  # pylint: disable=broad-except
-        gLogger.exception(
-            f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]",
-            lException=exc,
-        )
-
-
-def execute(arguments):
+def execute(jobID: str, arguments: dict, jobReport: JobReport):
     """The only real function executed here"""

-    global gJobReport
-
-    jobID = arguments["Job"].get("JobID", 0)
-    os.environ["JOBID"] = str(jobID)
-    jobID = int(jobID)
-
-    if "WorkingDirectory" in arguments:
-        wdir = os.path.expandvars(arguments["WorkingDirectory"])
-        if os.path.isdir(wdir):
-            os.chdir(wdir)
-        else:
-            try:
-                os.makedirs(wdir)  # this will raise an exception if wdir already exists (which is ~OK)
-                if os.path.isdir(wdir):
-                    os.chdir(wdir)
-            except OSError as osError:
-                if osError.errno == errno.EEXIST and os.path.isdir(wdir):
-                    gLogger.exception("JobWrapperTemplate found that the working directory already exists")
-                    rescheduleResult = rescheduleFailedJob(jobID, "Working Directory already exists")
-                else:
-                    gLogger.exception("JobWrapperTemplate could not create working directory")
-                    rescheduleResult = rescheduleFailedJob(jobID, "Could Not Create Working Directory")
-                return 1
-
-    gJobReport = JobReport(jobID, "JobWrapper")
+    if "WorkingDirectory" in arguments:
+        if not createAndEnterWorkingDirectory(jobID, arguments["WorkingDirectory"], jobReport):
+            return 1

-    try:
-        job = JobWrapper(jobID, gJobReport)
-        job.initialize(arguments)  # initialize doesn't return S_OK/S_ERROR
-    except Exception as exc:  # pylint: disable=broad-except
-        gLogger.exception("JobWrapper failed the initialization phase", lException=exc)
-        rescheduleResult = rescheduleFailedJob(
-            jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport
-        )
-        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION)
+    job = getJobWrapper(jobID, arguments, jobReport)
+    if not job:
         return 1

     if "InputSandbox" in arguments["Job"]:
-        gJobReport.commit()
-        try:
-            result = job.transferInputSandbox(arguments["Job"]["InputSandbox"])
-            if not result["OK"]:
-                gLogger.warn(result["Message"])
-                raise JobWrapperError(result["Message"])
-        except JobWrapperError:
-            gLogger.exception("JobWrapper failed to download input sandbox")
-            rescheduleResult = rescheduleFailedJob(
-                jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport
-            )
-            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
-            return 1
-        except Exception as exc:  # pylint: disable=broad-except
-            gLogger.exception("JobWrapper raised exception while downloading input sandbox", lException=exc)
-            rescheduleResult = rescheduleFailedJob(
-                jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport
-            )
-            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+        jobReport.commit()
+        if not transferInputSandbox(job, arguments["Job"]["InputSandbox"], jobReport):
             return 1
     else:
         gLogger.verbose("Job has no InputSandbox requirement")

-    gJobReport.commit()
+    jobReport.commit()

     if "InputData" in arguments["Job"]:
         if arguments["Job"]["InputData"]:
-            try:
-                result = job.resolveInputData()
-                if not result["OK"]:
-                    gLogger.warn(result["Message"])
-                    raise JobWrapperError(result["Message"])
-            except JobWrapperError:
-                gLogger.exception("JobWrapper failed to resolve input data")
-                rescheduleResult = rescheduleFailedJob(
-                    jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport
-                )
-                job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
-                return 1
-            except Exception as exc:  # pylint: disable=broad-except
-                gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc)
-                rescheduleResult = rescheduleFailedJob(
-                    jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport
-                )
-                job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
+            if not resolveInputData(job, jobReport):
                 return 1
         else:
             gLogger.verbose("Job has a null InputData requirement:")
@@ -169,75 +72,18 @@ def execute(arguments):
     else:
         gLogger.verbose("Job has no InputData requirement")

-    gJobReport.commit()
+    jobReport.commit()

-    try:
-        result = job.execute()
-        if not result["OK"]:
-            gLogger.error("Failed to execute job", result["Message"])
-            raise JobWrapperError((result["Message"], result["Errno"]))
-    except JobWrapperError as exc:
-        if exc.value[1] == 0 or str(exc.value[0]) == "0":
-            gLogger.verbose("JobWrapper exited with status=0 after execution")
-        if exc.value[1] == DErrno.EWMSRESC:
-            gLogger.warn("Asked to reschedule job")
-            rescheduleResult = rescheduleFailedJob(
-                jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport
-            )
-            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
-            return 1
-        gLogger.exception("Job failed in execution phase")
-        gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-        gJobReport.setJobStatus(
-            status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
-        )
-        job.sendFailoverRequest()
-        job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
-        return 1
-    except Exception as exc:  # pylint: disable=broad-except
-        gLogger.exception("Job raised exception during execution phase", lException=exc)
-        gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-        gJobReport.setJobStatus(
-            status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
-        )
-        job.sendFailoverRequest()
-        job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+    if not executePayload(job, jobReport):
         return 1

     if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]:
-        try:
-            result = job.processJobOutputs()
-            if not result["OK"]:
-                gLogger.warn(result["Message"])
-                raise JobWrapperError(result["Message"])
-        except JobWrapperError as exc:
-            gLogger.exception("JobWrapper failed to process output files")
-            gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-            gJobReport.setJobStatus(
-                status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False
-            )
-            job.sendFailoverRequest()
-            job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
-
-            return 2
-        except Exception as exc:  # pylint: disable=broad-except
-            gLogger.exception("JobWrapper raised exception while processing output files", lException=exc)
-            gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-            gJobReport.setJobStatus(
-                status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False
-            )
-            job.sendFailoverRequest()
-            job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
+        if not processJobOutputs(job, jobReport):
             return 2
     else:
         gLogger.verbose("Job has no OutputData or OutputSandbox requirement")

-    try:
-        # Failed jobs will return !=0 / successful jobs will return 0
-        return job.finalize()
-    except Exception as exc:  # pylint: disable=broad-except
-        gLogger.exception("JobWrapper raised exception during the finalization phase", lException=exc)
-        return 2
+    return finalize(job)


 ##########################################################

@@ -253,12 +99,17 @@
         raise TypeError(f"jobArgs is of type {type(jobArgs)}")
     if "Job" not in jobArgs:
         raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}")
-    ret = execute(jobArgs)
-    gJobReport.commit()
+
+    jobID = jobArgs["Job"].get("JobID", 0)
+    jobID = int(jobID)
+    jobReport = JobReport(jobID, "JobWrapper")
+
+    ret = execute(jobID, jobArgs, jobReport)
+    jobReport.commit()
 except Exception as exc:  # pylint: disable=broad-except
     gLogger.exception("JobWrapperTemplate exception")
     try:
-        gJobReport.commit()
+        jobReport.commit()
         ret = -1
     except Exception as exc:  # pylint: disable=broad-except
         gLogger.exception("Could not commit the job report")

diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py
new file mode 100644
index 00000000000..35abf986302
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py
@@ -0,0 +1,237 @@
+"""JobWrapperUtilities
+
+This module contains the functions that are used by the JobWrapperTemplate (and the JobWrapperLighTemplate) to execute the JobWrapper.
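+
+A minimal usage sketch (assuming a valid jobID, the standard job arguments dictionary,
+and a JobReport instance; each helper reschedules the job and sends accounting itself
+when its phase fails):
+
+    jobReport = JobReport(jobID, "JobWrapper")
+    job = getJobWrapper(jobID, arguments, jobReport)
+    if job and transferInputSandbox(job, arguments["Job"]["InputSandbox"], jobReport):
+        if executePayload(job, jobReport):
+            ret = finalize(job)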
+""" +import errno +import os +import time +import signal +from typing import Dict +from DIRAC import gLogger +from DIRAC.Core.Utilities import DErrno +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob + + +class JobWrapperError(Exception): + """Custom exception for handling JobWrapper "genuine" errors""" + + def __init__(self, value): + self.value = value + super().__init__() + + def __str__(self): + return str(self.value) + + +def killJobWrapper(job: JobWrapper) -> int: + """Function that stops and ultimately kills the JobWrapper""" + # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process + time.sleep(60) + os.kill(job.currentPID, signal.SIGTERM) + # wait for half a minute and if worker is still alive use REAL silencer + time.sleep(30) + # now you're dead + os.kill(job.currentPID, signal.SIGKILL) + return 1 + + +def rescheduleFailedJob(jobID, minorStatus, jobReport: JobReport): + """Function for rescheduling a jobID, setting a minorStatus""" + + rescheduleResult = JobStatus.RESCHEDULED + + try: + gLogger.warn("Failure during", minorStatus) + + # Setting a job parameter does not help since the job will be rescheduled, + # instead set the status with the cause and then another status showing the + # reschedule operation. + + jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False) + jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False) + + # We must send Job States and Parameters before it gets reschedule + jobReport.sendStoredStatusInfo() + jobReport.sendStoredJobParameters() + + gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper") + + result = JobManagerClient().rescheduleJob(int(jobID)) + if not result["OK"]: + gLogger.warn(result["Message"]) + if "Maximum number of reschedulings is reached" in result["Message"]: + rescheduleResult = JobStatus.FAILED + + return rescheduleResult + except Exception: + gLogger.exception("JobWrapperTemplate failed to reschedule Job") + return JobStatus.FAILED + + +def sendJobAccounting(job: JobWrapper, status: str, minorStatus: str): + """safe sending job accounting (always catching exceptions)""" + try: + job.sendJobAccounting(status, minorStatus) + except Exception: # pylint: disable=broad-except + gLogger.exception( + f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]", + ) + + +def createAndEnterWorkingDirectory(jobID: str, workingDirectory: str, jobReport: JobReport) -> bool: + """Create the working directory and change to it""" + wdir = os.path.expandvars(workingDirectory) + if os.path.isdir(wdir): + os.chdir(wdir) + return True + + try: + os.makedirs(wdir) # this will raise an exception if wdir already exists (which is ~OK) + if os.path.isdir(wdir): + os.chdir(wdir) + except OSError as osError: + if osError.errno == errno.EEXIST and os.path.isdir(wdir): + gLogger.exception("JobWrapperTemplate found that the working directory already exists") + rescheduleFailedJob(jobID, "Working Directory already exists", jobReport) + else: + gLogger.exception("JobWrapperTemplate could not create working directory") + rescheduleFailedJob(jobID, "Could Not Create Working Directory", jobReport) + return False + return True 
+def getJobWrapper(jobID: str, arguments: dict, jobReport: JobReport) -> JobWrapper | None:
+    """Create a JobWrapper instance"""
+    job = None
+    try:
+        job = JobWrapper(jobID, jobReport)
+        job.initialize(arguments)
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper failed the initialization phase")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=jobReport
+        )
+        # job is still None if the JobWrapper constructor itself raised
+        if job is not None:
+            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION)
+        return None
+    return job
+
+
+def transferInputSandbox(job: JobWrapper, inputSandbox: list, jobReport: JobReport) -> bool:
+    """Transfer the input sandbox"""
+    try:
+        result = job.transferInputSandbox(inputSandbox)
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            raise JobWrapperError(result["Message"])
+    except JobWrapperError:
+        gLogger.exception("JobWrapper failed to download input sandbox")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+        return False
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception while downloading input sandbox")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+        return False
+    return True
+
+
+def resolveInputData(job: JobWrapper, jobReport: JobReport) -> bool:
+    """Resolve the input data"""
+    try:
+        result = job.resolveInputData()
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            raise JobWrapperError(result["Message"])
+    except JobWrapperError:
+        gLogger.exception("JobWrapper failed to resolve input data")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
+        return False
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception while resolving input data")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
+        return False
+    return True
+
+
+def processJobOutputs(job: JobWrapper, jobReport: JobReport) -> bool:
+    """Process the job outputs"""
+    try:
+        result = job.processJobOutputs()
+        if not result["OK"]:
+            gLogger.warn(result["Message"])
+            raise JobWrapperError(result["Message"])
+    except JobWrapperError:
+        gLogger.exception("JobWrapper failed to process output files")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
+        return False
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception while processing output files")
+        rescheduleResult = rescheduleFailedJob(
+            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport
+        )
+        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
+        return False
+    return True
+
+
+def finalize(job: JobWrapper) -> int:
+    """Finalize the job"""
+    try:
+        # Failed jobs will return !=0 / successful jobs will return 0
+        return job.finalize()
+    except Exception:  # pylint: disable=broad-except
+        gLogger.exception("JobWrapper raised exception during the finalization phase")
+        return 2
+
+
+def executePayload(job: JobWrapper, jobReport: JobReport) -> bool:
+    """Execute the payload"""
+    try:
+        result = job.execute()
+        if not result["OK"]:
+            gLogger.error("Failed to execute job", result["Message"])
+            raise JobWrapperError((result["Message"], result["Errno"]))
+    except JobWrapperError as exc:
+        if exc.value[1] == 0 or str(exc.value[0]) == "0":
+            gLogger.verbose("JobWrapper exited with status=0 after execution")
+        if exc.value[1] == DErrno.EWMSRESC:
+            gLogger.warn("Asked to reschedule job")
+            rescheduleResult = rescheduleFailedJob(
+                jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
+            )
+            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
+            return False
+        gLogger.exception("Job failed in execution phase")
+        jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+        jobReport.setJobStatus(
+            status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
+        )
+        job.sendFailoverRequest()
+        job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+        return False
+    except Exception as exc:  # pylint: disable=broad-except
+        gLogger.exception("Job raised exception during execution phase")
+        jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+        jobReport.setJobStatus(
+            status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
+        )
+        job.sendFailoverRequest()
+        job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+        return False
+    return True

diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
index 21cfd0a8cd9..3069b859864 100644
--- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
+++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
@@ -4,10 +4,8 @@
 import sys
 import json

-from DIRAC import gLogger, S_OK, S_ERROR
+from DIRAC import gLogger, S_OK
 from DIRAC.Core.Utilities.File import mkDir
-from DIRAC.ConfigurationSystem.Client.Helpers import Registry
-from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager


 def createJobWrapper(