From 13b835923e52bff6a701805f20b69bf03615c0da Mon Sep 17 00:00:00 2001
From: Christian Parker
Date: Sun, 10 Mar 2024 10:43:38 -0700
Subject: [PATCH] Restrict access to PurpleAir until we can remove the data

---
 openaq_api/openaq_api/routers/cities.py     |  32 +-
 openaq_api/openaq_api/routers/countries.py  |   6 +-
 openaq_api/openaq_api/routers/sources.py    |   1 +
 .../openaq_api/v3/routers/manufacturers.py  |  13 +-
 .../openaq_api/v3/routers/measurements.py   |   1 +
 openaq_api/openaq_api/v3/routers/owners.py  |   1 +
 openaq_api/openaq_api/v3/routers/sensors.py | 312 +++++++++---------
 openaq_api/tests/test_purpleair.py          |  49 +++
 8 files changed, 236 insertions(+), 179 deletions(-)
 create mode 100644 openaq_api/tests/test_purpleair.py

diff --git a/openaq_api/openaq_api/routers/cities.py b/openaq_api/openaq_api/routers/cities.py
index e0471e7..820f58e 100644
--- a/openaq_api/openaq_api/routers/cities.py
+++ b/openaq_api/openaq_api/routers/cities.py
@@ -105,31 +105,32 @@ async def cities_get(
     elif cities.order_by == "locations":
         order_by = "locations"
     q = f"""
-    SELECT 
+    SELECT
     count(*) over () as citiescount,
     c.iso AS country
     , sn.city AS city
     , SUM(sr.value_count) AS "count"
     , COUNT(DISTINCT sn.sensor_nodes_id) AS locations
-    , MIN(sr.datetime_first)::TEXT AS first_updated 
-    , MAX(sr.datetime_last)::TEXT AS last_updated 
-    , array_agg(DISTINCT m.measurand) AS parameters 
+    , MIN(sr.datetime_first)::TEXT AS first_updated
+    , MAX(sr.datetime_last)::TEXT AS last_updated
+    , array_agg(DISTINCT m.measurand) AS parameters
     , COUNT(1) OVER() as found
-    FROM 
+    FROM
     sensors_rollup sr
-    JOIN 
+    JOIN
     sensors s USING (sensors_id)
     JOIN
     sensor_systems ss USING (sensor_systems_id)
     JOIN
     sensor_nodes sn USING (sensor_nodes_id)
-    JOIN 
+    JOIN
     countries c USING (countries_id)
-    JOIN 
-    measurands m USING (measurands_id) 
+    JOIN
+    measurands m USING (measurands_id)
     WHERE
     {cities.where()}
-    and city is not null
+    AND city is not null
+    AND s.is_public
     GROUP BY c.iso, sn.city
     ORDER BY {order_by} {cities.sort}
     OFFSET :offset
@@ -161,7 +162,7 @@ async def cities_getv1(
     elif cities.order_by == "locations":
         order_by = "locations"
     q = f"""
-    SELECT 
+    SELECT
     count(*) over () as citiescount
     , c.iso AS country
     , sn.city AS "name"
@@ -169,19 +170,20 @@ async def cities_getv1(
     , SUM(sr.value_count) AS "count"
     , COUNT(DISTINCT sn.sensor_nodes_id) AS locations
     , COUNT(1) OVER() as found
-    FROM 
+    FROM
     sensors_rollup sr
-    JOIN 
+    JOIN
     sensors s USING (sensors_id)
     JOIN
     sensor_systems ss USING (sensor_systems_id)
     JOIN
     sensor_nodes sn USING (sensor_nodes_id)
-    JOIN 
+    JOIN
     countries c USING (countries_id)
     WHERE
     {cities.where()}
-    and city is not null
+    AND city is not null
+    AND s.is_public
     GROUP BY c.iso, sn.city
     ORDER BY {order_by} {cities.sort}
     OFFSET :offset
diff --git a/openaq_api/openaq_api/routers/countries.py b/openaq_api/openaq_api/routers/countries.py
index 341be2f..fe9c1a4 100644
--- a/openaq_api/openaq_api/routers/countries.py
+++ b/openaq_api/openaq_api/routers/countries.py
@@ -148,6 +148,7 @@ async def countries_by_path(
     WHERE
     c.iso IS NOT NULL
     AND c.countries_id = :country_id
+    AND sn.is_public
     GROUP BY code, c.name, c.countries_id
     OFFSET :offset
     LIMIT :limit
@@ -215,6 +216,7 @@ async def countries_get(
     WHERE
     {countries.where()}
     AND c.iso IS NOT NULL
+    AND sn.is_public
     GROUP BY code, c.name
     ORDER BY {order_by} {countries.sort}
     OFFSET :offset
@@ -270,7 +272,9 @@ async def countries_getv1(
     WHERE
     {countries.where()}
     AND
-    c.iso IS NOT NULL 
+    c.iso IS NOT NULL
+    AND
+    sn.is_public
     GROUP BY code, c.name
     ORDER BY {order_by} {countries.sort}
     OFFSET :offset
diff --git a/openaq_api/openaq_api/routers/sources.py b/openaq_api/openaq_api/routers/sources.py
index d069328..f1e543c 100644
--- a/openaq_api/openaq_api/routers/sources.py
+++ b/openaq_api/openaq_api/routers/sources.py
@@ -82,6 +82,7 @@ async def sources_get(
     --LEFT JOIN groups_view USING (groups_id, measurands_id)
     --WHERE rollup='total' AND groups_view.type='node'
     WHERE {sources.where()}
+    AND sensors.is_public
     GROUP BY
     1,2,3,4,5
     ORDER BY "{sources.order_by}" {sources.sort}
diff --git a/openaq_api/openaq_api/v3/routers/manufacturers.py b/openaq_api/openaq_api/v3/routers/manufacturers.py
index cc2affc..0f32c45 100644
--- a/openaq_api/openaq_api/v3/routers/manufacturers.py
+++ b/openaq_api/openaq_api/v3/routers/manufacturers.py
@@ -96,22 +96,21 @@ async def manufacturers_get(
 
 async def fetch_manufacturers(query, db):
     query_builder = QueryBuilder(query)
 
-    sql = f""" 
-    SELECT 
+    sql = f"""
+    SELECT
     e.entities_id AS id
     , e.full_name AS name
     , ARRAY_AGG(DISTINCT (jsonb_build_object('id', i.instruments_id, 'name', i.label))) AS instruments
     , COUNT(1) OVER() AS found
-    FROM 
+    FROM
     sensor_nodes sn
-    JOIN 
+    JOIN
     sensor_systems ss ON sn.sensor_nodes_id = ss.sensor_nodes_id
-    JOIN 
+    JOIN
     instruments i ON i.instruments_id = ss.instruments_id
-    JOIN 
+    JOIN
     entities e ON e.entities_id = i.manufacturer_entities_id
     {query_builder.where()}
-
     GROUP BY id, name
     {query_builder.pagination()};
diff --git a/openaq_api/openaq_api/v3/routers/measurements.py b/openaq_api/openaq_api/v3/routers/measurements.py
index 7f35e23..57886d2 100644
--- a/openaq_api/openaq_api/v3/routers/measurements.py
+++ b/openaq_api/openaq_api/v3/routers/measurements.py
@@ -152,6 +152,7 @@ async def fetch_measurements(q, db):
     JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
     JOIN timezones ts ON (sn.timezones_id = ts.gid)
     {query.where()}
+    AND sn.is_public AND s.is_public
     GROUP BY 1, 2, 3, 4)
     SELECT t.sensor_nodes_id
     , json_build_object(
diff --git a/openaq_api/openaq_api/v3/routers/owners.py b/openaq_api/openaq_api/v3/routers/owners.py
index 8454a69..1a6bf71 100644
--- a/openaq_api/openaq_api/v3/routers/owners.py
+++ b/openaq_api/openaq_api/v3/routers/owners.py
@@ -99,6 +99,7 @@ async def fetch_owners(query, db):
     FROM entities e
     JOIN sensor_nodes sn ON e.entities_id = sn.owner_entities_id
     {query_builder.where()}
+    AND sn.is_public
     GROUP BY e.entities_id, name
     ORDER BY e.entities_id
     {query_builder.pagination()};
diff --git a/openaq_api/openaq_api/v3/routers/sensors.py b/openaq_api/openaq_api/v3/routers/sensors.py
index ee93f52..be51c6e 100644
--- a/openaq_api/openaq_api/v3/routers/sensors.py
+++ b/openaq_api/openaq_api/v3/routers/sensors.py
@@ -16,9 +16,9 @@
 )
 
 from openaq_api.v3.models.responses import (
-    SensorsResponse, 
-    MeasurementsResponse, 
-    )
+    SensorsResponse,
+    MeasurementsResponse,
+)
 
 
 logger = logging.getLogger("sensors")
@@ -49,7 +49,7 @@ def where(self):
 
 class SensorMeasurementsQueries(
     Paging,
-    SensorQuery, 
+    SensorQuery,
     DateFromQuery,
     DateToQuery,
     PeriodNameQuery,
@@ -115,46 +115,46 @@ async def fetch_sensors(q, db):
     logger.debug(query.params())
 
     sql = f"""
-    SELECT s.sensors_id as id 
-    , m.measurand||' '||m.units as name 
-    , json_build_object( 
-    'id', m.measurands_id 
+    SELECT s.sensors_id as id
+    , m.measurand||' '||m.units as name
+    , json_build_object(
+    'id', m.measurands_id
     , 'name', m.measurand
     , 'units', m.units
     , 'display_name', m.display
     ) as parameter
-    , s.sensors_id 
-    , json_build_object( 
-    'min', r.value_min 
-    , 'max', r.value_max 
-    , 'avg', r.value_avg 
-    , 'sd', r.value_sd 
-    ) as summary 
-    , calculate_coverage( 
-    r.value_count 
-    , s.data_averaging_period_seconds 
-    , s.data_logging_period_seconds 
-    , r.datetime_first 
-    , r.datetime_last 
-    ) as coverage 
-    , get_datetime_object(r.datetime_first, t.tzid) as datetime_first 
-    , get_datetime_object(r.datetime_last, t.tzid) as datetime_last 
-    , json_build_object( 
-    'datetime', get_datetime_object(r.datetime_last, t.tzid) 
-    , 'value', r.value_latest 
-    , 'coordinates', json_build_object( 
-    'latitude', st_y(COALESCE(r.geom_latest, n.geom)) 
-    ,'longitude', st_x(COALESCE(r.geom_latest, n.geom)) 
-    )) as latest 
-    FROM sensors s 
-    JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id) 
-    JOIN sensor_nodes n ON (sy.sensor_nodes_id = n.sensor_nodes_id) 
+    , s.sensors_id
+    , json_build_object(
+    'min', r.value_min
+    , 'max', r.value_max
+    , 'avg', r.value_avg
+    , 'sd', r.value_sd
+    ) as summary
+    , calculate_coverage(
+    r.value_count
+    , s.data_averaging_period_seconds
+    , s.data_logging_period_seconds
+    , r.datetime_first
+    , r.datetime_last
+    ) as coverage
+    , get_datetime_object(r.datetime_first, t.tzid) as datetime_first
+    , get_datetime_object(r.datetime_last, t.tzid) as datetime_last
+    , json_build_object(
+    'datetime', get_datetime_object(r.datetime_last, t.tzid)
+    , 'value', r.value_latest
+    , 'coordinates', json_build_object(
+    'latitude', st_y(COALESCE(r.geom_latest, n.geom))
+    ,'longitude', st_x(COALESCE(r.geom_latest, n.geom))
+    )) as latest
+    FROM sensors s
+    JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
+    JOIN sensor_nodes n ON (sy.sensor_nodes_id = n.sensor_nodes_id)
     JOIN timezones t ON (n.timezones_id = t.gid)
-    JOIN measurands m ON (s.measurands_id = m.measurands_id) 
-    LEFT JOIN sensors_rollup r ON (s.sensors_id = r.sensors_id) 
-    {query.where()} 
+    JOIN measurands m ON (s.measurands_id = m.measurands_id)
+    LEFT JOIN sensors_rollup r ON (s.sensors_id = r.sensors_id)
+    {query.where()} AND n.is_public AND s.is_public
     {query.pagination()}
-    """ 
+    """
 
 
     return await db.fetchPage(sql, query.params())
@@ -211,58 +211,59 @@ async def fetch_measurements(q, db):
         """
     elif q.period_name in ["raw"]:
         sql = f"""
-        WITH sensor AS ( 
-        SELECT s.sensors_id 
-        , sn.sensor_nodes_id 
+        WITH sensor AS (
+        SELECT s.sensors_id
+        , sn.sensor_nodes_id
         , s.data_averaging_period_seconds
         , s.data_logging_period_seconds
         , format('%ssec', s.data_averaging_period_seconds)::interval as averaging_interval
         , format('%ssec', s.data_logging_period_seconds)::interval as logging_interval
-        , tz.tzid as timezone 
-        , m.measurands_id 
-        , m.measurand 
-        , m.unit 
-        , as_timestamptz(:date_from, tz.tzid) as datetime_from 
-        , as_timestamptz(:date_to, tz.tzid) as datetime_to 
+        , tz.tzid as timezone
+        , m.measurands_id
+        , m.measurand
+        , m.units
+        , as_timestamptz(:date_from, tz.tzid) as datetime_from
+        , as_timestamptz(:date_to, tz.tzid) as datetime_to
         FROM sensors s
-        , sensor_systems sy 
-        , sensor_nodes sn 
-        , timezones tz 
-        , measurands m 
-        WHERE s.sensor_systems_id = sy.sensor_systems_id 
-        AND sy.sensor_nodes_id = sn.sensor_nodes_id 
-        AND sn.timezones_id = tz.gid 
-        AND s.sensors_id = :sensors_id 
-        AND s.measurands_id = m.measurands_id) 
-        SELECT m.sensors_id 
-        , value 
-        , get_datetime_object(m.datetime, s.timezone) 
-        , json_build_object( 
-        'id', s.measurands_id 
-        , 'units', s.units 
-        , 'name', s.measurand 
-        ) as parameter 
+        , sensor_systems sy
+        , sensor_nodes sn
+        , timezones tz
+        , measurands m
+        WHERE s.sensor_systems_id = sy.sensor_systems_id
+        AND sy.sensor_nodes_id = sn.sensor_nodes_id
+        AND sn.timezones_id = tz.gid
+        AND s.sensors_id = :sensors_id
+        AND s.measurands_id = m.measurands_id
+        AND sn.is_public AND s.is_public)
+        SELECT m.sensors_id
+        , value
+        , get_datetime_object(m.datetime, s.timezone)
+        , json_build_object(
+        'id', s.measurands_id
+        , 'units', s.units
+        , 'name', s.measurand
+        ) as parameter
         , json_build_object(
-        'label', 'raw' 
-        , 'interval', s.logging_interval 
-        , 'datetime_from', get_datetime_object(m.datetime - s.logging_interval, s.timezone) 
-        , 'datetime_to', get_datetime_object(m.datetime, s.timezone) 
-        ) as period 
+        'label', 'raw'
+        , 'interval', s.logging_interval
+        , 'datetime_from', get_datetime_object(m.datetime - s.logging_interval, s.timezone)
+        , 'datetime_to', get_datetime_object(m.datetime, s.timezone)
+        ) as period
         , json_build_object(
-        'expected_count', 1 
-        , 'observed_count', 1 
-        , 'expected_interval', s.logging_interval 
-        , 'observed_interval', s.averaging_interval 
-        , 'datetime_from', get_datetime_object(m.datetime - s.averaging_interval, s.timezone) 
-        , 'datetime_to', get_datetime_object(m.datetime, s.timezone) 
-        , 'percent_complete', 100 
-        , 'percent_coverage', (s.data_averaging_period_seconds/s.data_logging_period_seconds)*100 
-        ) as coverage 
+        'expected_count', 1
+        , 'observed_count', 1
+        , 'expected_interval', s.logging_interval
+        , 'observed_interval', s.averaging_interval
+        , 'datetime_from', get_datetime_object(m.datetime - s.averaging_interval, s.timezone)
+        , 'datetime_to', get_datetime_object(m.datetime, s.timezone)
+        , 'percent_complete', 100
+        , 'percent_coverage', (s.data_averaging_period_seconds/s.data_logging_period_seconds)*100
+        ) as coverage
         FROM measurements m
         JOIN sensor s USING (sensors_id)
         WHERE datetime > datetime_from
-        AND datetime <= datetime_to 
-        AND s.sensors_id = :sensors_id 
+        AND datetime <= datetime_to
+        AND s.sensors_id = :sensors_id
         ORDER BY datetime
         {query.pagination()}
         """
@@ -299,39 +300,37 @@ async def fetch_measurements(q, db):
     FROM hourly_data m
     JOIN sensors s ON (m.sensors_id = s.sensors_id)
     JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
-    JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id) 
-    -- JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
-    -- JOIN timezones ts ON (sn.timezones_id = ts.gid)
+    JOIN locations_view_cached sn ON (sy.sensor_nodes_id = sn.id)
     {query.where()}
     GROUP BY 1, 2, 3, 4)
     SELECT t.sensor_nodes_id
-    ---------- 
+    ----------
     , json_build_object(
    'label', '1{q.period_name}'
     , 'datetime_from', get_datetime_object(datetime, t.timezone)
     , 'datetime_to', get_datetime_object(last_period, t.timezone)
     , 'interval', '{dur}'
     ) as period
-    ---------- 
+    ----------
     , sig_digits(value_avg, 2) as value
-    ----------- 
+    -----------
     , json_build_object(
     'id', t.measurands_id
     , 'units', m.units
     , 'name', m.measurand
     ) as parameter
-    --------- 
+    ---------
     , json_build_object(
     'sd', t.value_sd
-    , 'min', t.value_min 
-    , 'q02', t.value_p02 
-    , 'q25', t.value_p25 
-    , 'median', t.value_p50 
-    , 'q75', t.value_p75 
-    , 'q98', t.value_p98 
-    , 'max', t.value_max 
+    , 'min', t.value_min
+    , 'q02', t.value_p02
+    , 'q25', t.value_p25
+    , 'median', t.value_p50
+    , 'q75', t.value_p75
+    , 'q98', t.value_p98
+    , 'max', t.value_max
     ) as summary
-    -------- 
+    --------
     , calculate_coverage(
     value_count::int
     , 3600
@@ -365,52 +364,53 @@ async def fetch_measurements(q, db):
         sql = f"""
 
 
-        ----------------------------------- 
-        -- start by getting some basic sensor information 
-        -- and transforming the timestamps 
-        ----------------------------------- 
-        WITH sensor AS ( 
-        SELECT s.sensors_id 
-        , sn.sensor_nodes_id 
+        -----------------------------------
+        -- start by getting some basic sensor information
+        -- and transforming the timestamps
+        -----------------------------------
+        WITH sensor AS (
+        SELECT s.sensors_id
+        , sn.sensor_nodes_id
+        , s.data_averaging_period_seconds
+        , s.data_logging_period_seconds
+        , tz.tzid as timezone
+        , m.measurands_id
+        , m.measurand
+        , m.units
+        , as_timestamptz(:date_from, tz.tzid) as datetime_from
+        , as_timestamptz(:date_to, tz.tzid) as datetime_to
+        FROM sensors s
+        , sensor_systems sy
+        , sensor_nodes sn
+        , timezones tz
+        , measurands m
+        WHERE s.sensor_systems_id = sy.sensor_systems_id
+        AND sy.sensor_nodes_id = sn.sensor_nodes_id
+        AND sn.timezones_id = tz.gid
+        AND s.sensors_id = :sensors_id
+        AND s.measurands_id = m.measurands_id
+        AND sn.is_public AND s.is_public
+        --------------------------------
+        -- Then we calculate what we expect to find in the data
+        --------------------------------
+        ), expected AS (
+        SELECT to_char(timezone(s.timezone, dd - '1sec'::interval), {period_format}) as factor
+        , s.timezone
+        , COUNT(1) as n
+        , MIN(date_trunc(:period_name, dd + {period_first_offset}::interval)) as period_first
+        , MAX(date_trunc(:period_name, dd + {period_last_offset}::interval)) as period_last
+        FROM sensor s
+        , generate_series(s.datetime_from + '1hour'::interval, s.datetime_to, ('1hour')::interval) dd
+        GROUP BY 1,2
+        ------------------------------------
+        -- Then we query what we have in the db
+        -- we join the sensor CTE here so that we have access to the timezone
+        ------------------------------------
+        ), observed AS (
+        SELECT
+        s.sensors_id
         , s.data_averaging_period_seconds
         , s.data_logging_period_seconds
-        , tz.tzid as timezone 
-        , m.measurands_id 
-        , m.measurand 
-        , m.units 
-        , as_timestamptz(:date_from, tz.tzid) as datetime_from 
-        , as_timestamptz(:date_to, tz.tzid) as datetime_to 
-        FROM sensors s 
-        , sensor_systems sy 
-        , sensor_nodes sn 
-        , timezones tz 
-        , measurands m 
-        WHERE s.sensor_systems_id = sy.sensor_systems_id 
-        AND sy.sensor_nodes_id = sn.sensor_nodes_id 
-        AND sn.timezones_id = tz.gid 
-        AND s.sensors_id = :sensors_id 
-        AND s.measurands_id = m.measurands_id 
-        -------------------------------- 
-        -- Then we calculate what we expect to find in the data 
-        -------------------------------- 
-        ), expected AS ( 
-        SELECT to_char(timezone(s.timezone, dd - '1sec'::interval), {period_format}) as factor 
-        , s.timezone 
-        , COUNT(1) as n 
-        , MIN(date_trunc(:period_name, dd + {period_first_offset}::interval)) as period_first 
-        , MAX(date_trunc(:period_name, dd + {period_last_offset}::interval)) as period_last 
-        FROM sensor s 
-        , generate_series(s.datetime_from + '1hour'::interval, s.datetime_to, ('1hour')::interval) dd 
-        GROUP BY 1,2 
-        ------------------------------------ 
-        -- Then we query what we have in the db 
-        -- we join the sensor CTE here so that we have access to the timezone 
-        ------------------------------------ 
-        ), observed AS ( 
-        SELECT 
-        s.sensors_id 
-        , s.data_averaging_period_seconds 
-        , s.data_logging_period_seconds 
         , s.timezone
         , s.measurands_id
         , s.measurand
@@ -438,7 +438,7 @@ async def fetch_measurements(q, db):
         -----------------------------------------
         -- And finally we tie it all together
         -----------------------------------------
-        SELECT o.sensors_id 
+        SELECT o.sensors_id
         , sig_digits(value_avg, 2) as value
         , json_build_object(
         'id', o.measurands_id
@@ -455,23 +455,23 @@ async def fetch_measurements(q, db):
         , 'q98', o.value_p98
         , 'max', o.value_max
         ) as summary
-        , json_build_object( 
-        'label', e.factor 
-        , 'datetime_from', get_datetime_object(e.period_first, o.timezone) 
-        , 'datetime_to', get_datetime_object(e.period_last, o.timezone) 
-        , 'interval', :period_name 
-        ) as period 
-        , calculate_coverage( 
-        o.n::int 
-        , o.data_averaging_period_seconds 
+        , json_build_object(
+        'label', e.factor
+        , 'datetime_from', get_datetime_object(e.period_first, o.timezone)
+        , 'datetime_to', get_datetime_object(e.period_last, o.timezone)
+        , 'interval', :period_name
+        ) as period
+        , calculate_coverage(
+        o.n::int
+        , o.data_averaging_period_seconds
         , o.data_logging_period_seconds
-        , e.n * 3600.0)|| 
-        jsonb_build_object( 
-        'datetime_from', get_datetime_object(o.coverage_first, o.timezone) 
-        , 'datetime_to', get_datetime_object(o.coverage_last, o.timezone) 
-        ) as coverage 
-        FROM expected e 
-        JOIN observed o ON (e.factor = o.factor) 
+        , e.n * 3600.0)||
+        jsonb_build_object(
+        'datetime_from', get_datetime_object(o.coverage_first, o.timezone)
+        , 'datetime_to', get_datetime_object(o.coverage_last, o.timezone)
+        ) as coverage
+        FROM expected e
+        JOIN observed o ON (e.factor = o.factor)
         {query.pagination()}
         """
 
diff --git a/openaq_api/tests/test_purpleair.py b/openaq_api/tests/test_purpleair.py
new file mode 100644
index 0000000..e39cdc2
--- /dev/null
+++ b/openaq_api/tests/test_purpleair.py
@@ -0,0 +1,49 @@
+from fastapi.testclient import TestClient
+import json
+import time
+import os
+import pytest
+from openaq_api.main import app
+from openaq_api.db import db_pool
+
+
+@pytest.fixture
+def client():
+    with TestClient(app) as c:
+        yield c
+
+# PurpleAir sensor and node ids used throughout these tests
+sensor = 393731
+node = 62376
+
+urls = [
+    ## v2
+    {"path": "/v2/averages?locations_id=:node","status": 404},
+    {"path": "/v2/locations/:node","status": 404},
+    {"path": "/v2/latest/:node","status": 404},
+    {"path": "/v2/measurements?location_id=:node","status": 404},
+    ## v3
+    {"path": "/v3/instruments/3","status": 200},
+    {"path": "/v3/latest?location_id=:node","status": 404},
+    {"path": "/v3/locations/:node","status": 404}, # after
+    {"path": "/v3/locations/:node/measurements","status": 404}, # after
+    {"path": "/v3/sensors/:sensor/measurements","status": 404}, # after
+    {"path": "/v3/sensors/:sensor","status": 404}, # after
+    # all of the following have an added where clause
+    # and we just want to make sure the sql works
+    {"path": "/v2/cities?limit=1","status": 200},
+    {"path": "/v2/countries?limit=1","status": 200},
+    {"path": "/v2/sources?limit=1","status": 200},
+    {"path": "/v3/manufacturers?limit=1","status": 200},
+    {"path": "/v3/locations?limit=1","status": 200},
+]
+
+
+@pytest.mark.parametrize("url", urls)
+class TestUrls:
+    def test_urls(self, client, url):
+        path = url.get('path')
+        path = path.replace(':sensor', str(sensor))
+        path = path.replace(':node', str(node))
+        response = client.get(path)
+        assert response.status_code == url.get('status')
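
Note on the approach: every listing query above repeats the visibility predicate by hand ("AND s.is_public", "AND sn.is_public", or both), so each new router has to remember to add it. A minimal sketch of how the predicate could be centralized, assuming the where() helpers keep returning a plain SQL fragment as they do today; the constant and helper below are hypothetical and not part of this patch:

    # hypothetical helper, not in this patch
    PUBLIC_ONLY = "sn.is_public AND s.is_public"  # assumes the sn/s aliases used above

    def with_public_filter(where_clause: str) -> str:
        # Append the visibility predicate to an already-built WHERE clause.
        return f"{where_clause} AND {PUBLIC_ONLY}"

Routers that alias the tables differently (sources.py filters on sensors.is_public, fetch_sensors on n.is_public) would still need their own variant, which may be why the patch inlines the clause in each query instead.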
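
Note on test coverage: the limit=1 checks in test_purpleair.py only prove that the amended SQL still parses and executes; they would pass even if the hidden node leaked into listing results. A stricter follow-up check, hypothetical and not part of this patch, reusing the client fixture and node id from that file:

    # hypothetical addition to test_purpleair.py, not part of this patch
    def test_hidden_node_absent_from_listing(client):
        response = client.get("/v3/locations?limit=1000")
        assert response.status_code == 200
        ids = [row["id"] for row in response.json()["results"]]
        assert node not in ids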