Skip to content

Commit

Permalink
moved uv inversion to base output handler and added option to do uv i…
Browse files Browse the repository at this point in the history
…nversion for netcdf writes. This adds an invert_uv argument to ForwardPassStrategy
  • Loading branch information
bnb32 committed Nov 13, 2024
1 parent e43e4bb commit e0a6384
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 149 deletions.
7 changes: 7 additions & 0 deletions sup3r/pipeline/forward_pass.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ def _run_serial(cls, strategy, node_index):
model_class=strategy.model_class,
allowed_const=strategy.allowed_const,
output_workers=strategy.output_workers,
invert_uv=strategy.invert_uv,
meta=fwp.meta,
)
logger.info(
Expand Down Expand Up @@ -575,6 +576,7 @@ def run_chunk(
model_kwargs,
model_class,
allowed_const,
invert_uv=None,
meta=None,
output_workers=None,
):
Expand All @@ -599,6 +601,10 @@ def run_chunk(
True to allow any constant output or a list of allowed possible
constant outputs. See :class:`ForwardPassStrategy` for more
information on this argument.
invert_uv : bool | None
Whether to convert uv to windspeed and winddirection for writing
output. This defaults to True for H5 output and False for NETCDF
output.
meta : dict | None
Meta data to write to forward pass output file.
output_workers : int | None
Expand Down Expand Up @@ -640,6 +646,7 @@ def run_chunk(
times=chunk.hr_times,
out_file=chunk.out_file,
meta_data=meta,
invert_uv=invert_uv,
max_workers=output_workers,
gids=chunk.gids,
)
Expand Down
5 changes: 5 additions & 0 deletions sup3r/pipeline/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class ForwardPassStrategy:
chunks and overwrite any pre-existing outputs (False).
output_workers : int | None
Max number of workers to use for writing forward pass output.
invert_uv : bool | None
Whether to convert u and v wind components to windspeed and direction
for writing to output. This defaults to True for H5 output and False
for NETCDF output.
pass_workers : int | None
Max number of workers to use for performing forward passes on a single
node. If 1 then all forward passes on chunks distributed to a single
Expand Down Expand Up @@ -195,6 +199,7 @@ class ForwardPassStrategy:
allowed_const: Optional[Union[list, bool]] = None
incremental: bool = True
output_workers: int = 1
invert_uv: Optional[bool] = None
pass_workers: int = 1
max_nodes: int = 1
head_node: bool = False
Expand Down
154 changes: 152 additions & 2 deletions sup3r/postprocessing/writers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
import json
import logging
import os
import re
from abc import abstractmethod
from warnings import warn

import dask
import numpy as np
import pandas as pd
from rex.outputs import Outputs as BaseRexOutputs
from scipy.interpolate import griddata

from sup3r.preprocessing.derivers.utilities import parse_feature
from sup3r.preprocessing.derivers.utilities import (
invert_uv,
parse_feature,
)
from sup3r.utilities import VERSION_RECORD
from sup3r.utilities.utilities import (
pd_date_range,
Expand All @@ -26,7 +31,7 @@

ATTR_DIR = os.path.dirname(os.path.realpath(__file__))
ATTR_FP = os.path.join(ATTR_DIR, 'output_attrs.json')
with open(ATTR_FP, 'r') as f:
with open(ATTR_FP) as f:
OUTPUT_ATTRS = json.load(f)


Expand Down Expand Up @@ -227,6 +232,145 @@ class OutputHandler(OutputMixin):
back to their original form and outputting to the correct file format.
"""

@classmethod
def get_renamed_features(cls, features):
    """Rename features based on transformation from u/v to
    windspeed/winddirection

    Parameters
    ----------
    features : list
        List of output features

    Returns
    -------
    list
        New list with the same length and ordering as ``features`` in
        which each u/v pair is replaced by windspeed/winddirection for
        the corresponding height. All other entries are unchanged. The
        input list is not modified.

    Raises
    ------
    ValueError
        If a ``u_<height>m`` feature is found without a matching
        ``v_<height>m`` feature.
    """
    renamed_features = list(features)

    # u features are detected case-insensitively, so positions have to be
    # resolved against lower-cased names as well. Looking up the literal
    # lower-case name in the original list would raise ValueError for
    # inputs such as 'U_100m'.
    lowered = [f.lower() for f in features]

    heights = [
        parse_feature(f).height
        for f, low in zip(features, lowered)
        if re.match('u_(.*?)m', low)
    ]

    for height in heights:
        u_idx = lowered.index(f'u_{height}m'.lower())
        v_idx = lowered.index(f'v_{height}m'.lower())

        renamed_features[u_idx] = f'windspeed_{height}m'
        renamed_features[v_idx] = f'winddirection_{height}m'

    return renamed_features

@classmethod
def invert_uv_features(cls, data, features, lat_lon, max_workers=None):
    """Invert U/V to windspeed and winddirection. Performed in place.

    Parameters
    ----------
    data : ndarray
        High res data from forward pass
        (spatial_1, spatial_2, temporal, features)
    features : list
        List of output features. If this doesn't contain any names matching
        u_*m, this method will do nothing.
    lat_lon : ndarray
        High res lat/lon array
        (spatial_1, spatial_2, 2)
    max_workers : int | None
        Max workers to use for inverse transform. If 1 the conversions are
        run serially. Any other value is passed to the dask threaded
        scheduler as ``num_workers``.

    Raises
    ------
    ValueError
        If a ``u_<height>m`` feature is found without a matching
        ``v_<height>m`` feature.
    """
    # Feature detection is case-insensitive, so index lookups below must be
    # done against lower-cased names too.
    lowered = [f.lower() for f in features]

    heights = [
        parse_feature(f).height
        for f, low in zip(features, lowered)
        if re.match('u_(.*?)m', low)
    ]

    if not heights:
        # Nothing to convert: avoid invoking the scheduler with no tasks
        # and avoid logging a conversion that never happened.
        return

    logger.info(
        'Converting u/v to ws/wd for H5 output with max_workers=%s',
        max_workers,
    )
    logger.debug(
        'Found heights %s for output features %s', heights, features
    )

    tasks = []
    for height in heights:
        u_idx = lowered.index(f'u_{height}m'.lower())
        v_idx = lowered.index(f'v_{height}m'.lower())
        # Each task writes only to its own u/v slices of ``data``.
        task = dask.delayed(cls.invert_uv_single_pair)(
            data, lat_lon, u_idx, v_idx
        )
        tasks.append(task)
    logger.info('Added %s futures to convert u/v to ws/wd', len(tasks))

    if max_workers == 1:
        dask.compute(*tasks, scheduler='single-threaded')
    else:
        dask.compute(*tasks, scheduler='threads', num_workers=max_workers)
    logger.info('Finished converting u/v to ws/wd')

@staticmethod
def invert_uv_single_pair(data, lat_lon, u_idx, v_idx):
    """Replace one u/v pair in ``data`` with windspeed/winddirection.

    The U slice is overwritten with the first output of ``invert_uv`` and
    the V slice with the second. ``data`` is modified in place and nothing
    is returned.

    Parameters
    ----------
    data : ndarray
        High res data from forward pass
        (spatial_1, spatial_2, temporal, features)
    lat_lon : ndarray
        High res lat/lon array
        (spatial_1, spatial_2, 2)
    u_idx : int
        Index in data for U component to transform
    v_idx : int
        Index in data for V component to transform
    """
    u_component = data[..., u_idx]
    v_component = data[..., v_idx]
    # Both outputs are computed before either slice is overwritten.
    speed, direction = invert_uv(u_component, v_component, lat_lon)
    data[..., u_idx] = speed
    data[..., v_idx] = direction

@classmethod
def _transform_output(
    cls, data, features, lat_lon, invert_uv=True, max_workers=None
):
    """Transform output data before writing to file

    When ``invert_uv`` is truthy and at least one feature name matches
    u_*m or v_*m (case-insensitive), u/v are converted in place via
    ``invert_uv_features`` and the feature names are updated via
    ``get_renamed_features``. ``enforce_limits`` is always applied last.

    Parameters
    ----------
    data : ndarray
        (spatial_1, spatial_2, temporal, features)
        High resolution forward pass output
    features : list
        List of feature names corresponding to the last dimension of data
    lat_lon : ndarray
        Array of high res lat/lon for output data.
        (spatial_1, spatial_2, 2)
        Last dimension has ordering (lat, lon)
    invert_uv : bool
        Whether to convert u and v wind components to windspeed and
        direction
    max_workers : int | None
        Max workers to use for inverse transform. Forwarded unchanged to
        ``invert_uv_features``.

    Returns
    -------
    data
        Value returned by ``enforce_limits`` for the (possibly converted)
        data.
    features : list
        Feature names, renamed if the u/v conversion was performed.
    """
    wind_patterns = ('u_(.*?)m', 'v_(.*?)m')

    if invert_uv:
        has_wind_component = any(
            re.match(pattern, name.lower())
            for name in features
            for pattern in wind_patterns
        )
        if has_wind_component:
            cls.invert_uv_features(
                data, features, lat_lon, max_workers=max_workers
            )
            features = cls.get_renamed_features(features)

    limited = cls.enforce_limits(features=features, data=data)
    return limited, features

@staticmethod
def enforce_limits(features, data):
"""Enforce physical limits for feature data
Expand Down Expand Up @@ -481,6 +625,7 @@ def _write_output(
times,
out_file,
meta_data,
invert_uv=True,
max_workers=None,
gids=None,
):
Expand All @@ -495,6 +640,7 @@ def write_output(
low_res_times,
out_file,
meta_data=None,
invert_uv=None,
max_workers=None,
gids=None,
):
Expand All @@ -516,6 +662,9 @@ def write_output(
Output file path
meta_data : dict | None
Dictionary of meta data from model
invert_uv : bool | None
Whether to convert u and v wind components to windspeed and
direction
max_workers : int | None
Max workers to use for inverse uv transform. If None the
max_workers will be estimated based on memory limits.
Expand All @@ -532,6 +681,7 @@ def write_output(
times,
out_file,
meta_data=meta_data,
invert_uv=invert_uv,
max_workers=max_workers,
gids=gids,
)
Loading

0 comments on commit e0a6384

Please sign in to comment.