From 25bbbd9afa91adda8c50428bfe7bda51b0f58bd2 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 15:22:18 +0100 Subject: [PATCH 01/10] add getWorkerStats in UserIF --- pandaserver/server/panda.py | 2 ++ pandaserver/userinterface/UserIF.py | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/pandaserver/server/panda.py b/pandaserver/server/panda.py index 3d805532a..1342878f0 100755 --- a/pandaserver/server/panda.py +++ b/pandaserver/server/panda.py @@ -130,6 +130,7 @@ getTaskParamsMap, getTaskStatus, getUserJobMetadata, + getWorkerStats, harvesterIsAlive, increaseAttemptNrPanda, insertSandboxFileInfo, @@ -318,6 +319,7 @@ "getTaskParamsMap", "updateWorkers", "harvesterIsAlive", + "getWorkerStats", "reportWorkerStats", "reportWorkerStats_jobtype", "addHarvesterDialogs", diff --git a/pandaserver/userinterface/UserIF.py b/pandaserver/userinterface/UserIF.py index d0fe2aea6..1122ddbe0 100644 --- a/pandaserver/userinterface/UserIF.py +++ b/pandaserver/userinterface/UserIF.py @@ -965,6 +965,10 @@ def harvesterIsAlive(self, user, host, harvesterID, data): # serialize return json.dumps(retVal) + # get stats of workers + def getWorkerStats(self, site_name): + return self.taskBuffer.getWorkerStats(siteName=site_name) + # report stat of workers def reportWorkerStats(self, harvesterID, siteName, paramsList): return self.taskBuffer.reportWorkerStats(harvesterID, siteName, paramsList) @@ -2534,6 +2538,13 @@ def harvesterIsAlive(req, harvesterID, data=None): return userIF.harvesterIsAlive(user, host, harvesterID, data) +# get stats of workers +def getWorkerStats(site_name): + # get + ret = userIF.getWorkerStats(site_name) + return json.dumps(ret) + + # report stat of workers def reportWorkerStats(req, harvesterID, siteName, paramsList): # check security From 9155f101ab64f1cf874aa8b58f29d83a6874e936 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 16:26:44 +0100 Subject: [PATCH 02/10] fix --- pandaserver/taskbuffer/OraDBProxy.py | 2 +- pandaserver/userinterface/UserIF.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index f4b548f7c..0de7e4dcd 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22855,7 +22855,7 @@ def getWorkerStats(self, siteName): # sql to get stat sqlG = "SELECT SUM(n_workers), COUNT(harvester_ID), jobType, resourceType, status FROM ATLAS_PANDA.Harvester_Worker_Stats " sqlG += "WHERE computingSite=:siteName " - sqlG += "GROUP BY resourceType,status " + sqlG += "GROUP BY jobType,resourceType,status " varMap = dict() varMap[":siteName"] = siteName self.cur.execute(sqlG + comment, varMap) diff --git a/pandaserver/userinterface/UserIF.py b/pandaserver/userinterface/UserIF.py index 1122ddbe0..dd8d81e51 100644 --- a/pandaserver/userinterface/UserIF.py +++ b/pandaserver/userinterface/UserIF.py @@ -2539,7 +2539,7 @@ def harvesterIsAlive(req, harvesterID, data=None): # get stats of workers -def getWorkerStats(site_name): +def getWorkerStats(req, site_name): # get ret = userIF.getWorkerStats(site_name) return json.dumps(ret) From 1375e9a72782b36d178a92f7461ad68bb04ead0f Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 16:46:43 +0100 Subject: [PATCH 03/10] fix --- pandaserver/taskbuffer/OraDBProxy.py | 41 ++++++++++++++++------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 0de7e4dcd..5826e541b 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22840,22 +22840,25 @@ def getWorkerStats(self, siteName): # set autocommit on self.conn.begin() # sql to get nPilot - sqlP = "SELECT getJob+updateJob FROM ATLAS_PANDAMETA.SiteData " - sqlP += "WHERE HOURS=:hours AND FLAG IN (:flag1,:flag2) " - varMap = dict() - varMap[":hours"] = 1 - varMap[":flag1"] = "production" - varMap[":flag2"] = "analysis" - self.cur.execute(sqlP + comment, varMap) - res = self.cur.fetchone() - if res is not None: - (nPilot,) = res - else: - nPilot = 0 + # sqlP = ("SELECT getJob+updateJob FROM ATLAS_PANDAMETA.SiteData " + # "WHERE HOURS=:hours AND FLAG IN (:flag1,:flag2) ") + # varMap = dict() + # varMap[":hours"] = 1 + # varMap[":flag1"] = "production" + # varMap[":flag2"] = "analysis" + # self.cur.execute(sqlP + comment, varMap) + # res = self.cur.fetchone() + # if res is not None: + # (nPilot,) = res + # else: + # nPilot = 0 # sql to get stat - sqlG = "SELECT SUM(n_workers), COUNT(harvester_ID), jobType, resourceType, status FROM ATLAS_PANDA.Harvester_Worker_Stats " - sqlG += "WHERE computingSite=:siteName " - sqlG += "GROUP BY jobType,resourceType,status " + sqlG = ( + "SELECT SUM(n_workers), COUNT(DISTINCT harvester_ID), jobType, resourceType, status " + "FROM ATLAS_PANDA.Harvester_Worker_Stats " + "WHERE computingSite=:siteName " + "GROUP BY jobType,resourceType,status " + ) varMap = dict() varMap[":siteName"] = siteName self.cur.execute(sqlG + comment, varMap) @@ -22872,13 +22875,15 @@ def getWorkerStats(self, siteName): if not self._commit(): raise RuntimeError("Commit error") # return - tmpLog.debug(f"done with {str(retMap)} nPilot={nPilot}") - return retMap, nPilot + # tmpLog.debug(f"done with {str(retMap)} nPilot={nPilot}") + # return retMap, nPilot + tmpLog.debug(f"done with {str(retMap)}") + return retMap except Exception: # roll back self._rollback() self.dumpErrorMessage(tmpLog, methodName) - return {}, 0 + return {} # send command to harvester or lock command def commandToHarvester( From d5399366b800ceaccd2a488002abf06ce176fa0f Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 16:57:08 +0100 Subject: [PATCH 04/10] fix --- pandaserver/taskbuffer/OraDBProxy.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 5826e541b..04145db0d 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22854,13 +22854,14 @@ def getWorkerStats(self, siteName): # nPilot = 0 # sql to get stat sqlG = ( - "SELECT SUM(n_workers), COUNT(DISTINCT harvester_ID), jobType, resourceType, status " + "SELECT SUM(n_workers), COUNT(harvester_ID), jobType, resourceType, status " "FROM ATLAS_PANDA.Harvester_Worker_Stats " - "WHERE computingSite=:siteName " + "WHERE computingSite=:siteName AND lastupdate>=:time_limit " "GROUP BY jobType,resourceType,status " ) varMap = dict() varMap[":siteName"] = siteName + varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=4) self.cur.execute(sqlG + comment, varMap) res = self.cur.fetchall() retMap = {} From 9a50b7aa09a9f949e75429c078078fa2198f4c49 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 17:26:33 +0100 Subject: [PATCH 05/10] add stats for finished workers --- pandaserver/taskbuffer/OraDBProxy.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 04145db0d..34cd7c0a0 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22852,20 +22852,32 @@ def getWorkerStats(self, siteName): # (nPilot,) = res # else: # nPilot = 0 - # sql to get stat - sqlG = ( - "SELECT SUM(n_workers), COUNT(harvester_ID), jobType, resourceType, status " + # sql to get stat of active workers + sqlGA = ( + "SELECT SUM(n_workers), harvester_ID, jobType, resourceType, status " "FROM ATLAS_PANDA.Harvester_Worker_Stats " - "WHERE computingSite=:siteName AND lastupdate>=:time_limit " - "GROUP BY jobType,resourceType,status " + "WHERE computingSite=:siteName AND lastUpdate>=:time_limit " + "GROUP BY harvester_ID,jobType,resourceType,status " ) varMap = dict() varMap[":siteName"] = siteName varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=4) - self.cur.execute(sqlG + comment, varMap) - res = self.cur.fetchall() + self.cur.execute(sqlGA + comment, varMap) + res_active = self.cur.fetchall() + # sql to get stat of finished workers from wokrer table + sqlGF = ( + "SELECT COUNT(*), harvesterID, jobType, resourceType, status " + "FROM ATLAS_PANDA.Harvester_Workers " + "WHERE computingSite=:siteName AND endTime>=:time_limit AND status='finished' " + "GROUP BY harvesterID,jobType,resourceType,status " + ) + varMap = dict() + varMap[":siteName"] = siteName + varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=24) + self.cur.execute(sqlGF + comment, varMap) + res_terminated = self.cur.fetchall() retMap = {} - for cnt, nInstances, jobType, resourceType, status in res: + for cnt, nInstances, jobType, resourceType, status in res_active + res_terminated: retMap.setdefault(jobType, {}) if resourceType not in retMap[jobType]: retMap[jobType][resourceType] = {"stats": dict(), "nInstances": 0} From 47089d3e3732ec53d5801401e513fe5e5833cea0 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 17:32:22 +0100 Subject: [PATCH 06/10] fix --- pandaserver/taskbuffer/OraDBProxy.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 34cd7c0a0..07d6dca04 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22877,13 +22877,12 @@ def getWorkerStats(self, siteName): self.cur.execute(sqlGF + comment, varMap) res_terminated = self.cur.fetchall() retMap = {} - for cnt, nInstances, jobType, resourceType, status in res_active + res_terminated: - retMap.setdefault(jobType, {}) - if resourceType not in retMap[jobType]: - retMap[jobType][resourceType] = {"stats": dict(), "nInstances": 0} - retMap[jobType][resourceType]["stats"][status] = cnt - if nInstances > retMap[jobType][resourceType]["nInstances"]: - retMap[jobType][resourceType]["nInstances"] = nInstances + for cnt, harvesterID, jobType, resourceType, status in res_active + res_terminated: + retMap.setdefault(harvesterID, {}) + retMap[harvesterID].setdefault(jobType, {}) + if resourceType not in retMap[harvesterID][jobType]: + retMap[harvesterID][jobType][resourceType] = dict() + retMap[harvesterID][jobType][resourceType][status] = cnt # commit if not self._commit(): raise RuntimeError("Commit error") From ede2c1140cad964de6b7c09736c36139acf85e1f Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 22 Nov 2023 17:46:52 +0100 Subject: [PATCH 07/10] fix --- pandaserver/taskbuffer/OraDBProxy.py | 31 ++++++++++++++-------------- pandaserver/taskbuffer/TaskBuffer.py | 4 ++-- pandaserver/userinterface/UserIF.py | 8 +++---- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 07d6dca04..a129602f2 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22831,10 +22831,10 @@ def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList): return False, "database error" # get stat of workers - def getWorkerStats(self, siteName): + def getWorkerStats(self): comment = " /* DBProxy.getWorkerStats */" methodName = comment.split(" ")[-2].split(".")[-1] - tmpLog = LogWrapper(_logger, methodName + f" < siteName={siteName} >") + tmpLog = LogWrapper(_logger) tmpLog.debug("start") try: # set autocommit on @@ -22854,35 +22854,34 @@ def getWorkerStats(self, siteName): # nPilot = 0 # sql to get stat of active workers sqlGA = ( - "SELECT SUM(n_workers), harvester_ID, jobType, resourceType, status " + "SELECT SUM(n_workers), computingSite, harvester_ID, jobType, resourceType, status " "FROM ATLAS_PANDA.Harvester_Worker_Stats " - "WHERE computingSite=:siteName AND lastUpdate>=:time_limit " - "GROUP BY harvester_ID,jobType,resourceType,status " + "WHERE lastUpdate>=:time_limit " + "GROUP BY computingSite,harvester_ID,jobType,resourceType,status " ) varMap = dict() - varMap[":siteName"] = siteName varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=4) self.cur.execute(sqlGA + comment, varMap) res_active = self.cur.fetchall() # sql to get stat of finished workers from wokrer table sqlGF = ( - "SELECT COUNT(*), harvesterID, jobType, resourceType, status " + "SELECT COUNT(*), computingSite, harvesterID, jobType, resourceType, status " "FROM ATLAS_PANDA.Harvester_Workers " - "WHERE computingSite=:siteName AND endTime>=:time_limit AND status='finished' " - "GROUP BY harvesterID,jobType,resourceType,status " + "WHERE endTime>=:time_limit AND status='finished' " + "GROUP BY computingSite,harvesterID,jobType,resourceType,status " ) varMap = dict() - varMap[":siteName"] = siteName varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=24) self.cur.execute(sqlGF + comment, varMap) res_terminated = self.cur.fetchall() retMap = {} - for cnt, harvesterID, jobType, resourceType, status in res_active + res_terminated: - retMap.setdefault(harvesterID, {}) - retMap[harvesterID].setdefault(jobType, {}) - if resourceType not in retMap[harvesterID][jobType]: - retMap[harvesterID][jobType][resourceType] = dict() - retMap[harvesterID][jobType][resourceType][status] = cnt + for cnt, computingSite, harvesterID, jobType, resourceType, status in res_active + res_terminated: + retMap.setdefault(computingSite, {}) + retMap[computingSite].setdefault(harvesterID, {}) + retMap[computingSite][harvesterID].setdefault(jobType, {}) + if resourceType not in retMap[computingSite][harvesterID][jobType]: + retMap[computingSite][harvesterID][jobType][resourceType] = dict() + retMap[computingSite][harvesterID][jobType][resourceType][status] = cnt # commit if not self._commit(): raise RuntimeError("Commit error") diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index dd3510f6d..a7ebd4cc1 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -3773,11 +3773,11 @@ def checkJobStatus(self, pandaIDs): return retList # get stat of workers - def getWorkerStats(self, siteName): + def getWorkerStats(self): # get DBproxy proxy = self.proxyPool.getProxy() # exec - ret = proxy.getWorkerStats(siteName) + ret = proxy.getWorkerStats() # release proxy self.proxyPool.putProxy(proxy) # return diff --git a/pandaserver/userinterface/UserIF.py b/pandaserver/userinterface/UserIF.py index dd8d81e51..71365578e 100644 --- a/pandaserver/userinterface/UserIF.py +++ b/pandaserver/userinterface/UserIF.py @@ -966,8 +966,8 @@ def harvesterIsAlive(self, user, host, harvesterID, data): return json.dumps(retVal) # get stats of workers - def getWorkerStats(self, site_name): - return self.taskBuffer.getWorkerStats(siteName=site_name) + def getWorkerStats(self): + return self.taskBuffer.getWorkerStats() # report stat of workers def reportWorkerStats(self, harvesterID, siteName, paramsList): @@ -2539,9 +2539,9 @@ def harvesterIsAlive(req, harvesterID, data=None): # get stats of workers -def getWorkerStats(req, site_name): +def getWorkerStats(req): # get - ret = userIF.getWorkerStats(site_name) + ret = userIF.getWorkerStats() return json.dumps(ret) From 27d7868c10c22e9a0eee7b27f79b2e59ee07cdb1 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 23 Nov 2023 15:19:20 +0100 Subject: [PATCH 08/10] filter with lastupdate --- pandaserver/taskbuffer/OraDBProxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index a129602f2..25914bf9c 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22867,7 +22867,7 @@ def getWorkerStats(self): sqlGF = ( "SELECT COUNT(*), computingSite, harvesterID, jobType, resourceType, status " "FROM ATLAS_PANDA.Harvester_Workers " - "WHERE endTime>=:time_limit AND status='finished' " + "WHERE lastUpdate>=:time_limit AND endTime>=:time_limit AND status='finished' " "GROUP BY computingSite,harvesterID,jobType,resourceType,status " ) varMap = dict() From 4662c3d58b57da34d1c2ca097bbea40e119381d4 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 23 Nov 2023 15:55:48 +0100 Subject: [PATCH 09/10] query Harvester_Worker_Stats only --- pandaserver/taskbuffer/OraDBProxy.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 25914bf9c..c7c24c294 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22852,7 +22852,7 @@ def getWorkerStats(self): # (nPilot,) = res # else: # nPilot = 0 - # sql to get stat of active workers + # sql to get stat of workers sqlGA = ( "SELECT SUM(n_workers), computingSite, harvester_ID, jobType, resourceType, status " "FROM ATLAS_PANDA.Harvester_Worker_Stats " @@ -22863,19 +22863,8 @@ def getWorkerStats(self): varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=4) self.cur.execute(sqlGA + comment, varMap) res_active = self.cur.fetchall() - # sql to get stat of finished workers from wokrer table - sqlGF = ( - "SELECT COUNT(*), computingSite, harvesterID, jobType, resourceType, status " - "FROM ATLAS_PANDA.Harvester_Workers " - "WHERE lastUpdate>=:time_limit AND endTime>=:time_limit AND status='finished' " - "GROUP BY computingSite,harvesterID,jobType,resourceType,status " - ) - varMap = dict() - varMap[":time_limit"] = datetime.datetime.utcnow() - datetime.timedelta(hours=24) - self.cur.execute(sqlGF + comment, varMap) - res_terminated = self.cur.fetchall() retMap = {} - for cnt, computingSite, harvesterID, jobType, resourceType, status in res_active + res_terminated: + for cnt, computingSite, harvesterID, jobType, resourceType, status in res_active: retMap.setdefault(computingSite, {}) retMap[computingSite].setdefault(harvesterID, {}) retMap[computingSite][harvesterID].setdefault(jobType, {}) @@ -22886,8 +22875,6 @@ def getWorkerStats(self): if not self._commit(): raise RuntimeError("Commit error") # return - # tmpLog.debug(f"done with {str(retMap)} nPilot={nPilot}") - # return retMap, nPilot tmpLog.debug(f"done with {str(retMap)}") return retMap except Exception: From 19addb0f52ae2bbb7b74a31149f8bb55a9ba7120 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 23 Nov 2023 17:30:16 +0100 Subject: [PATCH 10/10] v0.0.71 --- PandaPkgInfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index db0d1633d..600561175 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.0.70" +release_version = "0.0.71"