diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py
index 505e106ba..50078be37 100644
--- a/src/ocrd_network/processing_server.py
+++ b/src/ocrd_network/processing_server.py
@@ -583,7 +583,7 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -
         )
 
     async def _consume_cached_jobs_of_workspace(
-        self, workspace_key: str, mets_server_url: str
+        self, workspace_key: str, mets_server_url: str, path_to_mets: str
     ) -> List[PYJobInput]:
 
         # Check whether the internal queue for the workspace key still exists
@@ -602,7 +602,8 @@ async def _consume_cached_jobs_of_workspace(
                 # more internal callbacks are expected for that workspace
                 self.log.debug(f"Stopping the mets server: {mets_server_url}")
 
-                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url)
+                self.deployer.stop_uds_mets_server(
+                    mets_server_url=mets_server_url, path_to_mets=path_to_mets, stop_with_pid=True)
 
                 try:
                     # The queue is empty - delete it
@@ -652,7 +653,7 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage):
             raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
 
         consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
-            workspace_key=workspace_key, mets_server_url=mets_server_url
+            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
         )
         await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)
 
diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py
index 90f7c6d5c..f60194ce4 100644
--- a/src/ocrd_network/runtime_data/deployer.py
+++ b/src/ocrd_network/runtime_data/deployer.py
@@ -8,7 +8,6 @@
 """
 from __future__ import annotations
 from pathlib import Path
-from subprocess import Popen, run as subprocess_run
 from time import sleep
 from typing import Dict, List, Union
 
@@ -30,6 +29,8 @@ def __init__(self, config_path: str) -> None:
         self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"])
         self.internal_callback_url = ps_config.get("internal_callback_url", None)
         self.mets_servers: Dict = {}  # {"mets_server_url": "mets_server_pid"}
+        # This is required to store UDS urls that are multiplexed through the TCP proxy and are not preserved anywhere
+        self.mets_servers_paths: Dict = {}  # {"ws_dir_path": "mets_server_url"}
         self.use_tcp_mets = ps_config.get("use_tcp_mets", False)
 
     # TODO: Reconsider this.
@@ -153,26 +154,45 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
             Path(mets_server_url).unlink()
         self.log.info(f"Starting UDS mets server: {mets_server_url}")
         pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
-        self.mets_servers[mets_server_url] = pid
+        self.mets_servers[str(mets_server_url)] = pid
+        self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url)
         return mets_server_url
 
-    def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
+    def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_with_pid: bool = False) -> None:
+        """
+        Stop the UDS mets server that serves the workspace of `path_to_mets`.
+
+        With `stop_with_pid` set, the server process registered for the parent directory
+        of `path_to_mets` is killed by pid and its socket file is removed; if no server is
+        registered for that directory, a warning is logged and nothing else happens.
+        Otherwise, a stop request is sent to `mets_server_url` after a short delay.
+        """
         self.log.info(f"Stopping UDS mets server: {mets_server_url}")
+        self.log.info(f"Path to the mets file: {path_to_mets}")
+        self.log.info(f"mets_server: {self.mets_servers}")
+        self.log.info(f"mets_server_paths: {self.mets_servers_paths}")
         if stop_with_pid:
-            if Path(mets_server_url) not in self.mets_servers:
-                message = f"UDS Mets server not found at URL: {mets_server_url}"
-                self.log.exception(message)
-                raise Exception(message)
-            mets_server_pid = self.mets_servers[Path(mets_server_url)]
+            # Both registries are keyed by plain strings (see start_uds_mets_server),
+            # so the lookups must use strings as well; a Path object never matches a str key.
+            mets_server_url_uds = self.mets_servers_paths.get(str(Path(path_to_mets).parent))
+            if mets_server_url_uds is None or mets_server_url_uds not in self.mets_servers:
+                message = f"UDS Mets server not found at URL: {mets_server_url_uds}, mets path: {path_to_mets}"
+                self.log.warning(message)
+                # No pid is registered for this workspace, so there is nothing to kill
+                return
+            mets_server_pid = self.mets_servers[mets_server_url_uds]
+            self.log.info(f"Killing mets server pid: {mets_server_pid} of {mets_server_url_uds}")
             OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
-            if Path(mets_server_url).exists():
-                self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}")
-                Path(mets_server_url).unlink()
+            self.log.info("Returning after the kill process")
+            if Path(mets_server_url_uds).exists():
+                self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url_uds}")
+                Path(mets_server_url_uds).unlink()
+            self.log.info("Returning from the stop_uds_mets_server")
             return
         # TODO: Reconsider this again
         #  Not having this sleep here causes connection errors
         #  on the last request processed by the processing worker.
         #  Sometimes 3 seconds is enough, sometimes not.
         sleep(5)
-        stop_mets_server(mets_server_url=mets_server_url)
+        stop_mets_server(mets_server_url=mets_server_url, ws_dir_path=Path(path_to_mets).parent)
         return
diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py
index a2f563de4..13bbee7db 100644
--- a/src/ocrd_network/utils.py
+++ b/src/ocrd_network/utils.py
@@ -151,7 +151,7 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo
     return False
 
 
-def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool:
+def stop_mets_server(mets_server_url: str, ws_dir_path: Path = None) -> bool:
     protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds"
     session = Session_TCP() if protocol == "tcp" else Session_UDS()
     if protocol == "uds":
@@ -160,7 +160,7 @@ def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool:
         if 'tcp_mets' in mets_server_url:
             if not ws_dir_path:
                 return False
-            response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path))
+            response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(str(ws_dir_path)))
         else:
             response = session.delete(url=f"{mets_server_url}/")
     except Exception: