Skip to content

Commit

Permalink
fix: reslove merge conflict with develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Pradip-p committed Oct 29, 2024
2 parents 6270603 + ba94feb commit 496c0b0
Show file tree
Hide file tree
Showing 35 changed files with 559 additions and 280 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ repos:

# Versioning: Commit messages & changelog
- repo: https://github.com/commitizen-tools/commitizen
rev: v3.29.1
rev: v3.30.0
hooks:
- id: commitizen
stages: [commit-msg]

# Lint / autoformat: Python code
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: "v0.7.0"
rev: "v0.7.1"
hooks:
# Run the linter
- id: ruff
Expand Down
2 changes: 2 additions & 0 deletions src/backend/app/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class EventType(str, Enum):
- ``assign`` -- For a requester user to assign a task to another user. Set the state *locked for mapping* passing in the required user id.
- ``comment`` -- Keep the state the same, but simply add a comment.
- ``unlock`` -- Unlock a task state by unlocking it if it's locked.
- ``image_upload`` -- Set the state to *image uploaded* when the task image is uploaded.
Note that ``task_id`` must be specified in the endpoint too.
"""
Expand All @@ -183,3 +184,4 @@ class EventType(str, Enum):
ASSIGN = "assign"
COMMENT = "comment"
UNLOCK = "unlock"
IMAGE_UPLOAD = "image_upload"
25 changes: 14 additions & 11 deletions src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from app.tasks import task_logic
from app.models.enums import State
from app.utils import timestamp
from app.db import database
from pyodm import Node
from app.s3 import get_file_from_bucket, list_objects_from_bucket, add_file_to_bucket
from loguru import logger as log
Expand Down Expand Up @@ -191,7 +192,6 @@ def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=Non


async def download_and_upload_assets_from_odm_to_s3(
db: Connection,
node_odm_url: str,
task_id: str,
dtm_project_id: uuid.UUID,
Expand Down Expand Up @@ -259,16 +259,19 @@ async def download_and_upload_assets_from_odm_to_s3(
)

# Update background task status to COMPLETED
await task_logic.update_task_state(
db,
dtm_project_id,
dtm_task_id,
user_id,
comment,
current_state,
State.IMAGE_PROCESSED,
timestamp(),
)
pool = await database.get_db_connection_pool()

async with pool.connection() as conn:
await task_logic.update_task_state(
db=conn,
project_id=dtm_project_id,
task_id=dtm_task_id,
user_id=user_id,
comment=comment,
initial_state=current_state,
final_state=State.IMAGE_PROCESSED,
updated_at=timestamp(),
)

except Exception as e:
log.error(f"Error downloading or uploading assets for task {task_id}: {e}")
Expand Down
124 changes: 89 additions & 35 deletions src/backend/app/projects/project_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from app.users import user_schemas
from rio_tiler.io import Reader
from rio_tiler.errors import TileOutsideBounds
from minio.deleteobjects import DeleteObject


router = APIRouter(
Expand Down Expand Up @@ -289,8 +290,9 @@ async def preview_split_by_square(

@router.post("/generate-presigned-url/", tags=["Image Upload"])
async def generate_presigned_url(
data: project_schemas.PresignedUrlRequest,
user: Annotated[AuthUser, Depends(login_required)],
data: project_schemas.PresignedUrlRequest,
replace_existing: bool = False,
):
"""
Generate a pre-signed URL for uploading an image to S3 Bucket.
Expand All @@ -299,21 +301,54 @@ async def generate_presigned_url(
an S3 bucket. The URL expires after a specified duration.
Args:
image_name: The name of the image you want to upload
expiry : Expiry time in hours
image_name: The name of the image(s) you want to upload.
expiry : Expiry time in hours.
replace_existing: A boolean flag to indicate if the image should be replaced.
Returns:
str: The pre-signed URL to upload the image
list: A list of dictionaries with the image name and the pre-signed URL to upload.
"""
try:
# Generate a pre-signed URL for an object
# Initialize the S3 client
client = s3_client()
urls = []

# Process each image in the request
for image in data.image_name:
image_path = f"projects/{data.project_id}/{data.task_id}/images/{image}"

# If replace_existing is True, delete the image first
if replace_existing:
image_dir = f"projects/{data.project_id}/{data.task_id}/images/"
try:
# Prepare the list of objects to delete (recursively if necessary)
delete_object_list = map(
lambda x: DeleteObject(x.object_name),
client.list_objects(
settings.S3_BUCKET_NAME, image_dir, recursive=True
),
)

# Remove the objects (images)
errors = client.remove_objects(
settings.S3_BUCKET_NAME, delete_object_list
)

# Handle deletion errors, if any
for error in errors:
log.error("Error occurred when deleting object", error)
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=f"Failed to delete existing image: {error}",
)

except Exception as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=f"Failed to delete existing image. {e}",
)

# Generate a new pre-signed URL for the image upload
url = client.get_presigned_url(
"PUT",
settings.S3_BUCKET_NAME,
Expand All @@ -323,6 +358,7 @@ async def generate_presigned_url(
urls.append({"image_name": image, "url": url})

return urls

except Exception as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
Expand Down Expand Up @@ -397,17 +433,6 @@ async def process_imagery(
db: Annotated[Connection, Depends(database.get_db)],
):
user_id = user_data.id
# TODO: Update task state to reflect completion of image uploads.
await task_logic.update_task_state(
db,
project.id,
task_id,
user_id,
"Task images upload completed.",
State.LOCKED_FOR_MAPPING,
State.IMAGE_UPLOADED,
timestamp(),
)
background_tasks.add_task(
project_logic.process_drone_images, project.id, task_id, user_id, db
)
Expand Down Expand Up @@ -445,7 +470,10 @@ async def get_assets_info(

return results
else:
return project_logic.get_project_info_from_s3(project.id, task_id)
current_state = await task_logic.get_task_state(db, project.id, task_id)
project_info = project_logic.get_project_info_from_s3(project.id, task_id)
project_info.state = current_state.get("state")
return project_info


@router.post(
Expand All @@ -472,6 +500,7 @@ async def odm_webhook(

task_id = payload.get("uuid")
status = payload.get("status")

if not task_id or not status:
raise HTTPException(status_code=400, detail="Invalid webhook payload")

Expand All @@ -482,22 +511,48 @@ async def odm_webhook(
if status["code"] == 40:
log.info(f"Task ID: {task_id}, Status: going for download......")

# Call function to download assets from ODM and upload to S3
background_tasks.add_task(
image_processing.download_and_upload_assets_from_odm_to_s3,
db,
settings.NODE_ODM_URL,
task_id,
dtm_project_id,
dtm_task_id,
dtm_user_id,
State.IMAGE_UPLOADED,
"Task completed.",
)
current_state = await task_logic.get_task_state(db, dtm_project_id, dtm_task_id)
current_state_value = State[current_state.get("state")]
match current_state_value:
case State.IMAGE_UPLOADED:
log.info(
f"Task ID: {task_id}, Status: already IMAGE_UPLOADED - no update needed."
)
# Call function to download assets from ODM and upload to S3
background_tasks.add_task(
image_processing.download_and_upload_assets_from_odm_to_s3,
settings.NODE_ODM_URL,
task_id,
dtm_project_id,
dtm_task_id,
dtm_user_id,
State.IMAGE_UPLOADED,
"Task completed.",
)

case State.IMAGE_PROCESSING_FAILED:
log.warning(
f"Task ID: {task_id}, Status: previously failed, updating to IMAGE_UPLOADED"
)
# Call function to download assets from ODM and upload to S3
background_tasks.add_task(
image_processing.download_and_upload_assets_from_odm_to_s3,
settings.NODE_ODM_URL,
task_id,
dtm_project_id,
dtm_task_id,
dtm_user_id,
State.IMAGE_UPLOADED,
"Task completed.",
)

case _:
log.info(
f"Task ID: {task_id}, Status: updating to IMAGE_UPLOADED from {current_state}"
)

elif status["code"] == 30:
current_state = await task_logic.get_current_state(
db, dtm_project_id, dtm_task_id
)
current_state = await task_logic.get_task_state(db, dtm_project_id, dtm_task_id)
# If the current state is not already IMAGE_PROCESSING_FAILED, update it
if current_state != State.IMAGE_PROCESSING_FAILED:
await task_logic.update_task_state(
Expand All @@ -513,7 +568,6 @@ async def odm_webhook(

background_tasks.add_task(
image_processing.download_and_upload_assets_from_odm_to_s3,
db,
settings.NODE_ODM_URL,
task_id,
dtm_project_id,
Expand Down
19 changes: 7 additions & 12 deletions src/backend/app/projects/project_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from psycopg import Connection
from psycopg.rows import class_row
from slugify import slugify
from app.models.enums import FinalOutput, ProjectVisibility
from app.models.enums import FinalOutput, ProjectVisibility, UserRole
from app.models.enums import (
IntEnum,
ProjectStatus,
Expand Down Expand Up @@ -58,6 +58,7 @@ class AssetsInfo(BaseModel):
task_id: str
image_count: int
assets_url: Optional[str]
state: Optional[UserRole] = None


def validate_geojson(
Expand Down Expand Up @@ -197,6 +198,7 @@ class DbProject(BaseModel):
altitude_from_ground: Optional[float] = None
is_terrain_follow: bool = False
image_url: Optional[str] = None
created_at: datetime

async def one(db: Connection, project_id: uuid.UUID):
"""Get a single project & all associated tasks by ID."""
Expand Down Expand Up @@ -258,15 +260,7 @@ async def one(db: Connection, project_id: uuid.UUID):
SELECT DISTINCT ON (te.task_id)
te.task_id,
te.user_id,
CASE
WHEN te.state = 'REQUEST_FOR_MAPPING' THEN 'request logs'
WHEN te.state = 'LOCKED_FOR_MAPPING' OR te.state = 'IMAGE_UPLOADED' THEN 'ongoing'
WHEN te.state = 'IMAGE_PROCESSED' THEN 'completed'
WHEN te.state = 'UNFLYABLE_TASK' THEN 'unflyable task'
WHEN te.state = 'IMAGE_PROCESSING_FAILED' THEN 'task failed'
ELSE ''
END AS calculated_state
te.state
FROM
task_events te
ORDER BY
Expand All @@ -283,7 +277,7 @@ async def one(db: Connection, project_id: uuid.UUID):
ST_YMin(ST_Envelope(t.outline)) AS ymin,
ST_XMax(ST_Envelope(t.outline)) AS xmax,
ST_YMax(ST_Envelope(t.outline)) AS ymax,
COALESCE(tsc.calculated_state) AS state,
tsc.state AS state,
tsc.user_id,
u.name,
ST_Area(ST_Transform(t.outline, 3857)) / 1000000 AS task_area
Expand Down Expand Up @@ -343,7 +337,7 @@ async def all(
await cur.execute(
"""
SELECT
p.id, p.slug, p.name, p.description, p.per_task_instructions,
p.id, p.slug, p.name, p.description, p.per_task_instructions, p.created_at,
ST_AsGeoJSON(p.outline)::jsonb AS outline,
p.requires_approval_from_manager_for_locking,
Expand Down Expand Up @@ -541,6 +535,7 @@ class ProjectInfo(BaseModel):
ongoing_task_count: Optional[int] = 0
completed_task_count: Optional[int] = 0
status: Optional[str] = "not-started"
created_at: datetime

@model_validator(mode="after")
def set_image_url(cls, values):
Expand Down
20 changes: 0 additions & 20 deletions src/backend/app/tasks/task_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,6 @@
from datetime import datetime


async def get_current_state(db, project_id, task_id):
try:
async with db.cursor() as cur:
await cur.execute(
"""
SELECT DISTINCT ON (state) state
FROM task_events
WHERE task_id = %(task_id)s AND project_id = %(project_id)s
ORDER BY state, created_at DESC
""",
{"task_id": task_id, "project_id": project_id},
)
return await cur.fetchone()

except Exception as err:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(err)
)


async def update_take_off_point_in_db(
db: Connection, task_id: uuid.UUID, take_off_point: str
):
Expand Down
Loading

0 comments on commit 496c0b0

Please sign in to comment.