BUI calculations (#4042)
Co-authored-by: Conor Brady <[email protected]>

- Adds a job to calculate DMC, DC, and BUI rasters from RDPS & SFMS inputs
- Adds WPSDataset functionality
   - from_array class method
   - multi-dataset context manager to help with testing
- Adds an S3Client class encapsulating all S3 functionality
- Closes SFMS: Calculate forecast and outlook BUI rasters #3775
brettedw authored Nov 6, 2024
1 parent e9af933 commit c920992
Showing 25 changed files with 1,392 additions and 75 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/deployment.yml
@@ -212,6 +212,12 @@ jobs:
oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_rdps_sfms_cronjob.sh ${SUFFIX} apply
- name: SFMS Raster Calculations cronjob
shell: bash
run: |
oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_sfms_calculations_cronjob.sh ${SUFFIX} apply
- name: Hourly pruner nightly cronjob
shell: bash
run: |
7 changes: 7 additions & 0 deletions .vscode/launch.json
@@ -141,6 +141,13 @@
"module": "app.auto_spatial_advisory.critical_hours",
"console": "integratedTerminal"
},
{
"name": "sfms raster processor job",
"type": "python",
"request": "launch",
"module": "app.jobs.sfms_calculations",
"console": "integratedTerminal"
},
{
"name": "Chrome",
"type": "pwa-chrome",
82 changes: 79 additions & 3 deletions api/app/geospatial/wps_dataset.py
@@ -1,9 +1,12 @@
from typing import Optional
from contextlib import ExitStack, contextmanager
from typing import Iterator, List, Optional, Tuple, Union
from osgeo import gdal, osr
import numpy as np

from app.utils.geospatial import GDALResamplingMethod

gdal.UseExceptions()


class WPSDataset:
"""
@@ -26,6 +29,47 @@ def __enter__(self):
def __exit__(self, *_):
self.ds = None

@classmethod
def from_array(
cls,
array: np.ndarray,
geotransform: Tuple[float, float, float, float, float, float],
projection: str,
nodata_value: Optional[Union[float, int]] = None,
datatype=gdal.GDT_Float32,
) -> "WPSDataset":
"""
Create a WPSDataset from a NumPy array, geotransform, and projection.
:param array: NumPy array representing the raster data
:param geotransform: A tuple defining the geotransform
:param projection: WKT string of the projection
:param nodata_value: Optional nodata value to set for the dataset
:param datatype: GDAL data type (defaults to gdal.GDT_Float32)
:return: An instance of WPSDataset containing the created dataset
"""
rows, cols = array.shape

driver: gdal.Driver = gdal.GetDriverByName("MEM")
output_dataset: gdal.Dataset = driver.Create("memory", cols, rows, 1, datatype)

# Set the geotransform and projection
output_dataset.SetGeoTransform(geotransform)
output_dataset.SetProjection(projection)

# Write the array to the dataset
output_band: gdal.Band = output_dataset.GetRasterBand(1)
output_band.WriteArray(array)

# Set the NoData value if provided
if nodata_value is not None:
output_band.SetNoDataValue(nodata_value)

# Flush cache to ensure all data is written
output_band.FlushCache()

return cls(ds_path=None, ds=output_dataset, datatype=datatype)

def __mul__(self, other):
"""
Multiplies this WPSDataset with the other WPSDataset
@@ -93,7 +137,7 @@ def __mul__(self, other):

return WPSDataset(ds_path=None, ds=out_ds)

def warp_to_match(self, other, output_path: str, resample_method: GDALResamplingMethod = GDALResamplingMethod.NEAREST_NEIGHBOUR):
def warp_to_match(self, other: "WPSDataset", output_path: str, resample_method: GDALResamplingMethod = GDALResamplingMethod.NEAREST_NEIGHBOUR):
"""
Warp the dataset to match the extent, pixel size, and projection of the other dataset.
@@ -173,7 +217,7 @@ def generate_latitude_array(self):

return latitudes

def export_to_geotiff(self, output_path):
def export_to_geotiff(self, output_path: str):
"""
Exports the dataset to a geotiff with the given path
@@ -204,5 +248,37 @@ def export_to_geotiff(self, output_path):
output_band = None
del output_band

def get_nodata_mask(self) -> Tuple[Optional[np.ndarray], Optional[Union[float, int]]]:
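        """
        Returns a boolean mask of the pixels that equal the band's nodata value, together
        with that nodata value, or (None, None) if the band has no nodata value set.
        """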
band = self.ds.GetRasterBand(self.band)
nodata_value = band.GetNoDataValue()

if nodata_value is not None:
nodata_mask = band.ReadAsArray() == nodata_value
return nodata_mask, nodata_value

return None, None

def as_gdal_ds(self) -> gdal.Dataset:
return self.ds

def close(self):
self.ds = None


@contextmanager
def multi_wps_dataset_context(dataset_paths: List[str]) -> Iterator[List[WPSDataset]]:
"""
Context manager to handle multiple WPSDataset instances.
:param dataset_paths: List of dataset paths to open as WPSDataset instances
:yield: List of WPSDataset instances, one for each path
"""
datasets = [WPSDataset(path) for path in dataset_paths]
try:
# Enter each dataset's context and yield the list of instances
with ExitStack() as stack:
yield [stack.enter_context(ds) for ds in datasets]
finally:
# Close all datasets to ensure cleanup
for ds in datasets:
ds.close()
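A minimal usage sketch of the two additions above, from_array and multi_wps_dataset_context, assuming the context-manager behaviour shown in this file (the geotransform, array values, and raster paths are illustrative):

import numpy as np
from osgeo import osr

from app.geospatial.wps_dataset import WPSDataset, multi_wps_dataset_context
from app.utils.geospatial import GDALResamplingMethod

# Build a small in-memory dataset from a NumPy array (EPSG:4326, made-up extent)
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
geotransform = (-120.0, 0.1, 0.0, 55.0, 0.0, -0.1)  # x origin, x size, x skew, y origin, y skew, y size
array = np.array([[1.0, 2.0], [3.0, -99.0]], dtype=np.float32)

with WPSDataset.from_array(array, geotransform, srs.ExportToWkt(), nodata_value=-99.0) as ds:
    mask, nodata = ds.get_nodata_mask()  # boolean mask flags the -99.0 cell

# Open several rasters in one with-block; every dataset is closed on exit (hypothetical paths)
with multi_wps_dataset_context(["/tmp/a.tif", "/tmp/b.tif"]) as (ds_a, ds_b):
    warped = ds_a.warp_to_match(ds_b, "/tmp/a_warped.tif", GDALResamplingMethod.BILINEAR)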
69 changes: 69 additions & 0 deletions api/app/jobs/sfms_calculations.py
@@ -0,0 +1,69 @@
import asyncio
from datetime import datetime, timezone
import logging
import os
import sys

from app import configure_logging
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.date_range_processor import BUIDateRangeProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
from app.geospatial.wps_dataset import multi_wps_dataset_context


logger = logging.getLogger(__name__)

DAYS_TO_CALCULATE = 2


class SFMSCalcJob:
async def calculate_bui(self, start_time: datetime):
"""
Entry point for processing SFMS DMC/DC/BUI rasters. To run manually from a specific date in OpenShift,
see openshift/sfms-calculate/README.md
"""
logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")

start_exec = get_utc_now()

bui_processor = BUIDateRangeProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

async with S3Client() as s3_client:
await bui_processor.process_bui(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)

# calculate the execution time.
execution_time = get_utc_now() - start_exec
hours, remainder = divmod(execution_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)

logger.info(f"BUI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")


def main():
if len(sys.argv) > 1:
try:
# command-line arg as 'YYYY-MM-DD HH'
start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H").replace(tzinfo=timezone.utc)
except ValueError:
logger.error("Error: Please provide the date and hour in 'YYYY-MM-DD HH' format (as a single string)")
sys.exit(1)
else:
# default to the current datetime
start_time = get_utc_now()
try:
job = SFMSCalcJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(job.calculate_bui(start_time))
except Exception as e:
logger.error("An exception occurred while processing DMC/DC/BUI raster calculations", exc_info=e)
rc_message = ":scream: Encountered an error while processing SFMS raster data."
send_rocketchat_notification(rc_message, e)
sys.exit(os.EX_SOFTWARE)


if __name__ == "__main__":
configure_logging()
main()
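For reference, a sketch of kicking off the job outside the cronjob, mirroring main() above (the model-run datetime is illustrative):

import asyncio
from datetime import datetime, timezone

from app.jobs.sfms_calculations import SFMSCalcJob

# Equivalent to: python -m app.jobs.sfms_calculations "2024-11-06 12"
start_time = datetime(2024, 11, 6, 12, tzinfo=timezone.utc)
asyncio.run(SFMSCalcJob().calculate_bui(start_time))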
Empty file added api/app/sfms/__init__.py
143 changes: 143 additions & 0 deletions api/app/sfms/date_range_processor.py
@@ -0,0 +1,143 @@
import logging
import os
import tempfile
from datetime import datetime, timedelta
from typing import Callable, Tuple, List, Iterator, cast

import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client
from app.weather_models.rdps_filename_marshaller import model_run_for_hour

logger = logging.getLogger(__name__)

# Type alias for clarity: the context manager function signature
MultiDatasetContext = Callable[[List[str]], Iterator[List["WPSDataset"]]]


class BUIDateRangeProcessor:
"""
Class for calculating/generating forecasted DMC/DC/BUI rasters for a date range
"""

def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddresser):
self.start_datetime = start_datetime
self.days = days
self.addresser = addresser

async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
set_s3_gdal_config()

for day in range(self.days):
datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour = self._get_calculate_dates(day)
logger.info(f"Calculating DMC/DC/BUI for {datetime_to_calculate_utc.isoformat()}")

# Get and check existence of weather s3 keys
temp_key, rh_key, _, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
if not weather_keys_exist:
logging.warning(f"No weather keys found for {model_run_for_hour(self.start_datetime.hour):02} model run")
break

# get and check existence of fwi s3 keys
dc_key, dmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
fwi_keys_exist = await s3_client.all_objects_exist(dc_key, dmc_key)
if not fwi_keys_exist:
logging.warning(f"No previous DMC/DC keys found for {previous_fwi_datetime.date().isoformat()}")
break

temp_key, rh_key, precip_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, precip_key)
dc_key, dmc_key = self.addresser.gdal_prefix_keys(dc_key, dmc_key)

with tempfile.TemporaryDirectory() as temp_dir:
with input_dataset_context([temp_key, rh_key, precip_key, dc_key, dmc_key]) as input_datasets:
input_datasets = cast(List[WPSDataset], input_datasets) # Ensure correct type inference
temp_ds, rh_ds, precip_ds, dc_ds, dmc_ds = input_datasets

# Warp weather datasets to match fwi
warped_temp_ds = temp_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
warped_rh_ds = rh_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
warped_precip_ds = precip_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

# close unneeded datasets to reduce memory usage
precip_ds.close()
rh_ds.close()
temp_ds.close()
# Create latitude and month arrays needed for calculations
latitude_array = dmc_ds.generate_latitude_array()
month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month)

# Create and store DMC dataset
dmc_values, dmc_nodata_value = calculate_dmc(dmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, latitude_array, month_array)
new_dmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.DMC)
new_dmc_path = await s3_client.persist_raster_data(
temp_dir,
new_dmc_key,
dmc_ds.as_gdal_ds().GetGeoTransform(),
dmc_ds.as_gdal_ds().GetProjection(),
dmc_values,
dmc_nodata_value,
)

# Create and store DC dataset
dc_values, dc_nodata_value = calculate_dc(dc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, latitude_array, month_array)
new_dc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.DC)
new_dc_path = await s3_client.persist_raster_data(
temp_dir,
new_dc_key,
dc_ds.as_gdal_ds().GetGeoTransform(),
dc_ds.as_gdal_ds().GetProjection(),
dc_values,
dc_nodata_value,
)

# Open new DMC and DC datasets and calculate BUI
new_bui_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.BUI)
with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets:
new_ds = cast(List[WPSDataset], new_dmc_dc_datasets) # Ensure correct type inference
new_dmc_ds, new_dc_ds = new_ds
bui_values, nodata = calculate_bui(new_dmc_ds, new_dc_ds)

# Store the new BUI dataset
await s3_client.persist_raster_data(
temp_dir,
new_bui_key,
dmc_ds.as_gdal_ds().GetGeoTransform(),
dmc_ds.as_gdal_ds().GetProjection(),
bui_values,
nodata,
)

def _get_calculate_dates(self, day: int):
"""
Calculate the UTC date and times based on the provided day offset.
:param day: The day offset from the start date
:return: Tuple of (datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour)
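        Example (illustrative dates): with a start of 2024-11-06 and day=1, datetime_to_calculate_utc
        is 2024-11-07 20:00 UTC, previous_fwi_datetime is 2024-11-06 20:00 UTC, and prediction_hour is 44.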
"""
datetime_to_calculate_utc = self.start_datetime.replace(hour=20, minute=0, second=0, microsecond=0) + timedelta(days=day)
previous_fwi_datetime = datetime_to_calculate_utc - timedelta(days=1)
prediction_hour = 20 + (day * 24)
return datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour

def _get_previous_fwi_keys(self, day_to_calculate: int, previous_fwi_datetime: datetime) -> Tuple[str, str]:
"""
Based on the day being calculated, decide whether to use previously uploaded actuals from sfms or
calculated indices from the previous day's calculations.
:param day_to_calculate: day of the calculation loop
:param previous_fwi_datetime: the datetime previous to the datetime being calculated
:return: s3 keys for dc and dmc
"""
if day_to_calculate == 0: # if we're running the first day of the calculation, use previously uploaded actuals
dc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DC)
dmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DMC)
else: # otherwise use the last calculated key
dc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DC)
dmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DMC)
return dc_key, dmc_key
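Because process_bui receives its dataset context managers as parameters (the MultiDatasetContext alias above), tests can inject a fake that serves in-memory rasters instead of opening S3-backed paths. A sketch, with entirely hypothetical test scaffolding:

from contextlib import contextmanager
from typing import Iterator, List

import numpy as np
from osgeo import osr

from app.geospatial.wps_dataset import WPSDataset

@contextmanager
def fake_dataset_context(paths: List[str]) -> Iterator[List[WPSDataset]]:
    # Yield one small in-memory dataset per requested path instead of opening S3 objects
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(4326)
    datasets = [
        WPSDataset.from_array(np.ones((2, 2), dtype=np.float32), (-120.0, 0.1, 0.0, 55.0, 0.0, -0.1), srs.ExportToWkt())
        for _ in paths
    ]
    try:
        yield datasets
    finally:
        for ds in datasets:
            ds.close()

Such a fake can then stand in for multi_wps_dataset_context in either context parameter of process_bui.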