Commit 1e3447f
* to take into account objstoreIDs of zip files
tmaeno committed Dec 5, 2018
1 parent 95e9ced commit 1e3447f
Showing 4 changed files with 58 additions and 4 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
@@ -1,5 +1,8 @@
 Release Notes
 
+* 12/5/2018
+  * to take into account objstoreIDs of zip files
+
 * 11/14/2018
   * one sub per job for ART
4 changes: 4 additions & 0 deletions pandaserver/config/panda_config.py
@@ -53,6 +53,10 @@
 if not tmpSelf.__dict__.has_key('certdir'):
     tmpSelf.__dict__['certdir'] = '/data/atlpan'
 
+# endpoint map file
+if not tmpSelf.__dict__.has_key('endpoint_mapfile'):
+    tmpSelf.__dict__['endpoint_mapfile'] = '/cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json'
+
 # schemas
 if not tmpSelf.__dict__.has_key('schemaPANDA'):
     tmpSelf.__dict__['schemaPANDA'] = 'ATLAS_PANDA'
2 changes: 1 addition & 1 deletion pandaserver/dataservice/AdderAtlasPlugin.py
@@ -878,7 +878,7 @@ def registerEventServiceFiles(self):
                 # get endpoint ID
                 epID = int(fileSpec.destinationSE.split('/')[0])
                 # convert to DDM endpoint
-                rse = self.taskBuffer.convertObjIDtoEndPoint('/cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json',
+                rse = self.taskBuffer.convertObjIDtoEndPoint(panda_config.endpoint_mapfile,
                                                              epID)
                 if rse['is_deterministic']:
                     epName = rse['name']
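
For event-service files, destinationSE carries the numeric object-store ID in front of the first slash, which is the value handed to convertObjIDtoEndPoint. A two-line illustration with a made-up destinationSE value:

# destinationSE encodes the object-store ID before the first '/';
# '421/whatever' is a made-up example value.
destinationSE = '421/whatever'
epID = int(destinationSE.split('/')[0])   # -> 421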
53 changes: 50 additions & 3 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -20,6 +20,7 @@
 import commands
 import traceback
 import warnings
+import operator
 import ErrorCode
 import SiteSpec
 import CloudSpec
@@ -16077,9 +16078,54 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
         if catchAll == None:
             catchAll = ''
         try:
-            objectstores = json.loads(objectstores)
+            if isFakeCJ:
+                objectstores = []
+            else:
+                objectstores = json.loads(objectstores)
         except:
             objectstores = []
+        # get objstoreIDs
+        sqlZIP = "SELECT /*+ INDEX_RS_ASC(tab JEDI_EVENTS_FILEID_IDX) NO_INDEX_FFS(tab JEDI_EVENTS_PK) NO_INDEX_SS(tab JEDI_EVENTS_PK) */ "
+        sqlZIP += "DISTINCT zipRow_ID FROM {0}.JEDI_Events ".format(panda_config.schemaJEDI)
+        sqlZIP += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
+        sqlZIP += "AND status=:esDone "
+        sqlOST = "SELECT fsize,destinationSE FROM {0}.filesTable4 ".format(panda_config.schemaPANDA)
+        sqlOST += "WHERE row_ID=:row_ID "
+        objStoreZipMap = dict()
+        zipRowIDs = set()
+        totalZipSize = 0
+        for tmpFileSpec in jobSpec.Files:
+            if tmpFileSpec.type in ['input', 'pseudo_input']:
+                varMap = dict()
+                varMap[':jediTaskID'] = tmpFileSpec.jediTaskID
+                varMap[':datasetID'] = tmpFileSpec.datasetID
+                varMap[':fileID'] = tmpFileSpec.fileID
+                varMap[':esDone'] = EventServiceUtils.ST_done
+                self.cur.execute(sqlZIP+comment,varMap)
+                resZIP = self.cur.fetchall()
+                for zipRowID, in resZIP:
+                    if zipRowID is None:
+                        continue
+                    if zipRowID in zipRowIDs:
+                        continue
+                    zipRowIDs.add(zipRowID)
+                    # get file info
+                    varMap = dict()
+                    varMap[':row_ID'] = zipRowID
+                    self.cur.execute(sqlOST+comment, varMap)
+                    resOST = self.cur.fetchone()
+                    tmpFsize, tmpDestSE = resOST
+                    totalZipSize += tmpFsize
+                    tmpRSE = self.convertObjIDtoEndPoint(panda_config.endpoint_mapfile, int(tmpDestSE.split('/')[0]))
+                    if tmpRSE is not None:
+                        objStoreZipMap.setdefault(tmpRSE['name'], 0)
+                        objStoreZipMap[tmpRSE['name']] += tmpFsize
+        sortedOST = sorted(objStoreZipMap.items(), key=operator.itemgetter(1))
+        sortedOST.reverse()
+        if len(sortedOST) > 0:
+            _logger.debug('{0} old objectstores {1}'.format(methodName, str(objectstores)))
+            objectstores = [{'ddmendpoint': sortedOST[0][0]}]
+            _logger.debug('{0} new objectstores {1}'.format(methodName, str(objectstores)))
         if isFakeCJ:
             # use nucleus for fake co-jumbo since they don't have sub datasets
             pass
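
The block above collects the zipped pre-merged files of the job, sums their sizes per object-store endpoint, and replaces objectstores with the single endpoint holding the most data, so the merge is steered toward it. The same ranking idea in isolation, with hypothetical endpoint names and sizes:

import operator

def pick_heaviest_endpoint(zip_files):
    # zip_files: (endpoint_name, fsize) pairs, standing in for the
    # (RSE, size) values collected from filesTable4 above
    size_per_endpoint = dict()
    for name, fsize in zip_files:
        size_per_endpoint.setdefault(name, 0)
        size_per_endpoint[name] += fsize
    if not size_per_endpoint:
        return None
    # largest aggregated size first, as done with operator.itemgetter(1)
    # and reverse() in the diff
    ranked = sorted(size_per_endpoint.items(), key=operator.itemgetter(1))
    ranked.reverse()
    return ranked[0][0]

# made-up numbers: BNL_OS holds 15 GB in total, CERN_OS 5 GB
print(pick_heaviest_endpoint([('CERN_OS', 5 * 1024**3),
                              ('BNL_OS', 12 * 1024**3),
                              ('BNL_OS', 3 * 1024**3)]))   # -> BNL_OS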
@@ -16126,7 +16172,7 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
         else:
             # use nucleus close to OS
             tmpNucleus = None
-            if isMergeAtOS and len(objectstores) > 0 and not isFakeCJ:
+            if isMergeAtOS and len(objectstores) > 0:
                 osEndpoint = objectstores[0]['ddmendpoint']
                 sqlCO = "SELECT site_name FROM ATLAS_PANDA.ddm_endpoint WHERE ddm_endpoint_name=:osEndpoint "
                 varMap = dict()
@@ -16135,10 +16181,11 @@
                 resCO = self.cur.fetchone()
                 if resCO is not None:
                     tmpNucleus, = resCO
+                    _logger.info('{0} look for merge sites in nucleus:{1} close to pre-merged files'.format(methodName,tmpNucleus))
             # use nucleus
             if tmpNucleus is None:
                 tmpNucleus = jobSpec.destinationSE.split(':')[-1]
-            _logger.info('{0} look for merge sites in nucleus:{1}'.format(methodName,tmpNucleus))
+                _logger.info('{0} look for merge sites in destination nucleus:{1}'.format(methodName,tmpNucleus))
             # get sites in a nucleus
             sqlSN = "SELECT dr.panda_site_name,dr.ddm_endpoint_name "
             sqlSN += "FROM ATLAS_PANDA.panda_site ps,ATLAS_PANDAMETA.schedconfig sc,ATLAS_PANDA.panda_ddm_relation dr "
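
Taken together, the last two hunks make the nucleus choice follow the pre-merged data: if merging at the object store is allowed and a zip-derived endpoint is available, the nucleus hosting that endpoint wins; otherwise the nucleus encoded in the job's destinationSE is used. A condensed sketch of that fallback, with site_of_endpoint standing in for the ATLAS_PANDA.ddm_endpoint query (a hypothetical helper, not part of the codebase):

def choose_nucleus(job_destination_se, objectstores, is_merge_at_os, site_of_endpoint):
    tmp_nucleus = None
    if is_merge_at_os and len(objectstores) > 0:
        # site hosting the object-store endpoint, None if unknown
        tmp_nucleus = site_of_endpoint(objectstores[0]['ddmendpoint'])
    if tmp_nucleus is None:
        # destinationSE looks like 'nucleus:SITE'; take the last token
        tmp_nucleus = job_destination_se.split(':')[-1]
    return tmp_nucleus

# made-up usage
print(choose_nucleus('nucleus:CERN-PROD', [{'ddmendpoint': 'BNL_OS'}],
                     True, lambda ep: {'BNL_OS': 'BNL'}.get(ep)))   # -> BNL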
