diff --git a/custom_tools/partition_iterator.py b/custom_tools/partition_iterator.py index 1f03f9f2..2c57793e 100644 --- a/custom_tools/partition_iterator.py +++ b/custom_tools/partition_iterator.py @@ -1,10 +1,16 @@ import arcpy import os +import re +import shutil import random +import json +from typing import Dict, Tuple, Literal import env_setup.global_config from env_setup import environment_setup from custom_tools import custom_arcpy +from custom_tools.timing_decorator import timing_decorator + from input_data import input_n50 from file_manager.n100.file_manager_buildings import Building_N100 @@ -14,119 +20,359 @@ class PartitionIterator: """THIS IS WORK IN PROGRESS NOT READY FOR USE YET""" + # Class-level constants + PARTITION_FIELD = "partition_select" + ORIGINAL_ID_FIELD = "original_id_field" + def __init__( self, - alias_path_data, - root_file_partition_iterator, - scale, - output_feature_class, + alias_path_data: Dict[str, Tuple[Literal["input", "context"], str]], + alias_path_outputs: Dict[str, Tuple[str, str]], + root_file_partition_iterator: str, + scale: str, custom_functions=None, - feature_count="15000", - partition_method="FEATURES", - search_distance="500 Meters", - object_id_field="OBJECTID", + dictionary_documentation_path: str = None, + feature_count: str = "15000", + partition_method: Literal["FEATURES", "VERTICES"] = "FEATURES", + search_distance: str = "500 Meters", + object_id_field: str = "OBJECTID", ): """ Initialize the PartitionIterator with input datasets for partitioning and processing. - :param alias_path_inputs: A dictionary of input feature class paths with their aliases. + :param alias_path_data: A nested dictionary of input feature class paths with their aliases. :param root_file_partition_iterator: Base path for in progress outputs. :param scale: Scale for the partitions. - :param output_feature_class: The output feature class for final results. + :param alias_path_outputs: A nested dictionary of output feature class for final results. :param feature_count: Feature count for cartographic partitioning. :param partition_method: Method used for creating cartographic partitions. """ - self.data = { - alias: {"type": type_info, "path": path_info} - for alias, (type_info, path_info) in alias_path_data.items() - } + # Raw inputs and initial setup + self.raw_input_data = alias_path_data + self.raw_output_data = alias_path_outputs or {} self.root_file_partition_iterator = root_file_partition_iterator + if "." 
in dictionary_documentation_path: + self.dictionary_documentation_path = re.sub( + r"\.[^.]*$", "", dictionary_documentation_path + ) + else: + self.dictionary_documentation_path = dictionary_documentation_path + self.scale = scale - self.output_feature_class = output_feature_class + self.search_distance = search_distance self.feature_count = feature_count self.partition_method = partition_method + self.object_id_field = object_id_field + + # Initial processing results + self.nested_alias_type_data = {} + self.nested_final_outputs = {} + + # Variables related to features and iterations self.partition_feature = ( f"{root_file_partition_iterator}_partition_feature_{scale}" ) + self.max_object_id = None + self.iteration_file_paths_list = [] + self.first_call_directory_documentation = True + + # Variables related to custom operations self.custom_functions = custom_functions or [] - self.iteration_file_paths = [] - self.final_append_features = {} - self.search_distance = search_distance - self.object_id_field = object_id_field + + # self.handle_data_export( + # file_path=self.dictionary_documentation_path, + # alias_type_data=self.nested_alias_type_data, + # final_outputs=self.nested_final_outputs, + # file_name="initialization", + # iteration=False, + # object_id=None, + # ) + + def unpack_alias_path_data(self, alias_path_data): + # Process initial alias_path_data for inputs and outputs + for alias, info in alias_path_data.items(): + type_info, path_info = info + if alias not in self.nested_alias_type_data: + self.nested_alias_type_data[alias] = {} + self.nested_alias_type_data[alias][type_info] = path_info + + def unpack_alias_path_outputs(self, alias_path_outputs): + self.nested_final_outputs = {} + for alias, info in alias_path_outputs.items(): + type_info, path_info = info + if alias not in self.nested_final_outputs: + self.nested_final_outputs[alias] = {} + self.nested_final_outputs[alias][type_info] = path_info + + def configure_alias_and_type( + self, + alias, + type_name, + type_path, + ): + # Check if alias exists + if alias not in self.nested_alias_type_data: + print(f"Alias '{alias}' not found in nested_alias_type_data.") + return + + # Update path of an existing type or add a new type with the provided path + self.nested_alias_type_data[alias][type_name] = type_path + print(f"Set path for type '{type_name}' in alias '{alias}' to: {type_path}") + + def create_new_alias( + self, + alias, + initial_type_name=None, + initial_type_path=None, + ): + # Check if alias already exists + if alias in self.nested_alias_type_data: + raise ValueError(f"Alias {alias} already exists.") + + # Initialize nested_alias_type_data for alias + if initial_type_name: + # Create alias with initial type and path + self.nested_alias_type_data[alias] = {initial_type_name: initial_type_path} + else: + # Initialize alias as an empty dictionary + self.nested_alias_type_data[alias] = {} + + print( + f"Created new alias '{alias}' in nested_alias_type_data with type '{initial_type_name}' and path: {initial_type_path}" + ) def create_cartographic_partitions(self): + all_features = [ + path + for alias, types in self.nested_alias_type_data.items() + for type_key, path in types.items() + if type_key in ["input_copy", "context_copy"] and path is not None + ] + + print(f"all_features: {all_features}") + if all_features: + arcpy.cartography.CreateCartographicPartitions( + in_features=all_features, + out_features=self.partition_feature, + feature_count=self.feature_count, + partition_method=self.partition_method, + ) + 
print(f"Created partitions in {self.partition_feature}") + else: + print("No input or context features available for creating partitions.") + + def delete_feature_class(self, feature_class_path, alias=None, output_type=None): + """Deletes a feature class if it exists.""" + if arcpy.Exists(feature_class_path): + arcpy.management.Delete(feature_class_path) + if alias and output_type: + print( + f"Deleted existing output feature class for '{alias}' of type '{output_type}': {feature_class_path}" + ) + else: + print(f"Deleted feature class: {feature_class_path}") + + def delete_final_outputs(self): + """Deletes all final output files if they exist.""" + for alias in self.nested_final_outputs: + for _, output_file_path in self.nested_final_outputs[alias].items(): + if arcpy.Exists(output_file_path): + arcpy.management.Delete(output_file_path) + print(f"Deleted file: {output_file_path}") + + def delete_iteration_files(self, *file_paths): + """Deletes multiple feature classes or files. Detailed alias and output_type logging is not available here.""" + for file_path in file_paths: + self.delete_feature_class(file_path) + print(f"Deleted file: {file_path}") + + @staticmethod + def create_feature_class(full_feature_path, template_feature): + """Creates a new feature class from a template feature class, given a full path.""" + out_path, out_name = os.path.split(full_feature_path) + if arcpy.Exists(full_feature_path): + arcpy.management.Delete(full_feature_path) + print(f"Deleted existing feature class: {full_feature_path}") + + arcpy.management.CreateFeatureclass( + out_path=out_path, out_name=out_name, template=template_feature + ) + print(f"Created feature class: {full_feature_path}") + + def create_dummy_features(self, types_to_include=["input_copy", "context_copy"]): """ - Creates cartographic partitions based on all available feature classes, - including inputs and context features. + Creates dummy features for aliases with specified types. + + Args: + types_to_include (list): Types for which dummy features should be created. 
""" - all_features = [details["path"] for alias, details in self.data.items()] + for alias, alias_data in self.nested_alias_type_data.items(): + for type_info, path in list(alias_data.items()): + if type_info in types_to_include and path: + dummy_feature_path = f"{self.root_file_partition_iterator}_{alias}_dummy_{self.scale}" + PartitionIterator.create_feature_class( + full_feature_path=dummy_feature_path, + template_feature=path, + ) + print( + f"Created dummy feature class for {alias} of type {type_info}: {dummy_feature_path}" + ) + # Update alias state to include this new dummy type and its path + self.configure_alias_and_type( + alias=alias, + type_name="dummy", + type_path=dummy_feature_path, + ) - arcpy.cartography.CreateCartographicPartitions( - in_features=all_features, - out_features=self.partition_feature, - feature_count=self.feature_count, - partition_method=self.partition_method, - ) - print(f"Created partitions in {self.partition_feature}") + def initialize_dummy_used(self): + # Assuming `aliases` is a list of all your aliases + for alias in self.nested_alias_type_data: + self.nested_alias_type_data[alias]["dummy_used"] = False + + def reset_dummy_used(self): + # Assuming `aliases` is a list of all your aliases + for alias in self.nested_alias_type_data: + self.nested_alias_type_data[alias]["dummy_used"] = False + + def update_empty_alias_type_with_dummy_file(self, alias, type_info): + # Check if the dummy type exists in the alias nested_alias_type_data + if "dummy" in self.nested_alias_type_data[alias]: + # Check if the input type exists in the alias nested_alias_type_data + if ( + type_info in self.nested_alias_type_data[alias] + and self.nested_alias_type_data[alias][type_info] is not None + ): + # Get the dummy path from the alias nested_alias_type_data + dummy_path = self.nested_alias_type_data[alias]["dummy"] + # Set the value of the existing type_info to the dummy path + self.nested_alias_type_data[alias][type_info] = dummy_path + self.nested_alias_type_data[alias]["dummy_used"] = True + print( + f"The '{type_info}' for alias '{alias}' was updated with dummy path: {dummy_path}" + ) + else: + print( + f"'{type_info}' does not exist for alias '{alias}' in nested_alias_type_data." + ) + else: + print( + f"'dummy' type does not exist for alias '{alias}' in nested_alias_type_data." + ) - def delete_feature_class( + def create_directory_json_documentation( self, - feature_class_path, - ): - """Deletes a feature class if it exists. - + root_path: str, + target_dir: str, + iteration: bool, + ) -> str: + """ + Creates a directory at the given root_path for the target_dir. Args: - feature_class_path (str): The path to the feature class to be deleted. + root_path: The root directory where initial structure is created + target_dir: The target where the created directory should be placed + iteration: Boolean flag indicating if the iteration_documentation should be added + Returns: + A string containing the absolute path of the created directory. 
""" - if arcpy.Exists(feature_class_path): - arcpy.management.Delete(feature_class_path) - print(f"Deleted feature class: {feature_class_path}") - def create_feature_class( + # Determine base directory + directory_path = os.path.join(root_path, f"{target_dir}") + + # Ensure that the directory exists + os.makedirs(directory_path, exist_ok=True) + + if iteration: + iteration_documentation_dir = os.path.join( + directory_path, "iteration_documentation" + ) + os.makedirs(iteration_documentation_dir, exist_ok=True) + + return iteration_documentation_dir + + return directory_path + + def write_data_to_json( self, - out_path, - out_name, - template_feature, - ): - """Creates a new feature class from a template feature class. + data: dict, + file_path: str, + file_name: str, + object_id=None, + ) -> None: + """ + Writes dictionary into a json file. - Args: - out_path (str): The output path where the feature class will be created. - out_name (str): The name of the new feature class. - template_feature (str): The path to the template feature class. + Args: + data: The data to write. + file_path: The complete path (directory+file_name) where the file should be created + file_name: The name of the file to create + object_id: If provided, object_id will also be part of the file name. """ - full_out_path = os.path.join( - out_path, - out_name, - ) - if arcpy.Exists(full_out_path): - arcpy.management.Delete(full_out_path) - print(f"Deleted existing feature class: {full_out_path}") - arcpy.management.CreateFeatureclass( - out_path=out_path, - out_name=out_name, - template=template_feature, - ) - print(f"Created feature class: {full_out_path}") + if object_id: + complete_file_path = os.path.join( + file_path, f"{file_name}_{object_id}.json" + ) + else: + complete_file_path = os.path.join(file_path, f"{file_name}.json") - def delete_iteration_files(self, *file_paths): - """Deletes multiple feature classes or files. + with open(complete_file_path, "w") as f: + json.dump(data, f, indent=4) + + def export_dictionaries_to_json( + self, + file_path: str = None, + alias_type_data: dict = None, + final_outputs: dict = None, + file_name: str = None, + iteration: bool = False, + object_id=None, + ) -> None: + """ + Handles the export of alias type data and final outputs into separate json files. Args: - *file_paths: A variable number of file paths to delete. + file_path: The complete file path where to create the output directories. + alias_type_data: The alias type data to export. + final_outputs: The final outputs data to export. + file_name: The name of the file to create + iteration: Boolean flag indicating if the iteration_documentation should be added + object_id: Object ID to be included in the file name if it's an iteration (`iteration==True`). If `None`, will not be used. 
""" - for file_path in file_paths: - try: - if arcpy.Exists(file_path): - arcpy.Delete_management(file_path) - print(f"Deleted iteration file: {file_path}") - except Exception as e: - print(f"Error deleting {file_path}: {e}") + if file_path is None: + file_path = self.dictionary_documentation_path + if alias_type_data is None: + alias_type_data = self.nested_alias_type_data + if final_outputs is None: + final_outputs = self.nested_final_outputs + + if self.first_call_directory_documentation and os.path.exists(file_path): + shutil.rmtree(file_path) + self.first_call_directory_documentation = False + + alias_type_data_directory = self.create_directory_json_documentation( + file_path, "nested_alias_type_data", iteration + ) + final_outputs_directory = self.create_directory_json_documentation( + file_path, "nested_final_outputs", iteration + ) + + self.write_data_to_json( + alias_type_data, alias_type_data_directory, file_name, object_id + ) + self.write_data_to_json( + final_outputs, final_outputs_directory, file_name, object_id + ) + + def generate_unique_field_name(self, input_feature, field_name): + existing_field_names = [field.name for field in arcpy.ListFields(input_feature)] + unique_field_name = field_name + while unique_field_name in existing_field_names: + unique_field_name = f"{unique_field_name}_{random.randint(0, 9)}" + return unique_field_name - def pre_iteration(self): + def find_maximum_object_id(self): """ Determine the maximum OBJECTID for partitioning. """ @@ -137,415 +383,381 @@ def pre_iteration(self): self.object_id_field, sql_clause=(None, f"ORDER BY {self.object_id_field} DESC"), ) as cursor: - max_object_id = next(cursor)[0] + self.max_object_id = next(cursor)[0] - print(f"Maximum {self.object_id_field} found: {max_object_id}") + print(f"Maximum {self.object_id_field} found: {self.max_object_id}") - return max_object_id except Exception as e: print(f"Error in finding max {self.object_id_field}: {e}") def prepare_input_data(self): - # Iterate only over 'input' type data in self.data - for alias, details in self.data.items(): - if details["type"] == "input": + for alias, types in self.nested_alias_type_data.items(): + if "input" in types: + input_data_path = types["input"] input_data_copy = ( f"{self.root_file_partition_iterator}_{alias}_input_copy" ) + # self.delete_feature_class(input_data_copy) arcpy.management.Copy( - in_data=details["path"], + in_data=input_data_path, out_data=input_data_copy, ) - print(f"Copied input data for: {alias}") + print(f"Copied input nested_alias_type_data for: {alias}") + + # Add a new type for the alias the copied input nested_alias_type_data + self.configure_alias_and_type( + alias=alias, + type_name="input_copy", + type_path=input_data_copy, + ) + + # Making sure the field is unique if it exists a field with the same name + self.PARTITION_FIELD = self.generate_unique_field_name( + input_feature=input_data_copy, + field_name=self.PARTITION_FIELD, + ) - self.data[alias]["path"] = input_data_copy + self.ORIGINAL_ID_FIELD = self.generate_unique_field_name( + input_feature=input_data_copy, + field_name=self.ORIGINAL_ID_FIELD, + ) - partition_field = "partition_select" arcpy.AddField_management( in_table=input_data_copy, - field_name=partition_field, + field_name=self.PARTITION_FIELD, field_type="LONG", ) - print(f"Added field {partition_field}") + print(f"Added field {self.PARTITION_FIELD}") - # Add a unique ID field to the copied feature class, ensuring it's a new field - existing_field_names = [ - field.name for field in 
arcpy.ListFields(input_data_copy) - ] - orig_id_field = "orig_id_field" - while orig_id_field in existing_field_names: - orig_id_field = f"{orig_id_field}_{random.randint(0, 9)}" arcpy.AddField_management( in_table=input_data_copy, - field_name=orig_id_field, + field_name=self.ORIGINAL_ID_FIELD, field_type="LONG", ) - print(f"Added field {orig_id_field}") + print(f"Added field {self.ORIGINAL_ID_FIELD}") arcpy.CalculateField_management( in_table=input_data_copy, - field=orig_id_field, + field=self.ORIGINAL_ID_FIELD, expression=f"!{self.object_id_field}!", ) - print(f"Calculated field {orig_id_field}") + print(f"Calculated field {self.ORIGINAL_ID_FIELD}") + + if "context" in types: + context_data_path = types["context"] + context_data_copy = ( + f"{self.root_file_partition_iterator}_{alias}_context_copy" + ) + + arcpy.management.Copy( + in_data=context_data_path, + out_data=context_data_copy, + ) + print(f"Copied context nested_alias_type_data for: {alias}") + + self.configure_alias_and_type( + alias=alias, + type_name="context_copy", + type_path=context_data_copy, + ) def custom_function(inputs): outputs = [] return outputs - def delete_existing_outputs(self): - for alias, details in self.data.items(): - if details["type"] == "output": - output_path = details["path"] - self.delete_feature_class(output_path) - print(f"Deleted existing feature class: {output_path}") - - def create_dummy_features(self, alias, details): - dummy_feature_path = ( - f"{self.root_file_partition_iterator}_{alias}_dummy_{self.scale}" - ) - self.create_feature_class( - out_path=os.path.dirname(dummy_feature_path), - out_name=os.path.basename(dummy_feature_path), - template_feature=details["path"], - ) - def select_partition_feature(self, iteration_partition, object_id): """ Selects partition feature based on OBJECTID. """ - self.iteration_file_paths.append(iteration_partition) + self.iteration_file_paths_list.append(iteration_partition) custom_arcpy.select_attribute_and_make_permanent_feature( input_layer=self.partition_feature, expression=f"{self.object_id_field} = {object_id}", output_name=iteration_partition, ) - print(f"Created partition selection for OBJECTID {object_id}") + print(f"\nCreated partition selection for OBJECTID {object_id}") - def process_input_features(self, alias, details, iteration_partition): + def process_input_features( + self, + alias, + iteration_partition, + object_id, + ): """ Process input features for a given partition. """ - input_features_partition_selection = ( - f"in_memory/{alias}_partition_base_select_{self.scale}" - ) - self.iteration_file_paths.append(input_features_partition_selection) - - input_feature_count = custom_arcpy.select_location_and_make_feature_layer( - input_layer=details["path"], - overlap_type=custom_arcpy.OverlapType.HAVE_THEIR_CENTER_IN.value, - select_features=iteration_partition, - output_name=input_features_partition_selection, - ) - return input_feature_count > 0 - - def process_context_features(self, alias, details, iteration_partition): - """ - Process context features for a given partition if input features are present. 
- """ - context_selection_path = ( - f"{self.root_file_partition_iterator}_{alias}_context_iteration_selection" - ) - self.iteration_file_paths.append(context_selection_path) - - custom_arcpy.select_location_and_make_permanent_feature( - input_layer=details["path"], - overlap_type=custom_arcpy.OverlapType.WITHIN_A_DISTANCE, - select_features=iteration_partition, - output_name=context_selection_path, - selection_type=custom_arcpy.SelectionType.NEW_SELECTION.value, - search_distance=self.search_distance, - ) - - def partition_iteration( - self, - input_data_copy, - partition_feature, - max_object_id, - root_file_partition_iterator, - scale, - partition_field, - orig_id_field, - final_append_feature, - ): - self.delete_existing_outputs() - - for object_id in range(1, max_object_id + 1): - self.iteration_file_paths.clear() + if "input_copy" not in self.nested_alias_type_data[alias]: + # If there are no inputs to process, return None for the aliases and a flag indicating no input was present. + return None, False + + if "input_copy" in self.nested_alias_type_data[alias]: + input_path = self.nested_alias_type_data[alias]["input_copy"] + input_features_partition_selection = ( + f"in_memory/{alias}_partition_base_select_{self.scale}" + ) + self.iteration_file_paths_list.append(input_features_partition_selection) - iteration_partition = f"{self.partition_feature}_{object_id}" - self.select_partition_feature(iteration_partition, object_id) + custom_arcpy.select_location_and_make_feature_layer( + input_layer=input_path, + overlap_type=custom_arcpy.OverlapType.HAVE_THEIR_CENTER_IN.value, + select_features=iteration_partition, + output_name=input_features_partition_selection, + ) - inputs_present_in_partition = False + aliases_with_features = {} + count_points = int( + arcpy.management.GetCount(input_features_partition_selection).getOutput( + 0 + ) + ) + aliases_with_features[alias] = count_points - # Processing 'input' type features. - for alias, details in self.data.items(): - if details["type"] == "input": - inputs_present = self.process_input_features( - alias, details, iteration_partition - ) - inputs_present_in_partition |= inputs_present + if aliases_with_features[alias] > 0: + print(f"{alias} has {count_points} features in {iteration_partition}") - # Processing 'context' type features only if 'input' features are present. - if inputs_present_in_partition: - for alias, details in self.data.items(): - if details["type"] == "context": - self.process_context_features( - alias, details, iteration_partition - ) - - # Creating dummy features and selecting partition features for all types. - for alias, details in self.data.items(): - dummy_feature_path = ( - f"{self.root_file_partition_iterator}_{alias}_dummy_{self.scale}" - ) - self.create_feature_class( - out_path=os.path.dirname(dummy_feature_path), - out_name=os.path.basename(dummy_feature_path), - template_feature=details["path"], - ) + arcpy.CalculateField_management( + in_table=input_features_partition_selection, + field=self.PARTITION_FIELD, + expression="1", + ) - for object_id in range(1, max_object_id + 1): - self.iteration_file_paths.clear() - iteration_partition = f"{self.partition_feature}_{object_id}" - # Flag to check if any input features exist in this partition. 
- inputs_present_in_partition = False - - # Processing 'input' type features - for alias, details in self.data.items(): - if details["type"] == "input": - input_features_partition_selection = ( - f"in_memory/{alias}_partition_base_select_{scale}" - ) - self.iteration_file_paths.append(input_features_partition_selection) - input_feature_count = custom_arcpy.select_location_and_make_feature_layer( - input_layer=details["path"], - overlap_type=custom_arcpy.OverlapType.HAVE_THEIR_CENTER_IN.value, - select_features=iteration_partition, - output_name=input_features_partition_selection, - ) + iteration_append_feature = f"{self.root_file_partition_iterator}_{alias}_iteration_append_feature_{self.scale}" + self.iteration_file_paths_list.append(iteration_append_feature) - if input_feature_count > 0: - inputs_present_in_partition = True - # Processing 'context' type features only if 'input' features are present in this partition. - if inputs_present_in_partition: - for alias, details in self.data.items(): - if details["type"] == "context": - context_selection_path = f"{self.root_file_partition_iterator}_{alias}_context_iteration_selection_{object_id}" - self.iteration_file_paths.append(context_selection_path) - - custom_arcpy.select_location_and_make_permanent_feature( - input_layer=details["path"], - overlap_type=custom_arcpy.OverlapType.WITHIN_A_DISTANCE, - select_features=iteration_partition, - output_name=context_selection_path, - selection_type=custom_arcpy.SelectionType.NEW_SELECTION.value, - search_distance=self.search_distance, - ) - - aliases_feature_counts = {alias: 0 for alias in self.alias} - - for object_id in range(1, max_object_id + 1): - self.iteration_file_paths.clear() - for alias in self.alias: - # Retrieve the output path for the current alias - output_path = self.outputs.get(alias) - - if object_id == 1: - self.delete_feature_class(output_path) - - print(f"\nProcessing {self.object_id_field} {object_id}") - iteration_partition = f"{partition_feature}_{object_id}" - self.iteration_file_paths.append(iteration_partition) + PartitionIterator.create_feature_class( + full_feature_path=iteration_append_feature, + template_feature=input_features_partition_selection, + ) - custom_arcpy.select_attribute_and_make_permanent_feature( - input_layer=partition_feature, - expression=f"{self.object_id_field} = {object_id}", - output_name=iteration_partition, - ) + arcpy.management.Append( + inputs=input_features_partition_selection, + target=iteration_append_feature, + schema_type="NO_TEST", + ) - # Check for features for each alias and set features_present accordingly - for alias in self.alias: - input_data_copy = self.file_mapping[alias]["current_output"] - base_partition_selection = ( - f"in_memory/{alias}_partition_base_select_{scale}" + input_features_partition_context_selection = f"in_memory/{alias}_input_features_partition_context_selection_{self.scale}" + self.iteration_file_paths_list.append( + input_features_partition_context_selection ) - self.iteration_file_paths.append(base_partition_selection) custom_arcpy.select_location_and_make_feature_layer( - input_layer=input_data_copy, - overlap_type=custom_arcpy.OverlapType.HAVE_THEIR_CENTER_IN.value, + input_layer=input_path, + overlap_type=custom_arcpy.OverlapType.WITHIN_A_DISTANCE, select_features=iteration_partition, - output_name=base_partition_selection, + output_name=input_features_partition_context_selection, + selection_type=custom_arcpy.SelectionType.NEW_SELECTION.value, + search_distance=self.search_distance, ) - 
aliases_with_features = 0 + arcpy.management.SelectLayerByLocation( + in_layer=input_features_partition_context_selection, + overlap_type="HAVE_THEIR_CENTER_IN", + select_features=iteration_partition, + selection_type="REMOVE_FROM_SELECTION", + ) - count_points = int( - arcpy.management.GetCount(base_partition_selection).getOutput(0) + arcpy.CalculateField_management( + in_table=input_features_partition_context_selection, + field=self.PARTITION_FIELD, + expression="0", ) - aliases_feature_counts[alias] = count_points - # Check if there are features for this alias - if count_points > 0: - print( - f"{alias} has {count_points} features in {iteration_partition}" - ) - aliases_with_features += 1 + arcpy.management.Append( + inputs=input_features_partition_context_selection, + target=iteration_append_feature, + schema_type="NO_TEST", + ) - iteration_append_feature = f"{root_file_partition_iterator}_{alias}_iteration_append_feature_{scale}" - self.iteration_file_paths.append(iteration_append_feature) + self.configure_alias_and_type( + alias=alias, + type_name="input", + type_path=iteration_append_feature, + ) - self.create_feature_class( - out_path=os.path.dirname(iteration_append_feature), - out_name=os.path.basename(iteration_append_feature), - template_feature=input_data_copy, - ) + print( + f"iteration partition {input_features_partition_context_selection} appended to {iteration_append_feature}" + ) + # Return the processed input features and a flag indicating successful operation + return aliases_with_features, True + else: + # Loads in dummy feature for this alias for this iteration and sets dummy_used = True + self.update_empty_alias_type_with_dummy_file( + alias, + type_info="input", + ) + print( + f"iteration partition {object_id} has no features for {alias} in the partition feature" + ) + # If there are no inputs to process, return None for the aliases and a flag indicating no input was present. + return None, False + + def _process_inputs_in_partition(self, aliases, iteration_partition, object_id): + inputs_present_in_partition = False + for alias in aliases: + if "input_copy" in self.nested_alias_type_data[alias]: + # Using process_input_features to check whether inputs are present + _, input_present = self.process_input_features( + alias, iteration_partition, object_id + ) + # Sets inputs_present_in_partition as True if any alias in partition has input present. Otherwise it remains False. + inputs_present_in_partition = ( + inputs_present_in_partition or input_present + ) + return inputs_present_in_partition - arcpy.CalculateField_management( - in_table=base_partition_selection, - field=partition_field, - expression="1", - ) + def process_context_features(self, alias, iteration_partition): + """ + Process context features for a given partition if input features are present. 
+ """ + if "context_copy" in self.nested_alias_type_data[alias]: + context_path = self.nested_alias_type_data[alias]["context_copy"] + context_selection_path = f"{self.root_file_partition_iterator}_{alias}_context_iteration_selection" + self.iteration_file_paths_list.append(context_selection_path) + + custom_arcpy.select_location_and_make_permanent_feature( + input_layer=context_path, + overlap_type=custom_arcpy.OverlapType.WITHIN_A_DISTANCE, + select_features=iteration_partition, + output_name=context_selection_path, + selection_type=custom_arcpy.SelectionType.NEW_SELECTION.value, + search_distance=self.search_distance, + ) - arcpy.management.Append( - inputs=base_partition_selection, - target=iteration_append_feature, - schema_type="NO_TEST", - ) + self.configure_alias_and_type( + alias=alias, + type_name="context", + type_path=context_selection_path, + ) - base_partition_selection_2 = ( - f"in_memory/{alias}_partition_base_select_2_{scale}" - ) - self.iteration_file_paths.append(base_partition_selection_2) - - custom_arcpy.select_location_and_make_feature_layer( - input_layer=input_data_copy, - overlap_type=custom_arcpy.OverlapType.WITHIN_A_DISTANCE, - select_features=iteration_partition, - output_name=base_partition_selection_2, - selection_type=custom_arcpy.SelectionType.NEW_SELECTION.value, - search_distance=self.search_distance, - ) + def _process_context_features_and_others( + self, aliases, iteration_partition, object_id + ): + for alias in aliases: + if "context_copy" not in self.nested_alias_type_data[alias]: + # Loads in dummy feature for this alias for this iteration and sets dummy_used = True + self.update_empty_alias_type_with_dummy_file( + alias, + type_info="context", + ) + print( + f"iteration partition {object_id} has no context features for {alias} in the partition feature" + ) + else: + self.process_context_features(alias, iteration_partition) + + def append_iteration_to_final(self, alias): + # Guard clause if alias doesn't exist in nested_final_outputs + if alias not in self.nested_final_outputs: + return + + # For each type under current alias, append the result of the current iteration + for type_info, final_output_path in self.nested_final_outputs[alias].items(): + # Skipping append if the alias is a dummy feature + if self.nested_alias_type_data[alias]["dummy_used"]: + continue - arcpy.management.SelectLayerByLocation( - in_layer=base_partition_selection_2, - overlap_type="HAVE_THEIR_CENTER_IN", - select_features=iteration_partition, - selection_type="REMOVE_FROM_SELECTION", - ) + input_feature_class = self.nested_alias_type_data[alias][type_info] - arcpy.CalculateField_management( - in_table=base_partition_selection_2, - field=partition_field, - expression="0", - ) + if ( + not arcpy.Exists(input_feature_class) + or int(arcpy.GetCount_management(input_feature_class).getOutput(0)) <= 0 + ): + print( + f"No features found in partition target selection: {input_feature_class}" + ) + continue - arcpy.management.Append( - inputs=base_partition_selection_2, - target=iteration_append_feature, - schema_type="NO_TEST", - ) + partition_target_selection = ( + f"in_memory/{alias}_{type_info}_partition_target_selection_{self.scale}" + ) + self.iteration_file_paths_list.append(partition_target_selection) + self.iteration_file_paths_list.append(input_feature_class) - print( - f"iteration partition {base_partition_selection_2} appended to {iteration_append_feature}" - ) - else: - print( - f"iteration partition {object_id} has no features for {alias} in the partition feature" - ) 
+ # Apply feature selection + custom_arcpy.select_attribute_and_make_permanent_feature( + input_layer=input_feature_class, + expression=f"{self.PARTITION_FIELD} = 1", + output_name=partition_target_selection, + ) - # If no aliases had features, skip the rest of the processing for this object_id - if aliases_with_features == 0: - for alias in self.alias: - self.delete_iteration_files(*self.iteration_file_paths) - continue + if not arcpy.Exists(final_output_path): + arcpy.management.CopyFeatures( + in_features=partition_target_selection, + out_feature_class=final_output_path, + ) - for func in self.custom_functions: - try: - pass - # Determine inputs for the current function - # inputs = [ - # self.file_mapping[fc]["func_output"] or fc - # for fc in self.input_feature_classes - # ] - - # Call the function and get outputs - outputs = func(inputs) - - # Update file mapping with the outputs - for fc, output in zip(self.input_feature_classes, outputs): - self.file_mapping[fc]["current_output"] = output - except: - print(f"Error in custom function: {func}") - - # Process each alias after custom functions - for alias in self.alias: - if aliases_feature_counts[alias] > 0: - # Retrieve the output path for the current alias - output_path = self.outputs.get(alias) - iteration_append_feature = f"{root_file_partition_iterator}_{alias}_iteration_append_feature_{scale}" - - if not arcpy.Exists(output_path): - self.create_feature_class( - out_path=os.path.dirname(output_path), - out_name=os.path.basename(output_path), - template_feature=iteration_append_feature, - ) - - partition_target_selection = ( - f"in_memory/{alias}_partition_target_selection_{scale}" - ) - self.iteration_file_paths.append(partition_target_selection) + else: + arcpy.management.Append( + inputs=partition_target_selection, + target=final_output_path, + schema_type="NO_TEST", + ) - custom_arcpy.select_attribute_and_make_permanent_feature( - input_layer=iteration_append_feature, - expression=f"{partition_field} = 1", - output_name=partition_target_selection, - ) + def partition_iteration(self): + aliases = self.nested_alias_type_data.keys() + self.find_maximum_object_id() - print( - f"for {alias} in {iteration_append_feature} \nThe input is: {partition_target_selection}\nAppending to {output_path}" - ) + self.create_dummy_features(types_to_include=["input_copy", "context_copy"]) + self.initialize_dummy_used() - arcpy.management.Append( - inputs=partition_target_selection, - target=output_path, - schema_type="NO_TEST", - ) - else: - print( - f"No features found in {alias} for {self.object_id_field} {object_id} to append to {output_path}" - ) + self.delete_iteration_files(*self.iteration_file_paths_list) + self.iteration_file_paths_list.clear() + + for object_id in range(1, self.max_object_id + 1): + self.reset_dummy_used() + self.export_dictionaries_to_json( + file_name="iteration_start", + iteration=True, + object_id=object_id, + ) + self.iteration_file_paths_list.clear() + iteration_partition = f"{self.partition_feature}_{object_id}" + self.select_partition_feature(iteration_partition, object_id) - for alias in self.alias: - self.delete_iteration_files(*self.iteration_file_paths) - print(f"Finished iteration {object_id}") + inputs_present_in_partition = self._process_inputs_in_partition( + aliases, iteration_partition, object_id + ) + if inputs_present_in_partition: + self._process_context_features_and_others( + aliases, iteration_partition, object_id + ) + if inputs_present_in_partition: + for alias in aliases: + 
self.append_iteration_to_final(alias) + self.delete_iteration_files(*self.iteration_file_paths_list) + else: + self.delete_iteration_files(*self.iteration_file_paths_list) + + self.export_dictionaries_to_json( + file_name="iteration_end", + iteration=True, + object_id=object_id, + ) + @timing_decorator def run(self): - environment_setup.main() - self.create_cartographic_partitions() + self.unpack_alias_path_data(self.raw_input_data) + if self.raw_output_data is not None: + self.unpack_alias_path_outputs(self.raw_output_data) - max_object_id = self.pre_iteration() + self.export_dictionaries_to_json(file_name="post_alias_unpack") + self.delete_final_outputs() self.prepare_input_data() - self.partition_iteration( - [self.file_mapping[alias]["current_output"] for alias in self.alias], - self.partition_feature, - max_object_id, - self.root_file_partition_iterator, - self.scale, - "partition_select", - "id_field", - [self.final_append_features.get(alias) for alias in self.alias], - ) + self.create_cartographic_partitions() + + self.partition_iteration() + self.export_dictionaries_to_json(file_name="post_everything") if __name__ == "__main__": + environment_setup.main() # Define your input feature classes and their aliases building_points = "building_points" building_polygons = "building_polygons" + church_hospital = "church_hospital" + restriction_lines = "restriction_lines" inputs = { building_points: [ @@ -553,24 +765,29 @@ def run(self): Building_N100.data_preparation___matrikkel_bygningspunkt___n100_building.value, ], building_polygons: [ - "context", + "input", input_n50.Grunnriss, ], } outputs = { - building_points: Building_N100.iteration__partition_iterator_final_output_points__n100.value, - building_polygons: Building_N100.iteration__partition_iterator_final_output_polygons__n100.value, + building_points: [ + "input", + Building_N100.iteration__partition_iterator_final_output_points__n100.value, + ], + building_polygons: [ + "input", + Building_N100.iteration__partition_iterator_final_output_polygons__n100.value, + ], } # Instantiate PartitionIterator with necessary parameters partition_iterator = PartitionIterator( alias_path_data=inputs, - # alias_path_outputs=outputs, + alias_path_outputs=outputs, root_file_partition_iterator=Building_N100.iteration__partition_iterator__n100.value, scale=env_setup.global_config.scale_n100, - output_feature_class=Building_N100.iteration__partition_iterator_final_output__n100.value, - # Add other parameters like custom_functions if you have any + dictionary_documentation_path=Building_N100.iteration___partition_iterator_json_documentation___building_n100.value, ) # Run the partition iterator diff --git a/custom_tools/partition_iterator_state_based.py b/custom_tools/partition_iterator_state_based.py index 45740ef5..a1f58a0e 100644 --- a/custom_tools/partition_iterator_state_based.py +++ b/custom_tools/partition_iterator_state_based.py @@ -1,11 +1,16 @@ import arcpy import os +import re +import shutil import random -from enum import Enum +import json +from typing import Dict, Tuple, Literal import env_setup.global_config from env_setup import environment_setup from custom_tools import custom_arcpy +from custom_tools.timing_decorator import timing_decorator + from input_data import input_n50 from file_manager.n100.file_manager_buildings import Building_N100 @@ -17,19 +22,20 @@ class PartitionIterator: # Class-level constants PARTITION_FIELD = "partition_select" - ORIGINAL_ID_FIELD = "orig_id_field" + ORIGINAL_ID_FIELD = "original_id_field" def __init__( 
self, - alias_path_data, - root_file_partition_iterator, - scale, - alias_path_outputs, + alias_path_data: Dict[str, Tuple[Literal["input", "context"], str]], + alias_path_outputs: Dict[str, Tuple[str, str]], + root_file_partition_iterator: str, + scale: str, custom_functions=None, - feature_count="15000", - partition_method="FEATURES", - search_distance="500 Meters", - object_id_field="OBJECTID", + dictionary_documentation_path: str = None, + feature_count: str = "15000", + partition_method: Literal["FEATURES", "VERTICES"] = "FEATURES", + search_distance: str = "500 Meters", + object_id_field: str = "OBJECTID", ): """ Initialize the PartitionIterator with input datasets for partitioning and processing. @@ -41,68 +47,107 @@ def __init__( :param feature_count: Feature count for cartographic partitioning. :param partition_method: Method used for creating cartographic partitions. """ - self.data = {} - for alias, info in alias_path_data.items(): - type_info, path_info = info - if alias not in self.data: - self.data[alias] = {} - self.data[alias][type_info] = path_info + # Raw inputs and initial setup + self.raw_input_data = alias_path_data + self.raw_output_data = alias_path_outputs or {} self.root_file_partition_iterator = root_file_partition_iterator + if "." in dictionary_documentation_path: + self.dictionary_documentation_path = re.sub( + r"\.[^.]*$", "", dictionary_documentation_path + ) + else: + self.dictionary_documentation_path = dictionary_documentation_path + self.scale = scale - self.output_feature_class = alias_path_outputs + self.search_distance = search_distance self.feature_count = feature_count self.partition_method = partition_method + self.object_id_field = object_id_field + + # Initial processing results + self.nested_alias_type_data = {} + self.nested_final_outputs = {} + + # Variables related to features and iterations self.partition_feature = ( f"{root_file_partition_iterator}_partition_feature_{scale}" ) - self.custom_functions = custom_functions or [] - self.iteration_file_paths = [] - self.final_append_features = {} - self.search_distance = search_distance - self.object_id_field = object_id_field - self.input_data_copy = None self.max_object_id = None - self.final_append_feature = None + self.iteration_file_paths_list = [] + self.first_call_directory_documentation = True - def integrate_initial_data(self, alias_path_data, custom_function_specs): + # Variables related to custom operations + self.custom_functions = custom_functions or [] + + # self.handle_data_export( + # file_path=self.dictionary_documentation_path, + # alias_type_data=self.nested_alias_type_data, + # final_outputs=self.nested_final_outputs, + # file_name="initialization", + # iteration=False, + # object_id=None, + # ) + + def unpack_alias_path_data(self, alias_path_data): # Process initial alias_path_data for inputs and outputs - for alias, (type_info, path_info) in alias_path_data.items(): - self.update_alias_state( - alias=alias, - type_info=type_info, - path=path_info, - ) + for alias, info in alias_path_data.items(): + type_info, path_info = info + if alias not in self.nested_alias_type_data: + self.nested_alias_type_data[alias] = {} + self.nested_alias_type_data[alias][type_info] = path_info + + def unpack_alias_path_outputs(self, alias_path_outputs): + self.nested_final_outputs = {} + for alias, info in alias_path_outputs.items(): + type_info, path_info = info + if alias not in self.nested_final_outputs: + self.nested_final_outputs[alias] = {} + self.nested_final_outputs[alias][type_info] = 
path_info + + def configure_alias_and_type( + self, + alias, + type_name, + type_path, + ): + # Check if alias exists + if alias not in self.nested_alias_type_data: + print(f"Alias '{alias}' not found in nested_alias_type_data.") + return + + # Update path of an existing type or add a new type with the provided path + self.nested_alias_type_data[alias][type_name] = type_path + print(f"Set path for type '{type_name}' in alias '{alias}' to: {type_path}") - for func_name, specs in custom_function_specs.items(): - for alias, types in specs.items(): - for type_info in types: - self.update_alias_state( - alias=alias, - type_info=type_info, - path=None, - ) - - def update_alias_state(self, alias, type_info, path=None): - if alias not in self.data: - self.data[alias] = {} - self.data[alias][type_info] = path - - def create_new_alias(self, alias, initial_states): - if alias in self.data: + def create_new_alias( + self, + alias, + initial_type_name=None, + initial_type_path=None, + ): + # Check if alias already exists + if alias in self.nested_alias_type_data: raise ValueError(f"Alias {alias} already exists.") - self.data[alias] = initial_states - def get_path_for_alias_and_type(self, alias, type_info): - return self.data.get(alias, {}).get(type_info) + # Initialize nested_alias_type_data for alias + if initial_type_name: + # Create alias with initial type and path + self.nested_alias_type_data[alias] = {initial_type_name: initial_type_path} + else: + # Initialize alias as an empty dictionary + self.nested_alias_type_data[alias] = {} + + print( + f"Created new alias '{alias}' in nested_alias_type_data with type '{initial_type_name}' and path: {initial_type_path}" + ) def create_cartographic_partitions(self): - print("Debugging self.data before partition creation:", self.data) all_features = [ path - for alias, types in self.data.items() + for alias, types in self.nested_alias_type_data.items() for type_key, path in types.items() - if type_key in ["input", "context"] and path is not None + if type_key in ["input_copy", "context_copy"] and path is not None ] print(f"all_features: {all_features}") @@ -128,25 +173,19 @@ def delete_feature_class(self, feature_class_path, alias=None, output_type=None) else: print(f"Deleted feature class: {feature_class_path}") - def delete_existing_outputs(self): - for alias, output_info in self.output_feature_class.items(): - output_type, output_path = output_info - current_path = self.data.get(alias, {}).get(output_type) - if current_path == output_path and arcpy.Exists(current_path): - PartitionIterator.delete_feature_class( - current_path, - alias=alias, - output_type=output_type, - ) - else: - print( - f"Output feature class for '{alias}' of type '{output_type}' does not exist or path does not match: {current_path}" - ) + def delete_final_outputs(self): + """Deletes all final output files if they exist.""" + for alias in self.nested_final_outputs: + for _, output_file_path in self.nested_final_outputs[alias].items(): + if arcpy.Exists(output_file_path): + arcpy.management.Delete(output_file_path) + print(f"Deleted file: {output_file_path}") def delete_iteration_files(self, *file_paths): """Deletes multiple feature classes or files. 
Detailed alias and output_type logging is not available here.""" for file_path in file_paths: self.delete_feature_class(file_path) + print(f"Deleted file: {file_path}") @staticmethod def create_feature_class(full_feature_path, template_feature): @@ -161,14 +200,14 @@ def create_feature_class(full_feature_path, template_feature): ) print(f"Created feature class: {full_feature_path}") - def create_dummy_features(self, types_to_include=["input", "context"]): + def create_dummy_features(self, types_to_include=["input_copy", "context_copy"]): """ Creates dummy features for aliases with specified types. Args: types_to_include (list): Types for which dummy features should be created. """ - for alias, alias_data in self.data.items(): + for alias, alias_data in self.nested_alias_type_data.items(): for type_info, path in list(alias_data.items()): if type_info in types_to_include and path: dummy_feature_path = f"{self.root_file_partition_iterator}_{alias}_dummy_{self.scale}" @@ -180,13 +219,160 @@ def create_dummy_features(self, types_to_include=["input", "context"]): f"Created dummy feature class for {alias} of type {type_info}: {dummy_feature_path}" ) # Update alias state to include this new dummy type and its path - self.update_alias_state( + self.configure_alias_and_type( alias=alias, - type_info="dummy", - path=dummy_feature_path, + type_name="dummy", + type_path=dummy_feature_path, ) - def pre_iteration(self): + def initialize_dummy_used(self): + # Assuming `aliases` is a list of all your aliases + for alias in self.nested_alias_type_data: + self.nested_alias_type_data[alias]["dummy_used"] = False + + def reset_dummy_used(self): + # Assuming `aliases` is a list of all your aliases + for alias in self.nested_alias_type_data: + self.nested_alias_type_data[alias]["dummy_used"] = False + + def update_empty_alias_type_with_dummy_file(self, alias, type_info): + # Check if the dummy type exists in the alias nested_alias_type_data + if "dummy" in self.nested_alias_type_data[alias]: + # Check if the input type exists in the alias nested_alias_type_data + if ( + type_info in self.nested_alias_type_data[alias] + and self.nested_alias_type_data[alias][type_info] is not None + ): + # Get the dummy path from the alias nested_alias_type_data + dummy_path = self.nested_alias_type_data[alias]["dummy"] + # Set the value of the existing type_info to the dummy path + self.nested_alias_type_data[alias][type_info] = dummy_path + self.nested_alias_type_data[alias]["dummy_used"] = True + print( + f"The '{type_info}' for alias '{alias}' was updated with dummy path: {dummy_path}" + ) + else: + print( + f"'{type_info}' does not exist for alias '{alias}' in nested_alias_type_data." + ) + else: + print( + f"'dummy' type does not exist for alias '{alias}' in nested_alias_type_data." + ) + + def create_directory_json_documentation( + self, + root_path: str, + target_dir: str, + iteration: bool, + ) -> str: + """ + Creates a directory at the given root_path for the target_dir. + Args: + root_path: The root directory where initial structure is created + target_dir: The target where the created directory should be placed + iteration: Boolean flag indicating if the iteration_documentation should be added + Returns: + A string containing the absolute path of the created directory. 
+ """ + + # Determine base directory + directory_path = os.path.join(root_path, f"{target_dir}") + + # Ensure that the directory exists + os.makedirs(directory_path, exist_ok=True) + + if iteration: + iteration_documentation_dir = os.path.join( + directory_path, "iteration_documentation" + ) + os.makedirs(iteration_documentation_dir, exist_ok=True) + + return iteration_documentation_dir + + return directory_path + + def write_data_to_json( + self, + data: dict, + file_path: str, + file_name: str, + object_id=None, + ) -> None: + """ + Writes dictionary into a json file. + + Args: + data: The data to write. + file_path: The complete path (directory+file_name) where the file should be created + file_name: The name of the file to create + object_id: If provided, object_id will also be part of the file name. + """ + + if object_id: + complete_file_path = os.path.join( + file_path, f"{file_name}_{object_id}.json" + ) + else: + complete_file_path = os.path.join(file_path, f"{file_name}.json") + + with open(complete_file_path, "w") as f: + json.dump(data, f, indent=4) + + def export_dictionaries_to_json( + self, + file_path: str = None, + alias_type_data: dict = None, + final_outputs: dict = None, + file_name: str = None, + iteration: bool = False, + object_id=None, + ) -> None: + """ + Handles the export of alias type data and final outputs into separate json files. + + Args: + file_path: The complete file path where to create the output directories. + alias_type_data: The alias type data to export. + final_outputs: The final outputs data to export. + file_name: The name of the file to create + iteration: Boolean flag indicating if the iteration_documentation should be added + object_id: Object ID to be included in the file name if it's an iteration (`iteration==True`). If `None`, will not be used. + """ + + if file_path is None: + file_path = self.dictionary_documentation_path + if alias_type_data is None: + alias_type_data = self.nested_alias_type_data + if final_outputs is None: + final_outputs = self.nested_final_outputs + + if self.first_call_directory_documentation and os.path.exists(file_path): + shutil.rmtree(file_path) + self.first_call_directory_documentation = False + + alias_type_data_directory = self.create_directory_json_documentation( + file_path, "nested_alias_type_data", iteration + ) + final_outputs_directory = self.create_directory_json_documentation( + file_path, "nested_final_outputs", iteration + ) + + self.write_data_to_json( + alias_type_data, alias_type_data_directory, file_name, object_id + ) + self.write_data_to_json( + final_outputs, final_outputs_directory, file_name, object_id + ) + + def generate_unique_field_name(self, input_feature, field_name): + existing_field_names = [field.name for field in arcpy.ListFields(input_feature)] + unique_field_name = field_name + while unique_field_name in existing_field_names: + unique_field_name = f"{unique_field_name}_{random.randint(0, 9)}" + return unique_field_name + + def find_maximum_object_id(self): """ Determine the maximum OBJECTID for partitioning. 
""" @@ -197,16 +383,15 @@ def pre_iteration(self): self.object_id_field, sql_clause=(None, f"ORDER BY {self.object_id_field} DESC"), ) as cursor: - max_object_id = next(cursor)[0] + self.max_object_id = next(cursor)[0] - print(f"Maximum {self.object_id_field} found: {max_object_id}") + print(f"Maximum {self.object_id_field} found: {self.max_object_id}") - return max_object_id except Exception as e: print(f"Error in finding max {self.object_id_field}: {e}") def prepare_input_data(self): - for alias, types in self.data.items(): + for alias, types in self.nested_alias_type_data.items(): if "input" in types: input_data_path = types["input"] input_data_copy = ( @@ -217,13 +402,24 @@ def prepare_input_data(self): in_data=input_data_path, out_data=input_data_copy, ) - print(f"Copied input data for: {alias}") + print(f"Copied input nested_alias_type_data for: {alias}") - # Update the path for 'input' type to the new copied path - self.update_alias_state( + # Add a new type for the alias the copied input nested_alias_type_data + self.configure_alias_and_type( alias=alias, - type_info="input", - path=input_data_copy, + type_name="input_copy", + type_path=input_data_copy, + ) + + # Making sure the field is unique if it exists a field with the same name + self.PARTITION_FIELD = self.generate_unique_field_name( + input_feature=input_data_copy, + field_name=self.PARTITION_FIELD, + ) + + self.ORIGINAL_ID_FIELD = self.generate_unique_field_name( + input_feature=input_data_copy, + field_name=self.ORIGINAL_ID_FIELD, ) arcpy.AddField_management( @@ -233,31 +429,37 @@ def prepare_input_data(self): ) print(f"Added field {self.PARTITION_FIELD}") - # Making sure the field is unique if it exists a field with the same name - existing_field_names = [ - field.name for field in arcpy.ListFields(input_data_copy) - ] - unique_orig_id_field = self.ORIGINAL_ID_FIELD - while unique_orig_id_field in existing_field_names: - unique_orig_id_field = ( - f"{self.ORIGINAL_ID_FIELD}_{random.randint(0, 9)}" - ) arcpy.AddField_management( in_table=input_data_copy, - field_name=unique_orig_id_field, + field_name=self.ORIGINAL_ID_FIELD, field_type="LONG", ) - print(f"Added field {unique_orig_id_field}") + print(f"Added field {self.ORIGINAL_ID_FIELD}") arcpy.CalculateField_management( in_table=input_data_copy, - field=unique_orig_id_field, + field=self.ORIGINAL_ID_FIELD, expression=f"!{self.object_id_field}!", ) - print(f"Calculated field {unique_orig_id_field}") + print(f"Calculated field {self.ORIGINAL_ID_FIELD}") + + if "context" in types: + context_data_path = types["context"] + context_data_copy = ( + f"{self.root_file_partition_iterator}_{alias}_context_copy" + ) - # Update the instance variable if a new unique field name was created - self.ORIGINAL_ID_FIELD = unique_orig_id_field + arcpy.management.Copy( + in_data=context_data_path, + out_data=context_data_copy, + ) + print(f"Copied context nested_alias_type_data for: {alias}") + + self.configure_alias_and_type( + alias=alias, + type_name="context_copy", + type_path=context_data_copy, + ) def custom_function(inputs): outputs = [] @@ -267,13 +469,13 @@ def select_partition_feature(self, iteration_partition, object_id): """ Selects partition feature based on OBJECTID. 
""" - self.iteration_file_paths.append(iteration_partition) + self.iteration_file_paths_list.append(iteration_partition) custom_arcpy.select_attribute_and_make_permanent_feature( input_layer=self.partition_feature, expression=f"{self.object_id_field} = {object_id}", output_name=iteration_partition, ) - print(f"Created partition selection for OBJECTID {object_id}") + print(f"\nCreated partition selection for OBJECTID {object_id}") def process_input_features( self, @@ -284,17 +486,18 @@ def process_input_features( """ Process input features for a given partition. """ - if "input" not in self.data[alias]: + if "input_copy" not in self.nested_alias_type_data[alias]: + # If there are no inputs to process, return None for the aliases and a flag indicating no input was present. return None, False - if "input" in self.data[alias]: - input_path = self.data[alias]["input"] + if "input_copy" in self.nested_alias_type_data[alias]: + input_path = self.nested_alias_type_data[alias]["input_copy"] input_features_partition_selection = ( f"in_memory/{alias}_partition_base_select_{self.scale}" ) - self.iteration_file_paths.append(input_features_partition_selection) + self.iteration_file_paths_list.append(input_features_partition_selection) - input_feature_count = custom_arcpy.select_location_and_make_feature_layer( + custom_arcpy.select_location_and_make_feature_layer( input_layer=input_path, overlap_type=custom_arcpy.OverlapType.HAVE_THEIR_CENTER_IN.value, select_features=iteration_partition, @@ -311,7 +514,6 @@ def process_input_features( if aliases_with_features[alias] > 0: print(f"{alias} has {count_points} features in {iteration_partition}") - # aliases_with_features += 1 arcpy.CalculateField_management( in_table=input_features_partition_selection, @@ -320,7 +522,7 @@ def process_input_features( ) iteration_append_feature = f"{self.root_file_partition_iterator}_{alias}_iteration_append_feature_{self.scale}" - self.iteration_file_paths.append(iteration_append_feature) + self.iteration_file_paths_list.append(iteration_append_feature) PartitionIterator.create_feature_class( full_feature_path=iteration_append_feature, @@ -334,7 +536,7 @@ def process_input_features( ) input_features_partition_context_selection = f"in_memory/{alias}_input_features_partition_context_selection_{self.scale}" - self.iteration_file_paths.append( + self.iteration_file_paths_list.append( input_features_partition_context_selection ) @@ -366,23 +568,38 @@ def process_input_features( schema_type="NO_TEST", ) + self.configure_alias_and_type( + alias=alias, + type_name="input", + type_path=iteration_append_feature, + ) + print( f"iteration partition {input_features_partition_context_selection} appended to {iteration_append_feature}" ) + # Return the processed input features and a flag indicating successful operation return aliases_with_features, True else: + # Loads in dummy feature for this alias for this iteration and sets dummy_used = True + self.update_empty_alias_type_with_dummy_file( + alias, + type_info="input", + ) print( f"iteration partition {object_id} has no features for {alias} in the partition feature" ) + # If there are no inputs to process, return None for the aliases and a flag indicating no input was present. 
                 return None, False

     def _process_inputs_in_partition(self, aliases, iteration_partition, object_id):
         inputs_present_in_partition = False
         for alias in aliases:
-            if "input" in self.data[alias]:
+            if "input_copy" in self.nested_alias_type_data[alias]:
+                # Use process_input_features to check whether inputs are present
                 _, input_present = self.process_input_features(
                     alias, iteration_partition, object_id
                 )
+                # Set inputs_present_in_partition to True if any alias in the partition has input present; otherwise it remains False.
                 inputs_present_in_partition = (
                     inputs_present_in_partition or input_present
                 )
@@ -392,10 +609,10 @@ def process_context_features(self, alias, iteration_partition):
         """
         Process context features for a given partition if input features are present.
         """
-        if "context" in self.data[alias]:
-            context_path = self.data[alias]["context"]
+        if "context_copy" in self.nested_alias_type_data[alias]:
+            context_path = self.nested_alias_type_data[alias]["context_copy"]
             context_selection_path = f"{self.root_file_partition_iterator}_{alias}_context_iteration_selection"
-            self.iteration_file_paths.append(context_selection_path)
+            self.iteration_file_paths_list.append(context_selection_path)

             custom_arcpy.select_location_and_make_permanent_feature(
                 input_layer=context_path,
@@ -406,34 +623,94 @@ def process_context_features(self, alias, iteration_partition):
                 search_distance=self.search_distance,
             )

+            self.configure_alias_and_type(
+                alias=alias,
+                type_name="context",
+                type_path=context_selection_path,
+            )
+
     def _process_context_features_and_others(
         self, aliases, iteration_partition, object_id
     ):
         for alias in aliases:
-            if "context" not in self.data[alias]:
+            if "context_copy" not in self.nested_alias_type_data[alias]:
+                # Load a dummy feature for this alias for this iteration and set dummy_used = True
+                self.update_empty_alias_type_with_dummy_file(
+                    alias,
+                    type_info="context",
+                )
                 print(
-                    f"iteration partition {object_id} has no features for {alias} in the partition feature"
+                    f"iteration partition {object_id} has no context features for {alias} in the partition feature"
                 )
             else:
                 self.process_context_features(alias, iteration_partition)

+    def append_iteration_to_final(self, alias):
+        # Guard clause if the alias doesn't exist in nested_final_outputs
+        if alias not in self.nested_final_outputs:
+            return
+
+        # For each type under the current alias, append the result of the current iteration
+        for type_info, final_output_path in self.nested_final_outputs[alias].items():
+            # Skip the append if the alias is a dummy feature
+            if self.nested_alias_type_data[alias]["dummy_used"]:
+                continue
+
+            input_feature_class = self.nested_alias_type_data[alias][type_info]
+
+            if (
+                not arcpy.Exists(input_feature_class)
+                or int(arcpy.GetCount_management(input_feature_class).getOutput(0)) <= 0
+            ):
+                print(
+                    f"No features found in partition target selection: {input_feature_class}"
+                )
+                continue
+
+            partition_target_selection = (
+                f"in_memory/{alias}_{type_info}_partition_target_selection_{self.scale}"
+            )
+            self.iteration_file_paths_list.append(partition_target_selection)
+            self.iteration_file_paths_list.append(input_feature_class)
+
+            # Apply feature selection
+            custom_arcpy.select_attribute_and_make_permanent_feature(
+                input_layer=input_feature_class,
+                expression=f"{self.PARTITION_FIELD} = 1",
+                output_name=partition_target_selection,
+            )
+
+            if not arcpy.Exists(final_output_path):
+                arcpy.management.CopyFeatures(
+                    in_features=partition_target_selection,
+                    out_feature_class=final_output_path,
+                )
+
+            else:
+                arcpy.management.Append(
+                    inputs=partition_target_selection,
+                    target=final_output_path,
+                    schema_type="NO_TEST",
+                )
+
     def partition_iteration(self):
-        aliases = self.data.keys()
-        max_object_id = self.pre_iteration()
-
-        self.delete_existing_outputs()
-        self.create_dummy_features(
-            types_to_include=[
-                "input",
-                "context",
-            ]
-        )
-        for alias in aliases:
-            self.delete_iteration_files(*self.iteration_file_paths)
-        self.iteration_file_paths.clear()
+        aliases = self.nested_alias_type_data.keys()
+        self.find_maximum_object_id()

-        for object_id in range(1, max_object_id + 1):
-            self.iteration_file_paths.clear()
+        self.create_dummy_features(types_to_include=["input_copy", "context_copy"])
+        self.initialize_dummy_used()
+
+        self.delete_iteration_files(*self.iteration_file_paths_list)
+        self.iteration_file_paths_list.clear()
+
+        for object_id in range(1, self.max_object_id + 1):
+            self.reset_dummy_used()
+            self.export_dictionaries_to_json(
+                file_name="iteration_start",
+                iteration=True,
+                object_id=object_id,
+            )
+            self.iteration_file_paths_list.clear()

             iteration_partition = f"{self.partition_feature}_{object_id}"
             self.select_partition_feature(iteration_partition, object_id)
@@ -444,58 +721,34 @@ def partition_iteration(self):
                 self._process_context_features_and_others(
                     aliases, iteration_partition, object_id
                 )
-            else:
+            if inputs_present_in_partition:
                 for alias in aliases:
-                    self.delete_iteration_files(*self.iteration_file_paths)
-
-            # # Process each alias after custom functions
-            # for alias in self.data.keys():
-            #     if inputs_present_in_partition:
-            #         # Retrieve the output path for the current alias
-            #         output_path = self.outputs.get(alias)
-            #         iteration_append_feature = f"{self.root_file_partition_iterator}_{alias}_iteration_append_feature_{scale}"
-            #
-            #         if not arcpy.Exists(output_path):
-            #             self.create_feature_class(
-            #                 out_path=os.path.dirname(output_path),
-            #                 out_name=os.path.basename(output_path),
-            #                 template_feature=iteration_append_feature,
-            #             )
-            #
-            #         partition_target_selection = (
-            #             f"in_memory/{alias}_partition_target_selection_{self.scale}"
-            #         )
-            #         self.iteration_file_paths.append(partition_target_selection)
-            #
-            #         custom_arcpy.select_attribute_and_make_permanent_feature(
-            #             input_layer=iteration_append_feature,
-            #             expression=f"{self.PARTITION_FIELD} = 1",
-            #             output_name=partition_target_selection,
-            #         )
-            #
-            #         print(
-            #             f"for {alias} in {iteration_append_feature} \nThe input is: {partition_target_selection}\nAppending to {output_path}"
-            #         )
-            #
-            #         arcpy.management.Append(
-            #             inputs=partition_target_selection,
-            #             target=output_path,
-            #             schema_type="NO_TEST",
-            #         )
-            #     else:
-            #         print(
-            #             f"No features found in {alias} for {self.object_id_field} {object_id} to append to {output_path}"
-            #         )
-
-        # for alias in self.alias:
-        #     self.delete_iteration_files(*self.iteration_file_paths)
-        # print(f"Finished iteration {object_id}")
+                    self.append_iteration_to_final(alias)
+                self.delete_iteration_files(*self.iteration_file_paths_list)
+            else:
+                self.delete_iteration_files(*self.iteration_file_paths_list)
+
+            self.export_dictionaries_to_json(
+                file_name="iteration_end",
+                iteration=True,
+                object_id=object_id,
+            )

+    @timing_decorator
     def run(self):
+        self.unpack_alias_path_data(self.raw_input_data)
+        if self.raw_output_data is not None:
+            self.unpack_alias_path_outputs(self.raw_output_data)
+
+        self.export_dictionaries_to_json(file_name="post_alias_unpack")
+
+        self.delete_final_outputs()
         self.prepare_input_data()
+
         self.create_cartographic_partitions()
         self.partition_iteration()
+        self.export_dictionaries_to_json(file_name="post_everything")


 if __name__ == "__main__":
@@ -503,6 +756,8 @@ def run(self):
     # Define your input feature classes and their aliases
     building_points = "building_points"
     building_polygons = "building_polygons"
+    church_hospital = "church_hospital"
+    restriction_lines = "restriction_lines"

     inputs = {
         building_points: [
@@ -510,7 +765,7 @@ def run(self):
             Building_N100.data_preparation___matrikkel_bygningspunkt___n100_building.value,
         ],
         building_polygons: [
-            "context",
+            "input",
             input_n50.Grunnriss,
         ],
     }
@@ -520,10 +775,10 @@ def run(self):
             "input",
             Building_N100.iteration__partition_iterator_final_output_points__n100.value,
         ],
-        # building_polygons: [
-        #     "output",
-        #     Building_N100.iteration__partition_iterator_final_output_polygons__n100.value,
-        # ],
+        building_polygons: [
+            "input",
+            Building_N100.iteration__partition_iterator_final_output_polygons__n100.value,
+        ],
     }

     # Instantiate PartitionIterator with necessary parameters
@@ -532,18 +787,51 @@ def run(self):
         alias_path_outputs=outputs,
         root_file_partition_iterator=Building_N100.iteration__partition_iterator__n100.value,
         scale=env_setup.global_config.scale_n100,
+        dictionary_documentation_path=Building_N100.iteration___partition_iterator_json_documentation___building_n100.value,
     )

     # Run the partition iterator
     partition_iterator.run()
+
+    # inputs_2 = {
+    #     church_hospital: [
+    #         "input",
+    #         Building_N100.polygon_propogate_displacement___hospital_church_points___n100_building.value,
+    #     ],
+    #     restriction_lines: [
+    #         "input",
+    #         Building_N100.polygon_propogate_displacement___begrensningskurve_500m_from_displaced_polygon___n100_building.value,
+    #     ],
+    # }
+    #
+    # outputs_2 = {
+    #     church_hospital: [
+    #         "input",
+    #         f"{Building_N100.iteration__partition_iterator_final_output_points__n100.value}_church_hospital",
+    #     ],
+    #     restriction_lines: [
+    #         "input",
+    #         f"{Building_N100.iteration__partition_iterator_final_output_polygons__n100.value}_restriction_lines",
+    #     ],
+    # }
+    #
+    # partition_iterator_2 = PartitionIterator(
+    #     alias_path_data=inputs_2,
+    #     alias_path_outputs=outputs_2,
+    #     root_file_partition_iterator=f"{Building_N100.iteration__partition_iterator__n100.value}_2",
+    #     scale=env_setup.global_config.scale_n100,
+    # )
+    #
+    # # Run the partition iterator
+    # partition_iterator_2.run()
+

 """"
 Can I use pattern matching (match) to find the alias for each param?
-self.data = {
+self.nested_alias_type_data = {
 'alias_1': {
     'input': 'file_path_1',
     'function_1': 'file_path_3',
@@ -615,8 +903,6 @@ def run(self):
 # Thoughts on PartitionIterator:
 """
-
-Need to decide if I will use class based variables or if I will implemented having a nested class
-handling class based variables and file_path definitions inside the function.
+Building_N100.iteration___json_documentation_after___building_n100.value
 """
diff --git a/custom_tools/timing_decorator.py b/custom_tools/timing_decorator.py
index 31cf5e1f..1bac292f 100644
--- a/custom_tools/timing_decorator.py
+++ b/custom_tools/timing_decorator.py
@@ -1,75 +1,53 @@
 import time
+import os
 from functools import wraps

-# Importing temporary files
 from file_manager.n100.file_manager_buildings import Building_N100

-import time
-from functools import wraps
-
-# Importing temporary files
-from file_manager.n100.file_manager_buildings import Building_N100
-

+def timing_decorator(func):
+    """Logs the execution time of a function to both the console and a log file"""
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        start_time = time.time()
+
+        result = func(*args, **kwargs)
+
+        elapsed_time = compute_elapsed_time(start_time)
+
+        log_to_console_and_file(func.__name__, elapsed_time)
+
+        return result
+
+    return wrapper
+
+
+def compute_elapsed_time(start_time):
+    """Computes the elapsed time given a starting time"""
+    elapsed_time_seconds = time.time() - start_time
+
+    elapsed_minutes, elapsed_seconds = divmod(elapsed_time_seconds, 60)
+
+    return elapsed_minutes, elapsed_seconds
+
+
+def log_to_console_and_file(function_name, elapsed_time):
+    """Logs a message to both the console and a file"""
+    elapsed_minutes, elapsed_seconds = elapsed_time
+    output = f"{function_name} execution time: {int(elapsed_minutes)} minutes {elapsed_seconds:.0f} seconds"
+
+    log_to_console(output)
+    log_to_file(output)
+
+
+def log_to_console(message):
+    """Prints a given message to the console"""
+    print(message)
+
-# List to store print statements
-print_output = []
-
-# Decorator to measure execution time of functions
-def timing_decorator(arg=None):
-    if isinstance(arg, str):  # If arg is a string, use it as a custom name
-        custom_name = arg
-
-        def decorator(func):
-            @wraps(func)
-            def wrapper(*args, **kwargs):
-                start_time = time.time()
-                result = func(*args, **kwargs)
-                end_time = time.time()
-                elapsed_time = end_time - start_time
-                minutes = int(elapsed_time // 60)
-                seconds = elapsed_time % 60
-                output = f"{custom_name} execution time: {minutes} minutes {seconds:.2f} seconds"
-                print_output.append(output)  # Append to the list
-                return result
-
-            return wrapper
-
-        return decorator
-    else:  # If arg is not a string (or None), use the function name as the default name
-        func = arg
-
-        @wraps(func)
-        def wrapper(*args, **kwargs):
-            start_time = time.time()
-            result = func(*args, **kwargs)
-            end_time = time.time()
-            elapsed_time = end_time - start_time
-            minutes = int(elapsed_time // 60)
-            seconds = elapsed_time % 60
-            output = f"{func.__name__} execution time: {minutes} minutes {seconds:.2f} seconds"
-            print_output.append(output)  # Append to the list
-            return result
-
-        return wrapper
-
-
-# Total elapsed time accumulator
-total_elapsed_time = 0
-
-# Calculate total elapsed time
-for line in print_output:
-    minutes = int(line.split(":")[1].split()[0])
-    seconds = float(line.split(":")[1].split()[2])
-    total_elapsed_time += minutes * 60 + seconds
-
-# Write all print statements to a file
-output_file = Building_N100.overview__runtime_all_building_functions__n100.value
-
-# Write total elapsed time to the file
-with open(output_file, "w") as f:
-    f.write(
-        f"Total run time: {int(total_elapsed_time // 3600)} hours {int((total_elapsed_time % 3600) // 60)} minutes {total_elapsed_time % 60:.2f} seconds\n\n"
-    )
-
-    # Write all print statements to the file with additional newline characters
-    for line in print_output:
-        f.write(line + "\n")
+def log_to_file(message):
+    """Writes a given message to a log file"""
+    log_file_path = Building_N100.overview__runtime_all_building_functions__n100.value
+    with open(log_file_path, "a") as f:
+        f.write(message + "\n")
diff --git a/file_manager/n100/file_manager_buildings.py b/file_manager/n100/file_manager_buildings.py
index 95c9a258..13ba2468 100644
--- a/file_manager/n100/file_manager_buildings.py
+++ b/file_manager/n100/file_manager_buildings.py
@@ -893,17 +893,11 @@ class Building_N100(Enum):
             description="building_polygon_base_partition_selection",
         )
     )
-    iteration___json_documentation_before___building_n100 = (
-        file_manager.generate_file_name_general_files(
-            script_source_name=iteration,
-            description="json_documentation_before",
-            file_type="json",
-        )
-    )
-    iteration___json_documentation_after___building_n100 = (
+
+    iteration___partition_iterator_json_documentation___building_n100 = (
         file_manager.generate_file_name_general_files(
            script_source_name=iteration,
-            description="json_documentation_after",
+            description="partition_iterator_json_documentation",
            file_type="json",
        )
    )
diff --git a/generalization/n100/building/not_in_use_models/testing_buildings.py b/generalization/n100/building/not_in_use_models/testing_buildings.py
new file mode 100644
index 00000000..e69de29b
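
Note on the JSON documentation export added to partition_iterator.py: export_dictionaries_to_json writes the two state dictionaries (nested_alias_type_data and nested_final_outputs) into separate directories under dictionary_documentation_path, and when called per iteration it nests the files in an iteration_documentation subdirectory and appends the object ID to the file name. Below is a minimal sketch of that path scheme only, independent of arcpy; the root path and the alias dictionary are illustrative placeholders, not values from this repository.

import json
import os


def documentation_path(root, target_dir, iteration, file_name, object_id=None):
    """Mirror the directory and file naming used by the JSON documentation export."""
    directory = os.path.join(root, target_dir)
    if iteration:
        directory = os.path.join(directory, "iteration_documentation")
    os.makedirs(directory, exist_ok=True)
    suffix = f"_{object_id}" if object_id else ""
    return os.path.join(directory, f"{file_name}{suffix}.json")


if __name__ == "__main__":
    root = "C:/temp/partition_iterator_docs"  # placeholder root path
    state = {"building_points": {"input": "path/to/points", "dummy_used": False}}

    path = documentation_path(
        root, "nested_alias_type_data", iteration=True, file_name="iteration_start", object_id=3
    )
    with open(path, "w") as file:
        json.dump(state, file, indent=4)

    # e.g. <root>/nested_alias_type_data/iteration_documentation/iteration_start_3.json
    print(path)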
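
Note on the rewritten timing_decorator.py: each decorated call now logs its elapsed time immediately, printing to the console and appending one line to the log file resolved through Building_N100, instead of collecting messages in a module-level list and summarizing them at import time. A self-contained sketch of the same pattern is shown below; the log file path is a stand-in, not the project's file-manager value.

import time
from functools import wraps

LOG_FILE = "runtime_log.txt"  # stand-in; the project resolves this path via its file manager


def timing_decorator(func):
    """Log a function's execution time to the console and append it to a log file."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        minutes, seconds = divmod(time.time() - start_time, 60)
        message = f"{func.__name__} execution time: {int(minutes)} minutes {seconds:.0f} seconds"
        print(message)
        with open(LOG_FILE, "a") as log:
            log.write(message + "\n")
        return result

    return wrapper


@timing_decorator
def example_task():
    time.sleep(1.2)  # stand-in for real work


if __name__ == "__main__":
    example_task()

Because the log file is opened in append mode on every call, repeated runs accumulate entries; the previous implementation's total-runtime summary is no longer produced.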