-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch '#64FlexibleAggregationArea' into fiat_integrator
- Loading branch information
Showing
3 changed files
with
211 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import geopandas as gpd | ||
from typing import List, Union | ||
from pathlib import Path | ||
|
||
def process_value(value): | ||
if isinstance(value, list) and len(value) == 1: | ||
return value[0] | ||
elif isinstance(value, list) and len(value) > 1: | ||
return ", ".join(value) | ||
else: | ||
return value | ||
|
||
def join_exposure_aggregation_multiple_areas( | ||
exposure_gdf: gpd.GeoDataFrame, | ||
aggregation_area_fn: Union[List[str], List[Path]], | ||
attribute_names: List[str], | ||
label_names: List[str], | ||
) -> gpd.GeoDataFrame: | ||
"""_summary_ | ||
Parameters | ||
---------- | ||
exposure_gdf : gpd.GeoDataFrame | ||
_description_ | ||
aggregation_area_fn : Union[List[str], List[Path]] | ||
_description_ | ||
attribute_names : List[str] | ||
_description_ | ||
label_names : List[str] | ||
_description_ | ||
Returns | ||
------- | ||
gpd.GeoDataFrame | ||
_description_ | ||
""" | ||
for file_path, attribute_name, label_name in zip(aggregation_area_fn, attribute_names, label_names): | ||
aggregation_gdf = gpd.read_file(file_path) | ||
|
||
## check the projection of both gdf and if not match, reproject | ||
if exposure_gdf.crs != aggregation_gdf.crs: | ||
aggregation_gdf = aggregation_gdf.to_crs(exposure_gdf.crs) | ||
|
||
# FG: The three lines of code below could result in a wrong column being | ||
# selected if the user has submitted aggregation area files with the same | ||
# attribute names. I would suggest to add a check to make sure that the order | ||
# of the label names is the same as the order of the attribute names. If not, | ||
# the user should get an error message. | ||
# selected_column = None | ||
# for column_name in aggregation_gdf.columns: | ||
# if any(label_name in column_name for label_name in attribute_names): | ||
# selected_column = column_name | ||
|
||
assert attribute_name in aggregation_gdf.columns, f"Attribute {attribute_name} not found in {file_path}" | ||
|
||
# If you overwrite the exposure_gdf with the joined data, you can append all | ||
# aggregation areas to the same exposure_gdf | ||
exposure_gdf = gpd.sjoin( | ||
exposure_gdf, | ||
aggregation_gdf[["geometry", attribute_name]], | ||
op="intersects", | ||
how="left", | ||
) | ||
|
||
# aggregate the data if duplicates exist | ||
aggregated = ( | ||
exposure_gdf.groupby("Object ID")[attribute_name].agg(list).reset_index() | ||
) | ||
exposure_gdf.drop_duplicates(subset="Object ID", keep="first", inplace=True) | ||
exposure_gdf.drop(columns=attribute_name, inplace=True) | ||
exposure_gdf = exposure_gdf.merge(aggregated, on="Object ID") | ||
|
||
# Create a string from the list of values in the duplicated aggregation area | ||
# column | ||
exposure_gdf[attribute_name] = exposure_gdf[attribute_name].apply(process_value) | ||
|
||
# Rename the 'aggregation_attribute' column to 'new_column_name'. Put in | ||
# Documentation that the order the user put the label name must be the order of the gdf | ||
exposure_gdf.rename(columns={attribute_name: f"Aggregation Label: {label_name}"}, inplace=True) | ||
|
||
##remove the index_right column | ||
if "index_right" in exposure_gdf.columns: | ||
del exposure_gdf["index_right"] | ||
|
||
return exposure_gdf | ||
|
||
|
||
def join_exposure_aggregation_areas( | ||
exposure_gdf: gpd.GeoDataFrame, | ||
aggregation_area_fn: Union[List[str], List[Path], str, Path], | ||
attribute_names: Union[List[str], str], | ||
label_names: Union[List[str], str], | ||
) -> gpd.GeoDataFrame: | ||
"""Join aggregation area labels to the exposure data. | ||
Parameters | ||
---------- | ||
exposure_gdf : gpd.GeoDataFrame | ||
Exposure data to join the aggregation areas to as "Aggregation | ||
Label: `label_names`". | ||
aggregation_area_fn : Union[List[str], List[Path], str, Path] | ||
Path(s) to the aggregation area(s). | ||
attribute_names : Union[List[str], str] | ||
Name of the attribute(s) to join. | ||
label_names : Union[List[str], str] | ||
Name of the label(s) to join. | ||
""" | ||
if isinstance(aggregation_area_fn, str) or isinstance(aggregation_area_fn, Path): | ||
aggregation_area_fn = [aggregation_area_fn] | ||
if isinstance(attribute_names, str): | ||
attribute_names = [attribute_names] | ||
if isinstance(label_names, str): | ||
label_names = [label_names] | ||
|
||
exposure_gdf = join_exposure_aggregation_multiple_areas(exposure_gdf, aggregation_area_fn, attribute_names, label_names) | ||
|
||
# Remove the geometry column from the exposure_gdf to return a dataframe | ||
del exposure_gdf["geometry"] | ||
return exposure_gdf |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from typing import Sequence | ||
from _pytest.mark.structures import ParameterSet | ||
from hydromt_fiat.fiat import FiatModel | ||
from hydromt.log import setuplog | ||
from pathlib import Path | ||
import pytest | ||
import geopandas as gpd | ||
import pandas as pd | ||
from hydromt_fiat.workflows.aggregation_areas import join_exposure_aggregation_areas | ||
from hydromt_fiat.workflows.aggregation_areas import join_exposure_aggregation_multiple_areas | ||
|
||
from hydromt_fiat.workflows.exposure_vector import ExposureVector | ||
from hydromt_fiat.workflows.vulnerability import Vulnerability | ||
|
||
import shutil | ||
|
||
# set pyogrio as default engine | ||
gpd.options.io_engine = "pyogrio" | ||
|
||
# Load Data | ||
EXAMPLEDIR = Path(r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\FIAT_model") | ||
|
||
_cases = { | ||
"aggregation_test_1": { | ||
"new_root": Path(r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\modelbuilder_sprint"), | ||
"configuration": { | ||
"setup_aggregation_areas": { | ||
"aggregation_area_fn": r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\FIAT_model\zones.gpkg", | ||
"attribute_names": "ZONE_BASE", | ||
"label_names": "Zoning_map", | ||
} | ||
}, | ||
}, | ||
"aggregation_test_2": { | ||
"new_root": Path(r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\modelbuilder_sprint3"), | ||
"configuration": { | ||
"setup_aggregation_areas": { | ||
"aggregation_area_fn": [ | ||
r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\FIAT_model\zones.gpkg", | ||
r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\FIAT_model\zones2.gpkg", | ||
r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\FIAT_model\zones3.gpkg", | ||
r"C:\Users\rautenba\OneDrive - Stichting Deltares\Documents\Projects\FIAT\20230927_Hydromt_Fiat_Sprint\FIAT_model\zones4.gpkg" | ||
], | ||
"attribute_names": ["ZONE_BASE", "LAND_USE","ZoneName","ACCOM"], | ||
"label_names": ["Zoning_map", "Land_use_map","Horse","Accomodation_Zone"], | ||
} | ||
}, | ||
}, | ||
} | ||
|
||
|
||
|
||
# Set up Fiat Model | ||
@pytest.mark.parametrize("case", list(_cases.keys())) | ||
def test_aggregation_areas(case: ParameterSet | Sequence[object] | object): | ||
# Read model in examples folder. | ||
root = EXAMPLEDIR | ||
if _cases[case]["new_root"].exists(): | ||
shutil.rmtree(_cases[case]["new_root"]) | ||
logger = setuplog("hydromt_fiat", log_level=10) | ||
|
||
fm = FiatModel(root=root, mode="r", logger=logger) | ||
fm.read() | ||
|
||
fm.build(write=False, opt=_cases[case]["configuration"]) | ||
fm.set_root(_cases[case]["new_root"]) | ||
fm.write() | ||
|
||
# Check if the exposure object exists | ||
assert isinstance(fm.exposure, ExposureVector) | ||
|
||
# Check if the exposure database exists | ||
assert not fm.exposure.exposure_db.empty | ||
|
||
# Check if the vulnerability object exists | ||
assert isinstance(fm.vulnerability, Vulnerability) | ||
|
||
# Check if the vulnerability functions exist | ||
assert len(fm.vulnerability.functions) > 0 |