From 98472c357ecc8e1d27bacc2a8bd0e034f730240d Mon Sep 17 00:00:00 2001 From: aldbr Date: Mon, 22 Jan 2024 18:55:30 +0100 Subject: [PATCH] feat: SiteDirector does not submit if there are enough waiting pilots --- .../WorkloadManagement/Pilots/index.rst | 3 - .../Agent/SiteDirector.py | 95 ++++++++++--------- .../Agent/test/Test_Agent_SiteDirector.py | 48 +++++++--- .../ConfigTemplate.cfg | 6 +- .../Utilities/SubmissionPolicy.py | 64 ------------- .../Utilities/test/Test_SubmissionPolicy.py | 91 ------------------ 6 files changed, 90 insertions(+), 217 deletions(-) delete mode 100644 src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py delete mode 100644 src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py diff --git a/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst b/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst index 1502ef5ec19..7d1e80c3c09 100644 --- a/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst +++ b/docs/source/AdministratorGuide/Systems/WorkloadManagement/Pilots/index.rst @@ -170,9 +170,6 @@ The :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` is a central co responsible for managing and optimizing the submission of pilot jobs to various computing resources. It features: - *Parallel Submission*: Capable of submitting pilot jobs in parallel across different Computing Elements (CEs) to enhance throughput. -- :py:mod:`~DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy`: It utilizes various submission policies to optimize pilot-job distribution: - - *AggressiveFilling*: Fills available slots regardless of waiting jobs, ideal for continuously busy sites. - - *WaitingSupportedJobs* (default one): Fills slots based on the number of waiting jobs, suitable for intermittently busy sites. - *Monitoring and Accounting*: Features parallel monitoring and accounting for efficient tracking and management of pilot jobs. - *Pilot Wrapping*: Creates pilot wrappers that facilitate the execution of pilot scripts in diverse environments, including Grid, cloud, and virtualized resources. - *Resource Status Handling*: Integrates with the Resource Status System to ensure that pilots are only submitted to operational and enabled resources. diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index a28770ca4a0..5fc1617f338 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -25,7 +25,6 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getQueues from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Security import X509Chain -from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.TimeUtilities import second, toEpochMilliSeconds from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager @@ -34,6 +33,7 @@ from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials @@ -43,7 +43,6 @@ pilotWrapperScript, ) from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved -from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS MAX_PILOTS_TO_SUBMIT = 100 @@ -74,12 +73,16 @@ def __init__(self, *args, **kwargs): self.siteClient = None self.rssClient = None self.pilotAgentsDB = None + self.matcherClient = None self.rssFlag = None # self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again self.failedQueueCycleFactor = 10 - self.submissionPolicyName = WAITING_SUPPORTED_JOBS - self.submissionPolicy = None + + # Every N cycles, pilot status update is performed by the SiteDirector + self.pilotStatusUpdateCycleFactor = 10 + # Every N cycles, pilot submission is performed by the SiteDirector + self.pilotSubmissionCycleFactor = 1 self.workingDirectory = None self.maxQueueLength = 86400 * 3 @@ -98,6 +101,7 @@ def initialize(self): self.siteClient = SiteStatus() self.rssClient = ResourceStatus() self.pilotAgentsDB = getPilotAgentsDB() + self.matcherClient = MatcherClient() return S_OK() @@ -124,12 +128,12 @@ def beginExecution(self): self.maxQueueLength = self.am_getOption("MaxQueueLength", self.maxQueueLength) self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit) self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor) - - # Load submission policy - self.submissionPolicyName = self.am_getOption("SubmissionPolicy", self.submissionPolicyName) - result = self._loadSubmissionPolicy() - if not result: - return result + self.pilotStatusUpdateCycleFactor = self.am_getOption( + "PilotStatusUpdateCycleFactor", self.pilotStatusUpdateCycleFactor + ) + self.pilotSubmissionCycleFactor = self.am_getOption( + "PilotSubmissionCycleFactor", self.pilotSubmissionCycleFactor + ) # Flags self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting) @@ -181,19 +185,6 @@ def beginExecution(self): return S_OK() - def _loadSubmissionPolicy(self): - """Load a submission policy""" - objectLoader = ObjectLoader() - result = objectLoader.loadObject( - "WorkloadManagementSystem.Utilities.SubmissionPolicy", f"{self.submissionPolicyName}Policy" - ) - if not result["OK"]: - self.log.error(f"Failed to load submission policy: {result['Message']}") - return result - - self.submissionPolicy = result["Value"]() - return S_OK() - def _buildQueueDict( self, siteNames: list[str] | None = None, @@ -262,14 +253,12 @@ def execute(self): It basically just submits pilots and gets their status """ - submissionResult = self.submitPilots() - monitoringResult = self.monitorPilots() - - if not submissionResult["OK"]: - return submissionResult + cyclesDone = self.am_getModuleParam("cyclesDone") + if cyclesDone % self.pilotSubmissionCycleFactor == 0: + self.submitPilots() - if not monitoringResult["OK"]: - return monitoringResult + if cyclesDone % self.pilotStatusUpdateCycleFactor == 0: + self.monitorPilots() return S_OK() @@ -340,24 +329,23 @@ def _submitPilotsPerQueue(self, queueName: str): return result # Get the number of available slots on the target site/queue - totalSlots = self._getQueueSlots(queueName) + totalSlots, waitingPilots = self._getQueueSlots(queueName) if totalSlots <= 0: self.log.verbose(f"{queueName}: No slot available") return S_OK(0) - # Apply the submission policy - submittablePilots = self.submissionPolicy.apply( - totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters - ) - - if submittablePilots <= 0: + # Get the number of jobs that need pilots + waitingJobs = self._getNumberOfJobsNeedingPilots(waitingPilots, queueName) + if waitingJobs <= 0: self.log.verbose(f"{queueName}: Nothing to submit") return S_OK(0) - self.log.info(f"{queueName}: slots available={totalSlots} to submit={submittablePilots}") - - # Limit the number of pilots to submit to self.maxPilotsToSubmit + # Get the number of pilots to submit + submittablePilots = min(totalSlots, waitingJobs) pilotsToSubmit = min(self.maxPilotsToSubmit, submittablePilots) + self.log.info( + f"{queueName}: slots available={totalSlots}; waiting jobs={waitingJobs}; to submit={pilotsToSubmit}" + ) # Now really submitting result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) @@ -389,7 +377,7 @@ def _getQueueSlots(self, queue: str): "CE queue report", f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Max={ceInfoDict['MaxTotalJobs']}", ) - return result["Value"] + return (result["Value"], ceInfoDict["WaitingJobs"]) # If we cannot get available slots from the CE, then we get them from the pilotAgentsDB maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10)) @@ -402,7 +390,7 @@ def _getQueueSlots(self, queue: str): if not result["OK"]: self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") self.failedQueues[queue] += 1 - return 0 + return (0, 0) totalJobs = result["Value"] # Get the number of waiting pilots @@ -412,7 +400,7 @@ def _getQueueSlots(self, queue: str): if not result["OK"]: self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") self.failedQueues[queue] += 1 - return 0 + return (0, 0) waitingJobs = result["Value"] runningJobs = totalJobs - waitingJobs @@ -422,7 +410,26 @@ def _getQueueSlots(self, queue: str): ) totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) - return totalSlots + return (totalSlots, waitingJobs) + + def _getNumberOfJobsNeedingPilots(self, waitingPilots: int, queue: str): + """Get the number of jobs needing pilots for the targeted queue. + + :param waitingPilots: number of waiting pilots in the queue + :param queue: queue name + """ + result = self.matcherClient.getMatchingTaskQueues(self.queueDict[queue]["CE"].ceParameters) + if not result["OK"]: + return 0 + taskQueueDict = result["Value"] + + # Get the number of jobs that would match the capability of the CE + waitingSupportedJobs = 0 + for tq in taskQueueDict.values(): + waitingSupportedJobs += tq["Jobs"] + + # Get the number of jobs that need pilots + return max(0, waitingSupportedJobs - waitingPilots) def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str): """Method that really submits the pilots to the ComputingElements' queue diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py index 9d00e622243..6fb77943840 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py @@ -9,11 +9,11 @@ from DIRAC import gLogger, gConfig from DIRAC.ConfigurationSystem.Client import ConfigurationData +from DIRAC.Core.Utilities.ProcessPool import S_OK from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector from DIRAC.WorkloadManagementSystem.Client import PilotStatus -from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES CONFIG = """ @@ -177,18 +177,40 @@ def pilotWrapperDirectory(tmp_path_factory): return fn -def test_loadSubmissionPolicy(sd): - """Load each submission policy and call it""" - for submissionPolicyName in SUBMISSION_POLICIES: - # Load the submission policy - sd.submissionPolicyName = submissionPolicyName - res = sd._loadSubmissionPolicy() - assert res["OK"] - - # Call the submission policy with predefined parameters - targetQueue = "ce1.site1.com_condor" - res = sd.submissionPolicy.apply(50, ceParameters=sd.queueDict[targetQueue]["CE"].ceParameters) - assert res >= 0 and res <= 50 +def test_getNumberOfJobsNeedingPilots(sd, mocker): + """Make sure it returns the number of needed pilots""" + + # 1. No waiting job, no waiting pilot + # Because it requires an access to a DB, we mock the value returned by the Matcher + mocker.patch.object(sd, "matcherClient", autospec=True) + sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({}) + numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=0, queue="ce1.site3.com_condor") + assert numberToSubmit == 0 + + # 2. 10 waiting jobs, no waiting pilot + sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}}) + numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=0, queue="ce1.site3.com_condor") + assert numberToSubmit == 10 + + # 3. 10 waiting jobs split into 2 task queues, no waiting pilot + sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}) + numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=0, queue="ce1.site3.com_condor") + assert numberToSubmit == 10 + + # 4. 10 waiting jobs, 5 waiting pilots + sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}}) + numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=5, queue="ce1.site3.com_condor") + assert numberToSubmit == 5 + + # 5. 10 waiting jobs split into 2 task queues, 10 waiting pilots + sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}) + numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=10, queue="ce1.site3.com_condor") + assert numberToSubmit == 0 + + # 6.10 waiting jobs, 20 waiting pilots + sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}}) + numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=20, queue="ce1.site3.com_condor") + assert numberToSubmit == 0 def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory): diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index ac43ffda16d..1827e01f2ad 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -296,14 +296,16 @@ Agents # How many cycles to skip if queue is not working FailedQueueCycleFactor = 10 + # Every N cycles, pilot status update is performed by the SiteDirector + PilotStatusUpdateCycleFactor = 10 + # Every N cycles, pilot submission is performed by the SiteDirector + PilotSubmissionCycleFactor = 10 # The maximum length of a queue (in seconds). Default: 3 days MaxQueueLength = 259200 # Max number of pilots to submit per cycle MaxPilotsToSubmit = 100 # Boolean value that indicates if the pilot job will send information for accounting SendPilotAccounting = True - # Submission policy to apply - SubmissionPolicy = WaitingSupportedJobs # Working directory containing the pilot files if not set in the CE WorkDirectory = } diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py deleted file mode 100644 index 2ae82f6d34b..00000000000 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py +++ /dev/null @@ -1,64 +0,0 @@ -from abc import ABC, abstractmethod - -from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient - - -# Submission policies -AGGRESSIVE_FILLING = "AggressiveFilling" -WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" -SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] - - -class SubmissionPolicy(ABC): - """Abstract class to define a submission strategy.""" - - @abstractmethod - def apply(self, availableSlots: int, **kwargs) -> int: - """Method to redefine in the concrete subclasses - - :param availableSlots: slots available for new pilots - """ - if availableSlots < 0: - raise RuntimeError("Available slots cannot be negative") - - -class AggressiveFillingPolicy(SubmissionPolicy): - def apply(self, availableSlots: int, **kwargs) -> int: - """All the available slots should be filled up. - Should be employed for sites that are always processing jobs. - - * Pros: would quickly fill up a queue - * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs - """ - super().apply(availableSlots, **kwargs) - return availableSlots - - -class WaitingSupportedJobsPolicy(SubmissionPolicy): - def __init__(self) -> None: - super().__init__() - self.matcherClient = MatcherClient() - - def apply(self, availableSlots: int, **kwargs) -> int: - """Fill up available slots only if waiting supported jobs exist. - Should be employed for sites that are used from time to time (targeting specific Task Queues). - - * Pros: submit pilots only if necessary, and quickly fill up the queue if needed - * Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue - - :param ceParameters: CE parameters - """ - super().apply(availableSlots, **kwargs) - # Get Task Queues related to the CE - result = self.matcherClient.getMatchingTaskQueues(kwargs["ceParameters"]) - if not result["OK"]: - return 0 - taskQueueDict = result["Value"] - - # Get the number of jobs that would match the capability of the CE - waitingSupportedJobs = 0 - for tq in taskQueueDict.values(): - waitingSupportedJobs += tq["Jobs"] - - # Return the minimum value between the number of slots available and supported jobs - return min(availableSlots, waitingSupportedJobs) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py deleted file mode 100644 index 66f3ce5209b..00000000000 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py +++ /dev/null @@ -1,91 +0,0 @@ -""" Test class for Submission policy -""" -# pylint: disable=protected-access - -import pytest -from DIRAC.Core.Utilities.ReturnValues import S_OK - -from DIRAC.WorkloadManagementSystem.Client import PilotStatus -from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import ( - SUBMISSION_POLICIES, - AggressiveFillingPolicy, - WaitingSupportedJobsPolicy, -) - - -def test_AggressiveFillingPolicy(): - """Make sure it always return the number of slots provided""" - policy = AggressiveFillingPolicy() - - # 1. We want to submit 50 elements - numberToSubmit = policy.apply(50) - assert numberToSubmit == 50 - - # 2. We want to submit 0 element - numberToSubmit = policy.apply(0) - assert numberToSubmit == 0 - - # 3. We want to submit -10 elements - with pytest.raises(RuntimeError): - numberToSubmit = policy.apply(-10) - - -def test_WaitingSupportedJobsPolicy(mocker): - """Make sure it returns the min between the available slots and the jobs available""" - policy = WaitingSupportedJobsPolicy() - - # 1. We want to submit 50 elements without specifying the CE parameters - with pytest.raises(KeyError): - numberToSubmit = policy.apply(50) - - # 2. We want to submit 50 elements but there are no waiting job - # Because it requires an access to a DB, we mock the value returned by the Matcher - mocker.patch( - "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", return_value=S_OK({}) - ) - numberToSubmit = policy.apply(50, ceParameters={}) - assert numberToSubmit == 0 - - # 3. We want to submit 50 elements and we have 10 similar waiting jobs - mocker.patch( - "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", - return_value=S_OK({"TQ1": {"Jobs": 10}}), - ) - numberToSubmit = policy.apply(50, ceParameters={}) - assert numberToSubmit == 10 - - # 4. We want to submit 50 elements and we have 10 waiting jobs, split into 2 task queues - mocker.patch( - "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", - return_value=S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}), - ) - numberToSubmit = policy.apply(50, ceParameters={}) - assert numberToSubmit == 10 - - # 5. We want to submit 50 elements and we have 60 similar waiting jobs - mocker.patch( - "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", - return_value=S_OK({"TQ1": {"Jobs": 60}}), - ) - numberToSubmit = policy.apply(50, ceParameters={}) - assert numberToSubmit == 50 - - # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues - mocker.patch( - "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", - return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), - ) - numberToSubmit = policy.apply(50, ceParameters={}) - assert numberToSubmit == 50 - - # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues - mocker.patch( - "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", - return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), - ) - numberToSubmit = policy.apply(50, ceParameters={}) - assert numberToSubmit == 50 - - # 7. We want to submit -10 elements - with pytest.raises(RuntimeError): - numberToSubmit = policy.apply(-10, ceParameters={})