From 274165ae8e5f561dd886c1ce060d728efdccd605 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Tue, 5 Dec 2023 09:18:06 -0800 Subject: [PATCH] Record overheads (#14) * Add OverheadHelper * Decorate stage coroutines to record overhead * Handle timestamp to UTC datetime in 3.10 * Move import of UTC after version check * Update codecov config * Add tests for OverheadHelper --- codecov.yml | 15 ++-- poetry.lock | 12 ++-- pyproject.toml | 3 +- src/hal/etc/schema.json | 8 +++ src/hal/helpers/__init__.py | 1 + src/hal/helpers/overhead.py | 126 ++++++++++++++++++++++++++++++++++ src/hal/macros/macro.py | 24 +++++-- tests/conftest.py | 9 +++ tests/test_macro.py | 10 ++- tests/test_overhead_helper.py | 114 ++++++++++++++++++++++++++++++ 10 files changed, 294 insertions(+), 28 deletions(-) create mode 100644 src/hal/helpers/overhead.py create mode 100644 tests/test_overhead_helper.py diff --git a/codecov.yml b/codecov.yml index dd223fc..8d87fc9 100644 --- a/codecov.yml +++ b/codecov.yml @@ -5,16 +5,9 @@ coverage: status: project: default: - target: 90% - if_not_found: success - if_ci_failed: error - informational: true - only_pulls: true + target: auto # Require that we increase the coverage wrt the base commit + informational: false patch: default: - target: 90% - if_no_uploads: error - if_not_found: success - if_ci_failed: error - informational: true - only_pulls: true + target: 90% # Require to cover at least 90% of the new code + informational: false diff --git a/poetry.lock b/poetry.lock index 1d94fad..85060e0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2498,13 +2498,13 @@ websocket = ["websockets (>=11.0.3,<12.0.0)"] [[package]] name = "sdssdb" -version = "0.8.0" +version = "0.8.2" description = "SDSS product for database management" optional = false python-versions = ">=3.6" files = [ - {file = "sdssdb-0.8.0-py3-none-any.whl", hash = "sha256:500d660e5c5b2657b970e89d9d769d77a61c20a43feaa3be85f035d615edad56"}, - {file = "sdssdb-0.8.0.tar.gz", hash = "sha256:d85b2d7bb09a7005cb4d5bd53cef3495c8fa77c7012fba0e7e0504aaf734b0a7"}, + {file = "sdssdb-0.8.2-py3-none-any.whl", hash = "sha256:8f11967245bc946d68bd7c2d02d271dda0a3d49098881fb7915df52efe9bdd3c"}, + {file = "sdssdb-0.8.2.tar.gz", hash = "sha256:d0d26fbbfe96c37e76b87cae4d676574ba6943f9202ed755da7f05b5f5da67aa"}, ] [package.dependencies] @@ -2515,14 +2515,14 @@ pgpasslib = ">=1.1.0" psycopg2-binary = ">=2.7.7" pygments = "*" pyyaml = ">=5.1" -sdsstools = ">=0.1.11" +sdsstools = ">=0.1.0" six = ">=1.12.0" sqlalchemy = ">=1.3.6" [package.extras] all = ["astropy (>=4.0.0)", "inflect (>=4.1.0)", "pandas (>=1.0.0)", "progressbar2 (>=3.46.1)", "pydot (>=1.4.1)"] dev = ["astropy (>=4.0.0)", "factory-boy (>=2.12.0)", "ipdb (>=0.13.2)", "ipython (>=7.13.0)", "pydot (>=1.4.2)", "pytest (>=5.2)", "pytest-cov (>=2.4.0)", "pytest-factoryboy (>=2.0.3)", "pytest-postgresql (>=2.2.1)", "pytest-sugar (>=0.8.0)"] -docs = ["Sphinx (>=3.0.0,<4.0.0)", "jinja2 (==3.0.0)", "sphinx-bootstrap-theme (>=0.4.12)"] +docs = ["Sphinx (>=7.0.0)", "releases (>=2.0.0)", "sphinx-bootstrap-theme (>=0.4.12)"] [[package]] name = "sdsstools" @@ -3212,4 +3212,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.10,<4.0" -content-hash = "c86068bae483f6e8460ca5d0e0390ac1839d738d343568069798144c81fb254d" +content-hash = "b4b1dd52ca37f58eb7b4795f5d6ff425b95446e1bd92a04291a58002397826ea" diff --git a/pyproject.toml b/pyproject.toml index 4d03020..606913d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ sdsstools = "^1.0.0" sdss-clu = "^2.0.0" click-default-group = "^1.2.2" numpy = "^1.22.1" -sdssdb = ">=0.6.0" +sdssdb = ">=0.8.2" +peewee = "^3.17.0" [tool.poetry.group.dev.dependencies] ipython = ">=8.0.0" diff --git a/src/hal/etc/schema.json b/src/hal/etc/schema.json index bd80e68..5d4a866 100644 --- a/src/hal/etc/schema.json +++ b/src/hal/etc/schema.json @@ -43,6 +43,14 @@ "type": "array", "items": { "type": "string" } }, + "stage_duration": { + "type": "array", + "items": [ + { "type": "string" }, + { "type": "string" }, + { "type": "number" } + ] + }, "bypasses": { "type": "array", "items": { "type": "string" } diff --git a/src/hal/helpers/__init__.py b/src/hal/helpers/__init__.py index f3b63a1..508636d 100644 --- a/src/hal/helpers/__init__.py +++ b/src/hal/helpers/__init__.py @@ -65,5 +65,6 @@ async def _send_command( from .ffs import * from .jaeger import * from .lamps import * +from .overhead import * from .scripts import * from .tcc import * diff --git a/src/hal/helpers/overhead.py b/src/hal/helpers/overhead.py new file mode 100644 index 0000000..919c87d --- /dev/null +++ b/src/hal/helpers/overhead.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2023-12-04 +# @Filename: overhead.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import asyncio +import sys +import time +from dataclasses import dataclass +from datetime import datetime + +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from hal.macros.macro import Macro + + +__all__ = ["OverheadHelper"] + + +@dataclass +class OverheadHelper: + """Collects and records stage overheads.""" + + macro: Macro + stage: str + + def __post_init__(self) -> None: + self.elapsed: float | None = None + + self.start_time: float | None = None + self.end_time: float | None = None + + self.success: bool = False + + async def __aenter__(self): + """Starts the timer.""" + + self.elapsed = 0 + self.start_time = time.time() + + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + """Stops the timer and records the overhead.""" + + if self.start_time is None: + raise ValueError("Timer not started") + + self.end_time = time.time() + self.elapsed = round(time.time() - self.start_time, 2) + + if exc_type is not None: + self.success = False + else: + self.success = True + + await self.emit_keywords() + asyncio.get_running_loop().call_soon(self.update_database) + + # __aexit__ will re-raise the exception if the return value is None/False. + return self.success + + async def emit_keywords(self): + """Emits the overhead as a keyword.""" + + command = self.macro.command + stage_full = f"{self.macro.name}.{self.stage}" + + if self.elapsed is None: + command.warning(f"Overhead was not recorded for stage {stage_full}.") + return + + command.info(stage_duration=[self.macro.name, self.stage, self.elapsed]) + + def _get_datetime(self, timestamp: float | None): + """Converts a timestamp to a datetime object.""" + + if timestamp is None: + return None + + if sys.version_info >= (3, 11): + from datetime import UTC + + return datetime.fromtimestamp(timestamp, tz=UTC) + + return datetime.utcfromtimestamp(timestamp) + + def update_database(self): + """Updates the database with the overhead.""" + + from sdssdb.peewee.sdss5db.opsdb import Overhead, database + + command = self.macro.command + + if not database.connected: + command.warning("Failed connecting to DB. Overhead cannot be recorded.") + return + + with database.atomic(): + try: + actor = self.macro.actor + configuration = actor.helpers.jaeger.configuration + cid = configuration.configuration_id if configuration else None + + start_time_dt = self._get_datetime(self.start_time) + end_time_dt = self._get_datetime(self.end_time) + + Overhead.insert( + configuration_id=cid, + macro=self.macro.name, + stage=self.stage, + start_time=start_time_dt, + end_time=end_time_dt, + elapsed=self.elapsed, + success=self.success, + ).execute() + + except Exception as err: + command.warning(f"Failed creating overhead record: {err}") diff --git a/src/hal/macros/macro.py b/src/hal/macros/macro.py index 2199f3e..e1dd65a 100644 --- a/src/hal/macros/macro.py +++ b/src/hal/macros/macro.py @@ -14,12 +14,13 @@ from collections import defaultdict from contextlib import suppress -from typing import TYPE_CHECKING, Any, ClassVar, Coroutine, Optional, Union +from typing import TYPE_CHECKING, Any, Awaitable, ClassVar, Coroutine, Optional, Union from clu import Command, CommandStatus from hal import config from hal.exceptions import HALUserWarning, MacroError +from hal.helpers.overhead import OverheadHelper if TYPE_CHECKING: @@ -32,6 +33,17 @@ StageType = Union[str, tuple[str, ...], list[str]] +def record_overhead(macro: Macro): + """Runs a macro stage and records its overhead.""" + + async def record_overhead_wrapper(stage_coro: Awaitable[None]): + overhead_helper = OverheadHelper(macro, stage_coro.__name__) + async with overhead_helper: + await stage_coro + + return record_overhead_wrapper + + class StageStatus(enum.Flag): """Stage status codes.""" @@ -403,8 +415,12 @@ async def _do_run(self): current_task: asyncio.Future | None = None for istage, stage in enumerate(self.stages): - stage_coros = [asyncio.create_task(coro) for coro in self._get_coros(stage)] - current_task = asyncio.gather(*stage_coros) + wrapped_coros = [ + asyncio.create_task(record_overhead(self)(coro)) + for coro in self._get_coros(stage) + ] + + current_task = asyncio.gather(*wrapped_coros) self.set_stage_status(stage, StageStatus.ACTIVE) @@ -452,7 +468,7 @@ async def _do_run(self): # Cancel stage tasks (in case we are running multiple concurrently). with suppress(asyncio.CancelledError): - for task in stage_coros: + for task in wrapped_coros: if not task.done(): task.cancel() diff --git a/tests/conftest.py b/tests/conftest.py index bde2279..554ea3f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,7 @@ import os import pytest +from pytest_mock import MockerFixture from clu import Command from clu.testing import setup_test_actor @@ -32,6 +33,14 @@ def mock_send_command(mocker): Macro.send_command = mocker.AsyncMock(return_value=command) +@pytest.fixture(autouse=True) +def mock_overhead_table(mocker: MockerFixture): + import sdssdb.peewee.sdss5db.opsdb + + mocker.patch.object(sdssdb.peewee.sdss5db.opsdb, "Overhead", autospec=True) + mocker.patch.object(sdssdb.peewee.sdss5db.opsdb, "database", autospec=True) + + @pytest.fixture async def actor(): config = read_yaml_file(config_path) diff --git a/tests/test_macro.py b/tests/test_macro.py index c6992aa..307d7e8 100644 --- a/tests/test_macro.py +++ b/tests/test_macro.py @@ -10,21 +10,19 @@ from typing import TYPE_CHECKING -import pytest - from hal.exceptions import MacroError if TYPE_CHECKING: + from hal.actor import HALCommandType from hal.macros import Macro -pytestmark = [pytest.mark.asyncio] - -async def test_macro(actor, macro: Macro): +async def test_macro(actor, macro: Macro, command: HALCommandType): await macro.run() - assert len(actor.mock_replies) == 10 + assert len(actor.mock_replies) == 13 + assert command.replies.get("stage_duration") == ["macro_test", "stage1", 0.0] async def test_macro_stage_fails(actor, macro: Macro, mocker): diff --git a/tests/test_overhead_helper.py b/tests/test_overhead_helper.py new file mode 100644 index 0000000..6f49bd1 --- /dev/null +++ b/tests/test_overhead_helper.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2023-12-05 +# @Filename: test_overhead_helper.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import asyncio +import time + +from typing import TYPE_CHECKING + +import pytest +import sdssdb.peewee.sdss5db.opsdb +from pytest_mock import MockerFixture + +from hal.helpers.overhead import OverheadHelper + + +if TYPE_CHECKING: + from hal.actor import HALCommandType + from hal.macros.macro import Macro + + +@pytest.fixture() +def overhead_helper(macro: Macro): + """Returns an ``OverheadHelper`` instance.""" + + stage = "stage1" + + yield OverheadHelper(macro, stage) + + +async def test_overhead_helper( + overhead_helper: OverheadHelper, + command: HALCommandType, +): + assert isinstance(overhead_helper, OverheadHelper) + + assert overhead_helper.macro is not None + assert overhead_helper.elapsed is None + + async with overhead_helper: + await asyncio.sleep(0.5) + + assert overhead_helper.elapsed is not None and overhead_helper.elapsed > 0.4 + assert overhead_helper.start_time is not None + assert overhead_helper.end_time is not None + assert overhead_helper.success is True + + assert len(command.replies) == 3 + assert command.replies[-1].message["stage_duration"][0] == "macro_test" + + +async def test_overhead_helper_timer_not_started( + overhead_helper: OverheadHelper, + mocker: MockerFixture, +): + mocker.patch.object(time, "time", return_value=None) + + with pytest.raises(ValueError): + async with overhead_helper: + await asyncio.sleep(0.1) + + +async def test_overhead_helper_inner_fails(overhead_helper: OverheadHelper): + with pytest.raises(RuntimeError): + async with overhead_helper: + await asyncio.sleep(0.1) + raise RuntimeError("test error") + + assert overhead_helper.success is False + + +async def test_overhead_helper_emit_elapsed_none(overhead_helper: OverheadHelper): + overhead_helper.elapsed = None + await overhead_helper.emit_keywords() + + command = overhead_helper.macro.command + + assert command.replies[-1].message_code == "w" + assert "Overhead was not recorded" in command.replies[-1].message["text"] + + +async def test_overhead_helper_update_database_connect_fails( + overhead_helper: OverheadHelper, + mocker: MockerFixture, +): + db = mocker.patch.object(sdssdb.peewee.sdss5db.opsdb, "database") + type(db).connected = mocker.PropertyMock(return_value=False) + + assert overhead_helper.update_database() is None + + command = overhead_helper.macro.command + assert "Failed connecting to DB" in command.replies[-1].message["text"] + + +async def test_overhead_helper_update_database_indert_fails( + overhead_helper: OverheadHelper, + mocker: MockerFixture, +): + mocker.patch.object( + sdssdb.peewee.sdss5db.opsdb.Overhead, + "insert", + side_effect=RuntimeError, + ) + + assert overhead_helper.update_database() is None + + command = overhead_helper.macro.command + assert "Failed creating overhead record" in command.replies[-1].message["text"]