diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index c6858880f4..e57c602f47 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -212,6 +212,12 @@ jobs: oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_rdps_sfms_cronjob.sh ${SUFFIX} apply + - name: SFMS Raster Calculations cronjob + shell: bash + run: | + oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" + PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_sfms_calculations_cronjob.sh ${SUFFIX} apply + - name: Hourly pruner nightly cronjob shell: bash run: | diff --git a/.vscode/launch.json b/.vscode/launch.json index 64ec41f3de..1d0154b86d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -141,6 +141,13 @@ "module": "app.auto_spatial_advisory.critical_hours", "console": "integratedTerminal" }, + { + "name": "sfms raster processor job", + "type": "python", + "request": "launch", + "module": "app.jobs.sfms_calculations", + "console": "integratedTerminal" + }, { "name": "Chrome", "type": "pwa-chrome", diff --git a/api/app/geospatial/wps_dataset.py b/api/app/geospatial/wps_dataset.py index 1e46e0e42d..f71bca6700 100644 --- a/api/app/geospatial/wps_dataset.py +++ b/api/app/geospatial/wps_dataset.py @@ -1,9 +1,12 @@ -from typing import Optional +from contextlib import ExitStack, contextmanager +from typing import Iterator, List, Optional, Tuple, Union from osgeo import gdal, osr import numpy as np from app.utils.geospatial import GDALResamplingMethod +gdal.UseExceptions() + class WPSDataset: """ @@ -26,6 +29,47 @@ def __enter__(self): def __exit__(self, *_): self.ds = None + @classmethod + def from_array( + cls, + array: np.ndarray, + geotransform: Tuple[float, float, float, float, float, float], + projection: str, + nodata_value: Optional[Union[float, int]] = None, + datatype=gdal.GDT_Float32, + ) -> "WPSDataset": + """ + Create a WPSDataset from a NumPy array, geotransform, and projection. 
+ + :param array: NumPy array representing the raster data + :param geotransform: A tuple defining the geotransform + :param projection: WKT string of the projection + :param nodata_value: Optional nodata value to set for the dataset + :param datatype gdal datatype + :return: An instance of WPSDataset containing the created dataset + """ + rows, cols = array.shape + + driver: gdal.Driver = gdal.GetDriverByName("MEM") + output_dataset: gdal.Dataset = driver.Create("memory", cols, rows, 1, datatype) + + # Set the geotransform and projection + output_dataset.SetGeoTransform(geotransform) + output_dataset.SetProjection(projection) + + # Write the array to the dataset + output_band: gdal.Band = output_dataset.GetRasterBand(1) + output_band.WriteArray(array) + + # Set the NoData value if provided + if nodata_value is not None: + output_band.SetNoDataValue(nodata_value) + + # Flush cache to ensure all data is written + output_band.FlushCache() + + return cls(ds_path=None, ds=output_dataset, datatype=datatype) + def __mul__(self, other): """ Multiplies this WPSDataset with the other WPSDataset @@ -93,7 +137,7 @@ def __mul__(self, other): return WPSDataset(ds_path=None, ds=out_ds) - def warp_to_match(self, other, output_path: str, resample_method: GDALResamplingMethod = GDALResamplingMethod.NEAREST_NEIGHBOUR): + def warp_to_match(self, other: "WPSDataset", output_path: str, resample_method: GDALResamplingMethod = GDALResamplingMethod.NEAREST_NEIGHBOUR): """ Warp the dataset to match the extent, pixel size, and projection of the other dataset. @@ -173,7 +217,7 @@ def generate_latitude_array(self): return latitudes - def export_to_geotiff(self, output_path): + def export_to_geotiff(self, output_path: str): """ Exports the dataset to a geotiff with the given path @@ -204,5 +248,37 @@ def export_to_geotiff(self, output_path): output_band = None del output_band + def get_nodata_mask(self) -> Tuple[Optional[np.ndarray], Optional[Union[float, int]]]: + band = self.ds.GetRasterBand(self.band) + nodata_value = band.GetNoDataValue() + + if nodata_value is not None: + nodata_mask = band.ReadAsArray() == nodata_value + return nodata_mask, nodata_value + + return None, None + def as_gdal_ds(self) -> gdal.Dataset: return self.ds + + def close(self): + self.ds = None + + +@contextmanager +def multi_wps_dataset_context(dataset_paths: List[str]) -> Iterator[List[WPSDataset]]: + """ + Context manager to handle multiple WPSDataset instances. 
+ + :param dataset_paths: List of dataset paths to open as WPSDataset instances + :yield: List of WPSDataset instances, one for each path + """ + datasets = [WPSDataset(path) for path in dataset_paths] + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in datasets] + finally: + # Close all datasets to ensure cleanup + for ds in datasets: + ds.close() diff --git a/api/app/jobs/sfms_calculations.py b/api/app/jobs/sfms_calculations.py new file mode 100644 index 0000000000..e7d87ab6ed --- /dev/null +++ b/api/app/jobs/sfms_calculations.py @@ -0,0 +1,69 @@ +import asyncio +from datetime import datetime, timezone +import logging +import os +import sys + +from app import configure_logging +from app.rocketchat_notifications import send_rocketchat_notification +from app.sfms.date_range_processor import BUIDateRangeProcessor +from app.sfms.raster_addresser import RasterKeyAddresser +from app.utils.s3_client import S3Client +from app.utils.time import get_utc_now +from app.geospatial.wps_dataset import multi_wps_dataset_context + + +logger = logging.getLogger(__name__) + +DAYS_TO_CALCULATE = 2 + + +class SFMSCalcJob: + async def calculate_bui(self, start_time: datetime): + """ + Entry point for processing SFMS DMC/DC/BUI rasters. To run from a specific date manually in openshift, + see openshift/sfms-calculate/README.md + """ + logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward") + + start_exec = get_utc_now() + + bui_processor = BUIDateRangeProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser()) + + async with S3Client() as s3_client: + await bui_processor.process_bui(s3_client, multi_wps_dataset_context, multi_wps_dataset_context) + + # calculate the execution time. + execution_time = get_utc_now() - start_exec + hours, remainder = divmod(execution_time.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + logger.info(f"BUI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds") + + +def main(): + if len(sys.argv) > 1: + try: + # command-line arg as 'YYYY-MM-DD HH' + start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H").replace(tzinfo=timezone.utc) + except ValueError: + logger.error("Error: Please provide the date and hour in 'YYYY-MM-DD HH' format (as a single string)") + sys.exit(1) + else: + # default to the current datetime + start_time = get_utc_now() + try: + job = SFMSCalcJob() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(job.calculate_bui(start_time)) + except Exception as e: + logger.error("An exception occurred while processing DMC/DC/BUI raster calculations", exc_info=e) + rc_message = ":scream: Encountered an error while processing SFMS raster data." 
+        send_rocketchat_notification(rc_message, e)
+        sys.exit(os.EX_SOFTWARE)
+
+
+if __name__ == "__main__":
+    configure_logging()
+    main()
diff --git a/api/app/sfms/__init__.py b/api/app/sfms/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/api/app/sfms/date_range_processor.py b/api/app/sfms/date_range_processor.py
new file mode 100644
index 0000000000..41cd2c4c4e
--- /dev/null
+++ b/api/app/sfms/date_range_processor.py
@@ -0,0 +1,143 @@
+import logging
+import os
+import tempfile
+from datetime import datetime, timedelta
+from typing import Callable, Tuple, List, Iterator, cast
+
+import numpy as np
+
+from app.geospatial.wps_dataset import WPSDataset
+from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
+from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc
+from app.utils.geospatial import GDALResamplingMethod
+from app.utils.s3 import set_s3_gdal_config
+from app.utils.s3_client import S3Client
+from app.weather_models.rdps_filename_marshaller import model_run_for_hour
+
+logger = logging.getLogger(__name__)
+
+# Type alias for clarity: the context manager function signature
+MultiDatasetContext = Callable[[List[str]], Iterator[List["WPSDataset"]]]
+
+
+class BUIDateRangeProcessor:
+    """
+    Class for calculating/generating forecasted DMC/DC/BUI rasters for a date range
+    """
+
+    def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddresser):
+        self.start_datetime = start_datetime
+        self.days = days
+        self.addresser = addresser
+
+    async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
+        set_s3_gdal_config()
+
+        for day in range(self.days):
+            datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour = self._get_calculate_dates(day)
+            logger.info(f"Calculating DMC/DC/BUI for {datetime_to_calculate_utc.isoformat()}")
+
+            # Get and check existence of weather s3 keys
+            temp_key, rh_key, _, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
+            weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
+            if not weather_keys_exist:
+                logger.warning(f"No weather keys found for {model_run_for_hour(self.start_datetime.hour):02} model run")
+                break
+
+            # Get and check existence of fwi s3 keys
+            dc_key, dmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
+            fwi_keys_exist = await s3_client.all_objects_exist(dc_key, dmc_key)
+            if not fwi_keys_exist:
+                logger.warning(f"No previous DMC/DC keys found for {previous_fwi_datetime.date().isoformat()}")
+                break
+
+            temp_key, rh_key, precip_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, precip_key)
+            dc_key, dmc_key = self.addresser.gdal_prefix_keys(dc_key, dmc_key)
+
+            with tempfile.TemporaryDirectory() as temp_dir:
+                with input_dataset_context([temp_key, rh_key, precip_key, dc_key, dmc_key]) as input_datasets:
+                    input_datasets = cast(List[WPSDataset], input_datasets)  # Ensure correct type inference
+                    temp_ds, rh_ds, precip_ds, dc_ds, dmc_ds = input_datasets
+
+                    # Warp weather datasets to match fwi
+                    warped_temp_ds = temp_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
+                    warped_rh_ds = rh_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
+                    warped_precip_ds = precip_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)
+
+                    # 
close unneeded datasets to reduce memory usage + precip_ds.close() + rh_ds.close() + temp_ds.close() + # Create latitude and month arrays needed for calculations + latitude_array = dmc_ds.generate_latitude_array() + month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month) + + # Create and store DMC dataset + dmc_values, dmc_nodata_value = calculate_dmc(dmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, latitude_array, month_array) + new_dmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.DMC) + new_dmc_path = await s3_client.persist_raster_data( + temp_dir, + new_dmc_key, + dmc_ds.as_gdal_ds().GetGeoTransform(), + dmc_ds.as_gdal_ds().GetProjection(), + dmc_values, + dmc_nodata_value, + ) + + # Create and store DC dataset + dc_values, dc_nodata_value = calculate_dc(dc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, latitude_array, month_array) + new_dc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.DC) + new_dc_path = await s3_client.persist_raster_data( + temp_dir, + new_dc_key, + dc_ds.as_gdal_ds().GetGeoTransform(), + dc_ds.as_gdal_ds().GetProjection(), + dc_values, + dc_nodata_value, + ) + + # Open new DMC and DC datasets and calculate BUI + new_bui_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.BUI) + with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets: + new_ds = cast(List[WPSDataset], new_dmc_dc_datasets) # Ensure correct type inference + new_dmc_ds, new_dc_ds = new_ds + bui_values, nodata = calculate_bui(new_dmc_ds, new_dc_ds) + + # Store the new BUI dataset + await s3_client.persist_raster_data( + temp_dir, + new_bui_key, + dmc_ds.as_gdal_ds().GetGeoTransform(), + dmc_ds.as_gdal_ds().GetProjection(), + bui_values, + nodata, + ) + + def _get_calculate_dates(self, day: int): + """ + Calculate the UTC date and times based on the provided day offset. + + :param day: The day offset from the start date + :return: Tuple of (datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour) + """ + datetime_to_calculate_utc = self.start_datetime.replace(hour=20, minute=0, second=0, microsecond=0) + timedelta(days=day) + previous_fwi_datetime = datetime_to_calculate_utc - timedelta(days=1) + prediction_hour = 20 + (day * 24) + return datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour + + def _get_previous_fwi_keys(self, day_to_calculate: int, previous_fwi_datetime: datetime) -> Tuple[str, str]: + """ + Based on the day being calculated, decide whether to use previously uploaded actuals from sfms or + calculated indices from the previous day's calculations. 
+ + :param day_to_calculate: day of the calculation loop + :param previous_fwi_datetime: the datetime previous to the datetime being calculated + :return: s3 keys for dc and dmc + """ + if day_to_calculate == 0: # if we're running the first day of the calculation, use previously uploaded actuals + dc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DC) + dmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DMC) + else: # otherwise use the last calculated key + dc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DC) + dmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DMC) + return dc_key, dmc_key diff --git a/api/app/sfms/fwi_processor.py b/api/app/sfms/fwi_processor.py new file mode 100644 index 0000000000..23c9d74c0f --- /dev/null +++ b/api/app/sfms/fwi_processor.py @@ -0,0 +1,57 @@ +from time import perf_counter +import logging +import numpy as np + +from app.geospatial.wps_dataset import WPSDataset +from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui + +logger = logging.getLogger(__name__) + + +def calculate_dc(dc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPSDataset, precip_ds: WPSDataset, latitude: np.ndarray, month: np.ndarray): + dc_array, _ = dc_ds.replace_nodata_with(0) + temp_array, _ = temp_ds.replace_nodata_with(0) + rh_array, _ = rh_ds.replace_nodata_with(0) + precip_array, _ = precip_ds.replace_nodata_with(0) + + start = perf_counter() + dc_values = vectorized_dc(dc_array, temp_array, rh_array, precip_array, latitude, month, True) + logger.info("%f seconds to calculate vectorized dc", perf_counter() - start) + + nodata_mask, nodata_value = dc_ds.get_nodata_mask() + if nodata_mask is not None: + dc_values[nodata_mask] = nodata_value + + return dc_values, nodata_value + + +def calculate_dmc(dmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPSDataset, precip_ds: WPSDataset, latitude: np.ndarray, month: np.ndarray): + dmc_array, _ = dmc_ds.replace_nodata_with(0) + temp_array, _ = temp_ds.replace_nodata_with(0) + rh_array, _ = rh_ds.replace_nodata_with(0) + precip_array, _ = precip_ds.replace_nodata_with(0) + + start = perf_counter() + dmc_values = vectorized_dmc(dmc_array, temp_array, rh_array, precip_array, latitude, month, True) + logger.info("%f seconds to calculate vectorized dmc", perf_counter() - start) + + nodata_mask, nodata_value = dmc_ds.get_nodata_mask() + if nodata_mask is not None: + dmc_values[nodata_mask] = nodata_value + + return dmc_values, nodata_value + + +def calculate_bui(dmc_ds: WPSDataset, dc_ds: WPSDataset): + dmc_array, _ = dmc_ds.replace_nodata_with(0) + dc_array, _ = dc_ds.replace_nodata_with(0) + + start = perf_counter() + bui_values = vectorized_bui(dmc_array, dc_array) + logger.info("%f seconds to calculate vectorized bui", perf_counter() - start) + + nodata_mask, nodata_value = dmc_ds.get_nodata_mask() + if nodata_mask is not None: + bui_values[nodata_mask] = nodata_value + + return bui_values, nodata_value diff --git a/api/app/sfms/raster_addresser.py b/api/app/sfms/raster_addresser.py new file mode 100644 index 0000000000..953707238a --- /dev/null +++ b/api/app/sfms/raster_addresser.py @@ -0,0 +1,107 @@ +import os +import enum +from datetime import datetime, timezone, timedelta +from zoneinfo import ZoneInfo +from app import config +from app.weather_models import ModelEnum +from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key + + 
+class WeatherParameter(enum.Enum): + TEMP = "temp" + RH = "rh" + WIND_SPEED = "wind_speed" + + +class FWIParameter(enum.Enum): + DC = "dc" + DMC = "dmc" + BUI = "bui" + FFMC = "ffmc" + ISI = "isi" + FWI = "fwi" + + +def assert_all_utc(*datetimes: datetime): + for dt in datetimes: + assert dt.tzinfo is not None, f"{dt} must be timezone-aware." + assert dt.tzinfo == timezone.utc or dt.tzinfo == ZoneInfo("UTC"), f"{dt} is not in UTC." + + +class RasterKeyAddresser: + """ + Encapsulates logic for addressing model and weather data rasters stored in object storage. + """ + + def __init__(self): + self.sfms_calculated_prefix = "sfms/calculated" + self.s3_prefix = f"/vsis3/{config.get('OBJECT_STORE_BUCKET')}" + self.sfms_upload_prefix = "sfms/uploads/actual" + self.weather_model_prefix = f"weather_models/{ModelEnum.RDPS.lower()}" + + def get_uploaded_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter): + assert_all_utc(datetime_utc) + iso_date = datetime_utc.date().isoformat() + + return f"{self.sfms_upload_prefix}/{iso_date}/{fwi_param.value}{iso_date.replace('-', '')}.tif" + + def get_calculated_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter): + """ + Generates the calculated fire weather index key that points to the associated raster artifact in the object store. + A calculated index is always generated for a future date, so always considered to be a forecast. + + :param datetime_utc: UTC datetime the calculated raster is for + :param index: the fire weather index caller is interested in + :return: the key to the raster artifact in object storage + """ + assert_all_utc(datetime_utc) + return f"{self.sfms_calculated_prefix}/forecast/{datetime_utc.date().isoformat()}/{fwi_param.value}{datetime_utc.date().isoformat().replace('-', '')}.tif" + + def get_model_data_key(self, start_time_utc: datetime, prediction_hour: int, weather_param: WeatherParameter): + """ + Generates the model data key that points to the associated raster artifact in the object store. + The model is always assumed to be RDPS. + + :param start_time_utc: UTC date time when the model run started + :param prediction_hour: the prediction hour offset from the start time + """ + assert_all_utc(start_time_utc) + weather_model_date_prefix = f"{self.weather_model_prefix}/{start_time_utc.date().isoformat()}/" + return os.path.join(weather_model_date_prefix, compose_rdps_key(start_time_utc, start_time_utc.hour, prediction_hour, weather_param.value)) + + def get_calculated_precip_key(self, datetime_to_calculate_utc: datetime): + """ + Generates the calculated precip key that points to the associated raster artifact in the object store. + The model is always assumed to be RDPS. + + :param datetime_to_calculate_utc: UTC datetime the calculated raster is for + :return: the calculated precip key to the raster artifact in object storage + """ + assert_all_utc(datetime_to_calculate_utc) + calculated_weather_prefix = f"{self.weather_model_prefix}/{datetime_to_calculate_utc.date().isoformat()}/" + return os.path.join(calculated_weather_prefix, compose_computed_precip_rdps_key(datetime_to_calculate_utc)) + + def get_weather_data_keys(self, start_time_utc: datetime, datetime_to_calculate_utc: datetime, prediction_hour: int): + """ + Generates all model data keys that point to their associated raster artifacts in the object store. 
+ + :param start_time_utc: UTC date time when the model run started + :param datetime_to_calculate_utc: UTC datetime the calculated raster is for + :param prediction_hour: the prediction hour offset from the start time + :return: temp, rh, wind speed and precip model data key + """ + assert_all_utc(start_time_utc, datetime_to_calculate_utc) + non_precip_keys = tuple([self.get_model_data_key(start_time_utc, prediction_hour, param) for param in WeatherParameter]) + precip_key = self.get_calculated_precip_key(datetime_to_calculate_utc) + all_weather_data_keys = non_precip_keys + (precip_key,) + + return all_weather_data_keys + + def gdal_prefix_keys(self, *keys): + """ + Prefix keys with vsis3/{bucket} for reading from s3 with gdal. GDAL s3 config must be setup for these + paths to work with GDAL. Can be set using app/utils/s3.set_s3_gdal_config() + + :return: A tuple of all strings provided, prefixed with vsis3/{bucket} + """ + return tuple(f"{self.s3_prefix}/{key}" for key in keys) diff --git a/api/app/tests/dataset_common.py b/api/app/tests/dataset_common.py new file mode 100644 index 0000000000..32b7353788 --- /dev/null +++ b/api/app/tests/dataset_common.py @@ -0,0 +1,49 @@ +import numpy as np +from osgeo import osr, gdal +import uuid +from app.geospatial.wps_dataset import WPSDataset + + +def create_test_dataset(filename, width, height, extent, projection, data_type=gdal.GDT_Float32, fill_value=None, no_data_value=None) -> gdal.Dataset: + """ + Create a test GDAL dataset. + """ + # Create a new GDAL dataset + driver: gdal.Driver = gdal.GetDriverByName("MEM") + dataset: gdal.Dataset = driver.Create(filename, width, height, 1, data_type) + + # Set the geotransform + xmin, xmax, ymin, ymax = extent + xres = (xmax - xmin) / width + yres = (ymax - ymin) / height + geotransform = (xmin, xres, 0, ymax, 0, -yres) # Top-left corner + dataset.SetGeoTransform(geotransform) + + # Set the projection + srs = osr.SpatialReference() + srs.ImportFromEPSG(projection) + dataset.SetProjection(srs.ExportToWkt()) + + # Create some test data (e.g., random values) + rng = np.random.default_rng(seed=42) # Reproducible random generator + fill_data = rng.random((height, width)).astype(np.float32) + + if fill_value is not None: + fill_data = np.full((height, width), fill_value) + + if no_data_value is not None: + dataset.GetRasterBand(1).SetNoDataValue(no_data_value) + dataset.GetRasterBand(1).WriteArray(fill_data) + + return dataset + + +def create_mock_gdal_dataset(): + extent = (-1, 1, -1, 1) # xmin, xmax, ymin, ymax + return create_test_dataset(f"{str(uuid.uuid4())}.tif", 1, 1, extent, 4326, data_type=gdal.GDT_Byte, fill_value=1) + + +# Create a mock for the WPSDataset class +def create_mock_wps_dataset(): + mock_ds = create_mock_gdal_dataset() + return WPSDataset(ds=mock_ds, ds_path=None) diff --git a/api/app/tests/geospatial/test_wps_dataset.py b/api/app/tests/geospatial/test_wps_dataset.py index 75d20f80a9..4469d64dfb 100644 --- a/api/app/tests/geospatial/test_wps_dataset.py +++ b/api/app/tests/geospatial/test_wps_dataset.py @@ -1,75 +1,16 @@ import os import numpy as np -from osgeo import osr, gdal +from osgeo import gdal import pytest import tempfile -from app.geospatial.wps_dataset import WPSDataset +from app.geospatial.wps_dataset import WPSDataset, multi_wps_dataset_context +from app.tests.dataset_common import create_mock_gdal_dataset, create_test_dataset hfi_tif = os.path.join(os.path.dirname(__file__), "snow_masked_hfi20240810.tif") zero_tif = os.path.join(os.path.dirname(__file__), "zero_layer.tif") 
-def create_test_dataset(filename, width, height, extent, projection, data_type=gdal.GDT_Float32, fill_value=None, no_data_value=None) -> gdal.Dataset: - """ - Create a test GDAL dataset. - """ - # Create a new GDAL dataset - driver: gdal.Driver = gdal.GetDriverByName("MEM") - dataset: gdal.Dataset = driver.Create(filename, width, height, 1, data_type) - - # Set the geotransform - xmin, xmax, ymin, ymax = extent - xres = (xmax - xmin) / width - yres = (ymax - ymin) / height - geotransform = (xmin, xres, 0, ymax, 0, -yres) # Top-left corner - dataset.SetGeoTransform(geotransform) - - # Set the projection - srs = osr.SpatialReference() - srs.ImportFromEPSG(projection) - dataset.SetProjection(srs.ExportToWkt()) - - # Create some test data (e.g., random values) - rng = np.random.default_rng(seed=42) # Reproducible random generator - fill_data = rng.random((height, width)).astype(np.float32) - - if fill_value is not None: - fill_data = np.full((height, width), fill_value) - - dataset.GetRasterBand(1).SetNoDataValue(0) - dataset.GetRasterBand(1).WriteArray(fill_data) - - return dataset - - -# def create_test_dataset_with_no_data_value(filename, width, height, extent, projection, data_type=gdal.GDT_Float32, fill_value: int) -> gdal.Dataset: -# """ -# Create a test GDAL dataset. -# """ -# # Create a new GDAL dataset -# driver: gdal.Driver = gdal.GetDriverByName("MEM") -# dataset: gdal.Dataset = driver.Create(filename, width, height, 1, data_type) - -# # Set the geotransform -# xmin, xmax, ymin, ymax = extent -# xres = (xmax - xmin) / width -# yres = (ymax - ymin) / height -# geotransform = (xmin, xres, 0, ymax, 0, -yres) # Top-left corner -# dataset.SetGeoTransform(geotransform) - -# # Set the projection -# srs = osr.SpatialReference() -# srs.ImportFromEPSG(projection) -# dataset.SetProjection(srs.ExportToWkt()) - -# rng = np.random.default_rng(seed=42) # Reproducible random generator -# random_data = rng.random((height, width)).astype(np.float32) - -# fill_data = np.full_like(random_data, fill_value) -# dataset.GetRasterBand(1).WriteArray(fill_data) - - def test_raster_with_context(): """ with opens the dataset and closes after the context ends @@ -210,3 +151,75 @@ def test_latitude_array(): warped_lats = output_ds.generate_latitude_array() assert np.all(original_lats == warped_lats) == True output_ds = None + + +def test_get_nodata_mask(): + set_no_data_value = 0 + driver: gdal.Driver = gdal.GetDriverByName("MEM") + dataset: gdal.Dataset = driver.Create("test_dataset_no_data_value.tif", 2, 2, 1, eType=gdal.GDT_Int32) + fill_data = np.full((2, 2), 2) + fill_data[0, 0] = set_no_data_value + dataset.GetRasterBand(1).SetNoDataValue(set_no_data_value) + dataset.GetRasterBand(1).WriteArray(fill_data) + + with WPSDataset(ds_path=None, ds=dataset) as ds: + mask, nodata_value = ds.get_nodata_mask() + assert nodata_value == set_no_data_value + assert mask[0, 0] == True # The first pixel should return True as nodata + assert mask[0, 1] == False # Any other pixel should return False + + +def test_get_nodata_mask_empty(): + dataset: gdal.Dataset = create_mock_gdal_dataset() + + with WPSDataset(ds_path=None, ds=dataset) as ds: + mask, nodata_value = ds.get_nodata_mask() + assert mask is None + assert nodata_value is None + + +def test_from_array(): + extent1 = (-1, 1, -1, 1) # xmin, xmax, ymin, ymax + original_ds = create_test_dataset("test_dataset_1.tif", 100, 100, extent1, 4326) + original_ds.GetRasterBand(1).SetNoDataValue(-99) + og_band = original_ds.GetRasterBand(1) + og_array = og_band.ReadAsArray() + 
dtype = og_band.DataType + og_transform = original_ds.GetGeoTransform() + og_proj = original_ds.GetProjection() + + with WPSDataset.from_array(og_array, og_transform, og_proj, nodata_value=-99, datatype=dtype) as wps: + wps_ds = wps.as_gdal_ds() + assert wps_ds.ReadAsArray()[1, 2] == og_array[1, 2] + assert wps_ds.GetGeoTransform() == og_transform + assert wps_ds.GetProjection() == og_proj + assert wps_ds.GetRasterBand(1).DataType == dtype + assert wps_ds.GetRasterBand(1).GetNoDataValue() == -99 + + +def test_multi_wps_dataset_context(mocker): + # mock WPSDataset and define the mock dataset paths + dataset_paths = ["path1", "path2"] + mock_wps_dataset = mocker.patch("app.geospatial.wps_dataset.WPSDataset") + mock_datasets = [mocker.MagicMock(), mocker.MagicMock()] + mock_wps_dataset.side_effect = mock_datasets # WPSDataset(path) returns each mock in sequence + + # set each mock to return itself when its context is entered + for mock_ds in mock_datasets: + mock_ds.__enter__.return_value = mock_ds + + with multi_wps_dataset_context(dataset_paths) as datasets: + # check that WPSDataset was called once per path + mock_wps_dataset.assert_any_call("path1") + mock_wps_dataset.assert_any_call("path2") + + # verify that the yielded datasets are the mocked instances + assert datasets == mock_datasets + + # ensure each dataset's context was entered + for ds in datasets: + ds.__enter__.assert_called_once() + + # ensure each dataset was closed after the context exited + for ds in mock_datasets: + ds.close.assert_called_once() diff --git a/api/app/tests/jobs/test_sfms_calculations.py b/api/app/tests/jobs/test_sfms_calculations.py new file mode 100644 index 0000000000..29f4b97a3e --- /dev/null +++ b/api/app/tests/jobs/test_sfms_calculations.py @@ -0,0 +1,49 @@ +import os +from datetime import datetime, timezone + +import pytest +from pytest_mock import MockerFixture + +from app.jobs import sfms_calculations +from app.jobs.sfms_calculations import SFMSCalcJob + + +def test_sfms_calc_job_fail_default(monkeypatch, mocker: MockerFixture): + async def mock_job_error(): + raise OSError("Error") + + monkeypatch.setattr(SFMSCalcJob, "calculate_bui", mock_job_error) + + monkeypatch.setattr("sys.argv", ["sfms_calculations.py"]) + + rocket_chat_spy = mocker.spy(sfms_calculations, "send_rocketchat_notification") + + with pytest.raises(SystemExit) as excinfo: + sfms_calculations.main() + # Assert that we exited with an error code + assert excinfo.value.code == os.EX_SOFTWARE + + assert rocket_chat_spy.call_count == 1 + + +def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture): + calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_bui", return_value=None) + + test_datetime = "2024-10-10 5" + monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime]) + + sfms_calculations.main() + + called_args, _ = calc_spy.call_args + assert called_args[0] == datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc) + + +@pytest.mark.anyio +async def test_sfms_calc_job_cli_arg_missing_hour(monkeypatch): + test_datetime = "2024-10-10" + monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime]) + + with pytest.raises(SystemExit) as excinfo: + await sfms_calculations.main() + + assert excinfo.value.code == 1 diff --git a/api/app/tests/sfms/test_bui_date_range_processor.py b/api/app/tests/sfms/test_bui_date_range_processor.py new file mode 100644 index 0000000000..e2ffa76f71 --- /dev/null +++ b/api/app/tests/sfms/test_bui_date_range_processor.py @@ -0,0 +1,204 @@ +from 
contextlib import ExitStack, contextmanager +from typing import List +from unittest.mock import AsyncMock +import pytest +from datetime import datetime, timezone, timedelta +from pytest_mock import MockerFixture +from app.geospatial.wps_dataset import WPSDataset +from app.sfms import date_range_processor +from app.sfms.date_range_processor import BUIDateRangeProcessor +from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser +from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_wps_dataset +from app.utils.geospatial import GDALResamplingMethod +from app.utils.s3_client import S3Client + +TEST_DATETIME = datetime(2024, 10, 10, 10, tzinfo=timezone.utc) +EXPECTED_FIRST_DAY = TEST_DATETIME.replace(hour=20, minute=0, second=0, microsecond=0) +EXPECTED_SECOND_DAY = TEST_DATETIME.replace(hour=20, minute=0, second=0, microsecond=0) + timedelta(days=1) + + +def create_mock_wps_datasets(num: int) -> List[WPSDataset]: + return [create_mock_wps_dataset() for _ in range(num)] + + +def create_mock_input_dataset_context(): + input_datasets = create_mock_wps_datasets(5) + + @contextmanager + def mock_input_dataset_context(_: List[str]): + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in input_datasets] + finally: + # Close all datasets to ensure cleanup + for ds in input_datasets: + ds.close() + + return input_datasets, mock_input_dataset_context + + +def create_mock_new_dmc_dc_context(): + new_datasets = create_mock_wps_datasets(2) + + @contextmanager + def mock_new_dmc_dc_datasets_context(_: List[str]): + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in new_datasets] + finally: + # Close all datasets to ensure cleanup + for ds in new_datasets: + ds.close() + + return new_datasets, mock_new_dmc_dc_datasets_context + + +@pytest.mark.anyio +async def test_bui_date_range_processor(mocker: MockerFixture): + mock_key_addresser = RasterKeyAddresser() + # key address spies + get_weather_data_key_spy = mocker.spy(mock_key_addresser, "get_weather_data_keys") + gdal_prefix_keys_spy = mocker.spy(mock_key_addresser, "gdal_prefix_keys") + get_calculated_index_key_spy = mocker.spy(mock_key_addresser, "get_calculated_index_key") + bui_date_range_processor = BUIDateRangeProcessor(TEST_DATETIME, 2, mock_key_addresser) + # mock/spy dataset storage + + # mock weather index, param datasets used for calculations + input_datasets, mock_input_dataset_context = create_mock_input_dataset_context() + mock_temp_ds, mock_rh_ds, mock_precip_ds, mock_dc_ds, mock_dmc_ds = input_datasets + temp_ds_spy = mocker.spy(mock_temp_ds, "warp_to_match") + rh_ds_spy = mocker.spy(mock_rh_ds, "warp_to_match") + precip_ds_spy = mocker.spy(mock_precip_ds, "warp_to_match") + + # mock new dmc and dc datasets + new_datasets, mock_new_dmc_dc_datasets_context = create_mock_new_dmc_dc_context() + mock_new_dmc_ds, mock_new_dc_ds = new_datasets + + # mock gdal open + mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset()) + + # calculation spies + calculate_dmc_spy = mocker.spy(date_range_processor, "calculate_dmc") + calculate_dc_spy = mocker.spy(date_range_processor, "calculate_dc") + calculate_bui_spy = mocker.spy(date_range_processor, "calculate_bui") + + async with S3Client() as mock_s3_client: + # mock s3 client + mock_all_objects_exist = AsyncMock(return_value=True) + mocker.patch.object(mock_s3_client, 
"all_objects_exist", new=mock_all_objects_exist) + persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif") + + await bui_date_range_processor.process_bui(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context) + + # Verify weather model keys and actual keys are checked for both days + assert mock_all_objects_exist.call_count == 4 + + # Verify the arguments for each call for get_weather_data_keys + assert get_weather_data_key_spy.call_args_list == [ + mocker.call(TEST_DATETIME, EXPECTED_FIRST_DAY, 20), + mocker.call(TEST_DATETIME, EXPECTED_SECOND_DAY, 44), + ] + + # Verify the arguments for each call for gdal_prefix_keys + assert gdal_prefix_keys_spy.call_args_list == [ + # first day weather models + mocker.call( + "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P020.grib2", + "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P020.grib2", + "weather_models/rdps/2024-10-10/12/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_20z.tif", + ), + # first day uploads + mocker.call("sfms/uploads/actual/2024-10-09/dc20241009.tif", "sfms/uploads/actual/2024-10-09/dmc20241009.tif"), + # second day weather models + mocker.call( + "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P044.grib2", + "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P044.grib2", + "weather_models/rdps/2024-10-11/12/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241011_20z.tif", + ), + # second day uploads + mocker.call("sfms/calculated/forecast/2024-10-10/dc20241010.tif", "sfms/calculated/forecast/2024-10-10/dmc20241010.tif"), + ] + + # Verify calculated keys are generated in order + assert get_calculated_index_key_spy.call_args_list == [ + # first day + mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DMC), + mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC), + mocker.call(EXPECTED_FIRST_DAY, FWIParameter.BUI), + # second day, previous days' dc and dmc are looked up first + mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC), + mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DMC), + mocker.call(EXPECTED_SECOND_DAY, FWIParameter.DMC), + mocker.call(EXPECTED_SECOND_DAY, FWIParameter.DC), + mocker.call(EXPECTED_SECOND_DAY, FWIParameter.BUI), + ] + + # Verify weather inputs are warped to match dmc raster + assert temp_ds_spy.call_args_list == [ + mocker.call(mock_dmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_dmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert rh_ds_spy.call_args_list == [ + mocker.call(mock_dmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_dmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert precip_ds_spy.call_args_list == [ + mocker.call(mock_dmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_dmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + for dmc_calls in calculate_dmc_spy.call_args_list: + dmc_ds = dmc_calls[0][0] + assert dmc_ds == mock_dmc_ds + wps_datasets = dmc_calls[0][1:4] # Extract dataset arguments + assert all(isinstance(ds, WPSDataset) for ds in wps_datasets) + + for dc_calls in calculate_dc_spy.call_args_list: + dc_ds = dc_calls[0][0] + assert dc_ds == mock_dc_ds + wps_datasets = dc_calls[0][1:4] # Extract dataset arguments + assert all(isinstance(ds, WPSDataset) for ds in wps_datasets) + + assert calculate_bui_spy.call_args_list == [ + mocker.call(mock_new_dmc_ds, mock_new_dc_ds), + mocker.call(mock_new_dmc_ds, mock_new_dc_ds), + ] + + # 3 
each day, new dmc, dc and bui rasters + assert persist_raster_spy.call_count == 6 + + +@pytest.mark.parametrize( + "side_effect_1, side_effect_2", + [ + (False, False), + (True, False), + (False, True), + ], +) +@pytest.mark.anyio +async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, mocker: MockerFixture): + mock_s3_client = S3Client() + + mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[side_effect_1, side_effect_2]) + + _, mock_input_dataset_context = create_mock_input_dataset_context() + + _, mock_new_dmc_dc_datasets_context = create_mock_new_dmc_dc_context() + + # calculation spies + calculate_dmc_spy = mocker.spy(date_range_processor, "calculate_dmc") + calculate_dc_spy = mocker.spy(date_range_processor, "calculate_dc") + calculate_bui_spy = mocker.spy(date_range_processor, "calculate_bui") + + bui_date_range_processor = BUIDateRangeProcessor(TEST_DATETIME, 1, RasterKeyAddresser()) + + await bui_date_range_processor.process_bui(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context) + + calculate_dmc_spy.assert_not_called() + calculate_dc_spy.assert_not_called() + calculate_bui_spy.assert_not_called() diff --git a/api/app/tests/sfms/test_fwi_processor.py b/api/app/tests/sfms/test_fwi_processor.py new file mode 100644 index 0000000000..4756f7226c --- /dev/null +++ b/api/app/tests/sfms/test_fwi_processor.py @@ -0,0 +1,124 @@ +import math + +import numpy as np +import pytest +from cffdrs import bui, dc, dmc +from osgeo import osr + +from app.geospatial.wps_dataset import WPSDataset +from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc + +FWI_ARRAY = np.array([[12, 20], [-999, -999]]) +TEST_ARRAY = np.array([[12, 20], [0, 0]]) + + +@pytest.fixture +def sample_datasets(): + srs = osr.SpatialReference() + srs.ImportFromEPSG(3005) + transform = (-2, 1, 0, 2, 0, -1) + + dc_wps = WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999) + dmc_wps = WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999) + temp_wps = WPSDataset.from_array(TEST_ARRAY, transform, srs.ExportToWkt()) + rh_wps = WPSDataset.from_array(TEST_ARRAY, transform, srs.ExportToWkt()) + precip_wps = WPSDataset.from_array(TEST_ARRAY, transform, srs.ExportToWkt()) + + return dc_wps, dmc_wps, temp_wps, rh_wps, precip_wps + + +@pytest.fixture +def latitude_month(): + latitude = np.array([[45, 45], [60, 60]]) + month = np.array([[6, 6], [7, 7]]) + return latitude, month + + +def test_calculate_dc_masked_correctly(sample_datasets, latitude_month): + dc_ds, _, temp_ds, rh_ds, precip_ds = sample_datasets + latitude, month = latitude_month + + dc_values, nodata_value = calculate_dc(dc_ds, temp_ds, rh_ds, precip_ds, latitude, month) + + # validate output shape and nodata masking + assert dc_values.shape == (2, 2) + assert dc_values[1, 0] == nodata_value + assert dc_values[1, 1] == nodata_value + assert dc_values[0, 0] != nodata_value + assert dc_values[0, 1] != nodata_value + + +def test_calculate_dmc_masked_correctly(sample_datasets, latitude_month): + _, dmc_ds, temp_ds, rh_ds, precip_ds = sample_datasets + latitude, month = latitude_month + + dmc_values, nodata_value = calculate_dmc(dmc_ds, temp_ds, rh_ds, precip_ds, latitude, month) + + # validate output shape and nodata masking + assert dmc_values.shape == (2, 2) + assert dmc_values[1, 0] == nodata_value + assert dmc_values[1, 1] == nodata_value + assert dmc_values[0, 0] != nodata_value + assert dmc_values[0, 1] != nodata_value + + +def 
test_calculate_bui_masked_correctly(sample_datasets): + dc_ds, dmc_ds, _, _, _ = sample_datasets + + bui_values, nodata_value = calculate_bui(dmc_ds, dc_ds) + + # validate output shape and nodata masking + assert bui_values.shape == (2, 2) + assert bui_values[1, 0] == nodata_value + assert bui_values[1, 1] == nodata_value + assert bui_values[0, 0] != nodata_value + assert bui_values[0, 1] != nodata_value + + +def test_calculate_dmc_values(sample_datasets, latitude_month): + _, dmc_ds, temp_ds, rh_ds, precip_ds = sample_datasets + latitude, month = latitude_month + + dmc_sample = FWI_ARRAY[0, 0] + temp_sample = TEST_ARRAY[0, 0] + rh_sample = TEST_ARRAY[0, 0] + precip_sample = TEST_ARRAY[0, 0] + lat_sample = latitude[0, 0] + month_sample = int(month[0, 0]) + + dmc_values, _ = calculate_dmc(dmc_ds, temp_ds, rh_ds, precip_ds, latitude, month) + + static_dmc = dmc(dmc_sample, temp_sample, rh_sample, precip_sample, lat_sample, month_sample) + + assert math.isclose(static_dmc, dmc_values[0, 0], abs_tol=0.01) + + +def test_calculate_dc_values(sample_datasets, latitude_month): + dc_ds, _, temp_ds, rh_ds, precip_ds = sample_datasets + latitude, month = latitude_month + + dc_sample = FWI_ARRAY[0, 0] + temp_sample = TEST_ARRAY[0, 0] + rh_sample = TEST_ARRAY[0, 0] + precip_sample = TEST_ARRAY[0, 0] + lat_sample = latitude[0, 0] + month_sample = int(month[0, 0]) + + dc_values, _ = calculate_dc(dc_ds, temp_ds, rh_ds, precip_ds, latitude, month) + + static_dmc = dc(dc_sample, temp_sample, rh_sample, precip_sample, lat_sample, month_sample) + + assert math.isclose(static_dmc, dc_values[0, 0], abs_tol=0.01) + + +def test_calculate_bui_values(sample_datasets): + dc_ds, dmc_ds, *_ = sample_datasets + + dc_sample = FWI_ARRAY[0, 0] + dmc_sample = FWI_ARRAY[0, 0] + + bui_values, _ = calculate_bui(dc_ds, dmc_ds) + + static_bui = bui(dmc_sample, dc_sample) + + assert math.isclose(static_bui, bui_values[0, 0], abs_tol=0.01) diff --git a/api/app/tests/sfms/test_raster_addresser.py b/api/app/tests/sfms/test_raster_addresser.py new file mode 100644 index 0000000000..fcf24aee24 --- /dev/null +++ b/api/app/tests/sfms/test_raster_addresser.py @@ -0,0 +1,29 @@ +from app.sfms.raster_addresser import RasterKeyAddresser, FWIParameter +import pytest +from datetime import datetime, timezone + +TEST_DATETIME_1 = datetime(2024, 10, 10, 23, tzinfo=timezone.utc) +TEST_DATE_1_ISO = TEST_DATETIME_1.date().isoformat() + +TEST_DATETIME_TO_CALC = TEST_DATETIME_1.replace(hour=20) + + +@pytest.fixture +def raster_key_addresser(): + return RasterKeyAddresser() + + +def test_get_uploaded_index_key(raster_key_addresser): + result = raster_key_addresser.get_uploaded_index_key(TEST_DATETIME_1, FWIParameter.DMC) + assert result == f"sfms/uploads/actual/{TEST_DATE_1_ISO}/dmc{TEST_DATE_1_ISO.replace('-','')}.tif" + + +def test_get_calculated_index_key(raster_key_addresser): + result = raster_key_addresser.get_calculated_index_key(TEST_DATETIME_1, FWIParameter.DC) + assert result == f"sfms/calculated/forecast/{TEST_DATE_1_ISO}/dc{TEST_DATE_1_ISO.replace('-', '')}.tif" + + +def test_get_weather_data_keys(raster_key_addresser): + result = raster_key_addresser.get_weather_data_keys(TEST_DATETIME_1, TEST_DATETIME_TO_CALC, 20) + + assert len(result) == 4 diff --git a/api/app/tests/utils/test_s3_client.py b/api/app/tests/utils/test_s3_client.py new file mode 100644 index 0000000000..d3a7ec3fd9 --- /dev/null +++ b/api/app/tests/utils/test_s3_client.py @@ -0,0 +1,29 @@ +from io import BufferedReader +import os +import tempfile +import pytest +from osgeo 
import gdal +from app.geospatial.wps_dataset import WPSDataset +from app.tests.dataset_common import create_mock_gdal_dataset +from app.utils.s3_client import S3Client +from pytest_mock import MockerFixture + + +@pytest.mark.anyio +async def test_put_object_called(mocker: MockerFixture): + async with S3Client() as s3_client: + persist_raster_spy = mocker.patch.object(s3_client, "put_object") + mock_ds = create_mock_gdal_dataset() + mock_band: gdal.Band = mock_ds.GetRasterBand(1) + values = mock_band.ReadAsArray() + no_data_value = mock_band.GetNoDataValue() + + with tempfile.TemporaryDirectory() as temp_dir: + with WPSDataset.from_array(values, mock_ds.GetGeoTransform(), mock_ds.GetProjection(), no_data_value) as expected_ds: + expected_key = "expected_key" + expected_filename = os.path.join(temp_dir, os.path.basename("expected_key")) + expected_ds.export_to_geotiff(expected_filename) + await s3_client.persist_raster_data(temp_dir, expected_key, mock_ds.GetGeoTransform(), mock_ds.GetProjection(), values, no_data_value) + + assert persist_raster_spy.call_args_list == [mocker.call(key=expected_key, body=mocker.ANY)] + assert isinstance(persist_raster_spy.call_args.kwargs["body"], BufferedReader) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index f28fe2662c..8b8ce9993c 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -1,12 +1,8 @@ from enum import Enum -import logging from typing import Tuple from osgeo import gdal, ogr, osr -logger = logging.getLogger(__name__) - - class GDALResamplingMethod(Enum): """ See api/app/utils/geospatial-interpolation.md for information about which interpolation method to use for your use case diff --git a/api/app/utils/s3.py b/api/app/utils/s3.py index 082073de47..9f63998810 100644 --- a/api/app/utils/s3.py +++ b/api/app/utils/s3.py @@ -69,3 +69,10 @@ async def read_into_memory(key: str): return (None, None, None) else: raise + + +def set_s3_gdal_config(): + gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", config.get("OBJECT_STORE_SECRET")) + gdal.SetConfigOption("AWS_ACCESS_KEY_ID", config.get("OBJECT_STORE_USER_ID")) + gdal.SetConfigOption("AWS_S3_ENDPOINT", config.get("OBJECT_STORE_SERVER")) + gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE") diff --git a/api/app/utils/s3_client.py b/api/app/utils/s3_client.py new file mode 100644 index 0000000000..f630c50ba0 --- /dev/null +++ b/api/app/utils/s3_client.py @@ -0,0 +1,76 @@ +import os +import logging +from typing import Any +from aiobotocore.session import get_session +from app import config +from app.geospatial.wps_dataset import WPSDataset + +logger = logging.getLogger(__name__) + + +class S3Client: + def __init__( + self, + server=config.get("OBJECT_STORE_SERVER"), + user_id=config.get("OBJECT_STORE_USER_ID"), + secret_key=config.get("OBJECT_STORE_SECRET"), + bucket=config.get("OBJECT_STORE_BUCKET"), + ): + self.server = server + self.user_id = user_id + self.secret_key = secret_key + self.bucket = bucket + self.session = get_session() + + async def __aenter__(self): + client_context = self.session.create_client("s3", endpoint_url=f"https://{self.server}", aws_secret_access_key=self.secret_key, aws_access_key_id=self.user_id) + self.client = await client_context.__aenter__() + return self + + async def __aexit__(self, *_): + self.client = None + self.session = None + del self.client + del self.session + + async def object_exists(self, target_path: str): + """Check if and object exists in the object store""" + # using list_objects, but could be using stat 
as well? don't know what's best. + result = await self.client.list_objects_v2(Bucket=self.bucket, Prefix=target_path) + contents = result.get("Contents", None) + if contents: + for content in contents: + key = content.get("Key") + if key == target_path: + return True + return False + + async def all_objects_exist(self, *s3_keys: str): + for key in s3_keys: + key_exists = await self.object_exists(key) + if not key_exists: + return False + return True + + async def put_object(self, key: str, body: Any): + await self.client.put_object(Bucket=self.bucket, Key=key, Body=body) + + async def persist_raster_data(self, temp_dir: str, key: str, transform, projection, values, no_data_value) -> str: + """ + Persists a geotiff in s3 based on transform, projection, values and no data value. + + :param temp_dir: temporary directory to write geotiff + :param key: s3 key to store output dataset + :param transform: gdal geotransform + :param projection: gdal projection + :param values: array of values + :param no_data_value: array no data value + :return: path to temporary written geotiff file + """ + temp_geotiff = os.path.join(temp_dir, os.path.basename(key)) + with WPSDataset.from_array(values, transform, projection, no_data_value) as ds: + ds.export_to_geotiff(temp_geotiff) + + logger.info(f"Writing to s3 -- {key}") + await self.put_object(key=key, body=open(temp_geotiff, "rb")) + return temp_geotiff diff --git a/api/app/weather_models/precip_rdps_model.py b/api/app/weather_models/precip_rdps_model.py index c1e2bccc6a..453efbec1f 100644 --- a/api/app/weather_models/precip_rdps_model.py +++ b/api/app/weather_models/precip_rdps_model.py @@ -9,7 +9,7 @@ from numba import vectorize from app.utils.s3 import get_client, read_into_memory from app.weather_models import ModelEnum -from app.weather_models.rdps_filename_marshaller import SourcePrefix, adjust_forecast_hour, compose_precip_rdps_key, compose_computed_precip_rdps_key +from app.weather_models.rdps_filename_marshaller import adjust_forecast_hour, compose_rdps_key, compose_computed_precip_rdps_key logger = logging.getLogger(__name__) @@ -120,10 +120,10 @@ def get_raster_keys_to_diff(timestamp: datetime): # From earlier model run, get the keys for 24 hours before timestamp and the timestamp to perform the diff earlier_key = f"{key_prefix}/" later_key = f"{key_prefix}/" - later_key = later_key + compose_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24) + later_key = later_key + compose_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24, "precip") if target_model_run_date.hour != 0 and target_model_run_date.hour != 12: # not a model run hour, return earlier and later keys to take difference - earlier_key = earlier_key + compose_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour) + earlier_key = earlier_key + compose_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour, "precip") return (earlier_key, later_key) # model run hour, just return the model value from 24 hours ago diff --git a/api/app/weather_models/rdps_filename_marshaller.py b/api/app/weather_models/rdps_filename_marshaller.py index c7efa98041..c2f4203383 100644 --- a/api/app/weather_models/rdps_filename_marshaller.py +++ b/api/app/weather_models/rdps_filename_marshaller.py @@ -6,7 +6,6 @@ from datetime import datetime import enum from typing import Literal -from app.weather_models import ModelEnum from dataclasses import dataclass 
@@ -121,10 +120,10 @@ def compose_rdps_filename(forecast_start_date: datetime, run_hour: int, forecast ) -def compose_precip_rdps_key(forecast_start_date: datetime, run_hour: int, forecast_hour: int): +def compose_rdps_key(forecast_start_date: datetime, run_hour: int, forecast_hour: int, weather_parameter: str): """Compose and return a computed RDPS url given a forecast start date, run hour and forecast hour.""" model_hour = model_run_for_hour(run_hour) - return f"{model_hour:02d}/precip/{compose_rdps_filename(forecast_start_date, run_hour, forecast_hour, 'precip')}" + return f"{model_hour:02d}/{weather_parameter}/{compose_rdps_filename(forecast_start_date, run_hour, forecast_hour, weather_parameter)}" def compose_computed_rdps_filename(accumulation_end_datetime: datetime) -> str: diff --git a/openshift/scripts/oc_deploy_to_production.sh b/openshift/scripts/oc_deploy_to_production.sh index dd4a0d25cd..f2f10f92ad 100755 --- a/openshift/scripts/oc_deploy_to_production.sh +++ b/openshift/scripts/oc_deploy_to_production.sh @@ -59,6 +59,8 @@ echo Grass Curing PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_grass_curing_cronjob.sh prod ${RUN_TYPE} echo RDPS for SFMS PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_rdps_sfms_cronjob.sh prod ${RUN_TYPE} +echo SFMS Raster Calculations +PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_sfms_calculations_cronjob.sh prod ${RUN_TYPE} echo BC FireWeather cronjobs echo "Run forecast at 8h30 PDT and 16h30 PDT (so before and after noon)" PROJ_TARGET=${PROJ_TARGET} SCHEDULE="30 * * * *" bash $(dirname ${0})/oc_provision_wfwx_noon_forecasts_cronjob.sh prod ${RUN_TYPE} diff --git a/openshift/scripts/oc_provision_sfms_calculations_cronjob.sh b/openshift/scripts/oc_provision_sfms_calculations_cronjob.sh new file mode 100644 index 0000000000..1cc51f548c --- /dev/null +++ b/openshift/scripts/oc_provision_sfms_calculations_cronjob.sh @@ -0,0 +1,53 @@ +#!/bin/sh -l +# +source "$(dirname ${0})/common/common" + +#% +#% OpenShift Deploy Helper +#% +#% Intended for use with a pull request-based pipeline. +#% Suffixes incl.: pr-###. +#% +#% Usage: +#% +#% ${THIS_FILE} [SUFFIX] [apply] +#% +#% Examples: +#% +#% Provide a PR number. Defaults to a dry-run. +#% ${THIS_FILE} pr-0 +#% +#% Apply when satisfied. 
+#% ${THIS_FILE} pr-0 apply +#% + +# Target project override for Dev or Prod deployments +# +PROJ_TARGET="${PROJ_TARGET:-${PROJ_DEV}}" + +# Specify a default schedule to run daily at 5am-ish +SCHEDULE="${SCHEDULE:-$((RANDOM % 60)) */2 * * *}" + +# Process template +OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${TEMPLATE_PATH}/sfms_calculations.cronjob.yaml \ +-p JOB_NAME=sfms-fwi-calc-${APP_NAME}-${SUFFIX} \ +-p APP_LABEL=${APP_NAME}-${SUFFIX} \ +-p NAME=${APP_NAME} \ +-p SUFFIX=${SUFFIX} \ +-p SCHEDULE=\"${SCHEDULE}\" \ +${PROJ_TOOLS:+ "-p PROJ_TOOLS=${PROJ_TOOLS}"} \ +${IMAGE_REGISTRY:+ "-p IMAGE_REGISTRY=${IMAGE_REGISTRY}"}" + +# Apply template (apply or use --dry-run) +# +OC_APPLY="oc -n ${PROJ_TARGET} apply -f -" +[ "${APPLY}" ] || OC_APPLY="${OC_APPLY} --dry-run" + +# Execute commands +# +eval "${OC_PROCESS}" +eval "${OC_PROCESS} | ${OC_APPLY}" + +# Provide oc command instruction +# +display_helper "${OC_PROCESS} | ${OC_APPLY}" diff --git a/openshift/sfms-calculate/README.md b/openshift/sfms-calculate/README.md new file mode 100644 index 0000000000..65e410826e --- /dev/null +++ b/openshift/sfms-calculate/README.md @@ -0,0 +1,16 @@ +# SFMS Raster Calculation job + +Creates a one off job to run SFMS raster calculations for a specific date and hour + +## Create job + +### Apply template to build the job on Openshift + +Example: + +```bash +oc process -p DATE="2024-10-10" -p HOUR=5 -p IMAGE_NAME=pr-4042 -f sfms-job.yaml | oc apply -f - +``` + +DATE is the start date of the calculations +HOUR is the hour for the start date. This determines which model run of data will be used [defaults to 20] diff --git a/openshift/sfms-calculate/openshift/sfms-job.yaml b/openshift/sfms-calculate/openshift/sfms-job.yaml new file mode 100644 index 0000000000..6b0e65e709 --- /dev/null +++ b/openshift/sfms-calculate/openshift/sfms-job.yaml @@ -0,0 +1,94 @@ +apiVersion: template.openshift.io/v1 +kind: Template +metadata: + name: sfms-fwi-calc-job + annotations: + description: "Template for SFMS FWI Calculation Job" +parameters: + - name: DATE + description: "Date argument for the Python script - YYYY-MM-DD" + required: true + - name: HOUR + description: "Hour argument for the Python script - HH" + required: true + value: "20" + - name: IMAGE_NAME + description: "Docker image to use for the job" + required: true +objects: + - kind: Job + apiVersion: batch/v1 + metadata: + name: sfms-fwi-calc-job-${DATE}-${HOUR} + spec: + parallelism: 1 + completions: 1 + backoffLimit: 6 + template: + spec: + containers: + - name: sfms-fwi-calc-container + image: image-registry.openshift-image-registry.svc:5000/e1e498-tools/wps-api-${IMAGE_NAME}:${IMAGE_NAME} + command: + - poetry + - run + - python + - "-m" + - app.jobs.sfms_calculations + - "${DATE} ${HOUR}" + env: + - name: ROCKET_URL_POST_MESSAGE + valueFrom: + configMapKeyRef: + name: wps-global + key: rocket.chat-url-post-message + - name: ROCKET_CHANNEL + valueFrom: + configMapKeyRef: + name: wps-global + key: rocket.chat-channel + - name: ROCKET_USER_ID + valueFrom: + secretKeyRef: + name: wps-global + key: rocket.chat-user-id-secret + - name: ROCKET_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: wps-global + key: rocket.chat-auth-token-secret + - name: OBJECT_STORE_SERVER + valueFrom: + secretKeyRef: + name: wps-global + key: object-store-server + - name: OBJECT_STORE_USER_ID + valueFrom: + secretKeyRef: + name: wps-global + key: object-store-user-id + - name: OBJECT_STORE_SECRET + valueFrom: + secretKeyRef: + name: wps-global + key: object-store-secret + - name: 
OBJECT_STORE_BUCKET + valueFrom: + secretKeyRef: + name: wps-global + key: object-store-bucket + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: 500m + memory: 512Mi + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + imagePullPolicy: Always + restartPolicy: OnFailure + terminationGracePeriodSeconds: 30 + dnsPolicy: ClusterFirst + securityContext: {} + schedulerName: default-scheduler diff --git a/openshift/templates/sfms_calculations.cronjob.yaml b/openshift/templates/sfms_calculations.cronjob.yaml new file mode 100644 index 0000000000..4bbf73e890 --- /dev/null +++ b/openshift/templates/sfms_calculations.cronjob.yaml @@ -0,0 +1,112 @@ +kind: Template +apiVersion: template.openshift.io/v1 +metadata: + name: ${JOB_NAME}-cronjob-template + annotations: + description: "Scheduled task to calculate new FWI indices rasters using weather model data." + tags: "sfms,fwi" +labels: + app.kubernetes.io/part-of: "${NAME}" + app: ${NAME}-${SUFFIX} +parameters: + - name: NAME + description: Module name + value: wps + - name: GLOBAL_NAME + description: Name of global Module + value: wps-global + - name: SUFFIX + description: Deployment suffix, e.g. pr-### + required: true + - name: PROJ_TOOLS + value: e1e498-tools + - name: JOB_NAME + value: sfms-fwi-calc + - name: IMAGE_REGISTRY + required: true + value: image-registry.openshift-image-registry.svc:5000 + - name: SCHEDULE + required: true + - name: APP_LABEL + required: true +objects: + - kind: CronJob + apiVersion: batch/v1 + metadata: + name: ${JOB_NAME} + spec: + schedule: ${SCHEDULE} + # We use the "Replace" policy, because we never want the cronjobs to run concurrently, + # and if for whatever reason a cronjob gets stuck, we want the next run to proceed. + # If we were to use Forbid, and a cronjob gets stuck, then we'd stop gathering data until someone + # noticed. We don't want that. + concurrencyPolicy: "Replace" + jobTemplate: + metadata: + labels: + cronjob: ${JOB_NAME} + app: ${APP_LABEL} + spec: + template: + spec: + containers: + - name: ${JOB_NAME} + image: ${IMAGE_REGISTRY}/${PROJ_TOOLS}/${NAME}-api-${SUFFIX}:${SUFFIX} + imagePullPolicy: "Always" + command: + [ + "poetry", + "run", + "python", + "-m", + "app.jobs.sfms_calculations", + ] + env: + - name: ROCKET_URL_POST_MESSAGE + valueFrom: + configMapKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-url-post-message + - name: ROCKET_CHANNEL + valueFrom: + configMapKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-channel + - name: ROCKET_USER_ID + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-user-id-secret + - name: ROCKET_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-auth-token-secret + - name: OBJECT_STORE_SERVER + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-server + - name: OBJECT_STORE_USER_ID + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-user-id + - name: OBJECT_STORE_SECRET + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-secret + - name: OBJECT_STORE_BUCKET + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-bucket + resources: + limits: + cpu: 250m + memory: 512Mi + requests: + cpu: 100m + memory: 256Mi + restartPolicy: OnFailure
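
Below the patch, a brief usage sketch (not part of the PR) of the new WPSDataset helpers and the multi_wps_dataset_context manager added in api/app/geospatial/wps_dataset.py; the GeoTIFF paths in the second block are hypothetical stand-ins, and GDAL and numpy are assumed to be installed:

```python
import numpy as np
from osgeo import osr

from app.geospatial.wps_dataset import WPSDataset, multi_wps_dataset_context

# Build a small in-memory dataset from an array; the EPSG code and geotransform are illustrative.
srs = osr.SpatialReference()
srs.ImportFromEPSG(3005)
array = np.array([[1.0, 2.0], [3.0, -999.0]])
with WPSDataset.from_array(array, (-2, 1, 0, 2, 0, -1), srs.ExportToWkt(), nodata_value=-999) as ds:
    mask, nodata = ds.get_nodata_mask()
    print(nodata)      # -999.0
    print(mask[1, 1])  # True -- the -999 cell is flagged as nodata

# Open several rasters at once; every dataset is closed when the context exits.
# "temp.tif" and "dmc.tif" are hypothetical local paths.
with multi_wps_dataset_context(["temp.tif", "dmc.tif"]) as (temp_ds, dmc_ds):
    warped_temp_ds = temp_ds.warp_to_match(dmc_ds, "/tmp/temp_warped.tif")
```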
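
For orientation on where the job reads and writes rasters, a short sketch of the object-storage key layout produced by RasterKeyAddresser (the date is arbitrary; the bucket name comes from configuration), consistent with the expectations in test_raster_addresser.py and test_bui_date_range_processor.py:

```python
from datetime import datetime, timezone

from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser

addresser = RasterKeyAddresser()
day = datetime(2024, 10, 10, 20, tzinfo=timezone.utc)  # must be timezone-aware UTC

print(addresser.get_uploaded_index_key(day, FWIParameter.DMC))
# sfms/uploads/actual/2024-10-10/dmc20241010.tif
print(addresser.get_calculated_index_key(day, FWIParameter.BUI))
# sfms/calculated/forecast/2024-10-10/bui20241010.tif
print(addresser.gdal_prefix_keys("sfms/uploads/actual/2024-10-10/dmc20241010.tif"))
# ('/vsis3/<bucket>/sfms/uploads/actual/2024-10-10/dmc20241010.tif',)
```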