From 28e5865727a95c202097279d289e59e20895d271 Mon Sep 17 00:00:00 2001 From: Emilio Mariscal Date: Thu, 31 Oct 2024 16:52:30 -0300 Subject: [PATCH 1/6] + GeoJSON stats --- API/api_worker.py | 29 ++++++++++++++++++------ src/app.py | 57 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 75 insertions(+), 11 deletions(-) diff --git a/API/api_worker.py b/API/api_worker.py index 096f2f0f..1f4e559f 100644 --- a/API/api_worker.py +++ b/API/api_worker.py @@ -15,7 +15,7 @@ from celery import Celery # Reader imports -from src.app import CustomExport, PolygonStats, RawData, S3FileTransfer +from src.app import CustomExport, PolygonStats, GeoJSONStats, RawData, S3FileTransfer from src.config import ALLOW_BIND_ZIP_FILTER from src.config import CELERY_BROKER_URL as celery_broker_uri from src.config import CELERY_RESULT_BACKEND as celery_backend @@ -75,7 +75,7 @@ def create_readme_content(default_readme, polygon_stats): def zip_binding( - working_dir, exportname_parts, geom_dump, polygon_stats, default_readme + working_dir, exportname_parts, geom_dump, polygon_stats, geojson_stats, default_readme ): logging.debug("Zip Binding Started!") upload_file_path = os.path.join( @@ -88,6 +88,9 @@ def zip_binding( ), } + if geojson_stats: + additional_files["stats.json"] = geojson_stats + for name, content in additional_files.items(): temp_path = os.path.join(working_dir, name) with open(temp_path, "w") as f: @@ -165,7 +168,6 @@ def on_failure(self, exc, task_id, args, kwargs, einfo): if os.path.exists(clean_dir): shutil.rmtree(clean_dir) - @celery.task( bind=True, name="process_raw_data", @@ -209,11 +211,22 @@ def process_raw_data(self, params, user=None): file_parts, ) - geom_area, geom_dump, working_dir = RawData( - params, str(self.request.id) - ).extract_current_data(file_parts) - inside_file_size = 0 polygon_stats = None + geojson_stats = None + + if "include_stats" in params.dict(): + if params.include_stats: + geoJSONStats = GeoJSONStats(params.filters) + geom_area, geom_dump, working_dir = RawData( + params, str(self.request.id) + ).extract_current_data(file_parts, geoJSONStats.raw_data_line_stats) + geojson_stats = geoJSONStats.json() + else: + geom_area, geom_dump, working_dir = RawData( + params, str(self.request.id) + ).extract_current_data(file_parts) + + inside_file_size = 0 if "include_stats" in params.dict(): if params.include_stats: feature = { @@ -222,12 +235,14 @@ def process_raw_data(self, params, user=None): "properties": {}, } polygon_stats = PolygonStats(feature).get_summary_stats() + if bind_zip: upload_file_path, inside_file_size = zip_binding( working_dir=working_dir, exportname_parts=exportname_parts, geom_dump=geom_dump, polygon_stats=polygon_stats, + geojson_stats=geojson_stats, default_readme=DEFAULT_README_TEXT, ) diff --git a/src/app.py b/src/app.py index fb5e82d5..c477fb2e 100644 --- a/src/app.py +++ b/src/app.py @@ -47,6 +47,7 @@ from psycopg2.extras import DictCursor from slugify import slugify from tqdm import tqdm +from geojson_stats.stats import Stats # Reader imports from src.config import ( @@ -640,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params): os.remove(query_path) @staticmethod - def query2geojson(con, extraction_query, dump_temp_file_path): + def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn = None): """Function written from scratch without being dependent on any library, Provides better performance for geojson binding""" # creating geojson file pre_geojson = """{"type": "FeatureCollection","features": [""" @@ -660,10 +661,12 @@ def query2geojson(con, extraction_query, dump_temp_file_path): for row in cursor: if first: first = False - f.write(row[0]) else: f.write(",") - f.write(row[0]) + if plugin_fn: + f.write(plugin_fn(row[0])) + else: + f.write((row[0])) cursor.close() # closing connection to avoid memory issues # close the writing geojson with last part f.write(post_geojson) @@ -711,7 +714,7 @@ def get_grid_id(geom, cur): country_export, ) - def extract_current_data(self, exportname): + def extract_current_data(self, exportname, plugin_fn = None): """Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump Args: exportname: takes filename as argument to create geojson file passed from routers @@ -777,6 +780,7 @@ def extract_current_data(self, exportname): country_export=country_export, ), dump_temp_file_path, + plugin_fn ) # uses own conversion class if output_type == RawDataOutputType.SHAPEFILE.value: ( @@ -2255,3 +2259,48 @@ def get_summary_stats(self, start_date, end_date, group_by): result = self.cur.fetchall() self.d_b.close_conn() return [dict(item) for item in result] + +class GeoJSONStats(Stats): + """Used for collecting stats while processing GeoJSON files line by line""" + + def __init__(self, filters, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.config.clean = True + self.config.properties_prop = "properties.tags" + + if filters and filters.tags: + config_area = ["building"] + config_length = ["highway", "waterway"] + + for tag in config_area: + if self.check_filter(filters.tags, tag): + self.config.keys.append(tag) + self.config.value_keys.append(tag) + self.config.area = True + for tag in config_length: + if self.check_filter(filters.tags, tag): + self.config.keys.append(tag) + self.config.value_keys.append(tag) + self.config.length = True + + def check_filter(self, tags, tag): + if tags.all_geometry: + if tags.all_geometry.join_or and tag in tags.all_geometry.join_or: + return True + if tags.all_geometry.join_and and tag in tags.all_geometry.join_and: + return True + if tags.polygon: + if tags.polygon.join_or and tag in tags.polygon.join_or: + return True + if tags.polygon.join_and and tag in tags.polygon.join_and: + return True + if tags.line: + if tags.line.join_or and tag in tags.line.join_or: + return True + if tags.line.join_and and tag in tags.line.join_and: + return True + + def raw_data_line_stats(self, line: str): + self.process_file_line(line) + return line From b18936291dd456d68acf0c42a921c335638d75cb Mon Sep 17 00:00:00 2001 From: Emilio Mariscal Date: Mon, 4 Nov 2024 09:16:26 -0300 Subject: [PATCH 2/6] + Code docs, requirement --- requirements.txt | 4 ++++ src/app.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/requirements.txt b/requirements.txt index c295c164..73fcb7e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -54,3 +54,7 @@ psutil==5.9.8 ## logging tqdm==4.66.2 + +# stats for geojson data +geojson-stats @ git+https://github.com/emi420/geojson-stats@v0.0.1-alpha + diff --git a/src/app.py b/src/app.py index c477fb2e..102754a4 100644 --- a/src/app.py +++ b/src/app.py @@ -2285,6 +2285,10 @@ def __init__(self, filters, *args, **kwargs): self.config.length = True def check_filter(self, tags, tag): + """ + Check if a tag is present in tag filters + """ + if tags.all_geometry: if tags.all_geometry.join_or and tag in tags.all_geometry.join_or: return True From 5ed27a600f5e8b27296ea3ec4b8acd3e970d6cc0 Mon Sep 17 00:00:00 2001 From: Emilio Mariscal Date: Mon, 4 Nov 2024 09:52:12 -0300 Subject: [PATCH 3/6] Change requirement (geojson-stats --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 73fcb7e6..6e87b47b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,5 +56,5 @@ psutil==5.9.8 tqdm==4.66.2 # stats for geojson data -geojson-stats @ git+https://github.com/emi420/geojson-stats@v0.0.1-alpha +geojson-stats==0.1.0 From 5eaee149629aad2eae3714b82a6c6f86022834d4 Mon Sep 17 00:00:00 2001 From: Emilio Mariscal Date: Mon, 4 Nov 2024 09:59:48 -0300 Subject: [PATCH 4/6] + Black code linting --- API/api_worker.py | 8 ++++- src/app.py | 11 +++++-- src/validation/models.py | 64 ++++++++++++++++++++-------------------- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/API/api_worker.py b/API/api_worker.py index 1f4e559f..0a90ce21 100644 --- a/API/api_worker.py +++ b/API/api_worker.py @@ -75,7 +75,12 @@ def create_readme_content(default_readme, polygon_stats): def zip_binding( - working_dir, exportname_parts, geom_dump, polygon_stats, geojson_stats, default_readme + working_dir, + exportname_parts, + geom_dump, + polygon_stats, + geojson_stats, + default_readme, ): logging.debug("Zip Binding Started!") upload_file_path = os.path.join( @@ -168,6 +173,7 @@ def on_failure(self, exc, task_id, args, kwargs, einfo): if os.path.exists(clean_dir): shutil.rmtree(clean_dir) + @celery.task( bind=True, name="process_raw_data", diff --git a/src/app.py b/src/app.py index 102754a4..b675001a 100644 --- a/src/app.py +++ b/src/app.py @@ -641,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params): os.remove(query_path) @staticmethod - def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn = None): + def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn=None): """Function written from scratch without being dependent on any library, Provides better performance for geojson binding""" # creating geojson file pre_geojson = """{"type": "FeatureCollection","features": [""" @@ -714,7 +714,7 @@ def get_grid_id(geom, cur): country_export, ) - def extract_current_data(self, exportname, plugin_fn = None): + def extract_current_data(self, exportname, plugin_fn=None): """Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump Args: exportname: takes filename as argument to create geojson file passed from routers @@ -780,7 +780,7 @@ def extract_current_data(self, exportname, plugin_fn = None): country_export=country_export, ), dump_temp_file_path, - plugin_fn + plugin_fn, ) # uses own conversion class if output_type == RawDataOutputType.SHAPEFILE.value: ( @@ -2260,6 +2260,7 @@ def get_summary_stats(self, start_date, end_date, group_by): self.d_b.close_conn() return [dict(item) for item in result] + class GeoJSONStats(Stats): """Used for collecting stats while processing GeoJSON files line by line""" @@ -2305,6 +2306,10 @@ def check_filter(self, tags, tag): if tags.line.join_and and tag in tags.line.join_and: return True + """ + Process a GeoJSON line (for getting stats) and return that line + """ + def raw_data_line_stats(self, line: str): self.process_file_line(line) return line diff --git a/src/validation/models.py b/src/validation/models.py index 1b1b0e92..4b757447 100644 --- a/src/validation/models.py +++ b/src/validation/models.py @@ -303,22 +303,22 @@ class StatsRequestParams(BaseModel, GeometryValidatorMixin): max_length=3, example="NPL", ) - geometry: Optional[ - Union[Polygon, MultiPolygon, Feature, FeatureCollection] - ] = Field( - default=None, - example={ - "type": "Polygon", - "coordinates": [ - [ - [83.96919250488281, 28.194446860487773], - [83.99751663208006, 28.194446860487773], - [83.99751663208006, 28.214869548073377], - [83.96919250488281, 28.214869548073377], - [83.96919250488281, 28.194446860487773], - ] - ], - }, + geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = ( + Field( + default=None, + example={ + "type": "Polygon", + "coordinates": [ + [ + [83.96919250488281, 28.194446860487773], + [83.99751663208006, 28.194446860487773], + [83.99751663208006, 28.214869548073377], + [83.96919250488281, 28.214869548073377], + [83.96919250488281, 28.194446860487773], + ] + ], + }, + ) ) @validator("geometry", pre=True, always=True) @@ -624,22 +624,22 @@ class DynamicCategoriesModel(CategoriesBase, GeometryValidatorMixin): max_length=3, example="USA", ) - geometry: Optional[ - Union[Polygon, MultiPolygon, Feature, FeatureCollection] - ] = Field( - default=None, - example={ - "type": "Polygon", - "coordinates": [ - [ - [83.96919250488281, 28.194446860487773], - [83.99751663208006, 28.194446860487773], - [83.99751663208006, 28.214869548073377], - [83.96919250488281, 28.214869548073377], - [83.96919250488281, 28.194446860487773], - ] - ], - }, + geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = ( + Field( + default=None, + example={ + "type": "Polygon", + "coordinates": [ + [ + [83.96919250488281, 28.194446860487773], + [83.99751663208006, 28.194446860487773], + [83.99751663208006, 28.214869548073377], + [83.96919250488281, 28.214869548073377], + [83.96919250488281, 28.194446860487773], + ] + ], + }, + ) ) @validator("geometry", pre=True, always=True) From d2aca194a207a4f287f6aa9cb13326cb9cea526b Mon Sep 17 00:00:00 2001 From: Emilio Mariscal Date: Mon, 4 Nov 2024 10:08:43 -0300 Subject: [PATCH 5/6] Format doc --- src/app.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/app.py b/src/app.py index b675001a..60d8fcdf 100644 --- a/src/app.py +++ b/src/app.py @@ -2306,10 +2306,9 @@ def check_filter(self, tags, tag): if tags.line.join_and and tag in tags.line.join_and: return True - """ - Process a GeoJSON line (for getting stats) and return that line - """ - def raw_data_line_stats(self, line: str): + """ + Process a GeoJSON line (for getting stats) and return that line + """ self.process_file_line(line) return line From 0e2e37863f26945cb1500818de13d7791af6ec64 Mon Sep 17 00:00:00 2001 From: Emilio Mariscal Date: Mon, 4 Nov 2024 10:22:51 -0300 Subject: [PATCH 6/6] Code reformatting using Black v23 --- src/validation/models.py | 64 ++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/src/validation/models.py b/src/validation/models.py index 4b757447..1b1b0e92 100644 --- a/src/validation/models.py +++ b/src/validation/models.py @@ -303,22 +303,22 @@ class StatsRequestParams(BaseModel, GeometryValidatorMixin): max_length=3, example="NPL", ) - geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = ( - Field( - default=None, - example={ - "type": "Polygon", - "coordinates": [ - [ - [83.96919250488281, 28.194446860487773], - [83.99751663208006, 28.194446860487773], - [83.99751663208006, 28.214869548073377], - [83.96919250488281, 28.214869548073377], - [83.96919250488281, 28.194446860487773], - ] - ], - }, - ) + geometry: Optional[ + Union[Polygon, MultiPolygon, Feature, FeatureCollection] + ] = Field( + default=None, + example={ + "type": "Polygon", + "coordinates": [ + [ + [83.96919250488281, 28.194446860487773], + [83.99751663208006, 28.194446860487773], + [83.99751663208006, 28.214869548073377], + [83.96919250488281, 28.214869548073377], + [83.96919250488281, 28.194446860487773], + ] + ], + }, ) @validator("geometry", pre=True, always=True) @@ -624,22 +624,22 @@ class DynamicCategoriesModel(CategoriesBase, GeometryValidatorMixin): max_length=3, example="USA", ) - geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = ( - Field( - default=None, - example={ - "type": "Polygon", - "coordinates": [ - [ - [83.96919250488281, 28.194446860487773], - [83.99751663208006, 28.194446860487773], - [83.99751663208006, 28.214869548073377], - [83.96919250488281, 28.214869548073377], - [83.96919250488281, 28.194446860487773], - ] - ], - }, - ) + geometry: Optional[ + Union[Polygon, MultiPolygon, Feature, FeatureCollection] + ] = Field( + default=None, + example={ + "type": "Polygon", + "coordinates": [ + [ + [83.96919250488281, 28.194446860487773], + [83.99751663208006, 28.194446860487773], + [83.99751663208006, 28.214869548073377], + [83.96919250488281, 28.214869548073377], + [83.96919250488281, 28.194446860487773], + ] + ], + }, ) @validator("geometry", pre=True, always=True)