Skip to content

Commit

Permalink
Hook up 24 precip tiff rolling computations (#3762)
Browse files Browse the repository at this point in the history
- Hooks up the 24 hour accumulating precip raster calculations as part of the RDPS job.
- Uses the start datetime of the job (now in UTC) as the seed.
- Decided to start by storing every computed tif in s3 for now to be able to inspect intermediate results in dev if need be, this can be changed to use memory only later (using the same keys as keys in a hashmap).
- Closes #3682
  • Loading branch information
conbrad authored Jul 16, 2024
1 parent a4694cf commit 43572d7
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 56 deletions.
11 changes: 11 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,34 @@
],
"typescript.preferences.importModuleSpecifier": "non-relative",
"cSpell.words": [
"aiobotocore",
"Albers",
"allclose",
"anyio",
"APCP",
"botocore",
"excinfo",
"GDPS",
"GEOGCS",
"grib",
"gribs",
"hourlies",
"HRDPS",
"ndarray",
"numba",
"osgeo",
"PRECIP",
"PRIMEM",
"PROJCS",
"RDPS",
"rocketchat",
"sfms",
"sqlalchemy",
"tobytes",
"vectorize",
"VSIL",
"vsimem",
"vsis",
"WDIR"
],
"autoDocstring.docstringFormat": "sphinx-notypes",
Expand Down
10 changes: 6 additions & 4 deletions api/app/jobs/rdps_sfms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import asyncio
import os
import sys
from datetime import datetime, timedelta
from datetime import timedelta
from collections.abc import Generator
import logging
import tempfile
Expand All @@ -26,6 +26,7 @@
from app.utils.s3 import get_client
from app.rocketchat_notifications import send_rocketchat_notification
from app.jobs.env_canada_utils import get_regional_model_run_download_urls
from app.weather_models.precip_rdps_model import compute_and_store_precip_rasters

# If running as its own process, configure logging appropriately.
if __name__ == "__main__":
Expand Down Expand Up @@ -56,7 +57,7 @@ def __init__(self, session: Session):
self.exception_count = 0
# We always work in UTC:
self.now = time_utils.get_utc_now()
self.date_key = f"{self.now.year}-{self.now.month}-{self.now.day}"
self.date_key = self.now.date().isoformat()
self.session = session

def _get_file_name_from_url(self, url: str) -> str:
Expand Down Expand Up @@ -146,13 +147,14 @@ async def run(self):
logger.info("Begin download and storage of RDPS gribs.")

# grab the start time.
start_time = datetime.now()
start_time = time_utils.get_utc_now()
with get_write_session_scope() as session:
rdps_grib = RDPSGrib(session)
await rdps_grib.process()
await rdps_grib.apply_retention_policy(DAYS_TO_RETAIN)
await compute_and_store_precip_rasters(start_time)
# calculate the execution time.
execution_time = datetime.now() - start_time
execution_time = time_utils.get_utc_now() - start_time
hours, remainder = divmod(execution_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)

Expand Down
15 changes: 9 additions & 6 deletions api/app/tests/weather_models/test_precip_rdps_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from app.weather_models.precip_rdps_model import TemporalPrecip, compute_precip_difference, get_raster_keys_to_diff, generate_24_hour_accumulating_precip_raster
from app.weather_models.rdps_filename_marshaller import model_run_for_hour

geotransform = (-4556441.403315245, 10000.0, 0.0, 920682.1411659503, 0.0, -10000.0)
projection = 'PROJCS["unnamed",GEOGCS["Coordinate System imported from GRIB file",DATUM["unnamed",SPHEROID["Sphere",6371229,0]],PRIMEM["Greenwich",0],UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]]],PROJECTION["Polar_Stereographic"],PARAMETER["latitude_of_origin",60],PARAMETER["central_meridian",249],PARAMETER["false_easting",0],PARAMETER["false_northing",0],UNIT["Metre",1],AXIS["Easting",SOUTH],AXIS["Northing",SOUTH]]'


def test_difference_identity():
"""
Expand Down Expand Up @@ -49,8 +52,8 @@ async def test_generate_24_hour_accumulating_precip_raster_ok(mocker: MockerFixt
"""
Verify that the appropriate rasters are diffed correctly for non model hour.
"""
mocker.patch("app.weather_models.precip_rdps_model.read_into_memory", side_effect=[np.array([1, 1]), np.array([1, 1])])
res = await generate_24_hour_accumulating_precip_raster(datetime(2024, 1, 1, 1, tzinfo=timezone.utc))
mocker.patch("app.weather_models.precip_rdps_model.read_into_memory", side_effect=[(np.array([1, 1]), geotransform, projection), (np.array([1, 1]), geotransform, projection)])
(res, _, _) = await generate_24_hour_accumulating_precip_raster(datetime(2024, 1, 1, 1, tzinfo=timezone.utc))
assert np.allclose(res, np.array([0, 0]))


Expand All @@ -59,16 +62,16 @@ async def test_generate_24_hour_accumulating_precip_raster_model_hour_ok(mocker:
"""
Verify that the appropriate rasters are diffed correctly on a model hour -- just returns todays data.
"""
mocker.patch("app.weather_models.precip_rdps_model.read_into_memory", side_effect=[np.array([1, 1]), np.array([1, 1])])
res = await generate_24_hour_accumulating_precip_raster(datetime(2024, 1, 1, 0, tzinfo=timezone.utc))
mocker.patch("app.weather_models.precip_rdps_model.read_into_memory", side_effect=[(np.array([1, 1]), geotransform, projection), (np.array([1, 1]), geotransform, projection)])
(res, _, _) = await generate_24_hour_accumulating_precip_raster(datetime(2024, 1, 1, 0, tzinfo=timezone.utc))
assert np.allclose(res, np.array([1, 1]))


@pytest.mark.parametrize(
"current_time,today_raster,yesterday_raster",
[
(datetime(2024, 1, 1, 0, tzinfo=timezone.utc), None, np.array([1, 1])), # no today raster data
(datetime(2024, 1, 1, 0, tzinfo=timezone.utc), None, None), # no raster data
(datetime(2024, 1, 1, 0, tzinfo=timezone.utc), (None, None, None), (np.array([1, 1]), geotransform, projection)), # no today raster data
(datetime(2024, 1, 1, 0, tzinfo=timezone.utc), (None, None, None), (None, None, None)), # no raster data
],
)
@pytest.mark.anyio
Expand Down
67 changes: 38 additions & 29 deletions api/app/utils/s3.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,71 @@
""" Utils to help with s3
"""
"""Utils to help with s3"""

import logging
from typing import Generator, Tuple
from contextlib import asynccontextmanager
from aiobotocore.client import AioBaseClient
from aiobotocore.session import get_session
from botocore.exceptions import ClientError
from osgeo import gdal
from app import config

logger = logging.getLogger(__name__)


@asynccontextmanager
async def get_client() -> Generator[Tuple[AioBaseClient, str], None, None]:
""" Return AioBaseClient client and bucket
"""
server = config.get('OBJECT_STORE_SERVER')
user_id = config.get('OBJECT_STORE_USER_ID')
secret_key = config.get('OBJECT_STORE_SECRET')
bucket = config.get('OBJECT_STORE_BUCKET')
"""Return AioBaseClient client and bucket"""
server = config.get("OBJECT_STORE_SERVER")
user_id = config.get("OBJECT_STORE_USER_ID")
secret_key = config.get("OBJECT_STORE_SECRET")
bucket = config.get("OBJECT_STORE_BUCKET")

session = get_session()
async with session.create_client('s3',
endpoint_url=f'https://{server}',
aws_secret_access_key=secret_key,
aws_access_key_id=user_id) as client:
async with session.create_client("s3", endpoint_url=f"https://{server}", aws_secret_access_key=secret_key, aws_access_key_id=user_id) as client:
try:
yield client, bucket
finally:
del client


async def object_exists(client: AioBaseClient, bucket: str, target_path: str):
""" Check if and object exists in the object store
"""
"""Check if and object exists in the object store"""
# using list_objects, but could be using stat as well? don't know what's best.
result = await client.list_objects_v2(Bucket=bucket,
Prefix=target_path)
contents = result.get('Contents', None)
result = await client.list_objects_v2(Bucket=bucket, Prefix=target_path)
contents = result.get("Contents", None)
if contents:
for content in contents:
key = content.get('Key')
key = content.get("Key")
if key == target_path:
return True
return False


async def object_exists_v2(target_path: str):
""" Check if and object exists in the object store
"""
"""Check if and object exists in the object store"""
async with get_client() as (client, bucket):
return await object_exists(client, bucket, target_path)


async def read_into_memory(key: str):
async with get_client() as (client, bucket):
s3_source = await client.get_object(Bucket=bucket, Key=key)
mem_path = f'/vsimem/{key}'
s3_data = await s3_source['Body'].read()
gdal.FileFromMemBuffer(mem_path, s3_data)
data_source = gdal.Open(mem_path, gdal.GA_ReadOnly)
gdal.Unlink(mem_path)
dem_band = data_source.GetRasterBand(1)
dem_data = dem_band.ReadAsArray()
return dem_data
try:
s3_source = await client.get_object(Bucket=bucket, Key=key)
mem_path = f"/vsimem/{key}"
s3_data = await s3_source["Body"].read()
gdal.FileFromMemBuffer(mem_path, s3_data)
data_source = gdal.Open(mem_path, gdal.GA_ReadOnly)
gdal.Unlink(mem_path)
data_band = data_source.GetRasterBand(1)
data_geotransform = data_source.GetGeoTransform()
data_projection = data_source.GetProjection()
data_array = data_band.ReadAsArray()
data_source = None
del data_source
return (data_array, data_geotransform, data_projection)
except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
logger.info("No object found for key: %s", key)
return (None, None, None)
else:
raise
82 changes: 74 additions & 8 deletions api/app/weather_models/precip_rdps_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import numpy
import tempfile
from osgeo import gdal, ogr
from app import config
from numba import vectorize
from app.utils.s3 import read_into_memory
from app.utils.s3 import get_client, read_into_memory
from app.weather_models import ModelEnum
from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key
from app.weather_models.rdps_filename_marshaller import SourcePrefix, adjust_forecast_hour, compose_computed_precip_rdps_key

logger = logging.getLogger(__name__)

RDPS_PRECIP_ACC_RASTER_PERMISSIONS = "public-read"


@dataclass
Expand All @@ -18,26 +27,83 @@ def is_after(self, other) -> bool:
return self.timestamp > other.timestamp


async def compute_and_store_precip_rasters(current_time: datetime):
"""
Given a UTC datetime, trigger 24 hours worth of accumulated precip
difference rasters and store them.
"""
async with get_client() as (client, bucket):
for hour in range(0, 24):
timestamp = current_time + timedelta(hours=hour)
(precip_diff_raster, geotransform, projection) = await generate_24_hour_accumulating_precip_raster(timestamp)
key = f"weather_models/{ModelEnum.RDPS.lower()}/{current_time.date().isoformat()}/" + compose_computed_precip_rdps_key(
current_time, current_time.hour, hour, SourcePrefix.COMPUTED
)

res = await client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
if "Contents" in res:
logger.info("File already exists for key: %s, skipping", key)
continue

bucket = config.get("OBJECT_STORE_BUCKET")

logger.info(
"Uploading RDPS 24 hour acc precip raster for date: %s, hour: %s, forecast hour: %s to %s",
current_time.date().isoformat(),
current_time.hour,
adjust_forecast_hour(current_time.hour, hour),
key,
)
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, current_time.date().isoformat() + "precip" + str(hour) + ".tif")
# Create temp file
driver = gdal.GetDriverByName("GTiff")
rows, cols = precip_diff_raster.shape
output_dataset = driver.Create(temp_filename, cols, rows, 1, gdal.GDT_Float32)
output_dataset.SetGeoTransform(geotransform)
output_dataset.SetProjection(projection)

if output_dataset is None:
raise IOError("Unable to create %s", key)

output_band = output_dataset.GetRasterBand(1)
output_band.WriteArray(precip_diff_raster)
output_band.FlushCache()
output_dataset = None
del output_dataset
output_band = None
del output_band

await client.put_object(
Bucket=bucket,
Key=key,
ACL=RDPS_PRECIP_ACC_RASTER_PERMISSIONS, # We need these to be accessible to everyone
Body=open(temp_filename, "rb"),
)

logger.info("Done uploading file to %s", key)


async def generate_24_hour_accumulating_precip_raster(current_time: datetime):
"""
Given a UTC datetime, grab the raster for that date
and the date for 24 hours before to compute the difference.
"""
(yesterday_key, today_key) = get_raster_keys_to_diff(current_time)
day_data = await read_into_memory(today_key)
(day_data, day_geotransform, day_projection) = await read_into_memory(today_key)
if yesterday_key is None:
if day_data is None:
raise ValueError("No precip raster data for %s", today_key)
return day_data
return (day_data, day_geotransform, day_projection)

yesterday_time = current_time - timedelta(days=1)
yesterday_data = await read_into_memory(yesterday_key)
(yesterday_data, _, _) = await read_into_memory(yesterday_key)
if yesterday_data is None:
raise ValueError("No precip raster data for %s", today_key, yesterday_key)

later_precip = TemporalPrecip(timestamp=current_time, precip_amount=day_data)
earlier_precip = TemporalPrecip(timestamp=yesterday_time, precip_amount=yesterday_data)
return compute_precip_difference(later_precip, earlier_precip)
return (compute_precip_difference(later_precip, earlier_precip), day_geotransform, day_projection)


def get_raster_keys_to_diff(timestamp: datetime):
Expand All @@ -50,10 +116,10 @@ def get_raster_keys_to_diff(timestamp: datetime):
# From earlier model run, get the keys for 24 hours before timestamp and the timestamp to perform the diff
earlier_key = f"{key_prefix}/"
later_key = f"{key_prefix}/"
later_key = later_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24)
later_key = later_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour + 24, SourcePrefix.CMC)
if target_model_run_date.hour != 0 and target_model_run_date.hour != 12:
# not a model run hour, return earlier and later keys to take difference
earlier_key = earlier_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour)
earlier_key = earlier_key + compose_computed_precip_rdps_key(target_model_run_date, target_model_run_date.hour, target_model_run_date.hour, SourcePrefix.CMC)
return (earlier_key, later_key)

# model run hour, just return the model value from 24 hours ago
Expand Down
Loading

0 comments on commit 43572d7

Please sign in to comment.