Skip to content

Commit

Permalink
feat: SiteDirector does not submit if there are enough waiting pilots
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jan 25, 2024
1 parent 4e3499b commit 49ed951
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ The :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` is a central co
responsible for managing and optimizing the submission of pilot jobs to various computing resources. It features:

- *Parallel Submission*: Capable of submitting pilot jobs in parallel across different Computing Elements (CEs) to enhance throughput.
- :py:mod:`~DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy`: It utilizes various submission policies to optimize pilot-job distribution:
- *AggressiveFilling*: Fills available slots regardless of waiting jobs, ideal for continuously busy sites.
- *WaitingSupportedJobs* (the default): Fills slots based on the number of waiting jobs, suitable for intermittently busy sites.
- *Monitoring and Accounting*: Features parallel monitoring and accounting for efficient tracking and management of pilot jobs.
- *Pilot Wrapping*: Creates pilot wrappers that facilitate the execution of pilot scripts in diverse environments, including Grid, cloud, and virtualized resources.
- *Resource Status Handling*: Integrates with the Resource Status System to ensure that pilots are only submitted to operational and enabled resources.
Expand Down
95 changes: 51 additions & 44 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Client import MatcherClient, PilotStatus
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
Expand All @@ -43,7 +43,6 @@
pilotWrapperScript,
)
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS

MAX_PILOTS_TO_SUBMIT = 100

Expand Down Expand Up @@ -74,12 +73,16 @@ def __init__(self, *args, **kwargs):
self.siteClient = None
self.rssClient = None
self.pilotAgentsDB = None
self.matcherClient = None
self.rssFlag = None

# self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again
self.failedQueueCycleFactor = 10
self.submissionPolicyName = WAITING_SUPPORTED_JOBS
self.submissionPolicy = None

# Every N cycles, pilot status update is performed by the SiteDirector
self.pilotStatusUpdateCycleFactor = 10
# Every N cycles, pilot submission is performed by the SiteDirector
self.pilotSubmissionCycleFactor = 1

self.workingDirectory = None
self.maxQueueLength = 86400 * 3
Expand All @@ -98,6 +101,7 @@ def initialize(self):
self.siteClient = SiteStatus()
self.rssClient = ResourceStatus()
self.pilotAgentsDB = getPilotAgentsDB()
self.matcherClient = MatcherClient()

return S_OK()

Expand All @@ -124,12 +128,12 @@ def beginExecution(self):
self.maxQueueLength = self.am_getOption("MaxQueueLength", self.maxQueueLength)
self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit)
self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)

# Load submission policy
self.submissionPolicyName = self.am_getOption("SubmissionPolicy", self.submissionPolicyName)
result = self._loadSubmissionPolicy()
if not result:
return result
self.pilotStatusUpdateCycleFactor = self.am_getOption(
"PilotStatusUpdateCycleFactor", self.pilotStatusUpdateCycleFactor
)
self.pilotSubmissionCycleFactor = self.am_getOption(
"PilotSubmissionCycleFactor", self.pilotSubmissionCycleFactor
)

# Flags
self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting)
Expand Down Expand Up @@ -181,19 +185,6 @@ def beginExecution(self):

return S_OK()

def _loadSubmissionPolicy(self):
    """Instantiate the submission policy named by ``self.submissionPolicyName``.

    The class ``<submissionPolicyName>Policy`` is looked up through the ObjectLoader in
    ``WorkloadManagementSystem.Utilities.SubmissionPolicy``; an instance of it is stored
    in ``self.submissionPolicy``.

    :return: S_OK() on success, otherwise the failed result returned by the ObjectLoader
    """
    policyClassName = f"{self.submissionPolicyName}Policy"
    loaded = ObjectLoader().loadObject("WorkloadManagementSystem.Utilities.SubmissionPolicy", policyClassName)
    if loaded["OK"]:
        # The loader hands back the class itself: instantiate it
        self.submissionPolicy = loaded["Value"]()
        return S_OK()

    self.log.error(f"Failed to load submission policy: {loaded['Message']}")
    return loaded

def _buildQueueDict(
self,
siteNames: list[str] | None = None,
Expand Down Expand Up @@ -262,14 +253,12 @@ def execute(self):
It basically just submits pilots and gets their status
"""
submissionResult = self.submitPilots()
monitoringResult = self.monitorPilots()

if not submissionResult["OK"]:
return submissionResult
cyclesDone = self.am_getModuleParam("cyclesDone")
if cyclesDone % self.pilotSubmissionCycleFactor == 0:
self.submitPilots()

if not monitoringResult["OK"]:
return monitoringResult
if cyclesDone % self.pilotStatusUpdateCycleFactor == 0:
self.monitorPilots()

return S_OK()

Expand Down Expand Up @@ -340,24 +329,23 @@ def _submitPilotsPerQueue(self, queueName: str):
return result

# Get the number of available slots on the target site/queue
totalSlots = self._getQueueSlots(queueName)
totalSlots, waitingPilots = self._getQueueSlots(queueName)
if totalSlots <= 0:
self.log.verbose(f"{queueName}: No slot available")
return S_OK(0)

# Apply the submission policy
submittablePilots = self.submissionPolicy.apply(
totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters
)

if submittablePilots <= 0:
# Get the number of jobs that need pilots
waitingJobs = self._getNumberOfJobsNeedingPilots(waitingPilots, queueName)
if waitingJobs <= 0:
self.log.verbose(f"{queueName}: Nothing to submit")
return S_OK(0)

self.log.info(f"{queueName}: slots available={totalSlots} to submit={submittablePilots}")

# Limit the number of pilots to submit to self.maxPilotsToSubmit
# Get the number of pilots to submit
submittablePilots = min(totalSlots, waitingJobs)
pilotsToSubmit = min(self.maxPilotsToSubmit, submittablePilots)
self.log.info(
f"{queueName}: slots available={totalSlots}; waiting jobs={waitingJobs}; to submit={pilotsToSubmit}"
)

# Now really submitting
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
Expand Down Expand Up @@ -389,7 +377,7 @@ def _getQueueSlots(self, queue: str):
"CE queue report",
f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Max={ceInfoDict['MaxTotalJobs']}",
)
return result["Value"]
return (result["Value"], ceInfoDict["WaitingJobs"])

# If we cannot get available slots from the CE, then we get them from the pilotAgentsDB
maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10))
Expand All @@ -402,7 +390,7 @@ def _getQueueSlots(self, queue: str):
if not result["OK"]:
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
self.failedQueues[queue] += 1
return 0
return (0, 0)
totalJobs = result["Value"]

# Get the number of waiting pilots
Expand All @@ -412,7 +400,7 @@ def _getQueueSlots(self, queue: str):
if not result["OK"]:
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
self.failedQueues[queue] += 1
return 0
return (0, 0)
waitingJobs = result["Value"]

runningJobs = totalJobs - waitingJobs
Expand All @@ -422,7 +410,26 @@ def _getQueueSlots(self, queue: str):
)

totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs))
return totalSlots
return (totalSlots, waitingJobs)

def _getNumberOfJobsNeedingPilots(self, waitingPilots: int, queue: str) -> int:
    """Get the number of jobs needing pilots for the targeted queue.

    The Matcher is asked for the task queues matching the parameters of the CE attached to the
    queue. The jobs of these task queues are summed up and the pilots already waiting in the
    queue are subtracted from that total.

    :param waitingPilots: number of waiting pilots in the queue
    :param queue: queue name
    :return: number of matching waiting jobs not yet covered by a waiting pilot (never negative),
             0 if the Matcher could not be queried
    """
    result = self.matcherClient.getMatchingTaskQueues(self.queueDict[queue]["CE"].ceParameters)
    if not result["OK"]:
        # Leave a trace of the failure: otherwise an unreachable Matcher cannot be told apart
        # from a queue that simply has no waiting job.
        self.log.warn("Failed to get the matching task queues", f"for queue {queue}: \n{result['Message']}")
        return 0
    taskQueueDict = result["Value"]

    # Get the number of jobs that would match the capability of the CE
    waitingSupportedJobs = sum(tq["Jobs"] for tq in taskQueueDict.values())

    # Get the number of jobs that need pilots
    return max(0, waitingSupportedJobs - waitingPilots)

def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str):
"""Method that really submits the pilots to the ComputingElements' queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

from DIRAC import gLogger, gConfig
from DIRAC.ConfigurationSystem.Client import ConfigurationData
from DIRAC.Core.Utilities.ProcessPool import S_OK
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus

from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES


CONFIG = """
Expand Down Expand Up @@ -177,18 +177,40 @@ def pilotWrapperDirectory(tmp_path_factory):
return fn


def test_loadSubmissionPolicy(sd):
    """Every known submission policy can be loaded and applied to a queue"""
    targetQueue = "ce1.site1.com_condor"

    for submissionPolicyName in SUBMISSION_POLICIES:
        # Loading the policy must succeed
        sd.submissionPolicyName = submissionPolicyName
        loading = sd._loadSubmissionPolicy()
        assert loading["OK"]

        # Applying it to 50 available slots must give a number of pilots within [0, 50]
        ceParameters = sd.queueDict[targetQueue]["CE"].ceParameters
        submittable = sd.submissionPolicy.apply(50, ceParameters=ceParameters)
        assert 0 <= submittable <= 50
def test_getNumberOfJobsNeedingPilots(sd, mocker):
    """The number of jobs needing pilots is the number of matching waiting jobs minus the waiting pilots"""
    # The Matcher requires an access to a DB: its answers are mocked
    mocker.patch.object(sd, "matcherClient", autospec=True)

    # (task queues returned by the Matcher, waiting pilots, expected number of jobs needing pilots)
    scenarios = [
        # 1. No waiting job, no waiting pilot
        ({}, 0, 0),
        # 2. 10 waiting jobs, no waiting pilot
        ({"TQ1": {"Jobs": 10}}, 0, 10),
        # 3. 10 waiting jobs split into 2 task queues, no waiting pilot
        ({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}, 0, 10),
        # 4. 10 waiting jobs, 5 waiting pilots
        ({"TQ1": {"Jobs": 10}}, 5, 5),
        # 5. 10 waiting jobs split into 2 task queues, 10 waiting pilots
        ({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}, 10, 0),
        # 6. 10 waiting jobs, 20 waiting pilots
        ({"TQ1": {"Jobs": 10}}, 20, 0),
    ]

    for taskQueues, pilotsInQueue, expected in scenarios:
        sd.matcherClient.getMatchingTaskQueues.return_value = S_OK(taskQueues)
        numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=pilotsInQueue, queue="ce1.site3.com_condor")
        assert numberToSubmit == expected


def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
Expand Down
64 changes: 0 additions & 64 deletions src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py

This file was deleted.

Loading

0 comments on commit 49ed951

Please sign in to comment.