diff --git a/cg/cli/deliver/base.py b/cg/cli/deliver/base.py
index f94f4d65b4..6649903b83 100644
--- a/cg/cli/deliver/base.py
+++ b/cg/cli/deliver/base.py
@@ -6,7 +6,9 @@
 import click
 
 from cg.apps.tb import TrailblazerAPI
+from cg.cli.deliver.utils import deliver_raw_data_for_analyses
 from cg.cli.utils import CLICK_CONTEXT_SETTINGS
+from cg.constants import Workflow
 from cg.constants.cli_options import DRY_RUN
 from cg.constants.delivery import FileDeliveryOption
 from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
@@ -17,7 +19,7 @@
 from cg.services.file_delivery.deliver_files_service.deliver_files_service_factory import (
     DeliveryServiceFactory,
 )
-from cg.store.models import Case
+from cg.store.models import Analysis, Case
 
 
 LOG = logging.getLogger(__name__)
@@ -115,3 +117,28 @@ def deliver_ticket(
     delivery_service.deliver_files_for_ticket(
         ticket_id=ticket, delivery_base_path=Path(inbox), dry_run=dry_run
     )
+
+
+@deliver.command(name="auto-raw-data")
+@click.pass_obj
+@DRY_RUN
+def deliver_auto_raw_data(context: CGConfig, dry_run: bool):
+    """
+    Deliver all case files for the raw data workflow to the customer inbox on the HPC and start a Rsync job.
+    Starts upload of all not previously uploaded cases with analysis type fastq to clinical-delivery:
+    1. get all cases with analysis type fastq that need to be delivered
+    2. check if their upload has started
+    3. if not, start the upload
+    4. update the uploaded at field
+    """
+    service_builder: DeliveryServiceFactory = context.delivery_service_factory
+    analyses: list[Analysis] = context.analysis_service.get_analyses_to_upload_for_workflow(
+        workflow=Workflow.FASTQ
+    )
+    deliver_raw_data_for_analyses(
+        analyses=analyses,
+        status_db=context.status_db,
+        delivery_path=Path(context.delivery_path),
+        service_builder=service_builder,
+        dry_run=dry_run,
+    )
diff --git a/cg/cli/deliver/utils.py b/cg/cli/deliver/utils.py
new file mode 100644
index 0000000000..335de5deef
--- /dev/null
+++ b/cg/cli/deliver/utils.py
@@ -0,0 +1,46 @@
+import logging
+from datetime import datetime
+from pathlib import Path
+
+from cg.constants import Workflow
+from cg.services.file_delivery.deliver_files_service.deliver_files_service import (
+    DeliverFilesService,
+)
+from cg.services.file_delivery.deliver_files_service.deliver_files_service_factory import (
+    DeliveryServiceFactory,
+)
+from cg.store.models import Analysis, Case
+from cg.store.store import Store
+
+
+LOG = logging.getLogger(__name__)
+
+
+def deliver_raw_data_for_analyses(
+    analyses: list[Analysis],
+    status_db: Store,
+    delivery_path: Path,
+    service_builder: DeliveryServiceFactory,
+    dry_run: bool,
+):
+    """Deliver raw data for a list of analyses"""
+    for analysis in analyses:
+        try:
+            case: Case = analysis.case
+            delivery_service: DeliverFilesService = service_builder.build_delivery_service(
+                delivery_type=case.data_delivery,
+                workflow=Workflow.FASTQ,
+            )
+
+            delivery_service.deliver_files_for_case(
+                case=case, delivery_base_path=delivery_path, dry_run=dry_run
+            )
+            status_db.update_analysis_upload_started_at(
+                analysis_id=analysis.id, upload_started_at=datetime.now()
+            )
+        except Exception as error:
+            status_db.update_analysis_upload_started_at(
+                analysis_id=analysis.id, upload_started_at=None
+            )
+            LOG.error(f"Could not deliver files for 
analysis {analysis.id}: {error}")
+            continue
diff --git a/cg/models/cg_config.py b/cg/models/cg_config.py
index 27243d7a09..ccab72592d 100644
--- a/cg/models/cg_config.py
+++ b/cg/models/cg_config.py
@@ -669,7 +669,7 @@ def slurm_upload_service(self) -> SlurmUploadService:
 
     @property
     def analysis_service(self) -> AnalysisService:
-        return AnalysisService(analysis_client=self.trailblazer_api)
+        return AnalysisService(analysis_client=self.trailblazer_api, status_db=self.status_db)
 
     @property
     def scout_api(self) -> ScoutAPI:
diff --git a/cg/services/analysis_service/analysis_service.py b/cg/services/analysis_service/analysis_service.py
index ca2259b48a..8b25619553 100644
--- a/cg/services/analysis_service/analysis_service.py
+++ b/cg/services/analysis_service/analysis_service.py
@@ -1,11 +1,45 @@
+from datetime import datetime
+
 from cg.apps.tb.api import TrailblazerAPI
 from cg.apps.tb.models import TrailblazerAnalysis
+from cg.constants import Workflow
+from cg.store.models import Analysis
+from cg.store.store import Store
 
 
 class AnalysisService:
-    def __init__(self, analysis_client: TrailblazerAPI):
+    def __init__(self, analysis_client: TrailblazerAPI, status_db: Store):
+        self.status_db = status_db
         self.analysis_client = analysis_client
 
     def add_upload_job(self, case_id: str, slurm_id: int):
         analysis: TrailblazerAnalysis = self.analysis_client.get_latest_completed_analysis(case_id)
         self.analysis_client.add_upload_job_to_analysis(slurm_id=slurm_id, analysis_id=analysis.id)
+
+    def get_analyses_to_upload_for_workflow(self, workflow: Workflow) -> list[Analysis]:
+        """
+        Get all analyses that should be uploaded for a specific workflow.
+        1. Mark completed analyses whose upload already started as uploaded
+        2. Return completed analyses that have not yet been uploaded
+        """
+        analysis_to_upload: list[Analysis] = []
+        analyses: list[Analysis] = self.status_db.get_analyses_to_upload(workflow=workflow)
+        for analysis in analyses:
+            if self._has_upload_started(analysis) and self._is_analysis_completed(analysis):
+                self.status_db.update_analysis_uploaded_at(
+                    analysis_id=analysis.id, uploaded_at=datetime.now()
+                )
+            if not self._is_analysis_uploaded(analysis) and self._is_analysis_completed(analysis):
+                analysis_to_upload.append(analysis)
+        return analysis_to_upload
+
+    @staticmethod
+    def _is_analysis_uploaded(analysis: Analysis) -> bool:
+        return bool(analysis.uploaded_at)
+
+    def _is_analysis_completed(self, analysis: Analysis) -> bool:
+        return self.analysis_client.is_latest_analysis_completed(case_id=analysis.case.internal_id)
+
+    @staticmethod
+    def _has_upload_started(analysis: Analysis) -> bool:
+        return bool(analysis.upload_started_at)
diff --git a/cg/services/file_delivery/deliver_files_service/deliver_files_service.py b/cg/services/file_delivery/deliver_files_service/deliver_files_service.py
index 035e74be3f..e63706c55e 100644
--- a/cg/services/file_delivery/deliver_files_service/deliver_files_service.py
+++ b/cg/services/file_delivery/deliver_files_service/deliver_files_service.py
@@ -1,4 +1,5 @@
 import logging
+from datetime import datetime
 from pathlib import Path
 
 from cg.apps.tb import TrailblazerAPI
@@ -19,7 +20,7 @@
 )
 from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
 from cg.store.exc import EntryNotFoundError
-from cg.store.models import Case
+from cg.store.models import Case, Analysis
 from cg.store.store import Store
 
 LOG = logging.getLogger(__name__)
diff --git a/cg/store/crud/read.py b/cg/store/crud/read.py
index c4063c6cdf..b0d3b318ea 100644
--- a/cg/store/crud/read.py
+++ b/cg/store/crud/read.py
@@ -169,6 +169,14 @@ def get_analysis_by_case_entry_id_and_started_at(
started_at_date=started_at_date, ).first() + def get_analysis_by_entry_id(self, entry_id: int) -> Analysis: + """Return an analysis.""" + return apply_analysis_filter( + filter_functions=[AnalysisFilter.BY_ENTRY_ID], + analyses=self._get_query(table=Analysis), + entry_id=entry_id, + ).first() + def get_cases_by_customer_and_case_name_search( self, customer: Customer, case_name_search: str ) -> list[Case]: diff --git a/cg/store/crud/update.py b/cg/store/crud/update.py index e55ee48bad..d6c62e0fdc 100644 --- a/cg/store/crud/update.py +++ b/cg/store/crud/update.py @@ -83,3 +83,17 @@ def mark_sample_as_cancelled(self, sample_id: int) -> None: sample: Sample = self.get_sample_by_entry_id(sample_id) sample.is_cancelled = True self.session.commit() + + def update_analysis_uploaded_at(self, analysis_id: int, uploaded_at: datetime | None) -> None: + """Update the uploaded at field of an analysis.""" + analysis = self.get_analysis_by_entry_id(analysis_id) + analysis.uploaded_at = uploaded_at + self.session.commit() + + def update_analysis_upload_started_at( + self, analysis_id: int, upload_started_at: datetime | None + ) -> None: + """Update the upload started at field of an analysis.""" + analysis = self.get_analysis_by_entry_id(analysis_id) + analysis.upload_started_at = upload_started_at + self.session.commit() diff --git a/cg/store/filters/status_analysis_filters.py b/cg/store/filters/status_analysis_filters.py index ea89c0d706..e0ea534062 100644 --- a/cg/store/filters/status_analysis_filters.py +++ b/cg/store/filters/status_analysis_filters.py @@ -90,13 +90,19 @@ def filter_analysis_case_action_is_none(analyses: Query, **kwargs) -> Query: return analyses.join(Case).filter(Case.action.is_(None)) +def filter_analysis_by_entry_id(analyses: Query, entry_id: int, **kwargs) -> Query: + """Return a query of analyses filtered by entry id.""" + return analyses.filter(Analysis.id == entry_id) + + def apply_analysis_filter( filter_functions: list[Callable], analyses: Query, - 
workflow: Workflow = None, case_entry_id: int = None, completed_at_date: datetime = None, + entry_id: int = None, started_at_date: datetime = None, + workflow: Workflow = None, ) -> Query: """Apply filtering functions to the analyses queries and return filtered results.""" @@ -106,6 +112,7 @@ def apply_analysis_filter( workflow=workflow, case_entry_id=case_entry_id, completed_at_date=completed_at_date, + entry_id=entry_id, started_at_date=started_at_date, ) return analyses @@ -129,3 +136,4 @@ class AnalysisFilter(Enum): CASE_ACTION_IS_NONE: Callable = filter_analysis_case_action_is_none ORDER_BY_UPLOADED_AT: Callable = order_analyses_by_uploaded_at_asc ORDER_BY_COMPLETED_AT: Callable = order_analyses_by_completed_at_asc + BY_ENTRY_ID: Callable = filter_analysis_by_entry_id diff --git a/tests/services/analysis_service/test_analysis_service.py b/tests/services/analysis_service/test_analysis_service.py new file mode 100644 index 0000000000..3cf6e0a4d5 --- /dev/null +++ b/tests/services/analysis_service/test_analysis_service.py @@ -0,0 +1,41 @@ +from datetime import datetime +from unittest.mock import Mock + +from cg.services.analysis_service.analysis_service import AnalysisService + +from cg.constants import Workflow +from cg.store.models import Analysis +from cg.store.store import Store +from tests.store_helpers import StoreHelpers + + +def test_get_analyses_to_upload_for_workflow( + helpers: StoreHelpers, base_store: Store, timestamp_now: datetime +): + # GIVEN an analysis service and a store with analyses to upload and not to upload + analysis_upload: Analysis = helpers.add_analysis( + store=base_store, + uploaded_at=None, + upload_started=None, + workflow=Workflow.FASTQ, + completed_at=timestamp_now, + ) + new_case = helpers.add_case(store=base_store, name="no_upload_case") + analysis_no_upload: Analysis = helpers.add_analysis( + store=base_store, + case=new_case, + uploaded_at=timestamp_now, + upload_started=None, + workflow=Workflow.FASTQ, + 
completed_at=timestamp_now, + ) + analysis_service = AnalysisService(analysis_client=Mock(), status_db=base_store) + + # WHEN getting analyses to upload + analyses: list[Analysis] = analysis_service.get_analyses_to_upload_for_workflow( + workflow=Workflow.FASTQ + ) + + # THEN only the analyses to upload should be returned + assert analyses == [analysis_upload] + assert analysis_no_upload not in analyses diff --git a/tests/store/crud/read/test_read_analysis.py b/tests/store/crud/read/test_read_analysis.py index 1232e770b8..baf64933bc 100644 --- a/tests/store/crud/read/test_read_analysis.py +++ b/tests/store/crud/read/test_read_analysis.py @@ -472,3 +472,19 @@ def test_get_analyses_before( # THEN assert that the analyses before the given date are returned for analysis in analyses: assert analysis.started_at < timestamp_now + + +def test_get_analysis_by_entry_id( + store_with_analyses_for_cases_not_uploaded_fluffy: Store, +): + """Test to get an analysis by entry id.""" + + # GIVEN a database with a number of analyses + + # WHEN getting an analysis by entry id + analysis: Analysis = store_with_analyses_for_cases_not_uploaded_fluffy.get_analysis_by_entry_id( + entry_id=1 + ) + + # THEN assert that the analysis is returned + assert analysis.id == 1 diff --git a/tests/store/crud/update/test_update.py b/tests/store/crud/update/test_update.py index 7567ce54bb..c209f411c0 100644 --- a/tests/store/crud/update/test_update.py +++ b/tests/store/crud/update/test_update.py @@ -2,11 +2,7 @@ from cg.constants import SequencingRunDataAvailability from cg.constants.sequencing import Sequencers -from cg.store.models import ( - IlluminaSampleSequencingMetrics, - IlluminaSequencingRun, - Sample, -) +from cg.store.models import IlluminaSequencingRun, Sample, IlluminaSampleSequencingMetrics, Analysis from cg.store.store import Store @@ -104,3 +100,39 @@ def test_update_sample_last_sequenced_at( # THEN the last sequenced at date for the sample is updated assert sample.last_sequenced_at == 
timestamp_now + + +def test_update_analysis_uploaded_at( + store_with_analyses_for_cases_not_uploaded_fluffy: Store, timestamp_yesterday: datetime +): + # GIVEN a store with an analysis + analysis: Analysis = store_with_analyses_for_cases_not_uploaded_fluffy._get_query( + Analysis + ).first() + assert analysis.uploaded_at != timestamp_yesterday + + # WHEN updating the uploaded_at field + store_with_analyses_for_cases_not_uploaded_fluffy.update_analysis_uploaded_at( + analysis_id=analysis.id, uploaded_at=timestamp_yesterday + ) + + # THEN the uploaded_at field is updated + assert analysis.uploaded_at == timestamp_yesterday + + +def test_update_analysis_upload_started_at( + store_with_analyses_for_cases_not_uploaded_fluffy: Store, timestamp_yesterday: datetime +): + # GIVEN a store with an analysis + analysis: Analysis = store_with_analyses_for_cases_not_uploaded_fluffy._get_query( + Analysis + ).first() + assert analysis.upload_started_at != timestamp_yesterday + + # WHEN updating the upload_started_at field + store_with_analyses_for_cases_not_uploaded_fluffy.update_analysis_upload_started_at( + analysis_id=analysis.id, upload_started_at=timestamp_yesterday + ) + + # THEN the upload_started_at field is updated + assert analysis.upload_started_at == timestamp_yesterday diff --git a/tests/store/filters/test_status_analyses_filters.py b/tests/store/filters/test_status_analyses_filters.py index 0fdcc453bf..086a6ddff8 100644 --- a/tests/store/filters/test_status_analyses_filters.py +++ b/tests/store/filters/test_status_analyses_filters.py @@ -18,6 +18,7 @@ filter_valid_analyses_in_production, order_analyses_by_completed_at_asc, order_analyses_by_uploaded_at_asc, + filter_analysis_by_entry_id, ) from cg.store.models import Analysis, Case from cg.store.store import Store @@ -324,3 +325,21 @@ def test_filter_analyses_by_started_at( # THEN only the analysis that have been started after the given date should be retrieved assert analysis_started_now not in analyses assert 
analysis_started_old in analyses
+
+
+def test_filter_by_analysis_entry_id(base_store: Store, helpers: StoreHelpers):
+    """Test filtering of analyses by entry id."""
+
+    # GIVEN a store with an analysis
+    analysis: Analysis = helpers.add_analysis(store=base_store)
+
+    # WHEN filtering the analyses by entry id
+    analyses: Query = filter_analysis_by_entry_id(
+        analyses=base_store._get_query(table=Analysis), entry_id=analysis.id
+    )
+
+    # THEN a query is returned
+    assert isinstance(analyses, Query)
+
+    # THEN only the analysis with the given entry id should be retrieved
+    assert analysis == analyses.one()