Minimize session connections for weather model jobs (#3547)
dgboss authored Apr 18, 2024
1 parent b9423ca · commit e4aeac0
Showing 2 changed files with 64 additions and 78 deletions.
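In essence: each job used to open a fresh write-session scope for every file URL it touched, and once more for the completion check; after this change the caller opens a single session and passes it into the job, which reuses it for the whole run. Below is a minimal, self-contained sketch of that pattern. It is illustrative only: sqlite3 stands in for the project's SQLAlchemy Session, and the simplified Job class is a hypothetical stand-in for EnvCanada/NOAA.

import sqlite3
from contextlib import contextmanager


# Illustrative stand-in for app.db.database.get_write_session_scope();
# the real code yields a SQLAlchemy Session, sqlite3 just keeps this runnable.
@contextmanager
def get_write_session_scope():
    session = sqlite3.connect(":memory:")  # one connection for the whole scope
    try:
        yield session
        session.commit()
    finally:
        session.close()


class Job:
    """Hypothetical job class mirroring EnvCanada/NOAA after this commit."""

    def __init__(self, session):
        # The session is injected once, rather than each method opening
        # its own write-session scope.
        self.session = session

    def process(self, urls):
        self.session.execute("CREATE TABLE IF NOT EXISTS processed (url TEXT)")
        for url in urls:
            # Every per-URL step reuses the same injected session.
            self.session.execute("INSERT INTO processed VALUES (?)", (url,))


# One session (and one connection) for the entire run, instead of one per URL:
with get_write_session_scope() as session:
    Job(session).process(["https://example.com/CMC_glb.grib2"])

The net effect, as the commit title suggests, is one database connection per job run instead of one per processed file.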
70 changes: 32 additions & 38 deletions api/app/jobs/env_canada.py
@@ -249,7 +249,7 @@ class EnvCanada():
     Canada.
     """
 
-    def __init__(self, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
+    def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
         """ Prep variables """
         self.files_downloaded = 0
         self.files_processed = 0
@@ -258,6 +258,7 @@ def __init__(self, model_type: ModelEnum, station_source: StationSourceEnum = St
         self.now = time_utils.get_utc_now()
         self.grib_processor = GribFileProcessor(station_source)
         self.model_type: ModelEnum = model_type
+        self.session = session
         # set projection based on model_type
         if self.model_type == ModelEnum.GDPS:
             self.projection = ProjectionEnum.LATLON_15X_15
@@ -273,39 +274,35 @@ def process_model_run_urls(self, urls):
         """
         for url in urls:
             try:
-                with app.db.database.get_write_session_scope() as session:
-                    # check the database for a record of this file:
-                    processed_file_record = get_processed_file_record(session, url)
-                    if processed_file_record:
-                        # This file has already been processed - so we skip it.
-                        # NOTE: changing this to logger.debug causes too much noise in unit tests.
-                        logger.debug('file already processed %s', url)
-                    else:
-                        # extract model info from URL:
-                        model_info = parse_env_canada_filename(url)
-                        # download the file:
-                        with tempfile.TemporaryDirectory() as temporary_path:
-                            downloaded = download(url, temporary_path, 'REDIS_CACHE_ENV_CANADA', model_info.model_enum.value,
-                                                  'REDIS_ENV_CANADA_CACHE_EXPIRY')
-                            if downloaded:
-                                self.files_downloaded += 1
-                                # If we've downloaded the file ok, we can now process it.
-                                try:
-                                    self.grib_processor.process_grib_file(
-                                        downloaded, model_info, session)
-                                    # Flag the file as processed
-                                    flag_file_as_processed(url, session)
-                                    self.files_processed += 1
-                                finally:
-                                    # delete the file when done.
-                                    os.remove(downloaded)
+                # check the database for a record of this file:
+                processed_file_record = get_processed_file_record(self.session, url)
+                if processed_file_record:
+                    # This file has already been processed - so we skip it.
+                    # NOTE: changing this to logger.debug causes too much noise in unit tests.
+                    logger.debug("file already processed %s", url)
+                else:
+                    # extract model info from URL:
+                    model_info = parse_env_canada_filename(url)
+                    # download the file:
+                    with tempfile.TemporaryDirectory() as temporary_path:
+                        downloaded = download(url, temporary_path, "REDIS_CACHE_ENV_CANADA", model_info.model_enum.value, "REDIS_ENV_CANADA_CACHE_EXPIRY")
+                        if downloaded:
+                            self.files_downloaded += 1
+                            # If we've downloaded the file ok, we can now process it.
+                            try:
+                                self.grib_processor.process_grib_file(downloaded, model_info, self.session)
+                                # Flag the file as processed
+                                flag_file_as_processed(url, self.session)
+                                self.files_processed += 1
+                            finally:
+                                # delete the file when done.
+                                os.remove(downloaded)
             except Exception as exception:
                 self.exception_count += 1
                 # We catch and log exceptions, but keep trying to download.
                 # We intentionally catch a broad exception, as we want to try and download as much
                 # as we can.
-                logger.error('unexpected exception processing %s',
-                             url, exc_info=exception)
+                logger.error("unexpected exception processing %s", url, exc_info=exception)
 
     def process_model_run(self, model_run_hour):
         """ Process a particular model run """
@@ -319,13 +316,10 @@ def process_model_run(self, model_run_hour):
         self.process_model_run_urls(urls)
 
         # Having completed processing, check if we're all done.
-        with app.db.database.get_write_session_scope() as session:
-            if check_if_model_run_complete(session, urls):
-                logger.info(
-                    '{} model run {:02d}:00 completed with SUCCESS'.format(self.model_type, model_run_hour))
+        if check_if_model_run_complete(self.session, urls):
+            logger.info("{} model run {:02d}:00 completed with SUCCESS".format(self.model_type, model_run_hour))
 
-                mark_prediction_model_run_processed(
-                    session, self.model_type, self.projection, self.now, model_run_hour)
+            mark_prediction_model_run_processed(self.session, self.model_type, self.projection, self.now, model_run_hour)
 
     def process(self):
         """ Entry point for downloading and processing weather model grib files """
@@ -351,10 +345,10 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI
         # grab the start time.
         start_time = datetime.datetime.now()
 
-        env_canada = EnvCanada(model_type, station_source)
-        env_canada.process()
-
         with app.db.database.get_write_session_scope() as session:
+            env_canada = EnvCanada(session, model_type, station_source)
+            env_canada.process()
+
             # interpolate and machine learn everything that needs interpolating.
             model_value_processor = ModelValueProcessor(session, station_source)
             model_value_processor.process(model_type)
72 changes: 32 additions & 40 deletions api/app/jobs/noaa.py
@@ -259,7 +259,7 @@ class NOAA():
     """ Class that orchestrates downloading and processing of GFS weather model grib files from NOAA.
     """
 
-    def __init__(self, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
+    def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
         """ Prep variables """
         self.files_downloaded = 0
         self.files_processed = 0
@@ -268,6 +268,7 @@ def __init__(self, model_type: ModelEnum, station_source: StationSourceEnum = St
         self.now = time_utils.get_utc_now()
         self.grib_processor = GribFileProcessor(station_source)
         self.model_type: ModelEnum = model_type
+        self.session = session
         # projection depends on model type
         if self.model_type == ModelEnum.GFS:
             self.projection = ProjectionEnum.GFS_LONLAT
@@ -279,41 +280,35 @@ def process_model_run_urls(self, urls):
         """
         for url in urls:
             try:
-                with app.db.database.get_write_session_scope() as session:
-                    # check the database for a record of this file:
-                    processed_file_record = get_processed_file_record(session, url)
-                    if processed_file_record:
-                        # This file has already been processed - so we skip it.
-                        # NOTE: changing this to logger.debug causes too much noise in unit tests.
-                        logger.debug('file already processed %s', url)
-                    else:
-                        model_run_timestamp, prediction_timestamp = parse_url_for_timestamps(url, self.model_type)
-                        model_info = ModelRunInfo(self.model_type, self.projection,
-                                                  model_run_timestamp, prediction_timestamp)
-                        # download the file:
-                        with tempfile.TemporaryDirectory() as temporary_path:
-                            downloaded = download(url, temporary_path,
-                                                  'REDIS_CACHE_NOAA', self.model_type.value,
-                                                  'REDIS_NOAA_CACHE_EXPIRY')
-                            if downloaded:
-                                self.files_downloaded += 1
-                                # If we've downloaded the file ok, we can now process it.
-                                try:
-                                    self.grib_processor.process_grib_file(
-                                        downloaded, model_info, session)
-                                    # Flag the file as processed
-                                    flag_file_as_processed(url, session)
-                                    self.files_processed += 1
-                                finally:
-                                    # delete the file when done.
-                                    os.remove(downloaded)
+                # check the database for a record of this file:
+                processed_file_record = get_processed_file_record(self.session, url)
+                if processed_file_record:
+                    # This file has already been processed - so we skip it.
+                    # NOTE: changing this to logger.debug causes too much noise in unit tests.
+                    logger.debug("file already processed %s", url)
+                else:
+                    model_run_timestamp, prediction_timestamp = parse_url_for_timestamps(url, self.model_type)
+                    model_info = ModelRunInfo(self.model_type, self.projection, model_run_timestamp, prediction_timestamp)
+                    # download the file:
+                    with tempfile.TemporaryDirectory() as temporary_path:
+                        downloaded = download(url, temporary_path, "REDIS_CACHE_NOAA", self.model_type.value, "REDIS_NOAA_CACHE_EXPIRY")
+                        if downloaded:
+                            self.files_downloaded += 1
+                            # If we've downloaded the file ok, we can now process it.
+                            try:
+                                self.grib_processor.process_grib_file(downloaded, model_info, self.session)
+                                # Flag the file as processed
+                                flag_file_as_processed(url, self.session)
+                                self.files_processed += 1
+                            finally:
+                                # delete the file when done.
+                                os.remove(downloaded)
             except Exception as exception:
                 self.exception_count += 1
                 # We catch and log exceptions, but keep trying to download.
                 # We intentionally catch a broad exception, as we want to try and download as much
                 # as we can.
-                logger.error('unexpected exception processing %s',
-                             url, exc_info=exception)
+                logger.error("unexpected exception processing %s", url, exc_info=exception)
 
     def process_model_run(self, model_run_hour):
         """ Process a particular model run """
@@ -330,13 +325,10 @@ def process_model_run(self, model_run_hour):
         self.process_model_run_urls(urls)
 
         # Having completed processing, check if we're all done.
-        with app.db.database.get_write_session_scope() as session:
-            if check_if_model_run_complete(session, urls):
-                logger.info(
-                    '{} model run {}:00 completed with SUCCESS'.format(self.model_type, model_run_hour))
+        if check_if_model_run_complete(self.session, urls):
+            logger.info("{} model run {}:00 completed with SUCCESS".format(self.model_type, model_run_hour))
 
-                mark_prediction_model_run_processed(
-                    session, self.model_type, self.projection, self.now, model_run_hour)
+            mark_prediction_model_run_processed(self.session, self.model_type, self.projection, self.now, model_run_hour)
 
     def process(self):
         """ Entry point for downloading and processing weather model grib files """
@@ -361,10 +353,10 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI
         # grab the start time.
         start_time = datetime.datetime.now()
 
-        noaa = NOAA(model_type, station_source)
-        noaa.process()
-
         with app.db.database.get_write_session_scope() as session:
+            noaa = NOAA(session, model_type, station_source)
+            noaa.process()
+
             # interpolate and machine learn everything that needs interpolating.
             model_value_processor = ModelValueProcessor(session, station_source)
             model_value_processor.process(model_type)
