From 5ff21f84e2fa32f727d4d99b5302053745ccd5d9 Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Mon, 2 Dec 2024 08:11:46 -0500 Subject: [PATCH] introduce a native 'cache', eventually we will replace diskcache --- CHANGELOG.md | 3 ++ pioreactor/background_jobs/base.py | 8 +-- pioreactor/background_jobs/stirring.py | 7 ++- pioreactor/tests/test_stirring.py | 2 +- pioreactor/tests/test_utils.py | 3 +- pioreactor/utils/__init__.py | 72 +++++++++++++++++++++++--- pioreactor/whoami.py | 6 +++ 7 files changed, 85 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd954c36..57bde473 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,9 @@ #### Bug fixes - Fixed "circulate X" actions in the Manage All dialog in the UI. +#### Breaking changes + - moved all the temporary caches, which previously were their own sqlite3 db in /tmp/ to /tmp/local_intermittent_pioreactor_metadata.sqlite. This shouldn't break anything unless you update _during_ an experiment - don't do that! + ### 24.10.29 #### Enhancements diff --git a/pioreactor/background_jobs/base.py b/pioreactor/background_jobs/base.py index 5a134ea0..b5820c4f 100644 --- a/pioreactor/background_jobs/base.py +++ b/pioreactor/background_jobs/base.py @@ -1124,7 +1124,7 @@ def _setup_timer(self) -> None: if pre_delay < 0.25: self.logger.warning("For optimal OD readings, keep `pre_delay_duration` more than 0.25 seconds.") - def sneak_in(ads_interval, post_delay, pre_delay) -> None: + def sneak_in(ads_interval: float, post_delay: float, pre_delay: float) -> None: if self.state != self.READY: return @@ -1149,9 +1149,9 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None: # in the same experiment, the od_reading fails catastrophically so that the settings are never # cleared. Later, this job starts, and it will pick up the _old_ settings. 
with JobManager() as jm: - ads_interval = jm.get_setting_from_running_job("od_reading", "interval", timeout=5) - ads_start_time = jm.get_setting_from_running_job( - "od_reading", "first_od_obs_time", timeout=5 + ads_interval = float(jm.get_setting_from_running_job("od_reading", "interval", timeout=5)) + ads_start_time = float( + jm.get_setting_from_running_job("od_reading", "first_od_obs_time", timeout=5) ) # this is populated later in the OD job... # get interval, and confirm that the requirements are possible: post_delay + pre_delay <= ADS interval - (od reading duration) diff --git a/pioreactor/background_jobs/stirring.py b/pioreactor/background_jobs/stirring.py index 0c688f95..687b583d 100644 --- a/pioreactor/background_jobs/stirring.py +++ b/pioreactor/background_jobs/stirring.py @@ -201,6 +201,7 @@ class Stirrer(BackgroundJobWithDodging): "measured_rpm": {"datatype": "MeasuredRPM", "settable": False, "unit": "RPM"}, "duty_cycle": {"datatype": "float", "settable": True, "unit": "%"}, } + # the _estimate_duty_cycle parameter is like the unrealized DC, and the duty_cycle is the realized DC. _estimate_duty_cycle: float = config.getfloat("stirring.config", "initial_duty_cycle", fallback=30) duty_cycle: float = 0 _measured_rpm: Optional[float] = None @@ -370,8 +371,10 @@ def kick_stirring_but_avoid_od_reading(self) -> None: wait until it completes before kicking stirring. 
""" with JobManager() as jm: - interval = jm.get_setting_from_running_job("od_reading", "interval", timeout=5) - first_od_obs_time = jm.get_setting_from_running_job("od_reading", "first_od_obs_time", timeout=5) + interval = float(jm.get_setting_from_running_job("od_reading", "interval", timeout=5)) + first_od_obs_time = float( + jm.get_setting_from_running_job("od_reading", "first_od_obs_time", timeout=5) + ) seconds_to_next_reading = interval - (time() - first_od_obs_time) % interval sleep( diff --git a/pioreactor/tests/test_stirring.py b/pioreactor/tests/test_stirring.py index c324ec74..5e0c6ccd 100644 --- a/pioreactor/tests/test_stirring.py +++ b/pioreactor/tests/test_stirring.py @@ -209,7 +209,7 @@ def test_block_until_rpm_is_close_to_target_will_timeout() -> None: def test_block_until_rpm_is_close_will_exit() -> None: - exp = "test_block_until_rpm_isf_close_to_target_will_timeout" + exp = "test_block_until_rpm_is_close_will_exit" rpm_calculator = MockRpmCalculator() rpm_calculator.setup() with Stirrer( diff --git a/pioreactor/tests/test_utils.py b/pioreactor/tests/test_utils.py index 44101102..6c19ca1a 100644 --- a/pioreactor/tests/test_utils.py +++ b/pioreactor/tests/test_utils.py @@ -9,7 +9,6 @@ from msgspec.json import encode as dumps from pioreactor.background_jobs.stirring import start_stirring -from pioreactor.exc import JobNotRunningError from pioreactor.tests.conftest import capture_requests from pioreactor.utils import callable_stack from pioreactor.utils import ClusterJobManager @@ -342,5 +341,5 @@ def test_retrieve_setting(job_manager, job_id): # turn off job_manager.set_not_running(job_key) - with pytest.raises(JobNotRunningError): + with pytest.raises(NameError): job_manager.get_setting_from_running_job("test_name", "my_setting_int") diff --git a/pioreactor/utils/__init__.py b/pioreactor/utils/__init__.py index 6c16c4e3..356d8c92 100644 --- a/pioreactor/utils/__init__.py +++ b/pioreactor/utils/__init__.py @@ -33,8 +33,8 @@ from pioreactor.pubsub 
import patch_into from pioreactor.pubsub import subscribe_and_callback from pioreactor.utils.networking import resolve_to_address -from pioreactor.utils.timing import current_utc_timestamp from pioreactor.utils.timing import catchtime +from pioreactor.utils.timing import current_utc_timestamp if TYPE_CHECKING: from pioreactor.pubsub import Client @@ -262,6 +262,68 @@ def publish_setting(self, setting: str, value: Any) -> None: jm.upsert_setting(self._job_id, setting, value) +class cache: + def __init__(self, table_name): + self.table_name = f"cache_{table_name}" + self.db_path = f"{tempfile.gettempdir()}/local_intermittent_pioreactor_metadata.sqlite" + + def __enter__(self): + self.conn = sqlite3.connect(self.db_path) + self.cursor = self.conn.cursor() + self._initialize_table() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.conn.commit() + self.conn.close() + + def _initialize_table(self): + self.cursor.execute( + f""" + CREATE TABLE IF NOT EXISTS {self.table_name} ( + key BLOB PRIMARY KEY, + value BLOB + ) + """ + ) + + def __setitem__(self, key, value): + self.cursor.execute( + f""" + INSERT INTO {self.table_name} (key, value) + VALUES (?, ?) 
+ ON CONFLICT(key) DO UPDATE SET value=excluded.value + """, + (key, value), + ) + + def get(self, key, default=None): + self.cursor.execute(f"SELECT value FROM {self.table_name} WHERE key = ?", (key,)) + result = self.cursor.fetchone() + return result[0] if result else default + + def iterkeys(self): + self.cursor.execute(f"SELECT key FROM {self.table_name}") + return (row[0] for row in self.cursor.fetchall()) + + def __contains__(self, key): + self.cursor.execute(f"SELECT 1 FROM {self.table_name} WHERE key = ?", (key,)) + return self.cursor.fetchone() is not None + + def __iter__(self): + return self.iterkeys() + + def __delitem__(self, key): + self.cursor.execute(f"DELETE FROM {self.table_name} WHERE key = ?", (key,)) + + def __getitem__(self, key): + self.cursor.execute(f"SELECT value FROM {self.table_name} WHERE key = ?", (key,)) + result = self.cursor.fetchone() + if result is None: + raise KeyError(f"Key '{key}' not found in cache.") + return result[0] + + @contextmanager def local_intermittent_storage( cache_name: str, @@ -282,11 +344,8 @@ def local_intermittent_storage( Opening the same cache in a context manager is tricky, and should be avoided. """ - # gettempdir find the directory named by the TMPDIR environment variable. - # TMPDIR is set in the Pioreactor img. 
- tmp_dir = tempfile.gettempdir() - with Cache(f"{tmp_dir}/{cache_name}", sqlite_journal_mode="wal") as cache: - yield cache # type: ignore + with cache(f"{cache_name}") as c: + yield c # type: ignore @contextmanager @@ -625,7 +684,6 @@ def get_setting_from_running_job(self, job_name: str, setting: str, timeout=None if (timeout and timer() > timeout) or (timeout is None): raise NameError(f"Setting {setting} was not found.") - def set_not_running(self, job_id: JobMetadataKey) -> None: update_query = "UPDATE pio_job_metadata SET is_running=0, ended_at=STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW') WHERE id=(?)" self.cursor.execute(update_query, (job_id,)) diff --git a/pioreactor/whoami.py b/pioreactor/whoami.py index e5f9c12a..4dec5024 100644 --- a/pioreactor/whoami.py +++ b/pioreactor/whoami.py @@ -4,6 +4,7 @@ import os import sys import time +import warnings from functools import cache from pioreactor import mureq @@ -18,6 +19,11 @@ NO_EXPERIMENT = "$no_experiment_present" +def get_latest_experiment_name() -> str: + warnings.warn("Use whoami.get_assigned_experiment_name(unit) instead", DeprecationWarning, stacklevel=2) + return get_assigned_experiment_name(get_unit_name()) + + def get_testing_experiment_name() -> str: try: exp = get_assigned_experiment_name(get_unit_name())