Commit 68687bc: added list_ptest_prod_sources
tmaeno committed Oct 30, 2018
1 parent 76df723
Showing 8 changed files with 90 additions and 34 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
@@ -1,5 +1,8 @@
 Release Notes
 
+* 10/30/2018
+  * added list_ptest_prod_sources
+
 * 10/22/2018
   * added proc_status
   * sending ioIntensity
3 changes: 2 additions & 1 deletion pandaserver/dataservice/AdderAtlasPlugin.py
@@ -25,6 +25,7 @@
 from pandalogger.PandaLogger import PandaLogger
 from AdderPluginBase import AdderPluginBase
 from taskbuffer import EventServiceUtils
+from taskbuffer import JobUtils
 from MailUtils import MailUtils
 import DataServiceUtils
 
@@ -350,7 +351,7 @@ def _updateOutputs(self):
                 zipFiles[file.lfn]['scope'] = file.scope
                 zipFiles[file.lfn]['rse'] = dsDestMap[fileDestinationDBlock]
         # for subscription
-        if self.job.prodSourceLabel in ['managed','test','software','rc_test','rc_test2','ptest','user','rucio_test'] and \
+        if self.job.prodSourceLabel in ['managed','test','software','user','rucio_test'] + JobUtils.list_ptest_prod_sources and \
                re.search('_sub\d+$',fileDestinationDBlock) != None and (not self.addToTopOnly) and \
                self.job.destinationSE != 'local':
             if self.siteMapper == None:
5 changes: 3 additions & 2 deletions pandaserver/dataservice/AdderGen.py
@@ -22,6 +22,7 @@
 from pandalogger.LogWrapper import LogWrapper
 from taskbuffer import EventServiceUtils
 from taskbuffer import retryModule
+from taskbuffer import JobUtils
 import taskbuffer.ErrorCode
 
 # logger
@@ -373,7 +374,7 @@ def run(self):
                 if not file.destinationDBlock in destDBList:
                     destDBList.append(file.destinationDBlock)
                 # collect GUIDs
-                if (self.job.prodSourceLabel=='panda' or (self.job.prodSourceLabel in ['ptest','rc_test','rc_test2','rucio_test'] and \
+                if (self.job.prodSourceLabel=='panda' or (self.job.prodSourceLabel in ['rucio_test'] + JobUtils.list_ptest_prod_sources and \
                     self.job.processingType in ['pathena','prun','gangarobot-rctest','hammercloud'])) \
                     and file.type == 'output':
                     # extract base LFN since LFN was changed to full LFN for CMS
@@ -645,7 +646,7 @@ def parseXML(self):
                 if self.job.prodSourceLabel in ['user','panda']:
                     # skipped file
                     file.status = 'skipped'
-                elif self.job.prodSourceLabel in ['managed','test','rc_test','rc_test2','ptest']:
+                elif self.job.prodSourceLabel in ['managed','test'] + JobUtils.list_ptest_prod_sources:
                     # failed by pilot
                     file.status = 'failed'
                 elif file.type == 'output' or file.type == 'log':
9 changes: 5 additions & 4 deletions pandaserver/dataservice/SetupperAtlasPlugin.py
@@ -21,6 +21,7 @@
 from taskbuffer.DatasetSpec import DatasetSpec
 from taskbuffer import retryModule
 from taskbuffer import EventServiceUtils
+from taskbuffer import JobUtils
 from brokerage.SiteMapper import SiteMapper
 from brokerage.PandaSiteIDs import PandaMoverIDs
 import brokerage.broker
@@ -448,7 +449,7 @@ def _setupDestination(self,startIdx=-1,nJobsInLoop=50):
             if not destError.has_key(dest):
                 destError[dest] = ''
             originalName = ''
-            if (job.prodSourceLabel == 'panda') or (job.prodSourceLabel in ['ptest','rc_test','rc_test2'] and \
+            if (job.prodSourceLabel == 'panda') or (job.prodSourceLabel in JobUtils.list_ptest_prod_sources and \
                 job.processingType in ['pathena','prun','gangarobot-rctest']):
                 # keep original name
                 nameList = [file.destinationDBlock]
@@ -585,7 +586,7 @@ def _setupDestination(self,startIdx=-1,nJobsInLoop=50):
                 # skip registerDatasetLocations
                 status,out = True,''
             elif name == originalName or tmpSrcDDM != tmpDstDDM or \
-                     job.prodSourceLabel == 'panda' or (job.prodSourceLabel in ['ptest','rc_test','rc_test2'] and \
+                     job.prodSourceLabel == 'panda' or (job.prodSourceLabel in JobUtils.list_ptest_prod_sources and \
                      job.processingType in ['pathena','prun','gangarobot-rctest']) \
                      or len(tmpTokenList) > 1 or EventServiceUtils.isMergeAtOS(job.specialHandling):
                 # set replica lifetime to _sub
@@ -1300,7 +1301,7 @@ def _correctLFN(self):
         tmpJobList = tuple(jobsProcessed)
         for job in tmpJobList:
             # check only production/test jobs
-            if not job.prodSourceLabel in ['managed','test','software','rc_test','rc_test2','ptest']:
+            if not job.prodSourceLabel in ['managed','test','software'] + JobUtils.list_ptest_prod_sources:
                 continue
             # don't check if site is already set
             if job.prodSourceLabel in ['managed','test'] and not job.computingSite in ['NULL','',None]:
@@ -1378,7 +1379,7 @@ def _correctLFN(self):
         for tmpJob in self.jobs:
             try:
                 # set only for production/analysis/test
-                if not tmpJob.prodSourceLabel in ['managed','test','rc_test','rc_test2','ptest','user','prod_test']:
+                if not tmpJob.prodSourceLabel in ['managed','test','user','prod_test'] + JobUtils.list_ptest_prod_sources:
                     continue
                 # loop over all files
                 tmpJob.nInputDataFiles = 0
4 changes: 4 additions & 0 deletions pandaserver/taskbuffer/JobUtils.py
@@ -1,5 +1,9 @@
 import re
 
+# list of prod source label for pilot tests
+list_ptest_prod_sources = ['ptest', 'rc_test', 'rc_test2', 'rc_alrb']
+
+
 
 # get core count
 def getCoreCount(actualCoreCount, defCoreCount, jobMetrics):
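The new constant gives every call site a single shared definition of the pilot-test labels; the other files in this commit splice it into their literal lists instead of repeating 'ptest'/'rc_test'/'rc_test2'. A minimal usage sketch (the helper function below is illustrative, not part of the commit):

    from taskbuffer import JobUtils

    def is_pilot_test(prod_source_label):
        # True for 'ptest', 'rc_test', 'rc_test2', and the newly added 'rc_alrb'
        return prod_source_label in JobUtils.list_ptest_prod_sources

    # membership checks elsewhere extend a literal list with the shared one
    allowed_labels = ['managed', 'test'] + JobUtils.list_ptest_prod_sources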
88 changes: 65 additions & 23 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -360,7 +360,7 @@ def insertNewJob(self,job,user,serNum,weight=0.0,priorityOffset=0,userVO=None,gr
         # group job SN
         groupJobSN = "%05d" % groupJobSN
         # set attempt numbers
-        if job.prodSourceLabel in ['user','panda','ptest','rc_test','rc_test2']:
+        if job.prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources:
             if job.attemptNr in [None,'NULL','']:
                 job.attemptNr = 0
             if job.maxAttempt in [None,'NULL','']:
@@ -2645,7 +2645,7 @@ def retryJob(self,pandaID,param,failedInActive=False,changeJobInMem=False,inMemJ
             useJEDI = True
         # check pilot retry
         usePilotRetry = False
-        if job.prodSourceLabel in ['user','panda','ptest','rc_test','rc_test2'] and \
+        if job.prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources and \
            param.has_key('pilotErrorCode') and \
            param['pilotErrorCode'].startswith('-') and \
            job.maxAttempt > job.attemptNr and \
@@ -2770,7 +2770,7 @@ def retryJob(self,pandaID,param,failedInActive=False,changeJobInMem=False,inMemJ
                     file.GUID = commands.getoutput('uuidgen')
                 # don't change input or lib.tgz, or ES merge output/log since it causes a problem with input name construction
                 if file.type in ['input','pseudo_input'] or (file.type == 'output' and job.prodSourceLabel == 'panda') or \
-                   (file.type == 'output' and file.lfn.endswith('.lib.tgz') and job.prodSourceLabel in ['rc_test','rc_test2','ptest']):
+                   (file.type == 'output' and file.lfn.endswith('.lib.tgz') and job.prodSourceLabel in JobUtils.list_ptest_prod_sources):
                     continue
                 # append attemptNr to LFN
                 oldName = file.lfn
@@ -6196,17 +6196,22 @@ def getPandaIDsSite(self,site,status,limit):
     def getPandaIDsForProdDB(self,limit,lockedby):
         comment = ' /* DBProxy.getPandaIDsForProdDB */'
         _logger.debug("getPandaIDsForProdDB %s" % limit)
-        sql0 = "PandaID,jobStatus,stateChangeTime,attemptNr,jobDefinitionID,jobExecutionID FROM %s "
-        sqlW = "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND lockedby=:lockedby "
-        sqlX = "AND stateChangeTime>prodDBUpdateTime "
-        sqlA = "AND (CASE WHEN stateChangeTime>prodDBUpdateTime THEN 1 ELSE null END) = 1 "
-        sql1 = "AND rownum<=:limit "
         varMap = {}
         varMap[':lockedby'] = lockedby
         varMap[':limit'] = limit
         varMap[':prodSourceLabel1'] = 'managed'
-        varMap[':prodSourceLabel2'] = 'rc_test'
-        varMap[':prodSourceLabel3'] = 'rc_test2'
+        sql0 = "PandaID,jobStatus,stateChangeTime,attemptNr,jobDefinitionID,jobExecutionID FROM %s "
+        sqlW = "WHERE prodSourceLabel IN (:prodSourceLabel1,"
+        for tmpLabel in JobUtils.list_ptest_prod_sources:
+            tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+            sqlW += tmpKey
+            sqlW += ','
+            varMap[tmpKey] = tmpLabel
+        sqlW = sqlW[:-1]
+        sqlW += ") AND lockedby=:lockedby "
+        sqlX = "AND stateChangeTime>prodDBUpdateTime "
+        sqlA = "AND (CASE WHEN stateChangeTime>prodDBUpdateTime THEN 1 ELSE null END) = 1 "
+        sql1 = "AND rownum<=:limit "
         try:
             retMap = {}
             totalIDs = 0
@@ -8889,14 +8894,26 @@ def getJobStatisticsForExtIF(self,sourcetype=None):
_logger.debug("getJobStatisticsForExtIF()")
timeLimit = datetime.datetime.utcnow() - datetime.timedelta(hours=12)
if sourcetype == 'analysis':
sql0 = "SELECT jobStatus,COUNT(*),cloud FROM %s WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) GROUP BY jobStatus,cloud"
sql0 = "SELECT jobStatus,COUNT(*),cloud FROM %s WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) GROUP BY jobStatus,cloud"
sqlA = "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ jobStatus,COUNT(*),tabS.cloud FROM %s tab,ATLAS_PANDAMETA.schedconfig tabS "
sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND tab.computingSite=tabS.siteid "
sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) AND tab.computingSite=tabS.siteid "
else:
sql0 = "SELECT tab.jobStatus,COUNT(*),tabS.cloud FROM %s tab,ATLAS_PANDAMETA.schedconfig tabS "
sql0 += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND tab.computingSite=tabS.siteid GROUP BY tab.jobStatus,tabS.cloud"
sql0 += "WHERE prodSourceLabel IN (:prodSourceLabel1,"
for tmpLabel in JobUtils.list_ptest_prod_sources:
tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
sql0 += tmpKey
sql0 += ','
sql0 = sql0[:-1]
sql0 += ") AND tab.computingSite=tabS.siteid GROUP BY tab.jobStatus,tabS.cloud"
sqlA = "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ jobStatus,COUNT(*),tabS.cloud FROM %s tab,ATLAS_PANDAMETA.schedconfig tabS "
sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND tab.computingSite=tabS.siteid "
sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,"
for tmpLabel in JobUtils.list_ptest_prod_sources:
tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
sqlA += tmpKey
sqlA += ','
sqlA = sqlA[:-1]
sqlA += ") AND tab.computingSite=tabS.siteid "
sqlA+= "AND modificationTime>:modificationTime GROUP BY tab.jobStatus,tabS.cloud"
# sql for materialized view
sqlMV = re.sub('COUNT\(\*\)','SUM(num_of_jobs)',sql0)
@@ -8911,11 +8928,11 @@ def getJobStatisticsForExtIF(self,sourcetype=None):
             if sourcetype == 'analysis':
                 varMap[':prodSourceLabel1'] = 'user'
                 varMap[':prodSourceLabel2'] = 'panda'
-                varMap[':prodSourceLabel3'] = 'dummy'
             else:
                 varMap[':prodSourceLabel1'] = 'managed'
-                varMap[':prodSourceLabel2'] = 'rc_test'
-                varMap[':prodSourceLabel3'] = 'rc_test2'
+                for tmpLabel in JobUtils.list_ptest_prod_sources:
+                    tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+                    varMap[tmpKey] = tmpLabel
             if table != 'ATLAS_PANDA.jobsArchived4':
                 self.cur.arraysize = 10000
                 if table == 'ATLAS_PANDA.jobsActive4':
@@ -8957,20 +8974,44 @@ def getJobStatisticsPerProcessingType(self,useMorePG=False):
         if useMorePG == False:
             sqlN = "SELECT jobStatus,COUNT(*),tabS.cloud,processingType "
             sqlN += "FROM %s tab, ATLAS_PANDAMETA.schedconfig tabS "
-            sqlN += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND computingSite=tabS.siteid "
+            sqlN += "WHERE prodSourceLabel IN (:prodSourceLabel1,"
+            for tmpLabel in JobUtils.list_ptest_prod_sources:
+                tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+                sqlN += tmpKey
+                sqlN += ','
+            sqlN = sqlN[:-1]
+            sqlN += ") AND computingSite=tabS.siteid "
             sqlN += "GROUP BY jobStatus,tabS.cloud,processingType "
             sqlA = "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ jobStatus,COUNT(*),tabS.cloud,processingType "
             sqlA += "FROM %s tab,ATLAS_PANDAMETA.schedconfig tabS "
-            sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND modificationTime>:modificationTime "
+            sqlA += "WHERE prodSourceLabel IN ("
+            for tmpLabel in JobUtils.list_ptest_prod_sources:
+                tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+                sqlA += tmpKey
+                sqlA += ','
+            sqlA = sqlA[:-1]
+            sqlA += ") AND modificationTime>:modificationTime "
             sqlA += "AND computingSite=tabS.siteid "
             sqlA += "GROUP BY jobStatus,tabS.cloud,processingType"
         else:
             sqlN = "SELECT jobStatus,COUNT(*),cloud,processingType,coreCount,workingGroup FROM %s "
-            sqlN += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) "
+            sqlN += "WHERE prodSourceLabel IN (:prodSourceLabel1,"
+            for tmpLabel in JobUtils.list_ptest_prod_sources:
+                tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+                sqlN += tmpKey
+                sqlN += ','
+            sqlN = sqlN[:-1]
+            sqlN += ") "
             sqlN += "GROUP BY jobStatus,cloud,processingType,coreCount,workingGroup"
             sqlA = "SELECT /*+ INDEX_RS_ASC(tab (MODIFICATIONTIME PRODSOURCELABEL)) */ "
             sqlA += "jobStatus,COUNT(*),cloud,processingType,coreCount,workingGroup FROM %s tab "
-            sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2,:prodSourceLabel3) AND modificationTime>:modificationTime "
+            sqlA += "WHERE prodSourceLabel IN (:prodSourceLabel1,"
+            for tmpLabel in JobUtils.list_ptest_prod_sources:
+                tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+                sqlA += tmpKey
+                sqlA += ','
+            sqlA = sqlA[:-1]
+            sqlA += ") AND modificationTime>:modificationTime "
             sqlA += "GROUP BY jobStatus,cloud,processingType,coreCount,workingGroup"
         # sql for materialized view
         sqlMV = re.sub('COUNT\(\*\)','SUM(num_of_jobs)',sqlN)
@@ -8985,8 +9026,9 @@ def getJobStatisticsPerProcessingType(self,useMorePG=False):
             # select
             varMap = {}
             varMap[':prodSourceLabel1'] = 'managed'
-            varMap[':prodSourceLabel2'] = 'rc_test'
-            varMap[':prodSourceLabel3'] = 'rc_test2'
+            for tmpLabel in JobUtils.list_ptest_prod_sources:
+                tmpKey = ':prodSourceLabel_{0}'.format(tmpLabel)
+                varMap[tmpKey] = tmpLabel
             if table == 'ATLAS_PANDA.jobsArchived4':
                 varMap[':modificationTime'] = timeLimit
                 self.cur.execute((sqlA+comment) % table, varMap)
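The same placeholder-building loop now appears in each of these queries. One possible follow-up, not part of this commit (the helper name and signature are hypothetical), would be to factor it into a small utility that emits the bind placeholders and fills the bind map in one step:

    from taskbuffer import JobUtils

    def make_label_binds(var_map, fixed_labels=()):
        # Register ':prodSourceLabel_<label>' binds for the pilot-test labels
        # (plus any fixed ones) and return the comma-separated placeholders.
        keys = []
        for label in tuple(fixed_labels) + tuple(JobUtils.list_ptest_prod_sources):
            key = ':prodSourceLabel_{0}'.format(label)
            var_map[key] = label
            keys.append(key)
        return ','.join(keys)

    # usage sketch, mirroring getPandaIDsForProdDB
    varMap = {}
    sqlW = "WHERE prodSourceLabel IN ({0}) AND lockedby=:lockedby ".format(
        make_label_binds(varMap, ['managed']))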
5 changes: 4 additions & 1 deletion pandaserver/taskbuffer/ProcessGroups.py
@@ -1,8 +1,11 @@
+from . import JobUtils
+
+
 processGroups = [('others', []),
                  ('evgen', ['evgen']),
                  ('simul', ['simul']),
                  ('reprocessing', ['reprocessing']),
-                 ('test', ['prod_test','rc_test','rc_test2','validation']),
+                 ('test', ['prod_test', 'validation'] + JobUtils.list_ptest_prod_sources),
                  ('mcore', ['mcore']),
                  ('group', ['group']),
                  ]
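With the shared list spliced in, the 'test' process group now also covers 'rc_alrb'. A quick illustrative lookup (the helper below is a sketch, not code from the module):

    from taskbuffer.ProcessGroups import processGroups

    def to_process_group(value):
        # return the first group whose list contains the value; 'others' otherwise
        for group, values in processGroups:
            if value in values:
                return group
        return 'others'

    print(to_process_group('rc_alrb'))   # -> 'test'
    print(to_process_group('simul'))     # -> 'simul'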
7 changes: 4 additions & 3 deletions pandaserver/taskbuffer/TaskBuffer.py
@@ -8,6 +8,7 @@
 import ProcessGroups
 import EventServiceUtils
 import ErrorCode
+from . import JobUtils
 from threading import Lock
 from DBProxyPool import DBProxyPool
 from brokerage.SiteMapper import SiteMapper
@@ -139,7 +140,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
_logger.debug("storeJobs : end for %s DN is blocked 1" % user)
return []
# set parameters for user jobs
if len(jobs) > 0 and (jobs[0].prodSourceLabel in ['user','panda','ptest','rc_test','rc_test2']) \
if len(jobs) > 0 and (jobs[0].prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources) \
and (not jobs[0].processingType in ['merge','unmerge']):
# get DB proxy
proxy = self.proxyPool.getProxy()
@@ -303,7 +304,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
                    and (not jobs[0].processingType in ['merge','unmerge']):
                     job.jobDefinitionID = userJobID
                 # set jobsetID
-                if job.prodSourceLabel in ['user','panda','ptest','rc_test','rc_test2']:
+                if job.prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources:
                     job.jobsetID = userJobsetID
                 # set specialHandling
                 if job.prodSourceLabel in ['user','panda']:
@@ -409,7 +410,7 @@ def storeJobs(self,jobs,user,joinThr=False,forkSetupper=False,fqans=[],hostname=
                 # mapping of jobsetID for event service
                 if origEsJob:
                     esJobsetMap[esIndex] = job.jobsetID
-                if job.prodSourceLabel in ['user','panda','ptest','rc_test','rc_test']:
+                if job.prodSourceLabel in ['user','panda'] + JobUtils.list_ptest_prod_sources:
                     ret.append((job.PandaID,job.jobDefinitionID,{'jobsetID':job.jobsetID}))
                 else:
                     ret.append((job.PandaID,job.jobDefinitionID,job.jobName))
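Note that the deleted line here listed 'rc_test' twice and omitted 'rc_test2', so rc_test2 jobs previously fell through to the plain (PandaID, jobDefinitionID, jobName) tuple; switching to the shared list fixes that as a side effect. A sketch of consuming the storeJobs return value under the new behavior (variable names are illustrative):

    # rets comes from TaskBuffer.storeJobs(...)
    for panda_id, job_def_id, extra in rets:
        if isinstance(extra, dict):
            # user/panda/pilot-test jobs carry the jobsetID
            jobset_id = extra['jobsetID']
        else:
            # other labels return the job name instead
            job_name = extra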
