Skip to content

Commit

Permalink
refactor(rename flow cell in demux api) (#3313) (patch)
Browse files Browse the repository at this point in the history
Refactoring demultiplexing api
  • Loading branch information
ChrOertlin authored Jun 10, 2024
1 parent 940cd15 commit 7c74e5e
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 114 deletions.
126 changes: 71 additions & 55 deletions cg/apps/demultiplex/demultiplex_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, config: dict, housekeeper_api: HousekeeperAPI, out_dir: Path
self.hk_api = housekeeper_api
self.slurm_account: str = config["demultiplex"]["slurm"]["account"]
self.mail: str = config["demultiplex"]["slurm"]["mail_user"]
self.flow_cells_dir: Path = Path(config["illumina_flow_cells_directory"])
self.sequencing_runs_dir: Path = Path(config["illumina_flow_cells_directory"])
self.demultiplexed_runs_dir: Path = out_dir or Path(
config["illumina_demultiplexed_runs_directory"]
)
Expand All @@ -55,18 +55,18 @@ def set_dry_run(self, dry_run: bool) -> None:

@staticmethod
def get_sbatch_error(
flow_cell: IlluminaRunDirectoryData,
sequencing_run: IlluminaRunDirectoryData,
email: str,
demux_dir: Path,
) -> str:
"""Create the sbatch error string."""
LOG.debug("Creating the sbatch error string")
error_parameters: SbatchError = SbatchError(
flow_cell_id=flow_cell.id,
flow_cell_id=sequencing_run.id,
email=email,
logfile=DemultiplexingAPI.get_stderr_logfile(flow_cell=flow_cell).as_posix(),
logfile=DemultiplexingAPI.get_stderr_logfile(sequencing_run=sequencing_run).as_posix(),
demux_dir=demux_dir.as_posix(),
demux_started=flow_cell.demultiplexing_started_path.as_posix(),
demux_started=sequencing_run.demultiplexing_started_path.as_posix(),
)
return DEMULTIPLEX_ERROR.format(**error_parameters.model_dump())

Expand Down Expand Up @@ -95,28 +95,28 @@ def get_sbatch_command(
return DEMULTIPLEX_COMMAND.format(**command_parameters.model_dump())

@staticmethod
def demultiplex_sbatch_path(flow_cell: IlluminaRunDirectoryData) -> Path:
def demultiplex_sbatch_path(sequencing_run: IlluminaRunDirectoryData) -> Path:
"""Get the path to where sbatch script file should be kept."""
return Path(flow_cell.path, "demux-novaseq.sh")
return Path(sequencing_run.path, "demux-novaseq.sh")

@staticmethod
def get_run_name(flow_cell: IlluminaRunDirectoryData) -> str:
def get_run_name(sequencing_run: IlluminaRunDirectoryData) -> str:
"""Create the run name for the sbatch job."""
return f"{flow_cell.id}_demultiplex"
return f"{sequencing_run.id}_demultiplex"

@staticmethod
def get_stderr_logfile(flow_cell: IlluminaRunDirectoryData) -> Path:
def get_stderr_logfile(sequencing_run: IlluminaRunDirectoryData) -> Path:
"""Create the path to the stderr logfile."""
return Path(flow_cell.path, f"{DemultiplexingAPI.get_run_name(flow_cell)}.stderr")
return Path(sequencing_run.path, f"{DemultiplexingAPI.get_run_name(sequencing_run)}.stderr")

@staticmethod
def get_stdout_logfile(flow_cell: IlluminaRunDirectoryData) -> Path:
def get_stdout_logfile(sequencing_run: IlluminaRunDirectoryData) -> Path:
"""Create the path to the stdout logfile."""
return Path(flow_cell.path, f"{DemultiplexingAPI.get_run_name(flow_cell)}.stdout")
return Path(sequencing_run.path, f"{DemultiplexingAPI.get_run_name(sequencing_run)}.stdout")

def flow_cell_out_dir_path(self, flow_cell: IlluminaRunDirectoryData) -> Path:
def demultiplexed_run_dir_path(self, sequencing_run: IlluminaRunDirectoryData) -> Path:
"""Create the path to where the demultiplexed result should be produced."""
return Path(self.demultiplexed_runs_dir, flow_cell.path.name)
return Path(self.demultiplexed_runs_dir, sequencing_run.path.name)

def is_sample_sheet_in_housekeeper(self, flow_cell_id: str) -> bool:
"""Returns True if the sample sheet for the flow cell exists in Housekeeper."""
Expand All @@ -126,45 +126,52 @@ def is_sample_sheet_in_housekeeper(self, flow_cell_id: str) -> bool:
except HousekeeperFileMissingError:
return False

def get_flow_cell_unaligned_dir(self, flow_cell: IlluminaRunDirectoryData) -> Path:
def get_sequencing_run_unaligned_dir(self, sequencing_run: IlluminaRunDirectoryData) -> Path:
"""Returns the path to where the demultiplexed result are located."""
return Path(
self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.UNALIGNED_DIR_NAME
self.demultiplexed_run_dir_path(sequencing_run),
DemultiplexingDirsAndFiles.UNALIGNED_DIR_NAME,
)

def demultiplexing_completed_path(self, flow_cell: IlluminaRunDirectoryData) -> Path:
def demultiplexing_completed_path(self, sequencing_run: IlluminaRunDirectoryData) -> Path:
"""Return the path to demultiplexing complete file."""
LOG.info(
Path(self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.DEMUX_COMPLETE)
Path(
self.demultiplexed_run_dir_path(sequencing_run),
DemultiplexingDirsAndFiles.DEMUX_COMPLETE,
)
)
return Path(
self.flow_cell_out_dir_path(flow_cell), DemultiplexingDirsAndFiles.DEMUX_COMPLETE
self.demultiplexed_run_dir_path(sequencing_run),
DemultiplexingDirsAndFiles.DEMUX_COMPLETE,
)

def is_demultiplexing_possible(self, flow_cell: IlluminaRunDirectoryData) -> bool:
def is_demultiplexing_possible(self, sequencing_run: IlluminaRunDirectoryData) -> bool:
"""Check if it is possible to start demultiplexing.
This means that
- flow cell should be ready for demultiplexing (all files in place)
- sequencing run should be ready for demultiplexing (all files in place)
- sample sheet needs to exist
- demultiplexing should not be running
"""
LOG.info(f"Check if demultiplexing is possible for {flow_cell.id}")
LOG.info(f"Check if demultiplexing is possible for {sequencing_run.id}")
demultiplexing_possible = True
if not flow_cell.is_sequencing_run_ready():
if not sequencing_run.is_sequencing_run_ready():
demultiplexing_possible = False

if not flow_cell.sample_sheet_exists():
LOG.warning(f"Could not find sample sheet in flow cell directory for {flow_cell.id}")
if not sequencing_run.sample_sheet_exists():
LOG.warning(
f"Could not find sample sheet in sequencing run directory for {sequencing_run.id}"
)
demultiplexing_possible = False

if not self.is_sample_sheet_in_housekeeper(flow_cell_id=flow_cell.id):
LOG.warning(f"Could not find sample sheet in Housekeeper for {flow_cell.id}")
if not self.is_sample_sheet_in_housekeeper(flow_cell_id=sequencing_run.id):
LOG.warning(f"Could not find sample sheet in Housekeeper for {sequencing_run.id}")
demultiplexing_possible = False

if (
flow_cell.has_demultiplexing_started_locally()
or flow_cell.has_demultiplexing_started_on_sequencer()
sequencing_run.has_demultiplexing_started_locally()
or sequencing_run.has_demultiplexing_started_on_sequencer()
):
LOG.warning("Demultiplexing has already been started")
demultiplexing_possible = False
Expand All @@ -190,37 +197,39 @@ def write_trailblazer_config(content: dict, file_path: Path) -> None:
)

def add_to_trailblazer(
self, tb_api: TrailblazerAPI, slurm_job_id: int, flow_cell: IlluminaRunDirectoryData
self, tb_api: TrailblazerAPI, slurm_job_id: int, sequencing_run: IlluminaRunDirectoryData
):
"""Add demultiplexing entry to trailblazer."""
if self.dry_run:
return
self.write_trailblazer_config(
content=self.get_trailblazer_config(slurm_job_id=slurm_job_id),
file_path=flow_cell.trailblazer_config_path,
file_path=sequencing_run.trailblazer_config_path,
)
tb_api.add_pending_analysis(
case_id=flow_cell.id,
case_id=sequencing_run.id,
analysis_type=AnalysisTypes.OTHER,
config_path=flow_cell.trailblazer_config_path.as_posix(),
out_dir=flow_cell.trailblazer_config_path.parent.as_posix(),
config_path=sequencing_run.trailblazer_config_path.as_posix(),
out_dir=sequencing_run.trailblazer_config_path.parent.as_posix(),
slurm_quality_of_service=self.slurm_quality_of_service,
email=self.mail,
workflow=Workflow.DEMULTIPLEX,
)

def start_demultiplexing(self, flow_cell: IlluminaRunDirectoryData):
"""Start demultiplexing for a flow cell."""
self.create_demultiplexing_started_file(flow_cell.demultiplexing_started_path)
log_path: Path = self.get_stderr_logfile(flow_cell=flow_cell)
def start_demultiplexing(self, sequencing_run: IlluminaRunDirectoryData):
"""Start demultiplexing for a sequencing run."""
self.create_demultiplexing_started_file(sequencing_run.demultiplexing_started_path)
log_path: Path = self.get_stderr_logfile(sequencing_run=sequencing_run)
error_function: str = self.get_sbatch_error(
flow_cell=flow_cell, email=self.mail, demux_dir=self.flow_cell_out_dir_path(flow_cell)
sequencing_run=sequencing_run,
email=self.mail,
demux_dir=self.demultiplexed_run_dir_path(sequencing_run),
)
commands: str = self.get_sbatch_command(
run_dir=flow_cell.path,
demux_dir=self.flow_cell_out_dir_path(flow_cell=flow_cell),
sample_sheet=flow_cell.sample_sheet_path,
demux_completed=self.demultiplexing_completed_path(flow_cell=flow_cell),
run_dir=sequencing_run.path,
demux_dir=self.demultiplexed_run_dir_path(sequencing_run=sequencing_run),
sample_sheet=sequencing_run.sample_sheet_path,
demux_completed=self.demultiplexing_completed_path(sequencing_run=sequencing_run),
environment=self.environment,
)
sbatch_parameters: SbatchDragen = SbatchDragen(
Expand All @@ -229,31 +238,38 @@ def start_demultiplexing(self, flow_cell: IlluminaRunDirectoryData):
email=self.mail,
error=error_function,
hours=36,
job_name=self.get_run_name(flow_cell),
job_name=self.get_run_name(sequencing_run),
log_dir=log_path.parent.as_posix(),
quality_of_service=self.slurm_quality_of_service,
)
sbatch_content: str = self.slurm_api.generate_sbatch_content(
sbatch_parameters=sbatch_parameters
)
sbatch_path: Path = self.demultiplex_sbatch_path(flow_cell=flow_cell)
sbatch_path: Path = self.demultiplex_sbatch_path(sequencing_run=sequencing_run)
sbatch_number: int = self.slurm_api.submit_sbatch(
sbatch_content=sbatch_content, sbatch_path=sbatch_path
)
LOG.info(f"Demultiplexing running as job {sbatch_number}")
return sbatch_number

def prepare_output_directory(self, flow_cell: IlluminaRunDirectoryData) -> None:
def prepare_output_directory(self, sequencing_run: IlluminaRunDirectoryData) -> None:
"""Makes sure the output directory is ready for demultiplexing."""
self.remove_demultiplexing_output_directory(flow_cell)
self.create_demultiplexing_output_dir(flow_cell)
self.remove_demultiplexing_output_directory(sequencing_run)
self.create_demultiplexing_output_dir(sequencing_run)

def remove_demultiplexing_output_directory(self, flow_cell: IlluminaRunDirectoryData) -> None:
if not self.dry_run and self.flow_cell_out_dir_path(flow_cell=flow_cell).exists():
shutil.rmtree(self.flow_cell_out_dir_path(flow_cell=flow_cell), ignore_errors=False)
def remove_demultiplexing_output_directory(
self, sequencing_run: IlluminaRunDirectoryData
) -> None:
if (
not self.dry_run
and self.demultiplexed_run_dir_path(sequencing_run=sequencing_run).exists()
):
shutil.rmtree(
self.demultiplexed_run_dir_path(sequencing_run=sequencing_run), ignore_errors=False
)

def create_demultiplexing_output_dir(self, flow_cell: IlluminaRunDirectoryData) -> None:
"""Creates the demultiplexing output directory for the flow cell."""
output_directory: Path = self.flow_cell_out_dir_path(flow_cell)
def create_demultiplexing_output_dir(self, sequencing_run: IlluminaRunDirectoryData) -> None:
"""Creates the demultiplexing output directory for the sequencing run."""
output_directory: Path = self.demultiplexed_run_dir_path(sequencing_run)
LOG.debug(f"Creating demultiplexing output directory: {output_directory}")
output_directory.mkdir(exist_ok=False, parents=True)
Loading

0 comments on commit 7c74e5e

Please sign in to comment.