Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment to use iterators more and directly use either st_asgeojson/st_asewkt depending on output type. #77

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions tifeatures/dbmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,13 @@ def id_column_info(self) -> Column: # type: ignore

def columns(self, properties: Optional[List[str]] = None) -> List[str]:
"""Return table columns optionally filtered to only include columns from properties."""
cols = [c.name for c in self.properties]
if properties is not None:
if self.id_column and self.id_column not in properties:
properties.append(self.id_column)

geom_col = self.get_geometry_column()
if geom_col:
properties.append(geom_col.name)

cols = [col for col in cols if col in properties]

if len(cols) < 1:
raise TypeError("No columns selected")

return cols
nongeo = [
c.name for c in self.properties if c.type not in ["geometry", "geography"]
]
if properties is None:
return nongeo
else:
return [c for c in nongeo if c in properties]

def get_column(self, property_name: str) -> Optional[Column]:
"""Return column info."""
Expand Down
24 changes: 20 additions & 4 deletions tifeatures/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
from pygeofilter.parsers.cql2_json import parse as cql2_json_parser
from pygeofilter.parsers.cql2_text import parse as cql2_text_parser

from tifeatures.errors import InvalidBBox
from tifeatures.layer import CollectionLayer
from tifeatures.dbmodel import GeometryColumn
from tifeatures.errors import InvalidBBox, InvalidGeometryColumnName
from tifeatures.layer import Table as TableLayer
from tifeatures.resources import enums

from fastapi import HTTPException, Path, Query
from fastapi import Depends, HTTPException, Path, Query

from starlette.requests import Request


def CollectionParams(
request: Request,
collectionId: str = Path(..., description="Collection identifier"),
) -> CollectionLayer:
) -> TableLayer:
"""Return Layer Object."""
# Check function_catalog
function_catalog = getattr(request.app.state, "tifeatures_function_catalog", {})
Expand Down Expand Up @@ -251,3 +251,19 @@ def sortby_query(
):
"""Sortby dependency."""
return sortby


def geom_col(
    collection: TableLayer = Depends(CollectionParams),
    geom_column: Optional[str] = Query(
        None,
        description="Select geometry column.",
        alias="geom-column",
    ),
) -> Optional[GeometryColumn]:
    """Geometry Column Dependency.

    Resolve the ``geom-column`` query parameter against the collection.

    Args:
        collection: Layer resolved by ``CollectionParams``.
        geom_column: Requested geometry column name, or ``None`` when the
            query parameter is absent.

    Returns:
        Whatever ``collection.get_geometry_column(geom_column)`` returns. This
        may be a falsy value (e.g. ``None``) when the parameter was omitted or
        was the literal ``"none"``, hence the ``Optional`` return type.

    Raises:
        InvalidGeometryColumnName: If a name other than ``"none"`` (compared
            case-insensitively) was requested and no column was found for it.
    """
    # Use a distinct local name so the function's own name is not shadowed.
    column = collection.get_geometry_column(geom_column)

    # An explicit, non-"none" request that resolves to nothing is a client error.
    if geom_column and geom_column.lower() != "none" and not column:
        raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom_column}.")
    return column
197 changes: 102 additions & 95 deletions tifeatures/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import csv
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional
from typing import Any, Callable, Dict, List, Optional

import jinja2
import orjson
from pygeofilter.ast import AstType

from tifeatures import model
from tifeatures.dbmodel import GeometryColumn
from tifeatures.dependencies import (
CollectionParams,
ItemOutputType,
Expand All @@ -18,6 +20,7 @@
bbox_query,
datetime_query,
filter_query,
geom_col,
ids_query,
properties_query,
sortby_query,
Expand Down Expand Up @@ -45,7 +48,7 @@
)


def create_csv_rows(data: Iterable[Dict]) -> Generator[str, None, None]:
async def create_csv_rows(data):
"""Creates an iterator that returns lines of csv from an iterable of dicts."""

class DummyWriter:
Expand All @@ -56,7 +59,7 @@ def write(self, line: str):
return line

# Get the first row and construct the column names
row = next(data) # type: ignore
row = await data.__anext__() # type: ignore
fieldnames = row.keys()
writer = csv.DictWriter(DummyWriter(), fieldnames=fieldnames)

Expand All @@ -67,10 +70,42 @@ def write(self, line: str):
yield writer.writerow(row)

# Write all remaining rows
for row in data:
async for row in data:
yield writer.writerow(row)


def create_geojson_feature(item: Dict, geometry_column: Optional[str]) -> Dict:
    """Build a single GeoJSON-style Feature dict from one row dict.

    The reserved ``tifeatures_geom`` and ``tifeatures_id`` keys are popped from
    ``item`` (the input dict is modified in place); the remaining entries are
    used as the feature's ``properties``.

    Args:
        item: Row mapping containing ``tifeatures_geom`` and ``tifeatures_id``.
        geometry_column: Not used by this function; kept so the signature
            stays compatible with existing callers.

    Returns:
        A dict with the keys ``type``, ``id``, ``geometry`` and ``properties``.

    Raises:
        KeyError: If ``tifeatures_geom`` or ``tifeatures_id`` is missing.
    """
    # Any falsy geometry (None, empty string, ...) is reported as None.
    geometry = item.pop("tifeatures_geom") or None
    # Named feature_id to avoid shadowing the ``id`` builtin.
    feature_id = item.pop("tifeatures_id")
    return {
        "type": "Feature",
        "id": feature_id,
        "geometry": geometry,
        "properties": item,
    }


def create_wkt_feature(item: Dict, geometry_column: Optional[str]) -> Dict:
    """Build a single Feature-shaped dict from one row dict.

    The geometry value is passed through unchanged (presumably a WKT/EWKT
    string produced by the query; confirm against the SQL that fills
    ``tifeatures_geom``). The reserved ``tifeatures_geom`` and
    ``tifeatures_id`` keys are popped from ``item`` (the input dict is
    modified in place); the remaining entries become ``properties``.

    Args:
        item: Row mapping containing ``tifeatures_geom`` and ``tifeatures_id``.
        geometry_column: Not used by this function; kept so the signature
            stays compatible with existing callers.

    Returns:
        A dict with the keys ``type``, ``id``, ``geometry`` and ``properties``.

    Raises:
        KeyError: If ``tifeatures_geom`` or ``tifeatures_id`` is missing.
    """
    # Any falsy geometry (None, empty string, ...) is reported as None.
    geometry = item.pop("tifeatures_geom") or None
    # Named feature_id to avoid shadowing the ``id`` builtin.
    feature_id = item.pop("tifeatures_id")
    return {
        "type": "Feature",
        "id": feature_id,
        "geometry": geometry,
        "properties": item,
    }


@dataclass
class Endpoints:
"""Endpoints Factory."""
Expand Down Expand Up @@ -543,7 +578,7 @@ def queryables(

@self.router.get(
"/collections/{collectionId}/items",
response_model=model.Items,
# response_model=model.Items,
response_model_exclude_none=True,
response_class=GeoJSONResponse,
responses={
Expand All @@ -568,11 +603,7 @@ async def items(
properties: Optional[List[str]] = Depends(properties_query),
cql_filter: Optional[AstType] = Depends(filter_query),
sortby: Optional[str] = Depends(sortby_query),
geom_column: Optional[str] = Query(
None,
description="Select geometry column.",
alias="geom-column",
),
geom_column: Optional[GeometryColumn] = Depends(geom_col),
datetime_column: Optional[str] = Query(
None,
description="Select datetime column.",
Expand Down Expand Up @@ -625,53 +656,48 @@ async def items(
if key.lower() not in exclude and key.lower() in table_property
]

items, matched_items = await collection.features(
request.app.state.pool,
ids_filter=ids_filter,
bbox_filter=bbox_filter,
datetime_filter=datetime_filter,
properties_filter=properties_filter,
cql_filter=cql_filter,
if geom_column:
geom_col_name = geom_column.name
else:
geom_col_name = None

_from = collection._from()
_where = collection._where(
ids=ids_filter,
datetime=datetime_filter,
bbox=bbox_filter,
properties=properties_filter,
cql=cql_filter,
geom=geom_col_name,
dt=datetime_filter,
)

features = collection._features(
pool=request.app.state.pool,
_from=_from,
_where=_where,
sortby=sortby,
properties=properties,
limit=limit,
offset=offset,
geom=geom_column,
dt=datetime_column,
geometry_column=geom_column,
bbox_only=bbox_only,
simplify=simplify,
)

if output_type in (
if output_type in [
MediaType.csv,
MediaType.json,
MediaType.ndjson,
):
if items and items[0].geometry is not None:
rows = (
{
"collectionId": collection.id,
"itemId": f.id,
**f.properties,
"geometry": f.geometry.wkt,
}
for f in items
)

else:
rows = (
{
"collectionId": collection.id,
"itemId": f.id,
**f.properties,
}
for f in items
)

]:
items = (
create_wkt_feature(feature, geom_col_name)
async for feature in features
)
# CSV Response
if output_type == MediaType.csv:
return StreamingResponse(
create_csv_rows(rows),
create_csv_rows(items),
media_type=MediaType.csv,
headers={
"Content-Disposition": "attachment;filename=items.csv"
Expand All @@ -680,18 +706,35 @@ async def items(

# JSON Response
if output_type == MediaType.json:
return JSONResponse([row for row in rows])
return JSONResponse([item async for item in items])

# NDJSON Response
if output_type == MediaType.ndjson:
return StreamingResponse(
(json.dumps(row) + "\n" for row in rows),
(orjson.dumps(item) + b"\n" async for item in items),
media_type=MediaType.ndjson,
headers={
"Content-Disposition": "attachment;filename=items.ndjson"
},
)

matched_items = await collection._features_count(
pool=request.app.state.pool, _from=_from, _where=_where
)
items = (
create_geojson_feature(feature, geom_col_name)
async for feature in features
)

if output_type == MediaType.geojsonseq:
return StreamingResponse(
(orjson.dumps(item) async for item in items),
media_type=MediaType.geojsonseq,
headers={
"Content-Disposition": "attachment;filename=items.geojson"
},
)

qs = "?" + str(request.query_params) if request.query_params else ""
links = [
model.Link(
Expand All @@ -711,7 +754,9 @@ async def items(
),
]

items = [item async for item in items]
items_returned = len(items)
print(f"items_returned {items_returned}")

if (matched_items - items_returned) > offset:
next_offset = offset + items_returned
Expand Down Expand Up @@ -753,64 +798,26 @@ async def items(
),
)

data = model.Items(
id=collection.id,
title=collection.title or collection.id,
description=collection.description or collection.title or collection.id,
numberMatched=matched_items,
numberReturned=items_returned,
links=links,
features=[
model.Item(
**{
**feature.dict(),
"links": [
model.Link(
title="Collection",
href=self.url_for(
request,
"collection",
collectionId=collection.id,
),
rel="collection",
type=MediaType.json,
),
model.Link(
title="Item",
href=self.url_for(
request,
"item",
collectionId=collection.id,
itemId=feature.id,
),
rel="item",
type=MediaType.json,
),
],
}
)
for feature in items
],
)
data = {
"id": collection.id,
"title": collection.title or collection.id,
"description": collection.description
or collection.title
or collection.id,
"numberMatched": matched_items,
"numberReturned": items_returned,
"links": [link.dict() for link in links],
"features": items,
}

# HTML Response
if output_type == MediaType.html:
return self._create_html_response(
request,
data.json(exclude_none=True),
orjson.dumps(data),
template_name="items",
)

# GeoJSONSeq Response
elif output_type == MediaType.geojsonseq:
return StreamingResponse(
data.json_seq(exclude_none=True),
media_type=MediaType.geojsonseq,
headers={
"Content-Disposition": "attachment;filename=items.geojson"
},
)

# Default to GeoJSON Response
return data

Expand Down
Loading