Release 20240214 (#338)
* Feature/merge trends and measurements (#336)

* Fix/statement timeout (#316)

* Fix/v2 measurements source (#314)

* fix v2 measurements to query measurements table instead of hourly

* realias to match query params

* measurements geom fix

* Added sensors_id to speed up sorting

* update statement_timeout to milliseconds

---------

Co-authored-by: Russ Biggs <[email protected]>

* set statement_timeout default config value

---------

Co-authored-by: Christian Parker <[email protected]>

* Release 20231222 (#318)


* ratelimit headers (#315)

* ratelimit headers

* dynamic ratelimit-policy values

* Added wait_for method (#317)

---------

Co-authored-by: Christian Parker <[email protected]>

* Added the trends methods (dow, hod, moy) to the measurements

---------

Co-authored-by: Russ Biggs <[email protected]>

* Feature/add license to location (#337)


* Added the license list object to the locations

---------

Co-authored-by: Russ Biggs <[email protected]>

---------

Co-authored-by: Christian Parker <[email protected]>
russbiggs and caparker authored Feb 15, 2024
1 parent 15a7667 commit abdbfa7
Showing 4 changed files with 232 additions and 5 deletions.
6 changes: 3 additions & 3 deletions openaq_api/openaq_api/v3/models/queries.py
@@ -270,7 +270,7 @@ class Paging(QueryBaseModel):

def pagination(self) -> str:
return "LIMIT :limit OFFSET :offset"


class ParametersQuery(QueryBaseModel):
"""Pydantic query model for the parameters query parameter
@@ -449,7 +449,6 @@ class DateFromQuery(QueryBaseModel):
date_from: date or datetime in ISO-8601 format to filter results to a
date range.
"""

date_from: datetime | date | None = Query(
None,
description="From when?",
@@ -467,6 +466,7 @@ def where(self) -> str:
Returns:
string of WHERE clause if `date_from` is set
"""

if self.date_from is None:
return None
elif isinstance(self.date_from, datetime):
@@ -536,7 +536,7 @@ class PeriodNameQuery(QueryBaseModel):
"""

period_name: PeriodNames | None = Query(
"hour", description="Period to aggregate. Month, day, hour"
"hour", description="Period to aggregate. Month, day, hour, hour of day (hod), day of week (dow) and month of year (moy)"
)


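For context, a hedged usage sketch of the expanded period_name options (the base URL, sensors_id, and date_from value below are illustrative assumptions, not part of this diff):

import requests  # assumption: the requests library is available

# Hypothetical request: day-of-week (dow) trend aggregation for one sensor,
# filtered with the date_from parameter described above.
resp = requests.get(
    "https://api.openaq.org/v3/sensors/3917/measurements",
    params={"period_name": "dow", "date_from": "2023-01-01", "limit": 100},
)
resp.raise_for_status()
for row in resp.json()["results"]:
    # each result carries period, parameter, value, summary and coverage
    print(row["period"]["label"], row["value"])
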
11 changes: 10 additions & 1 deletion openaq_api/openaq_api/v3/models/responses.py
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, date
from typing import Any, List

from humps import camelize
@@ -108,6 +108,14 @@ class ManufacturerBase(JsonBase):
name: str


class LicenseBase(JsonBase):
id: int
url: str
date_from: date
date_to: date | None = None
description: str | None = None


class Latest(JsonBase):
datetime: DatetimeObject
value: float
@@ -198,6 +206,7 @@ class Location(JsonBase):
instruments: list[InstrumentBase]
sensors: list[SensorBase]
coordinates: Coordinates
licenses: list[LicenseBase] | None = None
bounds: list[float] = Field(..., min_length=4, max_length=4)
distance: float | None = None
datetime_first: DatetimeObject
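A minimal sketch of the new LicenseBase model in use, assuming JsonBase applies humps.camelize as its alias generator (suggested by the import above) and Pydantic v2 (suggested by the min_length/max_length Field arguments); the values are made up:

from datetime import date

license_obj = LicenseBase(
    id=30,
    url="https://creativecommons.org/licenses/by/4.0/",
    date_from=date(2024, 1, 1),
)
# with a camelize alias generator, date_from serializes as dateFrom
print(license_obj.model_dump_json(by_alias=True))
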
1 change: 1 addition & 0 deletions openaq_api/openaq_api/v3/routers/locations.py
@@ -128,6 +128,7 @@ async def fetch_locations(query, db):
, bbox(geom) as bounds
, datetime_first
, datetime_last
, licenses
{query_builder.fields() or ''}
{query_builder.total()}
FROM locations_view_cached
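A hedged sketch of reading the new licenses field from a locations response (the base URL and location id are assumptions; licenses can be null for locations without license metadata, per the model default):

import requests  # assumption: the requests library is available

resp = requests.get("https://api.openaq.org/v3/locations/2178")
resp.raise_for_status()
location = resp.json()["results"][0]
for lic in location.get("licenses") or []:  # handles null licenses
    print(lic["id"], lic["url"])
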
219 changes: 218 additions & 1 deletion openaq_api/openaq_api/v3/routers/sensors.py
@@ -17,7 +17,6 @@
SensorsResponse,
MeasurementsResponse,
)
from openaq_api.v3.routers.measurements import fetch_measurements

logger = logging.getLogger("sensors")

@@ -140,3 +139,221 @@ async def fetch_sensors(q, db):
{query.pagination()}
"""
return await db.fetchPage(sql, query.params())



async def fetch_measurements(q, db):
query = QueryBuilder(q)
dur = "01:00:00"
expected_hours = 1

if q.period_name in [None, "hour"]:
# Query for hourly data
sql = f"""
SELECT sn.id
, json_build_object(
'label', '1hour'
, 'datetime_from', get_datetime_object(h.datetime - '1hour'::interval, sn.timezone)
, 'datetime_to', get_datetime_object(h.datetime, sn.timezone)
, 'interval', '01:00:00'
) as period
, json_build_object(
'id', h.measurands_id
, 'units', m.units
, 'name', m.measurand
) as parameter
, json_build_object(
'sd', h.value_sd
, 'min', h.value_min
, 'q02', h.value_p02
, 'q25', h.value_p25
, 'median', h.value_p50
, 'q75', h.value_p75
, 'q98', h.value_p98
, 'max', h.value_max
) as summary
, sig_digits(h.value_avg, 2) as value
, calculate_coverage(
h.value_count
, s.data_averaging_period_seconds
, s.data_logging_period_seconds
, {expected_hours} * 3600
)||jsonb_build_object(
'datetime_from', get_datetime_object(h.first_datetime, sn.timezone)
, 'datetime_to', get_datetime_object(h.last_datetime, sn.timezone)
) as coverage
{query.fields()}
FROM hourly_data h
JOIN sensors s USING (sensors_id)
JOIN sensor_systems sy USING (sensor_systems_id)
JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id)
JOIN measurands m ON (m.measurands_id = h.measurands_id)
{query.where()}
ORDER BY datetime
{query.pagination()}
"""
elif q.period_name in ["day", "month"]:
# Query for the aggregate data
if q.period_name == "day":
dur = "24:00:00"
elif q.period_name == "month":
dur = "1 month"

sql = f"""
WITH meas AS (
SELECT
sy.sensor_nodes_id
, s.measurands_id
, sn.timezone
, truncate_timestamp(datetime, :period_name, sn.timezone) as datetime
, AVG(s.data_averaging_period_seconds) as avg_seconds
, AVG(s.data_logging_period_seconds) as log_seconds
, MAX(truncate_timestamp(datetime, :period_name, sn.timezone, '1{q.period_name}'::interval)) as last_period
, MIN(timezone(sn.timezone, datetime - '1sec'::interval)) as first_datetime
, MAX(timezone(sn.timezone, datetime - '1sec'::interval)) as last_datetime
, COUNT(1) as value_count
, AVG(value_avg) as value_avg
, STDDEV(value_avg) as value_sd
, MIN(value_avg) as value_min
, MAX(value_avg) as value_max
, PERCENTILE_CONT(0.02) WITHIN GROUP(ORDER BY value_avg) as value_p02
, PERCENTILE_CONT(0.25) WITHIN GROUP(ORDER BY value_avg) as value_p25
, PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY value_avg) as value_p50
, PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY value_avg) as value_p75
, PERCENTILE_CONT(0.98) WITHIN GROUP(ORDER BY value_avg) as value_p98
, current_timestamp as calculated_on
FROM hourly_data m
JOIN sensors s ON (m.sensors_id = s.sensors_id)
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id)
-- JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
-- JOIN timezones ts ON (sn.timezones_id = ts.gid)
{query.where()}
GROUP BY 1, 2, 3, 4)
SELECT t.sensor_nodes_id
----------
, json_build_object(
'label', '1{q.period_name}'
, 'datetime_from', get_datetime_object(datetime, t.timezone)
, 'datetime_to', get_datetime_object(last_period, t.timezone)
, 'interval', '{dur}'
) as period
----------
, sig_digits(value_avg, 2) as value
-----------
, json_build_object(
'id', t.measurands_id
, 'units', m.units
, 'name', m.measurand
) as parameter
---------
, json_build_object(
'sd', t.value_sd
, 'min', t.value_min
, 'q02', t.value_p02
, 'q25', t.value_p25
, 'median', t.value_p50
, 'q75', t.value_p75
, 'q98', t.value_p98
, 'max', t.value_max
) as summary
--------
, calculate_coverage(
value_count::int
, 3600
, 3600
, EXTRACT(EPOCH FROM last_period - datetime)
)||jsonb_build_object(
'datetime_from', get_datetime_object(first_datetime, t.timezone)
, 'datetime_to', get_datetime_object(last_datetime, t.timezone)
) as coverage
{query.total()}
FROM meas t
JOIN measurands m ON (t.measurands_id = m.measurands_id)
{query.pagination()}
"""
elif q.period_name in ["hod","dow","moy"]:

fmt = ""
if q.period_name == "hod":
fmt = "HH24"
dur = "01:00:00"
prd = "hour"
elif q.period_name == "dow":
fmt = "ID"
dur = "24:00:00"
prd = "day"
elif q.period_name == "mod":
fmt = "MM"
dur = "1 month"
prd = "month"


q.period_name = prd
sql = f"""
WITH trends AS (
SELECT
sn.id
, s.measurands_id
, sn.timezone
, to_char(timezone(sn.timezone, datetime - '1sec'::interval), '{fmt}') as factor
, AVG(s.data_averaging_period_seconds) as avg_seconds
, AVG(s.data_logging_period_seconds) as log_seconds
, MAX(truncate_timestamp(datetime, :period_name, sn.timezone, '1{prd}'::interval)) as last_period
, MIN(timezone(sn.timezone, datetime - '1sec'::interval)) as first_datetime
, MAX(timezone(sn.timezone, datetime - '1sec'::interval)) as last_datetime
, COUNT(1) as value_count
, AVG(value_avg) as value_avg
, STDDEV(value_avg) as value_sd
, MIN(value_avg) as value_min
, MAX(value_avg) as value_max
, PERCENTILE_CONT(0.02) WITHIN GROUP(ORDER BY value_avg) as value_p02
, PERCENTILE_CONT(0.25) WITHIN GROUP(ORDER BY value_avg) as value_p25
, PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY value_avg) as value_p50
, PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY value_avg) as value_p75
, PERCENTILE_CONT(0.98) WITHIN GROUP(ORDER BY value_avg) as value_p98
, current_timestamp as calculated_on
FROM hourly_data m
JOIN sensors s ON (m.sensors_id = s.sensors_id)
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id)
{query.where()}
GROUP BY 1, 2, 3, 4)
SELECT t.id
, json_build_object(
'label', factor
, 'datetime_from', get_datetime_object(first_datetime, t.timezone)
, 'datetime_to', get_datetime_object(last_datetime, t.timezone)
, 'interval', '{dur}'
) as period
, sig_digits(value_avg, 2) as value
, json_build_object(
'id', t.measurands_id
, 'units', m.units
, 'name', m.measurand
) as parameter
, json_build_object(
'sd', t.value_sd
, 'min', t.value_min
, 'q02', t.value_p02
, 'q25', t.value_p25
, 'median', t.value_p50
, 'q75', t.value_p75
, 'q98', t.value_p98
, 'max', t.value_max
) as summary
, calculate_coverage(
t.value_count::int
, t.avg_seconds
, t.log_seconds
, expected_hours(first_datetime, last_datetime, '{prd}', factor) * 3600.0
)||jsonb_build_object(
'datetime_from', get_datetime_object(first_datetime, t.timezone)
, 'datetime_to', get_datetime_object(last_datetime, t.timezone)
) as coverage
FROM trends t
JOIN measurands m ON (t.measurands_id = m.measurands_id)
{query.pagination()}
"""

return await db.fetchPage(sql, query.params())
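
The trends coverage above leans on an expected_hours SQL helper that this diff does not define; purely as an illustration, a rough Python analogue of the hod case might look like:

from datetime import datetime, timedelta

def expected_hours(first: datetime, last: datetime, factor: str) -> int:
    """Illustrative only: count occurrences of hour-of-day `factor`
    (zero-padded, e.g. "08", matching to_char's HH24 format) between
    two timestamps. The real helper is a database function not shown here."""
    count = 0
    t = first.replace(minute=0, second=0, microsecond=0)
    while t <= last:
        if t.strftime("%H") == factor:
            count += 1
        t += timedelta(hours=1)
    return count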
