Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fetch job and update stage_ic to work with fetched ICs #3141

Open
wants to merge 45 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
fdb996d
Stage_ic updates: GH2988
DavidGrumm-NOAA Nov 27, 2024
a1f03ec
Removed white space
DavidGrumm-NOAA Dec 10, 2024
17f7e6c
remove more whitespace
DavidGrumm-NOAA Dec 10, 2024
a9bd3e2
Incorporate fetch into GW
DavidGrumm-NOAA Dec 12, 2024
ba1a017
Merge branch 'develop' into stage_ic_2988
DavidGrumm-NOAA Dec 13, 2024
e624872
Fix whitespace
DavidGrumm-NOAA Dec 13, 2024
fa35c61
Reconcile divergent branches. Merge branch 'stage_ic_2988' of github.…
DavidGrumm-NOAA Dec 13, 2024
f21f96b
Move the fetch options to the run_options dict
DavidGrumm-NOAA Dec 16, 2024
20f2862
Additonal code to incorporate fetch, including modifying some of the …
DavidGrumm-NOAA Dec 18, 2024
5ca1531
Updated fetch code to use fetch directory FETCHDIR instead of ATARDIR…
DavidGrumm-NOAA Dec 18, 2024
09ce49c
Deleted ci/cases/pr/ATM_cold.yaml
DavidGrumm-NOAA Dec 19, 2024
c71b340
modifying stage_ic and fetch code to ensure that both function compat…
DavidGrumm-NOAA Dec 20, 2024
a6dade0
Moved setting of cycle_YMDH
DavidGrumm-NOAA Dec 23, 2024
a6a84d4
Updates for the fetch and yaml code, and keys.
DavidGrumm-NOAA Jan 4, 2025
8aead59
Corrected typo
DavidGrumm-NOAA Jan 7, 2025
ecc1b48
Add specification of fetch directory, corrrected name of yaml file.
DavidGrumm-NOAA Jan 10, 2025
dd645eb
Add DO_FETCH_HPSS setting to all non-hera files in workflow/hosts
DavidGrumm-NOAA Jan 10, 2025
4ca9d49
Change pass to raise an error
DavidGrumm-NOAA Jan 10, 2025
df88c29
whitespace repair
DavidGrumm-NOAA Jan 10, 2025
dd2b726
Merge branch 'develop' into stage_ic_2988
DavidGrumm-NOAA Jan 10, 2025
89c42ef
Add S2SW_cold fetch template
DavidHuber-NOAA Jan 13, 2025
d0e6f9b
Update ATM_cold.yaml.j2
DavidHuber-NOAA Jan 13, 2025
701093c
Add YMDH to fetch_yamls
DavidHuber-NOAA Jan 13, 2025
9433473
Add previous_cycle to fetch dict
DavidHuber-NOAA Jan 13, 2025
04f8da9
Check for missing files after untarring, add logging
DavidHuber-NOAA Jan 13, 2025
d1f2a00
Renamed fetch configs
DavidHuber-NOAA Jan 13, 2025
34ae4a8
Replace gefs config.fetch with link to gfs
DavidHuber-NOAA Jan 13, 2025
5c25f26
Update fetch directory
DavidHuber-NOAA Jan 13, 2025
35f2454
Restrict fetch cases to C48_S2SW and C48_ATM
DavidHuber-NOAA Jan 13, 2025
57d01ea
Merge pull request #1 from DavidHuber-NOAA/new_fetch_yaml
DavidGrumm-NOAA Jan 13, 2025
b929200
Cleanup of env files
DavidGrumm-NOAA Jan 17, 2025
8e3be40
Merge branch 'develop' into stage_ic_2988
DavidGrumm-NOAA Jan 17, 2025
23b2057
Renamed variable, removed unused code
DavidGrumm-NOAA Jan 17, 2025
bab00e5
need to pull changes from remote. Merge branch 'stage_ic_2988' of git…
DavidGrumm-NOAA Jan 21, 2025
ac86509
Merged in develop
DavidGrumm-NOAA Jan 23, 2025
60241bd
Merged in develop
DavidGrumm-NOAA Jan 23, 2025
00e3d62
Undo merge mangling
DavidGrumm-NOAA Jan 23, 2025
e7396bc
Adding higher resolutions back
DavidGrumm-NOAA Jan 23, 2025
eba7841
Removing higher resolutions (they will be added in a different PR)
DavidGrumm-NOAA Jan 23, 2025
8644449
Address additional reviewer comments
DavidGrumm-NOAA Jan 23, 2025
b7c919e
Remove some fetch options for now
DavidGrumm-NOAA Jan 23, 2025
aae3e6e
Address reviewer comments(.venv) [David.Grumm@hfe10 G_WF_2988]$ git a…
DavidGrumm-NOAA Jan 24, 2025
2eec8da
Remove extraneous new lines
DavidGrumm-NOAA Jan 24, 2025
939ce24
Merge branch 'develop' into stage_ic_2988
DavidHuber-NOAA Jan 24, 2025
7afe68c
Delete updated submodules to placate git pull. Merge branch 'stage_ic…
DavidGrumm-NOAA Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ci/cases/yamls/gfs_defaults_ci.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
defaults:
!INC {{ HOMEgfs }}/parm/config/gfs/yaml/defaults.yaml
base:
ACCOUNT: {{ 'HPC_ACCOUNT' | getenv }}
DO_TEST_MODE: "YES"

DavidHuber-NOAA marked this conversation as resolved.
Show resolved Hide resolved
defaults:
!INC {{ HOMEgfs }}/parm/config/gfs/yaml/defaults.yaml
base:
ACCOUNT: {{ 'HPC_ACCOUNT' | getenv }}
DO_TEST_MODE: "NO"
FETCHDIR: "/NCEPDEV/emc-global/1year/David.Grumm/test_data"
DO_FETCH_HPSS: "NO"
DavidHuber-NOAA marked this conversation as resolved.
Show resolved Hide resolved
DO_METP: "NO"

DavidHuber-NOAA marked this conversation as resolved.
Show resolved Hide resolved
23 changes: 23 additions & 0 deletions jobs/JGLOBAL_FETCH
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "fetch" -c "base fetch"

# Execute fetching
"${SCRgfs}/exglobal_fetch.py"
err=$?

###############################################################
# Check for errors and exit if any of the above failed
if [[ "${err}" -ne 0 ]]; then
echo "FATAL ERROR: Unable to fetch ICs to ${ROTDIR}; ABORT!"
exit "${err}"
fi

##########################################
# Remove the Temporary working directory
##########################################
cd "${DATAROOT}" || (echo "${DATAROOT} does not exist. ABORT!"; exit 1)
[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}"

exit 0
18 changes: 18 additions & 0 deletions jobs/rocoto/fetch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"

# Source FV3GFS workflow modules
. "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
status=$?
[[ "${status}" -ne 0 ]] && exit "${status}"

export job="fetch"
export jobid="${job}.$$"

# Execute the JJOB
"${HOMEgfs}/jobs/JGLOBAL_FETCH"
status=$?


exit "${status}"
1 change: 1 addition & 0 deletions parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export ROTDIR="@COMROOT@/${PSLOT}"

export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
export ATARDIR="@ATARDIR@"
export FETCHDIR="@FETCHDIR@" # HPSS or local directory where IC tarball(s) can be found.

# Commonly defined parameters in JJOBS
export envir=${envir:-"prod"}
Expand Down
1 change: 1 addition & 0 deletions parm/config/gefs/config.fetch
DavidHuber-NOAA marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions parm/config/gfs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ if [[ "${PDY}${cyc}" -ge "2019092100" && "${PDY}${cyc}" -le "2019110700" ]]; the
fi
export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
export ATARDIR="@ATARDIR@"
export FETCHDIR="@FETCHDIR@"

# Commonly defined parameters in JJOBS
export envir=${envir:-"prod"}
Expand Down Expand Up @@ -474,6 +475,11 @@ export DO_VRFY_OCEANDA="@DO_VRFY_OCEANDA@" # Run SOCA Ocean and Seaice DA verif
export FHMAX_FITS=132
[[ "${FHMAX_FITS}" -gt "${FHMAX_GFS}" ]] && export FHMAX_FITS=${FHMAX_GFS}

if [[ ${DO_FETCH_HPSS} = "YES" ]] && [[ ${DO_FETCH_LOCAL} = "YES" ]]; then
echo "Both HPSS and local fetch selected. Please choose one or the other."
exit 3
fi

# Archiving options
export HPSSARCH="@HPSSARCH@" # save data to HPSS archive
export LOCALARCH="@LOCALARCH@" # save data to local archive
Expand Down
19 changes: 19 additions & 0 deletions parm/config/gfs/config.fetch
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#! /usr/bin/env bash

########## config.fetch ##########

echo "BEGIN: config.fetch"

# Get task specific resources
source "${EXPDIR}/config.resources" fetch

# Determine start type
if [[ "${EXP_WARM_START}" == ".false." ]]; then
ic_type="cold"
else
ic_type="warm"
fi

export FETCH_YAML_TMPL="${PARMgfs}/fetch/${NET}_${APP}_${ic_type}_${MODE}.yaml.j2"

echo "END: config.fetch"
4 changes: 2 additions & 2 deletions parm/config/gfs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ if (( $# != 1 )); then

echo "Must specify an input task argument to set resource variables!"
echo "argument can be any one of the following:"
echo "stage_ic aerosol_init"
echo "stage_ic aerosol_init fetch"
echo "prep prepatmiodaobs"
echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal"
echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal"
Expand Down Expand Up @@ -1059,7 +1059,7 @@ case ${step} in
export is_exclusive=True
;;

"arch" | "earc" | "getic")
"arch" | "earc" | "getic" | "fetch")
walltime="06:00:00"
ntasks=1
tasks_per_node=1
Expand Down
7 changes: 6 additions & 1 deletion parm/config/gfs/config.stage_ic
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ echo "BEGIN: config.stage_ic"
# Get task specific resources
source "${EXPDIR}/config.resources" stage_ic

export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
if [[ "${DO_FETCH_HPSS^^}" =~ "Y" || "${DO_FETCH_LOCAL^^}" =~ "Y" ]]; then
export ICSDIR="${DATAROOT}" # fetch untars data into DATAROOT
else
export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
fi

export BASE_IC="@BASE_IC@" # Platform home for staged ICs

export STAGE_IC_YAML_TMPL="${PARMgfs}/stage/master_gfs.yaml.j2"
Expand Down
16 changes: 16 additions & 0 deletions parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set cycle_YMD = current_cycle | to_YMD %}
{% set cycle_HH = current_cycle | strftime("%H") %}
{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %}
target:
tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/atm_cold.tar"
on_hpss: True
contents:
# ATM
- {{atm_dir}}/gfs_ctrl.nc
{% for ftype in ["gfs_data", "sfc_data"] %}
{% for ntile in range(1, ntiles + 1) %}
- {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc
{% endfor %} # ntile
{% endfor %} # ftype
destination: "{{ DATAROOT }}"
37 changes: 37 additions & 0 deletions parm/fetch/gfs_S2SW_cold_forecast-only.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set cycle_YMD = current_cycle | to_YMD %}
{% set cycle_HH = current_cycle | strftime("%H") %}
{% set prev_cycle_YMD = previous_cycle | to_YMD %}
{% set prev_cycle_HH = previous_cycle | strftime("%H") %}
# For cold starts, the ATM component is in the current cycle RUN.YYYYMMDD/HH
# For ocean/ice, some files are in the current cyle, some in the previous
# For waves, all files are in the previous cycle
# Previous cycles are always gdas (gdas.YYYYMMDD/HH)
{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %}
{% set ocean_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ocean/restart" %}
{% set ice_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ice/restart" %}
{% set prev_ocean_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ocean/restart" %}
{% set prev_ice_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ice/restart" %}
{% set prev_wave_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/wave/restart" %}
{% set restart_prefix = cycle_YMD ~ "." ~ cycle_HH ~ "0000" %}
untar:
tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/s2sw_cold.tar"
on_hpss: True
contents:
# ATM
- {{atm_dir}}/gfs_ctrl.nc
{% for ftype in ["gfs_data", "sfc_data"] %}
{% for ntile in range(1, ntiles + 1) %}
- {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc
{% endfor %} # ntile
{% endfor %} # ftype
# Ocean
- {{ocean_dir}}/{{restart_prefix}}.MOM.res.nc
- {{prev_ocean_dir}}/{{restart_prefix}}.MOM.res.nc
# Ice
- {{ice_dir}}/{{restart_prefix}}.cice_model.res.nc
- {{prev_ice_dir}}/{{restart_prefix}}.cice_model.res.nc
# Wave
- {{prev_wave_dir}}/{{restart_prefix}}.restart.ww3
- {{prev_wave_dir}}/{{restart_prefix}}.restart.{{waveGRD}}
destination: "{{ DATAROOT }}"
39 changes: 39 additions & 0 deletions scripts/exglobal_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python3

import os

from pygfs.task.fetch import Fetch
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)


@logit(logger)
def main():

config = cast_strdict_as_dtypedict(os.environ)

# Instantiate the Fetch object
fetch = Fetch(config)

# Pull out all the configuration keys needed to run the fetch step
keys = ['current_cycle', 'previous_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR',
'FETCH_YAML_TMPL', 'FETCHDIR', 'ntiles', 'DATAROOT', 'waveGRD']

fetch_dict = AttrDict()
for key in keys:
fetch_dict[key] = fetch.task_config.get(key)
if fetch_dict[key] is None:
print(f"Warning: key ({key}) not found in task_config!")

# Determine which archives to retrieve from HPSS
# Read the input YAML file to get the list of tarballs on tape
fetchdir_set = fetch.configure(fetch_dict)

# Pull the data from tape or locally and store the specified destination
fetch.execute_pull_data(fetchdir_set)


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions ush/python/pygfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .task.oceanice_products import OceanIceProducts
from .task.gfs_forecast import GFSForecast
from .utils import marine_da_utils
from .task.fetch import Fetch

__docformat__ = "restructuredtext"
__version__ = "0.1.0"
Expand Down
105 changes: 105 additions & 0 deletions ush/python/pygfs/task/fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env python3

import os
from logging import getLogger
from typing import Any, Dict

from wxflow import (Hsi, Task, htar,
logit, parse_j2yaml, chdir)
# import tarfile


logger = getLogger(__name__.split('.')[-1])


class Fetch(Task):
"""Task to pull ROTDIR data from HPSS (or locally)
"""

@logit(logger, name="Fetch")
def __init__(self, config: Dict[str, Any]) -> None:
"""Constructor for the Fetch task
The constructor is responsible for collecting necessary yamls based on
the runtime options and RUN.
Parameters
----------
config : Dict[str, Any]
Incoming configuration for the task from the environment
Returns
-------
None
"""
super().__init__(config)

@logit(logger)
def configure(self, fetch_dict: Dict[str, Any]):
"""Determine which tarballs will need to be extracted
Parameters
----------
fetch_dict : Dict[str, Any]
Task specific keys, e.g. COM directories, etc
Return
------
parsed_fetch: Dict[str, Any]
Dictionary derived from the yaml file with necessary HPSS info.
"""
self.hsi = Hsi()

fetch_yaml = fetch_dict.FETCH_YAML_TMPL
fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch")

parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml),
fetch_dict)
return parsed_fetch

@logit(logger)
def execute_pull_data(self, fetchdir_set: Dict[str, Any]) -> None:
"""Pull data from HPSS based on a yaml dictionary and store at the
specified destination.
Parameters
----------
fetchdir_set: Dict[str, Any],
Dict defining set of tarballs to pull and where to put them.
Return
None
"""

f_names = fetchdir_set.target.contents
if len(f_names) <= 0: # Abort if no files
raise FileNotFoundError("FATAL ERROR: The tar ball has no files")

on_hpss = fetchdir_set.target.on_hpss
dest = fetchdir_set.target.destination
tarball = fetchdir_set.targettarball

# Select action whether no_hpss is True or not, and pull these
# data from tape or locally and place where it needs to go
# DG - these need testing
with chdir(dest):
logger.info(f"Changed working directory to {dest}")
if on_hpss is True: # htar all files in fnames
htar_obj = htar.Htar()
htar_obj.xvf(tarball, f_names)
else: # tar all files in fnames
raise NotImplementedError("The fetch job does not yet support pulling from local archives")

# with tarfile.open(dest, "w") as tar:
# for filename in f_names:
# tar.add(filename)
# Verify all data files were extracted
missing_files = []
for f in f_names:
if not os.path.exists(f):
missing_files.append(f)
if len(missing_files) > 0:
message = "Failed to extract all required files. Missing files:\n"
for f in missing_files:
message += f"{f}\n"

raise FileNotFoundError(message)
3 changes: 3 additions & 0 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]:
run_options[run]['do_hpssarch'] = run_base.get('HPSSARCH', False)
run_options[run]['fcst_segments'] = run_base.get('FCST_SEGMENTS', None)

run_options[run]['do_fetch_hpss'] = run_base.get('DO_FETCH_HPSS', False)
run_options[run]['do_fetch_local'] = run_base.get('DO_FETCH_LOCAL', False)

if not AppConfig.is_monotonic(run_options[run]['fcst_segments']):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

Expand Down
17 changes: 14 additions & 3 deletions workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ def _get_app_configs(self, run):
Returns the config_files that are involved in the forecast-only app
"""

configs = []
options = self.run_options[run]
configs = ['stage_ic', 'fcst', 'arch', 'cleanup']
if options['do_fetch_hpss'] or options['do_fetch_local']:
configs += ['fetch']

configs += ['stage_ic', 'fcst', 'arch', 'cleanup']

if options['do_atm']:

Expand Down Expand Up @@ -98,15 +102,22 @@ def get_task_names(self):
This is the place where that order is set.
"""

tasks = ['stage_ic']
options = self.run_options[self.run]

tasks = []

if options['do_fetch_hpss'] or options['do_fetch_local']:
tasks += ['fetch']

tasks += ['stage_ic']

if options['do_aero_fcst'] and not options['exp_warm_start']:
tasks += ['aerosol_init']

if options['do_wave']:
tasks += ['waveinit']
# tasks += ['waveprep'] # TODO - verify if waveprep is executed in forecast-only mode when APP=ATMW|S2SW
# tasks += ['waveprep'] # TODO - verify if waveprep is executed in ...
# ... forecast-only mode when APP=ATMW|S2SW

tasks += ['fcst']

Expand Down
Loading