Skip to content

Commit

Permalink
HH-228324 fix failfast error
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed Sep 2, 2024
1 parent e4a3aea commit 34fd6d4
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions frontik/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from asyncio.futures import Future
from functools import partial, wraps
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar, Union, overload
from typing import TYPE_CHECKING, Any, NoReturn, Optional, Type, TypeVar, Union, overload

import tornado.web
from fastapi import Depends, Request
Expand Down Expand Up @@ -410,12 +410,11 @@ async def execute(self) -> tuple[int, str, HTTPHeaders, bytes]:
if self._prepared_future is not None and not self._prepared_future.done():
self._prepared_future.set_result(None)

try:
return await asyncio.wait_for(self.handler_result_future, timeout=5.0)
except TimeoutError:
done, pending = await asyncio.wait((self.handler_result_future,), timeout=5.0)
if not done:
self.log.error('handler was never finished')
self.send_error()
return self.handler_result_future.result()
return self.handler_result_future.result()

async def get(self, *args, **kwargs):
await self._execute_page()
Expand Down Expand Up @@ -510,6 +509,7 @@ def _cb(future: Future) -> None:
self.finish(future.result())

asyncio.create_task(self._postprocess()).add_done_callback(_cb)
raise FinishSignal()

def run_task(self: PageHandler, coro: Coroutine) -> Task:
task = asyncio.create_task(coro)
Expand Down Expand Up @@ -556,11 +556,17 @@ def _handle_request_exception(self, e: BaseException) -> None:
return

if isinstance(e, FinishWithPostprocessors):
if e.wait_finish_group:
self._handler_finished_notification()
self.add_future(self.finish_group.get_finish_future(), lambda _: self.finish_with_postprocessors())
else:
self.finish_with_postprocessors()
try:
if e.wait_finish_group:
self._handler_finished_notification()
self.add_future(self.finish_group.get_finish_future(), lambda _: self.finish_with_postprocessors())
else:
self.finish_with_postprocessors()
except FinishSignal:
return
except Exception as exc:
super()._handle_request_exception(exc)

return

if self._finished and not isinstance(e, Finish):
Expand Down Expand Up @@ -593,9 +599,11 @@ def _handle_request_exception(self, e: BaseException) -> None:
method = getattr(self, error_method_name, None)
if callable(method):
method(e.failed_result)
self.finish()
else:
self.__return_error(e.failed_result.status_code, error_info={'is_fail_fast': True})

except FinishSignal:
return
except Exception as exc:
super()._handle_request_exception(exc)

Expand Down Expand Up @@ -627,10 +635,9 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None:

try:
self.write_error(status_code, **kwargs)
except Exception as exc:
if isinstance(exc, FinishSignal):
return

except FinishSignal:
return
except Exception:
self.log.exception('Uncaught exception in write_error')
if not self._finished:
self.finish()
Expand All @@ -648,7 +655,7 @@ def write_error(self, status_code: int = 500, **kwargs: Any) -> None:
self.set_header('Content-Type', media_types.TEXT_HTML)
super().write_error(status_code, **kwargs)

def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> Future[None]:
def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> NoReturn:
self.stages_logger.commit_stage('postprocess')
for name, value in self._mandatory_headers.items():
self.set_header(name, value)
Expand Down

0 comments on commit 34fd6d4

Please sign in to comment.