Skip to content

Commit

Permalink
* added es_toolong
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaeno committed Jan 29, 2019
1 parent caec2f5 commit c88f9e8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
Release Notes

* 1/29/2019
* added es_toolong

* 1/25/2019
* added corrupted events

Expand Down
47 changes: 47 additions & 0 deletions pandaserver/test/copyArchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,53 @@ def _memoryCheck(str):
Client.killJobs(jobs[iJob:iJob+nJob],4)
iJob += nJob

# kill too long running ES jobs
timeLimit = datetime.datetime.utcnow() - datetime.timedelta(hours=24)
varMap = {}
varMap[':jobStatus1'] = 'running'
varMap[':jobStatus2'] = 'starting'
varMap[':timeLimit'] = timeLimit
varMap[':esJob'] = EventServiceUtils.esJobFlagNumber
varMap[':coJumbo'] = EventServiceUtils.coJumboJobFlagNumber
sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:jobStatus1,:jobStatus2) AND stateChangeTime<:timeLimit "
sql += "AND eventService IN (:esJob,:coJumbo) AND currentPriority>=900 "
status,res = taskBuffer.querySQLS(sql, varMap)
jobs = []
if res != None:
for (id,) in res:
jobs.append(id)
# kill
if len(jobs):
nJob = 100
iJob = 0
while iJob < len(jobs):
_logger.debug("killJobs for long running ES jobs (%s)" % str(jobs[iJob:iJob+nJob]))
Client.killJobs(jobs[iJob:iJob+nJob], 2, keepUnmerged=True, jobSubStatus='es_toolong')
iJob += nJob

# kill too long running ES merge jobs
timeLimit = datetime.datetime.utcnow() - datetime.timedelta(hours=24)
varMap = {}
varMap[':jobStatus1'] = 'running'
varMap[':jobStatus2'] = 'starting'
varMap[':timeLimit'] = timeLimit
varMap[':esMergeJob'] = EventServiceUtils.esMergeJobFlagNumber
sql = "SELECT PandaID FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:jobStatus1,:jobStatus2) AND stateChangeTime<:timeLimit "
sql += "AND eventService=:esMergeJob "
status,res = taskBuffer.querySQLS(sql, varMap)
jobs = []
if res != None:
for (id,) in res:
jobs.append(id)
# kill
if len(jobs):
nJob = 100
iJob = 0
while iJob < len(jobs):
_logger.debug("killJobs for long running ES merge jobs (%s)" % str(jobs[iJob:iJob+nJob]))
Client.killJobs(jobs[iJob:iJob+nJob], 2)
iJob += nJob

# kill too long waiting jobs
timeLimit = datetime.datetime.utcnow() - datetime.timedelta(days=7)
sql = "SELECT PandaID FROM ATLAS_PANDA.jobsWaiting4 WHERE ((creationTime<:timeLimit AND (eventService IS NULL OR eventService<>:coJumbo)) "
Expand Down

0 comments on commit c88f9e8

Please sign in to comment.