From 503f760e06c6bb0a8bcaf6425d3a51c29c142953 Mon Sep 17 00:00:00 2001 From: Leonid Vinogradov Date: Thu, 30 May 2024 12:18:55 +0300 Subject: [PATCH] HH-217736 restart whole app on worker oom --- frontik/integrations/telemetry.py | 2 +- frontik/process.py | 14 ++++++++++++-- pyproject.toml | 2 +- tests/test_telemetry.py | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/frontik/integrations/telemetry.py b/frontik/integrations/telemetry.py index 447d6b3c2..e5165414e 100644 --- a/frontik/integrations/telemetry.py +++ b/frontik/integrations/telemetry.py @@ -76,7 +76,7 @@ def initialize_app(self, app: FrontikApplication) -> Optional[Future]: resource = Resource( attributes={ - ResourceAttributes.SERVICE_NAME: options.app, # type: ignore + ResourceAttributes.SERVICE_NAME: app.app_name, ResourceAttributes.SERVICE_VERSION: app.application_version(), # type: ignore ResourceAttributes.HOST_NAME: options.node_name, ResourceAttributes.CLOUD_REGION: http_client_options.datacenter, diff --git a/frontik/process.py b/frontik/process.py index b7d76199e..4dd4db3fc 100644 --- a/frontik/process.py +++ b/frontik/process.py @@ -90,10 +90,12 @@ def master_sigterm_handler(signum, _frame): time.sleep(0.1) _master_function_wrapper(worker_state, master_function) worker_state.master_done.value = True - _supervise_workers(worker_state, worker_function_wrapped) + _supervise_workers(worker_state, worker_function_wrapped, master_before_shutdown_action) -def _supervise_workers(worker_state: WorkerState, worker_function: Callable) -> None: +def _supervise_workers( + worker_state: WorkerState, worker_function: Callable, master_before_shutdown_action: Callable +) -> None: while worker_state.children: try: pid, status = os.wait() @@ -114,6 +116,14 @@ def _supervise_workers(worker_state: WorkerState, worker_function: Callable) -> if os.WIFSIGNALED(status): log.warning('child %d (pid %d) killed by signal %d, restarting', worker_id, pid, os.WTERMSIG(status)) + + # TODO remove this block # noqa + worker_state.terminating = True + master_before_shutdown_action() + for pid, worker_id in worker_state.children.items(): + log.info('sending %s to child %d (pid %d)', signal.Signals(os.WTERMSIG(status)).name, worker_id, pid) + os.kill(pid, signal.SIGTERM) + elif os.WEXITSTATUS(status) != 0: log.warning('child %d (pid %d) exited with status %d, restarting', worker_id, pid, os.WEXITSTATUS(status)) else: diff --git a/pyproject.toml b/pyproject.toml index 02bf7d3cc..e2b3542d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,7 @@ ignore = [ # should be ignored 'ANN101','ANN102','D102','D101','CPY001','D100','D107','D106','B008','D103','D104','D105','D202', 'RET505','RET506','RET504','RSE102','TCH003','TCH002', - 'COM812', 'ISC001', 'PT015', + 'COM812', 'ISC001', 'PT015', 'FIX002', ] [tool.ruff.lint.isort] diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index c60539721..4583564e7 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -142,7 +142,7 @@ def frontik_app(self) -> FrontikApplication: return app async def test_parent_span(self, frontik_app: FrontikApplication) -> None: - await self.fetch_json('/page_a') + await self.fetch('/page_a') BATCH_SPAN_PROCESSOR[0].force_flush() assert len(SPAN_STORAGE) == 4 client_a_span = find_span('http.request.cloud.region', 'externalRequest')