# Patch summary (leading text before the first "diff --git" header; not part of any hunk).
#
# condor_queue_to_es.py
#   * Replaces the blank line before `from condor_utils import *` with `import htcondor`.
#   * Introduces a module-level flag `failed = False`.
#   * In the `options.collectors` branch, the per-collector read + bulk import is wrapped in
#     `try` / `except htcondor.HTCondorIOError`. On that error the exception is stored in
#     `failed`, logged via `logging.error('Condor error', exc_info=True)`, and the loop moves
#     on to the next collector address instead of aborting.
#   * The file/glob branch (`else:`) is left as-is and is not wrapped in the new try/except.
#   * After all inputs are processed, `if failed: raise failed` re-raises the stored exception.
#     Because `failed` is overwritten on every error, only the most recent HTCondorIOError is
#     re-raised; earlier ones are visible only in the log.
#
# condor_status_to_es.py
#   * Applies the same pattern to the `for coll_address in options.collectors` loop: the three
#     steps (machine update, claims from evicted/held jobs, claims from finished jobs) are
#     moved inside one `try` block, so an HTCondorIOError raised in any step skips the
#     remaining steps for that collector, is logged, and is remembered in `failed`.
#   * The calls themselves (constraints, projections, `history=` arguments) appear unchanged
#     apart from the extra indentation level.
#   * Ends with the same `if failed: raise failed`.
#   * NOTE(review): this file's hunk does not add `import htcondor`; presumably the module is
#     already imported elsewhere in the file -- confirm, otherwise the `except` clause would
#     fail with NameError when evaluated.
#
# NOTE(review): in this copy of the patch the line breaks and indentation appear to have been
# collapsed, so it likely cannot be applied as-is; regenerate from the repository if needed.
#
diff --git a/condor_queue_to_es.py b/condor_queue_to_es.py index 10cfec7..e7b35fe 100755 --- a/condor_queue_to_es.py +++ b/condor_queue_to_es.py @@ -19,7 +19,7 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s') - +import htcondor from condor_utils import * # daily index manditory @@ -57,13 +57,21 @@ def es_generator(entries): timeout=5000) es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600) +failed = False if options.collectors: for coll_address in args: - gen = es_generator(read_from_collector(coll_address)) - success, _ = es_import(gen) + try: + gen = es_generator(read_from_collector(coll_address)) + success, _ = es_import(gen) + except htcondor.HTCondorIOError as e: + failed = e + logging.error('Condor error', exc_info=True) else: for path in args: for filename in glob.iglob(path): gen = es_generator(read_from_file(filename)) success, _ = es_import(gen) logging.info('finished processing %s', filename) + +if failed: + raise failed diff --git a/condor_status_to_es.py b/condor_status_to_es.py index 3530623..7a2b5ba 100755 --- a/condor_status_to_es.py +++ b/condor_status_to_es.py @@ -346,56 +346,64 @@ def key = status+"."+resource; }, ) +failed = False for coll_address in options.collectors: - - gen = update_machines( - read_status_from_collector(coll_address, datetime.now() - options.after) - ) - success = es_import(gen) - - # Update claims from evicted and held jobs - after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple()) - gen = update_jobs( - read_from_collector( - coll_address, - constraint=( - "((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)" - + " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))" - ).format(after, after), - projection=[ - "GlobalJobId", - "NumJobStarts", - "JobStatus", - "JobLastStartDate", - "JobCurrentStartDate", - "EnteredCurrentStatus", - "LastVacateTime", - "LastRemoteHost", - ] - + ["Request" + resource for resource 
in RESOURCES], - ), - history=False, - ) - success = es_import(gen) - - # Update claims from finished jobs - gen = update_jobs( - read_from_collector( - coll_address, - constraint="!isUndefined(LastRemoteHost)", - projection=[ - "GlobalJobId", - "NumJobStarts", - "JobLastStartDate", - "JobCurrentStartDate", - "EnteredCurrentStatus", - "JobStatus", - "ExitCode", - "LastRemoteHost", - ] - + ["Request" + resource for resource in RESOURCES], + try: + gen = update_machines( + read_status_from_collector(coll_address, datetime.now() - options.after) + ) + success = es_import(gen) + + # Update claims from evicted and held jobs + after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple()) + gen = update_jobs( + read_from_collector( + coll_address, + constraint=( + "((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)" + + " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))" + ).format(after, after), + projection=[ + "GlobalJobId", + "NumJobStarts", + "JobStatus", + "JobLastStartDate", + "JobCurrentStartDate", + "EnteredCurrentStatus", + "LastVacateTime", + "LastRemoteHost", + ] + + ["Request" + resource for resource in RESOURCES], + ), + history=False, + ) + success = es_import(gen) + + # Update claims from finished jobs + gen = update_jobs( + read_from_collector( + coll_address, + constraint="!isUndefined(LastRemoteHost)", + projection=[ + "GlobalJobId", + "NumJobStarts", + "JobLastStartDate", + "JobCurrentStartDate", + "EnteredCurrentStatus", + "JobStatus", + "ExitCode", + "LastRemoteHost", + ] + + ["Request" + resource for resource in RESOURCES], + history=True, + ), history=True, - ), - history=True, - ) - success = es_import(gen) + ) + success = es_import(gen) + + except htcondor.HTCondorIOError as e: + failed = e + logging.error('Condor error', exc_info=True) + +if failed: + raise failed