Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Oct 4, 2024
1 parent c5fd843 commit 569edff
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
7 changes: 4 additions & 3 deletions src/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -
)

async def _consume_cached_jobs_of_workspace(
self, workspace_key: str, mets_server_url: str
self, workspace_key: str, mets_server_url: str, path_to_mets: str
) -> List[PYJobInput]:

# Check whether the internal queue for the workspace key still exists
Expand All @@ -602,7 +602,8 @@ async def _consume_cached_jobs_of_workspace(
# more internal callbacks are expected for that workspace
self.log.debug(f"Stopping the mets server: {mets_server_url}")

self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url)
self.deployer.stop_uds_mets_server(
mets_server_url=mets_server_url, path_to_mets=path_to_mets, stop_with_pid=True)

try:
# The queue is empty - delete it
Expand Down Expand Up @@ -652,7 +653,7 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage):
raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
workspace_key=workspace_key, mets_server_url=mets_server_url
workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
)
await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)

Expand Down
32 changes: 20 additions & 12 deletions src/ocrd_network/runtime_data/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"""
from __future__ import annotations
from pathlib import Path
from subprocess import Popen, run as subprocess_run
from time import sleep
from typing import Dict, List, Union

Expand All @@ -30,6 +29,8 @@ def __init__(self, config_path: str) -> None:
self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"])
self.internal_callback_url = ps_config.get("internal_callback_url", None)
self.mets_servers: Dict = {} # {"mets_server_url": "mets_server_pid"}
# This is required to store UDS urls that are multiplexed through the TCP proxy and are not preserved anywhere
self.mets_servers_paths: Dict = {} # {"ws_dir_path": "mets_server_url"}
self.use_tcp_mets = ps_config.get("use_tcp_mets", False)

# TODO: Reconsider this.
Expand Down Expand Up @@ -153,26 +154,33 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
Path(mets_server_url).unlink()
self.log.info(f"Starting UDS mets server: {mets_server_url}")
pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
self.mets_servers[mets_server_url] = pid
self.mets_servers[str(mets_server_url)] = pid
self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url)
return mets_server_url

def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_with_pid: bool = False) -> None:
self.log.info(f"Stopping UDS mets server: {mets_server_url}")
self.log.info(f"Path to the mets file: {path_to_mets}")
self.log.info(f"mets_server: {self.mets_servers}")
self.log.info(f"mets_server_paths: {self.mets_servers_paths}")
if stop_with_pid:
if Path(mets_server_url) not in self.mets_servers:
message = f"UDS Mets server not found at URL: {mets_server_url}"
self.log.exception(message)
raise Exception(message)
mets_server_pid = self.mets_servers[Path(mets_server_url)]
mets_server_url_uds = self.mets_servers_paths[str(Path(path_to_mets).parent)]
if Path(mets_server_url_uds) not in self.mets_servers:
message = f"UDS Mets server not found at URL: {mets_server_url_uds}, mets path: {path_to_mets}"
self.log.warning(message)
mets_server_pid = self.mets_servers[str(mets_server_url_uds)]
self.log.info(f"Killing mets server pid: {mets_server_pid} of {mets_server_url_uds}")
OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
if Path(mets_server_url).exists():
self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}")
Path(mets_server_url).unlink()
self.log.info(f"Returning after the kill process")
if Path(mets_server_url_uds).exists():
self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url_uds}")
Path(mets_server_url_uds).unlink()
self.log.info(f"Returning from the stop_uds_mets_server")
return
# TODO: Reconsider this again
# Not having this sleep here causes connection errors
# on the last request processed by the processing worker.
# Sometimes 3 seconds is enough, sometimes not.
sleep(5)
stop_mets_server(mets_server_url=mets_server_url)
stop_mets_server(mets_server_url=mets_server_url, ws_dir_path=Path(path_to_mets).parent)
return
4 changes: 2 additions & 2 deletions src/ocrd_network/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo
return False


def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool:
def stop_mets_server(mets_server_url: str, ws_dir_path: Path = None) -> bool:
protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds"
session = Session_TCP() if protocol == "tcp" else Session_UDS()
if protocol == "uds":
Expand All @@ -160,7 +160,7 @@ def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool:
if 'tcp_mets' in mets_server_url:
if not ws_dir_path:
return False
response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path))
response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(str(ws_dir_path)))
else:
response = session.delete(url=f"{mets_server_url}/")
except Exception:
Expand Down

0 comments on commit 569edff

Please sign in to comment.