Skip to content

Commit

Permalink
Implement NoDataStrategy to deal with optional setup_functions (#229)
Browse files Browse the repository at this point in the history
* add NoDataStrategy

* pre-commit fixes

* added checks when no data is found (returns `None`)

* updated changelog; added test

* address review comment
  • Loading branch information
JoostBuitink authored Feb 13, 2024
1 parent a45206c commit f4a6486
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 69 deletions.
1 change: 1 addition & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Fixed
- **setup_config_output_timeseries**: bugfix for reducer.
- update hydromt configuration files from ini to yml format. PR #230
- remove or update calls to check if source in self.data_catalog `Issue #501 <https://github.com/Deltares/hydromt/issues/501>`_
- Included NoDataStrategy from hydromt-core: setup functions for lakes, reservoirs, glaciers, and gauges are skipped when no data is found withing the model region (same behavior as before) PR #229

Deprecated
----------
Expand Down
171 changes: 103 additions & 68 deletions hydromt_wflow/wflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Implement Wflow model class."""

# Implement model class following model API

import codecs
Expand All @@ -22,6 +23,7 @@
from dask.diagnostics import ProgressBar
from hydromt import flw
from hydromt.models.model_grid import GridModel
from hydromt.nodata import NoDataStrategy
from pyflwdir import core_conversion, core_d8, core_ldd
from shapely.geometry import box

Expand Down Expand Up @@ -1186,14 +1188,27 @@ def setup_gauges(
code = self.crs
kwargs.update(crs=code)
gdf_gauges = self.data_catalog.get_geodataframe(
gauges_fn, geom=self.basins, assert_gtype="Point", **kwargs
gauges_fn,
geom=self.basins,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
elif self.data_catalog[gauges_fn].data_type == "GeoDataFrame":
gdf_gauges = self.data_catalog.get_geodataframe(
gauges_fn, geom=self.basins, assert_gtype="Point", **kwargs
gauges_fn,
geom=self.basins,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
elif self.data_catalog[gauges_fn].data_type == "GeoDataset":
da = self.data_catalog.get_geodataset(gauges_fn, geom=self.basins, **kwargs)
da = self.data_catalog.get_geodataset(
gauges_fn,
geom=self.basins,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
gdf_gauges = da.vector.to_gdf()
# Check for point geometry
if not np.all(np.isin(gdf_gauges.geometry.type, "Point")):
Expand All @@ -1208,10 +1223,9 @@ def setup_gauges(
if basename is None:
basename = os.path.basename(gauges_fn).split(".")[0].replace("_", "-")

# Create gauge map, subcatch map and update toml
if gdf_gauges.index.size == 0:
self.logger.warning(f"No {gauges_fn} gauge locations found within domain")

# Check if there is data found
if gdf_gauges is None:
self.logger.info("Skipping method, as no data has been found")
return

# Create the gauges map
Expand Down Expand Up @@ -1424,6 +1438,7 @@ def setup_lakes(
lakes_fn, "lake", min_area, **kwargs
)
if ds_lakes is None:
self.logger.info("Skipping method, as no data has been found")
return
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lakes.data_vars}
ds_lakes = ds_lakes.rename(rmdict)
Expand Down Expand Up @@ -1612,71 +1627,75 @@ def setup_reservoirs(
# the parameters tbls =
# if everything is present, skip calculate reservoirattrs() and
# directly make the maps
if ds_res is not None:
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_res.data_vars}
self.set_grid(ds_res.rename(rmdict))

# add attributes
# if present use directly
resattributes = [
"waterbody_id",
"ResSimpleArea",
"ResMaxVolume",
"ResTargetMinFrac",
"ResTargetFullFrac",
"ResDemand",
"ResMaxRelease",
]
if np.all(np.isin(resattributes, gdf_org.columns)):
intbl_reservoirs = gdf_org[resattributes]
reservoir_accuracy = None
reservoir_timeseries = None
# else compute
else:
(
intbl_reservoirs,
reservoir_accuracy,
reservoir_timeseries,
) = workflows.reservoirattrs(
gdf=gdf_org, timeseries_fn=timeseries_fn, logger=self.logger
)
intbl_reservoirs = intbl_reservoirs.rename(columns=tbls)

# create a geodf with id of reservoir and gemoetry at outflow location
gdf_org_points = gpd.GeoDataFrame(
gdf_org["waterbody_id"],
geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout),
# Skip method if no data is returned
if ds_res is None:
self.logger.info("Skipping method, as no data has been found")
return

# Continue method if data has been found
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_res.data_vars}
self.set_grid(ds_res.rename(rmdict))

# add attributes
# if present use directly
resattributes = [
"waterbody_id",
"ResSimpleArea",
"ResMaxVolume",
"ResTargetMinFrac",
"ResTargetFullFrac",
"ResDemand",
"ResMaxRelease",
]
if np.all(np.isin(resattributes, gdf_org.columns)):
intbl_reservoirs = gdf_org[resattributes]
reservoir_accuracy = None
reservoir_timeseries = None
# else compute
else:
(
intbl_reservoirs,
reservoir_accuracy,
reservoir_timeseries,
) = workflows.reservoirattrs(
gdf=gdf_org, timeseries_fn=timeseries_fn, logger=self.logger
)
intbl_reservoirs = intbl_reservoirs.rename(
columns={"expr1": "waterbody_id"}
intbl_reservoirs = intbl_reservoirs.rename(columns=tbls)

# create a geodf with id of reservoir and gemoetry at outflow location
gdf_org_points = gpd.GeoDataFrame(
gdf_org["waterbody_id"],
geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout),
)
intbl_reservoirs = intbl_reservoirs.rename(columns={"expr1": "waterbody_id"})
gdf_org_points = gdf_org_points.merge(
intbl_reservoirs, on="waterbody_id"
) # merge
# add parameter attributes to polygone gdf:
gdf_org = gdf_org.merge(intbl_reservoirs, on="waterbody_id")

# write reservoirs with param values to geoms
self.set_geoms(gdf_org, name="reservoirs")

for name in gdf_org_points.columns[2:]:
gdf_org_points[name] = gdf_org_points[name].astype("float32")
da_res = ds_res.raster.rasterize(
gdf_org_points, col_name=name, dtype="float32", nodata=-999
)
gdf_org_points = gdf_org_points.merge(
intbl_reservoirs, on="waterbody_id"
) # merge
# add parameter attributes to polygone gdf:
gdf_org = gdf_org.merge(intbl_reservoirs, on="waterbody_id")

# write reservoirs with param values to geoms
self.set_geoms(gdf_org, name="reservoirs")

for name in gdf_org_points.columns[2:]:
gdf_org_points[name] = gdf_org_points[name].astype("float32")
da_res = ds_res.raster.rasterize(
gdf_org_points, col_name=name, dtype="float32", nodata=-999
)
self.set_grid(da_res)
self.set_grid(da_res)

# Save accuracy information on reservoir parameters
if reservoir_accuracy is not None:
reservoir_accuracy.to_csv(join(self.root, "reservoir_accuracy.csv"))
# Save accuracy information on reservoir parameters
if reservoir_accuracy is not None:
reservoir_accuracy.to_csv(join(self.root, "reservoir_accuracy.csv"))

if reservoir_timeseries is not None:
reservoir_timeseries.to_csv(
join(self.root, f"reservoir_timeseries_{timeseries_fn}.csv")
)
if reservoir_timeseries is not None:
reservoir_timeseries.to_csv(
join(self.root, f"reservoir_timeseries_{timeseries_fn}.csv")
)

for option in res_toml:
self.set_config(option, res_toml[option])
for option in res_toml:
self.set_config(option, res_toml[option])

def _setup_waterbodies(self, waterbodies_fn, wb_type, min_area=0.0, **kwargs):
"""Help with common workflow of setup_lakes and setup_reservoir.
Expand All @@ -1688,8 +1707,16 @@ def _setup_waterbodies(self, waterbodies_fn, wb_type, min_area=0.0, **kwargs):
if "predicate" not in kwargs:
kwargs.update(predicate="contains")
gdf_org = self.data_catalog.get_geodataframe(
waterbodies_fn, geom=self.basins, **kwargs
waterbodies_fn,
geom=self.basins,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
if gdf_org is None:
# Return two times None (similair to main function output), if there is no
# data found
return None, None

# skip small size waterbodies
if "Area_avg" in gdf_org.columns and gdf_org.geometry.size > 0:
min_area_m2 = min_area * 1e6
Expand Down Expand Up @@ -1859,8 +1886,16 @@ def setup_glaciers(self, glaciers_fn="rgi", min_area=1):
# retrieve data for basin
self.logger.info("Preparing glacier maps.")
gdf_org = self.data_catalog.get_geodataframe(
glaciers_fn, geom=self.basins, predicate="intersects"
glaciers_fn,
geom=self.basins,
predicate="intersects",
handle_nodata=NoDataStrategy.IGNORE,
)
# Check if there are glaciers found
if gdf_org is None:
self.logger.info("Skipping method, as no data has been found")
return

# skip small size glacier
if "AREA" in gdf_org.columns and gdf_org.geometry.size > 0:
gdf_org = gdf_org[gdf_org["AREA"] >= min_area]
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""add global fixtures."""

import logging
import platform
from os.path import abspath, dirname, join
Expand Down Expand Up @@ -72,7 +73,7 @@ def example_wflow_results():
@pytest.fixture()
def clipped_wflow_model():
root = join(EXAMPLEDIR, "wflow_piave_clip")
mod = WflowModel(root=root, mode="r")
mod = WflowModel(root=root, mode="r", data_libs="artifact_data")
return mod


Expand Down
15 changes: 15 additions & 0 deletions tests/test_model_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,18 @@ def test_setup_floodplains_2d(elevtn_map, example_wflow_model, floodplain1d_test
.raster.mask_nodata()
.equals(floodplain1d_testdata[f"{mapname}_D4"])
)


def test_skip_nodata_reservoir(clipped_wflow_model):
# Using the clipped_wflow_model as the reservoirs are not in this model
clipped_wflow_model.setup_reservoirs(
reservoirs_fn="hydro_reservoirs",
min_area=0.0,
)
assert clipped_wflow_model.config["model"]["reservoirs"] == False
# Get names for two reservoir layers
for mapname in ["resareas", "reslocs"]:
# Check if layers are indeed not present in the model
assert (
clipped_wflow_model._MAPS[mapname] not in clipped_wflow_model.grid.data_vars
)

0 comments on commit f4a6486

Please sign in to comment.