Skip to content

Commit

Permalink
* to change MCORE to SCORE for small ES jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaeno committed Dec 19, 2018
1 parent 4479ea9 commit c2bc54f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
5 changes: 4 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
Release Notes

* 12/19/2018
* to change MCORE to SCORE for small ES jobs

* 12/5/2018
* to take into account objstoreIDs of zip files
* to take into account objstoreIDs of zip files when making esmerge

* 11/14/2018
* one sub per job for ART
Expand Down
28 changes: 23 additions & 5 deletions pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def getConfigValue(self, component, key, app='pandaserver', vo=None):
# insert job to jobsDefined
def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,groupJobSN=0,toPending=False,
origEsJob=False,eventServiceInfo=None,oldPandaIDs=None,relationType=None,fileIDPool=[],
origSpecialHandling=None):
origSpecialHandling=None, unprocessedMap=None):
comment = ' /* DBProxy.insertNewJob */'
methodName = comment.split(' ')[-2].split('.')[-1]
methodName += ' <JediTaskID={0} idPool={1}>'.format(job.jediTaskID,len(fileIDPool))
Expand Down Expand Up @@ -461,6 +461,8 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr
# commit
if not self._commit():
raise RuntimeError, 'Commit error'
if unprocessedMap is not None:
return (False, unprocessedMap)
return False
# insert job
varMap = job.valuesMap(useSeq=True)
Expand Down Expand Up @@ -772,6 +774,16 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr
varMap[':specialHandling'] = job.specialHandling
varMap[':PandaID'] = job.PandaID
self.cur.execute(sqlJediJSH+comment, varMap)
# use score if not so many events are available
if origEsJob and unprocessedMap is not None:
unprocessedMap[job.jobsetID] = nEventsToProcess
if EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job) and unprocessedMap is not None:
if job.coreCount not in [None, '', 'NULL'] and job.coreCount > 1:
minUnprocessed = self.getConfigValue('dbproxy', 'AES_MINEVENTSFORMCORE')
if minUnprocessed is not None:
minUnprocessed = max(minUnprocessed, job.coreCount)
if unprocessedMap[job.jobsetID] < minUnprocessed and unprocessedMap[job.jobsetID] > 0:
self.setScoreSiteToEs(job, "insertNewJob : {0}".format(job.PandaID), comment)
# bulk insert files
if len(varMapsForFile) > 0:
_logger.debug("insertNewJob : {0} bulk insert {1} files for jediTaskID:{2}".format(job.PandaID,len(varMapsForFile),
Expand Down Expand Up @@ -880,12 +892,16 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr
self.recordStatusChange(job.PandaID,job.jobStatus,jobInfo=job)
except:
_logger.error('recordStatusChange in insertNewJob')
if unprocessedMap is not None:
return (True, unprocessedMap)
return True
except:
# roll back
self._rollback()
# error
self.dumpErrorMessage(_logger,methodName)
if unprocessedMap is not None:
return (False, unprocessedMap)
return False


Expand Down Expand Up @@ -1654,7 +1670,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
job.taskBufferErrorDiag = 'maximum event attempts reached'
# kill other consumers
self.killEventServiceConsumers(job,False,False)
self.killUnusedEventServiceConsumers(job,False,killAll=True)
self.killUnusedEventServiceConsumers(job, False, killAll=True, checkAttemptNr=True)
elif retEvS == 4:
# other consumers are running
job.jobStatus = 'merging'
Expand Down Expand Up @@ -15036,7 +15052,7 @@ def getEventRanges(self,pandaID,jobsetID,jediTaskID,nRanges,acceptJson,scattered
tmpJobSpec.PandaID = pandaID
tmpJobSpec.jobsetID = jobsetID
tmpJobSpec.jediTaskID = jediTaskID
self.killUnusedEventServiceConsumers(tmpJobSpec,False)
self.killUnusedEventServiceConsumers(tmpJobSpec, False, checkAttemptNr=True)
# commit
if not self._commit():
raise RuntimeError, 'Commit error'
Expand Down Expand Up @@ -16300,12 +16316,12 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):
# set score site to ES job
def setScoreSiteToEs(self, jobSpec, methodName, comment):
_logger.debug('{0} looking for SCORE site'.format(methodName))
# get score PQ in the nucleus associated to the site to run the smal ES job
# get score PQ in the nucleus associated to the site to run the small ES job
sqlSN = "SELECT ps2.panda_site_name "
sqlSN += "FROM ATLAS_PANDA.panda_site ps1,ATLAS_PANDA.panda_site ps2,ATLAS_PANDAMETA.schedconfig sc "
sqlSN += "WHERE ps1.panda_site_name=:site AND ps1.site_name=ps2.site_name AND sc.siteid=ps2.panda_site_name "
sqlSN += "AND (sc.corecount IS NULL OR sc.corecount=1 OR sc.catchall LIKE '%unifiedPandaQueue%' OR sc.capability=:capability) "
sqlSN += "AND (sc.jobseed IS NULL OR sc.jobseed='es') "
sqlSN += "AND (sc.jobseed IS NULL OR sc.jobseed<>'std') "
sqlSN += "AND sc.status=:siteStatus "
varMap = {}
varMap[':site'] = jobSpec.computingSite
Expand Down Expand Up @@ -16355,6 +16371,8 @@ def setScoreSiteToEs(self, jobSpec, methodName, comment):
newSiteName = jobSpec.computingSite
if newSiteName is not None:
_logger.info('{0} set SCORE site to {1}'.format(methodName, newSiteName))
else:
_logger.info('{0} no SCORE site for {1}'.format(methodName, jobSpec.computingSite))
# return
return

Expand Down
21 changes: 12 additions & 9 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def getPrioParameters(self,jobs,user,fqans,userDefinedWG,validWorkingGroup):
# store Jobs into DB
def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname='',resetLocInSetupper=False,
checkSpecialHandling=True,toPending=False,oldPandaIDs=None,relationType=None, userVO='atlas',
esJobsetMap=None, getEsJobsetMap=False):
esJobsetMap=None, getEsJobsetMap=False, unprocessedMap=None):
try:
_logger.debug("storeJobs : start for %s nJobs=%s" % (user,len(jobs)))
# check quota for priority calculation
Expand All @@ -140,7 +140,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
if not tmpStatus:
_logger.debug("storeJobs : end for %s DN is blocked 1" % user)
if getEsJobsetMap:
return ([], None)
return ([], None, unprocessedMap)
return []
# set parameters for user jobs
if len(jobs) > 0 and (jobs[0].prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources) \
Expand Down Expand Up @@ -231,7 +231,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
if not userStatus:
_logger.debug("storeJobs : end for %s DN is blocked 2" % user)
if getEsJobsetMap:
return ([], None)
return ([], None, unprocessedMap)
return []
# extract VO
for tmpFQAN in fqans:
Expand Down Expand Up @@ -394,10 +394,13 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
if not isOK:
# skip since there is no ready event
job.PandaID = None
elif not proxy.insertNewJob(job,user,serNum,weight,priorityOffset,userVO,groupJobSerialNum,
toPending,origEsJob,eventServiceInfo,oldPandaIDs=jobOldPandaIDs,
relationType=relationType,fileIDPool=fileIDPool,
origSpecialHandling=origSH):
tmpRetI = proxy.insertNewJob(job,user,serNum,weight,priorityOffset,userVO,groupJobSerialNum,
toPending,origEsJob,eventServiceInfo,oldPandaIDs=jobOldPandaIDs,
relationType=relationType,fileIDPool=fileIDPool,
origSpecialHandling=origSH, unprocessedMap=unprocessedMap)
if unprocessedMap is not None:
tmpRetI, unprocessedMap = tmpRetI
if not tmpRetI:
# reset if failed
job.PandaID = None
else:
Expand Down Expand Up @@ -441,14 +444,14 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
# return jobIDs
_logger.debug("storeJobs : end for %s succeeded" % user)
if getEsJobsetMap:
return (ret, esJobsetMap)
return (ret, esJobsetMap, unprocessedMap)
return ret
except:
errType,errValue = sys.exc_info()[:2]
_logger.error("storeJobs : %s %s" % (errType,errValue))
errStr = "ERROR: ServerError with storeJobs"
if getEsJobsetMap:
return (errStr, None)
return (errStr, None, unprocessedMap)
return errStr


Expand Down

0 comments on commit c2bc54f

Please sign in to comment.