From 74290e1b10651cc21ab698c8b7eae22be44a4a35 Mon Sep 17 00:00:00 2001 From: Han Wang <92130845+wanghan-iapcm@users.noreply.github.com> Date: Sun, 11 Sep 2022 11:00:38 +0800 Subject: [PATCH] show status of workflows (#71) * show status of workflows. * fix bugs * change scheduler converged to complete. add complete for stage_scheduler * fix == None Co-authored-by: Han Wang --- dpgen2/entrypoint/main.py | 32 +- dpgen2/entrypoint/status.py | 32 ++ .../convergence_check_stage_scheduler.py | 35 ++- dpgen2/exploration/scheduler/scheduler.py | 122 +++++++- .../exploration/scheduler/stage_scheduler.py | 19 +- dpgen2/utils/dflow_query.py | 2 +- .../exploration/test_exploration_scheduler.py | 296 +++++++++++++++++- tests/utils/test_dflow_query.py | 4 +- 8 files changed, 507 insertions(+), 35 deletions(-) create mode 100644 dpgen2/entrypoint/status.py diff --git a/dpgen2/entrypoint/main.py b/dpgen2/entrypoint/main.py index 48551b8d..e87bba98 100644 --- a/dpgen2/entrypoint/main.py +++ b/dpgen2/entrypoint/main.py @@ -18,6 +18,9 @@ submit_concurrent_learning, resubmit_concurrent_learning, ) +from .status import ( + status, +) from dpgen2 import ( __version__ ) @@ -48,7 +51,7 @@ def main_parser() -> argparse.ArgumentParser: formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_run.add_argument( - "INPUT", help="the input file in json format defining the workflow." + "CONFIG", help="the config file in json format defining the workflow." ) parser_resubmit = subparsers.add_parser( @@ -57,7 +60,7 @@ def main_parser() -> argparse.ArgumentParser: formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_resubmit.add_argument( - "INPUT", help="the input file in json format defining the workflow." + "CONFIG", help="the config file in json format defining the workflow." ) parser_resubmit.add_argument( "ID", help="the ID of the existing workflow." @@ -69,6 +72,18 @@ def main_parser() -> argparse.ArgumentParser: "--reuse", type=str, nargs='+', default=None, help="specify which Steps to reuse." ) + parser_status = subparsers.add_parser( + "status", + help="Print the status (stage, iteration, convergence) of the DPGEN2 workflow", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser_status.add_argument( + "CONFIG", help="the config file in json format." + ) + parser_status.add_argument( + "ID", help="the ID of the existing workflow." + ) + # --version parser.add_argument( '--version', @@ -102,16 +117,25 @@ def main(): dict_args = vars(args) if args.command == "submit": - with open(args.INPUT) as fp: + with open(args.CONFIG) as fp: config = json.load(fp) submit_concurrent_learning(config) elif args.command == "resubmit": - with open(args.INPUT) as fp: + with open(args.CONFIG) as fp: config = json.load(fp) wfid = args.ID resubmit_concurrent_learning( config, wfid, list_steps=args.list, reuse=args.reuse, ) + elif args.command == "status": + with open(args.CONFIG) as fp: + config = json.load(fp) + wfid = args.ID + status( + wfid, config, + ) + elif args.command is None: + pass else: raise RuntimeError(f"unknown command {args.command}") diff --git a/dpgen2/entrypoint/status.py b/dpgen2/entrypoint/status.py new file mode 100644 index 00000000..13c13848 --- /dev/null +++ b/dpgen2/entrypoint/status.py @@ -0,0 +1,32 @@ +import logging +from dflow import ( + Workflow, +) +from dpgen2.utils import ( + dflow_config, +) +from dpgen2.utils.dflow_query import ( + get_last_scheduler, +) +from typing import ( + Optional, Dict, Union, List, +) + +def status( + workflow_id, + wf_config : Optional[Dict] = {}, +): + dflow_config_data = wf_config.get('dflow_config', None) + dflow_config(dflow_config_data) + + wf = Workflow(id=workflow_id) + + wf_keys = wf.query_keys_of_steps() + + scheduler = get_last_scheduler(wf, wf_keys) + + if scheduler is not None: + ptr_str = scheduler.print_convergence() + print(ptr_str) + else: + logging.warn('no scheduler is finished') diff --git a/dpgen2/exploration/scheduler/convergence_check_stage_scheduler.py b/dpgen2/exploration/scheduler/convergence_check_stage_scheduler.py index 66251604..864e7832 100644 --- a/dpgen2/exploration/scheduler/convergence_check_stage_scheduler.py +++ b/dpgen2/exploration/scheduler/convergence_check_stage_scheduler.py @@ -26,20 +26,34 @@ def __init__( self.max_numb_iter = max_numb_iter self.fatal_at_max = fatal_at_max self.nxt_iter = 0 + self.conv = False + self.reached_max_iter = False + self.complete_ = False + self.reports = [] + + def complete(self): + return self.complete_ + + def converged(self): + return self.conv + + def reached_max_iteration(self): + return self.reached_max_iter def plan_next_iteration( self, - hist_reports : List[ExplorationReport] = [], report : ExplorationReport = None, trajs : List[Path] = None, ) -> Tuple[bool, ExplorationTaskGroup, ConfSelector] : if report is None: - converged = False + stg_complete = False + self.conv = stg_complete lmp_task_grp = self.stage.make_task() ret_selector = self.selector else : - converged = report.accurate_ratio() >= self.conv_accuracy - if not converged: + stg_complete = report.accurate_ratio() >= self.conv_accuracy + self.conv = stg_complete + if not stg_complete: # check if we have any candidate to improve the quality of the model if report.candidate_ratio() == 0.0: raise FatalError( @@ -49,20 +63,23 @@ def plan_next_iteration( 'improved and the iteraction would not end. ' 'Please try to increase the higher trust levels. ' ) - # if not converged, check max iter + # if not stg_complete, check max iter if self.max_numb_iter is not None and self.nxt_iter == self.max_numb_iter: + self.reached_max_iter = True if self.fatal_at_max: raise FatalError('reached maximal number of iterations') else: - converged = True + stg_complete = True # make lmp tasks - if converged: - # if converged, no more lmp task + if stg_complete: + # if stg_complete, no more lmp task lmp_task_grp = None ret_selector = None else : lmp_task_grp = self.stage.make_task() ret_selector = self.selector + self.reports.append(report) self.nxt_iter += 1 - return converged, lmp_task_grp, ret_selector + self.complete_ = stg_complete + return stg_complete, lmp_task_grp, ret_selector diff --git a/dpgen2/exploration/scheduler/scheduler.py b/dpgen2/exploration/scheduler/scheduler.py index 39c2c6e0..397577f3 100644 --- a/dpgen2/exploration/scheduler/scheduler.py +++ b/dpgen2/exploration/scheduler/scheduler.py @@ -1,3 +1,5 @@ +import numpy as np + from typing import ( List, Tuple, @@ -24,9 +26,9 @@ def __init__( self, ): self.stage_schedulers = [] - self.stage_reports = [[]] self.cur_stage = 0 self.iteration = -1 + self.complete_ = False def add_stage_scheduler( self, @@ -44,6 +46,7 @@ def add_stage_scheduler( """ self.stage_schedulers.append(stage_scheduler) + self.complete_ = False return self def get_stage(self): @@ -64,6 +67,13 @@ def get_iteration(self): """ return self.iteration + def complete(self): + """ + Tell if all stages are converged. + + """ + return self.complete_ + def plan_next_iteration( self, report : ExplorationReport = None, @@ -81,8 +91,8 @@ def plan_next_iteration( Returns ------- - converged: bool - If DPGEN converges. + complete: bool + If all the DPGEN stages complete. task: ExplorationTaskGroup A `ExplorationTaskGroup` defining the exploration of the next iteration. Should be `None` if converged. conf_selector: ConfSelector @@ -91,26 +101,118 @@ def plan_next_iteration( """ try: - converged, lmp_task_grp, conf_selector = \ + stg_complete, lmp_task_grp, conf_selector = \ self.stage_schedulers[self.cur_stage].plan_next_iteration( - self.stage_reports[self.cur_stage], report, trajs, ) - self.stage_reports[self.cur_stage].append(report) except FatalError as e: raise FatalError(f'stage {self.cur_stage}: ' + str(e)) - if converged: + if stg_complete: self.cur_stage += 1 - self.stage_reports.append([]) if self.cur_stage < len(self.stage_schedulers): # goes to next stage return self.plan_next_iteration() else: - # all stages converged + # all stages complete + self.complete_ = True return True, None, None, else : self.iteration += 1 - return converged, lmp_task_grp, conf_selector + return stg_complete, lmp_task_grp, conf_selector + + + def get_stage_of_iterations(self): + """ + Get the stage index and the index in the stage of iterations. + """ + stages = self.stage_schedulers + n_stage_iters = [] + for ii in range(self.get_stage() + 1): + if ii < len(stages) and len(stages[ii].reports) > 0: + n_stage_iters.append(len(stages[ii].reports)) + cumsum_stage_iters = np.cumsum(n_stage_iters) + + max_iter = self.get_iteration() + if self.complete() or max_iter == -1: + max_iter += 1 + stage_idx = [] + idx_in_stage = [] + iter_idx = [] + for ii in range(max_iter): + idx = np.searchsorted(cumsum_stage_iters, ii+1) + stage_idx.append(idx) + if idx > 0: + idx_in_stage.append(ii - cumsum_stage_iters[idx-1]) + else : + idx_in_stage.append(ii) + iter_idx.append(ii) + assert( len(stage_idx) == max_iter) + assert( len(idx_in_stage) == max_iter) + assert( len(iter_idx) == max_iter) + return stage_idx, idx_in_stage, iter_idx + + + def get_convergence_ratio(self): + """ + Get the accurate, candidate and failed ratios of the iterations + + Returns + ------- + accu np.ndarray + The accurate ratio. length of array the same as # iterations. + cand np.ndarray + The candidate ratio. length of array the same as # iterations. + fail np.ndarray + The failed ration. length of array the same as # iterations. + """ + stages = self.stage_schedulers + stag_idx, idx_in_stag, iter_idx = self.get_stage_of_iterations() + accu = [] + cand = [] + fail = [] + for ii in range(np.size(iter_idx)): + accu.append(stages[stag_idx[ii]].reports[idx_in_stag[ii]].accurate_ratio()) + cand.append(stages[stag_idx[ii]].reports[idx_in_stag[ii]].candidate_ratio()) + fail.append(stages[stag_idx[ii]].reports[idx_in_stag[ii]].failed_ratio()) + return np.array(accu), np.array(cand), np.array(fail) + + def _print_prev_summary(self, prev_stg_idx): + if prev_stg_idx >= 0: + yes = 'YES' if self.stage_schedulers[prev_stg_idx].converged() else 'NO ' + rmx = 'YES' if self.stage_schedulers[prev_stg_idx].reached_max_iteration() else 'NO ' + return f'# Stage {prev_stg_idx:4d} converged {yes} reached max numb iterations {rmx}' + else: + return None + + def print_convergence(self): + spaces = [8, 8, 8, 10, 10, 10] + fmt_str = ' '.join([f'%{ii}s' for ii in spaces]) + fmt_flt = '%.4f' + header_str = '#' + fmt_str % ('stage', 'id_stg.', 'iter.', 'accu.', 'cand.', 'fail.') + ret = [header_str] + + stage_idx, idx_in_stage, iter_idx = self.get_stage_of_iterations() + accu, cand, fail = self.get_convergence_ratio() + + iidx = 0 + prev_stg_idx = -1 + for iidx in range(len(accu)): + if stage_idx[iidx] != prev_stg_idx: + if prev_stg_idx >= 0: + ret.append(self._print_prev_summary(prev_stg_idx)) + ret.append(f'# Stage {stage_idx[iidx]:4d} ' + '-'*20) + prev_stg_idx = stage_idx[iidx] + ret.append(' ' + fmt_str % ( + str(stage_idx[iidx]), str(idx_in_stage[iidx]), str(iidx), + fmt_flt%(accu[iidx]*1), + fmt_flt%(cand[iidx]*1), + fmt_flt%(fail[iidx]*1), + )) + if self.complete(): + if prev_stg_idx >= 0: + ret.append(self._print_prev_summary(prev_stg_idx)) + ret.append(f'# All stages converged') + return '\n'.join(ret + ['']) diff --git a/dpgen2/exploration/scheduler/stage_scheduler.py b/dpgen2/exploration/scheduler/stage_scheduler.py index 42df37e0..5a398693 100644 --- a/dpgen2/exploration/scheduler/stage_scheduler.py +++ b/dpgen2/exploration/scheduler/stage_scheduler.py @@ -16,10 +16,21 @@ class StageScheduler(ABC): The scheduler for an exploration stage. """ + @abstractmethod + def converged(self): + """ + Tell if the stage is converged + + Returns + ------- + converged bool + the convergence + """ + pass + @abstractmethod def plan_next_iteration( self, - hist_reports : List[ExplorationReport], report : ExplorationReport, trajs : List[Path], ) -> Tuple[bool, ExplorationTaskGroup, ConfSelector] : @@ -39,8 +50,10 @@ def plan_next_iteration( Returns ------- - converged: bool - If the stage converged. + stg_complete: bool + If the stage completed. Two cases may happen: + 1. converged. + 2. when not fatal_at_max, not converged but reached max number of iterations. task: ExplorationTaskGroup A `ExplorationTaskGroup` defining the exploration of the next iteration. Should be `None` if the stage is converged. conf_selector: ConfSelector diff --git a/dpgen2/utils/dflow_query.py b/dpgen2/utils/dflow_query.py index 378826eb..a478387d 100644 --- a/dpgen2/utils/dflow_query.py +++ b/dpgen2/utils/dflow_query.py @@ -25,7 +25,7 @@ def get_last_scheduler( return None else: skey = sorted(scheduler_keys)[-1] - step = wf.query_step(key=skey) + step = wf.query_step(key=skey)[0] return step.outputs.parameters['exploration_scheduler'].value diff --git a/tests/exploration/test_exploration_scheduler.py b/tests/exploration/test_exploration_scheduler.py index 59b345dd..39d5dd99 100644 --- a/tests/exploration/test_exploration_scheduler.py +++ b/tests/exploration/test_exploration_scheduler.py @@ -48,7 +48,7 @@ def test_success(self): self.assertTrue(sel.trust_level.level_v_lo is None) self.assertTrue(sel.trust_level.level_v_hi is None) - conv, ltg, sel = self.scheduler.plan_next_iteration([], foo_report, []) + conv, ltg, sel = self.scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, True) self.assertTrue(ltg is None) self.assertTrue(sel is None) @@ -77,7 +77,7 @@ def test_step1(self): self.assertTrue(sel.trust_level.level_v_lo is None) self.assertTrue(sel.trust_level.level_v_hi is None) - conv, ltg, sel = self.scheduler.plan_next_iteration([], foo_report, []) + conv, ltg, sel = self.scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup)) self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) @@ -86,7 +86,7 @@ def test_step1(self): self.assertTrue(sel.trust_level.level_v_lo is None) self.assertTrue(sel.trust_level.level_v_hi is None) - conv, ltg, sel = self.scheduler.plan_next_iteration([], bar_report, []) + conv, ltg, sel = self.scheduler.plan_next_iteration(bar_report, []) self.assertEqual(conv, True) self.assertTrue(ltg is None) self.assertTrue(sel is None) @@ -122,7 +122,7 @@ def test_max_numb_iter(self): self.assertTrue(sel.trust_level.level_v_lo is None) self.assertTrue(sel.trust_level.level_v_hi is None) - conv, ltg, sel = self.scheduler.plan_next_iteration([], foo_report, []) + conv, ltg, sel = self.scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup)) self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) @@ -132,7 +132,7 @@ def test_max_numb_iter(self): self.assertTrue(sel.trust_level.level_v_hi is None) with self.assertRaisesRegex(FatalError, 'reached maximal number of iterations'): - conv, ltg, sel = self.scheduler.plan_next_iteration([], foo_report, []) + conv, ltg, sel = self.scheduler.plan_next_iteration(foo_report, []) class TestExplorationScheduler(unittest.TestCase): @@ -172,6 +172,11 @@ def test_success(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 0) self.assertEqual(scheduler.get_iteration(), 0) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) @@ -182,6 +187,11 @@ def test_success(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 1) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) @@ -192,12 +202,166 @@ def test_success(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 2) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) + self.assertFalse(scheduler.complete()) conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) self.assertEqual(conv, True) self.assertTrue(ltg is None) self.assertTrue(sel is None) self.assertEqual(scheduler.get_stage(), 2) self.assertEqual(scheduler.get_iteration(), 2) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertTrue(scheduler.stage_schedulers[1].converged()) + self.assertTrue(scheduler.stage_schedulers[1].complete()) + self.assertTrue(scheduler.complete()) + + + def test_print_scheduler(self): + scheduler = ExplorationScheduler() + trust_level = TrustLevel(0.1, 0.3) + selector = ConfSelectorLammpsFrames(trust_level) + stage_scheduler = ConvergenceCheckStageScheduler( + MockedStage(), + selector, + max_numb_iter = 2, + ) + scheduler.add_stage_scheduler(stage_scheduler) + trust_level = TrustLevel(0.2, 0.4) + selector = ConfSelectorLammpsFrames(trust_level) + stage_scheduler = ConvergenceCheckStageScheduler( + MockedStage1(), + selector, + max_numb_iter = 2, + ) + scheduler.add_stage_scheduler(stage_scheduler) + + foo_report = MockedExplorationReport() + foo_report.accurate = 0.5 + foo_report.failed = 0.5 + bar_report = MockedExplorationReport() + bar_report.accurate = 1.0 + bar_report.failed = 0.0 + + expected_output = [ + '# stage id_stg. iter. accu. cand. fail.', + '# Stage 0 --------------------', + ' 0 0 0 1.0000 0.1000 0.0000', + '# Stage 0 converged YES reached max numb iterations NO ', + '# Stage 1 --------------------', + ' 1 0 1 0.5000 0.1000 0.5000', + ' 1 1 2 1.0000 0.1000 0.0000', + '# Stage 1 converged YES reached max numb iterations NO ', + '# All stages converged', + ] + self.assertEqual(scheduler.print_convergence(), '\n'.join(expected_output[:1])+'\n') + conv, ltg, sel = scheduler.plan_next_iteration() + self.assertEqual(scheduler.print_convergence(), '\n'.join(expected_output[:1])+'\n') + conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) + self.assertEqual(scheduler.print_convergence(), '\n'.join(expected_output[:3])+'\n') + conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) + self.assertEqual(scheduler.print_convergence(), '\n'.join(expected_output[:6])+'\n') + conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) + self.assertEqual(scheduler.print_convergence(), '\n'.join(expected_output)+'\n') + + + def test_success_and_ratios(self): + scheduler = ExplorationScheduler() + trust_level = TrustLevel(0.1, 0.3) + selector = ConfSelectorLammpsFrames(trust_level) + stage_scheduler = ConvergenceCheckStageScheduler( + MockedStage(), + selector, + max_numb_iter = 4, + ) + scheduler.add_stage_scheduler(stage_scheduler) + trust_level = TrustLevel(0.2, 0.4) + selector = ConfSelectorLammpsFrames(trust_level) + stage_scheduler = ConvergenceCheckStageScheduler( + MockedStage1(), + selector, + max_numb_iter = 4, + ) + scheduler.add_stage_scheduler(stage_scheduler) + + foo_report = MockedExplorationReport() + foo_report.accurate = 0.5 + foo_report.failed = 0.2 + foo_report.candidate = 0.3 + bar_report = MockedExplorationReport() + bar_report.accurate = 1.0 + bar_report.failed = 0.0 + bar_report.candidate = 0.0 + + conv, ltg, sel = scheduler.plan_next_iteration() + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.1) + self.assertEqual(sel.trust_level.level_f_hi, 0.3) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 0) + self.assertEqual(scheduler.get_iteration(), 0) + conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.2) + self.assertEqual(sel.trust_level.level_f_hi, 0.4) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 1) + self.assertEqual(scheduler.get_iteration(), 1) + conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.2) + self.assertEqual(sel.trust_level.level_f_hi, 0.4) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 1) + self.assertEqual(scheduler.get_iteration(), 2) + conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.2) + self.assertEqual(sel.trust_level.level_f_hi, 0.4) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 1) + self.assertEqual(scheduler.get_iteration(), 3) + conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) + self.assertEqual(conv, True) + self.assertTrue(ltg is None) + self.assertTrue(sel is None) + self.assertEqual(scheduler.get_stage(), 2) + self.assertEqual(scheduler.get_iteration(), 3) + + expected_stage_idx = np.array([0, 1, 1, 1], dtype=int) + expected_idx_in_stage = np.array([0, 0, 1, 2], dtype=int) + expected_iter_idx = np.array([0, 1, 2, 3], dtype=int) + expected_accu = np.array([1.0, 0.5, 0.5, 1.0], dtype=float) + expected_cand = np.array([0.0, 0.3, 0.3, 0.0], dtype=float) + expected_fail = np.array([0.0, 0.2, 0.2, 0.0], dtype=float) + + stage_idx, idx_in_stage, iter_idx = scheduler.get_stage_of_iterations() + np.testing.assert_array_equal(stage_idx, expected_stage_idx) + np.testing.assert_array_equal(idx_in_stage, expected_idx_in_stage) + np.testing.assert_array_equal(iter_idx, expected_iter_idx) + + accu, cand, fail = scheduler.get_convergence_ratio() + np.testing.assert_array_almost_equal(accu, expected_accu) + np.testing.assert_array_almost_equal(cand, expected_cand) + np.testing.assert_array_almost_equal(fail, expected_fail) def test_continue_adding_success(self): @@ -228,12 +392,18 @@ def test_continue_adding_success(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 0) self.assertEqual(scheduler.get_iteration(), 0) + self.assertEqual(len(scheduler.stage_schedulers), 1) + self.assertFalse(scheduler.stage_schedulers[0].converged()) conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) self.assertEqual(conv, True) self.assertTrue(ltg is None) self.assertTrue(sel is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 0) + self.assertEqual(len(scheduler.stage_schedulers), 1) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertTrue(scheduler.complete()) trust_level = TrustLevel(0.2, 0.4) selector = ConfSelectorLammpsFrames(trust_level) @@ -244,6 +414,12 @@ def test_continue_adding_success(self): ) scheduler.add_stage_scheduler(stage_scheduler) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) + self.assertFalse(scheduler.complete()) conv, ltg, sel = scheduler.plan_next_iteration() self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) @@ -254,6 +430,11 @@ def test_continue_adding_success(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 1) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) @@ -264,13 +445,24 @@ def test_continue_adding_success(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 2) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) + self.assertFalse(scheduler.complete()) conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) self.assertEqual(conv, True) self.assertTrue(ltg is None) self.assertTrue(sel is None) self.assertEqual(scheduler.get_stage(), 2) self.assertEqual(scheduler.get_iteration(), 2) - + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertTrue(scheduler.stage_schedulers[1].converged()) + self.assertTrue(scheduler.stage_schedulers[1].complete()) + self.assertTrue(scheduler.complete()) def test_failed_stage0(self): @@ -309,6 +501,9 @@ def test_failed_stage0(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 0) self.assertEqual(scheduler.get_iteration(), 0) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup)) @@ -319,10 +514,90 @@ def test_failed_stage0(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 0) self.assertEqual(scheduler.get_iteration(), 1) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.complete()) with self.assertRaisesRegex(FatalError, 'stage 0: reached maximal number of iterations'): conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) + def test_failed_stage0_not_fatal(self): + scheduler = ExplorationScheduler() + trust_level = TrustLevel(0.1, 0.3) + selector = ConfSelectorLammpsFrames(trust_level) + stage_scheduler = ConvergenceCheckStageScheduler( + MockedStage(), + selector, + max_numb_iter = 2, + fatal_at_max = False, + ) + scheduler.add_stage_scheduler(stage_scheduler) + trust_level = TrustLevel(0.2, 0.4) + selector = ConfSelectorLammpsFrames(trust_level) + stage_scheduler = ConvergenceCheckStageScheduler( + MockedStage1(), + selector, + max_numb_iter = 2, + fatal_at_max = False, + ) + scheduler.add_stage_scheduler(stage_scheduler) + + foo_report = MockedExplorationReport() + foo_report.accurate = 0.5 + foo_report.failed = 0.5 + bar_report = MockedExplorationReport() + bar_report.accurate = 1.0 + bar_report.failed = 0.0 + + conv, ltg, sel = scheduler.plan_next_iteration() + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.1) + self.assertEqual(sel.trust_level.level_f_hi, 0.3) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 0) + self.assertEqual(scheduler.get_iteration(), 0) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.1) + self.assertEqual(sel.trust_level.level_f_hi, 0.3) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 0) + self.assertEqual(scheduler.get_iteration(), 1) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[0].reached_max_iteration()) + self.assertFalse(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) + conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) + self.assertEqual(conv, False) + self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) + self.assertTrue(isinstance(sel, ConfSelectorLammpsFrames)) + self.assertEqual(sel.trust_level.level_f_lo, 0.2) + self.assertEqual(sel.trust_level.level_f_hi, 0.4) + self.assertTrue(sel.trust_level.level_v_lo is None) + self.assertTrue(sel.trust_level.level_v_hi is None) + self.assertEqual(scheduler.get_stage(), 1) + self.assertEqual(scheduler.get_iteration(), 2) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertTrue(scheduler.stage_schedulers[0].reached_max_iteration()) + self.assertTrue(scheduler.stage_schedulers[0].complete()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) + self.assertFalse(scheduler.stage_schedulers[1].complete()) + self.assertFalse(scheduler.complete()) + + def test_failed_stage1(self): scheduler = ExplorationScheduler() trust_level = TrustLevel(0.1, 0.3) @@ -359,6 +634,9 @@ def test_failed_stage1(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 0) self.assertEqual(scheduler.get_iteration(), 0) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertFalse(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) conv, ltg, sel = scheduler.plan_next_iteration(bar_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) @@ -369,6 +647,9 @@ def test_failed_stage1(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 1) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) self.assertEqual(conv, False) self.assertTrue(isinstance(ltg, MockedExplorationTaskGroup1)) @@ -379,6 +660,9 @@ def test_failed_stage1(self): self.assertTrue(sel.trust_level.level_v_hi is None) self.assertEqual(scheduler.get_stage(), 1) self.assertEqual(scheduler.get_iteration(), 2) + self.assertEqual(len(scheduler.stage_schedulers), 2) + self.assertTrue(scheduler.stage_schedulers[0].converged()) + self.assertFalse(scheduler.stage_schedulers[1].converged()) with self.assertRaisesRegex(FatalError, 'stage 1: reached maximal number of iterations'): conv, ltg, sel = scheduler.plan_next_iteration(foo_report, []) diff --git a/tests/utils/test_dflow_query.py b/tests/utils/test_dflow_query.py index 77e54251..20a51439 100644 --- a/tests/utils/test_dflow_query.py +++ b/tests/utils/test_dflow_query.py @@ -99,7 +99,7 @@ class MockedBar: class MockedWF: def query_step(self,key=None): assert(key == 'iter1--scheduler') - return MockedBar() + return [MockedBar()] class TestDflowQuery(unittest.TestCase): def test_get_subkey(self): @@ -115,7 +115,7 @@ def test_get_subkey(self): def test_get_last_scheduler(self): value = get_last_scheduler( MockedWF(), - ['iter1--scheduler', 'foo', 'bar', 'iter0--scheduler'], + ['iter1--scheduler', 'foo', 'bar', 'iter0--scheduler', 'init--scheduler'], ) self.assertEqual(value, 10)