From b375bac9a036d8e111f136e950a34dc77848bad0 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Tue, 29 Oct 2024 22:22:03 -0700 Subject: [PATCH] feat: specify task queue location (#820) * chore: improved log messages * feat: specify local queue directory --- zetta_utils/mazepa/semaphores.py | 5 ++++- .../mazepa_addons/configurations/execute_locally.py | 7 +++++-- .../configurations/execute_on_gcp_with_sqs.py | 9 +++++++++ zetta_utils/mazepa_addons/configurations/worker_pool.py | 4 ++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/zetta_utils/mazepa/semaphores.py b/zetta_utils/mazepa/semaphores.py index 05d3b741e..000586c85 100644 --- a/zetta_utils/mazepa/semaphores.py +++ b/zetta_utils/mazepa/semaphores.py @@ -77,7 +77,10 @@ def configure_semaphores( for name in semaphores_spec_: sema = Semaphore(name_to_posix_name(name, os.getpid())) sema.unlink() - logger.info(f"Cleaned up semaphores created by process {os.getpid()}.") + logger.info( + f"Cleaned up semaphores created by process {os.getpid()}.\n" + "(Please ignore the UserWarning about leaked semaphore objects.)" + ) except ExistentialError as e: raise RuntimeError( f"Trying to unlink semaphores created by process {os.getpid()} that do not exist." diff --git a/zetta_utils/mazepa_addons/configurations/execute_locally.py b/zetta_utils/mazepa_addons/configurations/execute_locally.py index 163ba0b59..ab537195b 100644 --- a/zetta_utils/mazepa_addons/configurations/execute_locally.py +++ b/zetta_utils/mazepa_addons/configurations/execute_locally.py @@ -30,6 +30,7 @@ def execute_locally( do_dryrun_estimation: bool = True, show_progress: bool = True, checkpoint: Optional[str] = None, + queues_dir: str | None = None, checkpoint_interval_sec: Optional[float] = 150, raise_on_failed_checkpoint: bool = True, num_procs: int = 1, @@ -37,6 +38,8 @@ def execute_locally( debug: bool = False, ): + queues_dir_ = queues_dir if queues_dir else "" + with ExitStack() as stack: logger.info( "Configuring for local execution: " @@ -49,8 +52,8 @@ def execute_locally( task_queue = None outcome_queue = None else: - task_queue_name = f"{os.getpid()}_task_queue" - outcome_queue_name = f"{os.getpid()}_outcome_queue" + task_queue_name = os.path.join(queues_dir_, f"{os.getpid()}_task_queue") + outcome_queue_name = os.path.join(queues_dir_, f"{os.getpid()}_outcome_queue") task_queue = stack.enter_context(FileQueue(task_queue_name)) outcome_queue = stack.enter_context(FileQueue(outcome_queue_name)) stack.enter_context( diff --git a/zetta_utils/mazepa_addons/configurations/execute_on_gcp_with_sqs.py b/zetta_utils/mazepa_addons/configurations/execute_on_gcp_with_sqs.py index a590383dd..4e78430c7 100644 --- a/zetta_utils/mazepa_addons/configurations/execute_on_gcp_with_sqs.py +++ b/zetta_utils/mazepa_addons/configurations/execute_on_gcp_with_sqs.py @@ -155,6 +155,7 @@ def execute_on_gcp_with_sqs( # pylint: disable=too-many-locals show_progress: bool = True, do_dryrun_estimation: bool = True, local_test: bool = False, + local_test_queues_dir: str | None = None, debug: bool = False, checkpoint: Optional[str] = None, checkpoint_interval_sec: float = 300.0, @@ -177,6 +178,7 @@ def execute_on_gcp_with_sqs( # pylint: disable=too-many-locals batch_gap_sleep_sec=batch_gap_sleep_sec, show_progress=show_progress, do_dryrun_estimation=do_dryrun_estimation, + queues_dir=local_test_queues_dir, checkpoint=checkpoint, checkpoint_interval_sec=checkpoint_interval_sec, raise_on_failed_checkpoint=raise_on_failed_checkpoint, @@ -188,6 +190,13 @@ def execute_on_gcp_with_sqs( # pylint: disable=too-many-locals _ensure_required_env_vars() ctx_managers = copy.copy(list(extra_ctx_managers)) + if local_test_queues_dir: + logger.warning( + "`local_test_queues_dir` was given, but `local_test` is False. " + "The argument will be unused, and remote workers will use the " + "default locations for their local task and outcome queues." + ) + if worker_cluster_name is None: logger.info(f"Cluster info not provided, using default: {DEFAULT_GCP_CLUSTER}") worker_cluster = DEFAULT_GCP_CLUSTER diff --git a/zetta_utils/mazepa_addons/configurations/worker_pool.py b/zetta_utils/mazepa_addons/configurations/worker_pool.py index 85bf29a66..2b0911555 100644 --- a/zetta_utils/mazepa_addons/configurations/worker_pool.py +++ b/zetta_utils/mazepa_addons/configurations/worker_pool.py @@ -86,7 +86,7 @@ def setup_local_worker_pool( ) logger.info( f"Created {num_procs} local workers attached to queues " - f"`{task_queue_name}`/`{outcome_queue_name}`." + f"`{task_queue_name}` / `{outcome_queue_name}`." ) yield finally: @@ -94,7 +94,7 @@ def setup_local_worker_pool( pool.join() logger.info( f"Cleaned up {num_procs} local workers that were attached to queues " - f"`{task_queue_name}`/`{outcome_queue_name}`." + f"`{task_queue_name}` / `{outcome_queue_name}`." )