From 98966dc12f75e1dc94983365e034a99c5cb67262 Mon Sep 17 00:00:00 2001
From: Darren Boss
Date: Mon, 21 Oct 2024 11:23:37 -0700
Subject: [PATCH] Remove StationSourceEnum

---
 api/app/jobs/common_model_fetchers.py         |   6 +-
 api/app/jobs/critical_hours_job.py            | 399 ++++++++++++++++++
 api/app/jobs/env_canada.py                    |  11 +-
 api/app/jobs/noaa.py                          |  11 +-
 api/app/routers/stations.py                   |  15 +-
 api/app/stations.py                           |  61 +--
 .../weather_models/test_env_canada_gdps.py    |  14 +-
 .../weather_models/test_env_canada_hrdps.py   |   8 +-
 .../weather_models/test_env_canada_rdps.py    |   9 +-
 .../tests/weather_models/test_process_grib.py |   6 +-
 api/app/weather_models/process_grib.py        |  10 +-
 11 files changed, 456 insertions(+), 94 deletions(-)
 create mode 100644 api/app/jobs/critical_hours_job.py

diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py
index 6ecc66a835..8141949e81 100644
--- a/api/app/jobs/common_model_fetchers.py
+++ b/api/app/jobs/common_model_fetchers.py
@@ -23,7 +23,7 @@
 from app import config, configure_logging
 import app.utils.time as time_utils
 from app.utils.redis import create_redis
-from app.stations import get_stations_synchronously, StationSourceEnum
+from app.stations import get_stations_synchronously
 from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp,
                                           WeatherStationModelPrediction, ModelRunPrediction)
 import app.db.database
@@ -187,10 +187,10 @@ class ModelValueProcessor:
     """ Iterate through model runs that have completed, and calculate the interpolated weather
     predictions.
     """
-    def __init__(self, session, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
+    def __init__(self, session):
         """ Prepare variables we're going to use throughout """
         self.session = session
-        self.stations = get_stations_synchronously(station_source)
+        self.stations = get_stations_synchronously()
         self.station_count = len(self.stations)

     def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: ModelEnum):
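Since `ModelValueProcessor` now always pulls its station list from WFWX, call sites shrink to a no-argument fetch. A minimal sketch of what a caller looks like after this change (the helper function below is hypothetical, for illustration only):

```python
# Hypothetical helper showing the simplified call site: there is no longer a
# source selector, get_stations_synchronously() always fetches the WFWX list.
from app.stations import get_stations_synchronously

def count_stations() -> int:
    stations = get_stations_synchronously()  # blocking wrapper around the async WFWX fetch
    return len(stations)
```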
diff --git a/api/app/jobs/critical_hours_job.py b/api/app/jobs/critical_hours_job.py
new file mode 100644
index 0000000000..b86ccccc3f
--- /dev/null
+++ b/api/app/jobs/critical_hours_job.py
@@ -0,0 +1,399 @@
+from collections import defaultdict
+from datetime import date, datetime, timedelta
+from typing import Tuple
+from osgeo import ogr, osr
+import asyncio
+import logging
+import math
+import numpy as np
+import os
+import sys
+from aiohttp.client import ClientSession
+from sqlalchemy.ext.asyncio import AsyncSession
+from app import configure_logging
+from app.db.crud.auto_spatial_advisory import (
+    get_all_sfms_fuel_type_records,
+    get_fire_zone_unit_shape_type_id,
+    get_most_recent_run_parameters,
+    get_containing_zone,
+    get_fuel_type_stats_in_advisory_area,
+    save_critical_hours,
+)
+from app.db.database import get_async_write_session_scope
+from app.db.models.auto_spatial_advisory import (
+    CriticalHours,
+    HfiClassificationThresholdEnum,
+    RunTypeEnum,
+)
+from app.fire_behaviour import cffdrs
+from app.fire_behaviour.fuel_types import FUEL_TYPE_DEFAULTS, FuelTypeEnum
+from app.fire_behaviour.prediction import build_hourly_rh_dict, calculate_cfb, get_critical_hours
+from app.hourlies import get_hourly_readings_in_time_interval
+from app.rocketchat_notifications import send_rocketchat_notification
+from app.schemas.fba_calc import WindResult
+from app.stations import get_stations_asynchronously
+from app.utils.time import get_hour_20_from_date, get_julian_date, get_utc_now
+from app.wildfire_one import wfwx_api
+from app.wildfire_one.schema_parsers import WFWXWeatherStation
+
+logger = logging.getLogger(__name__)
+
+
+class PointTransformer:
+    """Transforms a point's coordinates from one spatial reference to another."""
+
+    def __init__(self, source_srs: int, target_srs: int):
+        source = osr.SpatialReference()
+        source.ImportFromEPSG(source_srs)
+        target = osr.SpatialReference()
+        target.ImportFromEPSG(target_srs)
+        self.transform = osr.CoordinateTransformation(source, target)
+
+    def transform_coordinate(self, x: float, y: float) -> Tuple[float, float]:
+        point = ogr.CreateGeometryFromWkt(f"POINT ({x} {y})")
+        point.Transform(self.transform)
+        return (point.GetX(), point.GetY())
+
+
+class CriticalHoursJob:
+    def __init__(self):
+        self.db_session = None
+        self.client_session = None
+
+    def _set_db_session(self, db_session: AsyncSession):
+        self.db_session = db_session
+
+    def _set_client_session(self, client_session: ClientSession):
+        self.client_session = client_session
+
+    def _calculate_wind_speed_result(self, yesterday: dict, raw_daily: dict) -> WindResult:
+        # Extract the variables from WF1 that we need to calculate the fire behaviour advisory.
+        bui = cffdrs.bui_calc(raw_daily.get("duffMoistureCode", None), raw_daily.get("droughtCode", None))
+        temperature = raw_daily.get("temperature", None)
+        relative_humidity = raw_daily.get("relativeHumidity", None)
+        precipitation = raw_daily.get("precipitation", None)
+        wind_speed = raw_daily.get("windSpeed", None)
+        status = raw_daily.get("recordType").get("id")
+
+        ffmc = cffdrs.fine_fuel_moisture_code(yesterday.get("fineFuelMoistureCode", None), temperature, relative_humidity, precipitation, wind_speed)
+        isi = cffdrs.initial_spread_index(ffmc, wind_speed)
+        fwi = cffdrs.fire_weather_index(isi, bui)
+        return WindResult(ffmc=ffmc, isi=isi, bui=bui, wind_speed=wind_speed, fwi=fwi, status=status)
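+
+    # Illustrative sketch (made-up numbers) of the FWI chain above: BUI is derived
+    # from yesterday's DMC/DC, today's FFMC from yesterday's FFMC plus today's noon
+    # weather, ISI from FFMC and wind, and FWI from ISI and BUI:
+    #   bui = cffdrs.bui_calc(35.0, 250.0)
+    #   ffmc = cffdrs.fine_fuel_moisture_code(85.0, 22.0, 40.0, 0.0, 15.0)
+    #   isi = cffdrs.initial_spread_index(ffmc, 15.0)
+    #   fwi = cffdrs.fire_weather_index(isi, bui)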
+
+    async def _calculate_critical_hours(self):
+        # TODO: Unfinished alternate entry point; _run_critical_hours and
+        # _calculate_critical_hours_by_zone are the path currently in use.
+        # Fetch all stations from the WF1 API.
+        stations = await get_stations_asynchronously()
+        zone_unit_shape_type_id = await get_fire_zone_unit_shape_type_id()
+        stations_by_zone_unit = {}
+
+    def _sort_station_by_zone(self, stations: list[WFWXWeatherStation]):
+        stations_by_zone = defaultdict(list)
+        for station in stations:
+            stations_by_zone[station.zone_code].append(station)
+        return stations_by_zone
+
+    async def _get_most_recent_sfms_run_parameters(self, run_type: RunTypeEnum, for_date: date):
+        result = await get_most_recent_run_parameters(self.db_session, run_type, for_date)
+        if len(result) > 0:
+            return result[0]
+        return None
+
+    async def _calculate_critical_hours_for_station_by_fuel_type(
+        self,
+        wfwx_station: WFWXWeatherStation,
+        dailies_by_station_id: dict,
+        yesterday_dailies_by_station_id: dict,
+        hourly_observations_by_station_id: dict,
+        fuel_type: FuelTypeEnum,
+        for_date: datetime,
+    ):
+        raw_daily = dailies_by_station_id[wfwx_station.wfwx_id]
+        raw_observations = hourly_observations_by_station_id[wfwx_station.code]
+        yesterday = yesterday_dailies_by_station_id[wfwx_station.wfwx_id]
+        last_observed_morning_rh_values = build_hourly_rh_dict(raw_observations.values)
+
+        wind_result = self._calculate_wind_speed_result(yesterday, raw_daily)
+        bui = wind_result.bui
+        ffmc = wind_result.ffmc
+        isi = wind_result.isi
+        fuel_type_info = FUEL_TYPE_DEFAULTS[fuel_type]
+        percentage_conifer = fuel_type_info.get("PC", None)
+        percentage_dead_balsam_fir = fuel_type_info.get("PDF", None)
+        crown_base_height = fuel_type_info.get("CBH", None)
+        cfl = fuel_type_info.get("CFL", None)
+        grass_cure = yesterday.get("grasslandCuring", None)
+        wind_speed = wind_result.wind_speed
+        yesterday_ffmc = yesterday.get("fineFuelMoistureCode", None)
+        julian_date = get_julian_date(for_date)
+        fmc = cffdrs.foliar_moisture_content(int(wfwx_station.lat), int(wfwx_station.long), wfwx_station.elevation, julian_date)
+        sfc = cffdrs.surface_fuel_consumption(fuel_type, bui, ffmc, percentage_conifer)
+        ros = cffdrs.rate_of_spread(
+            fuel_type,
+            isi=isi,
+            bui=bui,
+            fmc=fmc,
+            sfc=sfc,
+            pc=percentage_conifer,
+            cc=grass_cure,
+            pdf=percentage_dead_balsam_fir,
+            cbh=crown_base_height,
+        )
+        cfb = calculate_cfb(fuel_type, fmc, sfc, ros, crown_base_height)
+
+        critical_hours = get_critical_hours(
+            4000,
+            fuel_type,
+            percentage_conifer,
+            percentage_dead_balsam_fir,
+            bui,
+            grass_cure,
+            crown_base_height,
+            ffmc,
+            fmc,
+            cfb,
+            cfl,
+            wind_speed,
+            yesterday_ffmc,
+            last_observed_morning_rh_values,
+        )
+
+        return critical_hours
+
+    def _get_fuel_types_by_area(self, advisory_fuel_stats):
+        fuel_types_by_area = {}
+        for row in advisory_fuel_stats:
+            advisory_fuel_stat = row[0]
+            sfms_fuel_type = row[1]
+            key = sfms_fuel_type.fuel_type_code
+            if key == "Non-fuel":
+                continue
+            if key in fuel_types_by_area:
+                fuel_types_by_area[key] += advisory_fuel_stat.area
+            else:
+                fuel_types_by_area[key] = advisory_fuel_stat.area
+        return fuel_types_by_area
+
+    def _check_station_valid(self, wfwx_station: WFWXWeatherStation, dailies, hourlies) -> bool:
+        """
+        Checks if there is sufficient information to calculate critical hours for the specified station.
+
+        :param wfwx_station: The station of interest.
+        :param dailies: Daily station data based on observations and FWI calculations.
+        :param hourlies: Hourly observations from yesterday.
+        :return: True if the station can be used for critical hours calculations, otherwise False.
+        """
+        if wfwx_station.wfwx_id not in dailies or wfwx_station.code not in hourlies:
+            return False
+        daily = dailies[wfwx_station.wfwx_id]
+        if daily["duffMoistureCode"] is None or daily["droughtCode"] is None or daily["fineFuelMoistureCode"] is None:
+            return False
+        return True
+
+    def _determine_start_time(self, times: list[float]) -> float:
+        if len(times) < 3:
+            return min(times)
+        return math.floor(np.percentile(times, 25))
+
+    def _determine_end_time(self, times: list[float]) -> float:
+        if len(times) < 3:
+            return max(times)
+        return math.ceil(np.percentile(times, 75))
+
+    def _calculate_representative_hours(self, critical_hours):
+        start_times = []
+        end_times = []
+        for hours in critical_hours:
+            start_times.append(hours.start)
+            end_times.append(hours.end)
+        start_time = self._determine_start_time(start_times)
+        end_time = self._determine_end_time(end_times)
+        return (start_time, end_time)
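+
+    # Worked example (illustrative numbers) of the reduction above: with fewer than
+    # three windows we fall back to min/max; otherwise the start is the floored 25th
+    # percentile and the end is the ceiled 75th percentile of the observed windows:
+    #   math.floor(np.percentile([10.0, 11.0, 13.0, 14.0], 25))  # -> 10
+    #   math.ceil(np.percentile([18.0, 19.0, 21.0, 22.0], 75))   # -> 22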
+
+    async def _get_sfms_fuel_types_dict(self):
+        sfms_fuel_types = await get_all_sfms_fuel_type_records(self.db_session)
+        fuel_types_dict = {}
+        for fuel_type in sfms_fuel_types:
+            fuel_types_dict[fuel_type[0].fuel_type_code] = fuel_type[0].id
+        return fuel_types_dict
+
+    async def _save_critical_hours(self, zone_id: int, critical_hours_by_fuel_type: dict, run_parameters_id: int):
+        sfms_fuel_types_dict = await self._get_sfms_fuel_types_dict()
+        critical_hours_to_save: list[CriticalHours] = []
+        for fuel_type, critical_hours in critical_hours_by_fuel_type.items():
+            start_time, end_time = self._calculate_representative_hours(critical_hours)
+            critical_hours_record = CriticalHours(
+                advisory_shape_id=zone_id,
+                threshold=HfiClassificationThresholdEnum.ADVISORY.value,
+                run_parameters=run_parameters_id,
+                fuel_type=sfms_fuel_types_dict[fuel_type],
+                start_hour=start_time,
+                end_hour=end_time,
+            )
+            critical_hours_to_save.append(critical_hours_record)
+        await save_critical_hours(self.db_session, critical_hours_to_save)
+
+    async def _calculate_critical_hours_by_zone(self, header, stations_by_zone):
+        for_date = get_hour_20_from_date(get_utc_now())
+        time_of_interest = get_hour_20_from_date(for_date)
+        run_parameters = await self._get_most_recent_sfms_run_parameters(RunTypeEnum.forecast, for_date.date())
+        if run_parameters is None:
+            logger.warning("No SFMS run parameters found for %s - skipping critical hours calculation.", for_date.date())
+            return {}
+        run_parameters_id = run_parameters.id
+        critical_hours_by_zone_and_fuel_type = {}
+        for zone_key in stations_by_zone.keys():
+            critical_hours_by_station = defaultdict(list)
+            advisory_fuel_stats = await get_fuel_type_stats_in_advisory_area(self.db_session, zone_key, run_parameters_id)
+            fuel_types_by_area = self._get_fuel_types_by_area(advisory_fuel_stats)
+            wfwx_stations = stations_by_zone[zone_key]
+            unique_station_codes = list(set(station.code for station in wfwx_stations))
+            async with ClientSession() as client_session:
+                # Get the dailies for all the stations and key them by station id for easy lookup.
+                dailies = await wfwx_api.get_dailies_generator(client_session, header, wfwx_stations, time_of_interest, time_of_interest)
+                dailies_by_station_id = {raw_daily.get("stationId"): raw_daily async for raw_daily in dailies}
+                # We must retrieve the previous day's observed/forecasted FFMC value from WFWX.
+                prev_day = time_of_interest - timedelta(days=1)
+                yesterday_response = await wfwx_api.get_dailies_generator(client_session, header, wfwx_stations, prev_day, prev_day)
+                yesterday_dailies_by_station_id = {raw_daily.get("stationId"): raw_daily async for raw_daily in yesterday_response}
+                # Get hourly observation history from our API (used for calculating morning
+                # diurnal FFMC), keyed by station code.
+                hourly_observations = await get_hourly_readings_in_time_interval(unique_station_codes, time_of_interest - timedelta(days=4), time_of_interest)
+                hourly_obs_by_station_code = {raw_hourly.station.code: raw_hourly for raw_hourly in hourly_observations}
+
+                for wfwx_station in wfwx_stations:
+                    if self._check_station_valid(wfwx_station, dailies_by_station_id, hourly_obs_by_station_code):
+                        for fuel_type_key in fuel_types_by_area.keys():
+                            fuel_type_enum = FuelTypeEnum(fuel_type_key.replace("-", ""))
+                            # Failure to calculate critical hours for a single station/fuel type pair
+                            # shouldn't prevent us from continuing with other stations and fuel types.
+                            try:
+                                critical_hours = await self._calculate_critical_hours_for_station_by_fuel_type(
+                                    wfwx_station, dailies_by_station_id, yesterday_dailies_by_station_id, hourly_obs_by_station_code, fuel_type_enum, for_date
+                                )
+                                if critical_hours is not None and critical_hours.start is not None and critical_hours.end is not None:
+                                    critical_hours_by_station[fuel_type_key].append(critical_hours)
+                            except Exception as exc:
+                                logger.warning(f"An error occurred when calculating critical hours for station code: {wfwx_station.code} and fuel type: {fuel_type_key}: {exc}")
+            if len(critical_hours_by_station) > 0:
+                critical_hours_by_zone_and_fuel_type[zone_key] = critical_hours_by_station
+
+        for zone_id, critical_hours_by_fuel_type in critical_hours_by_zone_and_fuel_type.items():
+            await self._save_critical_hours(zone_id, critical_hours_by_fuel_type, run_parameters_id)
+
+        return critical_hours_by_zone_and_fuel_type
+
+    async def _sort_stations_by_zone_unit(self, stations):
+        stations_by_zone = defaultdict(list)
+        transformer = PointTransformer(4326, 3005)
+        for station in stations:
+            (x, y) = transformer.transform_coordinate(station.lat, station.long)
+            zone_id = await get_containing_zone(self.db_session, f"POINT({x} {y})", 3005)
+            if zone_id is not None:
+                stations_by_zone[zone_id[0]].append(station)
+        return stations_by_zone
+
+    async def _run_critical_hours(self):
+        """Entry point for running the job."""
+        # TODO: Check when critical hours were last calculated to avoid redundant runs.
+        async with ClientSession() as client_session:
+            header = await wfwx_api.get_auth_header(client_session)
+            all_stations = await get_stations_asynchronously()
+            station_codes = list(station.code for station in all_stations)
+            stations = await wfwx_api.get_wfwx_stations_from_station_codes(client_session, header, station_codes)
+            stations_by_zone = await self._sort_stations_by_zone_unit(stations)
+
+            critical_hours_by_zone = await self._calculate_critical_hours_by_zone(header, stations_by_zone)
+            logger.info("Calculated critical hours for %s zones.", len(critical_hours_by_zone))
+
+    async def run_job(self):
+        # Create a db session and client session for use throughout the job.
+        async with get_async_write_session_scope() as db_session:
+            self._set_db_session(db_session)
+            async with ClientSession() as client_session:
+                self._set_client_session(client_session)
+                await self._run_critical_hours()
+
+
+def main():
+    """Kicks off asynchronous calculation of critical hours."""
+    try:
+        logger.debug("Begin calculating critical hours.")
+
+        job = CriticalHoursJob()
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(job.run_job())
+
+        # Exit with 0 - success.
+        sys.exit(os.EX_OK)
+    except Exception as exception:
+        # Exit non 0 - failure.
+        logger.error("An error occurred while processing critical hours.", exc_info=exception)
+        rc_message = ":scream: Encountered an error while processing critical hours."
+        send_rocketchat_notification(rc_message, exception)
+        sys.exit(os.EX_SOFTWARE)
+
+
+if __name__ == "__main__":
+    configure_logging()
+    main()
\ No newline at end of file
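The zone assignment in `_sort_stations_by_zone_unit` hinges on `PointTransformer`: station coordinates are reprojected from EPSG:4326 into BC Albers (EPSG:3005) before the point-in-polygon lookup. A minimal usage sketch, assuming the class above is importable; the coordinates are illustrative:

```python
# Reproject a station's EPSG:4326 lat/long into EPSG:3005 (BC Albers) so it can
# be matched against fire zone geometries stored in that projection. Latitude is
# passed first, matching the (lat, long) argument order used by the job.
transformer = PointTransformer(4326, 3005)
x, y = transformer.transform_coordinate(49.2827, -123.1207)
wkt = f"POINT({x} {y})"  # geometry string handed to get_containing_zone
```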
diff --git a/api/app/jobs/env_canada.py b/api/app/jobs/env_canada.py
index 301c8da781..3167580b52 100644
--- a/api/app/jobs/env_canada.py
+++ b/api/app/jobs/env_canada.py
@@ -22,7 +22,6 @@
 import app.utils.time as time_utils
 from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo
 import app.db.database
-from app.stations import StationSourceEnum
 from app.rocketchat_notifications import send_rocketchat_notification
 from app.jobs.env_canada_utils import adjust_model_day, get_model_run_urls

@@ -160,14 +159,14 @@ class EnvCanada():
     Canada.
     """

-    def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
+    def __init__(self, session: Session, model_type: ModelEnum):
         """ Prep variables """
         self.files_downloaded = 0
         self.files_processed = 0
         self.exception_count = 0
         # We always work in UTC:
         self.now = time_utils.get_utc_now()
-        self.grib_processor = GribFileProcessor(station_source)
+        self.grib_processor = GribFileProcessor()
         self.model_type: ModelEnum = model_type
         self.session = session
         # set projection based on model_type
@@ -246,7 +245,7 @@ def process(self):
                          self.model_type, hour, exc_info=exception)


-def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
+def process_models():
     """ downloading and processing models """

     # set the model type requested based on arg passed via command line
@@ -257,11 +256,11 @@
     start_time = datetime.datetime.now()

     with app.db.database.get_write_session_scope() as session:
-        env_canada = EnvCanada(session, model_type, station_source)
+        env_canada = EnvCanada(session, model_type)
         env_canada.process()

         # interpolate and machine learn everything that needs interpolating.
-        model_value_processor = ModelValueProcessor(session, station_source)
+        model_value_processor = ModelValueProcessor(session)
         model_value_processor.process(model_type)

     # calculate the execution time.
diff --git a/api/app/jobs/noaa.py b/api/app/jobs/noaa.py
index bef780ba95..c9ad06741b 100644
--- a/api/app/jobs/noaa.py
+++ b/api/app/jobs/noaa.py
@@ -23,7 +23,6 @@
 from app.weather_models import ModelEnum, ProjectionEnum
 from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo
 import app.db.database
-from app.stations import StationSourceEnum
 from app.rocketchat_notifications import send_rocketchat_notification

 # If running as its own process, configure logging appropriately.
@@ -261,14 +260,14 @@ class NOAA():
     """ Class that orchestrates downloading and processing of GFS weather model grib files from NOAA.
""" - def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): + def __init__(self, session: Session, model_type: ModelEnum): """ Prep variables """ self.files_downloaded = 0 self.files_processed = 0 self.exception_count = 0 # We always work in UTC: self.now = time_utils.get_utc_now() - self.grib_processor = GribFileProcessor(station_source) + self.grib_processor = GribFileProcessor() self.model_type: ModelEnum = model_type self.session = session # projection depends on model type @@ -346,7 +345,7 @@ def process(self): self.model_type, hour, exc_info=exception) -def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): +def process_models(): """ downloading and processing models """ # set the model type requested based on arg passed via command line model_type = ModelEnum(sys.argv[1]) @@ -356,11 +355,11 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI start_time = datetime.datetime.now() with app.db.database.get_write_session_scope() as session: - noaa = NOAA(session, model_type, station_source) + noaa = NOAA(session, model_type) noaa.process() # interpolate and machine learn everything that needs interpolating. - model_value_processor = ModelValueProcessor(session, station_source) + model_value_processor = ModelValueProcessor(session) model_value_processor.process(model_type) # calculate the execution time. diff --git a/api/app/routers/stations.py b/api/app/routers/stations.py index ecdba3a49b..da75199a70 100644 --- a/api/app/routers/stations.py +++ b/api/app/routers/stations.py @@ -6,7 +6,7 @@ from app.utils.time import get_utc_now, get_hour_20 from app.schemas.stations import (WeatherStationGroupsMemberRequest, WeatherStationsResponse, DetailedWeatherStationsResponse, WeatherStationGroupsResponse, WeatherStationGroupMembersResponse) -from app.stations import StationSourceEnum, get_stations_as_geojson, fetch_detailed_stations_as_geojson +from app.stations import get_stations_as_geojson, fetch_detailed_stations_as_geojson from app.wildfire_one import wfwx_api @@ -20,11 +20,7 @@ @router.get('/details/', response_model=DetailedWeatherStationsResponse) -async def get_detailed_stations(response: Response, - toi: datetime = None, - source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE, - __=Depends(audit), - _=Depends(authentication_required)): +async def get_detailed_stations(response: Response, toi: datetime = None, __=Depends(audit), _=Depends(authentication_required)): """ Returns a list of fire weather stations with detailed information. -) Unspecified: Use configuration to establish source. -) LocalStorage: Read from json file (ignore configuration). @@ -40,7 +36,7 @@ async def get_detailed_stations(response: Response, toi = get_utc_now() else: toi = get_hour_20(toi) - weather_stations = await fetch_detailed_stations_as_geojson(toi, source) + weather_stations = await fetch_detailed_stations_as_geojson(toi) return DetailedWeatherStationsResponse(features=weather_stations) except Exception as exception: @@ -49,8 +45,7 @@ async def get_detailed_stations(response: Response, @router.get('/', response_model=WeatherStationsResponse) -async def get_stations(response: Response, - source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): +async def get_stations(response: Response): """ Return a list of fire weather stations. Stations source can be: -) Unspecified: Use configuration to establish source. 
@@ -60,7 +55,7 @@ async def get_stations(response: Response, try: logger.info('/stations/') - weather_stations = await get_stations_as_geojson(source) + weather_stations = await get_stations_as_geojson() response.headers["Cache-Control"] = no_cache return WeatherStationsResponse(features=weather_stations) diff --git a/api/app/stations.py b/api/app/stations.py index 25d029b8ab..8474755d94 100644 --- a/api/app/stations.py +++ b/api/app/stations.py @@ -29,19 +29,6 @@ dirname, 'data/weather_stations.json') -class StationSourceEnum(enum.Enum): - """ Station list sources. - We currently have two sources for station listing, local json file, or wildfire one api. - If the source is unspecified, configuration will govern which is used. - """ - # Configuration wins: - UNSPECIFIED = 'unspecified' - # Use wildfire one as source, filtering on active stations: - WILDFIRE_ONE = 'wildfire_one' - # Use static file as source for testing purposes: - TEST = 'test' - - def _get_stations_local() -> List[WeatherStation]: """ Get list of stations from local json files. """ @@ -106,36 +93,24 @@ async def get_stations_by_codes(station_codes: List[int]) -> List[WeatherStation return await wfwx_api.get_stations_by_codes(station_codes) -async def get_stations_from_source( - station_source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE) -> List[WeatherStation]: - """ Get list of stations from some source (ideally WFWX Fireweather API) - """ - if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE: - return await get_stations_asynchronously() - # Get from local: - return _get_stations_local() - - -async def fetch_detailed_stations_as_geojson( - time_of_interest: datetime, - station_source: StationSourceEnum) \ - -> List[GeoJsonDetailedWeatherStation]: - """ Fetch a detailed list of stations. i.e. more than just the fire station name and code, - throw some observations and forecast in the mix. """ - if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE: - # Get from wildfire one: - logger.info('requesting detailed stations...') - result = await get_detailed_stations(time_of_interest) - logger.info('detailed stations loaded.') - return result - return await _get_detailed_stations(time_of_interest) - - -async def get_stations_as_geojson( - station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED) -> List[GeoJsonWeatherStation]: +async def get_stations_from_source() -> List[WeatherStation]: + """Get list of stations from some source (ideally WFWX Fireweather API)""" + return await get_stations_asynchronously() + + +async def fetch_detailed_stations_as_geojson(time_of_interest: datetime) -> List[GeoJsonDetailedWeatherStation]: + """Fetch a detailed list of stations. i.e. 
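After this hunk the stations endpoints accept no `source` query parameter, so clients simply issue a bare GET. A hedged sketch of a client call; the host and `/api` prefix are assumptions for illustration, only the trailing routes appear in this diff:

```python
# Hedged sketch of calling the simplified endpoint; host and path prefix are
# assumed, and the GeoJSON "features" key comes from WeatherStationsResponse.
import requests

response = requests.get("https://wps.example.org/api/stations/")
response.raise_for_status()
features = response.json()["features"]  # GeoJSON weather stations
```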
diff --git a/api/app/stations.py b/api/app/stations.py
index 25d029b8ab..8474755d94 100644
--- a/api/app/stations.py
+++ b/api/app/stations.py
@@ -29,19 +29,6 @@
     dirname, 'data/weather_stations.json')


-class StationSourceEnum(enum.Enum):
-    """ Station list sources.
-    We currently have two sources for station listing, local json file, or wildfire one api.
-    If the source is unspecified, configuration will govern which is used.
-    """
-    # Configuration wins:
-    UNSPECIFIED = 'unspecified'
-    # Use wildfire one as source, filtering on active stations:
-    WILDFIRE_ONE = 'wildfire_one'
-    # Use static file as source for testing purposes:
-    TEST = 'test'
-
-
 def _get_stations_local() -> List[WeatherStation]:
     """ Get list of stations from local json files.
     """
@@ -106,36 +93,24 @@
 async def get_stations_by_codes(station_codes: List[int]) -> List[WeatherStation]:
     return await wfwx_api.get_stations_by_codes(station_codes)


-async def get_stations_from_source(
-        station_source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE) -> List[WeatherStation]:
-    """ Get list of stations from some source (ideally WFWX Fireweather API)
-    """
-    if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE:
-        return await get_stations_asynchronously()
-    # Get from local:
-    return _get_stations_local()
-
-
-async def fetch_detailed_stations_as_geojson(
-        time_of_interest: datetime,
-        station_source: StationSourceEnum) \
-        -> List[GeoJsonDetailedWeatherStation]:
-    """ Fetch a detailed list of stations. i.e. more than just the fire station name and code,
-    throw some observations and forecast in the mix. """
-    if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE:
-        # Get from wildfire one:
-        logger.info('requesting detailed stations...')
-        result = await get_detailed_stations(time_of_interest)
-        logger.info('detailed stations loaded.')
-        return result
-    return await _get_detailed_stations(time_of_interest)
-
-
-async def get_stations_as_geojson(
-        station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED) -> List[GeoJsonWeatherStation]:
+async def get_stations_from_source() -> List[WeatherStation]:
+    """Get list of stations from some source (ideally WFWX Fireweather API)"""
+    return await get_stations_asynchronously()
+
+
+async def fetch_detailed_stations_as_geojson(time_of_interest: datetime) -> List[GeoJsonDetailedWeatherStation]:
+    """Fetch a detailed list of stations, i.e. more than just the fire station name and code;
+    throw some observations and forecast in the mix."""
+    logger.info("requesting detailed stations...")
+    result = await get_detailed_stations(time_of_interest)
+    logger.info("detailed stations loaded.")
+    return result
+
+
+async def get_stations_as_geojson() -> List[GeoJsonWeatherStation]:
     """ Format stations to conform to GeoJson spec """
     geojson_stations = []
-    stations = await get_stations_from_source(station_source)
+    stations = await get_stations_from_source()
     for station in stations:
         geojson_stations.append(
             GeoJsonWeatherStation(properties=WeatherStationProperties(
@@ -154,9 +129,9 @@
 async def get_stations_asynchronously():
         return await get_station_data(session, header)


-def get_stations_synchronously(station_source: StationSourceEnum) -> List[WeatherStation]:
+def get_stations_synchronously() -> List[WeatherStation]:
     """ Get list of stations - in a synchronous/blocking call. """
     loop = asyncio.new_event_loop()
     asyncio.set_event_loop(loop)
-    return loop.run_until_complete(get_stations_from_source(station_source))
+    return loop.run_until_complete(get_stations_from_source())
diff --git a/api/app/tests/weather_models/test_env_canada_gdps.py b/api/app/tests/weather_models/test_env_canada_gdps.py
index dbdee46fb4..2e635437c3 100644
--- a/api/app/tests/weather_models/test_env_canada_gdps.py
+++ b/api/app/tests/weather_models/test_env_canada_gdps.py
@@ -7,6 +7,7 @@
 from datetime import datetime
 import pytest
 import requests
+from aiohttp import ClientSession
 from sqlalchemy.orm import Session
 from app.jobs import env_canada
 from app.jobs.env_canada_utils import GRIB_LAYERS, get_global_model_run_download_urls
@@ -14,9 +15,9 @@
 import app.utils.time as time_utils
 from app.weather_models import machine_learning
 import app.db.crud.weather_models
-from app.stations import StationSourceEnum
 from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
                                           PredictionModelRunTimestamp)
+from app.tests.common import default_mock_client_get
 from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions
 from app.tests.weather_models.test_models_common import (MockResponse, mock_get_processed_file_count,
                                                          mock_get_stations)
@@ -118,16 +119,15 @@ def mock_get_stations_synchronously(monkeypatch):


 @pytest.mark.usefixtures('mock_get_processed_file_record')
-def test_process_gdps(mock_download,
-                      mock_database,
-                      mock_get_actuals_left_outer_join_with_predictions,
-                      mock_get_stations_synchronously,
-                      mock_get_processed_file_count):
+def test_process_gdps(
+    mock_download, mock_database, mock_get_actuals_left_outer_join_with_predictions, mock_get_stations_synchronously, mock_get_processed_file_count, monkeypatch: pytest.MonkeyPatch
+):
     """ run main method to see if it runs successfully. """
     # All files, except one, are marked as already having been downloaded, so we expect one file to
     # be processed.
+    monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
     sys.argv = ["argv", "GDPS"]
-    assert env_canada.process_models(StationSourceEnum.TEST) == 1
+    assert env_canada.process_models() == 1


 def test_for_zero_day_bug(monkeypatch):
diff --git a/api/app/tests/weather_models/test_env_canada_hrdps.py b/api/app/tests/weather_models/test_env_canada_hrdps.py
index 9b1300bb01..0313ad538a 100644
--- a/api/app/tests/weather_models/test_env_canada_hrdps.py
+++ b/api/app/tests/weather_models/test_env_canada_hrdps.py
@@ -7,6 +7,7 @@
 import pytest
 import requests
 import datetime
+from aiohttp import ClientSession
 from sqlalchemy.orm import Session
 from pytest_mock import MockerFixture
 from app.jobs.env_canada_utils import HRDPS_GRIB_LAYERS, get_high_res_model_run_download_urls
@@ -17,9 +18,9 @@
 import app.jobs.common_model_fetchers
 import app.weather_models.process_grib
 from app.weather_models import ProjectionEnum
-from app.stations import StationSourceEnum
 from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
                                           PredictionModelRunTimestamp)
+from app.tests.common import default_mock_client_get
 from app.tests.weather_models.test_env_canada_gdps import MockResponse


@@ -97,12 +98,13 @@ def test_get_hrdps_download_urls():


 @pytest.mark.usefixtures('mock_get_processed_file_record')
-def test_process_hrdps(mock_download, mock_database):
+def test_process_hrdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch):
     """ run process method to see if it runs successfully. """
     # All files, except one, are marked as already having been downloaded, so we expect one file to
     # be processed.
+    monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
     sys.argv = ["argv", "HRDPS"]
-    assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1
+    assert app.jobs.env_canada.process_models() == 1


 def test_main_fail(mocker: MockerFixture, monkeypatch):
diff --git a/api/app/tests/weather_models/test_env_canada_rdps.py b/api/app/tests/weather_models/test_env_canada_rdps.py
index c44fff18a8..b986a793d3 100644
--- a/api/app/tests/weather_models/test_env_canada_rdps.py
+++ b/api/app/tests/weather_models/test_env_canada_rdps.py
@@ -5,6 +5,7 @@
 import logging
 import pytest
 import requests
+from aiohttp import ClientSession
 from typing import Optional
 from sqlalchemy.orm import Session
 from app.jobs.env_canada_utils import GRIB_LAYERS, get_regional_model_run_download_urls
@@ -13,9 +14,9 @@
 import app.jobs.env_canada
 import app.jobs.common_model_fetchers
 import app.db.crud.weather_models
-from app.stations import StationSourceEnum
 from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
                                           PredictionModelRunTimestamp)
+from app.tests.common import default_mock_client_get
 from app.tests.weather_models.test_env_canada_gdps import (MockResponse)

 logger = logging.getLogger(__name__)
@@ -103,10 +104,10 @@ def test_get_rdps_download_urls():


 @pytest.mark.usefixtures('mock_get_processed_file_record')
-def test_process_rdps(mock_download,
-                      mock_database):
+def test_process_rdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch):
     """ run main method to see if it runs successfully. """
     # All files, except one, are marked as already having been downloaded, so we expect one file to
     # be processed.
+    monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
     sys.argv = ["argv", "RDPS"]
-    assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1
+    assert app.jobs.env_canada.process_models() == 1
diff --git a/api/app/tests/weather_models/test_process_grib.py b/api/app/tests/weather_models/test_process_grib.py
index c7aa878e53..830a06d0de 100644
--- a/api/app/tests/weather_models/test_process_grib.py
+++ b/api/app/tests/weather_models/test_process_grib.py
@@ -5,7 +5,6 @@
 import math
 import pytest
 from app.geospatial import NAD83_CRS
-from app.stations import StationSourceEnum
 from app.tests.common import default_mock_client_get
 from app.weather_models import process_grib

@@ -37,10 +36,7 @@ def test_read_single_raster_value(monkeypatch: pytest.MonkeyPatch):
     geo_to_raster_transformer = process_grib.get_transformer(NAD83_CRS, crs)
     padf_transform = process_grib.get_dataset_geometry(filename)

-    processor = process_grib.GribFileProcessor(StationSourceEnum.UNSPECIFIED,
-                                               padf_transform,
-                                               raster_to_geo_transformer,
-                                               geo_to_raster_transformer)
+    processor = process_grib.GribFileProcessor(padf_transform, raster_to_geo_transformer, geo_to_raster_transformer)

     raster_band = dataset.GetRasterBand(1)
     station, value = next(processor.yield_value_for_stations(raster_band))
diff --git a/api/app/weather_models/process_grib.py b/api/app/weather_models/process_grib.py
index f9f1077aee..30423c67f7 100644
--- a/api/app/weather_models/process_grib.py
+++ b/api/app/weather_models/process_grib.py
@@ -14,7 +14,7 @@
 import rasterio
 from rasterio.io import DatasetReader
 from app.geospatial import NAD83_CRS
-from app.stations import get_stations_synchronously, StationSourceEnum
+from app.stations import get_stations_synchronously
 from app.db.models.weather_models import (
     ModelRunPrediction, PredictionModel, PredictionModelRunTimestamp)
 from app.db.crud.weather_models import (
@@ -137,13 +137,9 @@ class GribFileProcessor():
     """ Instances of this object can be used to process and ingest a grib file.
     """

-    def __init__(self,
-                 station_source: StationSourceEnum,
-                 padf_transform=None,
-                 raster_to_geo_transformer=None,
-                 geo_to_raster_transformer=None):
+    def __init__(self, padf_transform=None, raster_to_geo_transformer=None, geo_to_raster_transformer=None):
         # Get list of stations we're interested in, and store it so that we only call it once.
-        self.stations = get_stations_synchronously(station_source)
+        self.stations = get_stations_synchronously()
         self.padf_transform = padf_transform
         self.raster_to_geo_transformer = raster_to_geo_transformer
         self.geo_to_raster_transformer = geo_to_raster_transformer
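With `StationSourceEnum` gone everywhere, constructing a `GribFileProcessor` needs no source argument; the station list always comes from WFWX, which is why the tests above stub `ClientSession.get`. A minimal sketch under that assumption:

```python
# Minimal sketch: construct the processor with default (None) transformers.
# Outside the API's environment this requires WFWX access, or stubbing aiohttp's
# ClientSession.get with default_mock_client_get as the tests in this patch do.
from app.weather_models.process_grib import GribFileProcessor

processor = GribFileProcessor()  # station list fetched synchronously from WFWX at init
print(len(processor.stations))
```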