Schedconfig cleanup (PanDAWMS#134)
* replaced queries to ATLAS_PANDAMETA.schedconfig with queries to ATLAS_PANDA.schedconfig_json (see the sketch after this list)

* using PK where possible

* copyArchive schedconfig cleanup

* removed the internal stage-out watcher

* Added the /* use_json_type */ indication to translate queries to the Postgres JSON format

* Casting of JSON fields
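
A minimal sketch of the query translation described above (not code from this commit): the helper function, its name, and the example status value are hypothetical; only the table and column names (ATLAS_PANDA.schedconfig_json, panda_queue, scj.data.*), the /* use_json_type */ hint, and the taskBuffer.querySQLS(sql, varMap) call style are taken from the diff below.

def get_queues_with_status(task_buffer, status='paused'):
    # Hypothetical helper illustrating the schedconfig -> schedconfig_json translation.
    # Old style (removed by this commit): flat columns in ATLAS_PANDAMETA.schedconfig
    #   SELECT siteid FROM ATLAS_PANDAMETA.schedconfig WHERE status=:status
    # New style: one JSON document per queue in ATLAS_PANDA.schedconfig_json, addressed
    # through dotted JSON paths; the /* use_json_type */ comment marks the query for
    # translation to the Postgres JSON syntax.
    sql = ("SELECT /* use_json_type */ panda_queue "
           "FROM ATLAS_PANDA.schedconfig_json scj "
           "WHERE scj.data.status=:status ")
    var_map = {':status': status}
    _, res = task_buffer.querySQLS(sql, var_map)
    return {queue for queue, in res}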
fbarreir authored Jun 30, 2022
1 parent fe5b934 commit 165a4b5
Showing 6 changed files with 217 additions and 558 deletions.
4 changes: 2 additions & 2 deletions ChangeLog.txt
@@ -1312,9 +1312,9 @@ Release Notes
* fixed the bulk OK file lookup in the brokerge for multiple ddm endpoints
* increased the number of PandaMover channels to 15
* using DATADISK for MCP T1 input at CERN
* using a default fareshare defined per cloud if T2 doesn't define share
* using a default fairshare defined per cloud if T2 doesn't define share
* added a protection against overwriting of dataset status by datasetMgr
* implemented a nested fareshare management mechanism
* implemented a nested fairshare management mechanism
* fixed the brokerage message when release is missing for repro
* fixed TA since replicas at T1 non DATADISK prevented T2 replicas from being used
* using DATADISK for MCP T1 input at ND,ES,DE,NL,TW
12 changes: 6 additions & 6 deletions pandaserver/brokerage/broker.py
@@ -437,7 +437,7 @@ def schedule(jobs,taskBuffer,siteMapper,forAnalysis=False,setScanSiteList=[],tru
break

# get statistics
faresharePolicy = {}
fairsharePolicy = {}
newJobStatWithPrio = {}
jobStatBrokerClouds = {}
jobStatBrokerCloudsWithPrio = {}
@@ -457,7 +457,7 @@ def schedule(jobs,taskBuffer,siteMapper,forAnalysis=False,setScanSiteList=[],tru
if not forAnalysis:
jobStatBroker = {}
jobStatBrokerClouds = taskBuffer.getJobStatisticsBrokerage()
faresharePolicy = taskBuffer.getFaresharePolicy()
fairsharePolicy = taskBuffer.getFairsharePolicy()
else:
if minPriority is None:
jobStatBroker = taskBuffer.getJobStatisticsAnalBrokerage()
@@ -966,8 +966,8 @@ def schedule(jobs,taskBuffer,siteMapper,forAnalysis=False,setScanSiteList=[],tru
# production share
skipDueToShare = False
try:
if not forAnalysis and prevSourceLabel in ['managed'] and site in faresharePolicy:
for tmpPolicy in faresharePolicy[site]['policyList']:
if not forAnalysis and prevSourceLabel in ['managed'] and site in fairsharePolicy:
for tmpPolicy in fairsharePolicy[site]['policyList']:
# ignore priority policy
if tmpPolicy['priority'] is not None:
continue
@@ -987,7 +987,7 @@ def schedule(jobs,taskBuffer,siteMapper,forAnalysis=False,setScanSiteList=[],tru
continue
else:
# catch all except WGs used by other policies
groupInDefList = faresharePolicy[site]['groupList']
groupInDefList = fairsharePolicy[site]['groupList']
usedByAnother = False
# loop over all groups
for groupInDefItem in groupInDefList:
@@ -1011,7 +1011,7 @@ def schedule(jobs,taskBuffer,siteMapper,forAnalysis=False,setScanSiteList=[],tru
break
else:
# catch all except PGs used by other policies
typeInDefList = faresharePolicy[site]['typeList'][tmpPolicy['group']]
typeInDefList = fairsharePolicy[site]['typeList'][tmpPolicy['group']]
usedByAnother = False
for typeInDefItem in typeInDefList:
if typeInDefItem == tmpProGroup:
152 changes: 10 additions & 142 deletions pandaserver/daemons/scripts/copyArchive.py
@@ -128,116 +128,23 @@ def _memoryCheck(str):


# instantiate TB
# if tbuf is None:
from pandaserver.taskbuffer.TaskBuffer import taskBuffer
taskBuffer.init(panda_config.dbhost,panda_config.dbpasswd,nDBConnection=1)
# else:
# taskBuffer = tbuf


# instantiate sitemapper
siteMapper = SiteMapper(taskBuffer)



# send email for access requests
_logger.debug("Site Access")
try:
# get contact
contactAddr = {}
siteContactAddr = {}
sql = "SELECT name,email FROM ATLAS_PANDAMETA.cloudconfig"
status,res = taskBuffer.querySQLS(sql,{})
for cloudName,cloudEmail in res:
contactAddr[cloudName] = cloudEmail
# get requests
sql = "SELECT pandaSite,status,dn FROM ATLAS_PANDAMETA.siteaccess WHERE status IN (:status1,:status2,:status3) "
sql += "ORDER BY pandaSite,status "
varMap = {}
varMap[':status1'] = 'requested'
varMap[':status2'] = 'tobeapproved'
varMap[':status3'] = 'toberejected'
status,res = taskBuffer.querySQLS(sql,varMap)
requestsInCloud = {}
# mailUtils = MailUtils()
# loop over all requests
for pandaSite,reqStatus,userName in res:
cloud = siteMapper.getSite(pandaSite).cloud
_logger.debug("request : '%s' site=%s status=%s cloud=%s" % (userName,pandaSite,reqStatus,cloud))
# send emails to user
if reqStatus in ['tobeapproved','toberejected']:
# set status
if reqStatus == 'tobeapproved':
newStatus = 'approved'
else:
newStatus = 'rejected'
# get mail address for user
userMailAddr = ''
sqlUM = "SELECT email FROM ATLAS_PANDAMETA.users WHERE name=:userName"
varMap = {}
varMap[':userName'] = userName
stUM,resUM = taskBuffer.querySQLS(sqlUM,varMap)
if resUM is None or len(resUM) == 0:
_logger.error("email address is unavailable for '%s'" % userName)
else:
userMailAddr = resUM[0][0]
# send
# if userMailAddr not in ['',None,'None','notsend']:
# _logger.debug("send update to %s" % userMailAddr)
# retMail = mailUtils.sendSiteAccessUpdate(userMailAddr,newStatus,pandaSite)
# _logger.debug(retMail)
# update database
sqlUp = "UPDATE ATLAS_PANDAMETA.siteaccess SET status=:newStatus "
sqlUp += "WHERE pandaSite=:pandaSite AND dn=:userName"
varMap = {}
varMap[':userName'] = userName
varMap[':newStatus'] = newStatus
varMap[':pandaSite'] = pandaSite
stUp,resUp = taskBuffer.querySQLS(sqlUp,varMap)
else:
# append cloud
requestsInCloud.setdefault(cloud, {})
# append site
requestsInCloud[cloud].setdefault(pandaSite, [])
# append user
requestsInCloud[cloud][pandaSite].append(userName)
# send requests to the cloud responsible
for cloud in requestsInCloud:
requestsMap = requestsInCloud[cloud]
_logger.debug("requests for approval : cloud=%s" % cloud)
# send
if cloud in contactAddr and contactAddr[cloud] not in ['',None,'None']:
# get site contact
for pandaSite in requestsMap:
userNames = requestsMap[pandaSite]
if pandaSite not in siteContactAddr:
varMap = {}
varMap[':siteid'] = pandaSite
sqlSite = "SELECT email FROM ATLAS_PANDAMETA.schedconfig WHERE siteid=:siteid AND rownum<=1"
status,res = taskBuffer.querySQLS(sqlSite,varMap)
siteContactAddr[pandaSite] = res[0][0]
# append
if siteContactAddr[pandaSite] not in ['',None,'None']:
contactAddr[cloud] += ',%s' % siteContactAddr[pandaSite]
else:
_logger.error("contact email address is unavailable for %s" % cloud)
except Exception:
type, value, traceBack = sys.exc_info()
_logger.error("Failed with %s %s" % (type,value))
_logger.debug("Site Access : done")


# finalize failed jobs
_logger.debug("AnalFinalizer session")
try:
# get min PandaID for failed jobs in Active table
sql = "SELECT MIN(PandaID),prodUserName,jobDefinitionID,jediTaskID,computingSite FROM ATLAS_PANDA.jobsActive4 "
sql = "SELECT MIN(PandaID), prodUserName, jobDefinitionID, jediTaskID, computingSite FROM ATLAS_PANDA.jobsActive4 "
sql += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus "
sql += "GROUP BY prodUserName,jobDefinitionID,jediTaskID,computingSite "
varMap = {}
varMap[':jobStatus'] = 'failed'
varMap[':prodSourceLabel'] = 'user'
status,res = taskBuffer.querySQLS(sql,varMap)
varMap = {':jobStatus': 'failed', ':prodSourceLabel': 'user'}
status,res = taskBuffer.querySQLS(sql, varMap)

if res is not None:
# loop over all user/jobdefID
for pandaID,prodUserName,jobDefinitionID,jediTaskID,computingSite in res:
@@ -360,7 +267,7 @@ def _memoryCheck(str):
# get sites to skip various timeout
varMap = {}
varMap[':status'] = 'paused'
sql = "SELECT siteid FROM ATLAS_PANDAMETA.schedconfig WHERE status=:status "
sql = "SELECT /* use_json_type */ panda_queue FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.data.status=:status "
sitesToSkipTO = set()
status,res = taskBuffer.querySQLS(sql,varMap)
for siteid, in res:
@@ -383,7 +290,7 @@ def _memoryCheck(str):
_logger.debug("Watcher session")

# get the list of workflows
sql = "SELECT DISTINCT workflow FROM ATLAS_PANDAMETA.schedconfig WHERE status='online' "
sql = "SELECT /* use_json_type */ DISTINCT scj.data.workflow FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.data.status='online' "
status, res = taskBuffer.querySQLS(sql, {})
workflow_timeout_map = {}
for workflow, in res + [('production',), ('analysis',)]:
@@ -534,48 +441,9 @@ def _memoryCheck(str):
thr.start()
thr.join()

# check heartbeat for production jobs with internal stage-out
sql = "SELECT PandaID,jobStatus,jobSubStatus FROM ATLAS_PANDA.jobsActive4 j,ATLAS_PANDAMETA.schedconfig s "
sql += "WHERE j.computingSite=s.siteid AND jobStatus=:jobStatus1 AND jobSubStatus IS NOT NULL AND modificationTime<:modificationTime "
for workflow in workflows:
if workflow == 'analysis':
continue
varMap = {}
varMap[':modificationTime'] = timeLimit
varMap[':jobStatus1'] = 'transferring'
sqlX = sql
if workflow == 'production':
if len(workflows) > 2:
sqlX += "AND (s.workflow IS NULL OR s.workflow NOT IN ("
for ng_workflow in workflows:
if ng_workflow in ['production', 'analysis']:
continue
tmp_key = ':w_{0}'.format(ng_workflow)
varMap[tmp_key] = ng_workflow
sqlX += '{0},'.format(tmp_key)
sqlX = sqlX[:-1]
sqlX += ")) "
else:
tmp_key = ':w_{0}'.format(workflow)
sqlX += "AND s.workflow={0} ".format(tmp_key)
varMap[tmp_key] = workflow
timeOutVal = workflow_timeout_map[workflow]
timeLimit = datetime.datetime.utcnow() - datetime.timedelta(hours=timeOutVal)
varMap[':modificationTime'] = timeLimit
status,res = taskBuffer.querySQLS(sqlX, varMap)
if res is None:
_logger.debug("# of Internal Staging Watcher with workflow={0}: {1}".format(workflow, res))
else:
_logger.debug("# of Internal Staging Watcher with workflow={0}: {1}".format(workflow, len(res)))
for pandaID, jobStatus, jobSubStatus in res:
_logger.debug("Internal Staging Watcher %s %s:%s" % (pandaID, jobStatus, jobSubStatus))
thr = Watcher(taskBuffer,pandaID,single=True,sleepTime=60*timeOutVal,sitemapper=siteMapper)
thr.start()
thr.join()

# check heartbeat for production jobs
sql = "SELECT PandaID,jobStatus,j.computingSite FROM ATLAS_PANDA.jobsActive4 j "\
"LEFT JOIN ATLAS_PANDAMETA.schedconfig s ON j.computingSite=s.siteid "\
sql = "SELECT /* use_json_type */ PandaID, jobStatus, j.computingSite FROM ATLAS_PANDA.jobsActive4 j "\
"LEFT JOIN ATLAS_PANDA.schedconfig_json s ON j.computingSite=s.panda_queue "\
"WHERE jobStatus IN (:jobStatus1,:jobStatus2,:jobStatus3,:jobStatus4) "\
"AND modificationTime<:modificationTime "
for workflow in workflows:
@@ -589,7 +457,7 @@ def _memoryCheck(str):
sqlX = sql
if workflow == 'production':
if len(workflows) > 2:
sqlX += "AND (s.workflow IS NULL OR s.workflow NOT IN ("
sqlX += "AND (s.data.workflow IS NULL OR s.data.workflow NOT IN ("
for ng_workflow in workflows:
if ng_workflow in ['production', 'analysis']:
continue
@@ -600,7 +468,7 @@ def _memoryCheck(str):
sqlX += ")) "
else:
tmp_key = ':w_{0}'.format(workflow)
sqlX += "AND s.workflow={0} ".format(tmp_key)
sqlX += "AND s.data.workflow={0} ".format(tmp_key)
varMap[tmp_key] = workflow
timeOutVal = workflow_timeout_map[workflow]
timeLimit = datetime.datetime.utcnow() - datetime.timedelta(hours=timeOutVal)
14 changes: 1 addition & 13 deletions pandaserver/jobdispatcher/JobDispatcher.py
@@ -115,9 +115,7 @@ def __init__(self):
self.timeInterval = datetime.timedelta(seconds=180)
# pilot owners
self.pilotOwners = None
# hostnames for authorization at grid-free sites
self.allowedNodes = None
# special dipatcher parameters
# special dispatcher parameters
self.specialDispatchParams = None
# site mapper cache
self.siteMapperCache = None
@@ -138,9 +136,6 @@ def init(self,taskBuffer):
# get pilot owners
if self.pilotOwners is None:
self.pilotOwners = self.taskBuffer.getPilotOwners()
# get allowed nodes
if self.allowedNodes is None:
self.allowedNodes = self.taskBuffer.getAllowedNodes()
# special dipatcher parameters
if self.specialDispatchParams is None:
self.specialDispatchParams = CachedObject(60*10, self.taskBuffer.getSpecialDispatchParams)
@@ -824,13 +819,6 @@ def _checkRole(fqans,dn,jdCore,withVomsPatch=True,site='',hostname=''):
if dn.startswith(tmpSub):
prodManager = True
break
# grid-free authorization
if not prodManager:
if hostname != '' and site in jdCore.allowedNodes:
for tmpPat in jdCore.allowedNodes[site]:
if re.search(tmpPat,hostname) is not None:
prodManager = True
break
# check DN with pilotOwners
if (not prodManager) and (dn not in [None]):
if site in jdCore.pilotOwners:
