diff --git a/pandaserver/server/panda.py b/pandaserver/server/panda.py index 59de1526..537b4631 100755 --- a/pandaserver/server/panda.py +++ b/pandaserver/server/panda.py @@ -124,7 +124,6 @@ relay_idds_command, release_task, reloadInput, - reportWorkerStats, reportWorkerStats_jobtype, resumeTask, retryTask, diff --git a/pandaserver/srvcore/allowed_methods.py b/pandaserver/srvcore/allowed_methods.py index e837284e..6ecd79af 100644 --- a/pandaserver/srvcore/allowed_methods.py +++ b/pandaserver/srvcore/allowed_methods.py @@ -82,7 +82,6 @@ "getTaskParamsMap", "updateWorkers", "harvesterIsAlive", - "reportWorkerStats", "reportWorkerStats_jobtype", "getWorkerStats", "addHarvesterDialogs", diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index f01e9662..a2c6ec2c 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -2141,81 +2141,6 @@ def finalizePendingJobs(self, prodUserName, jobDefinitionID, waitLock=False): tmpLog.error(f"failed with {str(e)}") return False - # delete stalled jobs - def deleteStalledJobs(self, libFileName): - comment = " /* DBProxy.deleteStalledJobs */" - _logger.debug(f"deleteStalledJobs : {libFileName}") - sql2 = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) " - sql2 += JobSpec.bindValuesExpression() - nTry = 3 - try: - # begin transaction - self.conn.begin() - # look for downstream jobs - sqlD = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 WHERE type=:type AND lfn=:lfn GROUP BY PandaID" - sqlDJS = f"SELECT {JobSpec.columnNames()} " - sqlDJS += "FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID" - sqlDJD = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID" - sqlDJI = f"INSERT INTO ATLAS_PANDA.jobsArchived4 ({JobSpec.columnNames()}) " - sqlDJI += JobSpec.bindValuesExpression() - sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID" - sqlMMod = "UPDATE ATLAS_PANDA.metaTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID" - sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID" - _logger.debug(f"deleteStalledJobs : look for downstream jobs for {libFileName}") - # select PandaID - varMap = {} - varMap[":lfn"] = libFileName - varMap[":type"] = "input" - self.cur.arraysize = 100000 - self.cur.execute(sqlD + comment, varMap) - res = self.cur.fetchall() - for (downID,) in res: - _logger.debug(f"deleteStalledJobs : delete {downID}") - # select jobs - varMap = {} - varMap[":PandaID"] = downID - self.cur.arraysize = 10 - self.cur.execute(sqlDJS + comment, varMap) - resJob = self.cur.fetchall() - if len(resJob) == 0: - continue - # instantiate JobSpec - dJob = JobSpec() - dJob.pack(resJob[0]) - # delete - varMap = {} - varMap[":PandaID"] = downID - self.cur.execute(sqlDJD + comment, varMap) - retD = self.cur.rowcount - if retD == 0: - continue - # error code - dJob.jobStatus = "cancelled" - dJob.endTime = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - dJob.taskBufferErrorCode = ErrorCode.EC_Kill - dJob.taskBufferErrorDiag = "killed by Panda server : upstream job failed" - dJob.modificationTime = dJob.endTime - dJob.stateChangeTime = dJob.endTime - # insert - self.cur.execute(sqlDJI + comment, dJob.valuesMap()) - # update files,metadata,parametes - varMap = {} - varMap[":PandaID"] = downID - varMap[":modificationTime"] = dJob.modificationTime - self.cur.execute(sqlFMod + comment, varMap) - self.cur.execute(sqlMMod + comment, varMap) - 
self.cur.execute(sqlPMod + comment, varMap) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - return True - except Exception: - # roll back - self._rollback(True) - errtype, errvalue = sys.exc_info()[:2] - _logger.error(f"deleteStalledJobs : {errtype} {errvalue}") - return False - # update Job status in jobsActive def updateJobStatus(self, pandaID, jobStatus, param, updateStateChange=False, attemptNr=None): comment = " /* DBProxy.updateJobStatus */" @@ -5135,199 +5060,6 @@ def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup): _logger.error(f"setDebugMode : {errtype} {errvalue}") return None - # get PandaID with destinationDBlock - def getPandaIDwithDestDBlock(self, destinationDBlock): - comment = " /* DBProxy.getPandaIDwithDestDBlock */" - _logger.debug(f"getPandaIDwithDestDBlock : {destinationDBlock}") - try: - sqlP = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID FROM ATLAS_PANDA.filesTable4 tab " - sqlP += "WHERE type IN (:type1,:type2) AND destinationDBlock=:destinationDBlock AND rownum<=1" - # start transaction - self.conn.begin() - pandaID = None - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - varMap[":destinationDBlock"] = destinationDBlock - # select - self.cur.arraysize = 10 - self.cur.execute(sqlP + comment, varMap) - res = self.cur.fetchone() - # append - if res is not None: - (pandaID,) = res - # commit to release tables - if not self._commit(): - raise RuntimeError("Commit error") - # return - return pandaID - except Exception: - # roll back - self._rollback() - errType, errValue = sys.exc_info()[:2] - _logger.error(f"getPandaIDwithDestDBlock : {errType} {errValue}") - # return empty list - return None - - # get destSE with destinationDBlock - def getDestSEwithDestDBlock(self, destinationDBlock): - comment = " /* DBProxy.getDestSEwithDestDBlock */" - _logger.debug(f"getDestSEwithDestDBlock : {destinationDBlock}") - try: - sqlP = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ destinationSE,destinationDBlockToken FROM ATLAS_PANDA.filesTable4 tab " - sqlP += "WHERE type IN (:type1,:type2) AND destinationDBlock=:destinationDBlock AND rownum<=1" - # start transaction - self.conn.begin() - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - varMap[":destinationDBlock"] = destinationDBlock - # select - self.cur.arraysize = 10 - self.cur.execute(sqlP + comment, varMap) - res = self.cur.fetchone() - # append - destinationSE = None - destinationDBlockToken = None - if res is not None: - destinationSE, destinationDBlockToken = res - # commit to release tables - if not self._commit(): - raise RuntimeError("Commit error") - # return - return destinationSE, destinationDBlockToken - except Exception: - # roll back - self._rollback() - errType, errValue = sys.exc_info()[:2] - _logger.error(f"getDestSEwithDestDBlock : {errType} {errValue}") - # return empty list - return None, None - - # get number of activated/defined jobs with output datasets - def getNumWaitingJobsWithOutDS(self, outputDSs): - comment = " /* DBProxy.getNumWaitingJobsWithOutDS */" - _logger.debug(f"getNumWaitingJobsWithOutDS : {str(outputDSs)}") - try: - sqlD = "SELECT distinct destinationDBlock FROM ATLAS_PANDA.filesTable4 " - sqlD += "WHERE type IN (:type1,:type2) AND dataset=:dataset AND status IN (:status1,:status2)" - sqlP = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID FROM ATLAS_PANDA.filesTable4 tab " - sqlP += "WHERE type IN (:type1,:type2) AND destinationDBlock=:destinationDBlock AND status IN 
(:status1,:status2) AND rownum<=1" - sqlJ = "SELECT jobDefinitionID,taskID,prodUserName,jobStatus,prodSourceLabel FROM %s " - sqlJ += "WHERE PandaID=:PandaID" - sqlC = "SELECT count(*) FROM ATLAS_PANDA.jobsActive4 " - sqlC += "WHERE jobDefinitionID=:jobDefinitionID AND prodUserName=:prodUserName AND jobStatus IN (:jobStatus1)" - # start transaction - self.conn.begin() - # get sub datasets - subDSList = [] - for outputDS in outputDSs: - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - varMap[":status1"] = "unknown" - varMap[":status2"] = "pending" - varMap[":dataset"] = outputDS - # select - self.cur.arraysize = 1000 - self.cur.execute(sqlD + comment, varMap) - resList = self.cur.fetchall() - # append - for (destinationDBlock,) in resList: - subDSList.append(destinationDBlock) - # get PandaIDs - pandaIDs = [] - for subDS in subDSList: - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - varMap[":status1"] = "unknown" - varMap[":status2"] = "pending" - varMap[":destinationDBlock"] = subDS - # select - self.cur.arraysize = 10 - self.cur.execute(sqlP + comment, varMap) - res = self.cur.fetchone() - # append - if res is not None: - (pandaID,) = res - pandaIDs.append(pandaID) - # commit to release tables - if not self._commit(): - raise RuntimeError("Commit error") - # loop over all PandaIDs - jobInfos = [] - for pandaID in pandaIDs: - varMap = {} - varMap[":PandaID"] = pandaID - # start transaction - self.conn.begin() - # get jobID,nJobs,jobStatus,userName - res = None - for table in ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4"]: - # select - self.cur.arraysize = 10 - self.cur.execute((sqlJ % table) + comment, varMap) - res = self.cur.fetchone() - if res is not None: - break - # commit to release tables - if not self._commit(): - raise RuntimeError("Commit error") - # not found - if res is None: - continue - # append - jobInfos.append(res) - # no jobs - if jobInfos == []: - _logger.error("getNumWaitingJobsWithOutDS : no jobs found") - return False, {} - # loop over all jobIDs - retMap = {} - for jobID, taskID, prodUserName, jobStatus, prodSourceLabel in jobInfos: - if jobID in retMap: - continue - retMap[jobID] = {} - retMap[jobID]["nJobs"] = taskID - retMap[jobID]["sourceLabel"] = prodSourceLabel - # don't check # of activated - if jobStatus in ["defined"]: - retMap[jobID]["activated"] = False - retMap[jobID]["nActs"] = 0 - continue - retMap[jobID]["activated"] = True - # get # of activated jobs - varMap = {} - varMap[":prodUserName"] = prodUserName - varMap[":jobDefinitionID"] = jobID - varMap[":jobStatus1"] = "activated" - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 10 - self.cur.execute(sqlC + comment, varMap) - res = self.cur.fetchone() - # commit to release tables - if not self._commit(): - raise RuntimeError("Commit error") - if res is None: - _logger.error(f"getNumWaitingJobsWithOutDS : cannot get # of activated for {jobID}:{prodUserName}") - return False, {} - # set # of activated - (nActs,) = res - retMap[jobID]["nActs"] = nActs - # return - _logger.debug(f"getNumWaitingJobsWithOutDS -> {str(retMap)}") - return True, retMap - except Exception: - # roll back - self._rollback() - errType, errValue = sys.exc_info()[:2] - _logger.error(f"getNumWaitingJobsWithOutDS : {errType} {errValue}") - # return empty list - return False, {} - # lock jobs for reassign def lockJobsForReassign( self, @@ -5546,55 +5278,6 @@ def lockJobsForActivator(self, timeLimit, rownum, prio): # return empty return False, [] - # query 
PandaID - def queryPandaID(self, jobDefID): - comment = " /* DBProxy.queryPandaID */" - _logger.debug(f"queryPandaID : {jobDefID}") - sql0 = "SELECT PandaID,attemptNr FROM %s WHERE attemptNr=(" - sql0 += "SELECT MAX(attemptNr) FROM %s" - sql1 = " WHERE prodSourceLabel=:prodSourceLabel AND jobDefinitionID=:jobDefinitionID)" - sql1 += " AND prodSourceLabel=:prodSourceLabel AND jobDefinitionID=:jobDefinitionID" - try: - ids = [] - # select - varMap = {} - varMap[":jobDefinitionID"] = jobDefID - varMap[":prodSourceLabel"] = "managed" - for table in [ - "ATLAS_PANDA.jobsDefined4", - "ATLAS_PANDA.jobsActive4", - "ATLAS_PANDA.jobsArchived4", - "ATLAS_PANDA.jobsWaiting4", - ]: - # start transaction - self.conn.begin() - # select - sql = sql0 % (table, table) + sql1 - self.cur.arraysize = 10 - self.cur.execute(sql + comment, varMap) - res = self.cur.fetchall() - ids += list(res) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # look for the latest attempt - preAtt = -1 - pandaID = None - for pID, att in ids: - if att > preAtt: - pandaID = pID - preAtt = att - if att == preAtt: - if pandaID < pID: - pandaID = pID - return pandaID - except Exception: - type, value, traceBack = sys.exc_info() - _logger.error(f"queryPandaID : {type} {value}") - # roll back - self._rollback() - return None - # add metadata def addMetadata(self, pandaID, metadata, newStatus): comment = " /* DBProxy.addMetaData */" @@ -6059,29 +5742,6 @@ def updateDataset(self, datasets, withLock, withCriteria, criteriaMap): _logger.error(f"updateDataset() : {type} {value}") return [] - # delete dataset - def deleteDataset(self, name): - comment = " /* DBProxy.deleteDataset */" - sql1 = "DELETE /*+ INDEX(tab DATASETS_NAME_IDX)*/ FROM ATLAS_PANDA.Datasets tab WHERE name=:name" - try: - # start transaction - self.conn.begin() - # delete - varMap = {} - varMap[":name"] = name - self.cur.execute(sql1 + comment, varMap) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - return True - except Exception: - # roll back - self._rollback() - # error - type, value, traceBack = sys.exc_info() - _logger.error(f"deleteDataset() : {type} {value}") - return False - # trigger cleanup of internal datasets used by a task def trigger_cleanup_internal_datasets(self, task_id: int) -> bool: """ @@ -6180,56 +5840,6 @@ def getSerialNumber(self, datasetname, definedFreshFlag=None): _logger.error(f"getSerialNumber() : {type} {value}") return (-1, False) - # query files with map - def queryFilesWithMap(self, map): - comment = " /* DBProxy.queryFilesWithMap */" - _logger.debug("queryFilesWithMap()") - sql1 = f"SELECT PandaID,{FileSpec.columnNames()} FROM ATLAS_PANDA.filesTable4" - varMap = {} - for key in map: - if len(varMap) == 0: - sql1 += f" WHERE {key}=:{key}" - else: - sql1 += f" AND {key}=:{key}" - varMap[f":{key}"] = map[key] - nTry = 3 - for iTry in range(nTry): - try: - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 10000 - self.cur.execute(sql1 + comment, varMap) - res = self.cur.fetchall() - _logger.debug(f"queryFilesWithMap() : {str(res)}") - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # instantiate files - retList = [] - for item in res: - # instantiate dummy JobSpec obj for PandaID - job = JobSpec() - job.PandaID = item[0] - # instantiate file - file = FileSpec() - file.pack(item[1:]) - # set owner - file.setOwner(job) - # append - retList.append(file) - return retList - except Exception: - # roll back - self._rollback() - if iTry + 1 < nTry: - 
_logger.debug(f"queryFilesWithMap retry : {iTry}") - time.sleep(random.randint(10, 20)) - continue - type, value, traceBack = sys.exc_info() - _logger.error(f"queryFilesWithMap : {type} {value}") - return [] - # count the number of files with map def countFilesWithMap(self, map): comment = " /* DBProxy.countFilesWithMap */" @@ -6270,111 +5880,12 @@ def countFilesWithMap(self, map): _logger.error(f"countFilesWithMap({map}) : {type} {value}") return -1 - # count the number of pending files - def countPendingFiles(self, pandaID, forInput=True): - comment = " /* DBProxy.countPendingFiles */" - varMap = {} - varMap[":pandaID"] = pandaID - varMap[":status"] = "ready" - if forInput: - sql1 = "SELECT COUNT(*) FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:pandaID AND type=:type AND status<>:status " - varMap[":type"] = "input" - else: - sql1 = "SELECT COUNT(*) FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:pandaID AND type IN (:type1,:type2) AND status<>:status " - varMap[":type1"] = "output" - varMap[":type2"] = "log" - try: - # start transaction - self.conn.begin() - # select - _logger.debug(f"countPendingFiles : {pandaID} start") - self.cur.arraysize = 10 - retS = self.cur.execute(sql1 + comment, varMap) - res = self.cur.fetchone() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - nFiles = -1 - if res is not None: - nFiles = res[0] - _logger.debug(f"countPendingFiles : {pandaID} -> {nFiles}") - return nFiles - except Exception: - # roll back - self._rollback() - errType, errValue = sys.exc_info()[:2] - _logger.error(f"countPendingFiles : {pandaID} : {errType} {errValue}") - return -1 - - # get datasets associated with file - def getDatasetWithFile(self, lfn, jobPrioity=0): - comment = " /* DBProxy.getDatasetWithFile */" - varMap = {} - varMap[":lfn"] = lfn - varMap[":status1"] = "pending" - varMap[":status2"] = "transferring" - sql1 = "SELECT PandaID,status,destinationDBlock,destinationDBlockToken,dispatchDBlock FROM ATLAS_PANDA.filesTable4 " - sql1 += "WHERE lfn=:lfn AND status IN (:status1,:status2) AND modificationTime {str(retMap)}") - return retMap - except Exception: - # roll back - self._rollback() - errType, errValue = sys.exc_info()[:2] - _logger.error(f"getDatasetWithFile : {lfn} : {errType} {errValue}") - return {} - - # update input files and return corresponding PandaIDs - def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""): - comment = " /* DBProxy.updateInFilesReturnPandaIDs */" - _logger.debug(f"updateInFilesReturnPandaIDs({dataset},{fileLFN})") - sql0 = "SELECT /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ row_ID,PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE status<>:status AND dispatchDBlock=:dispatchDBlock" - sql1 = "UPDATE /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status=:status WHERE status<>:status AND dispatchDBlock=:dispatchDBlock" + # update input files and return corresponding PandaIDs + def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""): + comment = " /* DBProxy.updateInFilesReturnPandaIDs */" + _logger.debug(f"updateInFilesReturnPandaIDs({dataset},{fileLFN})") + sql0 = "SELECT /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ row_ID,PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE status<>:status AND dispatchDBlock=:dispatchDBlock" + sql1 = "UPDATE /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status=:status WHERE status<>:status AND dispatchDBlock=:dispatchDBlock" varMap = {} varMap[":status"] = status varMap[":dispatchDBlock"] = dataset @@ -6416,45 +5927,6 
@@ def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""): _logger.error(f"updateInFilesReturnPandaIDs : {type} {value}") return [] - # update file status in dispatch dataset - def updateFileStatusInDisp(self, dataset, fileStatusMap): - comment = " /* DBProxy.updateFileStatusInDisp */" - _logger.debug(f"updateFileStatusInDisp({dataset},{fileStatusMap})") - sql1 = "UPDATE /*+ index(tab FILESTABLE4_DISPDBLOCK_IDX) */ ATLAS_PANDA.filesTable4 tab SET status=:status WHERE dispatchDBlock=:dispatchDBlock AND lfn=:lfn" - nTry = 1 - for iTry in range(nTry): - try: - # start transaction - self.conn.begin() - # update - for status in fileStatusMap: - lfns = fileStatusMap[status] - varMap = {} - varMap[":status"] = status - varMap[":dispatchDBlock"] = dataset - # loop over all files - for lfn in lfns: - varMap["lfn"] = lfn - # update - retU = self.cur.execute(sql1 + comment, varMap) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # return - _logger.debug("updateFileStatusInDisp : done") - return True - except Exception: - # roll back - self._rollback() - # error report - if iTry + 1 < nTry: - _logger.debug(f"updateFileStatusInDisp retry : {iTry}") - time.sleep(random.randint(5, 10)) - continue - type, value, traceBack = sys.exc_info() - _logger.error(f"updateFileStatusInDisp : {type} {value}") - return False - # update output files and return corresponding PandaIDs def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""): comment = " /* DBProxy.updateOutFilesReturnPandaIDs */" @@ -6597,84 +6069,6 @@ def setGUIDs(self, files): _logger.error(f"setGUIDs : {type} {value}") return False - # query PandaID with Datasets - def queryPandaIDwithDataset(self, datasets): - comment = " /* DBProxy.queryPandaIDwithDataset */" - _logger.debug(f"queryPandaIDwithDataset({datasets})") - if len(datasets) == 0: - return [] - # make SQL query - sql1 = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock GROUP BY PandaID" - # execute - try: - retList = [] - for dataset in datasets: - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 10000 - varMap = {} - varMap[":destinationDBlock"] = dataset - self.cur.execute(sql1 + comment, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # get IDs - for r in res: - retList.append(r[0]) - # return - _logger.debug(f"queryPandaIDwithDataset : {str(retList)}") - return retList - except Exception: - # roll back - self._rollback() - # error report - type, value, traceBack = sys.exc_info() - _logger.error(f"queryPandaIDwithDataset : {type} {value}") - return [] - - # query PandaID with filenames - def queryPandaIDwithLFN(self, vlfns): - comment = " /* DBProxy.queryPandaIDwithLFN */" - _logger.debug(f"queryPandaIDwithLFN({vlfns})") - if len(vlfns) == 0: - return [] - # make SQL query - sql1 = "SELECT PandaID FROM ATLAS_PANDA.filesTable4 WHERE lfn=:lfn GROUP BY PandaID" - # execute - retList = [] - for lfn in vlfns: - # get generic LFNs - gLFN = re.sub("\.\d+$", "", lfn) - # try - try: - # start transaction - self.conn.begin() - # select - varMap = {} - varMap[":lfn"] = gLFN - self.cur.arraysize = 10000 - self.cur.execute(sql1 + comment, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # append IDs - for (tmpID,) in res: - if tmpID not in retList: - retList.append(tmpID) - except Exception: - # roll back - self._rollback() - # 
error report - type, value, traceBack = sys.exc_info() - _logger.error(f"queryPandaIDwithLFN : {type} {value}") - return [] - # return - _logger.debug(f"queryPandaIDwithLFN : {str(retList)}") - return retList - # get job statistics def getJobStatistics( self, @@ -6783,334 +6177,50 @@ def getJobStatistics( self.cur.arraysize = 10000 if table == jobs_active_4_table: sqlExeTmp = (sqlMV + comment) % f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS" - else: - sqlExeTmp = (sql0 + comment) % table - _logger.debug(f"getJobStatistics : {sqlExeTmp} {str(varMap)}") - self.cur.execute(sqlExeTmp, varMap) - else: - varMap[":modificationTime"] = timeLimit - self.cur.arraysize = 10000 - self.cur.execute(sqlA + comment, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # create map - for computingSite, jobStatus, nJobs in res: - # FIXME - # ignore some job status since they break APF - if jobStatus in ["merging"]: - continue - if computingSite not in ret: - ret[computingSite] = {} - if jobStatus not in ret[computingSite]: - ret[computingSite][jobStatus] = 0 - ret[computingSite][jobStatus] += nJobs - # for zero - stateList = ["assigned", "activated", "running"] - if archived: - stateList += ["finished", "failed"] - for site in ret: - for state in stateList: - if state not in ret[site]: - ret[site][state] = 0 - # return - _logger.debug(f"getJobStatistics -> {str(ret)}") - return ret - except Exception: - # roll back - self._rollback() - if iTry + 1 < nTry: - _logger.debug(f"getJobStatistics() retry : {iTry}") - time.sleep(2) - continue - type, value, traceBack = sys.exc_info() - _logger.error(f"getJobStatistics : {type} {value}") - return {} - - # get job statistics for brokerage - def getJobStatisticsBrokerage(self, minPriority=None, maxPriority=None): - comment = " /* DBProxy.getJobStatisticsBrokerage */" - _logger.debug(f"getJobStatisticsBrokerage(min={minPriority} max={maxPriority})") - sql0 = "SELECT cloud,computingSite,jobStatus,processingType,COUNT(*) FROM %s WHERE " - sql0 += "prodSourceLabel IN (:prodSourceLabel1) " - tmpPrioMap = {} - if minPriority is not None: - sql0 += "AND currentPriority>=:minPriority " - tmpPrioMap[":minPriority"] = minPriority - sql0 += "GROUP BY cloud,computingSite,jobStatus,processingType" - # sql for materialized view - sqlMV = re.sub("COUNT\(\*\)", "SUM(num_of_jobs)", sql0) - sqlMV = re.sub(":minPriority", "TRUNC(:minPriority,-1)", sqlMV) - sqlMV = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sqlMV) - tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4"] - if minPriority is not None or maxPriority is not None: - # read the number of running jobs with prio<=MIN - tables.append("ATLAS_PANDA.jobsActive4") - sqlMVforRun = re.sub("currentPriority>=", "currentPriority<=", sqlMV) - ret = {} - nTry = 3 - iActive = 0 - for iTry in range(nTry): - try: - for table in tables: - # start transaction - self.conn.begin() - # select - varMap = {} - varMap[":prodSourceLabel1"] = "managed" - for tmpPrio in tmpPrioMap: - varMap[tmpPrio] = tmpPrioMap[tmpPrio] - self.cur.arraysize = 10000 - useRunning = None - if table == "ATLAS_PANDA.jobsActive4": - # first count non-running and then running if minPriority is specified - if minPriority is not None: - if iActive == 0: - useRunning = False - else: - useRunning = True - iActive += 1 - if useRunning in [None, False]: - self.cur.execute( - (sqlMV + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS", - varMap, - ) - else: - # use maxPriority to avoid underestimation of running 
jobs - if minPriority is not None and maxPriority is not None: - varMap[":minPriority"] = maxPriority - self.cur.execute( - (sqlMVforRun + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS", - varMap, - ) - else: - self.cur.execute((sql0 + comment) % table, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # create map - for cloud, computingSite, jobStatus, processingType, count in res: - # check jobstatus if minPriority isspecified - if minPriority is not None: - # count the number of non-running with prio>=MIN - if useRunning is True and jobStatus != "running": - continue - # count the number of running with prio<=MIN - if useRunning is False and jobStatus == "running": - continue - # add cloud - ret.setdefault(cloud, {}) - # add site - ret[cloud].setdefault(computingSite, {}) - # add processingType - ret[cloud][computingSite].setdefault(processingType, {}) - # add jobStatus - if jobStatus not in ret[cloud][computingSite][processingType]: - ret[cloud][computingSite][processingType][jobStatus] = count - # for zero - for cloud in ret: - cloudVal = ret[cloud] - for site in cloudVal: - siteVal = cloudVal[site] - for pType in siteVal: - typeVal = siteVal[pType] - for stateItem in [ - "assigned", - "activated", - "running", - "transferring", - ]: - if stateItem not in typeVal: - typeVal[stateItem] = 0 - # return - _logger.debug(f"getJobStatisticsBrokerage -> {str(ret)}") - return ret - except Exception: - # roll back - self._rollback() - if iTry + 1 < nTry: - _logger.debug(f"getJobStatisticsBrokerage retry : {iTry}") - time.sleep(2) - continue - type, value, traceBack = sys.exc_info() - _logger.error(f"getJobStatisticsBrokerage : {type} {value}") - return {} - - # get job statistics for analysis brokerage - def getJobStatisticsAnalBrokerage(self, minPriority=None): - comment = " /* DBProxy.getJobStatisticsAnalBrokerage */" - _logger.debug(f"getJobStatisticsAnalBrokerage({minPriority})") - sql0 = "SELECT computingSite,jobStatus,processingType,COUNT(*) FROM %s WHERE " - sql0 += "prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) " - if minPriority is not None: - sql0 += "AND currentPriority>=:minPriority " - sql0 += "GROUP BY cloud,computingSite,jobStatus,processingType" - # sql for materialized view - sqlMV = re.sub("COUNT\(\*\)", "SUM(num_of_jobs)", sql0) - sqlMV = re.sub(":minPriority", "TRUNC(:minPriority,-1)", sqlMV) - sqlMV = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sqlMV) - tables = ["ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsDefined4"] - ret = {} - nTry = 3 - for iTry in range(nTry): - try: - for table in tables: - # start transaction - self.conn.begin() - # select - varMap = {} - varMap[":prodSourceLabel1"] = "user" - varMap[":prodSourceLabel2"] = "panda" - if minPriority is not None: - varMap[":minPriority"] = minPriority - self.cur.arraysize = 10000 - if table == "ATLAS_PANDA.jobsActive4": - self.cur.execute( - (sqlMV + comment) % "ATLAS_PANDA.MV_JOBSACTIVE4_STATS", - varMap, - ) - else: - self.cur.execute((sql0 + comment) % table, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # create map - for computingSite, jobStatus, processingType, count in res: - # add site - if computingSite not in ret: - ret[computingSite] = {} - # add processingType - if processingType not in ret[computingSite]: - ret[computingSite][processingType] = {} - # add jobStatus - if jobStatus not in ret[computingSite][processingType]: - ret[computingSite][processingType][jobStatus] 
= count - # for zero - for site in ret: - siteVal = ret[site] - for pType in siteVal: - typeVal = siteVal[pType] - for stateItem in [ - "defined", - "assigned", - "activated", - "running", - ]: - if stateItem not in typeVal: - typeVal[stateItem] = 0 - # return - _logger.debug(f"getJobStatisticsAnalBrokerage -> {str(ret)}") - return ret - except Exception: - # roll back - self._rollback() - if iTry + 1 < nTry: - _logger.debug(f"getJobStatisticsAnalBrokerage retry : {iTry}") - time.sleep(2) - continue - type, value, traceBack = sys.exc_info() - _logger.error(f"getJobStatisticsAnalBrokerage : {type} {value}") - return {} - - # get computingSite and destinationSE for a dataset - def getDestSE(self, dsname, fromArch=False): - comment = " /* DBProxy.getDestSE */" - _logger.debug(f"getDestSE({dsname},{fromArch})") - sql0 = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock " - if not fromArch: - sql0 += "AND status=:status " - sql0 += "AND rownum=1" - sql1 = "SELECT computingSite,destinationSE FROM %s WHERE PandaID=:PandaID" - actTableList = ["ATLAS_PANDA.jobsActive4"] - if fromArch: - actTableList.append("ATLAS_PANDA.jobsArchived4") - try: - # start transaction - self.conn.begin() - # select - varMap = {} - if not fromArch: - varMap[":status"] = "transferring" - varMap[":destinationDBlock"] = dsname - self.cur.arraysize = 10 - self.cur.execute(sql0 + comment, varMap) - res = self.cur.fetchall() - # get PandaID - pandaID = None - if len(res) != 0: - pandaID = res[0][0] - # get computingSite and destinationSE - destSE = None, None - if pandaID is not None: - varMap = {} - varMap[":PandaID"] = pandaID - # loop over all active tables - foundInActive = False - for actTable in actTableList: - self.cur.execute((sql1 % actTable) + comment, varMap) - res = self.cur.fetchall() - if len(res) != 0: - destSE = res[0] - foundInActive = True - break - # look into ARCH table - if not foundInActive: - if fromArch: - sqlA = "SELECT computingSite,destinationSE FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID " - sqlA += "AND modificationTime>(CURRENT_DATE-30) " - self.cur.execute(sqlA + comment, varMap) - res = self.cur.fetchall() - if len(res) != 0: - destSE = res[0] - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # return - _logger.debug(f"getDestSE({dsname}) : {str(destSE)}") - return destSE - except Exception: - # roll back - self._rollback() - type, value, traceBack = sys.exc_info() - _logger.error(f"getDestSE : {type} {value}") - return None, None - - # get destinationDBlockToken for a dataset - def getDestTokens(self, dsname): - comment = " /* DBProxy.getDestTokens */" - _logger.debug(f"getDestTokens({dsname})") - sql0 = "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ destinationDBlockToken FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock AND rownum=1" - try: - # start transaction - self.conn.begin() - # select - varMap = {} - varMap[":destinationDBlock"] = dsname - self.cur.arraysize = 10 - self.cur.execute(sql0 + comment, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # create map - retToken = None - if len(res) != 0: - retToken = res[0][0] - # convert None to NULL - if retToken is None: - retToken = "NULL" - # return - _logger.debug(f"getDestTokens({dsname}) : {retToken}") - return retToken - except Exception: - # roll back - self._rollback() - type, value, traceBack = sys.exc_info() - 
_logger.error(f"getDestTokens : {type} {value}") - return None + else: + sqlExeTmp = (sql0 + comment) % table + _logger.debug(f"getJobStatistics : {sqlExeTmp} {str(varMap)}") + self.cur.execute(sqlExeTmp, varMap) + else: + varMap[":modificationTime"] = timeLimit + self.cur.arraysize = 10000 + self.cur.execute(sqlA + comment, varMap) + res = self.cur.fetchall() + # commit + if not self._commit(): + raise RuntimeError("Commit error") + # create map + for computingSite, jobStatus, nJobs in res: + # FIXME + # ignore some job status since they break APF + if jobStatus in ["merging"]: + continue + if computingSite not in ret: + ret[computingSite] = {} + if jobStatus not in ret[computingSite]: + ret[computingSite][jobStatus] = 0 + ret[computingSite][jobStatus] += nJobs + # for zero + stateList = ["assigned", "activated", "running"] + if archived: + stateList += ["finished", "failed"] + for site in ret: + for state in stateList: + if state not in ret[site]: + ret[site][state] = 0 + # return + _logger.debug(f"getJobStatistics -> {str(ret)}") + return ret + except Exception: + # roll back + self._rollback() + if iTry + 1 < nTry: + _logger.debug(f"getJobStatistics() retry : {iTry}") + time.sleep(2) + continue + type, value, traceBack = sys.exc_info() + _logger.error(f"getJobStatistics : {type} {value}") + return {} # get the number of job for a user def getNumberJobsUser(self, dn, workingGroup=None): @@ -7380,41 +6490,6 @@ def getJobStatisticsPerProcessingType(self, useMorePG=False): _logger.error(f"getJobStatisticsPerProcessingType : {type} {value}") return {} - # get number of activated analysis jobs - def getNAnalysisJobs(self, nProcesses): - comment = " /* DBProxy.getNAnalysisJobs */" - _logger.debug(f"getNAnalysisJobs({nProcesses})") - sql0 = "SELECT computingSite,COUNT(*) FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus=:jobStatus " - sql0 += "AND (prodSourceLabel=:prodSourceLabel1 OR prodSourceLabel=:prodSourceLabel2) GROUP BY computingSite" - varMap = {} - varMap[":jobStatus"] = "activated" - varMap[":prodSourceLabel1"] = "user" - varMap[":prodSourceLabel2"] = "panda" - ret = {} - try: - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 10000 - self.cur.execute(sql0 + comment) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # create map - for item in res: - ret[item[0]] = float(item[1]) / nProcesses - # return - _logger.debug(f"getNAnalysisJobs() : {str(ret)}") - return ret - except Exception: - # roll back - self._rollback() - # error - type, value, traceBack = sys.exc_info() - _logger.error(f"getNAnalysisJobs : {type} {value}") - return {} - # update site data def updateSiteData(self, hostID, pilotRequests, interval): comment = " /* DBProxy.updateSiteData */" @@ -7779,41 +6854,6 @@ def insertnRunningInSiteData(self): _logger.error(f"insertnRunningInSiteData : {type} {value}") return False - # get nRunning in site data - def getnRunningInSiteData(self): - comment = " /* DBProxy.getnRunningInSiteData */" - _logger.debug("getnRunningInSiteData start") - sqlMax = "SELECT SITE,RUNNING FROM ATLAS_PANDAMETA.SiteData WHERE HOURS=:HOURS AND FLAG=:FLAG" - try: - # get nRunning - varMap = {} - varMap[":FLAG"] = "max" - varMap[":HOURS"] = 0 - # start transaction - self.conn.begin() - self.cur.arraysize = 10000 - # get - self.cur.execute(sqlMax + comment, varMap) - res = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # loop over all sites - retMap = {} - for computingSite, 
maxnRunning in res: - retMap[computingSite] = maxnRunning - # commit - if not self._commit(): - raise RuntimeError("Commit error") - _logger.debug("getnRunningInSiteData done") - return retMap - except Exception: - # roll back - self._rollback() - type, value, traceBack = sys.exc_info() - _logger.error(f"getnRunningInSiteData : {type} {value}") - return {} - # get site info def getSiteInfo(self): comment = " /* DBProxy.getSiteInfo */" @@ -8558,35 +7598,6 @@ def getUserParameter(self, dn, jobID, jobsetID): self._rollback() return retJobID, retJobsetID, retStatus - # get JobID for user - def getJobIdUser(self, dn): - comment = " /* DBProxy.getJobIdUser */" - _logger.debug(f"getJobIdUser {dn}") - jobID = 0 - try: - # set autocommit on - self.conn.begin() - # select - name = self.cleanUserID(dn) - sql = "SELECT jobid FROM ATLAS_PANDAMETA.users WHERE name=:name" - varMap = {} - varMap[":name"] = name - self.cur.arraysize = 10 - self.cur.execute(sql + comment, varMap) - res = self.cur.fetchone() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - if res is not None: - (jobID,) = res - _logger.debug(f"getJobIdUser {name} -> {jobID}") - except Exception: - errType, errValue = sys.exc_info()[:2] - _logger.error(f"getJobIdUser : {errType} {errValue}") - # roll back - self._rollback() - return jobID - # check ban user def checkBanUser(self, dn, sourceLabel, jediCheck=False): comment = " /* DBProxy.checkBanUser */" @@ -8975,146 +7986,6 @@ def peekJobLog(self, pandaID, days=None): # return None return None - # check status of all sub datasets to trigger Notifier - def checkDatasetStatusForNotifier(self, jobsetID, jobDefinitionID, prodUserName): - comment = " /* DBProxy.checkDatasetStatusForNotifier */" - _logger.debug(f"checkDatasetStatusForNotifier({jobsetID},{jobDefinitionID},{prodUserName})") - try: - # get PandaIDs to get all associated destinationDBlocks - varMap = {} - varMap[":jobsetID"] = jobsetID - varMap[":prodUserName"] = prodUserName - sql = "SELECT MAX(PandaID),jobDefinitionID FROM %s WHERE prodUserName=:prodUserName AND jobsetID=:jobsetID GROUP BY jobDefinitionID" - pandaIDs = {} - for table in [ - "ATLAS_PANDA.jobsArchived4", - "ATLAS_PANDA.jobsActive4", - "ATLAS_PANDA.jobsDefined4", - "ATLAS_PANDA.jobsWaiting4", - ]: - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 1000 - self.cur.execute((sql % table) + comment, varMap) - resSs = self.cur.fetchall() - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # get PandaIDs - for tmpPandaID, tmpJobDefID in resSs: - if tmpJobDefID not in pandaIDs or tmpPandaID > pandaIDs[tmpJobDefID]: - pandaIDs[tmpJobDefID] = tmpPandaID - # get all destinationDBlocks - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - sql = "SELECT DISTINCT destinationDBlock FROM ATLAS_PANDA.filesTable4 WHERE PandaID=:PandaID AND type IN (:type1,:type2)" - datasetMap = {} - # start transaction - self.conn.begin() - self.cur.arraysize = 1000 - for tmpJobDefID in pandaIDs: - tmpPandaID = pandaIDs[tmpJobDefID] - varMap[":PandaID"] = tmpPandaID - # select - self.cur.execute(sql + comment, varMap) - resSs = self.cur.fetchall() - # get destinationDBlock - for (tmpDestDBlock,) in resSs: - if tmpJobDefID not in datasetMap: - datasetMap[tmpJobDefID] = [] - if tmpDestDBlock not in datasetMap[tmpJobDefID]: - datasetMap[tmpJobDefID].append(tmpDestDBlock) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - # check dataset status - allClosed = True - retInfo = {} - 
latestUpdate = None - latestJobDefID = None - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - sql = "SELECT status,modificationDate FROM ATLAS_PANDA.Datasets WHERE name=:name AND type IN (:type1,:type2)" - sqlJ = "SELECT MAX(modificationTime) FROM ATLAS_PANDA.jobsArchived4 " - sqlJ += "WHERE prodUserName=:prodUserName AND jobDefinitionID=:jobDefinitionID" - # start transaction - self.conn.begin() - self.cur.arraysize = 1000 - for tmpJobDefID in datasetMap: - tmpDatasets = datasetMap[tmpJobDefID] - retInfo[tmpJobDefID] = [] - for tmpDataset in tmpDatasets: - if tmpDataset not in retInfo[tmpJobDefID]: - retInfo[tmpJobDefID].append(tmpDataset) - varMap[":name"] = tmpDataset - # select - self.cur.execute(sql + comment, varMap) - resSs = self.cur.fetchall() - # check status and mod time - for tmpStatus, tmpModificationDate in resSs: - _logger.debug( - "checkDatasetStatusForNotifier(%s,%s) %s has %s with %s at %s" - % ( - jobsetID, - jobDefinitionID, - tmpJobDefID, - tmpDataset, - tmpStatus, - tmpModificationDate, - ) - ) - if tmpStatus not in ["closed", "tobeclosed", "completed"]: - # some datasets are still active - allClosed = False - _logger.debug(f"checkDatasetStatusForNotifier({jobsetID},{jobDefinitionID}) wait due to {tmpJobDefID} {tmpDataset} {tmpStatus}") - break - elif tmpStatus == "tobeclosed": - # select latest modificationTime in job table - varMapJ = {} - varMapJ[":prodUserName"] = prodUserName - varMapJ[":jobDefinitionID"] = tmpJobDefID - self.cur.execute(sqlJ + comment, varMapJ) - resJ = self.cur.fetchone() - if resJ is None: - # error - allClosed = False - _logger.error(f"checkDatasetStatusForNotifier({jobsetID},{jobDefinitionID}) {tmpJobDefID} cannot find job") - break - (tmpModificationTime,) = resJ - _logger.debug(f"checkDatasetStatusForNotifier({jobsetID},{jobDefinitionID}) {tmpJobDefID} modtime:{tmpModificationTime}") - if latestUpdate is None or latestUpdate < tmpModificationTime: - # use the latest updated jobDefID - latestUpdate = tmpModificationTime - latestJobDefID = tmpJobDefID - elif latestUpdate == tmpModificationTime and latestJobDefID < tmpJobDefID: - # use larger jobDefID when datasets are closed at the same time - latestJobDefID = tmpJobDefID - # escape - if not allClosed: - break - # escape - if not allClosed: - break - # commit - if not self._commit(): - raise RuntimeError("Commit error") - _logger.debug( - f"checkDatasetStatusForNotifier({jobsetID},{jobDefinitionID}) -> all:{allClosed} {latestJobDefID} latest:{jobDefinitionID == latestJobDefID}" - ) - # return - if not allClosed or jobDefinitionID != latestJobDefID: - return False, {} - return True, retInfo - except Exception: - # roll back - self._rollback() - errType, errValue = sys.exc_info()[:2] - _logger.error(f"checkDatasetStatusForNotifier : {errType} {errValue}") - return False, {} - # record status change def recordStatusChange(self, pandaID, jobStatus, jobInfo=None, infoMap={}, useCommit=True, no_late_bulk_exec=True, extracted_sqls=None): comment = " /* DBProxy.recordStatusChange */" @@ -13142,44 +12013,6 @@ def changeTaskPriorityPanda(self, jediTaskID, newPriority): self.dumpErrorMessage(_logger, methodName) return None - # throttle job - def throttleJob(self, pandaID): - comment = " /* DBProxy.throttleJob */" - methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " - _logger.debug(f"{methodName} start") - try: - # sql to update job - sqlT = "UPDATE ATLAS_PANDA.jobsActive4 SET currentPriority=assignedPriority,jobStatus=:newJobStatus " - sqlT += "WHERE 
PandaID=:PandaID AND jobStatus=:oldJobStatus " - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 10 - varMap = {} - varMap[":PandaID"] = pandaID - varMap[":newJobStatus"] = "throttled" - varMap[":oldJobStatus"] = "activated" - # get datasets - self.cur.execute(sqlT + comment, varMap) - nRow = self.cur.rowcount - # commit - if not self._commit(): - raise RuntimeError("Commit error") - try: - self.recordStatusChange(pandaID, varMap[":newJobStatus"]) - except Exception: - _logger.error("recordStatusChange in throttleJob") - self.push_job_status_message(None, pandaID, varMap[":newJobStatus"]) - _logger.debug(f"{methodName} done with {nRow}") - return nRow - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return None - # throttle user jobs def throttleUserJobs(self, prodUserName, workingGroup, get_dict): comment = " /* DBProxy.throttleUserJobs */" @@ -13292,44 +12125,6 @@ def throttleUserJobs(self, prodUserName, workingGroup, get_dict): self.dumpErrorMessage(_logger, methodName) return None - # unthrottle job - def unThrottleJob(self, pandaID): - comment = " /* DBProxy.unThrottleJob */" - methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " - _logger.debug(f"{methodName} start") - try: - # sql to update job - sqlT = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:newJobStatus " - sqlT += "WHERE PandaID=:PandaID AND jobStatus=:oldJobStatus " - # start transaction - self.conn.begin() - # select - self.cur.arraysize = 10 - varMap = {} - varMap[":PandaID"] = pandaID - varMap[":newJobStatus"] = "activated" - varMap[":oldJobStatus"] = "throttled" - # get datasets - self.cur.execute(sqlT + comment, varMap) - nRow = self.cur.rowcount - # commit - if not self._commit(): - raise RuntimeError("Commit error") - try: - self.recordStatusChange(pandaID, varMap[":newJobStatus"]) - except Exception: - _logger.error("recordStatusChange in unThrottleJob") - self.push_job_status_message(None, pandaID, varMap[":newJobStatus"]) - _logger.debug(f"{methodName} done with {nRow}") - return nRow - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return None - # unthrottle user jobs def unThrottleUserJobs(self, prodUserName, workingGroup, get_dict): comment = " /* DBProxy.unThrottleUserJobs */" @@ -14329,78 +13124,40 @@ def increaseAttemptNrPanda(self, jediTaskID, increasedNr): self._rollback() # error self.dumpErrorMessage(tmpLog, methodName) - return None, "DB error" - - # get jediTaskID from taskName - def getTaskIDwithTaskNameJEDI(self, userName, taskName): - comment = " /* DBProxy.getTaskIDwithTaskNameJEDI */" - methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " - _logger.debug(f"{methodName} : start") - try: - # begin transaction - self.conn.begin() - # sql to get jediTaskID - sqlGF = f"SELECT MAX(jediTaskID) FROM {panda_config.schemaJEDI}.JEDI_Tasks " - sqlGF += "WHERE userName=:userName AND taskName=:taskName " - varMap = {} - varMap[":userName"] = userName - varMap[":taskName"] = taskName - self.cur.execute(sqlGF + comment, varMap) - resFJ = self.cur.fetchone() - if resFJ is not None: - (jediTaskID,) = resFJ - else: - jediTaskID = None - # commit - if not self._commit(): - raise RuntimeError("Commit error") - _logger.debug(f"{methodName} : jediTaskID={jediTaskID}") - return jediTaskID - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return None + return None, "DB error" - 
# get prodSourceLabel from TaskID - def getProdSourceLabelwithTaskID(self, jediTaskID): - comment = " /* DBProxy.getProdSourceLabelwithTaskID */" + # get jediTaskID from taskName + def getTaskIDwithTaskNameJEDI(self, userName, taskName): + comment = " /* DBProxy.getTaskIDwithTaskNameJEDI */" methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " + methodName += f" " _logger.debug(f"{methodName} : start") try: # begin transaction self.conn.begin() # sql to get jediTaskID - sqlGF = f"SELECT prodSourceLabel, tasktype FROM {panda_config.schemaJEDI}.JEDI_Tasks " - sqlGF += "WHERE jediTaskID=:jediTaskID " + sqlGF = f"SELECT MAX(jediTaskID) FROM {panda_config.schemaJEDI}.JEDI_Tasks " + sqlGF += "WHERE userName=:userName AND taskName=:taskName " varMap = {} - varMap[":jediTaskID"] = jediTaskID + varMap[":userName"] = userName + varMap[":taskName"] = taskName self.cur.execute(sqlGF + comment, varMap) resFJ = self.cur.fetchone() if resFJ is not None: - prodSourceLabel, task_type = resFJ - else: - prodSourceLabel, task_type = None, None - - if task_type: - job_label = JobUtils.translate_tasktype_to_jobtype(task_type) + (jediTaskID,) = resFJ else: - job_label = None - + jediTaskID = None # commit if not self._commit(): raise RuntimeError("Commit error") - _logger.debug(f"{methodName} : jediTaskID={jediTaskID} prodSourceLabel={prodSourceLabel} job_label={job_label}") - return prodSourceLabel, job_label + _logger.debug(f"{methodName} : jediTaskID={jediTaskID}") + return jediTaskID except Exception: # roll back self._rollback() # error self.dumpErrorMessage(_logger, methodName) - return None, None + return None # update error dialog for a jediTaskID def updateTaskErrorDialogJEDI(self, jediTaskID, msg): @@ -15363,63 +14120,6 @@ def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recover self.dumpErrorMessage(_logger, methodName) return False, None, None - # get input datasets for output dataset - def getInputDatasetsForOutputDatasetJEDI(self, datasetName): - comment = " /* DBProxy.getInputDatasetsForOutputDatasetJEDI */" - methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " - tmpLog = LogWrapper(_logger, methodName) - tmpLog.debug("start") - try: - toSkip = False - inputDatasets = {} - # begin transaction - self.conn.begin() - # get jediTaskID - varMap = {} - varMap[":type1"] = "log" - varMap[":type2"] = "output" - varMap[":name1"] = datasetName - varMap[":name2"] = datasetName.split(":")[-1] - sqlGI = f"SELECT jediTaskID,datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets " - sqlGI += "WHERE type IN (:type1,:type2) AND datasetName IN (:name1,:name2) " - self.cur.execute(sqlGI + comment, varMap) - resGI = self.cur.fetchall() - # use the largest datasetID since broken tasks might have been retried - jediTaskID = None - datasetID = None - for tmpJediTaskID, tmpDatasetID in resGI: - if jediTaskID is None or jediTaskID < tmpJediTaskID: - jediTaskID = tmpJediTaskID - datasetID = tmpDatasetID - elif datasetID < tmpDatasetID: - datasetID = tmpDatasetID - if jediTaskID is None: - tmpLog.debug("jediTaskID not found") - toSkip = True - if not toSkip: - # get input datasets - sqlID = f"SELECT datasetID,datasetName,masterID FROM {panda_config.schemaJEDI}.JEDI_Datasets " - sqlID += "WHERE jediTaskID=:jediTaskID AND type=:type " - varMap = {} - varMap[":jediTaskID"] = jediTaskID - varMap[":type"] = "input" - self.cur.execute(sqlID + comment, varMap) - resID = self.cur.fetchall() - for tmpDatasetID, tmpDatasetName, tmpMasterID in resID: - 
inputDatasets[tmpDatasetID] = tmpDatasetName - # commit - if not self._commit(): - raise RuntimeError("Commit error") - tmpLog.debug("done") - return True, inputDatasets - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return False, None - # record retry history def recordRetryHistoryJEDI(self, jediTaskID, newPandaID, oldPandaIDs, relationType, no_late_bulk_exec=True, extracted_sqls=None): comment = " /* DBProxy.recordRetryHistoryJEDI */" @@ -15990,94 +14690,6 @@ def requestTaskParameterRecalculation(self, taskID): tmpLog.debug("Forced recalculation of CPUTime") return rowcount - # throttle jobs for resource shares - def throttleJobsForResourceShare(self, site): - comment = " /* DBProxy.throttleJobsForResourceShare */" - methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " - tmpLog = LogWrapper(_logger, methodName) - tmpLog.debug("start") - try: - # sql to throttle jobs - sql = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:newStatus " - sql += "WHERE computingSite=:site AND jobStatus=:oldStatus AND lockedby=:lockedby " - varMap = {} - varMap[":site"] = site - varMap[":lockedby"] = "jedi" - varMap[":newStatus"] = "throttled" - varMap[":oldStatus"] = "activated" - # begin transaction - self.conn.begin() - self.cur.execute(sql + comment, varMap) - nRow = self.cur.rowcount - # commit - if not self._commit(): - raise RuntimeError("Commit error") - tmpLog.debug(f"throttled {nRow} jobs") - return nRow - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return None - - # activate jobs for resource shares - def activateJobsForResourceShare(self, site, nJobsPerQueue): - comment = " /* DBProxy.activateJobsForResourceShare */" - methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" " - tmpLog = LogWrapper(_logger, methodName) - tmpLog.debug("start") - try: - # sql to get jobs - sqlJ = "SELECT PandaID,jobStatus FROM (" - sqlJ += "SELECT PandaID,jobStatus," - sqlJ += "ROW_NUMBER() OVER(PARTITION BY workqueue_id ORDER BY currentPriority DESC,PandaID) AS row_number " - sqlJ += "FROM ATLAS_PANDA.jobsActive4 " - sqlJ += "WHERE computingSite=:site AND lockedby=:lockedby AND jobStatus IN (:st1,:st2) " - sqlJ += ") " - sqlJ += f"WHERE row_number<={nJobsPerQueue} " - # sql to activate jobs - sqlA = "UPDATE ATLAS_PANDA.jobsActive4 SET jobStatus=:newStatus " - sqlA += "WHERE PandaID=:PandaID AND jobStatus=:oldStatus " - varMap = {} - varMap[":site"] = site - varMap[":lockedby"] = "jedi" - varMap[":st1"] = "throttled" - varMap[":st2"] = "activated" - # begin transaction - self.conn.begin() - self.cur.execute(sqlJ + comment, varMap) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - resList = self.cur.fetchall() - nRow = 0 - for pandaID, jobStatus in resList: - if jobStatus == "activated": - continue - # activate job - varMap = {} - varMap[":PandaID"] = pandaID - varMap[":newStatus"] = "activated" - varMap[":oldStatus"] = "throttled" - self.conn.begin() - self.cur.execute(sqlA + comment, varMap) - # commit - if not self._commit(): - raise RuntimeError("Commit error") - nRow += self.cur.rowcount - self.push_job_status_message(None, pandaID, varMap[":newStatus"]) - tmpLog.debug(f"activated {nRow} jobs") - return nRow - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, methodName) - return None - # add associate sub datasets for single consumer job def getDestDBlocksWithSingleConsumer(self, 
jediTaskID, PandaID, ngDatasets):
         comment = " /* DBProxy.getDestDBlocksWithSingleConsumer */"
@@ -16511,8 +15123,8 @@ def getDispatchDatasetsPerUser(self, vo, prodSourceLabel, onlyActive, withSize):
             return {}

     # get task parameters
-    def getTaskPramsPanda(self, jediTaskID):
-        comment = " /* DBProxy.getTaskPramsPanda */"
+    def getTaskParamsPanda(self, jediTaskID):
+        comment = " /* DBProxy.getTaskParamsPanda */"
         methodName = comment.split(" ")[-2].split(".")[-1]
         methodName += f" "
         tmpLog = LogWrapper(_logger, methodName)
@@ -17492,42 +16104,6 @@ def convertObjIDtoEndPoint(self, srcFileName, objID):
             self.dumpErrorMessage(_logger, methodName)
             return None

-    # get OS IDs
-    def getObjIDs(self, jediTaskID, pandaID):
-        comment = " /* DBProxy.getObjIDs */"
-        methodName = comment.split(" ")[-2].split(".")[-1]
-        tmpLog = LogWrapper(
-            _logger,
-            methodName + f" ",
-        )
-        tmpLog.debug("start")
-        try:
-            # sql to get obj IDs
-            sql = f"SELECT distinct objstore_ID FROM {panda_config.schemaJEDI}.JEDI_Events "
-            sql += "WHERE jediTaskID=:jediTaskID AND PandaID=:PandaID AND objstore_ID IS NOT NULL "
-            # begin transaction
-            self.conn.begin()
-            # get
-            varMap = {}
-            varMap[":PandaID"] = pandaID
-            varMap[":jediTaskID"] = jediTaskID
-            self.cur.execute(sql + comment, varMap)
-            resF = self.cur.fetchall()
-            resList = []
-            for (objstoreID,) in resF:
-                resList.append(objstoreID)
-            # commit
-            if not self._commit():
-                raise RuntimeError("Commit error")
-            tmpLog.debug(f"got {len(resList)} OS IDs")
-            return resList
-        except Exception:
-            # roll back
-            self._rollback()
-            # error
-            self.dumpErrorMessage(_logger, methodName)
-            return []
-
     # bulk fetch PandaIDs
     def bulk_fetch_panda_ids(self, num_ids):
         comment = " /* JediDBProxy.bulk_fetch_panda_ids */"
@@ -18975,57 +17551,6 @@ def get_resource_type_job(self, job_spec):
         tmp_log.debug(f"done. resource_type is {resource_type}")
         return resource_type

-    # update stat of workers
-    def reportWorkerStats(self, harvesterID, siteName, paramsList):
-        comment = " /* DBProxy.reportWorkerStats */"
-        methodName = comment.split(" ")[-2].split(".")[-1]
-        tmpLog = LogWrapper(
-            _logger,
-            methodName + f" < harvesterID={harvesterID} siteName={siteName} >",
-        )
-        tmpLog.debug("start")
-        tmpLog.debug(f"params={str(paramsList)}")
-        try:
-            # load new site data
-            paramsList = json.loads(paramsList)
-            # set autocommit on
-            self.conn.begin()
-            # delete old site data
-            sqlDel = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats "
-            sqlDel += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName "
-            varMap = dict()
-            varMap[":harvesterID"] = harvesterID
-            varMap[":siteName"] = siteName
-            self.cur.execute(sqlDel + comment, varMap)
-            # insert new site data
-            sqlI = "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID,computingSite,resourceType,jobType,status,n_workers,lastUpdate) "
-            sqlI += "VALUES (:harvester_ID,:siteName,:resourceType,:jobType,:status,:n_workers,CURRENT_DATE) "
-            for resourceType in paramsList:
-                params = paramsList[resourceType]
-                if resourceType == "Undefined":
-                    continue
-                for status in params:
-                    n_workers = params[status]
-                    varMap = dict()
-                    varMap[":harvester_ID"] = harvesterID
-                    varMap[":siteName"] = siteName
-                    varMap[":status"] = status
-                    varMap[":resourceType"] = resourceType
-                    varMap[":n_workers"] = n_workers
-                    varMap[":jobType"] = "DUMMY"
-                    self.cur.execute(sqlI + comment, varMap)
-            # commit
-            if not self._commit():
-                raise RuntimeError("Commit error")
-            # return
-            tmpLog.debug("done")
-            return True, "OK"
-        except Exception:
-            # roll back
-            self._rollback()
-            self.dumpErrorMessage(tmpLog, methodName)
-            return False, "database error"
-
     # update stat of workers with jobtype breakdown
     def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list):
         comment = " /* DBProxy.reportWorkerStats_jobtype */"
@@ -19238,42 +17763,6 @@ def commandToHarvester(
             self.dumpErrorMessage(tmpLog, methodName)
             return False

-    # get activated job statistics per resource
-    def getActivatedJobStatisticsPerResource(self, siteName):
-        comment = " /* DBProxy.getJobStatisticsPerResource */"
-        methodName = comment.split(" ")[-2].split(".")[-1]
-        tmpLog = LogWrapper(_logger, methodName + f" < siteName={siteName} >")
-        tmpLog.debug("start")
-        try:
-            # get stats
-            sqlS = "SELECT resource_type,COUNT(*) FROM ATLAS_PANDA.jobsActive4 WHERE computingSite=:computingSite AND jobStatus=:status "
-            sqlS += "GROUP BY resource_type "
-            # start transaction
-            self.conn.begin()
-            # select
-            varMap = dict()
-            varMap[":computingSite"] = siteName
-            varMap[":status"] = "activated"
-            self.cur.arraysize = 10000
-            self.cur.execute(sqlS + comment, varMap)
-            res = self.cur.fetchall()
-            # commit
-            if not self._commit():
-                raise RuntimeError("Commit error")
-            # create map
-            returnMap = dict()
-            for resourceType, cnt in res:
-                # add
-                returnMap[resourceType] = cnt
-            # return
-            tmpLog.debug(f"got {returnMap}")
-            return returnMap
-        except Exception:
-            # roll back
-            self._rollback()
-            self.dumpErrorMessage(tmpLog, methodName)
-            return []
-
     # check Job status
     def checkJobStatus(self, pandaID):
         comment = " /* DBProxy.checkJobStatus */"
@@ -19450,65 +17939,6 @@ def releaseCommandLockHarvester(self, harvester_ID, command, computingSite, reso
             self.dumpErrorMessage(tmpLog, methodName)
             return False

-    # get active harvesters
-    def getActiveHarvesters(self, interval):
-        comment = " /* DBProxy.getActiveHarvesters */"
-        methodName = comment.split(" ")[-2].split(".")[-1]
-        tmpLog = LogWrapper(_logger, methodName)
-        tmpLog.debug("start")
-        try:
-            timeNow = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
-            # sql to get instances
-            sqlC = "SELECT harvester_ID FROM ATLAS_PANDA.Harvester_Instances "
-            sqlC += "WHERE startTime>:timeLimit "
-            # get instances
-            self.conn.begin()
-            self.cur.arraysize = 10000
-            varMap = dict()
-            varMap[":timeLimit"] = timeNow - datetime.timedelta(minutes=interval)
-            self.cur.execute(sqlC + comment, varMap)
-            res = self.cur.fetchall()
-            retList = []
-            for (harvester_ID,) in res:
-                retList.append(harvester_ID)
-            # commit
-            if not self._commit():
-                raise RuntimeError("Commit error")
-            tmpLog.debug(f"done with {str(retList)}")
-            return retList
-        except Exception:
-            # roll back
-            self._rollback()
-            self.dumpErrorMessage(tmpLog, methodName)
-            return []
-
-    # get minimal resource
-    def getMinimalResource(self):
-        comment = " /* JediDBProxy.getMinimalResource */"
-        method_name = comment.split(" ")[-2].split(".")[-1]
-        tmpLog = LogWrapper(_logger, method_name)
-        tmpLog.debug("start")
-        try:
-            timeNow = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
-            # sql to get minimal
-            sqlC = "SELECT resource_name FROM ATLAS_PANDA.resource_types "
-            sqlC += "ORDER BY mincore, (CASE WHEN maxrampercore IS NULL THEN 1 ELSE 0 END), maxrampercore "
-            # get instances
-            self.conn.begin()
-            self.cur.execute(sqlC + comment)
-            res = self.cur.fetchone()
-            (resourceName,) = res
-            # commit
-            if not self._commit():
-                raise RuntimeError("Commit error")
-            tmpLog.debug(f"got {resourceName}")
-            return resourceName
-        except Exception:
-            # roll back
-            self._rollback()
-            self.dumpErrorMessage(tmpLog, method_name)
-            return None
-
     def ups_get_queues(self):
         """
         Identify unified pilot streaming (ups) queues: served in pull (late binding) model
@@ -21303,56 +19733,6 @@ def get_config_for_pq(self, pq_name):
         tmp_log.debug("done")
         return pq_data_des

-    # get old job metadata
-    def get_old_job_metadata(self, job_spec):
-        comment = " /* DBProxy.get_old_job_metadata */"
-        methodName = comment.split(" ")[-2].split(".")[-1]
-        methodName += f" < PandaID={job_spec.PandaID}>"
-        # sql to get master datasetIDs
-        sqlM = f"SELECT datasetID FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND masterID IS NULL"
-        # sql to get old PandaIDs
-        sqlI = f"SELECT MAX(PandaID) FROM {panda_config.schemaPANDA}.filesTable4 WHERE jediTaskID=:jedTaskID AND datasetID=:datasetID AND fileID=:fileID "
-        # sql to get associated fileIDs
-        sqlF = f"SELECT fileID FROM {panda_config.schemaPANDA}.fileTable4 WHERE PandaID=:PandaID AND type IN (:type1,:type2) "
-        try:
-            # get master datasetIDs
-            varMap = dict()
-            varMap[":jediTaskID"] = job_spec.jediTaskID
-            varMap[":type1"] = "input"
-            varMap[":type2"] = "pseudo_input"
-            self.cur.execute(sqlM + comment, varMap)
-            datasetIDs = set()
-            [datasetIDs.add(datasetID) for datasetID, in self.cur.fetchall()]
-            # loop over all files
-            fileIDs_checked = set()
-            pandaIDs = set()
-            for fileSpec in job_spec.Files:
-                if fileSpec.type not in ["input", "pseudo_input"]:
-                    continue
-                if fileSpec.datasetID not in datasetIDs:
-                    continue
-                if fileSpec.fileID in fileIDs_checked:
-                    continue
-                fileIDs_checked.add(fileSpec.fileID)
-                # get old PandaIDs
-                varMap = dict()
-                varMap[":jediTaskID"] = job_spec.jediTaskID
-                varMap[":datasetID"] = fileSpec.datasetID
-                varMap[":fileID"] = fileSpec.fileID
-                self.cur.execute(sqlI + comment, varMap)
-                for (pandaID,) in self.cur.fetchall():
-                    pandaIDs.add(pandaID)
-                    # get associated fileIDs
-                    varMap = dict()
-                    varMap[":PandaID"] = pandaID
-                    varMap[":type1"] = "input"
-                    varMap[":type2"] = "pseudo_input"
-                    self.cur.execute(sqlF + comment, varMap)
-                    [fileIDs_checked.add(fileID) for fileID, in self.cur.fetchall()]
-        except Exception:
-            self.dumpErrorMessage(_logger, methodName)
-            raise
-
     # lock process
     def lockProcess_PANDA(self, component, pid, time_limit, force=False):
         comment = " /* DBProxy.lockProcess_PANDA */"
diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py
index eeb8475f..c4737fce 100755
--- a/pandaserver/taskbuffer/TaskBuffer.py
+++ b/pandaserver/taskbuffer/TaskBuffer.py
@@ -642,17 +642,6 @@ def lockJobsForActivator(self, timeLimit, rownum, prio):

         return res

-    # get number of activated/defined jobs with output datasets
-    def getNumWaitingJobsWithOutDS(self, outputDSs):
-        # get DB proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        res = proxy.getNumWaitingJobsWithOutDS(outputDSs)
-        # release DB proxy
-        self.proxyPool.putProxy(proxy)
-
-        return res
-
     # update overall job information
     def updateJobs(
         self,
@@ -836,16 +825,6 @@ def archiveJobs(self, jobs, inJobsDefined, fromJobsWaiting=False):
         self.proxyPool.putProxy(proxy)
         return returns

-    # delete stalled jobs
-    def deleteStalledJobs(self, libFileName):
-        # get DB proxy
-        proxy = self.proxyPool.getProxy()
-        # execute
-        ret = proxy.deleteStalledJobs(libFileName)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-        return ret
-
     # set debug mode
     def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
         # get DB proxy
@@ -1360,17 +1339,6 @@ def updateInFilesReturnPandaIDs(self, dataset, status, fileLFN=""):

         return retList

-    # update file status in dispatch dataset
-    def updateFileStatusInDisp(self, dataset, fileStatusMap):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # query PandaID
-        retVal = proxy.updateFileStatusInDisp(dataset, fileStatusMap)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return retVal
-
     # update output files and return corresponding PandaIDs
     def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""):
         # get DBproxy
@@ -1383,17 +1351,6 @@ def updateOutFilesReturnPandaIDs(self, dataset, fileLFN=""):

         return retList

-    # get datasets associated with file
-    def getDatasetWithFile(self, lfn, jobPrioity=0):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # query PandaID
-        retList = proxy.getDatasetWithFile(lfn, jobPrioity)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return retList
-
     # get _dis datasets associated to _sub
     def getAssociatedDisDatasets(self, subDsName):
         # get DBproxy
@@ -1486,28 +1443,6 @@ def setGUIDs(self, files):

         return ret

-    # query PandaID with dataset
-    def queryPandaIDwithDataset(self, datasets):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # query Dataset
-        ret = proxy.queryPandaIDwithDataset(datasets)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
-    # query PandaID with filenames
-    def queryPandaIDwithLFN(self, lfns):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # query Dataset
-        ret = proxy.queryPandaIDwithLFN(lfns)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # update dataset
     def updateDatasets(self, datasets, withLock=False, withCriteria="", criteriaMap={}):
         # get DBproxy
@@ -1526,31 +1461,6 @@ def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
         self.proxyPool.putProxy(proxy)
         return ret

-    # delete dataset
-    def deleteDatasets(self, datasets):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        retList = []
-        # query Dataset
-        for dataset in datasets:
-            ret = proxy.deleteDataset(dataset)
-            retList.append(ret)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return retList
-
-    # query files with map
-    def queryFilesWithMap(self, map):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # query files
-        ret = proxy.queryFilesWithMap(map)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # count the number of files with map
     def countFilesWithMap(self, map):
         # get DBproxy
@@ -1562,17 +1472,6 @@ def countFilesWithMap(self, map):

         return ret

-    # count the number of pending files
-    def countPendingFiles(self, pandaID, forInput=True):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # count files
-        ret = proxy.countPendingFiles(pandaID, forInput)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # get serial number for dataset
     def getSerialNumber(self, datasetname, definedFreshFlag=None):
         # get DBproxy
@@ -1633,28 +1532,6 @@ def extractScope(self, name):

         return ret

-    # get destinationDBlockToken for a dataset
-    def getDestTokens(self, dsname):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # get token
-        ret = proxy.getDestTokens(dsname)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
-    # get destinationSE for a dataset
-    def getDestSE(self, dsname, fromArch=False):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # get token
-        ret = proxy.getDestSE(dsname, fromArch)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # get job statistics
     def getJobStatistics(
         self,
@@ -1683,33 +1560,6 @@ def getJobStatistics(

         return ret

-    # get job statistics for brokerage
-    def getJobStatisticsBrokerage(self, minPrio=None, maxPrio=None):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # get stat
-        ret = proxy.getJobStatisticsBrokerage(minPrio, maxPrio)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-        # convert
-        conRet = ProcessGroups.countJobsPerGroup(ret)
-
-        return conRet
-
-    # get job statistics for analysis brokerage
-    def getJobStatisticsAnalBrokerage(self, minPriority=None):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # get stat
-        ret = proxy.getJobStatisticsAnalBrokerage(minPriority=minPriority)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-        # convert
-        conRet = ProcessGroups.countJobsPerGroupForAnal(ret)
-
-        return conRet
-
-
     # get job statistics for ExtIF
     def getJobStatisticsForExtIF(self, sourcetype=None):
         # get DBproxy
         proxy = self.proxyPool.getProxy()
@@ -1731,17 +1581,6 @@ def getJobStatisticsForBamboo(self, useMorePG=False):

         return ret

-    # get number of activated analysis jobs
-    def getNAnalysisJobs(self, nProcesses):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # count
-        ret = proxy.getNAnalysisJobs(nProcesses)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # update site data
     def updateSiteData(self, hostID, pilotRequests, interval=3):
         # get DBproxy
@@ -1775,17 +1614,6 @@ def insertnRunningInSiteData(self):

         return ret

-    # get nRunning in site data
-    def getnRunningInSiteData(self):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # get serial number
-        ret = proxy.getnRunningInSiteData()
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # get site info
     def getSiteInfo(self):
         # get DBproxy
@@ -1918,28 +1746,6 @@ def checkQuota(self, dn):

         return ret

-    # get JobID for user
-    def getJobIdUser(self, dn):
-        # query an SQL return Status
-        proxy = self.proxyPool.getProxy()
-        # get
-        ret = proxy.getJobIdUser(dn)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
-    # check status of all sub datasets to trigger Notifier
-    def checkDatasetStatusForNotifier(self, jobsetID, jobDefinitionID, prodUserName):
-        # query an SQL return Status
-        proxy = self.proxyPool.getProxy()
-        # get
-        ret = proxy.checkDatasetStatusForNotifier(jobsetID, jobDefinitionID, prodUserName)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # insert TaskParams
     def insertTaskParamsPanda(
         self,
@@ -2123,17 +1929,6 @@ def changeTaskPriorityPanda(self, jediTaskID, newPriority):

         return ret

-    # throttle job
-    def throttleJob(self, pandaID):
-        # get proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.throttleJob(pandaID)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # throttle user jobs
     def throttleUserJobs(self, prodUserName, workingGroup, get_dict=False):
         # get proxy
@@ -2145,17 +1940,6 @@ def throttleUserJobs(self, prodUserName, workingGroup, get_dict=False):

         return ret

-    # unthrottle job
-    def unThrottleJob(self, pandaID):
-        # get proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.unThrottleJob(pandaID)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # unthrottle user jobs
     def unThrottleUserJobs(self, prodUserName, workingGroup, get_dict=False):
         # get proxy
@@ -2233,17 +2017,6 @@ def getTaskIDwithTaskNameJEDI(self, userName, taskName):

         return ret

-    # get prodSourceLabel from task ID
-    def getProdSourceLabelwithTaskID(self, taskID):
-        # get proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.getProdSourceLabelwithTaskID(taskID)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # update error dialog for a jediTaskID
     def updateTaskErrorDialogJEDI(self, jediTaskID, msg):
         # get proxy
@@ -2332,17 +2105,6 @@ def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recover

         return ret

-    # get input datasets for output dataset
-    def getInputDatasetsForOutputDatasetJEDI(self, datasetName):
-        # get proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.getInputDatasetsForOutputDatasetJEDI(datasetName)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # copy file records
     def copy_file_records(self, new_lfns, file_spec):
         # get proxy
@@ -2409,28 +2171,6 @@ def requestTaskParameterRecalculation(self, taskID):

         return ret

-    # throttle jobs for resource shares
-    def throttleJobsForResourceShare(self, site):
-        # get proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.throttleJobsForResourceShare(site)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
-    # activate jobs for resource shares
-    def activateJobsForResourceShare(self, site, nJobsPerQueue):
-        # get proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.activateJobsForResourceShare(site, nJobsPerQueue)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # add associate sub datasets for single consumer job
     def getDestDBlocksWithSingleConsumer(self, jediTaskID, PandaID, ngDatasets):
         # get proxy
@@ -2487,11 +2227,11 @@ def getDispatchDatasetsPerUser(self, vo, prodSourceLabel, onlyActive, withSize):

         return ret

     # get task parameters
-    def getTaskPramsPanda(self, jediTaskID):
+    def getTaskParamsPanda(self, jediTaskID):
         # get proxy
         proxy = self.proxyPool.getProxy()
         # exec
-        ret = proxy.getTaskPramsPanda(jediTaskID)
+        ret = proxy.getTaskParamsPanda(jediTaskID)
         # release proxy
         self.proxyPool.putProxy(proxy)

@@ -2574,17 +2314,6 @@ def convertObjIDtoEndPoint(self, srcFileName, ObjID):

         return res

-    # get OS IDs
-    def getObjIDs(self, jediTaskID, pandaID):
-        # get DB proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        res = proxy.getObjIDs(jediTaskID, pandaID)
-        # release DB proxy
-        self.proxyPool.putProxy(proxy)
-
-        return res
-
     # get task status
     def getTaskStatus(self, jediTaskID):
         # get DB proxy
@@ -2697,7 +2426,7 @@ def getTaskParamsMap(self, jediTaskID):
         # get DB proxy
         proxy = self.proxyPool.getProxy()
         # exec
-        res = proxy.getTaskPramsPanda(jediTaskID)
+        res = proxy.getTaskParamsPanda(jediTaskID)
         # release DB proxy
         self.proxyPool.putProxy(proxy)

@@ -2770,17 +2499,6 @@ def getResourceTypes(self):

         return res

-    # report stat of workers
-    def reportWorkerStats(self, harvesterID, siteName, paramsList):
-        # get DB proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        res = proxy.reportWorkerStats(harvesterID, siteName, paramsList)
-        # release DB proxy
-        self.proxyPool.putProxy(proxy)
-
-        return res
-
     # report stat of workers
     def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
         # get DB proxy
@@ -2814,17 +2532,6 @@ def releaseCommandLockHarvester(self, harvester_ID, command, computingSite, reso

         return res

-    # get active harvesters
-    def getActiveHarvesters(self, interval):
-        # get DB proxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        res = proxy.getActiveHarvesters(interval)
-        # release DB proxy
-        self.proxyPool.putProxy(proxy)
-
-        return res
-
     # update workers
     def updateWorkers(self, harvesterID, data):
         """
@@ -2923,17 +2630,6 @@ def get_resource_type_job(self, job_spec):

         return ret_val

-    # get activated job statistics per resource
-    def getActivatedJobStatisticsPerResource(self, siteName):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret_val = proxy.getActivatedJobStatisticsPerResource(siteName)
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret_val
-
     # check Job status
     def checkJobStatus(self, pandaIDs):
         try:
@@ -2963,17 +2659,6 @@ def getWorkerStats(self):

         return ret

-    # get minimal resource
-    def getMinimalResource(self):
-        # get DBproxy
-        proxy = self.proxyPool.getProxy()
-        # exec
-        ret = proxy.getMinimalResource()
-        # release proxy
-        self.proxyPool.putProxy(proxy)
-
-        return ret
-
     # get unified pilot streaming queues
     def ups_get_queues(self):
         # get DBproxy
diff --git a/pandaserver/userinterface/UserIF.py b/pandaserver/userinterface/UserIF.py
index 44252bec..e1227f11 100644
--- a/pandaserver/userinterface/UserIF.py
+++ b/pandaserver/userinterface/UserIF.py
@@ -415,7 +415,7 @@ def retryTask(
                 # convert to dict
                 newParams = PrioUtil.decodeJSON(newParams)
                 # get original params
-                taskParams = self.taskBuffer.getTaskPramsPanda(jediTaskID)
+                taskParams = self.taskBuffer.getTaskParamsPanda(jediTaskID)
                 taskParamsJson = PrioUtil.decodeJSON(taskParams)
                 # replace with new values
                 for newKey in newParams:
@@ -580,10 +580,6 @@ def harvesterIsAlive(self, user, host, harvesterID, data):
     def getWorkerStats(self):
         return self.taskBuffer.getWorkerStats()

-    # report stat of workers
-    def reportWorkerStats(self, harvesterID, siteName, paramsList):
-        return self.taskBuffer.reportWorkerStats(harvesterID, siteName, paramsList)
-
     # report stat of workers
     def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
         return self.taskBuffer.reportWorkerStats_jobtype(harvesterID, siteName, paramsList)
@@ -1642,16 +1638,6 @@ def getWorkerStats(req):
     return json.dumps(ret)


-# report stat of workers
-def reportWorkerStats(req, harvesterID, siteName, paramsList):
-    # check security
-    if not isSecure(req):
-        return json.dumps((False, MESSAGE_SSL))
-    # update
-    ret = userIF.reportWorkerStats(harvesterID, siteName, paramsList)
-    return json.dumps(ret)
-
-
 # report stat of workers
 def reportWorkerStats_jobtype(req, harvesterID, siteName, paramsList):
     # check security