Skip to content

Commit

Permalink
give same treatment to other scripts, skipping collector failures
Browse files Browse the repository at this point in the history
  • Loading branch information
dsschult committed Oct 26, 2021
1 parent 2f04f18 commit d455e75
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 54 deletions.
14 changes: 11 additions & 3 deletions condor_queue_to_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')


import htcondor
from condor_utils import *

# daily index manditory
Expand Down Expand Up @@ -57,13 +57,21 @@ def es_generator(entries):
timeout=5000)
es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600)

failed = False
if options.collectors:
for coll_address in args:
gen = es_generator(read_from_collector(coll_address))
success, _ = es_import(gen)
try:
gen = es_generator(read_from_collector(coll_address))
success, _ = es_import(gen)
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
else:
for path in args:
for filename in glob.iglob(path):
gen = es_generator(read_from_file(filename))
success, _ = es_import(gen)
logging.info('finished processing %s', filename)

if failed:
raise failed
110 changes: 59 additions & 51 deletions condor_status_to_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,56 +346,64 @@ def key = status+"."+resource;
},
)

failed = False
for coll_address in options.collectors:

gen = update_machines(
read_status_from_collector(coll_address, datetime.now() - options.after)
)
success = es_import(gen)

# Update claims from evicted and held jobs
after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple())
gen = update_jobs(
read_from_collector(
coll_address,
constraint=(
"((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)"
+ " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))"
).format(after, after),
projection=[
"GlobalJobId",
"NumJobStarts",
"JobStatus",
"JobLastStartDate",
"JobCurrentStartDate",
"EnteredCurrentStatus",
"LastVacateTime",
"LastRemoteHost",
]
+ ["Request" + resource for resource in RESOURCES],
),
history=False,
)
success = es_import(gen)

# Update claims from finished jobs
gen = update_jobs(
read_from_collector(
coll_address,
constraint="!isUndefined(LastRemoteHost)",
projection=[
"GlobalJobId",
"NumJobStarts",
"JobLastStartDate",
"JobCurrentStartDate",
"EnteredCurrentStatus",
"JobStatus",
"ExitCode",
"LastRemoteHost",
]
+ ["Request" + resource for resource in RESOURCES],
try:
gen = update_machines(
read_status_from_collector(coll_address, datetime.now() - options.after)
)
success = es_import(gen)

# Update claims from evicted and held jobs
after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple())
gen = update_jobs(
read_from_collector(
coll_address,
constraint=(
"((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)"
+ " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))"
).format(after, after),
projection=[
"GlobalJobId",
"NumJobStarts",
"JobStatus",
"JobLastStartDate",
"JobCurrentStartDate",
"EnteredCurrentStatus",
"LastVacateTime",
"LastRemoteHost",
]
+ ["Request" + resource for resource in RESOURCES],
),
history=False,
)
success = es_import(gen)

# Update claims from finished jobs
gen = update_jobs(
read_from_collector(
coll_address,
constraint="!isUndefined(LastRemoteHost)",
projection=[
"GlobalJobId",
"NumJobStarts",
"JobLastStartDate",
"JobCurrentStartDate",
"EnteredCurrentStatus",
"JobStatus",
"ExitCode",
"LastRemoteHost",
]
+ ["Request" + resource for resource in RESOURCES],
history=True,
),
history=True,
),
history=True,
)
success = es_import(gen)
)
success = es_import(gen)

except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)

if failed:
raise failed

0 comments on commit d455e75

Please sign in to comment.