Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+ GeoJSON stats for exports #278

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions API/api_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from celery import Celery

# Reader imports
from src.app import CustomExport, PolygonStats, RawData, S3FileTransfer
from src.app import CustomExport, PolygonStats, GeoJSONStats, RawData, S3FileTransfer
from src.config import ALLOW_BIND_ZIP_FILTER
from src.config import CELERY_BROKER_URL as celery_broker_uri
from src.config import CELERY_RESULT_BACKEND as celery_backend
Expand Down Expand Up @@ -75,7 +75,12 @@ def create_readme_content(default_readme, polygon_stats):


def zip_binding(
working_dir, exportname_parts, geom_dump, polygon_stats, default_readme
working_dir,
exportname_parts,
geom_dump,
polygon_stats,
geojson_stats,
default_readme,
):
logging.debug("Zip Binding Started!")
upload_file_path = os.path.join(
Expand All @@ -88,6 +93,9 @@ def zip_binding(
),
}

if geojson_stats:
additional_files["stats.json"] = geojson_stats

for name, content in additional_files.items():
temp_path = os.path.join(working_dir, name)
with open(temp_path, "w") as f:
Expand Down Expand Up @@ -209,11 +217,22 @@ def process_raw_data(self, params, user=None):
file_parts,
)

geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)
inside_file_size = 0
polygon_stats = None
geojson_stats = None

if "include_stats" in params.dict():
if params.include_stats:
geoJSONStats = GeoJSONStats(params.filters)
geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts, geoJSONStats.raw_data_line_stats)
geojson_stats = geoJSONStats.json()
else:
geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)

inside_file_size = 0
if "include_stats" in params.dict():
if params.include_stats:
feature = {
Expand All @@ -222,12 +241,14 @@ def process_raw_data(self, params, user=None):
"properties": {},
}
polygon_stats = PolygonStats(feature).get_summary_stats()

if bind_zip:
upload_file_path, inside_file_size = zip_binding(
working_dir=working_dir,
exportname_parts=exportname_parts,
geom_dump=geom_dump,
polygon_stats=polygon_stats,
geojson_stats=geojson_stats,
default_readme=DEFAULT_README_TEXT,
)

Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,7 @@ psutil==5.9.8

## logging
tqdm==4.66.2

# stats for geojson data
geojson-stats==0.1.0

65 changes: 61 additions & 4 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from psycopg2.extras import DictCursor
from slugify import slugify
from tqdm import tqdm
from geojson_stats.stats import Stats

# Reader imports
from src.config import (
Expand Down Expand Up @@ -640,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
os.remove(query_path)

@staticmethod
def query2geojson(con, extraction_query, dump_temp_file_path):
def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn=None):
"""Function written from scratch without being dependent on any library, Provides better performance for geojson binding"""
# creating geojson file
pre_geojson = """{"type": "FeatureCollection","features": ["""
Expand All @@ -660,10 +661,12 @@ def query2geojson(con, extraction_query, dump_temp_file_path):
for row in cursor:
if first:
first = False
f.write(row[0])
else:
f.write(",")
f.write(row[0])
if plugin_fn:
f.write(plugin_fn(row[0]))
else:
f.write((row[0]))
cursor.close() # closing connection to avoid memory issues
# close the writing geojson with last part
f.write(post_geojson)
Expand Down Expand Up @@ -711,7 +714,7 @@ def get_grid_id(geom, cur):
country_export,
)

def extract_current_data(self, exportname):
def extract_current_data(self, exportname, plugin_fn=None):
"""Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
Args:
exportname: takes filename as argument to create geojson file passed from routers
Expand Down Expand Up @@ -777,6 +780,7 @@ def extract_current_data(self, exportname):
country_export=country_export,
),
dump_temp_file_path,
plugin_fn,
) # uses own conversion class
if output_type == RawDataOutputType.SHAPEFILE.value:
(
Expand Down Expand Up @@ -2255,3 +2259,56 @@ def get_summary_stats(self, start_date, end_date, group_by):
result = self.cur.fetchall()
self.d_b.close_conn()
return [dict(item) for item in result]


class GeoJSONStats(Stats):
"""Used for collecting stats while processing GeoJSON files line by line"""

def __init__(self, filters, *args, **kwargs):
super().__init__(*args, **kwargs)

self.config.clean = True
self.config.properties_prop = "properties.tags"

if filters and filters.tags:
config_area = ["building"]
config_length = ["highway", "waterway"]

for tag in config_area:
if self.check_filter(filters.tags, tag):
self.config.keys.append(tag)
self.config.value_keys.append(tag)
self.config.area = True
for tag in config_length:
if self.check_filter(filters.tags, tag):
self.config.keys.append(tag)
self.config.value_keys.append(tag)
self.config.length = True

def check_filter(self, tags, tag):
"""
Check if a tag is present in tag filters
"""

if tags.all_geometry:
if tags.all_geometry.join_or and tag in tags.all_geometry.join_or:
return True
if tags.all_geometry.join_and and tag in tags.all_geometry.join_and:
return True
if tags.polygon:
if tags.polygon.join_or and tag in tags.polygon.join_or:
return True
if tags.polygon.join_and and tag in tags.polygon.join_and:
return True
if tags.line:
if tags.line.join_or and tag in tags.line.join_or:
return True
if tags.line.join_and and tag in tags.line.join_and:
return True

def raw_data_line_stats(self, line: str):
"""
Process a GeoJSON line (for getting stats) and return that line
"""
self.process_file_line(line)
return line
Loading