Skip to content

Commit

Permalink
Merge pull request #273 from PanDAWMS/flin
Browse files Browse the repository at this point in the history
add getWorkerStats in UserIF
  • Loading branch information
mightqxc authored Nov 23, 2023
2 parents 737f808 + 19addb0 commit 5ee2a29
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 34 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# Package release version (bumped from 0.0.70 to 0.0.71 by this commit).
release_version = "0.0.71"
2 changes: 2 additions & 0 deletions pandaserver/server/panda.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
getTaskParamsMap,
getTaskStatus,
getUserJobMetadata,
getWorkerStats,
harvesterIsAlive,
increaseAttemptNrPanda,
insertSandboxFileInfo,
Expand Down Expand Up @@ -318,6 +319,7 @@
"getTaskParamsMap",
"updateWorkers",
"harvesterIsAlive",
"getWorkerStats",
"reportWorkerStats",
"reportWorkerStats_jobtype",
"addHarvesterDialogs",
Expand Down
65 changes: 34 additions & 31 deletions pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22831,54 +22831,57 @@ def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
return False, "database error"

# get stat of workers
def getWorkerStats(self):
    """Collect recent worker counts from the harvester worker statistics table.

    Runs one aggregate query on ``ATLAS_PANDA.Harvester_Worker_Stats`` that sums
    ``n_workers`` over rows whose ``lastUpdate`` lies within the last four hours,
    grouped by computing site, harvester ID, job type, resource type and status.

    Returns:
        dict: nested mapping
        ``{computingSite: {harvesterID: {jobType: {resourceType: {status: n_workers_sum}}}}}``.
        An empty dict is returned if anything raises; in that case the
        transaction is rolled back and the error is handed to
        ``dumpErrorMessage``.
    """
    comment = " /* DBProxy.getWorkerStats */"
    # derive "getWorkerStats" from the SQL comment tag
    methodName = comment.split(" ")[-2].split(".")[-1]
    # pass the method name so log lines stay attributable to this method
    tmpLog = LogWrapper(_logger, methodName)
    tmpLog.debug("start")
    try:
        # begin a transaction
        self.conn.begin()
        # only rows refreshed within this window are counted
        # NOTE(review): naive UTC timestamp; assumes lastUpdate is stored in UTC - confirm
        time_limit = datetime.datetime.utcnow() - datetime.timedelta(hours=4)
        # sql to get stat of workers
        sqlGA = (
            "SELECT SUM(n_workers), computingSite, harvester_ID, jobType, resourceType, status "
            "FROM ATLAS_PANDA.Harvester_Worker_Stats "
            "WHERE lastUpdate>=:time_limit "
            "GROUP BY computingSite,harvester_ID,jobType,resourceType,status "
        )
        varMap = {":time_limit": time_limit}
        self.cur.execute(sqlGA + comment, varMap)
        res_active = self.cur.fetchall()
        retMap = {}
        for cnt, computingSite, harvesterID, jobType, resourceType, status in res_active:
            # walk/create the nested levels down to the per-resource-type dict
            per_resource = (
                retMap.setdefault(computingSite, {})
                .setdefault(harvesterID, {})
                .setdefault(jobType, {})
                .setdefault(resourceType, {})
            )
            per_resource[status] = cnt
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        # return
        tmpLog.debug(f"done with {str(retMap)}")
        return retMap
    except Exception:
        # roll back
        self._rollback()
        self.dumpErrorMessage(tmpLog, methodName)
        return {}

# send command to harvester or lock command
def commandToHarvester(
Expand Down
4 changes: 2 additions & 2 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3773,11 +3773,11 @@ def checkJobStatus(self, pandaIDs):
return retList

# get stat of workers
def getWorkerStats(self, siteName):
def getWorkerStats(self):
# get DBproxy
proxy = self.proxyPool.getProxy()
# exec
ret = proxy.getWorkerStats(siteName)
ret = proxy.getWorkerStats()
# release proxy
self.proxyPool.putProxy(proxy)
# return
Expand Down
11 changes: 11 additions & 0 deletions pandaserver/userinterface/UserIF.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,10 @@ def harvesterIsAlive(self, user, host, harvesterID, data):
# serialize
return json.dumps(retVal)

# get stats of workers
def getWorkerStats(self):
    """Return the worker statistics obtained from the task buffer.

    Delegates to ``self.taskBuffer.getWorkerStats()`` and passes its result
    through unchanged.
    """
    worker_stats = self.taskBuffer.getWorkerStats()
    return worker_stats

# report stat of workers
def reportWorkerStats(self, harvesterID, siteName, paramsList):
    """Forward a worker statistics report to the task buffer.

    All three arguments are handed to ``self.taskBuffer.reportWorkerStats``
    in the same order, and its return value is passed back unchanged.
    """
    report_result = self.taskBuffer.reportWorkerStats(harvesterID, siteName, paramsList)
    return report_result
Expand Down Expand Up @@ -2534,6 +2538,13 @@ def harvesterIsAlive(req, harvesterID, data=None):
return userIF.harvesterIsAlive(user, host, harvesterID, data)


# get stats of workers
def getWorkerStats(req):
    """Return the worker statistics from ``userIF`` serialized as a JSON string.

    ``req`` is accepted but not used in the body.

    NOTE(review): unlike the neighbouring ``reportWorkerStats`` entry point,
    no security check is performed here - confirm this is intended.
    """
    # fetch and serialize in one step
    return json.dumps(userIF.getWorkerStats())


# report stat of workers
def reportWorkerStats(req, harvesterID, siteName, paramsList):
# check security
Expand Down

0 comments on commit 5ee2a29

Please sign in to comment.