From 1e3447fbae32dc3600400ddf323f5e53f3c0a1e2 Mon Sep 17 00:00:00 2001
From: tmaeno
Date: Wed, 5 Dec 2018 23:52:45 +0100
Subject: [PATCH] * to take into account objstoreIDs of zip files

---
 ChangeLog.txt                               |  3 ++
 pandaserver/config/panda_config.py          |  4 ++
 pandaserver/dataservice/AdderAtlasPlugin.py |  2 +-
 pandaserver/taskbuffer/OraDBProxy.py        | 53 +++++++++++++++++++--
 4 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/ChangeLog.txt b/ChangeLog.txt
index 577028855..e74814f6c 100644
--- a/ChangeLog.txt
+++ b/ChangeLog.txt
@@ -1,5 +1,8 @@
 Release Notes
 
+* 12/5/2018
+  * to take into account objstoreIDs of zip files
+
 * 11/14/2018
   * one sub per job for ART
 
diff --git a/pandaserver/config/panda_config.py b/pandaserver/config/panda_config.py
index 11f34173a..dd01a9a11 100755
--- a/pandaserver/config/panda_config.py
+++ b/pandaserver/config/panda_config.py
@@ -53,6 +53,10 @@
 if not tmpSelf.__dict__.has_key('certdir'):
     tmpSelf.__dict__['certdir'] = '/data/atlpan'
 
+# endpoint map file
+if not tmpSelf.__dict__.has_key('endpoint_mapfile'):
+    tmpSelf.__dict__['endpoint_mapfile'] = '/cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json'
+
 # schemas
 if not tmpSelf.__dict__.has_key('schemaPANDA'):
     tmpSelf.__dict__['schemaPANDA'] = 'ATLAS_PANDA'
diff --git a/pandaserver/dataservice/AdderAtlasPlugin.py b/pandaserver/dataservice/AdderAtlasPlugin.py
index c0734e326..75d941484 100644
--- a/pandaserver/dataservice/AdderAtlasPlugin.py
+++ b/pandaserver/dataservice/AdderAtlasPlugin.py
@@ -878,7 +878,7 @@ def registerEventServiceFiles(self):
                 # get endpoint ID
                 epID = int(fileSpec.destinationSE.split('/')[0])
                 # convert to DDM endpoint
-                rse = self.taskBuffer.convertObjIDtoEndPoint('/cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json',
+                rse = self.taskBuffer.convertObjIDtoEndPoint(panda_config.endpoint_mapfile,
                                                              epID)
                 if rse['is_deterministic']:
                     epName = rse['name']
diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index 967c64042..e6f0a9f00 100755
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -20,6 +20,7 @@
 import commands
 import traceback
 import warnings
+import operator
 import ErrorCode
 import SiteSpec
 import CloudSpec
@@ -16077,9 +16078,54 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
         if catchAll == None:
             catchAll = ''
         try:
-            objectstores = json.loads(objectstores)
+            if isFakeCJ:
+                objectstores = []
+            else:
+                objectstores = json.loads(objectstores)
         except:
             objectstores = []
+        # get objstoreIDs
+        sqlZIP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
+        sqlZIP += "DISTINCT zipRow_ID FROM {0}.JEDI_Events tab ".format(panda_config.schemaJEDI)
+        sqlZIP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
+        sqlZIP += "AND status=:esDone "
+        sqlOST = "SELECT fsize,destinationSE FROM {0}.filesTable4 ".format(panda_config.schemaPANDA)
+        sqlOST += "WHERE row_ID=:row_ID "
+        objStoreZipMap = dict()
+        zipRowIDs = set()
+        totalZipSize = 0
+        for tmpFileSpec in jobSpec.Files:
+            if tmpFileSpec.type in ['input', 'pseudo_input']:
+                varMap = dict()
+                varMap[':jediTaskID'] = tmpFileSpec.jediTaskID
+                varMap[':datasetID'] = tmpFileSpec.datasetID
+                varMap[':fileID'] = tmpFileSpec.fileID
+                varMap[':esDone'] = EventServiceUtils.ST_done
+                self.cur.execute(sqlZIP+comment, varMap)
+                resZIP = self.cur.fetchall()
+                for zipRowID, in resZIP:
+                    if zipRowID is None:
+                        continue
+                    if zipRowID in zipRowIDs:
+                        continue
+                    zipRowIDs.add(zipRowID)
+                    # get file info
+                    varMap = dict()
+                    varMap[':row_ID'] = zipRowID
+                    self.cur.execute(sqlOST+comment, varMap)
+                    resOST = self.cur.fetchone()
+                    tmpFsize, tmpDestSE = resOST
+                    totalZipSize += tmpFsize
+                    tmpRSE = self.convertObjIDtoEndPoint(panda_config.endpoint_mapfile, int(tmpDestSE.split('/')[0]))
+                    if tmpRSE is not None:
+                        objStoreZipMap.setdefault(tmpRSE['name'], 0)
+                        objStoreZipMap[tmpRSE['name']] += tmpFsize
+        sortedOST = sorted(objStoreZipMap.items(), key=operator.itemgetter(1))
+        sortedOST.reverse()
+        if len(sortedOST) > 0:
+            _logger.debug('{0} old objectstores {1}'.format(methodName, str(objectstores)))
+            objectstores = [{'ddmendpoint': sortedOST[0][0]}]
+            _logger.debug('{0} new objectstores {1}'.format(methodName, str(objectstores)))
         if isFakeCJ:
             # use nucleus for fake co-jumbo since they don't have sub datasets
             pass
@@ -16126,7 +16172,7 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
         else:
             # use nucleus close to OS
             tmpNucleus = None
-            if isMergeAtOS and len(objectstores) > 0 and not isFakeCJ:
+            if isMergeAtOS and len(objectstores) > 0:
                 osEndpoint = objectstores[0]['ddmendpoint']
                 sqlCO = "SELECT site_name FROM ATLAS_PANDA.ddm_endpoint WHERE ddm_endpoint_name=:osEndpoint "
                 varMap = dict()
@@ -16135,10 +16181,11 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
                 resCO = self.cur.fetchone()
                 if resCO is not None:
                     tmpNucleus, = resCO
+                    _logger.info('{0} look for merge sites in nucleus:{1} close to pre-merged files'.format(methodName,tmpNucleus))
             # use nucleus
             if tmpNucleus is None:
                 tmpNucleus = jobSpec.destinationSE.split(':')[-1]
-            _logger.info('{0} look for merge sites in nucleus:{1}'.format(methodName,tmpNucleus))
+                _logger.info('{0} look for merge sites in destination nucleus:{1}'.format(methodName,tmpNucleus))
             # get sites in a nucleus
             sqlSN = "SELECT dr.panda_site_name,dr.ddm_endpoint_name "
             sqlSN += "FROM ATLAS_PANDA.panda_site ps,ATLAS_PANDAMETA.schedconfig sc,ATLAS_PANDA.panda_ddm_relation dr "
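
Note on the OraDBProxy change: it boils down to a small aggregation. For each zipped pre-merged file, the objectstore ID encoded before the first '/' of destinationSE is resolved to a DDM endpoint, zip sizes are summed per endpoint, and the merge is steered toward the endpoint holding the most data. The following is a minimal standalone sketch of that selection step; pick_largest_objectstore and the toy lookup are hypothetical stand-ins for the TaskBuffer plumbing (convertObjIDtoEndPoint, the JEDI_Events/filesTable4 queries), not the actual PanDA API.

    import operator

    def pick_largest_objectstore(zip_rows, convert_objid_to_endpoint, endpoint_mapfile):
        # zip_rows: (fsize, destinationSE) pairs; destinationSE encodes the
        # objectstore ID before the first '/', e.g. '123/path/to/file.zip'
        obj_store_zip_map = {}
        for fsize, destination_se in zip_rows:
            ep_id = int(destination_se.split('/')[0])
            rse = convert_objid_to_endpoint(endpoint_mapfile, ep_id)
            if rse is None:
                continue
            obj_store_zip_map.setdefault(rse['name'], 0)
            obj_store_zip_map[rse['name']] += fsize
        if not obj_store_zip_map:
            # nothing resolvable: caller keeps the objectstores from schedconfig
            return None
        # largest accumulated size wins, mirroring the patch's sort + reverse
        sorted_ost = sorted(obj_store_zip_map.items(), key=operator.itemgetter(1), reverse=True)
        return [{'ddmendpoint': sorted_ost[0][0]}]

    # toy endpoint map standing in for the agis_ddmendpoints.json lookup
    fake_map = {123: {'name': 'CERN-PROD_OS'}, 456: {'name': 'BNL_OS'}}
    rows = [(100, '123/a.zip'), (300, '123/b.zip'), (200, '456/c.zip')]
    print(pick_largest_objectstore(rows, lambda mapfile, oid: fake_map.get(oid), None))
    # -> [{'ddmendpoint': 'CERN-PROD_OS'}]

In the patch itself the (fsize, destinationSE) pairs come from distinct zipRow_IDs in JEDI_Events resolved through filesTable4, and the winning endpoint replaces the objectstores list parsed from schedconfig, so the existing ddm_endpoint lookup then picks a nucleus close to the pre-merged data rather than the job's destination nucleus.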