Skip to content

Commit

Permalink
Opening new session to complete task activities
Browse files Browse the repository at this point in the history
  • Loading branch information
galvana committed Jan 14, 2025
1 parent de209fe commit c5eb962
Showing 1 changed file with 88 additions and 35 deletions.
123 changes: 88 additions & 35 deletions src/fides/api/task/execute_request_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

from celery.app.task import Task
from loguru import logger
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Query, Session
from tenacity import (
RetryCallState,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from fides.api.common_exceptions import (
PrivacyRequestCanceled,
Expand Down Expand Up @@ -35,14 +43,13 @@
# DSR 3.0 task functions


def run_prerequisite_task_checks(
def get_privacy_request_and_task(
session: Session, privacy_request_id: str, privacy_request_task_id: str
) -> Tuple[PrivacyRequest, RequestTask, Query]:
) -> Tuple[PrivacyRequest, RequestTask]:
"""
Upfront checks that run as soon as the RequestTask is executed by the worker.
Returns resources for use in executing a task
Retrieves and validates a privacy request and its associated task
"""

privacy_request: Optional[PrivacyRequest] = PrivacyRequest.get(
db=session, object_id=privacy_request_id
)
Expand All @@ -65,6 +72,22 @@ def run_prerequisite_task_checks(
f"Request Task with id {privacy_request_task_id} not found for privacy request {privacy_request_id}"
)

return privacy_request, request_task


def run_prerequisite_task_checks(
session: Session, privacy_request_id: str, privacy_request_task_id: str
) -> Tuple[PrivacyRequest, RequestTask, Query]:
"""
Upfront checks that run as soon as the RequestTask is executed by the worker.
Returns resources for use in executing a task
"""

privacy_request, request_task = get_privacy_request_and_task(
session, privacy_request_id, privacy_request_task_id
)

assert request_task # For mypy

upstream_results: Query = request_task.upstream_tasks_objects(session)
Expand Down Expand Up @@ -146,6 +169,43 @@ def can_run_task_body(
return True


def log_retry_attempt(retry_state: RetryCallState) -> None:
    """Log queue_downstream_tasks retry attempts.

    Passed as the ``before_sleep`` hook of the ``@retry`` decorator on
    ``queue_downstream_tasks_with_retries``.

    Args:
        retry_state: The tenacity call state for the attempt that just failed.
            Its ``attempt_number`` and ``next_action.sleep`` values are
            written into the warning message.
    """
    # NOTE(review): the coverage annotation captured alongside this diff
    # reported the logger.warning call below as not covered by tests.
    logger.warning(
        "queue_downstream_tasks attempt {} failed. Retrying in {} seconds...",
        retry_state.attempt_number,
        # The ignore silences the type checker's union-attr complaint about
        # accessing ``.sleep`` on ``next_action``.
        retry_state.next_action.sleep,  # type: ignore[union-attr]
    )


@retry(
    retry=retry_if_exception_type(OperationalError),
    wait=wait_exponential(multiplier=1, min=1),
    stop=stop_after_attempt(5),
    before_sleep=log_retry_attempt,
)
def queue_downstream_tasks_with_retries(
    database_task: DatabaseTask,
    privacy_request_id: str,
    privacy_request_task_id: str,
    current_step: CurrentStep,
    privacy_request_proceed: bool,
) -> None:
    """Log task completion and queue downstream tasks in a new session.

    Each call obtains a session from ``database_task.get_new_session()``,
    looks the privacy request and request task up again by id, logs the task
    as complete, and then hands off to ``queue_downstream_tasks``.

    The decorator retries the whole body when an ``OperationalError`` is
    raised: at most five attempts, with an exponentially growing wait, and
    ``log_retry_attempt`` invoked before each sleep.

    Args:
        database_task: Task object used to open the new session.
        privacy_request_id: Id of the privacy request to load.
        privacy_request_task_id: Id of the request task to load.
        current_step: Step value forwarded to ``queue_downstream_tasks``.
        privacy_request_proceed: Flag forwarded to ``queue_downstream_tasks``.
    """
    with database_task.get_new_session() as db:
        # Look both records up through the new session rather than reusing
        # objects loaded elsewhere.
        loaded = get_privacy_request_and_task(
            db, privacy_request_id, privacy_request_task_id
        )
        request, task = loaded

        log_task_complete(task)
        queue_downstream_tasks(
            db,
            task,
            request,
            current_step,
            privacy_request_proceed,
        )


def queue_downstream_tasks(
session: Session,
request_task: RequestTask,
Expand Down Expand Up @@ -233,16 +293,15 @@ def run_access_node(
]
# Run the main access function
graph_task.access_request(*upstream_access_data)
log_task_complete(request_task)

with self.get_new_session() as session:
queue_downstream_tasks(
session,
request_task,
privacy_request,
CurrentStep.upload_access,
privacy_request_proceed,
)
logger.info(f"Session ID - After get access data: {id(session)}")

queue_downstream_tasks_with_retries(
self,
privacy_request_id,
privacy_request_task_id,
CurrentStep.upload_access,
privacy_request_proceed,
)


@celery_app.task(base=DatabaseTask, bind=True)
Expand Down Expand Up @@ -285,16 +344,13 @@ def run_erasure_node(
# Run the main erasure function!
graph_task.erasure_request(retrieved_data)

log_task_complete(request_task)

with self.get_new_session() as session:
queue_downstream_tasks(
session,
request_task,
privacy_request,
CurrentStep.finalize_erasure,
privacy_request_proceed,
)
queue_downstream_tasks_with_retries(
self,
privacy_request_id,
privacy_request_task_id,
CurrentStep.finalize_erasure,
privacy_request_proceed,
)


@celery_app.task(base=DatabaseTask, bind=True)
Expand Down Expand Up @@ -339,16 +395,13 @@ def run_consent_node(

graph_task.consent_request(access_data[0] if access_data else {})

log_task_complete(request_task)

with self.get_new_session() as session:
queue_downstream_tasks(
session,
request_task,
privacy_request,
CurrentStep.finalize_consent,
privacy_request_proceed,
)
queue_downstream_tasks_with_retries(
self,
privacy_request_id,
privacy_request_task_id,
CurrentStep.finalize_consent,
privacy_request_proceed,
)


def logger_method(request_task: RequestTask) -> Callable:
Expand Down

0 comments on commit c5eb962

Please sign in to comment.