Skip to content

Commit

Permalink
Record overheads (#14)
Browse files Browse the repository at this point in the history
* Add OverheadHelper

* Decorate stage coroutines to record overhead

* Handle timestamp to UTC datetime in 3.10

* Move import of UTC after version check

* Update codecov config

* Add tests for OverheadHelper
  • Loading branch information
albireox authored Dec 5, 2023
1 parent 5a8cf9a commit 274165a
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 28 deletions.
15 changes: 4 additions & 11 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,9 @@ coverage:
status:
project:
default:
target: 90%
if_not_found: success
if_ci_failed: error
informational: true
only_pulls: true
target: auto # Require that we increase the coverage wrt the base commit
informational: false
patch:
default:
target: 90%
if_no_uploads: error
if_not_found: success
if_ci_failed: error
informational: true
only_pulls: true
target: 90% # Require to cover at least 90% of the new code
informational: false
12 changes: 6 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ sdsstools = "^1.0.0"
sdss-clu = "^2.0.0"
click-default-group = "^1.2.2"
numpy = "^1.22.1"
sdssdb = ">=0.6.0"
sdssdb = ">=0.8.2"
peewee = "^3.17.0"

[tool.poetry.group.dev.dependencies]
ipython = ">=8.0.0"
Expand Down
8 changes: 8 additions & 0 deletions src/hal/etc/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@
"type": "array",
"items": { "type": "string" }
},
"stage_duration": {
"type": "array",
"items": [
{ "type": "string" },
{ "type": "string" },
{ "type": "number" }
]
},
"bypasses": {
"type": "array",
"items": { "type": "string" }
Expand Down
1 change: 1 addition & 0 deletions src/hal/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ async def _send_command(
from .ffs import *
from .jaeger import *
from .lamps import *
from .overhead import *
from .scripts import *
from .tcc import *
126 changes: 126 additions & 0 deletions src/hal/helpers/overhead.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2023-12-04
# @Filename: overhead.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import sys
import time
from dataclasses import dataclass
from datetime import datetime

from typing import TYPE_CHECKING


if TYPE_CHECKING:
from hal.macros.macro import Macro


__all__ = ["OverheadHelper"]


@dataclass
class OverheadHelper:
"""Collects and records stage overheads."""

macro: Macro
stage: str

def __post_init__(self) -> None:
self.elapsed: float | None = None

self.start_time: float | None = None
self.end_time: float | None = None

self.success: bool = False

async def __aenter__(self):
"""Starts the timer."""

self.elapsed = 0
self.start_time = time.time()

return self

async def __aexit__(self, exc_type, exc_value, traceback):
"""Stops the timer and records the overhead."""

if self.start_time is None:
raise ValueError("Timer not started")

self.end_time = time.time()
self.elapsed = round(time.time() - self.start_time, 2)

if exc_type is not None:
self.success = False
else:
self.success = True

await self.emit_keywords()
asyncio.get_running_loop().call_soon(self.update_database)

# __aexit__ will re-raise the exception if the return value is None/False.
return self.success

async def emit_keywords(self):
"""Emits the overhead as a keyword."""

command = self.macro.command
stage_full = f"{self.macro.name}.{self.stage}"

if self.elapsed is None:
command.warning(f"Overhead was not recorded for stage {stage_full}.")
return

command.info(stage_duration=[self.macro.name, self.stage, self.elapsed])

def _get_datetime(self, timestamp: float | None):
"""Converts a timestamp to a datetime object."""

if timestamp is None:
return None

if sys.version_info >= (3, 11):
from datetime import UTC

return datetime.fromtimestamp(timestamp, tz=UTC)

return datetime.utcfromtimestamp(timestamp)

def update_database(self):
"""Updates the database with the overhead."""

from sdssdb.peewee.sdss5db.opsdb import Overhead, database

command = self.macro.command

if not database.connected:
command.warning("Failed connecting to DB. Overhead cannot be recorded.")
return

with database.atomic():
try:
actor = self.macro.actor
configuration = actor.helpers.jaeger.configuration
cid = configuration.configuration_id if configuration else None

start_time_dt = self._get_datetime(self.start_time)
end_time_dt = self._get_datetime(self.end_time)

Overhead.insert(
configuration_id=cid,
macro=self.macro.name,
stage=self.stage,
start_time=start_time_dt,
end_time=end_time_dt,
elapsed=self.elapsed,
success=self.success,
).execute()

except Exception as err:
command.warning(f"Failed creating overhead record: {err}")
24 changes: 20 additions & 4 deletions src/hal/macros/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
from collections import defaultdict
from contextlib import suppress

from typing import TYPE_CHECKING, Any, ClassVar, Coroutine, Optional, Union
from typing import TYPE_CHECKING, Any, Awaitable, ClassVar, Coroutine, Optional, Union

from clu import Command, CommandStatus

from hal import config
from hal.exceptions import HALUserWarning, MacroError
from hal.helpers.overhead import OverheadHelper


if TYPE_CHECKING:
Expand All @@ -32,6 +33,17 @@
StageType = Union[str, tuple[str, ...], list[str]]


def record_overhead(macro: Macro):
"""Runs a macro stage and records its overhead."""

async def record_overhead_wrapper(stage_coro: Awaitable[None]):
overhead_helper = OverheadHelper(macro, stage_coro.__name__)
async with overhead_helper:
await stage_coro

return record_overhead_wrapper


class StageStatus(enum.Flag):
"""Stage status codes."""

Expand Down Expand Up @@ -403,8 +415,12 @@ async def _do_run(self):
current_task: asyncio.Future | None = None

for istage, stage in enumerate(self.stages):
stage_coros = [asyncio.create_task(coro) for coro in self._get_coros(stage)]
current_task = asyncio.gather(*stage_coros)
wrapped_coros = [
asyncio.create_task(record_overhead(self)(coro))
for coro in self._get_coros(stage)
]

current_task = asyncio.gather(*wrapped_coros)

self.set_stage_status(stage, StageStatus.ACTIVE)

Expand Down Expand Up @@ -452,7 +468,7 @@ async def _do_run(self):

# Cancel stage tasks (in case we are running multiple concurrently).
with suppress(asyncio.CancelledError):
for task in stage_coros:
for task in wrapped_coros:
if not task.done():
task.cancel()

Expand Down
9 changes: 9 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os

import pytest
from pytest_mock import MockerFixture

from clu import Command
from clu.testing import setup_test_actor
Expand All @@ -32,6 +33,14 @@ def mock_send_command(mocker):
Macro.send_command = mocker.AsyncMock(return_value=command)


@pytest.fixture(autouse=True)
def mock_overhead_table(mocker: MockerFixture):
import sdssdb.peewee.sdss5db.opsdb

mocker.patch.object(sdssdb.peewee.sdss5db.opsdb, "Overhead", autospec=True)
mocker.patch.object(sdssdb.peewee.sdss5db.opsdb, "database", autospec=True)


@pytest.fixture
async def actor():
config = read_yaml_file(config_path)
Expand Down
10 changes: 4 additions & 6 deletions tests/test_macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@

from typing import TYPE_CHECKING

import pytest

from hal.exceptions import MacroError


if TYPE_CHECKING:
from hal.actor import HALCommandType
from hal.macros import Macro

pytestmark = [pytest.mark.asyncio]


async def test_macro(actor, macro: Macro):
async def test_macro(actor, macro: Macro, command: HALCommandType):
await macro.run()

assert len(actor.mock_replies) == 10
assert len(actor.mock_replies) == 13
assert command.replies.get("stage_duration") == ["macro_test", "stage1", 0.0]


async def test_macro_stage_fails(actor, macro: Macro, mocker):
Expand Down
Loading

0 comments on commit 274165a

Please sign in to comment.