From 742966e18418029d6f2409e2395f7d0d8e4c5910 Mon Sep 17 00:00:00 2001 From: aldbr Date: Thu, 18 Jan 2024 16:08:48 +0100 Subject: [PATCH] fix: SiteDirector should not interact with the CEs if nothing to submit --- .../Computing/CloudComputingElement.py | 12 +++++-- .../Agent/SiteDirector.py | 32 +++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/DIRAC/Resources/Computing/CloudComputingElement.py b/src/DIRAC/Resources/Computing/CloudComputingElement.py index 6ba416b0a15..1e3b0a3a2e7 100644 --- a/src/DIRAC/Resources/Computing/CloudComputingElement.py +++ b/src/DIRAC/Resources/Computing/CloudComputingElement.py @@ -154,6 +154,7 @@ import uuid from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText +import requests import yaml from libcloud.compute.providers import get_driver, set_driver @@ -525,9 +526,14 @@ def getCEStatus(self): """ driver = self._getDriver() count = 0 - for node in driver.list_nodes(): - if node.name.startswith(VM_NAME_PREFIX): - count += 1 + try: + for node in driver.list_nodes(): + if node.name.startswith(VM_NAME_PREFIX): + count += 1 + except requests.exceptions.ConnectTimeout as err: + self.log.error("Cannot get CE Status. Connection timeout occurred:", str(err)) + return S_ERROR("Cannot get CE Status. Connection timeout occurred") + result = S_OK() result["SubmittedJobs"] = 0 result["RunningJobs"] = count diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index cb27b8aa149..a28770ca4a0 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -282,6 +282,7 @@ def submitPilots(self): self.log.verbose("Submission: Queues treated are", ",".join(self.queueDict)) errors = [] + totalSubmittedPilots = 0 with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor: futures = [] for queue in self.queueDict: @@ -291,6 +292,10 @@ def submitPilots(self): result = future.result() if not result["OK"]: errors.append(result["Message"]) + else: + totalSubmittedPilots += result["Value"] + + self.log.info("Total number of pilots submitted", f"to all queues: {totalSubmittedPilots}") if errors: self.log.error("The following errors occurred during the pilot submission operation", "\n".join(errors)) @@ -313,7 +318,7 @@ def _submitPilotsPerQueue(self, queueName: str): f"{queueName} ==> {self.failedQueueCycleFactor - failedCount}", ) self.failedQueues[queueName] += 1 - return S_OK() + return S_OK(0) # Adjust queueCPUTime: needed to generate the proxy if "CPUTime" not in queueDictionary["ParametersDict"]: @@ -336,16 +341,23 @@ def _submitPilotsPerQueue(self, queueName: str): # Get the number of available slots on the target site/queue totalSlots = self._getQueueSlots(queueName) - # Apply the submission policy - totalSlots = self.submissionPolicy.apply(totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters) - if totalSlots <= 0: self.log.verbose(f"{queueName}: No slot available") - return S_ERROR(f"{queueName}: No slot available") - self.log.info(f"{queueName}: to submit={totalSlots}") + return S_OK(0) + + # Apply the submission policy + submittablePilots = self.submissionPolicy.apply( + totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters + ) + + if submittablePilots <= 0: + self.log.verbose(f"{queueName}: Nothing to submit") + return S_OK(0) + + self.log.info(f"{queueName}: slots available={totalSlots} to submit={submittablePilots}") # Limit the number of pilots to submit to self.maxPilotsToSubmit - pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots) + pilotsToSubmit = min(self.maxPilotsToSubmit, submittablePilots) # Now really submitting result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) @@ -359,9 +371,9 @@ def _submitPilotsPerQueue(self, queueName: str): if not result["OK"]: return result - # Summary after the cycle over queues - self.log.info("Total number of pilots submitted in this cycle", f"{len(pilotList)} to {queueName}") - return S_OK() + submittedPilots = len(pilotList) + self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}") + return S_OK(submittedPilots) def _getQueueSlots(self, queue: str): """Get the number of available slots in the queue"""