Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(CLI automatic delivery fastq workflow) #3692

Merged
merged 15 commits into from
Sep 9, 2024
30 changes: 29 additions & 1 deletion cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import click

from cg.apps.tb import TrailblazerAPI
from cg.cli.deliver.utils import deliver_raw_data_for_analyses
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants import DataDelivery, Workflow
from cg.constants.cli_options import DRY_RUN
from cg.constants.delivery import FileDeliveryOption
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
Expand All @@ -17,7 +19,7 @@
from cg.services.file_delivery.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case
from cg.store.models import Case, Analysis

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -115,3 +117,29 @@ def deliver_ticket(
delivery_service.deliver_files_for_ticket(
ticket_id=ticket, delivery_base_path=Path(inbox), dry_run=dry_run
)


@deliver.command(name="auto-raw-data")
@click.pass_obj
@DRY_RUN
def deliver_auto_raw_data(context: CGConfig, dry_run: bool):
    """
    Deliver all case files for the raw data workflow to the customer inbox on the HPC and start a Rsync job.

    1. Get all analyses with workflow fastq that have not been uploaded yet
    2. Deliver the files for each analysis to the customer inbox
    3. Record when the upload of each analysis started
    """
    # NOTE: the original had a second, dead string literal here; it is merged
    # into the docstring above so the summary is not silently discarded.
    service_builder: DeliveryServiceFactory = context.delivery_service_factory
    analyses: list[Analysis] = context.analysis_service.get_analyses_to_upload_for_workflow(
        workflow=Workflow.FASTQ
    )
    deliver_raw_data_for_analyses(
        analyses=analyses,
        status_db=context.status_db,
        delivery_path=Path(context.delivery_path),
        service_builder=service_builder,
        dry_run=dry_run,
    )
46 changes: 46 additions & 0 deletions cg/cli/deliver/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging
from datetime import datetime
from pathlib import Path

from cg.constants import Workflow
from cg.services.file_delivery.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.file_delivery.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case, Analysis
from cg.store.store import Store


LOG = logging.getLogger(__name__)


def deliver_raw_data_for_analyses(
    analyses: list[Analysis],
    status_db: Store,
    delivery_path: Path,
    service_builder: DeliveryServiceFactory,
    dry_run: bool,
):
    """Deliver raw data for a list of analyses.

    For each analysis: build a delivery service matching the case's delivery
    type, deliver the case files to the delivery path, then stamp the
    analysis' upload_started_at field. On any failure the stamp is cleared
    again (so the analysis can be retried) and delivery continues with the
    next analysis.
    """
    for analysis in analyses:
        try:
            case: Case = analysis.case
            delivery_service: DeliverFilesService = service_builder.build_delivery_service(
                delivery_type=case.data_delivery,
                workflow=Workflow.FASTQ,
            )

            delivery_service.deliver_files_for_case(
                case=case, delivery_base_path=delivery_path, dry_run=dry_run
            )
            status_db.update_analysis_upload_started_at(
                analysis_id=analysis.id, upload_started_at=datetime.now()
            )
        except Exception as error:
            # Roll back the started-at stamp so the analysis is picked up again
            # on the next run, then carry on with the remaining analyses.
            status_db.update_analysis_upload_started_at(
                analysis_id=analysis.id, upload_started_at=None
            )
            # Lazy %-formatting so the message is only built when emitted.
            LOG.error("Could not deliver files for analysis %s: %s", analysis.id, error)
2 changes: 1 addition & 1 deletion cg/models/cg_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def slurm_upload_service(self) -> SlurmUploadService:

@property
def analysis_service(self) -> AnalysisService:
    """Return an AnalysisService wired to the Trailblazer API and the status database."""
    # The diff view kept both the pre- and post-change return line; only the
    # new one (with status_db) is the intended implementation.
    return AnalysisService(analysis_client=self.trailblazer_api, status_db=self.status_db)

@property
def scout_api(self) -> ScoutAPI:
Expand Down
36 changes: 35 additions & 1 deletion cg/services/analysis_service/analysis_service.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,45 @@
from datetime import datetime

from cg.apps.tb.api import TrailblazerAPI
from cg.apps.tb.models import TrailblazerAnalysis
from cg.constants import Workflow
from cg.store.models import Case, Analysis
from cg.store.store import Store


class AnalysisService:
    """Coordinate analysis upload state between Trailblazer and the status database."""

    def __init__(self, analysis_client: TrailblazerAPI, status_db: Store):
        self.status_db = status_db
        self.analysis_client = analysis_client

    def add_upload_job(self, case_id: str, slurm_id: int):
        """Attach an upload SLURM job to the latest completed Trailblazer analysis of a case."""
        analysis: TrailblazerAnalysis = self.analysis_client.get_latest_completed_analysis(case_id)
        self.analysis_client.add_upload_job_to_analysis(slurm_id=slurm_id, analysis_id=analysis.id)

    def get_analyses_to_upload_for_workflow(self, workflow: Workflow) -> list[Analysis]:
        """
        Get all analyses that should be uploaded for a specific workflow.

        1. If an analysis' upload has started and the latest analysis is
           completed, stamp its uploaded_at field.
        2. If an analysis is not uploaded and the latest analysis is
           completed, include it in the returned list.
        """
        analyses_to_upload: list[Analysis] = []
        for analysis in self.status_db.get_analyses_to_upload(workflow=workflow):
            # Query Trailblazer once per analysis instead of twice.
            is_completed: bool = self._is_analysis_completed(analysis)
            if self._has_upload_started(analysis) and is_completed:
                self.status_db.update_analysis_uploaded_at(
                    analysis_id=analysis.id, uploaded_at=datetime.now()
                )
            if not self._is_analysis_uploaded(analysis) and is_completed:
                analyses_to_upload.append(analysis)
        return analyses_to_upload

    @staticmethod
    def _is_analysis_uploaded(analysis: Analysis) -> bool:
        """An analysis counts as uploaded once its uploaded_at field is set."""
        return bool(analysis.uploaded_at)

    def _is_analysis_completed(self, analysis: Analysis) -> bool:
        """Ask Trailblazer whether the latest analysis of the case is completed."""
        return self.analysis_client.is_latest_analysis_completed(case_id=analysis.case.internal_id)

    @staticmethod
    def _has_upload_started(analysis: Analysis) -> bool:
        """An upload is considered started once upload_started_at is set."""
        return bool(analysis.upload_started_at)
ChrOertlin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime
from pathlib import Path

from cg.apps.tb import TrailblazerAPI
Expand All @@ -19,7 +20,7 @@
)
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.store.exc import EntryNotFoundError
from cg.store.models import Case
from cg.store.models import Case, Analysis
from cg.store.store import Store

LOG = logging.getLogger(__name__)
Expand Down
8 changes: 8 additions & 0 deletions cg/store/crud/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ def get_analysis_by_case_entry_id_and_started_at(
started_at_date=started_at_date,
).first()

def get_analysis_by_entry_id(self, entry_id: int) -> Analysis:
    """Return the analysis with the given entry id, or None when no row matches."""
    base_query = self._get_query(table=Analysis)
    filtered = apply_analysis_filter(
        filter_functions=[AnalysisFilter.BY_ENTRY_ID],
        analyses=base_query,
        entry_id=entry_id,
    )
    return filtered.first()

def get_cases_by_customer_and_case_name_search(
self, customer: Customer, case_name_search: str
) -> list[Case]:
Expand Down
14 changes: 14 additions & 0 deletions cg/store/crud/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,17 @@ def mark_sample_as_cancelled(self, sample_id: int) -> None:
sample: Sample = self.get_sample_by_entry_id(sample_id)
sample.is_cancelled = True
self.session.commit()

def update_analysis_uploaded_at(self, analysis_id: int, uploaded_at: datetime | None) -> None:
"""Update the uploaded at field of an analysis."""
analysis = self.get_analysis_by_entry_id(analysis_id)
analysis.uploaded_at = uploaded_at
self.session.commit()

def update_analysis_upload_started_at(
self, analysis_id: int, upload_started_at: datetime | None
) -> None:
"""Update the upload started at field of an analysis."""
analysis = self.get_analysis_by_entry_id(analysis_id)
analysis.upload_started_at = upload_started_at
self.session.commit()
10 changes: 9 additions & 1 deletion cg/store/filters/status_analysis_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,19 @@ def filter_analysis_case_action_is_none(analyses: Query, **kwargs) -> Query:
return analyses.join(Case).filter(Case.action.is_(None))


def filter_analysis_by_entry_id(analyses: Query, entry_id: int, **kwargs) -> Query:
    """Narrow the analyses query down to the analysis with the given entry id."""
    predicate = Analysis.id == entry_id
    return analyses.filter(predicate)


def apply_analysis_filter(
filter_functions: list[Callable],
analyses: Query,
workflow: Workflow = None,
case_entry_id: int = None,
completed_at_date: datetime = None,
entry_id: int = None,
started_at_date: datetime = None,
workflow: Workflow = None,
) -> Query:
"""Apply filtering functions to the analyses queries and return filtered results."""

Expand All @@ -106,6 +112,7 @@ def apply_analysis_filter(
workflow=workflow,
case_entry_id=case_entry_id,
completed_at_date=completed_at_date,
entry_id=entry_id,
started_at_date=started_at_date,
)
return analyses
Expand All @@ -129,3 +136,4 @@ class AnalysisFilter(Enum):
CASE_ACTION_IS_NONE: Callable = filter_analysis_case_action_is_none
ORDER_BY_UPLOADED_AT: Callable = order_analyses_by_uploaded_at_asc
ORDER_BY_COMPLETED_AT: Callable = order_analyses_by_completed_at_asc
BY_ENTRY_ID: Callable = filter_analysis_by_entry_id
41 changes: 41 additions & 0 deletions tests/services/analysis_service/test_analysis_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from datetime import datetime
from unittest.mock import Mock

from cg.services.analysis_service.analysis_service import AnalysisService

from cg.constants import Workflow
from cg.store.models import Analysis
from cg.store.store import Store
from tests.store_helpers import StoreHelpers


def test_get_analyses_to_upload_for_workflow(
    helpers: StoreHelpers, base_store: Store, timestamp_now: datetime
):
    # GIVEN a store holding one analysis that is eligible for upload ...
    eligible_analysis: Analysis = helpers.add_analysis(
        store=base_store,
        uploaded_at=None,
        upload_started=None,
        workflow=Workflow.FASTQ,
        completed_at=timestamp_now,
    )
    # ... and a second case whose analysis has already been uploaded
    uploaded_case = helpers.add_case(store=base_store, name="no_upload_case")
    uploaded_analysis: Analysis = helpers.add_analysis(
        store=base_store,
        case=uploaded_case,
        uploaded_at=timestamp_now,
        upload_started=None,
        workflow=Workflow.FASTQ,
        completed_at=timestamp_now,
    )
    # GIVEN an analysis service backed by that store
    service = AnalysisService(analysis_client=Mock(), status_db=base_store)

    # WHEN fetching the analyses that should be uploaded
    to_upload: list[Analysis] = service.get_analyses_to_upload_for_workflow(
        workflow=Workflow.FASTQ
    )

    # THEN only the not-yet-uploaded analysis is returned
    assert to_upload == [eligible_analysis]
    assert uploaded_analysis not in to_upload
16 changes: 16 additions & 0 deletions tests/store/crud/read/test_read_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,3 +472,19 @@ def test_get_analyses_before(
# THEN assert that the analyses before the given date are returned
for analysis in analyses:
assert analysis.started_at < timestamp_now


def test_get_analysis_by_entry_id(
    store_with_analyses_for_cases_not_uploaded_fluffy: Store,
):
    """Test fetching a single analysis via its entry id."""
    # GIVEN a store populated with several analyses
    store: Store = store_with_analyses_for_cases_not_uploaded_fluffy

    # WHEN fetching the analysis with entry id 1
    fetched: Analysis = store.get_analysis_by_entry_id(entry_id=1)

    # THEN the analysis with that id is returned
    assert fetched.id == 1
42 changes: 37 additions & 5 deletions tests/store/crud/update/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

from cg.constants import SequencingRunDataAvailability
from cg.constants.sequencing import Sequencers
from cg.store.models import (
IlluminaSampleSequencingMetrics,
IlluminaSequencingRun,
Sample,
)
from cg.store.models import IlluminaSequencingRun, Sample, IlluminaSampleSequencingMetrics, Analysis
from cg.store.store import Store


Expand Down Expand Up @@ -104,3 +100,39 @@ def test_update_sample_last_sequenced_at(

# THEN the last sequenced at date for the sample is updated
assert sample.last_sequenced_at == timestamp_now


def test_update_analysis_uploaded_at(
    store_with_analyses_for_cases_not_uploaded_fluffy: Store, timestamp_yesterday: datetime
):
    # GIVEN a store whose first analysis does not yet carry the target timestamp
    store: Store = store_with_analyses_for_cases_not_uploaded_fluffy
    analysis: Analysis = store._get_query(Analysis).first()
    assert analysis.uploaded_at != timestamp_yesterday

    # WHEN setting the uploaded_at field through the store
    store.update_analysis_uploaded_at(
        analysis_id=analysis.id, uploaded_at=timestamp_yesterday
    )

    # THEN the analysis carries the new timestamp
    assert analysis.uploaded_at == timestamp_yesterday


def test_update_analysis_upload_started_at(
    store_with_analyses_for_cases_not_uploaded_fluffy: Store, timestamp_yesterday: datetime
):
    # GIVEN a store whose first analysis does not yet carry the target timestamp
    store: Store = store_with_analyses_for_cases_not_uploaded_fluffy
    analysis: Analysis = store._get_query(Analysis).first()
    assert analysis.upload_started_at != timestamp_yesterday

    # WHEN setting the upload_started_at field through the store
    store.update_analysis_upload_started_at(
        analysis_id=analysis.id, upload_started_at=timestamp_yesterday
    )

    # THEN the analysis carries the new timestamp
    assert analysis.upload_started_at == timestamp_yesterday
19 changes: 19 additions & 0 deletions tests/store/filters/test_status_analyses_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
filter_valid_analyses_in_production,
order_analyses_by_completed_at_asc,
order_analyses_by_uploaded_at_asc,
filter_analysis_by_entry_id,
)
from cg.store.models import Analysis, Case
from cg.store.store import Store
Expand Down Expand Up @@ -324,3 +325,21 @@ def test_filter_analyses_by_started_at(
# THEN only the analysis that have been started after the given date should be retrieved
assert analysis_started_now not in analyses
assert analysis_started_old in analyses


def test_filter_by_analysis_entry_id(base_store: Store, helpers: StoreHelpers):
    """Test filtering of analyses by entry id."""
    # GIVEN a store containing one analysis
    stored_analysis: Analysis = helpers.add_analysis(store=base_store)

    # WHEN filtering the analyses query on that entry id
    filtered: Query = filter_analysis_by_entry_id(
        analyses=base_store._get_query(table=Analysis), entry_id=stored_analysis.id
    )

    # THEN the filter returns a query object
    assert isinstance(filtered, Query)

    # THEN the query yields exactly the stored analysis
    assert filtered.one() == stored_analysis
Loading