Skip to content

Commit

Permalink
feat: SiteDirector does not submit if there are enough waiting pilots
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jan 22, 2024
1 parent 4e3499b commit f0fb0b0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 205 deletions.
75 changes: 38 additions & 37 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Client import MatcherClient, PilotStatus
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
Expand All @@ -43,7 +43,6 @@
pilotWrapperScript,
)
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS

MAX_PILOTS_TO_SUBMIT = 100

Expand Down Expand Up @@ -74,12 +73,11 @@ def __init__(self, *args, **kwargs):
self.siteClient = None
self.rssClient = None
self.pilotAgentsDB = None
self.matcherClient = None
self.rssFlag = None

# self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again
self.failedQueueCycleFactor = 10
self.submissionPolicyName = WAITING_SUPPORTED_JOBS
self.submissionPolicy = None

self.workingDirectory = None
self.maxQueueLength = 86400 * 3
Expand All @@ -98,6 +96,7 @@ def initialize(self):
self.siteClient = SiteStatus()
self.rssClient = ResourceStatus()
self.pilotAgentsDB = getPilotAgentsDB()
self.matcherClient = MatcherClient()

return S_OK()

Expand Down Expand Up @@ -125,12 +124,6 @@ def beginExecution(self):
self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit)
self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)

# Load submission policy
self.submissionPolicyName = self.am_getOption("SubmissionPolicy", self.submissionPolicyName)
result = self._loadSubmissionPolicy()
if not result:
return result

# Flags
self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting)
self.rssFlag = self.rssClient.rssFlag
Expand Down Expand Up @@ -181,19 +174,6 @@ def beginExecution(self):

return S_OK()

def _loadSubmissionPolicy(self):
"""Load a submission policy"""
objectLoader = ObjectLoader()
result = objectLoader.loadObject(
"WorkloadManagementSystem.Utilities.SubmissionPolicy", f"{self.submissionPolicyName}Policy"
)
if not result["OK"]:
self.log.error(f"Failed to load submission policy: {result['Message']}")
return result

self.submissionPolicy = result["Value"]()
return S_OK()

def _buildQueueDict(
self,
siteNames: list[str] | None = None,
Expand Down Expand Up @@ -340,24 +320,23 @@ def _submitPilotsPerQueue(self, queueName: str):
return result

# Get the number of available slots on the target site/queue
totalSlots = self._getQueueSlots(queueName)
totalSlots, waitingPilots = self._getQueueSlots(queueName)
if totalSlots <= 0:
self.log.verbose(f"{queueName}: No slot available")
return S_OK(0)

# Apply the submission policy
submittablePilots = self.submissionPolicy.apply(
totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters
)

if submittablePilots <= 0:
# Get the number of jobs that need pilots
waitingJobs = self._getNumberOfJobsNeedingPilots(totalSlots, waitingPilots, queueName)
if waitingJobs <= 0:
self.log.verbose(f"{queueName}: Nothing to submit")
return S_OK(0)

self.log.info(f"{queueName}: slots available={totalSlots} to submit={submittablePilots}")

# Limit the number of pilots to submit to self.maxPilotsToSubmit
# Get the number of pilots to submit
submittablePilots = min(totalSlots, waitingJobs)
pilotsToSubmit = min(self.maxPilotsToSubmit, submittablePilots)
self.log.info(
f"{queueName}: slots available={totalSlots}; waiting jobs={waitingJobs}; to submit={pilotsToSubmit}"
)

# Now really submitting
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
Expand Down Expand Up @@ -389,7 +368,7 @@ def _getQueueSlots(self, queue: str):
"CE queue report",
f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Max={ceInfoDict['MaxTotalJobs']}",
)
return result["Value"]
return (result["Value"], ceInfoDict["WaitingJobs"])

# If we cannot get available slots from the CE, then we get them from the pilotAgentsDB
maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10))
Expand All @@ -402,7 +381,7 @@ def _getQueueSlots(self, queue: str):
if not result["OK"]:
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
self.failedQueues[queue] += 1
return 0
return (0, 0)
totalJobs = result["Value"]

# Get the number of waiting pilots
Expand All @@ -412,7 +391,7 @@ def _getQueueSlots(self, queue: str):
if not result["OK"]:
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
self.failedQueues[queue] += 1
return 0
return (0, 0)
waitingJobs = result["Value"]

runningJobs = totalJobs - waitingJobs
Expand All @@ -422,7 +401,29 @@ def _getQueueSlots(self, queue: str):
)

totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs))
return totalSlots
return (totalSlots, waitingJobs)

def _getNumberOfJobsNeedingPilots(self, waitingPilots: int, queue: str):
"""Get the number of jobs needing pilots for the targeted queue.
:param waitingPilots: number of waiting pilots in the queue
:param queue: queue name
"""
result = self.matcherClient.getMatchingTaskQueues(self.queueDict[queue]["CE"].ceParameters)
if not result["OK"]:
return 0
taskQueueDict = result["Value"]

# Get the number of jobs that would match the capability of the CE
waitingSupportedJobs = 0
for tq in taskQueueDict.values():
waitingSupportedJobs += tq["Jobs"]

# If there are already enough pilots waiting, we do not submit more
if waitingSupportedJobs < waitingPilots:
return 0

return waitingSupportedJobs

def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str):
"""Method that really submits the pilots to the ComputingElements' queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

from DIRAC import gLogger, gConfig
from DIRAC.ConfigurationSystem.Client import ConfigurationData
from DIRAC.Core.Utilities.ProcessPool import S_OK
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus

from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES


CONFIG = """
Expand Down Expand Up @@ -177,18 +177,40 @@ def pilotWrapperDirectory(tmp_path_factory):
return fn


def test_loadSubmissionPolicy(sd):
"""Load each submission policy and call it"""
for submissionPolicyName in SUBMISSION_POLICIES:
# Load the submission policy
sd.submissionPolicyName = submissionPolicyName
res = sd._loadSubmissionPolicy()
assert res["OK"]

# Call the submission policy with predefined parameters
targetQueue = "ce1.site1.com_condor"
res = sd.submissionPolicy.apply(50, ceParameters=sd.queueDict[targetQueue]["CE"].ceParameters)
assert res >= 0 and res <= 50
def test_getNumberOfJobsNeedingPilots(sd, mocker):
"""Make sure it returns the min between the available slots and the jobs available"""

# 1. We want to submit 50 elements but there are no waiting job
# Because it requires an access to a DB, we mock the value returned by the Matcher
mocker.patch.object(sd, "matcherClient", autospec=True)
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(availableSlots=50, waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 0

# 2. We want to submit 50 elements and we have 10 similar waiting jobs
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(availableSlots=50, waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 10

# 3. We want to submit 50 elements and we have 10 waiting jobs, split into 2 task queues
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(availableSlots=50, waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 10

# 4. We want to submit 50 elements and we have 60 similar waiting jobs
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 60}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(availableSlots=50, waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 50

# 5. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(availableSlots=50, waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 50

# 6. We want to submit -10 elements
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 60}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(availableSlots=-10, waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == -10


def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
Expand Down
64 changes: 0 additions & 64 deletions src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py

This file was deleted.

This file was deleted.

0 comments on commit f0fb0b0

Please sign in to comment.