Skip to content

Commit

Permalink
feat: SiteDirector does not submit if there are enough waiting pilots
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jan 25, 2024
1 parent 4e3499b commit 98472c3
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ The :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` is a central co
responsible for managing and optimizing the submission of pilot jobs to various computing resources. It features:

- *Parallel Submission*: Capable of submitting pilot jobs in parallel across different Computing Elements (CEs) to enhance throughput.
- :py:mod:`~DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy`: It utilizes various submission policies to optimize pilot-job distribution:
- *AggressiveFilling*: Fills available slots regardless of waiting jobs, ideal for continuously busy sites.
- *WaitingSupportedJobs* (default one): Fills slots based on the number of waiting jobs, suitable for intermittently busy sites.
- *Monitoring and Accounting*: Features parallel monitoring and accounting for efficient tracking and management of pilot jobs.
- *Pilot Wrapping*: Creates pilot wrappers that facilitate the execution of pilot scripts in diverse environments, including Grid, cloud, and virtualized resources.
- *Resource Status Handling*: Integrates with the Resource Status System to ensure that pilots are only submitted to operational and enabled resources.
Expand Down
95 changes: 51 additions & 44 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getQueues
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Security import X509Chain
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Utilities.TimeUtilities import second, toEpochMilliSeconds
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
Expand All @@ -34,6 +33,7 @@
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
Expand All @@ -43,7 +43,6 @@
pilotWrapperScript,
)
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS

MAX_PILOTS_TO_SUBMIT = 100

Expand Down Expand Up @@ -74,12 +73,16 @@ def __init__(self, *args, **kwargs):
self.siteClient = None
self.rssClient = None
self.pilotAgentsDB = None
self.matcherClient = None
self.rssFlag = None

# self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again
self.failedQueueCycleFactor = 10
self.submissionPolicyName = WAITING_SUPPORTED_JOBS
self.submissionPolicy = None

# Every N cycles, pilot status update is performed by the SiteDirector
self.pilotStatusUpdateCycleFactor = 10
# Every N cycles, pilot submission is performed by the SiteDirector
self.pilotSubmissionCycleFactor = 1

self.workingDirectory = None
self.maxQueueLength = 86400 * 3
Expand All @@ -98,6 +101,7 @@ def initialize(self):
self.siteClient = SiteStatus()
self.rssClient = ResourceStatus()
self.pilotAgentsDB = getPilotAgentsDB()
self.matcherClient = MatcherClient()

return S_OK()

Expand All @@ -124,12 +128,12 @@ def beginExecution(self):
self.maxQueueLength = self.am_getOption("MaxQueueLength", self.maxQueueLength)
self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit)
self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)

# Load submission policy
self.submissionPolicyName = self.am_getOption("SubmissionPolicy", self.submissionPolicyName)
result = self._loadSubmissionPolicy()
if not result:
return result
self.pilotStatusUpdateCycleFactor = self.am_getOption(
"PilotStatusUpdateCycleFactor", self.pilotStatusUpdateCycleFactor
)
self.pilotSubmissionCycleFactor = self.am_getOption(
"PilotSubmissionCycleFactor", self.pilotSubmissionCycleFactor
)

# Flags
self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting)
Expand Down Expand Up @@ -181,19 +185,6 @@ def beginExecution(self):

return S_OK()

def _loadSubmissionPolicy(self):
"""Load a submission policy"""
objectLoader = ObjectLoader()
result = objectLoader.loadObject(
"WorkloadManagementSystem.Utilities.SubmissionPolicy", f"{self.submissionPolicyName}Policy"
)
if not result["OK"]:
self.log.error(f"Failed to load submission policy: {result['Message']}")
return result

self.submissionPolicy = result["Value"]()
return S_OK()

def _buildQueueDict(
self,
siteNames: list[str] | None = None,
Expand Down Expand Up @@ -262,14 +253,12 @@ def execute(self):
It basically just submits pilots and gets their status
"""
submissionResult = self.submitPilots()
monitoringResult = self.monitorPilots()

if not submissionResult["OK"]:
return submissionResult
cyclesDone = self.am_getModuleParam("cyclesDone")
if cyclesDone % self.pilotSubmissionCycleFactor == 0:
self.submitPilots()

if not monitoringResult["OK"]:
return monitoringResult
if cyclesDone % self.pilotStatusUpdateCycleFactor == 0:
self.monitorPilots()

return S_OK()

Expand Down Expand Up @@ -340,24 +329,23 @@ def _submitPilotsPerQueue(self, queueName: str):
return result

# Get the number of available slots on the target site/queue
totalSlots = self._getQueueSlots(queueName)
totalSlots, waitingPilots = self._getQueueSlots(queueName)
if totalSlots <= 0:
self.log.verbose(f"{queueName}: No slot available")
return S_OK(0)

# Apply the submission policy
submittablePilots = self.submissionPolicy.apply(
totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters
)

if submittablePilots <= 0:
# Get the number of jobs that need pilots
waitingJobs = self._getNumberOfJobsNeedingPilots(waitingPilots, queueName)
if waitingJobs <= 0:
self.log.verbose(f"{queueName}: Nothing to submit")
return S_OK(0)

self.log.info(f"{queueName}: slots available={totalSlots} to submit={submittablePilots}")

# Limit the number of pilots to submit to self.maxPilotsToSubmit
# Get the number of pilots to submit
submittablePilots = min(totalSlots, waitingJobs)
pilotsToSubmit = min(self.maxPilotsToSubmit, submittablePilots)
self.log.info(
f"{queueName}: slots available={totalSlots}; waiting jobs={waitingJobs}; to submit={pilotsToSubmit}"
)

# Now really submitting
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
Expand Down Expand Up @@ -389,7 +377,7 @@ def _getQueueSlots(self, queue: str):
"CE queue report",
f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Max={ceInfoDict['MaxTotalJobs']}",
)
return result["Value"]
return (result["Value"], ceInfoDict["WaitingJobs"])

# If we cannot get available slots from the CE, then we get them from the pilotAgentsDB
maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10))
Expand All @@ -402,7 +390,7 @@ def _getQueueSlots(self, queue: str):
if not result["OK"]:
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
self.failedQueues[queue] += 1
return 0
return (0, 0)
totalJobs = result["Value"]

# Get the number of waiting pilots
Expand All @@ -412,7 +400,7 @@ def _getQueueSlots(self, queue: str):
if not result["OK"]:
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
self.failedQueues[queue] += 1
return 0
return (0, 0)
waitingJobs = result["Value"]

runningJobs = totalJobs - waitingJobs
Expand All @@ -422,7 +410,26 @@ def _getQueueSlots(self, queue: str):
)

totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs))
return totalSlots
return (totalSlots, waitingJobs)

def _getNumberOfJobsNeedingPilots(self, waitingPilots: int, queue: str):
"""Get the number of jobs needing pilots for the targeted queue.
:param waitingPilots: number of waiting pilots in the queue
:param queue: queue name
"""
result = self.matcherClient.getMatchingTaskQueues(self.queueDict[queue]["CE"].ceParameters)
if not result["OK"]:
return 0
taskQueueDict = result["Value"]

# Get the number of jobs that would match the capability of the CE
waitingSupportedJobs = 0
for tq in taskQueueDict.values():
waitingSupportedJobs += tq["Jobs"]

# Get the number of jobs that need pilots
return max(0, waitingSupportedJobs - waitingPilots)

def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str):
"""Method that really submits the pilots to the ComputingElements' queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

from DIRAC import gLogger, gConfig
from DIRAC.ConfigurationSystem.Client import ConfigurationData
from DIRAC.Core.Utilities.ProcessPool import S_OK
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus

from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES


CONFIG = """
Expand Down Expand Up @@ -177,18 +177,40 @@ def pilotWrapperDirectory(tmp_path_factory):
return fn


def test_loadSubmissionPolicy(sd):
"""Load each submission policy and call it"""
for submissionPolicyName in SUBMISSION_POLICIES:
# Load the submission policy
sd.submissionPolicyName = submissionPolicyName
res = sd._loadSubmissionPolicy()
assert res["OK"]

# Call the submission policy with predefined parameters
targetQueue = "ce1.site1.com_condor"
res = sd.submissionPolicy.apply(50, ceParameters=sd.queueDict[targetQueue]["CE"].ceParameters)
assert res >= 0 and res <= 50
def test_getNumberOfJobsNeedingPilots(sd, mocker):
"""Make sure it returns the number of needed pilots"""

# 1. No waiting job, no waiting pilot
# Because it requires an access to a DB, we mock the value returned by the Matcher
mocker.patch.object(sd, "matcherClient", autospec=True)
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 0

# 2. 10 waiting jobs, no waiting pilot
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 10

# 3. 10 waiting jobs split into 2 task queues, no waiting pilot
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=0, queue="ce1.site3.com_condor")
assert numberToSubmit == 10

# 4. 10 waiting jobs, 5 waiting pilots
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=5, queue="ce1.site3.com_condor")
assert numberToSubmit == 5

# 5. 10 waiting jobs split into 2 task queues, 10 waiting pilots
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=10, queue="ce1.site3.com_condor")
assert numberToSubmit == 0

# 6.10 waiting jobs, 20 waiting pilots
sd.matcherClient.getMatchingTaskQueues.return_value = S_OK({"TQ1": {"Jobs": 10}})
numberToSubmit = sd._getNumberOfJobsNeedingPilots(waitingPilots=20, queue="ce1.site3.com_condor")
assert numberToSubmit == 0


def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
Expand Down
6 changes: 4 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,16 @@ Agents

# How many cycles to skip if queue is not working
FailedQueueCycleFactor = 10
# Every N cycles, pilot status update is performed by the SiteDirector
PilotStatusUpdateCycleFactor = 10
# Every N cycles, pilot submission is performed by the SiteDirector
PilotSubmissionCycleFactor = 10
# The maximum length of a queue (in seconds). Default: 3 days
MaxQueueLength = 259200
# Max number of pilots to submit per cycle
MaxPilotsToSubmit = 100
# Boolean value that indicates if the pilot job will send information for accounting
SendPilotAccounting = True
# Submission policy to apply
SubmissionPolicy = WaitingSupportedJobs
# Working directory containing the pilot files if not set in the CE
WorkDirectory =
}
Expand Down
64 changes: 0 additions & 64 deletions src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py

This file was deleted.

Loading

0 comments on commit 98472c3

Please sign in to comment.