diff --git a/fireworks/features/multi_launcher.py b/fireworks/features/multi_launcher.py index 67409290d..e6f5828b9 100644 --- a/fireworks/features/multi_launcher.py +++ b/fireworks/features/multi_launcher.py @@ -42,7 +42,7 @@ def ping_multilaunch(port, stop_event): def rapidfire_process( - fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect + fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect, m_dir ): """ Initializes shared data with multiprocessing parameters and starts a rapidfire. @@ -75,7 +75,7 @@ def rapidfire_process( rapidfire( launchpad, fworker=fworker, - m_dir=None, + m_dir=m_dir, nlaunches=nlaunches, max_loops=-1, sleep_time=sleep, @@ -91,19 +91,20 @@ def rapidfire_process( # Some other sub jobs are still running # Update the timeout according to the already elapsed time - time_elapsed = time.time() - process_start_time - timeout_left = timeout - time_elapsed + if timeout is not None: + time_elapsed = time.time() - process_start_time + timeout_left = timeout - time_elapsed - # Stand down if there is less than 3% of the time left - if timeout_left < 0.03 * timeout: - log_multi( - l_logger, - ( - f"Remaining time {timeout_left}s is less than 3% of the original timeout " - f"{timeout}s - standing down" - ), - ) - break + # Stand down if there is less than 3% of the time left + if timeout_left < 0.03 * timeout: + log_multi( + l_logger, + ( + f"Remaining time {timeout_left}s is less than 3% of the original timeout " + f"{timeout}s - standing down" + ), + ) + break log_multi(l_logger, f"Sleeping for {sleep_time} secs before resubmit sub job") time.sleep(sleep_time) @@ -135,6 +136,7 @@ def start_rockets( timeout=None, running_ids_dict=None, local_redirect=False, + m_dir=None, ): """ Create each sub job and start a rocket launch in each one. @@ -156,7 +158,7 @@ def start_rockets( processes = [ Process( target=rapidfire_process, - args=(fworker, nlaunches, sleep, loglvl, port, nl, sub_nproc, timeout, running_ids_dict, local_redirect), + args=(fworker, nlaunches, sleep, loglvl, port, nl, sub_nproc, timeout, running_ids_dict, local_redirect, m_dir), ) for nl, sub_nproc in zip(node_lists, sub_nproc_list) ] @@ -205,6 +207,7 @@ def launch_multiprocess( timeout=None, exclude_current_node=False, local_redirect=False, + m_dir=None ): """ Launch the jobs in the job packing mode. @@ -252,6 +255,7 @@ def launch_multiprocess( timeout=timeout, running_ids_dict=running_ids_dict, local_redirect=local_redirect, + m_dir=m_dir ) FWData().Running_IDs = running_ids_dict