Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDPS precip output #3810

Merged
merged 17 commits into from
Aug 2, 2024
Merged
7 changes: 7 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@
"module": "app.auto_spatial_advisory.local.generate_classified_tpi",
"console": "integratedTerminal",
},
{
"name": "app.jobs.rdps_sfms ",
"type": "python",
"request": "launch",
"module": "app.jobs.rdps_sfms",
"console": "integratedTerminal"
},
{
"name": "Chrome",
"type": "pwa-chrome",
Expand Down
15 changes: 9 additions & 6 deletions api/app/jobs/rdps_sfms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import asyncio
import os
import sys
from datetime import timedelta
from datetime import timedelta, datetime, timezone

Check warning on line 9 in api/app/jobs/rdps_sfms.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/rdps_sfms.py#L9

Added line #L9 was not covered by tests
from collections.abc import Generator
import logging
import tempfile
Expand All @@ -27,6 +27,7 @@
from app.rocketchat_notifications import send_rocketchat_notification
from app.jobs.env_canada_utils import get_regional_model_run_download_urls
from app.weather_models.precip_rdps_model import compute_and_store_precip_rasters
from app.weather_models.rdps_filename_marshaller import model_run_for_hour

Check warning on line 30 in api/app/jobs/rdps_sfms.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/rdps_sfms.py#L30

Added line #L30 was not covered by tests

# If running as its own process, configure logging appropriately.
if __name__ == "__main__":
Expand All @@ -47,9 +48,7 @@


class RDPSGrib:
"""Class that orchestrates downloading, storage and retention policy of RDPS grib files.
TODO - Implement retention policy
"""
"""Class that orchestrates downloading, storage and retention policy of RDPS grib files."""

def __init__(self, session: Session):
"""Prep variables"""
Expand Down Expand Up @@ -128,7 +127,7 @@

async def apply_retention_policy(self, days_to_retain: int):
"""Delete objects from S3 storage and remove records from database that are older than DAYS_TO_RETAIN"""
logger.info(f"Applying retention policy to RDPS data downloaded for SFMS. Data in S3 and corresponding database records older than {days_to_retain} are being deleted.")
logger.info(f"Applying retention policy to RDPS data downloaded for SFMS. Data in S3 and corresponding database records older than {days_to_retain} days are being deleted.")

Check warning on line 130 in api/app/jobs/rdps_sfms.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/rdps_sfms.py#L130

Added line #L130 was not covered by tests
deletion_threshold = self.now - timedelta(days=days_to_retain)
records_for_deletion = get_rdps_sfms_urls_for_deletion(self.session, deletion_threshold)
async with get_client() as (client, bucket):
Expand All @@ -148,11 +147,15 @@

# grab the start time.
start_time = time_utils.get_utc_now()

# start our computing at the utc datetime of the most recent model run
model_run_hour = model_run_for_hour(start_time.hour)
model_start_time = datetime(start_time.year, start_time.month, start_time.day, model_run_hour, tzinfo=timezone.utc)

Check warning on line 153 in api/app/jobs/rdps_sfms.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/rdps_sfms.py#L152-L153

Added lines #L152 - L153 were not covered by tests
with get_write_session_scope() as session:
rdps_grib = RDPSGrib(session)
await rdps_grib.process()
await rdps_grib.apply_retention_policy(DAYS_TO_RETAIN)
await compute_and_store_precip_rasters(start_time)
await compute_and_store_precip_rasters(model_start_time)

Check warning on line 158 in api/app/jobs/rdps_sfms.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/rdps_sfms.py#L158

Added line #L158 was not covered by tests
# calculate the execution time.
execution_time = time_utils.get_utc_now() - start_time
hours, remainder = divmod(execution_time.seconds, 3600)
Expand Down
21 changes: 20 additions & 1 deletion api/app/tests/weather_models/test_rdps_filename_marshaller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
from app.weather_models.rdps_filename_marshaller import parse_rdps_filename
from datetime import datetime, timezone
from app.weather_models.rdps_filename_marshaller import parse_rdps_filename, compose_computed_precip_rdps_key


def test_parse_rdps_filename_ok():
Expand Down Expand Up @@ -31,3 +32,21 @@ def test_parse_rdps_filename_ok():
def test_parse_rdps_filename_failure(filename):
with pytest.raises(Exception) as _:
parse_rdps_filename(filename)


@pytest.mark.parametrize(
"timestamp,expected_output_key",
[
(
datetime(2024, 7, 30, 10, tzinfo=timezone.utc),
"00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20240730_10z.tif",
),
(
datetime(2024, 7, 30, 20, tzinfo=timezone.utc),
"12/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20240730_20z.tif",
),
],
)
def test_compose_computed_precip_rdps_key(timestamp, expected_output_key):
output_precip_key = compose_computed_precip_rdps_key(timestamp)
assert output_precip_key == expected_output_key
40 changes: 21 additions & 19 deletions api/app/weather_models/precip_rdps_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from numba import vectorize
from app.utils.s3 import get_client, read_into_memory
from app.weather_models import ModelEnum
from app.weather_models.rdps_filename_marshaller import SourcePrefix, adjust_forecast_hour, compose_computed_precip_rdps_key
from app.weather_models.rdps_filename_marshaller import SourcePrefix, adjust_forecast_hour, compose_precip_rdps_key, compose_computed_precip_rdps_key

logger = logging.getLogger(__name__)

Expand All @@ -27,17 +27,17 @@
return self.timestamp > other.timestamp


async def compute_and_store_precip_rasters(current_time: datetime):
async def compute_and_store_precip_rasters(timestamp: datetime):
brettedw marked this conversation as resolved.
Show resolved Hide resolved
"""
Given a UTC datetime, trigger 24 hours worth of accumulated precip
brettedw marked this conversation as resolved.
Show resolved Hide resolved
difference rasters and store them.
"""
async with get_client() as (client, bucket):
for hour in range(0, 24):
timestamp = current_time + timedelta(hours=hour)
(precip_diff_raster, geotransform, projection) = await generate_24_hour_accumulating_precip_raster(timestamp)
key = f"weather_models/{ModelEnum.RDPS.lower()}/{current_time.date().isoformat()}/" + compose_computed_precip_rdps_key(
current_time, current_time.hour, hour, SourcePrefix.COMPUTED
for hour in range(0, 36):
accumulation_timestamp = timestamp + timedelta(hours=hour)
(precip_diff_raster, geotransform, projection) = await generate_24_hour_accumulating_precip_raster(accumulation_timestamp)
key = f"weather_models/{ModelEnum.RDPS.lower()}/{accumulation_timestamp.date().isoformat()}/" + compose_computed_precip_rdps_key(

Check warning on line 39 in api/app/weather_models/precip_rdps_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/precip_rdps_model.py#L36-L39

Added lines #L36 - L39 were not covered by tests
brettedw marked this conversation as resolved.
Show resolved Hide resolved
accumulation_end_datetime=accumulation_timestamp
)

res = await client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
Expand All @@ -49,13 +49,13 @@

logger.info(
"Uploading RDPS 24 hour acc precip raster for date: %s, hour: %s, forecast hour: %s to %s",
current_time.date().isoformat(),
current_time.hour,
adjust_forecast_hour(current_time.hour, hour),
timestamp.date().isoformat(),
timestamp.hour,
adjust_forecast_hour(timestamp.hour, hour),
key,
)
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, current_time.date().isoformat() + "precip" + str(hour) + ".tif")
temp_filename = os.path.join(temp_dir, timestamp.date().isoformat() + "precip" + str(hour) + ".tif")

Check warning on line 58 in api/app/weather_models/precip_rdps_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/precip_rdps_model.py#L58

Added line #L58 was not covered by tests
# Create temp file
driver = gdal.GetDriverByName("GTiff")
rows, cols = precip_diff_raster.shape
Expand Down Expand Up @@ -84,24 +84,26 @@
logger.info("Done uploading file to %s", key)


async def generate_24_hour_accumulating_precip_raster(current_time: datetime):
async def generate_24_hour_accumulating_precip_raster(timestamp: datetime):
"""
Given a UTC datetime, grab the raster for that date
and the date for 24 hours before to compute the difference.
"""
(yesterday_key, today_key) = get_raster_keys_to_diff(current_time)
(yesterday_key, today_key) = get_raster_keys_to_diff(timestamp)
logger.info(yesterday_key)
brettedw marked this conversation as resolved.
Show resolved Hide resolved
logger.info(today_key)
(day_data, day_geotransform, day_projection) = await read_into_memory(today_key)
if yesterday_key is None:
if day_data is None:
raise ValueError("No precip raster data for %s", today_key)
raise ValueError("No precip raster data for %s" % today_key)
return (day_data, day_geotransform, day_projection)

yesterday_time = current_time - timedelta(days=1)
yesterday_time = timestamp - timedelta(days=1)
(yesterday_data, _, _) = await read_into_memory(yesterday_key)
if yesterday_data is None:
raise ValueError("No precip raster data for %s", today_key, yesterday_key)
conbrad marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("No precip raster data for %s" % yesterday_key)

Check warning on line 104 in api/app/weather_models/precip_rdps_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/precip_rdps_model.py#L104

Added line #L104 was not covered by tests

later_precip = TemporalPrecip(timestamp=current_time, precip_amount=day_data)
later_precip = TemporalPrecip(timestamp=timestamp, precip_amount=day_data)
earlier_precip = TemporalPrecip(timestamp=yesterday_time, precip_amount=yesterday_data)
return (compute_precip_difference(later_precip, earlier_precip), day_geotransform, day_projection)

Expand All @@ -116,10 +118,10 @@
# From earlier model run, get the keys for 24 hours before timestamp and the timestamp to perform the diff
earlier_key = f"{key_prefix}/"
later_key = f"{key_prefix}/"
later_key = later_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24, SourcePrefix.CMC)
later_key = later_key + compose_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24)
if target_model_run_date.hour != 0 and target_model_run_date.hour != 12:
# not a model run hour, return earlier and later keys to take difference
earlier_key = earlier_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour, SourcePrefix.CMC)
earlier_key = earlier_key + compose_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour)
return (earlier_key, later_key)

# model run hour, just return the model value from 24 hours ago
Expand Down
57 changes: 50 additions & 7 deletions api/app/weather_models/rdps_filename_marshaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import datetime
import enum
from typing import Literal
from app.weather_models import ModelEnum
from dataclasses import dataclass


class SourcePrefix(enum.Enum):
Expand Down Expand Up @@ -33,6 +35,25 @@ class SourcePrefix(enum.Enum):
FORECAST_HOURS = [f"{hour:03d}" for hour in list(range(0, 84))]


@dataclass(frozen=True)
brettedw marked this conversation as resolved.
Show resolved Hide resolved
class WeatherModelKeyParams:
variable: str
level_type: str
level: str


weather_key_parameters = {
"temp": WeatherModelKeyParams("TMP", "TGL", "2"),
"precip": WeatherModelKeyParams("APCP", "SFC", "0"),
"wind_speed": WeatherModelKeyParams("WIND", "TGL", "10"),
"rh": WeatherModelKeyParams("RH", "TGL", "2"),
}


def get_weather_key_params(parameter):
conbrad marked this conversation as resolved.
Show resolved Hide resolved
return weather_key_parameters.get(parameter, None)


def model_run_for_hour(hour: int) -> Literal[0, 12]:
"""Returns the model run the hour is for based on when the latest model ran."""
return 0 if hour < 12 else 12
Expand Down Expand Up @@ -77,28 +98,50 @@ def parse_rdps_filename(url: str):
return (forecast_start_date, run_hour, forecast_hour)


def check_compose_invariants(forecast_start_date: datetime, run_hour: int, forecast_hour: int):
def check_compose_invariants(forecast_start_date: datetime, run_hour: int, forecast_hour: int, weather_parameter: str):
"""Explode if any of these assertions fail"""
assert forecast_start_date.tzinfo is not None
assert int(forecast_start_date.utcoffset().total_seconds()) == 0
assert f"{forecast_hour:03d}" in FORECAST_HOURS
assert run_hour in list(range(0, 36))
assert weather_parameter in weather_key_parameters


def compose_computed_rdps_filename(forecast_start_date: datetime, run_hour: int, forecast_hour: int, source_prefix: Literal[SourcePrefix.CMC, SourcePrefix.COMPUTED]):
def compose_rdps_filename(forecast_start_date: datetime, run_hour: int, forecast_hour: int, weather_parameter: str):
"""Compose and return a computed RDPS url given a forecast start date, run hour and forecast hour."""
check_compose_invariants(forecast_start_date, run_hour, forecast_hour)
check_compose_invariants(forecast_start_date, run_hour, forecast_hour, weather_parameter)
key_params = get_weather_key_params(weather_parameter)
model_hour = model_run_for_hour(run_hour)
adjusted_forecast_hour = adjust_forecast_hour(run_hour, forecast_hour)
file_ext = ".grib2" if source_prefix == SourcePrefix.CMC else ".tif"
file_ext = ".grib2"

return (
f"{source_prefix.value}{DELIMITER}{REG}{DELIMITER}{APCP}{DELIMITER}{SFC}{DELIMITER}{LEVEL}{DELIMITER}{PS10KM}{DELIMITER}"
f"{SourcePrefix.CMC.value}{DELIMITER}{REG}{DELIMITER}{key_params.variable}{DELIMITER}{key_params.level_type}{DELIMITER}{key_params.level}{DELIMITER}{PS10KM}{DELIMITER}"
f"{forecast_start_date.date().isoformat().replace('-','')}{model_hour:02d}{DELIMITER}P{adjusted_forecast_hour:03d}{file_ext}"
)


def compose_computed_precip_rdps_key(forecast_start_date: datetime, run_hour: int, forecast_hour: int, source_prefix: Literal[SourcePrefix.CMC, SourcePrefix.COMPUTED]):
def compose_precip_rdps_key(forecast_start_date: datetime, run_hour: int, forecast_hour: int):
"""Compose and return a computed RDPS url given a forecast start date, run hour and forecast hour."""
model_hour = model_run_for_hour(run_hour)
return f"{model_hour:02d}/precip/{compose_computed_rdps_filename(forecast_start_date, run_hour, forecast_hour, source_prefix)}"
return f"{model_hour:02d}/precip/{compose_rdps_filename(forecast_start_date, run_hour, forecast_hour, 'precip')}"


def compose_computed_rdps_filename(accumulation_end_datetime: datetime) -> str:
"""
Compose and return a computed RDPS url given the datetime that precip is being accumulated to.
For details on weather model naming conventions, see: https://github.com/bcgov/wps/tree/main/api/app/weather_models/weather-model-naming.md
"""
key_params = get_weather_key_params("precip")
file_ext = ".tif"

return (
f"{SourcePrefix.COMPUTED.value}{DELIMITER}{REG}{DELIMITER}{key_params.variable}{DELIMITER}{key_params.level_type}{DELIMITER}{key_params.level}{DELIMITER}{PS10KM}{DELIMITER}"
f"{accumulation_end_datetime.strftime(f'%Y%m%d{DELIMITER}%Hz')}{file_ext}"
)


def compose_computed_precip_rdps_key(accumulation_end_datetime: datetime):
"""Compose and return a computed RDPS url given the datetime that precip is being accumulated to."""
model_hour = model_run_for_hour(accumulation_end_datetime.hour)
return f"{model_hour:02d}/precip/{compose_computed_rdps_filename(accumulation_end_datetime)}"
38 changes: 38 additions & 0 deletions api/app/weather_models/weather-model-naming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## Weather Model Naming Conventions

### Environment Canada weather model naming conventions

[RDPS](https://eccc-msc.github.io/open-data/msc-data/nwp_rdps/readme_rdps-datamart_en/)\
[GDPS](https://eccc-msc.github.io/open-data/msc-data/nwp_gdps/readme_gdps-datamart_en/)\
[HRDPS](https://eccc-msc.github.io/open-data/msc-data/nwp_hrdps/readme_hrdps-datamart_en/)

### 24 hour accumulated precipitation

Weather models calculate accumulated precipitation since the start of the model run.
For example:
`CMC_reg_APCP_SFC_0_ps10km_2024073100_P003.grib2` indicates 003 hours of precip accumulation from the start
of the model run on 2024-07-31 at 00:00 UTC (2024-07-30 17:00 PDT).

For various fire weather indices, 24 hour accumulated precip is required across the province. To accomplish this
we are differencing weather model grib2 files for specific times 24 hours apart and storing the result as a geotiff.

### Naming convention

- **COMPUTED** : constant string indicating that the data is computed data, not source data.
- **reg** : constant string indicating that the data is from the RDPS. This varies depending on weather model.
- **Variable** : Variable type included in this file. APCP refers to accumulated precipitation.
- **LevelType** : Level type. SFC refers to surface.
- **Level** : Level value. 0 refers to ground level (0 height above ground)
- **ps10km** : constant string indicating that the projection used is polar-stereographic at 10km resolution.
- **YYYYMMDD** : Year, month and day that the 24 hour accumulation refers to.
- **hhz** : The data includes 24 hours of precip accumulation ending at this UTC hour of **YYYYMMDD** [00...23]
- **tif** : constant string indicating the geotiff format is used

**Example**\
Input:\
`2024-07-31/00/precip/CMC_reg_APCP_SFC_0_ps10km_2024073100_P002.grib2`\
`2024-07-31/00/precip/CMC_reg_APCP_SFC_0_ps10km_2024073100_P026.grib2`

Output:\
`2024-08-01/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20240801_02z.tif`\
This geotiff contains 24 hours of precip accumulation ending on 2024-08-01 02:00 UTC. The geotiff will be stored according to the date and model run used in the calculation (00 in this case).
Loading