BUI calculations #4042

Merged: 82 commits, Nov 6, 2024
Changes shown are from 69 of the 82 commits.

Commits
16dce56
initial work
brettedw Oct 15, 2024
cd192c9
dmc & dc
brettedw Oct 15, 2024
2bf56af
remove unused import, typing, logging
conbrad Oct 15, 2024
0f94a25
increment day logic
brettedw Oct 15, 2024
79984b0
latitude array - no need for s3
brettedw Oct 15, 2024
b4ec153
use get_weather_data_keys
brettedw Oct 15, 2024
5a453ad
add alternate implementation
conbrad Oct 16, 2024
237609c
new implementation logic changes
brettedw Oct 17, 2024
c201a00
add all indices to enum
brettedw Oct 21, 2024
b69854d
working bui raster calcs
brettedw Oct 22, 2024
2246deb
simplify storage
brettedw Oct 22, 2024
c640946
func name change
brettedw Oct 22, 2024
7ffd9d3
Merge branch 'main' into task/bui-raster-calc
brettedw Oct 22, 2024
8508077
simplify addresser uploaded key
brettedw Oct 22, 2024
c7a977e
start tests
brettedw Oct 22, 2024
1de98ba
logging
brettedw Oct 22, 2024
89fbce4
use WPSDataset
brettedw Oct 23, 2024
df5dce1
imports
brettedw Oct 23, 2024
3a47001
get_nodata_mask
brettedw Oct 23, 2024
0291d2b
cronjob
brettedw Oct 23, 2024
afcb923
remove unused files
brettedw Oct 23, 2024
3ede95e
remove nodata from read_into_memory
brettedw Oct 23, 2024
7bd9553
import issue
brettedw Oct 23, 2024
9a001c9
comments
brettedw Oct 23, 2024
4cd212d
use sfms uploads first
brettedw Oct 24, 2024
20de630
fwi key logic
brettedw Oct 24, 2024
b394d1d
minor logging change
brettedw Oct 25, 2024
7976671
add gdal.UseExceptions to WPSDataset
brettedw Oct 25, 2024
f278781
processor cleanup
brettedw Oct 25, 2024
ff52bfd
add wps test
brettedw Oct 25, 2024
eb3a329
docstring
brettedw Oct 25, 2024
aaac19e
remove unused geospatial
brettedw Oct 25, 2024
939c504
docs
brettedw Oct 25, 2024
10008e7
file rename
brettedw Oct 25, 2024
ae7755c
Needs export_to_geotiff for now
brettedw Oct 25, 2024
93402a0
from_array method
brettedw Oct 25, 2024
1690e4c
fwi_processor tests
brettedw Oct 25, 2024
50ff7f2
wps from_array test
brettedw Oct 28, 2024
6f26705
Merge branch 'main' into task/bui-raster-calc
brettedw Oct 28, 2024
25a9c5f
date_range_processor tests start
brettedw Oct 28, 2024
0a422df
comments
brettedw Oct 28, 2024
a1ecb37
addresser as parameter
brettedw Oct 29, 2024
2ca5b4a
test skeleton with mocker patched key addresser
conbrad Oct 29, 2024
4f3eb23
Pull out s3 client
conbrad Oct 29, 2024
acd817c
Get test mocks working with basic assert
conbrad Oct 29, 2024
2b1e836
fold back in open and warp
conbrad Oct 29, 2024
52a37a1
pass in multidataset manager as parameter to process_bui for testing
conbrad Oct 30, 2024
af28e13
pass in dataset context for new dmc and dc datasets
conbrad Oct 30, 2024
b89fd71
date & addresser test calls
brettedw Oct 30, 2024
ef40005
mock context managers
conbrad Oct 30, 2024
fc3b925
More test assertions
conbrad Oct 30, 2024
a1bf987
more test assertions
conbrad Oct 30, 2024
d05855d
All spy checks asserted on
conbrad Oct 30, 2024
90866f9
Merge branch 'main' into task/bui-raster-calc
conbrad Oct 30, 2024
1924f30
Add error case tests
conbrad Oct 31, 2024
0282cdf
Remove unnecessary mock
conbrad Oct 31, 2024
73d5323
job test
brettedw Oct 31, 2024
0a1236e
imports
brettedw Oct 31, 2024
1ccb31e
Move persist raster logic to S3Client
conbrad Oct 31, 2024
6d12667
Remove main from date_range_processor
conbrad Oct 31, 2024
6b65375
Merge branch 'main' into task/bui-raster-calc
conbrad Oct 31, 2024
0e86905
Pull out common mock functions, test S3Client
conbrad Oct 31, 2024
769a9bf
test wps dataset context
conbrad Oct 31, 2024
f2e2f8c
Another test
conbrad Oct 31, 2024
032a1d6
Cleaned up some imports
conbrad Nov 4, 2024
9c351ed
remove unused s3 stuff
brettedw Nov 4, 2024
c3209e2
calculation tests
brettedw Nov 4, 2024
ef515a5
imports
brettedw Nov 4, 2024
18b6e96
test fix
brettedw Nov 4, 2024
276977f
deployment copy error
brettedw Nov 4, 2024
5583ce1
geospatial imports
brettedw Nov 4, 2024
2ae4f10
command line arg
brettedw Nov 5, 2024
1ea974a
cli test
brettedw Nov 5, 2024
90d0582
Trigger Build
brettedw Nov 5, 2024
d7ad1fb
job template
brettedw Nov 5, 2024
b677636
calc time
brettedw Nov 5, 2024
7d7e49f
drop cpu
brettedw Nov 5, 2024
451714d
reduce further
brettedw Nov 5, 2024
a316ea9
Merge branch 'main' into task/bui-raster-calc
brettedw Nov 5, 2024
050889c
drop memory
brettedw Nov 5, 2024
b51d74b
Merge branch 'main' into task/bui-raster-calc
brettedw Nov 6, 2024
8185e0e
doc string
brettedw Nov 6, 2024
12 changes: 12 additions & 0 deletions .github/workflows/deployment.yml
@@ -212,6 +212,18 @@ jobs:
          oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
          PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_rdps_sfms_cronjob.sh ${SUFFIX} apply

      - name: RDPS SFMS cronjob
        shell: bash
        run: |
          oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
          PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_rdps_sfms_cronjob.sh ${SUFFIX} apply

      - name: SFMS Raster Calculations cronjob
        shell: bash
        run: |
          oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
          PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_sfms_calculations_cronjob.sh ${SUFFIX} apply

      - name: Hourly pruner nightly cronjob
        shell: bash
        run: |
7 changes: 7 additions & 0 deletions .vscode/launch.json
@@ -141,6 +141,13 @@
            "module": "app.auto_spatial_advisory.critical_hours",
            "console": "integratedTerminal"
        },
        {
            "name": "sfms raster processor job",
            "type": "python",
            "request": "launch",
            "module": "app.jobs.sfms_calculations",
            "console": "integratedTerminal"
        },
        {
            "name": "Chrome",
            "type": "pwa-chrome",
82 changes: 79 additions & 3 deletions api/app/geospatial/wps_dataset.py
@@ -1,9 +1,12 @@
from typing import Optional
from contextlib import ExitStack, contextmanager
from typing import Iterator, List, Optional, Tuple, Union
from osgeo import gdal, osr
import numpy as np

from app.utils.geospatial import GDALResamplingMethod

gdal.UseExceptions()


class WPSDataset:
    """
@@ -26,6 +29,47 @@ def __enter__(self):
    def __exit__(self, *_):
        self.ds = None

    @classmethod
    def from_array(
        cls,
        array: np.ndarray,
        geotransform: Tuple[float, float, float, float, float, float],
        projection: str,
        nodata_value: Optional[Union[float, int]] = None,
        datatype=gdal.GDT_Float32,
    ) -> "WPSDataset":
        """
        Create a WPSDataset from a NumPy array, geotransform, and projection.

        :param array: NumPy array representing the raster data
        :param geotransform: A tuple defining the geotransform
        :param projection: WKT string of the projection
        :param nodata_value: Optional nodata value to set for the dataset
        :param datatype: GDAL datatype
        :return: An instance of WPSDataset containing the created dataset
        """
        rows, cols = array.shape

        driver: gdal.Driver = gdal.GetDriverByName("MEM")
        output_dataset: gdal.Dataset = driver.Create("memory", cols, rows, 1, datatype)

        # Set the geotransform and projection
        output_dataset.SetGeoTransform(geotransform)
        output_dataset.SetProjection(projection)

        # Write the array to the dataset
        output_band: gdal.Band = output_dataset.GetRasterBand(1)
        output_band.WriteArray(array)

        # Set the NoData value if provided
        if nodata_value is not None:
            output_band.SetNoDataValue(nodata_value)

        # Flush cache to ensure all data is written
        output_band.FlushCache()

        return cls(ds_path=None, ds=output_dataset, datatype=datatype)

    def __mul__(self, other):
        """
        Multiplies this WPSDataset with the other WPSDataset
@@ -93,7 +137,7 @@ def __mul__(self, other):

        return WPSDataset(ds_path=None, ds=out_ds)

    def warp_to_match(self, other, output_path: str, resample_method: GDALResamplingMethod = GDALResamplingMethod.NEAREST_NEIGHBOUR):
    def warp_to_match(self, other: "WPSDataset", output_path: str, resample_method: GDALResamplingMethod = GDALResamplingMethod.NEAREST_NEIGHBOUR):
        """
        Warp the dataset to match the extent, pixel size, and projection of the other dataset.

@@ -173,7 +217,7 @@ def generate_latitude_array(self):

        return latitudes

    def export_to_geotiff(self, output_path):
    def export_to_geotiff(self, output_path: str):
        """
        Exports the dataset to a geotiff with the given path

@@ -204,5 +248,37 @@ def export_to_geotiff(self, output_path):
        output_band = None
        del output_band

    def get_nodata_mask(self) -> Tuple[Optional[np.ndarray], Optional[Union[float, int]]]:
        band = self.ds.GetRasterBand(self.band)
        nodata_value = band.GetNoDataValue()

        if nodata_value is not None:
            nodata_mask = band.ReadAsArray() == nodata_value
            return nodata_mask, nodata_value

        return None, None

    def as_gdal_ds(self) -> gdal.Dataset:
        return self.ds

    def close(self):
        self.ds = None


@contextmanager
def multi_wps_dataset_context(dataset_paths: List[str]) -> Iterator[List[WPSDataset]]:
    """
    Context manager to handle multiple WPSDataset instances.

    :param dataset_paths: List of dataset paths to open as WPSDataset instances
    :yield: List of WPSDataset instances, one for each path
    """
    datasets = [WPSDataset(path) for path in dataset_paths]
    try:
        # Enter each dataset's context and yield the list of instances
        with ExitStack() as stack:
            yield [stack.enter_context(ds) for ds in datasets]
    finally:
        # Close all datasets to ensure cleanup
        for ds in datasets:
            ds.close()
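Two additions here do the heavy lifting for the raster pipeline: from_array wraps an in-memory NumPy array as a dataset, and multi_wps_dataset_context opens a batch of rasters with guaranteed cleanup. A minimal usage sketch; the array values, geotransform, EPSG:3005 projection, and file paths below are illustrative assumptions, not values from this PR:

import numpy as np
from osgeo import osr

from app.geospatial.wps_dataset import WPSDataset, multi_wps_dataset_context

# Build a tiny in-memory raster; -999.0 marks the hypothetical nodata cell.
array = np.array([[1.0, 2.0], [3.0, -999.0]], dtype=np.float32)
geotransform = (0.0, 10.0, 0.0, 0.0, 0.0, -10.0)  # illustrative origin and pixel size
srs = osr.SpatialReference()
srs.ImportFromEPSG(3005)  # assumed CRS for this sketch (BC Albers)

ds = WPSDataset.from_array(array, geotransform, srs.ExportToWkt(), nodata_value=-999.0)
mask, nodata = ds.get_nodata_mask()  # boolean mask flags the -999.0 cell
ds.close()

# Open several rasters at once; both paths are placeholders.
with multi_wps_dataset_context(["/tmp/dmc.tif", "/tmp/dc.tif"]) as (dmc_ds, dc_ds):
    warped = dmc_ds.warp_to_match(dc_ds, "/tmp/dmc_warped.tif")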
54 changes: 54 additions & 0 deletions api/app/jobs/sfms_calculations.py
@@ -0,0 +1,54 @@
import asyncio
import logging
import os
import sys

from app import configure_logging
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.date_range_processor import BUIDateRangeProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
from app.geospatial.wps_dataset import multi_wps_dataset_context


logger = logging.getLogger(__name__)

DAYS_TO_CALCULATE = 2


class SFMSCalcJob:
    async def calculate_bui(self):
        logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")

        start_time = get_utc_now()

        bui_processor = BUIDateRangeProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

        async with S3Client() as s3_client:
            await bui_processor.process_bui(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)

        # calculate the execution time.
        execution_time = get_utc_now() - start_time
        hours, remainder = divmod(execution_time.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        logger.info(f"BUI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")


def main():
    try:
        job = SFMSCalcJob()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(job.calculate_bui())
    except Exception as e:
        logger.error("An exception occurred while processing DMC/DC/BUI raster calculations", exc_info=e)
        rc_message = ":scream: Encountered an error while processing SFMS raster data."
        send_rocketchat_notification(rc_message, e)
        sys.exit(os.EX_SOFTWARE)


if __name__ == "__main__":
    configure_logging()
    main()
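A design note worth flagging for reviewers: calculate_bui hands process_bui the S3 client and the dataset context managers as arguments rather than letting the processor import them directly, which is what lets the tests added in this PR inject fakes (see the "pass in dataset context" commits above). A sketch of a stub context a test might pass; all names here are hypothetical:

from contextlib import contextmanager
from typing import List
from unittest.mock import MagicMock

@contextmanager
def fake_dataset_context(paths: List[str]):
    # Yield one mock WPSDataset per requested path instead of touching GDAL or S3.
    yield [MagicMock() for _ in paths]

# Hypothetical usage inside a test:
# await processor.process_bui(fake_s3_client, fake_dataset_context, fake_dataset_context)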
Empty file added api/app/sfms/__init__.py
Empty file.
143 changes: 143 additions & 0 deletions api/app/sfms/date_range_processor.py
@@ -0,0 +1,143 @@
import logging
import os
import tempfile
from datetime import datetime, timedelta
from typing import Callable, Tuple, List, Iterator, cast

import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client
from app.weather_models.rdps_filename_marshaller import model_run_for_hour

logger = logging.getLogger(__name__)

# Type alias for clarity: the context manager function signature
MultiDatasetContext = Callable[[List[str]], Iterator[List["WPSDataset"]]]


class BUIDateRangeProcessor:
    """
    Class for calculating/generating forecasted DMC/DC/BUI rasters for a date range
    """

    def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddresser):
        self.start_datetime = start_datetime
        self.days = days
        self.addresser = addresser

    async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
        set_s3_gdal_config()

        for day in range(self.days):
            datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour = self._get_calculate_dates(day)
            logger.info(f"Calculating DMC/DC/BUI for {datetime_to_calculate_utc.isoformat()}")

            # Get and check existence of weather s3 keys
            temp_key, rh_key, _, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
            weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
            if not weather_keys_exist:
                logger.warning(f"No weather keys found for {model_run_for_hour(self.start_datetime.hour):02} model run")
                break

            # Get and check existence of fwi s3 keys
            dc_key, dmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
            fwi_keys_exist = await s3_client.all_objects_exist(dc_key, dmc_key)
            if not fwi_keys_exist:
                logger.warning(f"No previous DMC/DC keys found for {previous_fwi_datetime.date().isoformat()}")
                break

            temp_key, rh_key, precip_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, precip_key)
            dc_key, dmc_key = self.addresser.gdal_prefix_keys(dc_key, dmc_key)

            with tempfile.TemporaryDirectory() as temp_dir:
                with input_dataset_context([temp_key, rh_key, precip_key, dc_key, dmc_key]) as input_datasets:
                    input_datasets = cast(List[WPSDataset], input_datasets)  # Ensure correct type inference
                    temp_ds, rh_ds, precip_ds, dc_ds, dmc_ds = input_datasets

                    # Warp weather datasets to match fwi
                    warped_temp_ds = temp_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
                    warped_rh_ds = rh_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
                    warped_precip_ds = precip_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

                    # close unneeded datasets to reduce memory usage
                    precip_ds.close()
                    rh_ds.close()
                    temp_ds.close()

                    # Create latitude and month arrays needed for calculations
                    latitude_array = dmc_ds.generate_latitude_array()
                    month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month)

                    # Create and store DMC dataset
                    dmc_values, dmc_nodata_value = calculate_dmc(dmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, latitude_array, month_array)
                    new_dmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.DMC)
                    new_dmc_path = await s3_client.persist_raster_data(
                        temp_dir,
                        new_dmc_key,
                        dmc_ds.as_gdal_ds().GetGeoTransform(),
                        dmc_ds.as_gdal_ds().GetProjection(),
                        dmc_values,
                        dmc_nodata_value,
                    )

                    # Create and store DC dataset
                    dc_values, dc_nodata_value = calculate_dc(dc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, latitude_array, month_array)
                    new_dc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.DC)
                    new_dc_path = await s3_client.persist_raster_data(
                        temp_dir,
                        new_dc_key,
                        dc_ds.as_gdal_ds().GetGeoTransform(),
                        dc_ds.as_gdal_ds().GetProjection(),
                        dc_values,
                        dc_nodata_value,
                    )

                    # Open new DMC and DC datasets and calculate BUI
                    new_bui_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.BUI)
                    with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets:
                        new_ds = cast(List[WPSDataset], new_dmc_dc_datasets)  # Ensure correct type inference
                        new_dmc_ds, new_dc_ds = new_ds
                        bui_values, nodata = calculate_bui(new_dmc_ds, new_dc_ds)

                        # Store the new BUI dataset
                        await s3_client.persist_raster_data(
                            temp_dir,
                            new_bui_key,
                            dmc_ds.as_gdal_ds().GetGeoTransform(),
                            dmc_ds.as_gdal_ds().GetProjection(),
                            bui_values,
                            nodata,
                        )

    def _get_calculate_dates(self, day: int):
        """
        Calculate the UTC date and times based on the provided day offset.
        :param day: The day offset from the start date
        :return: Tuple of (datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour)
        """
        datetime_to_calculate_utc = self.start_datetime.replace(hour=20, minute=0, second=0, microsecond=0) + timedelta(days=day)
        previous_fwi_datetime = datetime_to_calculate_utc - timedelta(days=1)
        prediction_hour = 20 + (day * 24)
        return datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour

    def _get_previous_fwi_keys(self, day_to_calculate: int, previous_fwi_datetime: datetime) -> Tuple[str, str]:
        """
        Based on the day being calculated, decide whether to use previously uploaded actuals from sfms or
        calculated indices from the previous day's calculations.
        :param day_to_calculate: day of the calculation loop
        :param previous_fwi_datetime: the datetime previous to the datetime being calculated
        :return: s3 keys for dc and dmc
        """
        if day_to_calculate == 0:  # if we're running the first day of the calculation, use previously uploaded actuals
            dc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DC)
            dmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DMC)
        else:  # otherwise use the last calculated key
            dc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DC)
            dmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DMC)
        return dc_key, dmc_key
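A worked example of the date arithmetic in _get_calculate_dates, using an illustrative start of 2024-10-15 09:00 UTC: day 0 targets 20:00 UTC on the start date with prediction hour 20 and reads the previous day's uploaded actuals, while day 1 targets the next day's 20:00 UTC with prediction hour 44 and reads day 0's calculated keys.

from datetime import datetime, timedelta, timezone

start = datetime(2024, 10, 15, 9, 0, tzinfo=timezone.utc)  # assumed start datetime
for day in range(2):
    calc = start.replace(hour=20, minute=0, second=0, microsecond=0) + timedelta(days=day)
    prev = calc - timedelta(days=1)
    print(day, calc.isoformat(), prev.isoformat(), 20 + day * 24)
# day 0 -> calculate 2024-10-15T20:00 UTC, previous 2024-10-14T20:00 UTC, prediction hour 20
# day 1 -> calculate 2024-10-16T20:00 UTC, previous 2024-10-15T20:00 UTC, prediction hour 44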