diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index ca3651d1c..b229fc24d 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.0.61" +release_version = "0.0.62" diff --git a/pandaserver/dataservice/AdderGen.py b/pandaserver/dataservice/AdderGen.py index 11d347af8..e1a5cbac0 100644 --- a/pandaserver/dataservice/AdderGen.py +++ b/pandaserver/dataservice/AdderGen.py @@ -655,13 +655,6 @@ def parseXML(self): lfns_set = set(lfns) fileList = [] for file in self.job.Files: - if file.lfn.startswith('regex|'): - target = re.sub(r'^[^|]+\|', '', file.lfn) - for tmp_lfn in lfns_set: - if re.search(target, tmp_lfn): - file.lfn = tmp_lfn - self.logger.debug(f'use new LFN {tmp_lfn} for {target}') - break fileList.append(file.lfn) if file.type == 'input': if file.lfn in lfns_set: @@ -742,34 +735,32 @@ def parseXML(self): def copyFilesForVariableNumOutputs(self,lfns): # get original output files origOutputs = {} - updateOrig = {} for tmpFile in self.job.Files: if tmpFile.type in ['output','log']: origOutputs[tmpFile.lfn] = tmpFile - if tmpFile.lfn in lfns: - # keep original - updateOrig[tmpFile.lfn] = False - else: - # overwrite original - updateOrig[tmpFile.lfn] = True # look for unkown files - addedNewFiles = False + orig_to_new_map = {} for newLFN in lfns: if newLFN not in origOutputs: # look for corresponding original output for origLFN in origOutputs: - tmpPatt = '^{0}\.*_\d+$'.format(origLFN) - if re.search(tmpPatt,newLFN) is not None: - # copy file record - tmpStat = self.taskBuffer.copyFileRecord(newLFN,origOutputs[origLFN],updateOrig[origLFN]) - if not tmpStat: - return False - addedNewFiles = True - # disable further overwriting - updateOrig[origLFN] = False + tmpPatt = r'^{0}\.*_\d+$'.format(origLFN) + regPatt = re.sub(r'^[^|]+\|', '', origLFN) + if re.search(tmpPatt,newLFN) or \ + (origLFN.startswith('regex|') and re.search(regPatt, newLFN)): + self.logger.debug(f'use new LFN {newLFN} for {origLFN}') + # collect new filenames + orig_to_new_map.setdefault(origLFN, []) + orig_to_new_map[origLFN].append(newLFN) break + # copy file records + for origLFN in orig_to_new_map: + tmpStat = self.taskBuffer.copy_file_records(orig_to_new_map[origLFN], + origOutputs[origLFN]) + if not tmpStat: + return False # refresh job info - if addedNewFiles: + if orig_to_new_map: self.job = self.taskBuffer.peekJobs([self.jobID],fromDefined=False, fromWaiting=False, forAnal=True)[0] diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 773ee76c5..c3c24b83e 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -18259,79 +18259,81 @@ def recordRetryHistoryJEDI(self,jediTaskID,newPandaID,oldPandaIDs,relationType): - # copy file record - def copyFileRecord(self,newLFN,fileSpec,updateOrig): - comment = ' /* DBProxy.copyFileRecord */' + # copy file records + def copy_file_records(self, new_lfns, file_spec): + comment = ' /* DBProxy.copy_file_records */' methodName = comment.split(' ')[-2].split('.')[-1] - methodName += " ".format(fileSpec.PandaID, fileSpec.lfn, newLFN, updateOrig) + methodName += " ".format(file_spec.PandaID, file_spec.lfn) tmpLog = LogWrapper(_logger,methodName) - tmpLog.debug("start") + tmpLog.debug("start with {} files".format(len(new_lfns))) try: - # reset rowID - tmpFileSpec = copy.copy(fileSpec) - tmpFileSpec.lfn = newLFN - if not updateOrig: - tmpFileSpec.row_ID = None # begin transaction self.conn.begin() - # insert file in JEDI - if not updateOrig and tmpFileSpec.jediTaskID not in [None,'NULL'] and tmpFileSpec.fileID not in ['', 'NULL', None]: - # get fileID - sqlFileID = "SELECT ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM dual " - self.cur.execute(sqlFileID+comment) - newFileID, = self.cur.fetchone() - # read file in JEDI - varMap = {} - varMap[':jediTaskID'] = tmpFileSpec.jediTaskID - varMap[':datasetID'] = tmpFileSpec.datasetID - varMap[':fileID'] = tmpFileSpec.fileID - sqlGI = 'SELECT * FROM {0}.JEDI_Dataset_Contents '.format(panda_config.schemaJEDI) - sqlGI += 'WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID ' - self.cur.execute(sqlGI+comment,varMap) - resGI = self.cur.fetchone() - tmpFileSpec.fileID = newFileID - if resGI is not None: - # make sql and map - sqlJI = "INSERT INTO {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI) - sqlJI += "VALUES (" + for idx_lfn, new_lfn in enumerate(new_lfns): + # reset rowID + tmpFileSpec = copy.copy(file_spec) + tmpFileSpec.lfn = new_lfn + if idx_lfn > 0: + tmpFileSpec.row_ID = None + # insert file in JEDI + if idx_lfn > 0 and tmpFileSpec.jediTaskID not in [None,'NULL'] and \ + tmpFileSpec.fileID not in ['', 'NULL', None]: + # get fileID + sqlFileID = "SELECT ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM dual " + self.cur.execute(sqlFileID+comment) + newFileID, = self.cur.fetchone() + # read file in JEDI varMap = {} - for columDesc,columVal in zip(self.cur.description,resGI): - columName = columDesc[0] - # overwrite fileID - if columName == 'FILEID': - columVal = tmpFileSpec.fileID - keyName = ':{0}'.format(columName) - varMap[keyName] = columVal - sqlJI += '{0},'.format(keyName) - sqlJI = sqlJI[:-1] - sqlJI += ") " - # insert file in JEDI - self.cur.execute(sqlJI+comment,varMap) - if not updateOrig: - # insert file in Panda - sqlFile = "INSERT INTO ATLAS_PANDA.filesTable4 ({0}) ".format(FileSpec.columnNames()) - sqlFile+= FileSpec.bindValuesExpression(useSeq=True) - varMap = tmpFileSpec.valuesMap(useSeq=True) - self.cur.execute(sqlFile+comment, varMap) - else: - # update LFN - sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET lfn=:lfn " - sqlFSF += "WHERE row_ID=:row_ID " - varMap = {} - varMap[':lfn'] = tmpFileSpec.lfn - varMap[':row_ID'] = tmpFileSpec.row_ID - self.cur.execute(sqlFSF+comment,varMap) - # update LFN in JEDI - if tmpFileSpec.fileID not in ['', 'NULL', None]: - sqlJF = "UPDATE {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI) - sqlJF += "SET lfn=:lfn " - sqlJF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID " - varMap = {} - varMap[':lfn'] = tmpFileSpec.lfn - varMap[':jediTaskID'] = tmpFileSpec.jediTaskID - varMap[':datasetID'] = tmpFileSpec.datasetID - varMap[':fileID'] = tmpFileSpec.fileID - self.cur.execute(sqlJF+comment,varMap) + varMap[':jediTaskID'] = tmpFileSpec.jediTaskID + varMap[':datasetID'] = tmpFileSpec.datasetID + varMap[':fileID'] = tmpFileSpec.fileID + sqlGI = 'SELECT * FROM {0}.JEDI_Dataset_Contents '.format(panda_config.schemaJEDI) + sqlGI += 'WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID ' + self.cur.execute(sqlGI+comment,varMap) + resGI = self.cur.fetchone() + tmpFileSpec.fileID = newFileID + if resGI is not None: + # make sql and map + sqlJI = "INSERT INTO {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI) + sqlJI += "VALUES (" + varMap = {} + for columDesc,columVal in zip(self.cur.description,resGI): + columName = columDesc[0] + # overwrite fileID + if columName == 'FILEID': + columVal = tmpFileSpec.fileID + keyName = ':{0}'.format(columName) + varMap[keyName] = columVal + sqlJI += '{0},'.format(keyName) + sqlJI = sqlJI[:-1] + sqlJI += ") " + # insert file in JEDI + self.cur.execute(sqlJI+comment,varMap) + if idx_lfn > 0: + # insert file in Panda + sqlFile = "INSERT INTO ATLAS_PANDA.filesTable4 ({0}) ".format(FileSpec.columnNames()) + sqlFile+= FileSpec.bindValuesExpression(useSeq=True) + varMap = tmpFileSpec.valuesMap(useSeq=True) + self.cur.execute(sqlFile+comment, varMap) + else: + # update LFN + sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET lfn=:lfn " + sqlFSF += "WHERE row_ID=:row_ID " + varMap = {} + varMap[':lfn'] = tmpFileSpec.lfn + varMap[':row_ID'] = tmpFileSpec.row_ID + self.cur.execute(sqlFSF+comment,varMap) + # update LFN in JEDI + if tmpFileSpec.fileID not in ['', 'NULL', None]: + sqlJF = "UPDATE {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI) + sqlJF += "SET lfn=:lfn " + sqlJF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID " + varMap = {} + varMap[':lfn'] = tmpFileSpec.lfn + varMap[':jediTaskID'] = tmpFileSpec.jediTaskID + varMap[':datasetID'] = tmpFileSpec.datasetID + varMap[':fileID'] = tmpFileSpec.fileID + self.cur.execute(sqlJF+comment,varMap) # commit if not self._commit(): raise RuntimeError('Commit error') diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index 65187e25a..3c89ccbd0 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -3013,12 +3013,12 @@ def getInputDatasetsForOutputDatasetJEDI(self,datasetName): - # copy file record - def copyFileRecord(self,newLFN,fileSpec,updateOrig): + # copy file records + def copy_file_records(self, new_lfns, file_spec): # get proxy proxy = self.proxyPool.getProxy() # exec - ret = proxy.copyFileRecord(newLFN,fileSpec,updateOrig) + ret = proxy.copy_file_records(new_lfns, file_spec) # release proxy self.proxyPool.putProxy(proxy) # return