diff --git a/ChangeLog.txt b/ChangeLog.txt index e74814f6c..57f10260d 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,7 +1,10 @@ Release Notes +* 12/19/2018 + * to change MCORE to SCORE for small ES jobs + * 12/5/2018 - * to take into account objstoreIDs of zip files + * to take into account objstoreIDs of zip files when making esmerge * 11/14/2018 * one sub per job for ART diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 3813d2f98..a2be655e9 100755 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -299,7 +299,7 @@ def getConfigValue(self, component, key, app='pandaserver', vo=None): # insert job to jobsDefined def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,groupJobSN=0,toPending=False, origEsJob=False,eventServiceInfo=None,oldPandaIDs=None,relationType=None,fileIDPool=[], - origSpecialHandling=None): + origSpecialHandling=None, unprocessedMap=None): comment = ' /* DBProxy.insertNewJob */' methodName = comment.split(' ')[-2].split('.')[-1] methodName += ' '.format(job.jediTaskID,len(fileIDPool)) @@ -461,6 +461,8 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr # commit if not self._commit(): raise RuntimeError, 'Commit error' + if unprocessedMap is not None: + return (False, unprocessedMap) return False # insert job varMap = job.valuesMap(useSeq=True) @@ -772,6 +774,16 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr varMap[':specialHandling'] = job.specialHandling varMap[':PandaID'] = job.PandaID self.cur.execute(sqlJediJSH+comment, varMap) + # use score if not so many events are available + if origEsJob and unprocessedMap is not None: + unprocessedMap[job.jobsetID] = nEventsToProcess + if EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job) and unprocessedMap is not None: + if job.coreCount not in [None, '', 'NULL'] and job.coreCount > 1: + minUnprocessed = self.getConfigValue('dbproxy', 'AES_MINEVENTSFORMCORE') + if minUnprocessed is not None: + minUnprocessed = max(minUnprocessed, job.coreCount) + if unprocessedMap[job.jobsetID] < minUnprocessed and unprocessedMap[job.jobsetID] > 0: + self.setScoreSiteToEs(job, "insertNewJob : {0}".format(job.PandaID), comment) # bulk insert files if len(varMapsForFile) > 0: _logger.debug("insertNewJob : {0} bulk insert {1} files for jediTaskID:{2}".format(job.PandaID,len(varMapsForFile), @@ -880,12 +892,16 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr self.recordStatusChange(job.PandaID,job.jobStatus,jobInfo=job) except: _logger.error('recordStatusChange in insertNewJob') + if unprocessedMap is not None: + return (True, unprocessedMap) return True except: # roll back self._rollback() # error self.dumpErrorMessage(_logger,methodName) + if unprocessedMap is not None: + return (False, unprocessedMap) return False @@ -1654,7 +1670,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa job.taskBufferErrorDiag = 'maximum event attempts reached' # kill other consumers self.killEventServiceConsumers(job,False,False) - self.killUnusedEventServiceConsumers(job,False,killAll=True) + self.killUnusedEventServiceConsumers(job, False, killAll=True, checkAttemptNr=True) elif retEvS == 4: # other consumers are running job.jobStatus = 'merging' @@ -15036,7 +15052,7 @@ def getEventRanges(self,pandaID,jobsetID,jediTaskID,nRanges,acceptJson,scattered tmpJobSpec.PandaID = pandaID tmpJobSpec.jobsetID = jobsetID tmpJobSpec.jediTaskID = jediTaskID - self.killUnusedEventServiceConsumers(tmpJobSpec,False) + self.killUnusedEventServiceConsumers(tmpJobSpec, False, checkAttemptNr=True) # commit if not self._commit(): raise RuntimeError, 'Commit error' @@ -16300,12 +16316,12 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment): # set score site to ES job def setScoreSiteToEs(self, jobSpec, methodName, comment): _logger.debug('{0} looking for SCORE site'.format(methodName)) - # get score PQ in the nucleus associated to the site to run the smal ES job + # get score PQ in the nucleus associated to the site to run the small ES job sqlSN = "SELECT ps2.panda_site_name " sqlSN += "FROM ATLAS_PANDA.panda_site ps1,ATLAS_PANDA.panda_site ps2,ATLAS_PANDAMETA.schedconfig sc " sqlSN += "WHERE ps1.panda_site_name=:site AND ps1.site_name=ps2.site_name AND sc.siteid=ps2.panda_site_name " sqlSN += "AND (sc.corecount IS NULL OR sc.corecount=1 OR sc.catchall LIKE '%unifiedPandaQueue%' OR sc.capability=:capability) " - sqlSN += "AND (sc.jobseed IS NULL OR sc.jobseed='es') " + sqlSN += "AND (sc.jobseed IS NULL OR sc.jobseed<>'std') " sqlSN += "AND sc.status=:siteStatus " varMap = {} varMap[':site'] = jobSpec.computingSite @@ -16355,6 +16371,8 @@ def setScoreSiteToEs(self, jobSpec, methodName, comment): newSiteName = jobSpec.computingSite if newSiteName is not None: _logger.info('{0} set SCORE site to {1}'.format(methodName, newSiteName)) + else: + _logger.info('{0} no SCORE site for {1}'.format(methodName, jobSpec.computingSite)) # return return diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index f1c850afd..6009b6113 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -115,7 +115,7 @@ def getPrioParameters(self,jobs,user,fqans,userDefinedWG,validWorkingGroup): # store Jobs into DB def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname='',resetLocInSetupper=False, checkSpecialHandling=True,toPending=False,oldPandaIDs=None,relationType=None, userVO='atlas', - esJobsetMap=None, getEsJobsetMap=False): + esJobsetMap=None, getEsJobsetMap=False, unprocessedMap=None): try: _logger.debug("storeJobs : start for %s nJobs=%s" % (user,len(jobs))) # check quota for priority calculation @@ -140,7 +140,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname= if not tmpStatus: _logger.debug("storeJobs : end for %s DN is blocked 1" % user) if getEsJobsetMap: - return ([], None) + return ([], None, unprocessedMap) return [] # set parameters for user jobs if len(jobs) > 0 and (jobs[0].prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources) \ @@ -231,7 +231,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname= if not userStatus: _logger.debug("storeJobs : end for %s DN is blocked 2" % user) if getEsJobsetMap: - return ([], None) + return ([], None, unprocessedMap) return [] # extract VO for tmpFQAN in fqans: @@ -394,10 +394,13 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname= if not isOK: # skip since there is no ready event job.PandaID = None - elif not proxy.insertNewJob(job,user,serNum,weight,priorityOffset,userVO,groupJobSerialNum, - toPending,origEsJob,eventServiceInfo,oldPandaIDs=jobOldPandaIDs, - relationType=relationType,fileIDPool=fileIDPool, - origSpecialHandling=origSH): + tmpRetI = proxy.insertNewJob(job,user,serNum,weight,priorityOffset,userVO,groupJobSerialNum, + toPending,origEsJob,eventServiceInfo,oldPandaIDs=jobOldPandaIDs, + relationType=relationType,fileIDPool=fileIDPool, + origSpecialHandling=origSH, unprocessedMap=unprocessedMap) + if unprocessedMap is not None: + tmpRetI, unprocessedMap = tmpRetI + if not tmpRetI: # reset if failed job.PandaID = None else: @@ -441,14 +444,14 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname= # return jobIDs _logger.debug("storeJobs : end for %s succeeded" % user) if getEsJobsetMap: - return (ret, esJobsetMap) + return (ret, esJobsetMap, unprocessedMap) return ret except: errType,errValue = sys.exc_info()[:2] _logger.error("storeJobs : %s %s" % (errType,errValue)) errStr = "ERROR: ServerError with storeJobs" if getEsJobsetMap: - return (errStr, None) + return (errStr, None, unprocessedMap) return errStr