Remove StationSourceEnum
dgboss committed Oct 22, 2024
1 parent 4d69df4 commit 98966dc
Showing 11 changed files with 456 additions and 94 deletions.
6 changes: 3 additions & 3 deletions api/app/jobs/common_model_fetchers.py
@@ -23,7 +23,7 @@
from app import config, configure_logging
import app.utils.time as time_utils
from app.utils.redis import create_redis
from app.stations import get_stations_synchronously, StationSourceEnum
from app.stations import get_stations_synchronously
from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp,
WeatherStationModelPrediction, ModelRunPrediction)
import app.db.database
@@ -187,10 +187,10 @@ class ModelValueProcessor:
""" Iterate through model runs that have completed, and calculate the interpolated weather predictions.
"""

def __init__(self, session, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def __init__(self, session):
""" Prepare variables we're going to use throughout """
self.session = session
self.stations = get_stations_synchronously(station_source)
self.stations = get_stations_synchronously()
self.station_count = len(self.stations)

def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: ModelEnum):
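With this change, ModelValueProcessor no longer threads a station source through its constructor; stations are always fetched via get_stations_synchronously(). A minimal sketch of the simplified call site, assuming the write-session helper from app.db.database (the session handling shown here is illustrative, not part of this diff):

```python
import app.db.database
from app.jobs.common_model_fetchers import ModelValueProcessor
from app.weather_models import ModelEnum

# Illustrative only: the constructor now resolves the station list itself.
with app.db.database.get_write_session_scope() as session:
    processor = ModelValueProcessor(session)   # no station_source argument
    processor.process(ModelEnum("GDPS"))       # interpolate predictions for one model
```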
399 changes: 399 additions & 0 deletions api/app/jobs/critical_hours_job.py

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions api/app/jobs/env_canada.py
@@ -22,7 +22,6 @@
import app.utils.time as time_utils
from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo
import app.db.database
from app.stations import StationSourceEnum
from app.rocketchat_notifications import send_rocketchat_notification
from app.jobs.env_canada_utils import adjust_model_day, get_model_run_urls

@@ -160,14 +159,14 @@ class EnvCanada():
Canada.
"""

def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def __init__(self, session: Session, model_type: ModelEnum):
""" Prep variables """
self.files_downloaded = 0
self.files_processed = 0
self.exception_count = 0
# We always work in UTC:
self.now = time_utils.get_utc_now()
self.grib_processor = GribFileProcessor(station_source)
self.grib_processor = GribFileProcessor()
self.model_type: ModelEnum = model_type
self.session = session
# set projection based on model_type
@@ -246,7 +245,7 @@ def process(self):
self.model_type, hour, exc_info=exception)


def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def process_models():
""" downloading and processing models """

# set the model type requested based on arg passed via command line
@@ -257,11 +256,11 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI
start_time = datetime.datetime.now()

with app.db.database.get_write_session_scope() as session:
env_canada = EnvCanada(session, model_type, station_source)
env_canada = EnvCanada(session, model_type)
env_canada.process()

# interpolate and machine learn everything that needs interpolating.
model_value_processor = ModelValueProcessor(session, station_source)
model_value_processor = ModelValueProcessor(session)
model_value_processor.process(model_type)

# calculate the execution time.
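process_models() now takes no arguments; the model type still arrives via the command line. A rough sketch of driving the Environment Canada job after this commit, mirroring what the tests do (setting sys.argv in-process is just for illustration):

```python
import sys
from app.jobs import env_canada

# The job reads the model type from argv[1] ("GDPS", "RDPS" or "HRDPS");
# there is no longer a station_source parameter to pass.
sys.argv = ["env_canada", "GDPS"]
env_canada.process_models()
```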
11 changes: 5 additions & 6 deletions api/app/jobs/noaa.py
@@ -23,7 +23,6 @@
from app.weather_models import ModelEnum, ProjectionEnum
from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo
import app.db.database
from app.stations import StationSourceEnum
from app.rocketchat_notifications import send_rocketchat_notification

# If running as its own process, configure logging appropriately.
@@ -261,14 +260,14 @@ class NOAA():
""" Class that orchestrates downloading and processing of GFS weather model grib files from NOAA.
"""

def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def __init__(self, session: Session, model_type: ModelEnum):
""" Prep variables """
self.files_downloaded = 0
self.files_processed = 0
self.exception_count = 0
# We always work in UTC:
self.now = time_utils.get_utc_now()
self.grib_processor = GribFileProcessor(station_source)
self.grib_processor = GribFileProcessor()
self.model_type: ModelEnum = model_type
self.session = session
# projection depends on model type
@@ -346,7 +345,7 @@ def process(self):
self.model_type, hour, exc_info=exception)


def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def process_models():
""" downloading and processing models """
# set the model type requested based on arg passed via command line
model_type = ModelEnum(sys.argv[1])
@@ -356,11 +355,11 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI
start_time = datetime.datetime.now()

with app.db.database.get_write_session_scope() as session:
noaa = NOAA(session, model_type, station_source)
noaa = NOAA(session, model_type)
noaa.process()

# interpolate and machine learn everything that needs interpolating.
model_value_processor = ModelValueProcessor(session, station_source)
model_value_processor = ModelValueProcessor(session)
model_value_processor.process(model_type)

# calculate the execution time.
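The NOAA job follows the same pattern. A sketch under the assumption that ModelEnum accepts "GFS", which the class docstring implies; invocation details beyond what the diff shows are assumptions:

```python
import sys
from app.jobs import noaa

# Same simplification as env_canada: GribFileProcessor() and
# ModelValueProcessor(session) are built without a station source.
sys.argv = ["noaa", "GFS"]
noaa.process_models()
```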
15 changes: 5 additions & 10 deletions api/app/routers/stations.py
@@ -6,7 +6,7 @@
from app.utils.time import get_utc_now, get_hour_20
from app.schemas.stations import (WeatherStationGroupsMemberRequest, WeatherStationsResponse, DetailedWeatherStationsResponse, WeatherStationGroupsResponse,
WeatherStationGroupMembersResponse)
from app.stations import StationSourceEnum, get_stations_as_geojson, fetch_detailed_stations_as_geojson
from app.stations import get_stations_as_geojson, fetch_detailed_stations_as_geojson
from app.wildfire_one import wfwx_api


@@ -20,11 +20,7 @@


@router.get('/details/', response_model=DetailedWeatherStationsResponse)
async def get_detailed_stations(response: Response,
toi: datetime = None,
source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE,
__=Depends(audit),
_=Depends(authentication_required)):
async def get_detailed_stations(response: Response, toi: datetime = None, __=Depends(audit), _=Depends(authentication_required)):
""" Returns a list of fire weather stations with detailed information.
-) Unspecified: Use configuration to establish source.
-) LocalStorage: Read from json file (ignore configuration).
@@ -40,7 +36,7 @@ async def get_detailed_stations(response: Response,
toi = get_utc_now()
else:
toi = get_hour_20(toi)
weather_stations = await fetch_detailed_stations_as_geojson(toi, source)
weather_stations = await fetch_detailed_stations_as_geojson(toi)
return DetailedWeatherStationsResponse(features=weather_stations)

except Exception as exception:
@@ -49,8 +45,7 @@ async def get_detailed_stations(response: Response,


@router.get('/', response_model=WeatherStationsResponse)
async def get_stations(response: Response,
source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
async def get_stations(response: Response):
""" Return a list of fire weather stations.
Stations source can be:
-) Unspecified: Use configuration to establish source.
@@ -60,7 +55,7 @@ async def get_stations(response: Response,
try:
logger.info('/stations/')

weather_stations = await get_stations_as_geojson(source)
weather_stations = await get_stations_as_geojson()
response.headers["Cache-Control"] = no_cache

return WeatherStationsResponse(features=weather_stations)
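For API consumers, both endpoints simply lose the source query parameter. A hedged sketch using FastAPI's TestClient; the application import path and the /api/stations prefix are assumptions, not taken from this diff:

```python
from fastapi.testclient import TestClient

from app.main import app  # assumed location of the FastAPI application

client = TestClient(app)

# No ?source=... query parameter any more; the station source is decided server-side.
response = client.get("/api/stations/")       # route prefix is an assumption
response.raise_for_status()
stations = response.json()["features"]        # WeatherStationsResponse is GeoJSON-style
```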
61 changes: 18 additions & 43 deletions api/app/stations.py
@@ -29,19 +29,6 @@
dirname, 'data/weather_stations.json')


class StationSourceEnum(enum.Enum):
""" Station list sources.
We currently have two sources for station listing, local json file, or wildfire one api.
If the source is unspecified, configuration will govern which is used.
"""
# Configuration wins:
UNSPECIFIED = 'unspecified'
# Use wildfire one as source, filtering on active stations:
WILDFIRE_ONE = 'wildfire_one'
# Use static file as source for testing purposes:
TEST = 'test'


def _get_stations_local() -> List[WeatherStation]:
""" Get list of stations from local json files.
"""
@@ -106,36 +93,24 @@ async def get_stations_by_codes(station_codes: List[int]) -> List[WeatherStation
return await wfwx_api.get_stations_by_codes(station_codes)


async def get_stations_from_source(
station_source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE) -> List[WeatherStation]:
""" Get list of stations from some source (ideally WFWX Fireweather API)
"""
if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE:
return await get_stations_asynchronously()
# Get from local:
return _get_stations_local()


async def fetch_detailed_stations_as_geojson(
time_of_interest: datetime,
station_source: StationSourceEnum) \
-> List[GeoJsonDetailedWeatherStation]:
""" Fetch a detailed list of stations. i.e. more than just the fire station name and code,
throw some observations and forecast in the mix. """
if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE:
# Get from wildfire one:
logger.info('requesting detailed stations...')
result = await get_detailed_stations(time_of_interest)
logger.info('detailed stations loaded.')
return result
return await _get_detailed_stations(time_of_interest)


async def get_stations_as_geojson(
station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED) -> List[GeoJsonWeatherStation]:
async def get_stations_from_source() -> List[WeatherStation]:
"""Get list of stations from some source (ideally WFWX Fireweather API)"""
return await get_stations_asynchronously()


async def fetch_detailed_stations_as_geojson(time_of_interest: datetime) -> List[GeoJsonDetailedWeatherStation]:
"""Fetch a detailed list of stations. i.e. more than just the fire station name and code,
throw some observations and forecast in the mix."""
logger.info("requesting detailed stations...")
result = await get_detailed_stations(time_of_interest)
logger.info("detailed stations loaded.")
return result


async def get_stations_as_geojson() -> List[GeoJsonWeatherStation]:
""" Format stations to conform to GeoJson spec """
geojson_stations = []
stations = await get_stations_from_source(station_source)
stations = await get_stations_from_source()
for station in stations:
geojson_stations.append(
GeoJsonWeatherStation(properties=WeatherStationProperties(
@@ -154,9 +129,9 @@ async def get_stations_asynchronously():
return await get_station_data(session, header)


def get_stations_synchronously(station_source: StationSourceEnum) -> List[WeatherStation]:
def get_stations_synchronously() -> List[WeatherStation]:
""" Get list of stations - in a synchronous/blocking call.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(get_stations_from_source(station_source))
return loop.run_until_complete(get_stations_from_source())
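With the enum gone, the public helpers in app.stations take no source argument at all; both paths below go to the WFWX API and therefore need the usual configuration. A minimal usage sketch of the surviving entry points:

```python
import asyncio

from app.stations import get_stations_as_geojson, get_stations_synchronously

# Blocking path (e.g. batch jobs): spins up its own event loop internally.
stations = get_stations_synchronously()
print(f"{len(stations)} stations loaded")

# Async path (e.g. request handlers): returns GeoJSON-shaped station records.
geojson_stations = asyncio.run(get_stations_as_geojson())
```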
14 changes: 7 additions & 7 deletions api/app/tests/weather_models/test_env_canada_gdps.py
@@ -7,16 +7,17 @@
from datetime import datetime
import pytest
import requests
from aiohttp import ClientSession
from sqlalchemy.orm import Session
from app.jobs import env_canada
from app.jobs.env_canada_utils import GRIB_LAYERS, get_global_model_run_download_urls
from app.jobs import common_model_fetchers
import app.utils.time as time_utils
from app.weather_models import machine_learning
import app.db.crud.weather_models
from app.stations import StationSourceEnum
from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
PredictionModelRunTimestamp)
from app.tests.common import default_mock_client_get
from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions
from app.tests.weather_models.test_models_common import (MockResponse, mock_get_processed_file_count, mock_get_stations)

@@ -118,16 +119,15 @@ def mock_get_stations_synchronously(monkeypatch):


@pytest.mark.usefixtures('mock_get_processed_file_record')
def test_process_gdps(mock_download,
mock_database,
mock_get_actuals_left_outer_join_with_predictions,
mock_get_stations_synchronously,
mock_get_processed_file_count):
def test_process_gdps(
mock_download, mock_database, mock_get_actuals_left_outer_join_with_predictions, mock_get_stations_synchronously, mock_get_processed_file_count, monkeypatch: pytest.MonkeyPatch
):
""" run main method to see if it runs successfully. """
# All files, except one, are marked as already having been downloaded, so we expect one file to
# be processed.
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
sys.argv = ["argv", "GDPS"]
assert env_canada.process_models(StationSourceEnum.TEST) == 1
assert env_canada.process_models() == 1


def test_for_zero_day_bug(monkeypatch):
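Because the TEST/local-file station source no longer exists, these tests stub the HTTP layer instead of passing StationSourceEnum.TEST. A conftest-style sketch of that pattern; the fixture name is hypothetical, but the monkeypatch call matches what the updated tests do:

```python
import pytest
from aiohttp import ClientSession

from app.tests.common import default_mock_client_get


@pytest.fixture(autouse=True)
def stub_wfwx_http(monkeypatch: pytest.MonkeyPatch):
    # Answer every aiohttp GET (including the station list fetch) with the
    # canned responses from app.tests.common, since there is no longer a
    # local/TEST station source to select.
    monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
```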
8 changes: 5 additions & 3 deletions api/app/tests/weather_models/test_env_canada_hrdps.py
@@ -7,6 +7,7 @@
import pytest
import requests
import datetime
from aiohttp import ClientSession
from sqlalchemy.orm import Session
from pytest_mock import MockerFixture
from app.jobs.env_canada_utils import HRDPS_GRIB_LAYERS, get_high_res_model_run_download_urls
@@ -17,9 +18,9 @@
import app.jobs.common_model_fetchers
import app.weather_models.process_grib
from app.weather_models import ProjectionEnum
from app.stations import StationSourceEnum
from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
PredictionModelRunTimestamp)
from app.tests.common import default_mock_client_get
from app.tests.weather_models.test_env_canada_gdps import MockResponse


@@ -97,12 +98,13 @@ def test_get_hrdps_download_urls():


@pytest.mark.usefixtures('mock_get_processed_file_record')
def test_process_hrdps(mock_download, mock_database):
def test_process_hrdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch):
""" run process method to see if it runs successfully. """
# All files, except one, are marked as already having been downloaded, so we expect one file to
# be processed.
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
sys.argv = ["argv", "HRDPS"]
assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1
assert app.jobs.env_canada.process_models() == 1


def test_main_fail(mocker: MockerFixture, monkeypatch):
9 changes: 5 additions & 4 deletions api/app/tests/weather_models/test_env_canada_rdps.py
@@ -5,6 +5,7 @@
import logging
import pytest
import requests
from aiohttp import ClientSession
from typing import Optional
from sqlalchemy.orm import Session
from app.jobs.env_canada_utils import GRIB_LAYERS, get_regional_model_run_download_urls
@@ -13,9 +14,9 @@
import app.jobs.env_canada
import app.jobs.common_model_fetchers
import app.db.crud.weather_models
from app.stations import StationSourceEnum
from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
PredictionModelRunTimestamp)
from app.tests.common import default_mock_client_get
from app.tests.weather_models.test_env_canada_gdps import (MockResponse)

logger = logging.getLogger(__name__)
@@ -103,10 +104,10 @@ def test_get_rdps_download_urls():


@pytest.mark.usefixtures('mock_get_processed_file_record')
def test_process_rdps(mock_download,
mock_database):
def test_process_rdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch):
""" run main method to see if it runs successfully. """
# All files, except one, are marked as already having been downloaded, so we expect one file to
# be processed.
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
sys.argv = ["argv", "RDPS"]
assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1
assert app.jobs.env_canada.process_models() == 1
6 changes: 1 addition & 5 deletions api/app/tests/weather_models/test_process_grib.py
@@ -5,7 +5,6 @@
import math
import pytest
from app.geospatial import NAD83_CRS
from app.stations import StationSourceEnum
from app.tests.common import default_mock_client_get
from app.weather_models import process_grib

@@ -37,10 +36,7 @@ def test_read_single_raster_value(monkeypatch: pytest.MonkeyPatch):
geo_to_raster_transformer = process_grib.get_transformer(NAD83_CRS, crs)
padf_transform = process_grib.get_dataset_geometry(filename)

processor = process_grib.GribFileProcessor(StationSourceEnum.UNSPECIFIED,
padf_transform,
raster_to_geo_transformer,
geo_to_raster_transformer)
processor = process_grib.GribFileProcessor(padf_transform, raster_to_geo_transformer, geo_to_raster_transformer)

raster_band = dataset.GetRasterBand(1)
station, value = next(processor.yield_value_for_stations(raster_band))
(1 remaining changed file not rendered)
