diff --git a/docs/changelog.rst b/docs/changelog.rst
index 34e327ea0..8885abfcc 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -31,12 +31,22 @@ Added
   UGRID topologies from "intervals": the (M + 1, N + 1) vertex coordinates
   for N faces.
 - :meth:`xugrid.UgridDataArrayAccessor.from_structured` now takes ``x`` and
   ``y`` arguments to specify which coordinates to use as the UGRID x and y
   coordinates.
+- :attr:`xugrid.UgridDataset.sizes` as an alternative to :attr:`xugrid.UgridDataset.dimensions`.
+- :attr:`xugrid.Ugrid2d.max_face_node_dimension`, which returns the name of
+  the dimension designating the number of nodes per face.
+- :attr:`xugrid.AbstractUgrid.max_connectivity_sizes`, which returns all
+  maximum connectivity dimensions and their corresponding sizes.
+- :attr:`xugrid.AbstractUgrid.max_connectivity_dimensions`, which returns all
+  maximum connectivity dimensions.
+
 Changed
 ~~~~~~~
 
 - :meth:`xugrid.Ugrid2d.from_structured` now takes ``x`` and ``y`` arguments
   instead of ``x_bounds`` and ``y_bounds`` arguments.
+- :func:`xugrid.merge_partitions` now also merges datasets with grids that are
+  only contained in some of the partition datasets.
 
 [0.8.1] 2024-01-19
 ------------------
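As a usage illustration of the added properties, a minimal sketch with a hypothetical mixed triangle/quad mesh; the printed dimension names are illustrative and depend on the grid name and attrs ("mesh2d" by default):

import numpy as np
import xugrid as xu

# One quad and one triangle; the fill value (-1) pads the triangle's row.
grid = xu.Ugrid2d(
    node_x=np.array([0.0, 1.0, 2.0, 0.0, 1.0]),
    node_y=np.array([0.0, 0.0, 0.0, 1.0, 1.0]),
    fill_value=-1,
    face_node_connectivity=np.array([[0, 1, 4, 3], [1, 2, 4, -1]]),
)
print(grid.max_face_node_dimension)      # e.g. "mesh2d_nMax_face_nodes"
print(grid.max_connectivity_dimensions)  # e.g. ("mesh2d_nMax_face_nodes",)
print(grid.max_connectivity_sizes)       # e.g. {"mesh2d_nMax_face_nodes": 4}
print(grid.sizes == grid.dimensions)     # True: sizes aliases dimensions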
diff --git a/tests/test_partitioning.py b/tests/test_partitioning.py
index 04596cf4a..19b3a3b4c 100644
--- a/tests/test_partitioning.py
+++ b/tests/test_partitioning.py
@@ -139,12 +139,15 @@ def test_merge_partitions__errors(self):
         grid1 = partitions[1].ugrid.grid
         partitions[1]["extra"] = (grid1.face_dimension, np.ones(grid1.n_face))
 
-        with pytest.raises(ValueError, match="These variables are present"):
+        with pytest.raises(
+            ValueError,
+            match="Missing variables: {'extra'} in partition",
+        ):
             pt.merge_partitions(partitions)
 
         partitions = self.uds.ugrid.partition(n_part=2)
         partitions[1]["face_z"] = partitions[1]["face_z"].expand_dims("layer", axis=0)
-        with pytest.raises(ValueError, match="Dimensions for face_z do not match"):
+        with pytest.raises(ValueError, match="Dimensions for 'face_z' do not match"):
             pt.merge_partitions(partitions)
 
         uds = self.uds.copy()
@@ -188,13 +191,29 @@ def test_merge_partitions(self):
         merged = pt.merge_partitions(self.datasets)
         assert isinstance(merged, xu.UgridDataset)
         assert len(merged.ugrid.grids) == 2
-        # In case of non-UGRID data, it should default to the first partition:
-        assert merged["c"] == 0
+        # In case of non-UGRID data, it should default to the last partition:
+        assert merged["c"] == 1
 
-    def test_merge_partitions__errors(self):
+        assert len(merged["first_nFaces"]) == 6
+        assert len(merged["second_nFaces"]) == 20
+
+    def test_merge_partitions__unique_grid_per_partition(self):
         pa = self.datasets[0][["a"]]
         pb = self.datasets[1][["b"]]
-        with pytest.raises(ValueError, match="Expected 2 UGRID topologies"):
+        merged = pt.merge_partitions([pa, pb])
+
+        assert isinstance(merged, xu.UgridDataset)
+        assert len(merged.ugrid.grids) == 2
+
+        assert len(merged["first_nFaces"]) == 3
+        assert len(merged["second_nFaces"]) == 10
+
+    def test_merge_partitions__errors(self):
+        pa = self.datasets[0][["a"]] * xr.DataArray([1.0, 1.0], dims=("error_dim",))
+        pb = self.datasets[1][["a"]]
+        with pytest.raises(
+            ValueError, match="Dimensions for 'a' do not match across partitions: "
+        ):
             pt.merge_partitions([pa, pb])
 
         grid_a = self.datasets[1].ugrid.grids[0].copy()
@@ -248,7 +267,7 @@ def setup(self):
 
         ds_expected = xu.UgridDataset(grids=[grid])
         ds_expected["a"] = ((grid.edge_dimension), np.concatenate(values_parts))
-        ds_expected["c"] = 0
+        ds_expected["c"] = 1
 
         # Assign coordinates also added during merge_partitions
         coords = {grid.edge_dimension: np.arange(grid.n_edge)}
         ds_expected = ds_expected.assign_coords(**coords)
@@ -260,8 +279,9 @@ def test_merge_partitions(self):
         merged = pt.merge_partitions(self.datasets_partitioned)
         assert isinstance(merged, xu.UgridDataset)
         assert len(merged.ugrid.grids) == 1
-        # In case of non-UGRID data, it should default to the first partition:
-        assert merged["c"] == 0
+        # In case of non-UGRID data, it should default to the last partition of
+        # the grid that's checked last.
+        assert merged["c"] == 1
 
         assert self.dataset_expected.ugrid.grid.equals(merged.ugrid.grid)
         assert self.dataset_expected["a"].equals(merged["a"])
@@ -289,16 +309,22 @@ def setup(self):
             ds["a"] = ((part_a.face_dimension), values_a)
             ds["b"] = ((part_b.edge_dimension), values_b)
             ds["c"] = i
-            datasets_parts.append(ds)
+
+            coords = {
+                part_a.face_dimension: values_a,
+                part_b.edge_dimension: values_b,
+            }
+
+            datasets_parts.append(ds.assign_coords(**coords))
 
         ds_expected = xu.UgridDataset(grids=[grid_a, grid_b])
         ds_expected["a"] = ((grid_a.face_dimension), np.concatenate(values_parts_a))
         ds_expected["b"] = ((grid_b.edge_dimension), np.concatenate(values_parts_b))
-        ds_expected["c"] = 0
+        ds_expected["c"] = 1
 
         # Assign coordinates also added during merge_partitions
         coords = {
-            grid_a.face_dimension: np.arange(grid_a.n_face),
-            grid_b.edge_dimension: np.arange(grid_b.n_edge),
+            grid_a.face_dimension: np.concatenate(values_parts_a),
+            grid_b.edge_dimension: np.concatenate(values_parts_b),
         }
         ds_expected = ds_expected.assign_coords(**coords)
@@ -309,7 +335,26 @@ def test_merge_partitions(self):
         merged = pt.merge_partitions(self.datasets_parts)
         assert isinstance(merged, xu.UgridDataset)
         assert len(merged.ugrid.grids) == 2
-        # In case of non-UGRID data, it should default to the first partition:
-        assert merged["c"] == 0
+        # In case of non-UGRID data, it should default to the last partition of
+        # the grid that's checked last.
+        assert merged["c"] == 1
+
+        assert self.dataset_expected.equals(merged)
+
+    def test_merge_partitions__inconsistent_grid_types(self):
+        self.datasets_parts[0] = self.datasets_parts[0].drop_vars(
+            ["b", "mesh1d_nEdges"]
+        )
+        b = self.dataset_expected["b"].isel(mesh1d_nEdges=[0, 1, 2])
+        self.dataset_expected = self.dataset_expected.drop_vars(["b", "mesh1d_nEdges"])
+        self.dataset_expected["b"] = b
+        self.dataset_expected["c"] = 1
+
+        merged = pt.merge_partitions(self.datasets_parts)
+        assert isinstance(merged, xu.UgridDataset)
+        assert len(merged.ugrid.grids) == 2
+        # In case of non-UGRID data, it should default to the last partition of
+        # the grid that's checked last.
+        assert merged["c"] == 1
 
         assert self.dataset_expected.equals(merged)
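A hedged, self-contained sketch of the newly supported case exercised by these tests: two partitions that each carry a different grid. The single-face grids below are illustrative; only the dimension naming ("<name>_nFaces") and the merge call follow the tests:

import numpy as np
import xugrid as xu

def square_grid(name: str) -> xu.Ugrid2d:
    # A single unit-square face; `name` determines the UGRID dimension
    # names, e.g. "first_nFaces".
    return xu.Ugrid2d(
        node_x=np.array([0.0, 1.0, 1.0, 0.0]),
        node_y=np.array([0.0, 0.0, 1.0, 1.0]),
        fill_value=-1,
        face_node_connectivity=np.array([[0, 1, 2, 3]]),
        name=name,
    )

first = square_grid("first")
second = square_grid("second")
pa = xu.UgridDataset(grids=[first])
pa["a"] = (first.face_dimension, np.array([1.0]))
pb = xu.UgridDataset(grids=[second])
pb["b"] = (second.face_dimension, np.array([2.0]))

# Previously this raised "Expected 2 UGRID topologies for 2 partitions";
# now both grids end up in the merged result.
merged = xu.merge_partitions([pa, pb])
assert len(merged.ugrid.grids) == 2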
+ assert merged["c"] == 1 assert self.dataset_expected.equals(merged) diff --git a/xugrid/ugrid/partitioning.py b/xugrid/ugrid/partitioning.py index a14790a66..5d5a6e2fb 100644 --- a/xugrid/ugrid/partitioning.py +++ b/xugrid/ugrid/partitioning.py @@ -1,6 +1,6 @@ """Create and merge partitioned UGRID topologies.""" from collections import defaultdict -from itertools import accumulate +from itertools import accumulate, chain from typing import List import numpy as np @@ -9,6 +9,7 @@ from xugrid.constants import IntArray, IntDType from xugrid.core.wrap import UgridDataArray, UgridDataset from xugrid.ugrid.connectivity import renumber +from xugrid.ugrid.ugridbase import UgridType def labels_to_indices(labels: IntArray) -> List[IntArray]: @@ -145,13 +146,7 @@ def merge_edges(grids, node_inverse): return _merge_connectivity(all_edges, slices) -def validate_partition_topology(grouped, n_partition: int): - n = n_partition - if not all(len(v) == n for v in grouped.values()): - raise ValueError( - f"Expected {n} UGRID topologies for {n} partitions, received: " f"{grouped}" - ) - +def validate_partition_topology(grouped: defaultdict[str, UgridType]) -> None: for name, grids in grouped.items(): types = {type(grid) for grid in grids} if len(types) > 1: @@ -170,37 +165,60 @@ def validate_partition_topology(grouped, n_partition: int): return None -def group_grids_by_name(partitions): +def group_grids_by_name(partitions: list[UgridDataset]) -> defaultdict[str, UgridType]: grouped = defaultdict(list) for partition in partitions: for grid in partition.grids: grouped[grid.name].append(grid) - validate_partition_topology(grouped, len(partitions)) + validate_partition_topology(grouped) return grouped -def validate_partition_objects(data_objects): - # Check presence of variables. 
-    allvars = list({tuple(sorted(ds.data_vars)) for ds in data_objects})
-    if len(allvars) > 1:
-        raise ValueError(
-            "These variables are present in some partitions, but not in "
-            f"others: {set(allvars[0]).symmetric_difference(allvars[1])}"
-        )
-    # Check dimensions
-    for var in allvars.pop():
-        vardims = list({ds[var].dims for ds in data_objects})
-        if len(vardims) > 1:
-            raise ValueError(
-                f"Dimensions for {var} do not match across partitions: "
-                f"{vardims[0]} versus {vardims[1]}"
-            )
+def group_data_objects_by_gridname(
+    partitions: list[UgridDataset]
+) -> defaultdict[str, list[xr.Dataset]]:
+    # Convert to dataset for convenience
+    data_objects = [partition.obj for partition in partitions]
+    data_objects = [
+        obj.to_dataset() if isinstance(obj, xr.DataArray) else obj
+        for obj in data_objects
+    ]
+
+    grouped = defaultdict(list)
+    for partition, obj in zip(partitions, data_objects):
+        for grid in partition.grids:
+            grouped[grid.name].append(obj)
+
+    return grouped
+
+
+def validate_partition_objects(
+    objects_by_gridname: defaultdict[str, list[xr.Dataset]]
+) -> None:
+    for data_objects in objects_by_gridname.values():
+        allvars = list({tuple(sorted(ds.data_vars)) for ds in data_objects})
+        unique_vars = set(chain(*allvars))
+        # Check dimensions
+        dims_per_var = [
+            {ds[var].dims for ds in data_objects if var in ds.data_vars}
+            for var in unique_vars
+        ]
+        for var, vardims in zip(unique_vars, dims_per_var):
+            if len(vardims) > 1:
+                vardims_ls = list(vardims)
+                raise ValueError(
+                    f"Dimensions for '{var}' do not match across partitions: "
+                    f"{vardims_ls[0]} versus {vardims_ls[1]}"
+                )
+    return None
 
 
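To illustrate the relaxed check: a variable may now be absent from some partitions, but where it is present its dimensions must still agree. A hypothetical failing input (grid and variable names are illustrative):

import numpy as np
import xarray as xr

objects_by_gridname = {
    "mesh2d": [
        xr.Dataset({"a": (("mesh2d_nFaces",), np.zeros(3))}),
        xr.Dataset({"a": (("layer", "mesh2d_nFaces"), np.zeros((2, 3)))}),
    ]
}
# validate_partition_objects(objects_by_gridname) would raise:
# ValueError: Dimensions for 'a' do not match across partitions:
# ('mesh2d_nFaces',) versus ('layer', 'mesh2d_nFaces')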
-def separate_variables(data_objects, ugrid_dims):
+def separate_variables(
+    objects_by_gridname: defaultdict[str, list[xr.Dataset]], ugrid_dims: set[str]
+):
     """Separate into UGRID variables grouped by dimension, and other variables."""
-    validate_partition_objects(data_objects)
+    validate_partition_objects(objects_by_gridname)
 
     def assert_single_dim(intersection):
         if len(intersection) > 1:
@@ -216,31 +234,77 @@ def all_equal(iterator):
         return all(element == first for element in iterator)
 
     # Group variables by UGRID dimension.
-    first = data_objects[0]
-    variables = first.variables
-    vardims = {var: tuple(first[var].dims) for var in variables}
-    grouped = defaultdict(list)  # UGRID associated vars
-    other = []  # other vars
-    for var, da in variables.items():
-        shapes = (obj[var].shape for obj in data_objects)
-
-        # Check if variable depends on UGRID dimension.
-        intersection = ugrid_dims.intersection(da.dims)
-        if intersection:
-            assert_single_dim(intersection)
-            # Now check whether the non-UGRID dimensions match.
-            dim = intersection.pop()  # Get the single element in the set.
-            axis = vardims[var].index(dim)
-            shapes = [remove_item(shape, axis) for shape in shapes]
-            if all_equal(shapes):
-                grouped[dim].append(var)
-
-        elif all_equal(shapes):
-            other.append(var)
+    grouped = defaultdict(set)  # UGRID associated vars
+    other = defaultdict(set)  # other vars
+
+    for gridname, data_objects in objects_by_gridname.items():
+        variables = {
+            varname: data
+            for obj in data_objects
+            for varname, data in obj.variables.items()
+        }
+        vardims = {varname: data.dims for varname, data in variables.items()}
+        for var, dims in vardims.items():
+            shapes = [obj[var].shape for obj in data_objects if var in obj]
+
+            # Check if variable depends on UGRID dimension.
+            intersection = ugrid_dims.intersection(dims)
+            if intersection:
+                assert_single_dim(intersection)
+                # Now check whether the non-UGRID dimensions match.
+                dim = intersection.pop()  # Get the single element in the set.
+                axis = dims.index(dim)
+                shapes = [remove_item(shape, axis) for shape in shapes]
+                if all_equal(shapes):
+                    grouped[dim].add(var)
+
+            elif all_equal(shapes):
+                other[gridname].add(var)
 
     return grouped, other
 
 
+def merge_data_along_dim(
+    data_objects: list[xr.Dataset],
+    vars: list[str],
+    merge_dim: str,
+    indexes: list[np.ndarray],
+    merged_grid: UgridType,
+) -> xr.Dataset:
+    """
+    Select variables from the data objects.
+    Pad connectivity dims if needed.
+    Concatenate along dim.
+    """
+    max_sizes = merged_grid.max_connectivity_sizes
+    ugrid_connectivity_dims = set(max_sizes)
+
+    to_merge = []
+    for obj, index in zip(data_objects, indexes):
+        # Check for presence of vars
+        missing_vars = set(vars).difference(set(obj.variables.keys()))
+        if missing_vars:
+            raise ValueError(f"Missing variables: {missing_vars} in partition {obj}")
+
+        selection = obj[vars].isel({merge_dim: index}, missing_dims="ignore")
+
+        # Pad the ugrid connectivity dims (e.g. n_max_face_node_connectivity) if
+        # needed.
+        present_dims = ugrid_connectivity_dims.intersection(selection.dims)
+        pad_width = {}
+        for dim in present_dims:
+            nmax = max_sizes[dim]
+            size = selection.sizes[dim]
+            if size != nmax:
+                pad_width[dim] = (0, nmax - size)
+        if pad_width:
+            selection = selection.pad(pad_width=pad_width)
+
+        to_merge.append(selection)
+
+    return xr.concat(to_merge, dim=merge_dim)
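The padding step matters when partitions differ in their maximum number of nodes per face, e.g. one partition holds only triangles while the merged grid also contains quads. A standalone sketch of just that step, using plain xarray (variable and dimension names are illustrative):

import numpy as np
import xarray as xr

# Face-node data of an all-triangle partition: 2 faces, 3 nodes per face.
triangles = xr.Dataset(
    {"face_nodes": (("mesh2d_nFaces", "mesh2d_nMax_face_nodes"), np.zeros((2, 3)))}
)
nmax_merged = 4  # the merged grid also contains quads
pad_width = {
    "mesh2d_nMax_face_nodes": (
        0,
        nmax_merged - triangles.sizes["mesh2d_nMax_face_nodes"],
    )
}
padded = triangles.pad(pad_width=pad_width)  # appends a trailing NaN column
assert padded.sizes["mesh2d_nMax_face_nodes"] == 4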
+
+
 def merge_partitions(partitions):
     """
     Merge topology and data, partitioned along UGRID dimensions, into a single
@@ -270,38 +334,43 @@
     if obj_type not in (UgridDataArray, UgridDataset):
         raise TypeError(msg.format(obj_type.__name__))
 
-    # Convert to dataset for convenience
-    data_objects = [partition.obj for partition in partitions]
-    data_objects = [
-        obj.to_dataset() if isinstance(obj, xr.DataArray) else obj
-        for obj in data_objects
-    ]
     # Collect grids
     grids = [grid for p in partitions for grid in p.grids]
     ugrid_dims = {dim for grid in grids for dim in grid.dimensions}
     grids_by_name = group_grids_by_name(partitions)
-    vars_by_dim, other_vars = separate_variables(data_objects, ugrid_dims)
+
+    data_objects_by_name = group_data_objects_by_gridname(partitions)
+    vars_by_dim, other_vars_by_name = separate_variables(
+        data_objects_by_name, ugrid_dims
+    )
 
-    # First, take identical non-UGRID variables from the first partition:
-    merged = data_objects[0][other_vars]
+    # Start with an empty dataset; non-UGRID variables are added per grid below.
+    merged = xr.Dataset()
 
     # Merge the UGRID topologies into one, and find the indexes to index into
     # the data to avoid duplicates.
     merged_grids = []
-    for grids in grids_by_name.values():
+    for gridname, grids in grids_by_name.items():
+        data_objects = data_objects_by_name[gridname]
+        other_vars = other_vars_by_name[gridname]
+
+        # First, merge the grid topology.
         grid = grids[0]
         merged_grid, indexes = grid.merge_partitions(grids)
         merged_grids.append(merged_grid)
 
-        # Now remove duplicates, then concatenate along the UGRID dimension.
+        # Add other vars, unassociated with UGRID dimensions, to the dataset.
+        for obj in data_objects:
+            other_vars_obj = set(other_vars).intersection(set(obj.data_vars))
+            merged.update(obj[other_vars_obj])
+
         for dim, dim_indexes in indexes.items():
             vars = vars_by_dim[dim]
-            selection = [
-                obj[vars].isel({dim: index}, missing_dims="ignore")
-                for obj, index in zip(data_objects, dim_indexes)
-            ]
-            merged_selection = xr.concat(selection, dim=dim)
+            if len(vars) == 0:
+                continue
+            merged_selection = merge_data_along_dim(
+                data_objects, vars, dim, dim_indexes, merged_grid
+            )
             merged.update(merged_selection)
 
     return UgridDataset(merged, merged_grids)
diff --git a/xugrid/ugrid/ugrid1d.py b/xugrid/ugrid/ugrid1d.py
index a5f705d16..93cb82d6b 100644
--- a/xugrid/ugrid/ugrid1d.py
+++ b/xugrid/ugrid/ugrid1d.py
@@ -630,7 +630,9 @@ def contract_vertices(self, indices: IntArray) -> "Ugrid1d":
         )
 
     @staticmethod
-    def merge_partitions(grids: Sequence["Ugrid1d"]) -> "Ugrid1d":
+    def merge_partitions(
+        grids: Sequence["Ugrid1d"]
+    ) -> tuple["Ugrid1d", dict[str, np.ndarray]]:
         """
         Merge grid partitions into a single whole.
diff --git a/xugrid/ugrid/ugrid2d.py b/xugrid/ugrid/ugrid2d.py
index fd744ece5..261cb3ba5 100644
--- a/xugrid/ugrid/ugrid2d.py
+++ b/xugrid/ugrid/ugrid2d.py
@@ -417,6 +417,23 @@ def dimensions(self):
             self.face_dimension: self.n_face,
         }
 
+    @property
+    def max_face_node_dimension(self) -> str:
+        """Name of the dimension designating the maximum number of nodes per face."""
+        return self._attrs["max_face_nodes_dimension"]
+
+    @property
+    def max_connectivity_sizes(self) -> dict[str, int]:
+        """Maximum connectivity dimensions and their corresponding sizes."""
+        return {
+            self.max_face_node_dimension: self.n_max_node_per_face,
+        }
+
+    @property
+    def max_connectivity_dimensions(self) -> tuple[str, ...]:
+        """Names of the maximum connectivity dimensions."""
+        return (self.max_face_node_dimension,)
+
     @property
     def topology_dimension(self):
         """Highest dimensionality of the geometric elements: 2"""
@@ -1491,7 +1505,9 @@ def partition(self, n_part: int):
         return [self.topology_subset(index) for index in indices]
 
     @staticmethod
-    def merge_partitions(grids: Sequence["Ugrid2d"]) -> "Ugrid2d":
+    def merge_partitions(
+        grids: Sequence["Ugrid2d"]
+    ) -> tuple["Ugrid2d", dict[str, np.ndarray]]:
         """
         Merge grid partitions into a single whole.
diff --git a/xugrid/ugrid/ugridbase.py b/xugrid/ugrid/ugridbase.py
index 0fd84782b..423051c19 100644
--- a/xugrid/ugrid/ugridbase.py
+++ b/xugrid/ugrid/ugridbase.py
@@ -299,6 +299,21 @@ def edge_dimension(self):
         """Name of edge dimension"""
         return self._attrs["edge_dimension"]
 
+    @property
+    def max_connectivity_dimensions(self) -> tuple[str, ...]:
+        """Names of the maximum connectivity dimensions; none by default."""
+        return ()
+
+    @property
+    def max_connectivity_sizes(self) -> dict[str, int]:
+        """Maximum connectivity dimensions and their sizes; empty by default."""
+        return {}
+
+    @property
+    def sizes(self) -> dict[str, int]:
+        """Mapping from dimension names to lengths; alias of ``dimensions``."""
+        return self.dimensions
+
     @property
     def node_coordinates(self) -> FloatArray:
         """Coordinates (x, y) of the nodes (vertices)"""
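Since Ugrid1d does not override the new base-class properties, the padding step in merge_data_along_dim is a no-op for 1D networks. A brief sketch of the defaults (hypothetical two-edge network):

import numpy as np
import xugrid as xu

network = xu.Ugrid1d(
    node_x=np.array([0.0, 1.0, 2.0]),
    node_y=np.array([0.0, 0.0, 0.0]),
    fill_value=-1,
    edge_node_connectivity=np.array([[0, 1], [1, 2]]),
)
print(network.max_connectivity_dimensions)  # ()
print(network.max_connectivity_sizes)       # {}
print(network.sizes == network.dimensions)  # True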