Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opening new session to complete task activities #5667

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
- Fixed issue where the custom report "reset" button was not working as expected [#5649](https://github.com/ethyca/fides/pull/5649)
- Fixed column ordering issue in the Data Map report [#5649](https://github.com/ethyca/fides/pull/5649)
- Fixed issue where the Data Map report filter dialog was missing an Accordion item label [#5649](https://github.com/ethyca/fides/pull/5649)
- Improved database session management for long-running access request tasks [#5667](https://github.com/ethyca/fides/pull/5667)

## [2.52.0](https://github.com/ethyca/fides/compare/2.51.2...2.52.0)

Expand Down
123 changes: 88 additions & 35 deletions src/fides/api/task/execute_request_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

from celery.app.task import Task
from loguru import logger
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Query, Session
from tenacity import (
RetryCallState,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from fides.api.common_exceptions import (
PrivacyRequestCanceled,
Expand Down Expand Up @@ -35,14 +43,13 @@
# DSR 3.0 task functions


def run_prerequisite_task_checks(
def get_privacy_request_and_task(
session: Session, privacy_request_id: str, privacy_request_task_id: str
) -> Tuple[PrivacyRequest, RequestTask, Query]:
) -> Tuple[PrivacyRequest, RequestTask]:
"""
Upfront checks that run as soon as the RequestTask is executed by the worker.

Returns resources for use in executing a task
Retrieves and validates a privacy request and its associated task
"""

privacy_request: Optional[PrivacyRequest] = PrivacyRequest.get(
db=session, object_id=privacy_request_id
)
Expand All @@ -65,6 +72,22 @@ def run_prerequisite_task_checks(
f"Request Task with id {privacy_request_task_id} not found for privacy request {privacy_request_id}"
)

return privacy_request, request_task


def run_prerequisite_task_checks(
session: Session, privacy_request_id: str, privacy_request_task_id: str
) -> Tuple[PrivacyRequest, RequestTask, Query]:
"""
Upfront checks that run as soon as the RequestTask is executed by the worker.

Returns resources for use in executing a task
"""

privacy_request, request_task = get_privacy_request_and_task(
session, privacy_request_id, privacy_request_task_id
)

assert request_task # For mypy

upstream_results: Query = request_task.upstream_tasks_objects(session)
Expand Down Expand Up @@ -146,6 +169,43 @@ def can_run_task_body(
return True


def log_retry_attempt(retry_state: RetryCallState) -> None:
    """Emit a warning each time a queue_downstream_tasks retry is scheduled.

    Wired into the tenacity ``retry`` decorator via ``before_sleep``, so it
    fires after a failed attempt and before the backoff sleep.
    """
    # next_action is populated by tenacity before before_sleep callbacks run,
    # hence the union-attr ignore.
    wait_seconds = retry_state.next_action.sleep  # type: ignore[union-attr]
    logger.warning(
        "queue_downstream_tasks attempt {} failed. Retrying in {} seconds...",
        retry_state.attempt_number,
        wait_seconds,
    )


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1),
    retry=retry_if_exception_type(OperationalError),
    before_sleep=log_retry_attempt,
)
def queue_downstream_tasks_with_retries(
    database_task: DatabaseTask,
    privacy_request_id: str,
    privacy_request_task_id: str,
    current_step: CurrentStep,
    privacy_request_proceed: bool,
) -> None:
    """Mark the given request task complete and queue its downstream tasks.

    A brand-new database session is opened for this step so that a stale or
    dropped connection left over from the preceding long-running work cannot
    affect it; transient ``OperationalError``s are retried up to five times
    with exponential backoff by the ``tenacity`` decorator above.
    """
    with database_task.get_new_session() as db:
        # Re-fetch both records inside the fresh session rather than reusing
        # objects bound to an earlier (possibly dead) session.
        privacy_request, request_task = get_privacy_request_and_task(
            db, privacy_request_id, privacy_request_task_id
        )
        log_task_complete(request_task)
        queue_downstream_tasks(
            db,
            request_task,
            privacy_request,
            current_step,
            privacy_request_proceed,
        )


def queue_downstream_tasks(
session: Session,
request_task: RequestTask,
Expand Down Expand Up @@ -233,16 +293,15 @@ def run_access_node(
]
# Run the main access function
graph_task.access_request(*upstream_access_data)
log_task_complete(request_task)

with self.get_new_session() as session:
queue_downstream_tasks(
session,
request_task,
privacy_request,
CurrentStep.upload_access,
privacy_request_proceed,
)
logger.info(f"Session ID - After get access data: {id(session)}")

queue_downstream_tasks_with_retries(
self,
privacy_request_id,
privacy_request_task_id,
CurrentStep.upload_access,
privacy_request_proceed,
)


@celery_app.task(base=DatabaseTask, bind=True)
Expand Down Expand Up @@ -285,16 +344,13 @@ def run_erasure_node(
# Run the main erasure function!
graph_task.erasure_request(retrieved_data)

log_task_complete(request_task)

with self.get_new_session() as session:
queue_downstream_tasks(
session,
request_task,
privacy_request,
CurrentStep.finalize_erasure,
privacy_request_proceed,
)
queue_downstream_tasks_with_retries(
self,
privacy_request_id,
privacy_request_task_id,
CurrentStep.finalize_erasure,
privacy_request_proceed,
)


@celery_app.task(base=DatabaseTask, bind=True)
Expand Down Expand Up @@ -339,16 +395,13 @@ def run_consent_node(

graph_task.consent_request(access_data[0] if access_data else {})

log_task_complete(request_task)

with self.get_new_session() as session:
queue_downstream_tasks(
session,
request_task,
privacy_request,
CurrentStep.finalize_consent,
privacy_request_proceed,
)
queue_downstream_tasks_with_retries(
self,
privacy_request_id,
privacy_request_task_id,
CurrentStep.finalize_consent,
privacy_request_proceed,
)


def logger_method(request_task: RequestTask) -> Callable:
Expand Down
Loading