Skip to content

Commit

Permalink
RDPS precip output (#3810)
Browse files Browse the repository at this point in the history
- Changes file naming convention for output 24 hour precip
- Extends calculation of precip to 36 hours from model run time
  • Loading branch information
brettedw authored Aug 2, 2024
1 parent a5a2685 commit e83dd6e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 34 deletions.
7 changes: 7 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@
"module": "app.auto_spatial_advisory.local.generate_classified_tpi",
"console": "integratedTerminal",
},
{
"name": "app.jobs.rdps_sfms ",
"type": "python",
"request": "launch",
"module": "app.jobs.rdps_sfms",
"console": "integratedTerminal"
},
{
"name": "Chrome",
"type": "pwa-chrome",
Expand Down
15 changes: 9 additions & 6 deletions api/app/jobs/rdps_sfms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import asyncio
import os
import sys
from datetime import timedelta
from datetime import timedelta, datetime, timezone
from collections.abc import Generator
import logging
import tempfile
Expand All @@ -27,6 +27,7 @@
from app.rocketchat_notifications import send_rocketchat_notification
from app.jobs.env_canada_utils import get_regional_model_run_download_urls
from app.weather_models.precip_rdps_model import compute_and_store_precip_rasters
from app.weather_models.rdps_filename_marshaller import model_run_for_hour

# If running as its own process, configure logging appropriately.
if __name__ == "__main__":
Expand All @@ -47,9 +48,7 @@ def get_model_run_hours_to_process() -> Generator[int, None, None]:


class RDPSGrib:
"""Class that orchestrates downloading, storage and retention policy of RDPS grib files.
TODO - Implement retention policy
"""
"""Class that orchestrates downloading, storage and retention policy of RDPS grib files."""

def __init__(self, session: Session):
"""Prep variables"""
Expand Down Expand Up @@ -128,7 +127,7 @@ async def process(self):

async def apply_retention_policy(self, days_to_retain: int):
"""Delete objects from S3 storage and remove records from database that are older than DAYS_TO_RETAIN"""
logger.info(f"Applying retention policy to RDPS data downloaded for SFMS. Data in S3 and corresponding database records older than {days_to_retain} are being deleted.")
logger.info(f"Applying retention policy to RDPS data downloaded for SFMS. Data in S3 and corresponding database records older than {days_to_retain} days are being deleted.")
deletion_threshold = self.now - timedelta(days=days_to_retain)
records_for_deletion = get_rdps_sfms_urls_for_deletion(self.session, deletion_threshold)
async with get_client() as (client, bucket):
Expand All @@ -148,11 +147,15 @@ async def run(self):

# grab the start time.
start_time = time_utils.get_utc_now()

# start our computing at the utc datetime of the most recent model run
model_run_hour = model_run_for_hour(start_time.hour)
model_start_time = datetime(start_time.year, start_time.month, start_time.day, model_run_hour, tzinfo=timezone.utc)
with get_write_session_scope() as session:
rdps_grib = RDPSGrib(session)
await rdps_grib.process()
await rdps_grib.apply_retention_policy(DAYS_TO_RETAIN)
await compute_and_store_precip_rasters(start_time)
await compute_and_store_precip_rasters(model_start_time)
# calculate the execution time.
execution_time = time_utils.get_utc_now() - start_time
hours, remainder = divmod(execution_time.seconds, 3600)
Expand Down
21 changes: 20 additions & 1 deletion api/app/tests/weather_models/test_rdps_filename_marshaller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
from app.weather_models.rdps_filename_marshaller import parse_rdps_filename
from datetime import datetime, timezone
from app.weather_models.rdps_filename_marshaller import parse_rdps_filename, compose_computed_precip_rdps_key


def test_parse_rdps_filename_ok():
Expand Down Expand Up @@ -31,3 +32,21 @@ def test_parse_rdps_filename_ok():
def test_parse_rdps_filename_failure(filename):
with pytest.raises(Exception) as _:
parse_rdps_filename(filename)


@pytest.mark.parametrize(
"timestamp,expected_output_key",
[
(
datetime(2024, 7, 30, 10, tzinfo=timezone.utc),
"00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20240730_10z.tif",
),
(
datetime(2024, 7, 30, 20, tzinfo=timezone.utc),
"12/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20240730_20z.tif",
),
],
)
def test_compose_computed_precip_rdps_key(timestamp, expected_output_key):
output_precip_key = compose_computed_precip_rdps_key(timestamp)
assert output_precip_key == expected_output_key
40 changes: 20 additions & 20 deletions api/app/weather_models/precip_rdps_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from numba import vectorize
from app.utils.s3 import get_client, read_into_memory
from app.weather_models import ModelEnum
from app.weather_models.rdps_filename_marshaller import SourcePrefix, adjust_forecast_hour, compose_computed_precip_rdps_key
from app.weather_models.rdps_filename_marshaller import SourcePrefix, adjust_forecast_hour, compose_precip_rdps_key, compose_computed_precip_rdps_key

logger = logging.getLogger(__name__)

Expand All @@ -27,17 +27,17 @@ def is_after(self, other) -> bool:
return self.timestamp > other.timestamp


async def compute_and_store_precip_rasters(current_time: datetime):
async def compute_and_store_precip_rasters(model_run_timestamp: datetime):
"""
Given a UTC datetime, trigger 24 hours worth of accumulated precip
Given a UTC datetime, trigger 36 hours worth of accumulated precip
difference rasters and store them.
"""
async with get_client() as (client, bucket):
for hour in range(0, 24):
timestamp = current_time + timedelta(hours=hour)
(precip_diff_raster, geotransform, projection) = await generate_24_hour_accumulating_precip_raster(timestamp)
key = f"weather_models/{ModelEnum.RDPS.lower()}/{current_time.date().isoformat()}/" + compose_computed_precip_rdps_key(
current_time, current_time.hour, hour, SourcePrefix.COMPUTED
for hour in range(0, 36):
accumulation_timestamp = model_run_timestamp + timedelta(hours=hour)
(precip_diff_raster, geotransform, projection) = await generate_24_hour_accumulating_precip_raster(accumulation_timestamp)
key = f"weather_models/{ModelEnum.RDPS.lower()}/{accumulation_timestamp.date().isoformat()}/" + compose_computed_precip_rdps_key(
accumulation_end_datetime=accumulation_timestamp
)

res = await client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
Expand All @@ -49,13 +49,13 @@ async def compute_and_store_precip_rasters(current_time: datetime):

logger.info(
"Uploading RDPS 24 hour acc precip raster for date: %s, hour: %s, forecast hour: %s to %s",
current_time.date().isoformat(),
current_time.hour,
adjust_forecast_hour(current_time.hour, hour),
model_run_timestamp.date().isoformat(),
model_run_timestamp.hour,
adjust_forecast_hour(model_run_timestamp.hour, hour),
key,
)
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, current_time.date().isoformat() + "precip" + str(hour) + ".tif")
temp_filename = os.path.join(temp_dir, model_run_timestamp.date().isoformat() + "precip" + str(hour) + ".tif")
# Create temp file
driver = gdal.GetDriverByName("GTiff")
rows, cols = precip_diff_raster.shape
Expand Down Expand Up @@ -84,24 +84,24 @@ async def compute_and_store_precip_rasters(current_time: datetime):
logger.info("Done uploading file to %s", key)


async def generate_24_hour_accumulating_precip_raster(current_time: datetime):
async def generate_24_hour_accumulating_precip_raster(timestamp: datetime):
"""
Given a UTC datetime, grab the raster for that date
and the date for 24 hours before to compute the difference.
"""
(yesterday_key, today_key) = get_raster_keys_to_diff(current_time)
(yesterday_key, today_key) = get_raster_keys_to_diff(timestamp)
(day_data, day_geotransform, day_projection) = await read_into_memory(today_key)
if yesterday_key is None:
if day_data is None:
raise ValueError("No precip raster data for %s", today_key)
raise ValueError("No precip raster data for %s" % today_key)
return (day_data, day_geotransform, day_projection)

yesterday_time = current_time - timedelta(days=1)
yesterday_time = timestamp - timedelta(days=1)
(yesterday_data, _, _) = await read_into_memory(yesterday_key)
if yesterday_data is None:
raise ValueError("No precip raster data for %s", today_key, yesterday_key)
raise ValueError("No precip raster data for %s" % yesterday_key)

later_precip = TemporalPrecip(timestamp=current_time, precip_amount=day_data)
later_precip = TemporalPrecip(timestamp=timestamp, precip_amount=day_data)
earlier_precip = TemporalPrecip(timestamp=yesterday_time, precip_amount=yesterday_data)
return (compute_precip_difference(later_precip, earlier_precip), day_geotransform, day_projection)

Expand All @@ -116,10 +116,10 @@ def get_raster_keys_to_diff(timestamp: datetime):
# From earlier model run, get the keys for 24 hours before timestamp and the timestamp to perform the diff
earlier_key = f"{key_prefix}/"
later_key = f"{key_prefix}/"
later_key = later_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24, SourcePrefix.CMC)
later_key = later_key + compose_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24)
if target_model_run_date.hour != 0 and target_model_run_date.hour != 12:
# not a model run hour, return earlier and later keys to take difference
earlier_key = earlier_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour, SourcePrefix.CMC)
earlier_key = earlier_key + compose_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour)
return (earlier_key, later_key)

# model run hour, just return the model value from 24 hours ago
Expand Down
57 changes: 50 additions & 7 deletions api/app/weather_models/rdps_filename_marshaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import datetime
import enum
from typing import Literal
from app.weather_models import ModelEnum
from dataclasses import dataclass


class SourcePrefix(enum.Enum):
Expand Down Expand Up @@ -33,6 +35,25 @@ class SourcePrefix(enum.Enum):
FORECAST_HOURS = [f"{hour:03d}" for hour in list(range(0, 84))]


@dataclass(frozen=True)
class WeatherModelKeyParams:
variable: str
level_type: str
level: str


weather_key_parameters = {
"temp": WeatherModelKeyParams("TMP", "TGL", "2"),
"precip": WeatherModelKeyParams("APCP", "SFC", "0"),
"wind_speed": WeatherModelKeyParams("WIND", "TGL", "10"),
"rh": WeatherModelKeyParams("RH", "TGL", "2"),
}


def get_weather_key_params(parameter):
return weather_key_parameters.get(parameter, None)


def model_run_for_hour(hour: int) -> Literal[0, 12]:
"""Returns the model run the hour is for based on when the latest model ran."""
return 0 if hour < 12 else 12
Expand Down Expand Up @@ -77,28 +98,50 @@ def parse_rdps_filename(url: str):
return (forecast_start_date, run_hour, forecast_hour)


def check_compose_invariants(forecast_start_date: datetime, run_hour: int, forecast_hour: int):
def check_compose_invariants(forecast_start_date: datetime, run_hour: int, forecast_hour: int, weather_parameter: str):
"""Explode if any of these assertions fail"""
assert forecast_start_date.tzinfo is not None
assert int(forecast_start_date.utcoffset().total_seconds()) == 0
assert f"{forecast_hour:03d}" in FORECAST_HOURS
assert run_hour in list(range(0, 36))
assert weather_parameter in weather_key_parameters


def compose_computed_rdps_filename(forecast_start_date: datetime, run_hour: int, forecast_hour: int, source_prefix: Literal[SourcePrefix.CMC, SourcePrefix.COMPUTED]):
def compose_rdps_filename(forecast_start_date: datetime, run_hour: int, forecast_hour: int, weather_parameter: str):
"""Compose and return a computed RDPS url given a forecast start date, run hour and forecast hour."""
check_compose_invariants(forecast_start_date, run_hour, forecast_hour)
check_compose_invariants(forecast_start_date, run_hour, forecast_hour, weather_parameter)
key_params = get_weather_key_params(weather_parameter)
model_hour = model_run_for_hour(run_hour)
adjusted_forecast_hour = adjust_forecast_hour(run_hour, forecast_hour)
file_ext = ".grib2" if source_prefix == SourcePrefix.CMC else ".tif"
file_ext = ".grib2"

return (
f"{source_prefix.value}{DELIMITER}{REG}{DELIMITER}{APCP}{DELIMITER}{SFC}{DELIMITER}{LEVEL}{DELIMITER}{PS10KM}{DELIMITER}"
f"{SourcePrefix.CMC.value}{DELIMITER}{REG}{DELIMITER}{key_params.variable}{DELIMITER}{key_params.level_type}{DELIMITER}{key_params.level}{DELIMITER}{PS10KM}{DELIMITER}"
f"{forecast_start_date.date().isoformat().replace('-','')}{model_hour:02d}{DELIMITER}P{adjusted_forecast_hour:03d}{file_ext}"
)


def compose_computed_precip_rdps_key(forecast_start_date: datetime, run_hour: int, forecast_hour: int, source_prefix: Literal[SourcePrefix.CMC, SourcePrefix.COMPUTED]):
def compose_precip_rdps_key(forecast_start_date: datetime, run_hour: int, forecast_hour: int):
"""Compose and return a computed RDPS url given a forecast start date, run hour and forecast hour."""
model_hour = model_run_for_hour(run_hour)
return f"{model_hour:02d}/precip/{compose_computed_rdps_filename(forecast_start_date, run_hour, forecast_hour, source_prefix)}"
return f"{model_hour:02d}/precip/{compose_rdps_filename(forecast_start_date, run_hour, forecast_hour, 'precip')}"


def compose_computed_rdps_filename(accumulation_end_datetime: datetime) -> str:
"""
Compose and return a computed RDPS url given the datetime that precip is being accumulated to.
For details on weather model naming conventions, see: https://github.com/bcgov/wps/tree/main/api/app/weather_models/weather-model-naming.md
"""
key_params = get_weather_key_params("precip")
file_ext = ".tif"

return (
f"{SourcePrefix.COMPUTED.value}{DELIMITER}{REG}{DELIMITER}{key_params.variable}{DELIMITER}{key_params.level_type}{DELIMITER}{key_params.level}{DELIMITER}{PS10KM}{DELIMITER}"
f"{accumulation_end_datetime.strftime(f'%Y%m%d{DELIMITER}%Hz')}{file_ext}"
)


def compose_computed_precip_rdps_key(accumulation_end_datetime: datetime):
"""Compose and return a computed RDPS url given the datetime that precip is being accumulated to."""
model_hour = model_run_for_hour(accumulation_end_datetime.hour)
return f"{model_hour:02d}/precip/{compose_computed_rdps_filename(accumulation_end_datetime)}"
38 changes: 38 additions & 0 deletions api/app/weather_models/weather-model-naming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## Weather Model Naming Conventions

### Environment Canada weather model naming conventions

[RDPS](https://eccc-msc.github.io/open-data/msc-data/nwp_rdps/readme_rdps-datamart_en/)\
[GDPS](https://eccc-msc.github.io/open-data/msc-data/nwp_gdps/readme_gdps-datamart_en/)\
[HRDPS](https://eccc-msc.github.io/open-data/msc-data/nwp_hrdps/readme_hrdps-datamart_en/)

### 24 hour accumulated precipitation

Weather models calculate accumulated precipitation since the start of the model run.
For example:
`CMC_reg_APCP_SFC_0_ps10km_2024073100_P003.grib2` indicates 003 hours of precip accumulation from the start
of the model run on 2024-07-31 at 00:00 UTC (2024-07-30 17:00 PDT).

For various fire weather indices, 24 hour accumulated precip is required across the province. To accomplish this
we are differencing weather model grib2 files for specific times 24 hours apart and storing the result as a geotiff.

### Naming convention

- **COMPUTED** : constant string indicating that the data is computed data, not source data.
- **reg** : constant string indicating that the data is from the RDPS. This varies depending on weather model.
- **Variable** : Variable type included in this file. APCP refers to accumulated precipitation.
- **LevelType** : Level type. SFC refers to surface.
- **Level** : Level value. 0 refers to ground level (0 height above ground)
- **ps10km** : constant string indicating that the projection used is polar-stereographic at 10km resolution.
- **YYYYMMDD** : Year, month and day that the 24 hour accumulation refers to.
- **hhz** : The data includes 24 hours of precip accumulation ending at this UTC hour of **YYYYMMDD** [00...23]
- **tif** : constant string indicating the geotiff format is used

**Example**\
Input:\
`2024-07-31/00/precip/CMC_reg_APCP_SFC_0_ps10km_2024073100_P002.grib2`\
`2024-07-31/00/precip/CMC_reg_APCP_SFC_0_ps10km_2024073100_P026.grib2`

Output:\
`2024-08-01/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20240801_02z.tif`\
This geotiff contains 24 hours of precip accumulation ending on 2024-08-01 02:00 UTC. The geotiff will be stored according to the date and model run used in the calculation (00 in this case).

0 comments on commit e83dd6e

Please sign in to comment.