diff --git a/API/api_worker.py b/API/api_worker.py
index 096f2f0f..0a90ce21 100644
--- a/API/api_worker.py
+++ b/API/api_worker.py
@@ -15,7 +15,7 @@
 from celery import Celery
 
 # Reader imports
-from src.app import CustomExport, PolygonStats, RawData, S3FileTransfer
+from src.app import CustomExport, PolygonStats, GeoJSONStats, RawData, S3FileTransfer
 from src.config import ALLOW_BIND_ZIP_FILTER
 from src.config import CELERY_BROKER_URL as celery_broker_uri
 from src.config import CELERY_RESULT_BACKEND as celery_backend
@@ -75,7 +75,12 @@ def create_readme_content(default_readme, polygon_stats):
 
 
 def zip_binding(
-    working_dir, exportname_parts, geom_dump, polygon_stats, default_readme
+    working_dir,
+    exportname_parts,
+    geom_dump,
+    polygon_stats,
+    geojson_stats,
+    default_readme,
 ):
     logging.debug("Zip Binding Started!")
     upload_file_path = os.path.join(
@@ -88,6 +93,9 @@ def zip_binding(
         ),
     }
 
+    if geojson_stats:
+        additional_files["stats.json"] = geojson_stats
+
     for name, content in additional_files.items():
         temp_path = os.path.join(working_dir, name)
         with open(temp_path, "w") as f:
@@ -209,11 +217,21 @@ def process_raw_data(self, params, user=None):
                 file_parts,
             )
 
-            geom_area, geom_dump, working_dir = RawData(
-                params, str(self.request.id)
-            ).extract_current_data(file_parts)
-            inside_file_size = 0
             polygon_stats = None
+            geojson_stats = None
+
+            if "include_stats" in params.dict() and params.include_stats:
+                geoJSONStats = GeoJSONStats(params.filters)
+                geom_area, geom_dump, working_dir = RawData(
+                    params, str(self.request.id)
+                ).extract_current_data(file_parts, geoJSONStats.raw_data_line_stats)
+                geojson_stats = geoJSONStats.json()
+            else:
+                geom_area, geom_dump, working_dir = RawData(
+                    params, str(self.request.id)
+                ).extract_current_data(file_parts)
+
+            inside_file_size = 0
             if "include_stats" in params.dict():
                 if params.include_stats:
                     feature = {
@@ -222,12 +240,14 @@ def process_raw_data(self, params, user=None):
                         "properties": {},
                     }
                     polygon_stats = PolygonStats(feature).get_summary_stats()
+
             if bind_zip:
                 upload_file_path, inside_file_size = zip_binding(
                     working_dir=working_dir,
                     exportname_parts=exportname_parts,
                     geom_dump=geom_dump,
                     polygon_stats=polygon_stats,
+                    geojson_stats=geojson_stats,
                     default_readme=DEFAULT_README_TEXT,
                 )
 
diff --git a/requirements.txt b/requirements.txt
index c295c164..6e87b47b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -54,3 +54,7 @@ psutil==5.9.8
 
 ## logging
 tqdm==4.66.2
+
+# stats for geojson data
+geojson-stats==0.1.0
+
diff --git a/src/app.py b/src/app.py
index fb5e82d5..60d8fcdf 100644
--- a/src/app.py
+++ b/src/app.py
@@ -47,6 +47,7 @@
 from psycopg2.extras import DictCursor
 from slugify import slugify
 from tqdm import tqdm
+from geojson_stats.stats import Stats
 
 # Reader imports
 from src.config import (
@@ -640,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
         os.remove(query_path)
 
     @staticmethod
-    def query2geojson(con, extraction_query, dump_temp_file_path):
+    def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn=None):
         """Function written from scratch without being dependent on any library, Provides better performance for geojson binding"""
         # creating geojson file
         pre_geojson = """{"type": "FeatureCollection","features": ["""
@@ -660,10 +661,12 @@ def query2geojson(con, extraction_query, dump_temp_file_path):
             for row in cursor:
                 if first:
                     first = False
-                    f.write(row[0])
                 else:
                     f.write(",")
-                    f.write(row[0])
+                if plugin_fn:
+                    f.write(plugin_fn(row[0]))
+                else:
+                    f.write(row[0])
             cursor.close()  # closing connection to avoid memory issues
             # close the writing geojson with last part
             f.write(post_geojson)
@@ -711,7 +714,7 @@ def get_grid_id(geom, cur):
             country_export,
         )
 
-    def extract_current_data(self, exportname):
+    def extract_current_data(self, exportname, plugin_fn=None):
         """Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
         Args:
             exportname: takes filename as argument to create geojson file passed from routers
@@ -777,6 +780,7 @@ def extract_current_data(self, exportname):
                     country_export=country_export,
                 ),
                 dump_temp_file_path,
+                plugin_fn,
             )  # uses own conversion class
         if output_type == RawDataOutputType.SHAPEFILE.value:
             (
@@ -2255,3 +2259,58 @@ def get_summary_stats(self, start_date, end_date, group_by):
         result = self.cur.fetchall()
         self.d_b.close_conn()
         return [dict(item) for item in result]
+
+
+class GeoJSONStats(Stats):
+    """Used for collecting stats while processing GeoJSON files line by line"""
+
+    def __init__(self, filters, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.config.clean = True
+        self.config.properties_prop = "properties.tags"
+
+        if filters and filters.tags:
+            # Only track area/length for keys the request actually filters on
+            config_area = ["building"]
+            config_length = ["highway", "waterway"]
+
+            for tag in config_area:
+                if self.check_filter(filters.tags, tag):
+                    self.config.keys.append(tag)
+                    self.config.value_keys.append(tag)
+                    self.config.area = True
+            for tag in config_length:
+                if self.check_filter(filters.tags, tag):
+                    self.config.keys.append(tag)
+                    self.config.value_keys.append(tag)
+                    self.config.length = True
+
+    def check_filter(self, tags, tag):
+        """
+        Check if a tag is present in tag filters
+        """
+        if tags.all_geometry:
+            if tags.all_geometry.join_or and tag in tags.all_geometry.join_or:
+                return True
+            if tags.all_geometry.join_and and tag in tags.all_geometry.join_and:
+                return True
+        if tags.polygon:
+            if tags.polygon.join_or and tag in tags.polygon.join_or:
+                return True
+            if tags.polygon.join_and and tag in tags.polygon.join_and:
+                return True
+        if tags.line:
+            if tags.line.join_or and tag in tags.line.join_or:
+                return True
+            if tags.line.join_and and tag in tags.line.join_and:
+                return True
+        return False
+
+    def raw_data_line_stats(self, line: str):
+        """
+        Process a GeoJSON line (for getting stats) and return that line
+        """
+        self.process_file_line(line)
+        return line
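Reviewer note: a minimal sketch (not part of the diff) of how the new `plugin_fn` hook composes with the streaming writer. `GeoJSONStats.raw_data_line_stats` is a pass-through tap: it feeds each serialized feature line to the `geojson-stats` collector and returns the line unchanged, so `query2geojson` keeps streaming rows straight to disk without buffering the whole export. The `fake_rows` cursor stand-in below is hypothetical; the `Stats` calls (`config`, `process_file_line`, `json`) are the ones this PR itself relies on.

```python
import json

from geojson_stats.stats import Stats

# Hypothetical stand-in for the psycopg2 cursor in query2geojson:
# each row is a one-element tuple holding one serialized GeoJSON feature.
fake_rows = [
    ('{"type": "Feature", "geometry": null, '
     '"properties": {"tags": {"building": "yes"}}}',),
    ('{"type": "Feature", "geometry": null, '
     '"properties": {"tags": {"highway": "residential"}}}',),
]

stats = Stats()
stats.config.clean = True
stats.config.properties_prop = "properties.tags"  # same config the PR sets


def raw_data_line_stats(line: str) -> str:
    """Pass-through tap: record stats for the line, then return it unchanged."""
    stats.process_file_line(line)
    return line


# Mirrors the streaming loop in query2geojson: the plugin sees every feature
# exactly once, and the bytes written out are identical with or without it.
parts = ['{"type": "FeatureCollection","features": [']
first = True
for row in fake_rows:
    if first:
        first = False
    else:
        parts.append(",")
    parts.append(raw_data_line_stats(row[0]))
parts.append("]}")

assert json.loads("".join(parts))["type"] == "FeatureCollection"
print(stats.json())  # aggregated stats, as written to stats.json in the zip
```

Because the tap returns its input untouched, the GeoJSON output stays byte-for-byte the same whether or not stats are requested, which is why `zip_binding` can treat `stats.json` as just one more additional file.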