diff --git a/alembic/versions/1fd83d22bd1e_create_alert_table.py b/alembic/versions/1fd83d22bd1e_create_alert_table.py index 7d41c747..f90740c6 100644 --- a/alembic/versions/1fd83d22bd1e_create_alert_table.py +++ b/alembic/versions/1fd83d22bd1e_create_alert_table.py @@ -8,12 +8,12 @@ import sqlalchemy as sa from sqlalchemy import Inspector - +from sqlalchemy.dialects.postgresql import ARRAY from alembic import op # revision identifiers, used by Alembic. revision = "1fd83d22bd1e" -down_revision = "e52b9542531c" +down_revision = "68c9f220a07f" branch_labels = None depends_on = None @@ -28,9 +28,10 @@ def upgrade() -> None: primary_key=True, index=True, ), - sa.Column("timestamp", sa.DateTime), + sa.Column("timestamp", sa.DateTime, index=True, nullable=False), sa.Column("vessel_id", sa.Integer, index=True, nullable=False), - sa.Column("mpa_id", sa.Integer, index=True, nullable=False), + sa.Column("cross_mpa", sa.Integer, nullable=False), + sa.Column("mpa_ids", ARRAY(sa.BigInteger), nullable=False), keep_existing=False, ) @@ -42,13 +43,6 @@ def upgrade() -> None: ["id"], ) - -# op.create_foreign_key( -# "fk_alert_mpa", -# "alert", -# "mpa", - - def downgrade() -> None: conn = op.get_bind() inspector = Inspector.from_engine(conn) diff --git a/alembic/versions/68c9f220a07f_add_columns_in_spire_table.py b/alembic/versions/68c9f220a07f_add_columns_in_spire_table.py index 8eed6ab7..3af1b46d 100644 --- a/alembic/versions/68c9f220a07f_add_columns_in_spire_table.py +++ b/alembic/versions/68c9f220a07f_add_columns_in_spire_table.py @@ -11,7 +11,7 @@ # revision identifiers, used by Alembic. revision = "68c9f220a07f" -down_revision = "1fd83d22bd1e" +down_revision = "e52b9542531c" branch_labels = None depends_on = None diff --git a/alembic/versions/e52b9542531c_create_marine_traffic_positions_and_.py b/alembic/versions/e52b9542531c_create_marine_traffic_positions_and_.py index 14497a44..11e7085f 100644 --- a/alembic/versions/e52b9542531c_create_marine_traffic_positions_and_.py +++ b/alembic/versions/e52b9542531c_create_marine_traffic_positions_and_.py @@ -31,11 +31,11 @@ def upgrade() -> None: index=True, default=uuid.uuid4, ), - sa.Column("timestamp", sa.DateTime), + sa.Column("timestamp", sa.DateTime, index=True), sa.Column("ship_name", sa.String), sa.Column("IMO", sa.String), sa.Column("vessel_id", sa.Integer, index=True, nullable=False), - sa.Column("mmsi", sa.Integer, index=True), + sa.Column("mmsi", sa.Integer), sa.Column("last_position_time", sa.DateTime), sa.Column("fishing", sa.Boolean), sa.Column("at_port", sa.Boolean), @@ -56,11 +56,11 @@ def upgrade() -> None: index=True, default=uuid.uuid4, ), - sa.Column("timestamp", sa.DateTime), + sa.Column("timestamp", sa.DateTime, index=True), sa.Column("ship_name", sa.String), sa.Column("IMO", sa.String), sa.Column("vessel_id", sa.Integer, index=True, nullable=False), - sa.Column("mmsi", sa.Integer, index=True), + sa.Column("mmsi", sa.Integer), sa.Column("last_position_time", sa.DateTime), sa.Column("fishing", sa.Boolean), sa.Column("at_port", sa.Boolean), diff --git a/app.py b/app.py index 7d68e0c6..cf99c483 100644 --- a/app.py +++ b/app.py @@ -27,7 +27,7 @@ def main() -> None: ) args = parser.parse_args() use_cases = UseCases() - marine_traffic_usecase = use_cases.scrap_marine_data_usecase() + #marine_traffic_usecase = use_cases.scrap_marine_data_usecase() spire_traffic_usecase = use_cases.get_spire_data_usecase() alert_usecase = use_cases.generate_alert_usecase() timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") @@ -41,7 +41,7 @@ def main() -> None: spire_traffic_usecase.save_vessels( spire_traffic_usecase.get_all_vessels(timestamp), ) - marine_traffic_usecase.scrap_vessels(timestamp) + #marine_traffic_usecase.scrap_vessels(timestamp) alert_usecase.generate_alerts(timestamp) while True: scheduler.start() @@ -50,7 +50,7 @@ def main() -> None: spire_traffic_usecase.save_vessels( spire_traffic_usecase.get_all_vessels(timestamp), ) - marine_traffic_usecase.scrap_vessels(timestamp) + #marine_traffic_usecase.scrap_vessels(timestamp) alert_usecase.generate_alerts(timestamp) diff --git a/bloom/infra/repositories/repository_alert.py b/bloom/infra/repositories/repository_alert.py index 4e328807..24aaac47 100644 --- a/bloom/infra/repositories/repository_alert.py +++ b/bloom/infra/repositories/repository_alert.py @@ -19,18 +19,40 @@ def save_alerts(self, timestamp: datetime) -> None: with self.session_factory() as session: sql = text( f""" - INSERT INTO alert(timestamp,vessel_id,mpa_id) - (SELECT spire_vessel_positions.timestamp, - spire_vessel_positions.vessel_id,mpa.index - FROM spire_vessel_positions - JOIN mpa - ON ST_Contains(mpa.geometry, spire_vessel_positions.position) - WHERE spire_vessel_positions.timestamp = '{timestamp}'); + INSERT INTO alert(timestamp,vessel_id,cross_mpa,mpa_ids) + ( + SELECT timestamp, vessel_id, (CAST(ST_Contains(mpa_fr_with_mn.geometry,current_position) AS INT) - CAST(ST_Contains(mpa_fr_with_mn.geometry,previous_position) AS INT)) as cross_mpa, ARRAY_AGG(mpa_fr_with_mn.index ORDER BY mpa_fr_with_mn.index DESC) AS mpa_ids FROM + (SELECT spire_vessel_positions.vessel_id AS vessel_id, + spire_vessel_positions.position AS current_position, + spire_vessel_positions.timestamp AS timestamp, + LAG(spire_vessel_positions.position) OVER (PARTITION BY spire_vessel_positions.vessel_id ORDER BY spire_vessel_positions.timestamp) AS previous_position + FROM spire_vessel_positions WHERE spire_vessel_positions.timestamp >= TIMESTAMP '{timestamp}' - INTERVAL '15 minutes' AND spire_vessel_positions.timestamp < TIMESTAMP '{timestamp}' + INTERVAL '15 minutes' ) AS foo + CROSS JOIN mpa_fr_with_mn WHERE previous_position IS NOT NULL and ST_Contains(mpa_fr_with_mn.geometry,current_position) != ST_Contains(mpa_fr_with_mn.geometry,previous_position) GROUP BY vessel_id, timestamp,cross_mpa + ); """, # nosec: B608 ) session.execute(sql) session.commit() return + + # an other query with the same result : + # WITH cte_query1 AS ( + # SELECT spire_vessel_positions.vessel_id AS vessel_id, ARRAY_AGG(mpa_fr_with_mn.index ORDER BY mpa_fr_with_mn.index DESC) AS mpa_ids + # FROM spire_vessel_positions + # JOIN mpa_fr_with_mn ON ST_Contains(mpa_fr_with_mn.geometry, spire_vessel_positions.position) + # WHERE spire_vessel_positions.timestamp = TO_TIMESTAMP('2023-11-17 12:00', 'YYYY-MM-DD HH24:MI') + # GROUP BY vessel_id + # ), + # cte_query2 AS ( + # SELECT DISTINCT spire_vessel_positions.vessel_id AS vessel_id, ARRAY_AGG(mpa_fr_with_mn.index ORDER BY mpa_fr_with_mn.index DESC) AS mpa_ids + # FROM spire_vessel_positions + # JOIN mpa_fr_with_mn ON ST_Contains(mpa_fr_with_mn.geometry, spire_vessel_positions.position) + # WHERE spire_vessel_positions.timestamp = TO_TIMESTAMP('2023-11-17 12:15', 'YYYY-MM-DD HH24:MI') + # GROUP BY vessel_id + # ) + # SELECT vessel_id, mpa_ids, -1 AS value FROM cte_query1 EXCEPT SELECT vessel_id, mpa_ids, -1 AS value FROM cte_query2 + # UNION ALL + # SELECT vessel_id, mpa_ids, 1 AS value FROM cte_query2 EXCEPT SELECT vessel_id, mpa_ids, 1 AS value FROM cte_query1 def load_alert(self, timestamp: datetime) -> list[Alert]: with self.session_factory() as session: @@ -40,8 +62,8 @@ def load_alert(self, timestamp: datetime) -> list[Alert]: sql = text( f""" SELECT timestamp, ship_name, mmsi, lp_time, position, - mpa."NAME", mpa."IUCN_CAT" - FROM (SELECT a.mpa_id as mpa_id, a.timestamp as timestamp, + mpa_fr_with_mn.name, mpa_fr_with_mn."IUCN_CAT" + FROM (SELECT a.mpa_ids as mpa_ids, a.timestamp as timestamp, spire.ship_name as ship_name, spire.mmsi as mmsi, spire.last_position_time as lp_time, ST_AsText(spire.position) as position @@ -49,8 +71,8 @@ def load_alert(self, timestamp: datetime) -> list[Alert]: JOIN (SELECT * FROM spire_vessel_positions WHERE spire_vessel_positions.timestamp = '{timestamp}') as spire ON a.vessel_id = spire.vessel_id - WHERE a.timestamp = '{timestamp}') as habile - JOIN mpa ON mpa_id = mpa.index + WHERE a.timestamp = '{timestamp}' and cross_mpa = 1) as habile + JOIN mpa_fr_with_mn ON mpa_ids[1] = mpa_fr_with_mn.index """, # nosec: B608 ) e = session.execute(sql)