Skip to content

Commit

Permalink
feat: specify task queue location (#820)
Browse files Browse the repository at this point in the history
* chore: improved log messages
* feat: specify local queue directory
  • Loading branch information
dodamih authored Oct 30, 2024
1 parent 0dca420 commit b375bac
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
5 changes: 4 additions & 1 deletion zetta_utils/mazepa/semaphores.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ def configure_semaphores(
for name in semaphores_spec_:
sema = Semaphore(name_to_posix_name(name, os.getpid()))
sema.unlink()
logger.info(f"Cleaned up semaphores created by process {os.getpid()}.")
logger.info(
f"Cleaned up semaphores created by process {os.getpid()}.\n"
"(Please ignore the UserWarning about leaked semaphore objects.)"
)
except ExistentialError as e:
raise RuntimeError(
f"Trying to unlink semaphores created by process {os.getpid()} that do not exist."
Expand Down
7 changes: 5 additions & 2 deletions zetta_utils/mazepa_addons/configurations/execute_locally.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ def execute_locally(
do_dryrun_estimation: bool = True,
show_progress: bool = True,
checkpoint: Optional[str] = None,
queues_dir: str | None = None,
checkpoint_interval_sec: Optional[float] = 150,
raise_on_failed_checkpoint: bool = True,
num_procs: int = 1,
semaphores_spec: dict[SemaphoreType, int] | None = None,
debug: bool = False,
):

queues_dir_ = queues_dir if queues_dir else ""

with ExitStack() as stack:
logger.info(
"Configuring for local execution: "
Expand All @@ -49,8 +52,8 @@ def execute_locally(
task_queue = None
outcome_queue = None
else:
task_queue_name = f"{os.getpid()}_task_queue"
outcome_queue_name = f"{os.getpid()}_outcome_queue"
task_queue_name = os.path.join(queues_dir_, f"{os.getpid()}_task_queue")
outcome_queue_name = os.path.join(queues_dir_, f"{os.getpid()}_outcome_queue")
task_queue = stack.enter_context(FileQueue(task_queue_name))
outcome_queue = stack.enter_context(FileQueue(outcome_queue_name))
stack.enter_context(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def execute_on_gcp_with_sqs( # pylint: disable=too-many-locals
show_progress: bool = True,
do_dryrun_estimation: bool = True,
local_test: bool = False,
local_test_queues_dir: str | None = None,
debug: bool = False,
checkpoint: Optional[str] = None,
checkpoint_interval_sec: float = 300.0,
Expand All @@ -177,6 +178,7 @@ def execute_on_gcp_with_sqs( # pylint: disable=too-many-locals
batch_gap_sleep_sec=batch_gap_sleep_sec,
show_progress=show_progress,
do_dryrun_estimation=do_dryrun_estimation,
queues_dir=local_test_queues_dir,
checkpoint=checkpoint,
checkpoint_interval_sec=checkpoint_interval_sec,
raise_on_failed_checkpoint=raise_on_failed_checkpoint,
Expand All @@ -188,6 +190,13 @@ def execute_on_gcp_with_sqs( # pylint: disable=too-many-locals
_ensure_required_env_vars()
ctx_managers = copy.copy(list(extra_ctx_managers))

if local_test_queues_dir:
logger.warning(
"`local_test_queues_dir` was given, but `local_test` is False. "
"The argument will be unused, and remote workers will use the "
"default locations for their local task and outcome queues."
)

if worker_cluster_name is None:
logger.info(f"Cluster info not provided, using default: {DEFAULT_GCP_CLUSTER}")
worker_cluster = DEFAULT_GCP_CLUSTER
Expand Down
4 changes: 2 additions & 2 deletions zetta_utils/mazepa_addons/configurations/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ def setup_local_worker_pool(
)
logger.info(
f"Created {num_procs} local workers attached to queues "
f"`{task_queue_name}`/`{outcome_queue_name}`."
f"`{task_queue_name}` / `{outcome_queue_name}`."
)
yield
finally:
pool.stop()
pool.join()
logger.info(
f"Cleaned up {num_procs} local workers that were attached to queues "
f"`{task_queue_name}`/`{outcome_queue_name}`."
f"`{task_queue_name}` / `{outcome_queue_name}`."
)


Expand Down

0 comments on commit b375bac

Please sign in to comment.