From 94b5c88a564fc3e01764831291541f33d37709a9 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 8 Dec 2023 16:19:46 +0100 Subject: [PATCH] feat: introduce addPilotReferences() method --- src/DIRAC/Interfaces/API/DiracAdmin.py | 7 ++++- .../Agent/StatesAccountingAgent.py | 4 +++ .../DB/PilotAgentsDB.py | 26 +++++++++++++++++++ .../Service/PilotManagerHandler.py | 9 +++++++ .../scripts/dirac_admin_add_pilot.py | 3 +++ 5 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/DIRAC/Interfaces/API/DiracAdmin.py b/src/DIRAC/Interfaces/API/DiracAdmin.py index e3c14466310..e55a9f47ab4 100755 --- a/src/DIRAC/Interfaces/API/DiracAdmin.py +++ b/src/DIRAC/Interfaces/API/DiracAdmin.py @@ -553,6 +553,9 @@ def getPilotInfo(self, gridReference): if not isinstance(gridReference, str): return self._errorReport("Expected string for pilot reference") + # TODO: to remove from v9.0 + gLogger.notice("Notice: 'TaskQueueID' will be removed from the output in v9.0.") + result = PilotManagerClient().getPilotInfo(gridReference) return result @@ -599,7 +602,6 @@ def getJobPilots(self, jobID): :param job: JobID :type job: integer or string :return: S_OK,S_ERROR - """ if isinstance(jobID, str): try: @@ -607,6 +609,9 @@ def getJobPilots(self, jobID): except ValueError as x: return self._errorReport(str(x), "Expected integer or string for existing jobID") + # TODO: remove this comment from v9.0 + gLogger.notice("Notice: 'TaskQueueID' will be removed from the output in v9.0.") + result = PilotManagerClient().getPilots(jobID) if result["OK"]: gLogger.notice(self.pPrint.pformat(result["Value"])) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index df8f94fd36e..378cddf6f5f 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -79,6 +79,10 @@ def execute(self): # PilotsHistory to Monitoring if "Monitoring" in self.pilotMonitoringOption: self.log.info("Committing PilotsHistory to Monitoring") + + # TODO: remove this comment from v9.0 + self.log.notice("Notice: 'TaskQueueID' will be removed from the pilotAgentsDB in v9.0.") + result = PilotAgentsDB().getSummarySnapshot() now = datetime.datetime.utcnow() if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index ddadfa0cecf..7dbf75b1a67 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -5,6 +5,7 @@ Available methods are: + addPilotReferences() addPilotTQReference() setPilotStatus() deletePilot() @@ -20,6 +21,7 @@ import datetime import decimal import threading +from DIRAC.Core.Utilities.Decorators import deprecated import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities from DIRAC import S_ERROR, S_OK @@ -38,6 +40,30 @@ def __init__(self, parentLogger=None): self.lock = threading.Lock() ########################################################################################## + def addPilotReferences(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + """Add a new pilot job reference""" + for ref in pilotRef: + stamp = "" + if ref in pilotStampDict: + stamp = pilotStampDict[ref] + + req = ( + "INSERT INTO PilotAgents " + + "(PilotJobReference, OwnerGroup, GridType, SubmissionTime, LastUpdateTime, Status, PilotStamp) " + + "VALUES ('%s','%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'Submitted','%s')" + % (ref, ownerGroup, gridType, stamp) + ) + + result = self._update(req) + if not result["OK"]: + return result + + if "lastRowId" not in result: + return S_ERROR("PilotAgentsDB.addPilotReferences: Failed to retrieve a new Id.") + + return S_OK() + + @deprecated("Use addPilotReferences instead") def addPilotTQReference( self, pilotRef, taskQueueID, ownerDN, ownerGroup, broker="Unknown", gridType="DIRAC", pilotStampDict={} ): diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index 2ed3d28325e..3d2eacb988e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -5,6 +5,7 @@ import datetime from DIRAC import S_OK, S_ERROR +from DIRAC.Core.Utilities.Decorators import deprecated import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities from DIRAC.Core.DISET.RequestHandler import RequestHandler @@ -80,8 +81,16 @@ def export_getCurrentPilotCounters(cls, attrDict={}): return S_OK(resultDict) ########################################################################################## + types_addPilotReferences = [list, str] + + @classmethod + def export_addPilotReferences(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + """Add a new pilot job reference""" + return cls.pilotAgentsDB.addPilotReferences(pilotRef, ownerGroup, gridType, pilotStampDict) + types_addPilotTQReference = [list, int, str, str] + @deprecated("Use addPilotReferences instead") @classmethod def export_addPilotTQReference( cls, pilotRef, taskQueueID, ownerDN, ownerGroup, broker="Unknown", gridType="DIRAC", pilotStampDict={} diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py index 7b2b55d3b7e..d83d68d0ab0 100644 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py @@ -51,6 +51,9 @@ def setTaskQueueID(self, value): :return: S_OK()/S_ERROR() """ + # TODO: remove this comment from v9.0 + gLogger.notice("Notice: 'TaskQueueID' will be removed from the pilotAgentsDB in v9.0.") + try: self.taskQueueID = int(value) except ValueError: