feat: PushJobAgent can handle special JobWrappers
aldbr committed Dec 21, 2023
1 parent 4750396 commit d6e13b9
Showing 6 changed files with 539 additions and 243 deletions.
192 changes: 172 additions & 20 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -9,21 +9,34 @@
"""

import os
import random
import sys
from collections import defaultdict
import time
from typing import Dict

from DIRAC import S_OK, gConfig
from DIRAC.Core.Security import X509Chain
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Utilities import DErrno
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Utilities.ProcessPool import S_ERROR
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.FrameworkSystem.private.standardLogging.LogLevels import LogLevel
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.Resources.Computing import ComputingElement
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
getJobWrapper,
resolveInputData,
transferInputSandbox,
)
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials

MAX_JOBS_MANAGED = 100
@@ -50,10 +63,21 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
self.failedQueueCycleFactor = 10
self.failedQueues = defaultdict(int)

# Choose the submission policy
# - Workflow: the agent submits a workflow to a PoolCE; the workflow is responsible for interacting with the remote site
# - JobWrapper: the agent submits a JobWrapper directly to the remote site, which is then responsible for the remote execution
self.submissionPolicy = "Workflow"

def initialize(self):
"""Sets default parameters and creates CE instance"""
super().initialize()

# Get the submission policy
# Initialized here because it cannot be dynamically modified during execution
self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy)
if self.submissionPolicy not in ["Workflow", "JobWrapper"]:
return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper")

result = self._initializeComputingElement("Pool")
if not result["OK"]:
return result
@@ -137,11 +161,12 @@ def execute(self):
if not result["OK"] or result["Value"]:
return result

# Check errors that could have occurred during job submission and/or execution
# Statuses are handled internally and are therefore not checked outside of this method
result = self._checkSubmittedJobs()
if not result["OK"]:
return result
if self.submissionPolicy == "Workflow":
# Check errors that could have occurred during job submission and/or execution
# Statuses are handled internally and are therefore not checked outside of this method
result = self._checkSubmittedJobs()
if not result["OK"]:
return result

for queueName, queueDictionary in queueDictItems:
# Make sure there is no problem with the queue before trying to submit
@@ -251,21 +276,41 @@ def execute(self):

# Submit the job to the CE
self.log.debug(f"Before self._submitJob() ({self.ceName}CE)")
resultSubmission = self._submitJob(
jobID=jobID,
jobParams=params,
resourceParams=ceDict,
optimizerParams=optimizerParams,
proxyChain=proxyChain,
processors=submissionParams["processors"],
wholeNode=submissionParams["wholeNode"],
maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"],
mpTag=submissionParams["mpTag"],
)
if not result["OK"]:
result = self._rescheduleFailedJob(jobID, resultSubmission["Message"])
self.failedQueues[queueName] += 1
break
if self.submissionPolicy == "Workflow":
resultSubmission = self._submitJob(
jobID=jobID,
jobParams=params,
resourceParams=ceDict,
optimizerParams=optimizerParams,
proxyChain=proxyChain,
processors=submissionParams["processors"],
wholeNode=submissionParams["wholeNode"],
maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"],
mpTag=submissionParams["mpTag"],
)
if not resultSubmission["OK"]:
result = self._rescheduleFailedJob(jobID, resultSubmission["Message"])
self.failedQueues[queueName] += 1
break
else:
resultSubmission = self._submitJobWrapper(
jobID=jobID,
ce=ce,
jobParams=params,
resourceParams=ceDict,
optimizerParams=optimizerParams,
proxyChain=proxyChain,
processors=submissionParams["processors"],
)
if not resultSubmission["OK"]:
self.failedQueues[queueName] += 1
break

# Check status of the submitted jobs
result = self._checkSubmittedJobWrapper(ce)
if not result["OK"]:
self.failedQueues[queueName] += 1
break
self.log.debug(f"After {self.ceName}CE submitJob()")

# Committing the JobReport before evaluating the result of job submission
@@ -384,3 +429,110 @@ def _checkMatchingIssues(self, jobRequest):
self.log.notice("Failed to get jobs", jobRequest["Message"])

return S_OK()

def _submitJobWrapper(
self,
jobID: str,
ce: ComputingElement,
jobParams: dict,
resourceParams: dict,
optimizerParams: dict,
proxyChain: X509Chain,
processors: int,
):
"""Submit a JobWrapper to the remote site
:param jobID: job ID
:param ce: ComputingElement instance
:param jobParams: job parameters
:param resourceParams: resource parameters
:param optimizerParams: optimizer parameters
:param proxyChain: proxy chain
:param processors: number of processors
:return: S_OK
"""
# Add the number of requested processors to the job environment
if "ExecutionEnvironment" in jobParams:
if isinstance(jobParams["ExecutionEnvironment"], str):
jobParams["ExecutionEnvironment"] = jobParams["ExecutionEnvironment"].split(";")
jobParams.setdefault("ExecutionEnvironment", []).append("DIRAC_JOB_PROCESSORS=%d" % processors)

# Prepare the job for submission
self.log.verbose("Getting a JobWrapper")
arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams}
job = getJobWrapper(jobID, arguments, self.jobReport)
if not job:
return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}")

if "InputSandbox" in jobParams:
self.log.verbose("Getting the inputSandbox of the job")
if not transferInputSandbox(job, jobParams["InputSandbox"], self.jobReport):
return S_ERROR(f"Cannot get input sandbox of job {jobID}")
self.jobReport.commit()

if "InputData" in jobParams and jobParams["InputData"]:
self.log.verbose("Getting the inputData of the job")
if not resolveInputData(job, self.jobReport):
return S_ERROR(f"Cannot get input data of job {jobID}")
self.jobReport.commit()

# Preprocess the payload
payloadParams = job.preProcess()
self.jobReport.commit()

# Generate a light JobWrapper executor script
result = createJobWrapper(log=self.log, logLevel=LogLevel.DEBUG, **payloadParams)
if not result["OK"]:
return result

wrapperFile = result["Value"][0]
inputs = os.listdir(".")
inputs.remove(os.path.basename(wrapperFile))
self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs))
# Request the whole directory as output
outputs = ["/"]

self.jobReport.setJobStatus(minorStatus="Submitting To CE")
self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")

# Pass proxy to the CE
proxy = proxyChain.dumpAllToString()
if not proxy["OK"]:
self.log.error("Invalid proxy", proxy)
return S_ERROR("Payload Proxy Not Found")
ce.setProxy(proxy["Value"])

result = ce.submitJob(
wrapperFile,
None,
inputs,
outputs,
)
if not result["OK"]:
self._rescheduleFailedJob(jobID, result["Message"])
return result

taskID = result.get("Value")
self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})")

self.submissionDict[jobID] = taskID
time.sleep(self.jobSubmissionDelay)
return S_OK()

def _checkSubmittedJobWrapper(self, ce: ComputingElement):
"""Check the status of the submitted jobs
:return: S_OK
"""
result = ce.getJobStatus(self.submissionDict.values())
if not result["OK"]:
self.log.error("Failed to get job status", result["Message"])
return result

for jobID, status in result["Value"]:
if status in ["Done", "Failed"]:
self.log.info("Job execution finished", f"(DIRAC JobID: {jobID}; Status: {status})")
# jobWrapper.postProcess()
del self.submissionDict[jobID]
return S_OK()
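
The diff above adds the JobWrapper submission path to PushJobAgent. The sketch below is illustrative only and is not part of the commit: DummyCE and submit_push_job are hypothetical stand-ins (with S_OK/S_ERROR re-implemented locally so the snippet runs on its own), showing how a SubmissionPolicy value selects between the existing Workflow path and the new path that ships a wrapper script straight to the computing element and later polls its status.

# Illustrative sketch -- DummyCE and submit_push_job are hypothetical stand-ins,
# not DIRAC APIs; S_OK/S_ERROR are re-implemented locally for self-containment.

def S_OK(value=None):
    return {"OK": True, "Value": value}

def S_ERROR(message):
    return {"OK": False, "Message": message}

class DummyCE:
    """Hypothetical computing element accepting a wrapper script."""

    def __init__(self):
        self._tasks = {}

    def submitJob(self, wrapperFile, proxy, inputs, outputs):
        # Register the wrapper and hand back a task identifier.
        taskID = f"task-{len(self._tasks) + 1}"
        self._tasks[taskID] = "Running"
        return S_OK(taskID)

    def getJobStatus(self, taskIDs):
        # Pretend every polled task has finished.
        return S_OK({taskID: "Done" for taskID in taskIDs})

def submit_push_job(policy, jobID, ce):
    """Dispatch a job according to the SubmissionPolicy option."""
    if policy not in ("Workflow", "JobWrapper"):
        return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper")
    if policy == "Workflow":
        # Workflow policy: the workflow itself drives the remote execution.
        return S_OK({"jobID": jobID, "mode": "workflow"})
    # JobWrapper policy: ship a light wrapper script straight to the CE and
    # remember the returned task ID so it can be polled later.
    result = ce.submitJob("wrapper.py", None, inputs=["job.cfg"], outputs=["/"])
    if not result["OK"]:
        return result
    return S_OK({"jobID": jobID, "taskID": result["Value"]})

if __name__ == "__main__":
    ce = DummyCE()
    submission = submit_push_job("JobWrapper", 123, ce)
    print(submission)
    if submission["OK"]:
        # Poll the CE, much as _checkSubmittedJobWrapper does for real task IDs.
        print(ce.getJobStatus([submission["Value"]["taskID"]]))
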
49 changes: 10 additions & 39 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
@@ -21,6 +21,7 @@
import tarfile
import threading
import time
from typing import Dict
from urllib.parse import unquote

import DIRAC
@@ -322,6 +323,7 @@ def __prepareCommand(self):

def __prepareEnvironment(self):
"""Prepare the environment to be used by the payload."""
os.environ["JOBID"] = str(self.jobID)
os.environ["DIRACJOBID"] = str(self.jobID)

diracSite = DIRAC.siteName()
@@ -379,7 +381,7 @@ def preProcess(self):
)

#############################################################################
def process(self, command, output, error, env):
def process(self, command: str, output: str, error: str, env: dict[str, str]):
"""This method calls the payload."""
self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}")

@@ -496,7 +498,13 @@ def process(self, command, output, error, env):

#############################################################################
def postProcess(
self, payloadStatus, payloadOutput, payloadExecutorError, cpuTimeConsumed, watchdogError, watchdogStats
self,
payloadStatus: int | None,
payloadOutput: str,
payloadExecutorError: str,
cpuTimeConsumed: list[int],
watchdogError: str | None,
watchdogStats: dict,
):
"""This method is called after the payload has finished running."""
self.log.info(f"Job Wrapper is starting the post processing phase for job {self.jobID}")
@@ -1596,40 +1604,3 @@ def getOutput(self, lines=0):
self.outputLines = self.outputLines[cut:]
return S_OK(self.outputLines)
return S_ERROR("No Job output found")


def rescheduleFailedJob(jobID, minorStatus, jobReport=None):
"""Function for rescheduling a jobID, setting a minorStatus"""

rescheduleResult = JobStatus.RESCHEDULED

try:
gLogger.warn("Failure during", minorStatus)

# Setting a job parameter does not help since the job will be rescheduled,
# instead set the status with the cause and then another status showing the
# reschedule operation.

if not jobReport:
gLogger.info("Creating a new JobReport Object")
jobReport = JobReport(int(jobID), "JobWrapper")

jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False)
jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False)

# We must send Job States and Parameters before it gets reschedule
jobReport.sendStoredStatusInfo()
jobReport.sendStoredJobParameters()

gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper")

result = JobManagerClient().rescheduleJob(int(jobID))
if not result["OK"]:
gLogger.warn(result["Message"])
if "Maximum number of reschedulings is reached" in result["Message"]:
rescheduleResult = JobStatus.FAILED

return rescheduleResult
except Exception:
gLogger.exception("JobWrapperTemplate failed to reschedule Job")
return JobStatus.FAILED
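
The rescheduleFailedJob helper removed above reports RESCHEDULED unless the reschedule limit has been reached, in which case it reports FAILED. A minimal, self-contained sketch of that decision pattern, assuming a hypothetical reschedule_job() call in place of JobManagerClient().rescheduleJob():

# Minimal sketch of the reschedule-or-fail decision; reschedule_job() is a
# hypothetical stand-in for JobManagerClient().rescheduleJob().

RESCHEDULED = "Rescheduled"
FAILED = "Failed"

def reschedule_job(jobID):
    # Hypothetical backend call; here we pretend the reschedule limit was reached.
    return {"OK": False, "Message": "Maximum number of reschedulings is reached"}

def reschedule_failed_job(jobID, minorStatus):
    """Return the terminal status to report after attempting a reschedule."""
    try:
        result = reschedule_job(int(jobID))
        if not result["OK"] and "Maximum number of reschedulings is reached" in result["Message"]:
            return FAILED
        # Any other outcome still leaves the job rescheduled.
        return RESCHEDULED
    except Exception:
        return FAILED

if __name__ == "__main__":
    print(reschedule_failed_job("123", "Input Data Resolution"))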