Skip to content

Commit

Permalink
refactor(flow cell dir data pt1) (#3310) (patch)
Browse files Browse the repository at this point in the history
# Description
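First part of a larger refactoring: renames `FlowCellDirectoryData` to `IlluminaRunDirectoryData` (now imported from `cg.models.run_devices.illumina_run_directory_data`), `get_flow_cells_from_path` to `get_sequencing_runs_from_path`, and `is_flow_cell_ready` to `is_sequencing_run_ready`, and updates the call sites accordingly.
More specific refactoring of the other modules will be done separately.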
  • Loading branch information
ChrOertlin authored Jun 3, 2024
1 parent 985acfb commit dee97ab
Show file tree
Hide file tree
Showing 41 changed files with 317 additions and 297 deletions.
32 changes: 16 additions & 16 deletions cg/apps/demultiplex/demultiplex_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from cg.exc import HousekeeperFileMissingError
from cg.io.controller import WriteFile
from cg.models.demultiplex.sbatch import SbatchCommand, SbatchError
from cg.models.flow_cell.flow_cell import FlowCellDirectoryData
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
from cg.models.slurm.sbatch import SbatchDragen

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,7 +55,7 @@ def set_dry_run(self, dry_run: bool) -> None:

@staticmethod
def get_sbatch_error(
flow_cell: FlowCellDirectoryData,
flow_cell: IlluminaRunDirectoryData,
email: str,
demux_dir: Path,
) -> str:
Expand Down Expand Up @@ -95,26 +95,26 @@ def get_sbatch_command(
return DEMULTIPLEX_COMMAND.format(**command_parameters.model_dump())

@staticmethod
def demultiplex_sbatch_path(flow_cell: FlowCellDirectoryData) -> Path:
def demultiplex_sbatch_path(flow_cell: IlluminaRunDirectoryData) -> Path:
"""Get the path to where sbatch script file should be kept."""
return Path(flow_cell.path, "demux-novaseq.sh")

@staticmethod
def get_run_name(flow_cell: FlowCellDirectoryData) -> str:
def get_run_name(flow_cell: IlluminaRunDirectoryData) -> str:
"""Create the run name for the sbatch job."""
return f"{flow_cell.id}_demultiplex"

@staticmethod
def get_stderr_logfile(flow_cell: FlowCellDirectoryData) -> Path:
def get_stderr_logfile(flow_cell: IlluminaRunDirectoryData) -> Path:
"""Create the path to the stderr logfile."""
return Path(flow_cell.path, f"{DemultiplexingAPI.get_run_name(flow_cell)}.stderr")

@staticmethod
def get_stdout_logfile(flow_cell: FlowCellDirectoryData) -> Path:
def get_stdout_logfile(flow_cell: IlluminaRunDirectoryData) -> Path:
"""Create the path to the stdout logfile."""
return Path(flow_cell.path, f"{DemultiplexingAPI.get_run_name(flow_cell)}.stdout")

def flow_cell_out_dir_path(self, flow_cell: FlowCellDirectoryData) -> Path:
def flow_cell_out_dir_path(self, flow_cell: IlluminaRunDirectoryData) -> Path:
"""Create the path to where the demultiplexed result should be produced."""
return Path(self.demultiplexed_runs_dir, flow_cell.path.name)

Expand All @@ -126,13 +126,13 @@ def is_sample_sheet_in_housekeeper(self, flow_cell_id: str) -> bool:
except HousekeeperFileMissingError:
return False

def get_flow_cell_unaligned_dir(self, flow_cell: FlowCellDirectoryData) -> Path:
def get_flow_cell_unaligned_dir(self, flow_cell: IlluminaRunDirectoryData) -> Path:
"""Returns the path to where the demultiplexed result are located."""
return Path(
self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.UNALIGNED_DIR_NAME
)

def demultiplexing_completed_path(self, flow_cell: FlowCellDirectoryData) -> Path:
def demultiplexing_completed_path(self, flow_cell: IlluminaRunDirectoryData) -> Path:
"""Return the path to demultiplexing complete file."""
LOG.info(
Path(self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.DEMUX_COMPLETE)
Expand All @@ -141,7 +141,7 @@ def demultiplexing_completed_path(self, flow_cell: FlowCellDirectoryData) -> Pat
self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.DEMUX_COMPLETE
)

def is_demultiplexing_possible(self, flow_cell: FlowCellDirectoryData) -> bool:
def is_demultiplexing_possible(self, flow_cell: IlluminaRunDirectoryData) -> bool:
"""Check if it is possible to start demultiplexing.
This means that
Expand All @@ -151,7 +151,7 @@ def is_demultiplexing_possible(self, flow_cell: FlowCellDirectoryData) -> bool:
"""
LOG.info(f"Check if demultiplexing is possible for {flow_cell.id}")
demultiplexing_possible = True
if not flow_cell.is_flow_cell_ready():
if not flow_cell.is_sequencing_run_ready():
demultiplexing_possible = False

if not flow_cell.sample_sheet_exists():
Expand Down Expand Up @@ -190,7 +190,7 @@ def write_trailblazer_config(content: dict, file_path: Path) -> None:
)

def add_to_trailblazer(
self, tb_api: TrailblazerAPI, slurm_job_id: int, flow_cell: FlowCellDirectoryData
self, tb_api: TrailblazerAPI, slurm_job_id: int, flow_cell: IlluminaRunDirectoryData
):
"""Add demultiplexing entry to trailblazer."""
if self.dry_run:
Expand All @@ -209,7 +209,7 @@ def add_to_trailblazer(
workflow=Workflow.DEMULTIPLEX,
)

def start_demultiplexing(self, flow_cell: FlowCellDirectoryData):
def start_demultiplexing(self, flow_cell: IlluminaRunDirectoryData):
"""Start demultiplexing for a flow cell."""
self.create_demultiplexing_started_file(flow_cell.demultiplexing_started_path)
log_path: Path = self.get_stderr_logfile(flow_cell=flow_cell)
Expand Down Expand Up @@ -243,16 +243,16 @@ def start_demultiplexing(self, flow_cell: FlowCellDirectoryData):
LOG.info(f"Demultiplexing running as job {sbatch_number}")
return sbatch_number

def prepare_output_directory(self, flow_cell: FlowCellDirectoryData) -> None:
def prepare_output_directory(self, flow_cell: IlluminaRunDirectoryData) -> None:
"""Makes sure the output directory is ready for demultiplexing."""
self.remove_demultiplexing_output_directory(flow_cell)
self.create_demultiplexing_output_dir(flow_cell)

def remove_demultiplexing_output_directory(self, flow_cell: FlowCellDirectoryData) -> None:
def remove_demultiplexing_output_directory(self, flow_cell: IlluminaRunDirectoryData) -> None:
if not self.dry_run and self.flow_cell_out_dir_path(flow_cell=flow_cell).exists():
shutil.rmtree(self.flow_cell_out_dir_path(flow_cell=flow_cell), ignore_errors=False)

def create_demultiplexing_output_dir(self, flow_cell: FlowCellDirectoryData) -> None:
def create_demultiplexing_output_dir(self, flow_cell: IlluminaRunDirectoryData) -> None:
"""Creates the demultiplexing output directory for the flow cell."""
output_directory: Path = self.flow_cell_out_dir_path(flow_cell)
LOG.debug(f"Creating demultiplexing output directory: {output_directory}")
Expand Down
20 changes: 10 additions & 10 deletions cg/apps/demultiplex/sample_sheet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
add_and_include_sample_sheet_path_to_housekeeper,
delete_sample_sheet_from_housekeeper,
)
from cg.models.flow_cell.flow_cell import FlowCellDirectoryData
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
from cg.utils.files import get_directories_in_path, link_or_overwrite_file

LOG = logging.getLogger(__name__)
Expand All @@ -45,7 +45,7 @@ def set_force(self, force: bool) -> None:
LOG.debug(f"Set force to {force}")
self.force = force

def _get_flow_cell(self, flow_cell_name: str) -> FlowCellDirectoryData:
def _get_flow_cell(self, flow_cell_name: str) -> IlluminaRunDirectoryData:
"""
Return a flow cell given a path.
Raises:
Expand All @@ -57,7 +57,7 @@ def _get_flow_cell(self, flow_cell_name: str) -> FlowCellDirectoryData:
LOG.warning(message)
raise SampleSheetError(message)
try:
flow_cell = FlowCellDirectoryData(flow_cell_path)
flow_cell = IlluminaRunDirectoryData(flow_cell_path)
except FlowCellError as error:
raise SampleSheetError from error
return flow_cell
Expand All @@ -77,7 +77,7 @@ def validate_sample_sheet(self, sample_sheet_path: Path) -> None:
self.validator.validate_sample_sheet_from_content(sample_sheet_content)

@staticmethod
def _are_necessary_files_in_flow_cell(flow_cell: FlowCellDirectoryData) -> bool:
def _are_necessary_files_in_flow_cell(flow_cell: IlluminaRunDirectoryData) -> bool:
"""Determine if the flow cell has a Run Parameters file and a sample sheet."""
try:
flow_cell.run_parameters_path.exists()
Expand Down Expand Up @@ -108,7 +108,7 @@ def _replace_sample_header(sample_sheet_content: list[list[str]]) -> list[list[s

def translate_sample_sheet(self, flow_cell_name: str) -> None:
"""Translate a Bcl2Fastq sample sheet to a BCLConvert sample sheet."""
flow_cell: FlowCellDirectoryData = self._get_flow_cell(flow_cell_name)
flow_cell: IlluminaRunDirectoryData = self._get_flow_cell(flow_cell_name)
if not self._are_necessary_files_in_flow_cell(flow_cell):
raise SampleSheetError("Could not translate sample sheet")
original_content: list[list[str]] = ReadFile.get_content_from_file(
Expand Down Expand Up @@ -137,7 +137,7 @@ def translate_sample_sheet(self, flow_cell_name: str) -> None:
file_path=flow_cell.sample_sheet_path,
)

def _use_sample_sheet_from_housekeeper(self, flow_cell: FlowCellDirectoryData) -> None:
def _use_sample_sheet_from_housekeeper(self, flow_cell: IlluminaRunDirectoryData) -> None:
"""
Copy the sample sheet from Housekeeper to the flow cell directory if it exists and is valid.
"""
Expand All @@ -152,7 +152,7 @@ def _use_sample_sheet_from_housekeeper(self, flow_cell: FlowCellDirectoryData) -
if not self.dry_run:
link_or_overwrite_file(src=sample_sheet_path, dst=flow_cell.sample_sheet_path)

def _use_flow_cell_sample_sheet(self, flow_cell: FlowCellDirectoryData) -> None:
def _use_flow_cell_sample_sheet(self, flow_cell: IlluminaRunDirectoryData) -> None:
"""Use the sample sheet from the flow cell directory if it is valid."""
self.validate_sample_sheet(flow_cell.sample_sheet_path)
LOG.info("Sample sheet from flow cell directory is valid. Adding it to Housekeeper")
Expand All @@ -167,7 +167,7 @@ def _use_flow_cell_sample_sheet(self, flow_cell: FlowCellDirectoryData) -> None:
hk_api=self.hk_api,
)

def _get_sample_sheet_content(self, flow_cell: FlowCellDirectoryData) -> list[list[str]]:
def _get_sample_sheet_content(self, flow_cell: IlluminaRunDirectoryData) -> list[list[str]]:
"""Return the sample sheet content for a flow cell."""
lims_samples: list[FlowCellSample] = list(
get_flow_cell_samples(
Expand All @@ -185,7 +185,7 @@ def _get_sample_sheet_content(self, flow_cell: FlowCellDirectoryData) -> list[li
)
return creator.construct_sample_sheet()

def _create_sample_sheet_file(self, flow_cell: FlowCellDirectoryData) -> None:
def _create_sample_sheet_file(self, flow_cell: IlluminaRunDirectoryData) -> None:
"""Create a valid sample sheet in the flow cell directory and add it to Housekeeper."""
sample_sheet_content: list[list[str]] = self._get_sample_sheet_content(flow_cell)
if not self.force:
Expand Down Expand Up @@ -216,7 +216,7 @@ def get_or_create_sample_sheet(self, flow_cell_name: str) -> None:
Ensure that a valid sample sheet is present in the flow cell directory by fetching it from
housekeeper or creating it if there is not a valid sample sheet.
"""
flow_cell: FlowCellDirectoryData = self._get_flow_cell(flow_cell_name)
flow_cell: IlluminaRunDirectoryData = self._get_flow_cell(flow_cell_name)
LOG.info("Fetching and validating sample sheet from Housekeeper")
try:
self._use_sample_sheet_from_housekeeper(flow_cell)
Expand Down
6 changes: 3 additions & 3 deletions cg/apps/demultiplex/sample_sheet/sample_sheet_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cg.apps.demultiplex.sample_sheet.sample_models import FlowCellSample
from cg.constants.demultiplexing import IndexSettings, SampleSheetBCLConvertSections
from cg.models.demultiplex.run_parameters import RunParameters
from cg.models.flow_cell.flow_cell import FlowCellDirectoryData
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData

LOG = logging.getLogger(__name__)

Expand All @@ -16,10 +16,10 @@ class SampleSheetCreator:

def __init__(
self,
flow_cell: FlowCellDirectoryData,
flow_cell: IlluminaRunDirectoryData,
lims_samples: list[FlowCellSample],
):
self.flow_cell: FlowCellDirectoryData = flow_cell
self.flow_cell: IlluminaRunDirectoryData = flow_cell
self.flow_cell_id: str = flow_cell.id
self.lims_samples: list[FlowCellSample] = lims_samples
self.run_parameters: RunParameters = flow_cell.run_parameters
Expand Down
16 changes: 8 additions & 8 deletions cg/cli/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
)
from cg.meta.tar.tar import TarAPI
from cg.models.cg_config import CGConfig
from cg.models.flow_cell.flow_cell import (
FlowCellDirectoryData,
get_flow_cells_from_path,
from cg.models.run_devices.illumina_run_directory_data import (
IlluminaRunDirectoryData,
get_sequencing_runs_from_path,
)
from cg.store.models import Flowcell, Sample
from cg.store.store import Store
Expand All @@ -52,8 +52,8 @@ def backup_flow_cells(context: CGConfig, dry_run: bool):
pdc_api = context.pdc_api
pdc_api.dry_run = dry_run
status_db: Store = context.status_db
flow_cells: list[FlowCellDirectoryData] = get_flow_cells_from_path(
flow_cells_dir=Path(context.illumina_flow_cells_directory)
flow_cells: list[IlluminaRunDirectoryData] = get_sequencing_runs_from_path(
sequencing_run_dir=Path(context.illumina_flow_cells_directory)
)
for flow_cell in flow_cells:
db_flow_cell: Flowcell | None = status_db.get_flow_cell_by_name(flow_cell_name=flow_cell.id)
Expand Down Expand Up @@ -88,8 +88,8 @@ def backup_flow_cells(context: CGConfig, dry_run: bool):
def encrypt_flow_cells(context: CGConfig, dry_run: bool):
"""Encrypt flow cells."""
status_db: Store = context.status_db
flow_cells: list[FlowCellDirectoryData] = get_flow_cells_from_path(
flow_cells_dir=Path(context.illumina_flow_cells_directory)
flow_cells: list[IlluminaRunDirectoryData] = get_sequencing_runs_from_path(
sequencing_run_dir=Path(context.illumina_flow_cells_directory)
)
for flow_cell in flow_cells:
db_flow_cell: Flowcell | None = status_db.get_flow_cell_by_name(flow_cell_name=flow_cell.id)
Expand Down Expand Up @@ -206,7 +206,7 @@ def archive_spring_file(config: CGConfig, spring_file_path: str, dry_run: bool):
@DRY_RUN
@click.option("-s", "--sample-id", "object_type", flag_value="sample", type=str)
@click.option("-c", "--case-id", "object_type", flag_value="case", type=str)
@click.option("-f", "--flow-cell-id", "object_type", flag_value="flow_cell", type=str)
@click.option("-f", "--flow-cell-id", "object_type", flag_value="run_devices", type=str)
@click.argument("identifier", type=str)
@click.pass_context
@click.pass_obj
Expand Down
6 changes: 3 additions & 3 deletions cg/cli/demultiplex/demux.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
is_syncing_complete,
)
from cg.models.cg_config import CGConfig
from cg.models.flow_cell.flow_cell import FlowCellDirectoryData
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData

LOG = logging.getLogger(__name__)

Expand All @@ -50,7 +50,7 @@ def demultiplex_all(context: CGConfig, flow_cells_directory: click.Path, dry_run
continue
LOG.info(f"Found directory {sub_dir}")
try:
flow_cell = FlowCellDirectoryData(flow_cell_path=sub_dir)
flow_cell = IlluminaRunDirectoryData(sequencing_run_path=sub_dir)
except FlowCellError:
continue

Expand Down Expand Up @@ -98,7 +98,7 @@ def demultiplex_flow_cell(
LOG.info(f"setting demultiplexed runs dir to {demultiplex_api.demultiplexed_runs_dir}")

try:
flow_cell = FlowCellDirectoryData(flow_cell_directory)
flow_cell = IlluminaRunDirectoryData(flow_cell_directory)
except FlowCellError as error:
raise click.Abort from error

Expand Down
2 changes: 1 addition & 1 deletion cg/constants/devices.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Enums for devices."""
"""Enums for run_devices."""

from enum import Enum, auto

Expand Down
4 changes: 2 additions & 2 deletions cg/meta/clean/clean_flow_cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
HousekeeperBundleVersionMissingError,
HousekeeperFileMissingError,
)
from cg.models.flow_cell.flow_cell import FlowCellDirectoryData
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
from cg.store.models import Flowcell, SampleLaneSequencingMetrics
from cg.store.store import Store
from cg.utils.files import remove_directory_and_contents
Expand Down Expand Up @@ -42,7 +42,7 @@ def __init__(
):
self.status_db: Store = status_db
self.hk_api: HousekeeperAPI = housekeeper_api
self.flow_cell = FlowCellDirectoryData(flow_cell_path=flow_cell_path)
self.flow_cell = IlluminaRunDirectoryData(sequencing_run_path=flow_cell_path)
self.dry_run: bool = dry_run
LOG.info(f"Trying to delete {flow_cell_path}")

Expand Down
6 changes: 3 additions & 3 deletions cg/meta/demultiplex/demux_post_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
is_flow_cell_ready_for_postprocessing,
)
from cg.models.cg_config import CGConfig
from cg.models.flow_cell.flow_cell import FlowCellDirectoryData
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
from cg.store.store import Store

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -70,7 +70,7 @@ def finish_flow_cell(

flow_cell_out_directory = Path(self.demultiplexed_runs_dir, flow_cell_directory_name)

flow_cell = FlowCellDirectoryData(flow_cell_out_directory)
flow_cell = IlluminaRunDirectoryData(flow_cell_out_directory)

sample_sheet_path: Path = self.hk_api.get_sample_sheet_path(flow_cell.id)
flow_cell.set_sample_sheet_path_hk(hk_path=sample_sheet_path)
Expand Down Expand Up @@ -108,7 +108,7 @@ def finish_all_flow_cells(self) -> bool:
continue
return is_error_raised

def store_flow_cell_data(self, parsed_flow_cell: FlowCellDirectoryData) -> None:
def store_flow_cell_data(self, parsed_flow_cell: IlluminaRunDirectoryData) -> None:
"""Store data from the flow cell directory in status db and housekeeper."""
store_flow_cell_data_in_status_db(
parsed_flow_cell=parsed_flow_cell,
Expand Down
Loading

0 comments on commit dee97ab

Please sign in to comment.