From abdbfa7b44576e556e60521a60b0270e7621233f Mon Sep 17 00:00:00 2001
From: Russ Biggs
Date: Thu, 15 Feb 2024 06:05:27 -0700
Subject: [PATCH] Release 20240214 (#338)

* Feature/merge trends and measurements (#336)

* Fix/statement timeout (#316)

* Fix/v2 measurements source (#314)

* fix v2 measurements to query measurements table instead of hourly

* realias to match query params

* measurements geom fix

* Added sensors_id to speed up sorting

* update statement_timeout to milliseconds

---------

Co-authored-by: Russ Biggs

* set statement_timeout default config value

---------

Co-authored-by: Christian Parker

* Release 20231222 (#318)

* Fix/v2 measurements source (#314)

* fix v2 measurements to query measurements table instead of hourly

* realias to match query params

* measurements geom fix

* Added sensors_id to speed up sorting

* update statement_timeout to milliseconds

---------

Co-authored-by: Russ Biggs

* ratelimit headers (#315)

* ratelimit headers

* dynamic ratelimit-policy values

* Added wait_for method (#317)

---------

Co-authored-by: Christian Parker

* Added the trends methods (dow, hod, moy) to the measurements

---------

Co-authored-by: Russ Biggs

* Feature/add license to location (#337)

* Fix/statement timeout (#316)

* Fix/v2 measurements source (#314)

* fix v2 measurements to query measurements table instead of hourly

* realias to match query params

* measurements geom fix

* Added sensors_id to speed up sorting

* update statement_timeout to milliseconds

---------

Co-authored-by: Russ Biggs

* set statement_timeout default config value

---------

Co-authored-by: Christian Parker

* Release 20231222 (#318)

* Fix/v2 measurements source (#314)

* fix v2 measurements to query measurements table instead of hourly

* realias to match query params

* measurements geom fix

* Added sensors_id to speed up sorting

* update statement_timeout to milliseconds

---------

Co-authored-by: Russ Biggs

* ratelimit headers (#315)

* ratelimit headers

* dynamic ratelimit-policy values

* Added wait_for method (#317)

---------

Co-authored-by: Christian Parker

* Added the license list object to the locations

---------

Co-authored-by: Russ Biggs

---------

Co-authored-by: Christian Parker
---
 openaq_api/openaq_api/v3/models/queries.py    |   6 +-
 openaq_api/openaq_api/v3/models/responses.py  |  11 +-
 openaq_api/openaq_api/v3/routers/locations.py |   1 +
 openaq_api/openaq_api/v3/routers/sensors.py   | 219 +++++++++++++++++-
 4 files changed, 232 insertions(+), 5 deletions(-)

diff --git a/openaq_api/openaq_api/v3/models/queries.py b/openaq_api/openaq_api/v3/models/queries.py
index beeac0a..55104c0 100644
--- a/openaq_api/openaq_api/v3/models/queries.py
+++ b/openaq_api/openaq_api/v3/models/queries.py
@@ -270,7 +270,7 @@ class Paging(QueryBaseModel):
     def pagination(self) -> str:
         return "LIMIT :limit OFFSET :offset"
 
-    
+
 class ParametersQuery(QueryBaseModel):
     """Pydantic query model for the parameters query parameter
 
@@ -449,7 +449,6 @@ class DateFromQuery(QueryBaseModel):
     date_from: date or datetime in ISO-8601 format to filter results to a
     date range.
     """
-
     date_from: datetime | date | None = Query(
         None,
         description="From when?",
@@ -467,6 +466,7 @@ def where(self) -> str:
         Returns:
             string of WHERE clause if `date_from` is set
         """
+
         if self.date_from is None:
             return None
         elif isinstance(self.date_from, datetime):
@@ -536,7 +536,7 @@ class PeriodNameQuery(QueryBaseModel):
     """
 
     period_name: PeriodNames | None = Query(
-        "hour", description="Period to aggregate. Month, day, hour"
+        "hour", description="Period to aggregate. Month, day, hour, hour of day (hod), day of week (dow) and month of year (moy)"
     )
 
 
diff --git a/openaq_api/openaq_api/v3/models/responses.py b/openaq_api/openaq_api/v3/models/responses.py
index 7f85509..ba70f97 100644
--- a/openaq_api/openaq_api/v3/models/responses.py
+++ b/openaq_api/openaq_api/v3/models/responses.py
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, date
 from typing import Any, List
 
 from humps import camelize
@@ -108,6 +108,14 @@ class ManufacturerBase(JsonBase):
     name: str
 
 
+class LicenseBase(JsonBase):
+    id: int
+    url: str
+    date_from: date
+    date_to: date | None = None
+    description: str | None = None
+
+
 class Latest(JsonBase):
     datetime: DatetimeObject
     value: float
@@ -198,6 +206,7 @@ class Location(JsonBase):
     instruments: list[InstrumentBase]
     sensors: list[SensorBase]
     coordinates: Coordinates
+    licenses: list[LicenseBase] | None = None
     bounds: list[float] = Field(..., min_length=4, max_length=4)
     distance: float | None = None
     datetime_first: DatetimeObject
diff --git a/openaq_api/openaq_api/v3/routers/locations.py b/openaq_api/openaq_api/v3/routers/locations.py
index c5bc74b..155d864 100644
--- a/openaq_api/openaq_api/v3/routers/locations.py
+++ b/openaq_api/openaq_api/v3/routers/locations.py
@@ -128,6 +128,7 @@ async def fetch_locations(query, db):
         , bbox(geom) as bounds
         , datetime_first
         , datetime_last
+        , licenses
         {query_builder.fields() or ''}
         {query_builder.total()}
         FROM locations_view_cached
diff --git a/openaq_api/openaq_api/v3/routers/sensors.py b/openaq_api/openaq_api/v3/routers/sensors.py
index 4406f05..178adaf 100644
--- a/openaq_api/openaq_api/v3/routers/sensors.py
+++ b/openaq_api/openaq_api/v3/routers/sensors.py
@@ -17,7 +17,6 @@
     SensorsResponse,
     MeasurementsResponse,
 )
-from openaq_api.v3.routers.measurements import fetch_measurements
 
 logger = logging.getLogger("sensors")
 
@@ -140,3 +139,221 @@ async def fetch_sensors(q, db):
         {query.pagination()}
     """
     return await db.fetchPage(sql, query.params())
+
+
+
+async def fetch_measurements(q, db):
+    query = QueryBuilder(q)
+    dur = "01:00:00"
+    expected_hours = 1
+
+    if q.period_name in [None, "hour"]:
+        # Query for hourly data
+        sql = f"""
+        SELECT sn.id
+        , json_build_object(
+            'label', '1hour'
+            , 'datetime_from', get_datetime_object(h.datetime - '1hour'::interval, sn.timezone)
+            , 'datetime_to', get_datetime_object(h.datetime, sn.timezone)
+            , 'interval', '01:00:00'
+        ) as period
+        , json_build_object(
+            'id', h.measurands_id
+            , 'units', m.units
+            , 'name', m.measurand
+        ) as parameter
+        , json_build_object(
+            'sd', h.value_sd
+            , 'min', h.value_min
+            , 'q02', h.value_p02
+            , 'q25', h.value_p25
+            , 'median', h.value_p50
+            , 'q75', h.value_p75
+            , 'q98', h.value_p98
+            , 'max', h.value_max
+        ) as summary
+        , sig_digits(h.value_avg, 2) as value
+        , calculate_coverage(
+            h.value_count
+            , s.data_averaging_period_seconds
+            , s.data_logging_period_seconds
+            , {expected_hours} * 3600
+        )||jsonb_build_object(
+            'datetime_from', get_datetime_object(h.first_datetime, sn.timezone)
+            , 'datetime_to', get_datetime_object(h.last_datetime, sn.timezone)
+        ) as coverage
+        {query.fields()}
+        FROM hourly_data h
+        JOIN sensors s USING (sensors_id)
+        JOIN sensor_systems sy USING (sensor_systems_id)
+        JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id)
+        JOIN measurands m ON (m.measurands_id = h.measurands_id)
+        {query.where()}
+        ORDER BY datetime
+        {query.pagination()}
+        """
+    elif q.period_name in ["day", "month"]:
+        # Query for the aggregate data
+        if q.period_name == "day":
+            dur = "24:00:00"
"24:00:00" + elif q.period_name == "month": + dur = "1 month" + + sql = f""" + WITH meas AS ( + SELECT + sy.sensor_nodes_id + , s.measurands_id + , sn.timezone + , truncate_timestamp(datetime, :period_name, sn.timezone) as datetime + , AVG(s.data_averaging_period_seconds) as avg_seconds + , AVG(s.data_logging_period_seconds) as log_seconds + , MAX(truncate_timestamp(datetime, :period_name, sn.timezone, '1{q.period_name}'::interval)) as last_period + , MIN(timezone(sn.timezone, datetime - '1sec'::interval)) as first_datetime + , MAX(timezone(sn.timezone, datetime - '1sec'::interval)) as last_datetime + , COUNT(1) as value_count + , AVG(value_avg) as value_avg + , STDDEV(value_avg) as value_sd + , MIN(value_avg) as value_min + , MAX(value_avg) as value_max + , PERCENTILE_CONT(0.02) WITHIN GROUP(ORDER BY value_avg) as value_p02 + , PERCENTILE_CONT(0.25) WITHIN GROUP(ORDER BY value_avg) as value_p25 + , PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY value_avg) as value_p50 + , PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY value_avg) as value_p75 + , PERCENTILE_CONT(0.98) WITHIN GROUP(ORDER BY value_avg) as value_p98 + , current_timestamp as calculated_on + FROM hourly_data m + JOIN sensors s ON (m.sensors_id = s.sensors_id) + JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id) + JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id) + -- JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id) + -- JOIN timezones ts ON (sn.timezones_id = ts.gid) + {query.where()} + GROUP BY 1, 2, 3, 4) + SELECT t.sensor_nodes_id + ---------- + , json_build_object( + 'label', '1{q.period_name}' + , 'datetime_from', get_datetime_object(datetime, t.timezone) + , 'datetime_to', get_datetime_object(last_period, t.timezone) + , 'interval', '{dur}' + ) as period + ---------- + , sig_digits(value_avg, 2) as value + ----------- + , json_build_object( + 'id', t.measurands_id + , 'units', m.units + , 'name', m.measurand + ) as parameter + --------- + , json_build_object( + 'sd', t.value_sd + , 'min', t.value_min + , 'q02', t.value_p02 + , 'q25', t.value_p25 + , 'median', t.value_p50 + , 'q75', t.value_p75 + , 'q98', t.value_p98 + , 'max', t.value_max + ) as summary + -------- + , calculate_coverage( + value_count::int + , 3600 + , 3600 + , EXTRACT(EPOCH FROM last_period - datetime) + )||jsonb_build_object( + 'datetime_from', get_datetime_object(first_datetime, t.timezone) + , 'datetime_to', get_datetime_object(last_datetime, t.timezone) + ) as coverage + {query.total()} + FROM meas t + JOIN measurands m ON (t.measurands_id = m.measurands_id) + {query.pagination()} + """ + elif q.period_name in ["hod","dow","moy"]: + + fmt = "" + if q.period_name == "hod": + fmt = "HH24" + dur = "01:00:00" + prd = "hour" + elif q.period_name == "dow": + fmt = "ID" + dur = "24:00:00" + prd = "day" + elif q.period_name == "mod": + fmt = "MM" + dur = "1 month" + prd = "month" + + + q.period_name = prd + sql = f""" +WITH trends AS ( +SELECT + sn.id + , s.measurands_id + , sn.timezone + , to_char(timezone(sn.timezone, datetime - '1sec'::interval), '{fmt}') as factor + , AVG(s.data_averaging_period_seconds) as avg_seconds + , AVG(s.data_logging_period_seconds) as log_seconds +, MAX(truncate_timestamp(datetime, :period_name, sn.timezone, '1{prd}'::interval)) as last_period +, MIN(timezone(sn.timezone, datetime - '1sec'::interval)) as first_datetime +, MAX(timezone(sn.timezone, datetime - '1sec'::interval)) as last_datetime + , COUNT(1) as value_count + , AVG(value_avg) as value_avg + , STDDEV(value_avg) as value_sd + 
+        , MIN(value_avg) as value_min
+        , MAX(value_avg) as value_max
+        , PERCENTILE_CONT(0.02) WITHIN GROUP(ORDER BY value_avg) as value_p02
+        , PERCENTILE_CONT(0.25) WITHIN GROUP(ORDER BY value_avg) as value_p25
+        , PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY value_avg) as value_p50
+        , PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY value_avg) as value_p75
+        , PERCENTILE_CONT(0.98) WITHIN GROUP(ORDER BY value_avg) as value_p98
+        , current_timestamp as calculated_on
+        FROM hourly_data m
+        JOIN sensors s ON (m.sensors_id = s.sensors_id)
+        JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
+        JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id)
+        {query.where()}
+        GROUP BY 1, 2, 3, 4)
+        SELECT t.id
+        , json_build_object(
+            'label', factor
+            , 'datetime_from', get_datetime_object(first_datetime, t.timezone)
+            , 'datetime_to', get_datetime_object(last_datetime, t.timezone)
+            , 'interval', '{dur}'
+        ) as period
+        , sig_digits(value_avg, 2) as value
+        , json_build_object(
+            'id', t.measurands_id
+            , 'units', m.units
+            , 'name', m.measurand
+        ) as parameter
+        , json_build_object(
+            'sd', t.value_sd
+            , 'min', t.value_min
+            , 'q02', t.value_p02
+            , 'q25', t.value_p25
+            , 'median', t.value_p50
+            , 'q75', t.value_p75
+            , 'q98', t.value_p98
+            , 'max', t.value_max
+        ) as summary
+        , calculate_coverage(
+            t.value_count::int
+            , t.avg_seconds
+            , t.log_seconds
+            , expected_hours(first_datetime, last_datetime, '{prd}', factor) * 3600.0
+        )||jsonb_build_object(
+            'datetime_from', get_datetime_object(first_datetime, t.timezone)
+            , 'datetime_to', get_datetime_object(last_datetime, t.timezone)
+        ) as coverage
+        FROM trends t
+        JOIN measurands m ON (t.measurands_id = m.measurands_id)
+        {query.pagination()}
+        """
+
+    return await db.fetchPage(sql, query.params())
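
Reviewer smoke test: the sketch below exercises both halves of this release against a running deployment with plain `requests`. It is a minimal sketch under stated assumptions, not part of the patch itself: the base URL, location and sensor ids, and API key are placeholders, and the camelCase response keys (such as `dateFrom`) follow from the camelizing JsonBase models rather than anything pinned down above.

import requests

BASE_URL = "https://api.openaq.org/v3"   # assumed deployment; point at your own instance
HEADERS = {"X-API-Key": "YOUR-API-KEY"}  # placeholder key
LOCATIONS_ID = 2178                      # placeholder location id
SENSORS_ID = 3917                        # placeholder sensor id

# Locations now expose an optional licenses list (LicenseBase).
resp = requests.get(f"{BASE_URL}/locations/{LOCATIONS_ID}", headers=HEADERS, timeout=30)
resp.raise_for_status()
for location in resp.json()["results"]:
    for lic in location.get("licenses") or []:
        print(lic["id"], lic["url"], lic.get("dateFrom"))

# period_name now accepts the trend aggregates hod, dow and moy.
resp = requests.get(
    f"{BASE_URL}/sensors/{SENSORS_ID}/measurements",
    params={"period_name": "dow", "limit": 7},
    headers=HEADERS,
    timeout=30,
)
resp.raise_for_status()
for row in resp.json()["results"]:
    # For the trend periods, the label is the to_char factor,
    # e.g. ISO day of week "1".."7" when period_name=dow.
    print(row["period"]["label"], row["value"], row["coverage"])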