New data model migration #164

Open · wants to merge 14 commits into base: main
2 changes: 1 addition & 1 deletion .github/workflows/style.yml
@@ -78,7 +78,7 @@ jobs:
black --version
black --check --diff .

bandit:
runs-on: ${{ matrix.os }}
strategy:
matrix:
2 changes: 1 addition & 1 deletion .gitignore
@@ -109,7 +109,7 @@ venv/
ENV/
env.bak/
venv.bak/

venv*
# Spyder project settings
.spyderproject
.spyproject
5 changes: 4 additions & 1 deletion Makefile
@@ -14,7 +14,7 @@ style:
build:
poetry export -f requirements.txt --without-hashes --output requirements.txt
docker build . -t pyronear/pyro-platform:latest

# Run the docker for production
run:
poetry export -f requirements.txt --without-hashes --output requirements.txt
@@ -25,6 +25,9 @@ run_dev:
poetry export -f requirements.txt --without-hashes --output requirements.txt
docker compose -f docker-compose-dev.yml up -d --build

run_local:
python app/index.py --host 0.0.0.0 --port 8050
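# Note: run_local starts the Dash app directly, without Docker; it assumes the
# Python dependencies are already installed locally (e.g. via `poetry install`).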

# Stop the docker containers
stop:
docker compose down
2 changes: 1 addition & 1 deletion README.md
@@ -28,7 +28,7 @@ make stop
```

If you need to launch the pyro-api in your development environment, you can use the pyro-devops project.
You can use it in two different ways:
=> by building the pyro-platform image and launching the full development environment with the command:
```shell
make run
4 changes: 2 additions & 2 deletions app/assets/css/style.css
@@ -35,13 +35,13 @@ a.no-underline {
/* Common style for containers and panels */
.common-style {
border: 2px solid #044448;
border-radius: 10px;
background-color: rgba(4, 68, 72, 0.1);
}

.common-style-slider {
border: 2px solid #044448;
border-radius: 10px;
background-color: rgba(4, 68, 72, 0.1);
margin-top: 10px;
}
197 changes: 131 additions & 66 deletions app/callbacks/data_callbacks.py
@@ -4,6 +4,7 @@
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

import json
from datetime import datetime, timedelta

import dash
import logging_config
@@ -15,48 +16,26 @@
from pyroclient import Client

import config as cfg
from services import api_client, call_api
from utils.data import (
convert_time,
past_ndays_api_events,
process_bbox,
read_stored_DataFrame,
)
from services import instantiate_token
from utils.data import process_bbox

logger = logging_config.configure_logging(cfg.DEBUG, cfg.SENTRY_DSN)


@app.callback(
[
Output("user_credentials", "data"),
Output("user_headers", "data"),
Output("client_token", "data"),
Output("form_feedback_area", "children"),
],
Input("send_form_button", "n_clicks"),
[
State("username_input", "value"),
State("password_input", "value"),
State("user_headers", "data"),
State("client_token", "data"),
],
)
def login_callback(n_clicks, username, password, user_headers):
"""
Callback to handle user login.

Parameters:
n_clicks (int): Number of times the login button has been clicked.
username (str or None): The value entered in the username input field.
password (str or None): The value entered in the password input field.
user_headers (dict or None): Existing user headers, if any, containing authentication details.

This function is triggered when the login button is clicked. It verifies the provided username and password,
attempts to authenticate the user via the API, and updates the user credentials and headers.
If authentication fails or credentials are missing, it provides appropriate feedback.

Returns:
dash.dependencies.Output: Updated user credentials and headers, and form feedback.
"""
if user_headers is not None:
def login_callback(n_clicks, username, password, client_token):
if client_token is not None:
return dash.no_update, dash.no_update

if n_clicks:
@@ -74,79 +53,165 @@ def login_callback(n_clicks, username, password, user_headers):
else:
# This is the route of the API that we are going to use for the credential check
try:
client = Client(cfg.API_URL, username, password)
client = instantiate_token(username, password)

return (
{"username": username, "password": password},
client.headers,
client.token,
dash.no_update,
)
except Exception:
# This if statement is verified if credentials are invalid
form_feedback.append(html.P("Nom d'utilisateur et/ou mot de passe erroné."))

return dash.no_update, dash.no_update, form_feedback
return dash.no_update, form_feedback

raise PreventUpdate
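# For context, a minimal sketch of what `services.instantiate_token` could look
# like; the real implementation lives in app/services.py and is not part of this
# diff, and the exact pyroclient constructor signature is an assumption here:
#
#     def instantiate_token(username: str, password: str) -> Client:
#         """Authenticate against the API and return a client exposing a token."""
#         return Client(cfg.API_URL, username, password)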


@app.callback(
[
Output("store_api_alerts_data", "data"),
Output("store_wildfires_data", "data"),
Output("store_detections_data", "data"),
Output("media_url", "data"),
Output("trigger_no_wildfires", "data"),
Output("previous_time_event", "data"),
],
[Input("main_api_fetch_interval", "n_intervals"), Input("user_credentials", "data")],
[Input("main_api_fetch_interval", "n_intervals")],
[
State("store_api_alerts_data", "data"),
State("user_headers", "data"),
State("client_token", "data"),
State("media_url", "data"),
State("store_wildfires_data", "data"),
State("previous_time_event", "data"),
],
prevent_initial_call=True,
)
def api_watcher(n_intervals, user_credentials, local_alerts, user_headers):
def data_transform(n_intervals, client_token, media_url, store_wildfires_data, previous_time_event):
"""
Callback to periodically fetch alerts data from the API.
Fetches and processes live wildfire and detection data from the API at regular intervals.

This callback periodically checks for new wildfire and detection data from the API.
It processes the new data, updates local storage with the latest information,
and prepares it for display in the application.

Parameters:
n_intervals (int): Number of times the interval has been triggered.
user_credentials (dict or None): Current user credentials for API authentication.
local_alerts (dict or None): Locally stored alerts data, serialized as JSON.
user_headers (dict or None): Current user headers containing authentication details.
- n_intervals (int): Number of intervals passed since the start of the app,
used to trigger the periodic update.
- client_token (str): Client token used to authenticate API calls.
- media_url (dict): Mapping from detection id to the associated media URL.
- store_wildfires_data (json): Currently stored wildfires data.
- previous_time_event (str or None): Timestamp of the most recent detection processed so far.

This function is triggered at specified intervals and when user credentials are updated.
It retrieves unacknowledged events from the API, processes the data, and stores it locally.
If the local data matches the API data, no updates are made.

Returns:
dash.dependencies.Output: Serialized JSON data of alerts and a flag indicating if data is loaded.
- json: Updated wildfires data in JSON format.
- json: Updated detections data in JSON format.
- dict: Updated mapping from detection id to media URL.
- bool or dash.no_update: Flag toggling the "no wildfires" notice when no new detection is found.
- str: Timestamp of the most recent detection processed.
"""
if user_headers is None:
if client_token is None:
raise PreventUpdate
user_token = user_headers["Authorization"].split(" ")[1]
api_client.token = user_token

# Read local data
local_alerts, alerts_data_loaded = read_stored_DataFrame(local_alerts)
logger.info("Start fetching the detections")
# Fetch Detections
# Use the last event time or default to yesterday
if previous_time_event is None:
previous_time_event = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d_%H:%M:%S")
else:
previous_time_event = pd.to_datetime(previous_time_event).strftime("%Y-%m-%d_%H:%M:%S")
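# Illustrative example: datetime(2024, 5, 17, 14, 3, 21) is rendered as
# "2024-05-17_14:03:21", the string handed to fetch_unlabeled_detections(from_date=...) below.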

# Fetch events
api_alerts = pd.DataFrame(call_api(api_client.get_unacknowledged_events, user_credentials)())
api_alerts["created_at"] = convert_time(api_alerts)
api_alerts = past_ndays_api_events(api_alerts, n_days=0)

if len(api_alerts) == 0:
api_client = Client(client_token, cfg.API_URL)
response = api_client.fetch_unlabeled_detections(from_date=previous_time_event)
api_detections = pd.DataFrame(response.json())
if api_detections.empty:
return [
json.dumps(
{
"data": store_wildfires_data,
"data_loaded": False,
}
),
json.dumps(
{
"data": pd.DataFrame().to_json(orient="split"),
"data_loaded": True,
"data_loaded": False,
}
)
),
[],
True,
previous_time_event,
]

# Keep track of the latest detection time as the starting point of the next fetch
previous_time_event = api_detections["created_at"].max()

# Find ongoing detections for the wildfires started within 30 minutes;
# after that, any new detection is part of a new wildfire
api_detections["created_at"] = pd.to_datetime(api_detections["created_at"])

# Sort the detections by "created_at"
api_detections = api_detections.sort_values(by="created_at")

# Fetch the cameras and initialize the detection columns
cameras = pd.DataFrame(api_client.fetch_cameras().json())
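# fetch_cameras() is assumed to return at least id, name, lat and lon per
# camera; the grouping loop below relies on those fields.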
api_detections["lat"] = None
api_detections["lon"] = None
api_detections["wildfire_id"] = None
api_detections["processed_loc"] = api_detections["bboxes"].apply(process_bbox)
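# process_bbox (utils.data, not shown in this diff) pre-processes the raw
# bboxes payload; lat, lon and wildfire_id are filled by the loop below.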

# Load existing wildfires data
wildfires_dict = json.loads(store_wildfires_data)["data"]
if wildfires_dict != {}:
id_counter = (
max(wildfire["id"] for camera_wildfires in wildfires_dict.values() for wildfire in camera_wildfires) + 1
)
else:
api_alerts["processed_loc"] = api_alerts["localization"].apply(process_bbox)
if alerts_data_loaded and not local_alerts.empty:
aligned_api_alerts, aligned_local_alerts = api_alerts["alert_id"].align(local_alerts["alert_id"])
if all(aligned_api_alerts == aligned_local_alerts):
return [dash.no_update]

return [json.dumps({"data": api_alerts.to_json(orient="split"), "data_loaded": True})]
wildfires_dict = {}
id_counter = 1
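# Illustrative shape of the wildfires store built below (values are made up):
# {camera_id: [{"id": 1, "camera_name": "station_1",
#               "created_at": "2024-05-17 14:03:21", "detection_ids": [101, 102]}]}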

last_detection_time_per_camera: dict[int, str] = {}
media_dict = api_detections.set_index("id")["url"].to_dict()

# Iterate over the detections and group them into wildfires
for i, detection in api_detections.iterrows():
camera_id = api_detections.at[i, "camera_id"]
camera = cameras.loc[cameras["id"] == camera_id]
camera = camera.iloc[0] # Ensure camera is a Series
api_detections.at[i, "lat"] = camera["lat"]
api_detections.at[i, "lon"] = camera["lon"]

media_url[detection["id"]] = media_dict[detection["id"]]

if camera_id not in wildfires_dict:
wildfires_dict.setdefault(camera_id, [])
last_detection_time_per_camera.setdefault(camera_id, "")
# Initialize the first wildfire for this camera
wildfire = {
"id": id_counter,
"camera_name": camera["name"],
"created_at": detection["created_at"].strftime("%Y-%m-%d %H:%M:%S"),
"detection_ids": [detection["id"]],
}
wildfires_dict[camera_id] = [wildfire]
id_counter += 1
else:
time_diff = detection["created_at"] - last_detection_time_per_camera[camera_id]

if time_diff <= pd.Timedelta(minutes=30):
# If the time difference is under 30 minutes, add the detection to the current wildfire
wildfires_dict[camera_id][-1]["detection_ids"].append(detection["id"])
else:
# Initialize a new wildfire for this camera
wildfire = {
"id": id_counter,
"camera_name": camera["name"],
"created_at": detection["created_at"].strftime("%Y-%m-%d %H:%M:%S"),
"detection_ids": [detection["id"]],
}
wildfires_dict[camera_id].append(wildfire)
id_counter += 1
api_detections.at[i, "wildfire_id"] = wildfires_dict[camera_id][-1]["id"]
last_detection_time_per_camera[camera_id] = detection["created_at"]
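# Example of the grouping rule: detections from one camera at 10:00, 10:20 and 11:05
# produce two wildfires: {10:00, 10:20} (each gap from the previous detection is
# at most 30 minutes) and {11:05}, which starts a new wildfire.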

wildfires_dict = {int(k): v for k, v in wildfires_dict.items()}
# Serialize the grouped wildfires and the detections for the stores
return [
json.dumps({"data": wildfires_dict, "data_loaded": True}),
json.dumps({"data": api_detections.to_json(orient="split"), "data_loaded": True}),
media_url,
dash.no_update,
previous_time_event,
]
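For reviewers, here is a minimal sketch of how a downstream callback could rehydrate the stores populated above. The store ids match this diff; the target component, the callback name and the `app` import are assumptions for illustration:

```python
import json
from io import StringIO

import pandas as pd
from dash import Input, Output
from dash.exceptions import PreventUpdate

from main import app  # assumed module exposing the Dash app


@app.callback(
    Output("detection_counter", "children"),  # hypothetical component
    Input("store_detections_data", "data"),
)
def show_detection_count(store_detections_data):
    # Stores hold {"data": <json>, "data_loaded": <bool>} payloads, as serialized above
    payload = json.loads(store_detections_data)
    if not payload["data_loaded"]:
        raise PreventUpdate
    detections = pd.read_json(StringIO(payload["data"]), orient="split")
    return f"{len(detections)} detections since the last fetch"
```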