Skip to content

Commit

Permalink
Prevent re-analysis to be triggered for cases without new data (#2660)…
Browse files Browse the repository at this point in the history
…(patch)

Adjust how the samples are marked as having new data to ensure that they are only picked up for re-analysis when there are new reads.
  • Loading branch information
seallard authored Nov 7, 2023
1 parent b9097ed commit 31b6017
Show file tree
Hide file tree
Showing 38 changed files with 219 additions and 181 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Rename sample reads_updated_at
Revision ID: 70e641d723b3
Revises: fce8a2ca0fd1
Create Date: 2023-11-07 10:05:05.324260
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
revision = "70e641d723b3"
down_revision = "fce8a2ca0fd1"
branch_labels = None
depends_on = None


def upgrade():
op.alter_column(
"sample",
"reads_updated_at",
new_column_name="last_sequenced_at",
existing_type=sa.DateTime(),
)


def downgrade():
op.alter_column(
"sample",
"last_sequenced_at",
new_column_name="reads_updated_at",
existing_type=sa.DateTime(),
)
4 changes: 2 additions & 2 deletions cg/cli/delete/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _log_sample_process_information(sample: Sample):
LOG.info(f"Can NOT delete processed sample: {sample.internal_id}")
LOG.info(f"Sample was received: {sample.received_at}")
LOG.info(f"Sample was prepared: {sample.prepared_at}")
LOG.info(f"Sample's reads were updated: {sample.reads_updated_at}")
LOG.info(f"Sample's reads were updated: {sample.last_sequenced_at}")
LOG.info(f"Sample was delivered: {sample.delivered_at}")
LOG.info(f"Sample has invoice: {sample.invoice_id}")

Expand All @@ -128,7 +128,7 @@ def _has_sample_been_lab_processed(sample: Sample) -> datetime.datetime:
return (
sample.received_at
or sample.prepared_at
or sample.reads_updated_at
or sample.last_sequenced_at
or sample.delivered_at
or sample.invoice_id
)
Expand Down
6 changes: 2 additions & 4 deletions cg/meta/demultiplex/demux_post_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from cg.meta.demultiplex.status_db_storage_functions import (
store_flow_cell_data_in_status_db,
store_sequencing_metrics_in_status_db,
update_sample_read_counts_in_status_db,
store_sample_data_in_status_db,
)
from cg.meta.demultiplex.utils import create_delivery_file_in_flow_cell_directory
from cg.meta.demultiplex.validation import is_flow_cell_ready_for_postprocessing
Expand Down Expand Up @@ -110,9 +110,7 @@ def store_flow_cell_data(self, parsed_flow_cell: FlowCellDirectoryData) -> None:
store=self.status_db,
)
store_sequencing_metrics_in_status_db(flow_cell=parsed_flow_cell, store=self.status_db)
update_sample_read_counts_in_status_db(
flow_cell_data=parsed_flow_cell, store=self.status_db
)
store_sample_data_in_status_db(flow_cell=parsed_flow_cell, store=self.status_db)
store_flow_cell_data_in_housekeeper(
flow_cell=parsed_flow_cell,
hk_api=self.hk_api,
Expand Down
52 changes: 30 additions & 22 deletions cg/meta/demultiplex/status_db_storage_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,37 @@ def metric_exists_in_status_db(metric: SampleLaneSequencingMetrics, store: Store
return bool(existing_metrics_entry)


def update_sample_read_counts_in_status_db(
flow_cell_data: FlowCellDirectoryData, store: Store
) -> None:
"""Update samples in status db with the sum of all read counts for the sample in the sequencing metrics table."""
q30_threshold: int = get_q30_threshold(flow_cell_data.sequencer_type)
sample_internal_ids: list[str] = flow_cell_data.sample_sheet.get_sample_ids()
def store_sample_data_in_status_db(flow_cell: FlowCellDirectoryData, store: Store) -> None:
"""Update samples on the flow cell with read counts and sequencing date."""
q30_threshold: int = get_q30_threshold(flow_cell.sequencer_type)
sample_internal_ids: list[str] = flow_cell.sample_sheet.get_sample_ids()
sequenced_at: datetime = flow_cell.sequenced_at

for sample_id in sample_internal_ids:
update_sample_read_count(sample_id=sample_id, q30_threshold=q30_threshold, store=store)
sample: Optional[Sample] = store.get_sample_by_internal_id(sample_id)

if not sample:
LOG.warning(f"Cannot find {sample_id}. Skipping.")
continue

update_sample_read_count(sample=sample, q30_threshold=q30_threshold, store=store)
update_sample_sequencing_date(sample=sample, sequenced_at=sequenced_at)

store.session.commit()


def update_sample_read_count(sample_id: str, q30_threshold: int, store: Store) -> None:
"""Update the read count for a sample in status db with all reads exceeding the q30 threshold from the sequencing metrics table."""
sample: Optional[Sample] = store.get_sample_by_internal_id(sample_id)
if sample:
sample_read_count: int = store.get_number_of_reads_for_sample_passing_q30_threshold(
sample_internal_id=sample_id,
q30_threshold=q30_threshold,
)
LOG.debug(
f"Updating sample {sample_id} with read count {sample_read_count} and setting sequenced at."
)
sample.reads = sample_read_count
sample.reads_updated_at = datetime.datetime.now()
else:
LOG.warning(f"Cannot find {sample_id} in status_db when adding read counts. Skipping.")
def update_sample_read_count(sample: Sample, q30_threshold: int, store: Store) -> None:
"""Update the read count with reads passing the q30 threshold."""
sample_read_count: int = store.get_number_of_reads_for_sample_passing_q30_threshold(
sample_internal_id=sample.internal_id,
q30_threshold=q30_threshold,
)
LOG.debug(f"Updating sample {sample.internal_id} with read count {sample_read_count}")
sample.reads = sample_read_count


def update_sample_sequencing_date(sample: Sample, sequenced_at: datetime) -> None:
"""Update the last sequenced at date for a sample in status db."""
if not sample.last_sequenced_at or sample.last_sequenced_at < sequenced_at:
LOG.debug(f"Updating sample {sample.internal_id} with new sequencing date .")
sample.last_sequenced_at = sequenced_at
2 changes: 1 addition & 1 deletion cg/meta/report/report_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def get_sample_timestamp_data(sample: Sample) -> TimestampModel:
ordered_at=sample.ordered_at,
received_at=sample.received_at,
prepared_at=sample.prepared_at,
reads_updated_at=sample.reads_updated_at,
reads_updated_at=sample.last_sequenced_at,
)

def get_sample_metadata(
Expand Down
6 changes: 3 additions & 3 deletions cg/meta/workflow/fluffy.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ def get_concentrations_from_lims(self, sample_id: str) -> str:

def get_sample_sequenced_date(self, sample_id: str) -> Optional[dt.date]:
sample_obj: Sample = self.status_db.get_sample_by_internal_id(sample_id)
reads_updated_at: dt.datetime = sample_obj.reads_updated_at
if reads_updated_at:
return reads_updated_at.date()
last_sequenced_at: dt.datetime = sample_obj.last_sequenced_at
if last_sequenced_at:
return last_sequenced_at.date()

def get_sample_control_status(self, sample_id: str) -> bool:
sample_obj: Sample = self.status_db.get_sample_by_internal_id(sample_id)
Expand Down
2 changes: 1 addition & 1 deletion cg/meta/workflow/microsalt.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def get_parameters(self, sample_obj: Sample) -> dict[str, str]:
"Customer_ID": sample_obj.customer.internal_id,
"application_tag": sample_obj.application_version.application.tag,
"date_arrival": str(sample_obj.received_at or datetime.min),
"date_sequencing": str(sample_obj.reads_updated_at or datetime.min),
"date_sequencing": str(sample_obj.last_sequenced_at or datetime.min),
"date_libprep": str(sample_obj.prepared_at or datetime.min),
"method_libprep": method_library_prep or "Not in LIMS",
"method_sequencing": method_sequencing or "Not in LIMS",
Expand Down
2 changes: 1 addition & 1 deletion cg/meta/workflow/mutant.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_sample_parameters(self, sample_obj: Sample) -> MutantSampleConfig:
),
date_arrival=str(sample_obj.received_at),
date_libprep=str(sample_obj.prepared_at),
date_sequencing=str(sample_obj.reads_updated_at),
date_sequencing=str(sample_obj.last_sequenced_at),
selection_criteria=self.lims_api.get_sample_attribute(
lims_id=sample_obj.internal_id, key="selection_criteria"
),
Expand Down
7 changes: 7 additions & 0 deletions cg/models/flow_cell/flow_cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from pathlib import Path
from typing import Optional, Type, Union
from cg.models.flow_cell.utils import parse_date

from pydantic import ValidationError
from typing_extensions import Literal
Expand Down Expand Up @@ -74,6 +75,12 @@ def full_name(self) -> str:
"""Return flow cell full name."""
return self.path.name

@property
def sequenced_at(self) -> list[str]:
"""Return the sequencing date for the flow cell."""
date_part: str = self.full_name.split("_")[0]
return parse_date(date_part)

@property
def sample_sheet_path(self) -> Path:
"""
Expand Down
10 changes: 10 additions & 0 deletions cg/models/flow_cell/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from datetime import datetime


def parse_date(date_string: str) -> datetime:
if len(date_string) == 6:
return datetime.strptime(date_string, "%y%m%d")
elif len(date_string) == 8:
return datetime.strptime(date_string, "%Y%m%d")
else:
raise ValueError("Date format not recognized")
2 changes: 1 addition & 1 deletion cg/server/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class SampleView(BaseView):
"comment",
"downsampled_to",
"is_tumour",
"reads_updated_at",
"last_sequenced_at",
"sex",
]
column_filters = ["customer.internal_id", "priority", "sex", "application_version.application"]
Expand Down
8 changes: 4 additions & 4 deletions cg/store/api/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ def _calculate_case_data(self, case_obj: Family) -> SimpleNamespace:
)
case_data.samples_sequenced = len(
[
link.sample.reads_updated_at
link.sample.last_sequenced_at
for link in case_obj.links
if link.sample.reads_updated_at
if link.sample.last_sequenced_at
]
)
case_data.samples_delivered = len(
Expand Down Expand Up @@ -450,9 +450,9 @@ def _calculate_case_data(self, case_obj: Family) -> SimpleNamespace:

if case_data.samples_to_sequence > 0 and case_data.samples_sequenced_bool:
case_data.samples_sequenced_at = max(
link.sample.reads_updated_at
link.sample.last_sequenced_at
for link in case_obj.links
if link.sample.reads_updated_at is not None
if link.sample.last_sequenced_at is not None
)

if case_data.samples_to_deliver > 0 and case_data.samples_delivered_bool:
Expand Down
4 changes: 2 additions & 2 deletions cg/store/filters/status_case_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ def filter_cases_for_analysis(cases: Query, **kwargs) -> Query:
),
and_(
Family.action.is_(None),
Analysis.created_at < Sample.reads_updated_at,
Analysis.created_at < Sample.last_sequenced_at,
),
)
)


def filter_cases_has_sequence(cases: Query, **kwargs) -> Query:
"""Filter cases that is not sequenced according to record in StatusDB."""
return cases.filter(or_(Application.is_external, Sample.reads_updated_at.isnot(None)))
return cases.filter(or_(Application.is_external, Sample.last_sequenced_at.isnot(None)))


def filter_cases_not_analysed(cases: Query, **kwargs) -> Query:
Expand Down
4 changes: 2 additions & 2 deletions cg/store/filters/status_sample_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ def filter_samples_is_not_down_sampled(samples: Query, **kwargs) -> Query:

def filter_samples_is_sequenced(samples: Query, **kwargs) -> Query:
"""Return samples that are sequenced."""
return samples.filter(Sample.reads_updated_at.isnot(None))
return samples.filter(Sample.last_sequenced_at.isnot(None))


def filter_samples_is_not_sequenced(samples: Query, **kwargs) -> Query:
"""Return samples that are not sequenced."""
return samples.filter(Sample.reads_updated_at.is_(None))
return samples.filter(Sample.last_sequenced_at.is_(None))


def filter_samples_do_invoice(samples: Query, **kwargs) -> Query:
Expand Down
10 changes: 5 additions & 5 deletions cg/store/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ def latest_sequenced(self) -> Optional[dt.datetime]:
for link in self.links:
if link.sample.application_version.application.is_external:
sequenced_dates.append(link.sample.ordered_at)
elif link.sample.reads_updated_at:
sequenced_dates.append(link.sample.reads_updated_at)
elif link.sample.last_sequenced_at:
sequenced_dates.append(link.sample.last_sequenced_at)
return max(sequenced_dates, default=None)

@property
Expand Down Expand Up @@ -688,7 +688,7 @@ class Sample(Model, PriorityMixin):

priority = Column(types.Enum(Priority), default=Priority.standard, nullable=False)
reads = Column(types.BigInteger, default=0)
reads_updated_at = Column(types.DateTime)
last_sequenced_at = Column(types.DateTime)
received_at = Column(types.DateTime)
reference_genome = Column(types.String(255))
sequence_start = Column(types.DateTime)
Expand Down Expand Up @@ -760,8 +760,8 @@ def state(self) -> str:
"""Get the current sample state."""
if self.delivered_at:
return f"Delivered {self.delivered_at.date()}"
if self.reads_updated_at:
return f"Sequenced {self.reads_updated_at.date()}"
if self.last_sequenced_at:
return f"Sequenced {self.last_sequenced_at.date()}"
if self.sequence_start:
return f"Sequencing {self.sequence_start.date()}"
if self.received_at:
Expand Down
Loading

0 comments on commit 31b6017

Please sign in to comment.