Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(hk service to pacbio flow) #3466

Merged
merged 16 commits into from
Jul 26, 2024
18 changes: 9 additions & 9 deletions cg/apps/housekeeper/hk.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,19 +570,19 @@ def add_bundle_and_version_if_non_existent(self, bundle_name: str) -> None:
else:
LOG.debug(f"Bundle with name {bundle_name} already exists")

def store_fastq_path_in_housekeeper(
def create_bundle_and_add_file_with_tags(
self,
sample_internal_id: str,
sample_fastq_path: Path,
flow_cell_id: str,
bundle_name: str,
file_path: Path,
tags: list[str],
) -> None:
"""Add the fastq file path with tags to a bundle and version in Housekeeper."""
self.add_bundle_and_version_if_non_existent(sample_internal_id)
self.add_tags_if_non_existent([sample_internal_id])
self.add_bundle_and_version_if_non_existent(bundle_name)
self.add_tags_if_non_existent([bundle_name])
self.add_file_to_bundle_if_non_existent(
file_path=sample_fastq_path,
bundle_name=sample_internal_id,
tag_names=[SequencingFileTag.FASTQ, flow_cell_id, sample_internal_id],
file_path=file_path,
bundle_name=bundle_name,
tag_names=tags,
)

def get_archive_entries(
Expand Down
39 changes: 38 additions & 1 deletion cg/constants/pacbio.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Constants related to PacBio sequencing."""

from cg.constants import FileExtensions
from cg.constants.housekeeper_tags import AlignmentFileTag


class PacBioDirsAndFiles:
CCS_REPORT_SUFFIX: str = "ccs_report.json"
CONTROL_REPORT: str = "control.report.json"
LOADING_REPORT: str = "loading.report.json"
HIFI_READS_DIR: str = "hifi_reads"
HIFI_READS: str = "hifi_reads"
RAW_DATA_REPORT: str = "raw_data.report.json"
SMRTLINK_DATASETS_REPORT: str = "smrtlink-datasets.json"
STATISTICS_DIR: str = "statistics"
Expand Down Expand Up @@ -54,3 +57,37 @@ class SmrtLinkDatabasesIDs:
PATH: str = "path"
WELL_NAME: str = "wellName"
WELL_SAMPLE_NAME: str = "wellSampleName"


class PacBioHousekeeperTags:
    """Housekeeper tag names for the PacBio report files."""

    # One tag per report file type.
    CCS_REPORT: str = "ccs-report"
    CONTROL_REPORT: str = "control-report"
    DATASETS_REPORT: str = "datasets-report"
    LOADING_REPORT: str = "loading-report"
    RAWDATA_REPORT: str = "raw-data-report"


class PacBioBundleTypes:
    """Kinds of Housekeeper bundle a PacBio file can be stored in."""

    SMRT_CELL: str = "smrt_cell"
    SAMPLE: str = "sample"


# File name patterns that are built from several constants. They are shared by both
# maps below so that the keys of the two maps cannot drift apart.
# NOTE(review): the keys of these maps are used as regular expressions by
# get_item_by_pattern_in_source, so an unescaped "." in a key matches any character,
# not only a literal dot - confirm that this is acceptable.
_CCS_REPORT_PATTERN: str = f".*{PacBioDirsAndFiles.CCS_REPORT_SUFFIX}$"
_HIFI_BAM_PATTERN: str = f"{PacBioDirsAndFiles.HIFI_READS}{FileExtensions.BAM}$"
_HIFI_BAM_INDEX_PATTERN: str = f"{PacBioDirsAndFiles.HIFI_READS}{FileExtensions.BAM}.pbi"

# Housekeeper tags to give a file, keyed by the pattern its name matches.
file_pattern_to_tag: dict[str, list[str]] = {
    PacBioDirsAndFiles.CONTROL_REPORT: [PacBioHousekeeperTags.CONTROL_REPORT],
    _CCS_REPORT_PATTERN: [PacBioHousekeeperTags.CCS_REPORT],
    PacBioDirsAndFiles.LOADING_REPORT: [PacBioHousekeeperTags.LOADING_REPORT],
    PacBioDirsAndFiles.RAW_DATA_REPORT: [PacBioHousekeeperTags.RAWDATA_REPORT],
    PacBioDirsAndFiles.SMRTLINK_DATASETS_REPORT: [PacBioHousekeeperTags.DATASETS_REPORT],
    _HIFI_BAM_PATTERN: [AlignmentFileTag.BAM],
    _HIFI_BAM_INDEX_PATTERN: [AlignmentFileTag.BAM, "pbi"],
}

# Patterns of the files stored in the SMRT cell bundle (the report files).
_SMRT_CELL_FILE_PATTERNS: tuple[str, ...] = (
    PacBioDirsAndFiles.CONTROL_REPORT,
    _CCS_REPORT_PATTERN,
    PacBioDirsAndFiles.LOADING_REPORT,
    PacBioDirsAndFiles.RAW_DATA_REPORT,
    PacBioDirsAndFiles.SMRTLINK_DATASETS_REPORT,
)

# Patterns of the files stored in the sample bundle (the HiFi reads and their index).
_SAMPLE_FILE_PATTERNS: tuple[str, ...] = (
    _HIFI_BAM_PATTERN,
    _HIFI_BAM_INDEX_PATTERN,
)

# Bundle type a file belongs to, keyed by the pattern its name matches.
file_pattern_to_bundle_type: dict[str, str] = {
    **dict.fromkeys(_SMRT_CELL_FILE_PATTERNS, PacBioBundleTypes.SMRT_CELL),
    **dict.fromkeys(_SAMPLE_FILE_PATTERNS, PacBioBundleTypes.SAMPLE),
}
16 changes: 8 additions & 8 deletions cg/services/illumina/post_processing/housekeeper_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ def add_sample_fastq_files_to_housekeeper(
device_internal_id=run_directory_data.id,
store=store,
):
hk_api.store_fastq_path_in_housekeeper(
sample_internal_id=sample_internal_id,
sample_fastq_path=sample_fastq_path,
flow_cell_id=run_directory_data.id,
hk_api.create_bundle_and_add_file_with_tags(
bundle_name=sample_internal_id,
file_path=sample_fastq_path,
tags=[run_directory_data.id, sample_internal_id, SequencingFileTag.FASTQ],
)


Expand All @@ -106,10 +106,10 @@ def store_undetermined_fastq_files(
device_internal_id=run_directory_data.id,
store=store,
):
hk_api.store_fastq_path_in_housekeeper(
sample_internal_id=sample_id,
sample_fastq_path=fastq_path,
flow_cell_id=run_directory_data.id,
hk_api.create_bundle_and_add_file_with_tags(
bundle_name=sample_id,
file_path=fastq_path,
tags=[run_directory_data.id, SequencingFileTag.FASTQ, sample_id],
)


Expand Down
14 changes: 11 additions & 3 deletions cg/services/post_processing/abstract_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class PostProcessingMetricsParser(ABC):
def __init__(self, file_manager: RunFileManager):
self.file_manager: RunFileManager = file_manager

@abstractmethod
def parse_metrics(self, run_data: RunData) -> RunMetrics:
pass

Expand Down Expand Up @@ -70,10 +71,17 @@ def store_post_processing_data(self, run_name):


class PostProcessingHKService(ABC):
def __init__(self, hk_api: HousekeeperAPI):
self.hk_api = HousekeeperAPI
def __init__(
self,
hk_api: HousekeeperAPI,
file_manager: RunFileManager,
metrics_parser: PostProcessingMetricsParser,
):
self.hk_api: HousekeeperAPI = hk_api
self.file_manager: RunFileManager = file_manager
self.metrics_parser: PostProcessingMetricsParser = metrics_parser

def store_files_in_housekeeper(self, file_to_store: list[Path]):
def store_files_in_housekeeper(self, run_data: RunData):
pass


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pathlib import Path

from pydantic import BaseModel


class PacBioFileData(BaseModel):
    """Bundle name, path and tags of a single PacBio file to store in Housekeeper."""

    bundle_name: str
    file_path: Path
    tags: list[str]
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Module for the PacBioHousekeeperService used in the Post processing flow."""

import re
from pathlib import Path

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.constants.pacbio import file_pattern_to_tag, file_pattern_to_bundle_type, PacBioBundleTypes
from cg.services.post_processing.abstract_classes import PostProcessingHKService
from cg.services.post_processing.pacbio.housekeeper_service.models import PacBioFileData
from cg.services.post_processing.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.post_processing.pacbio.metrics_parser.models import PacBioMetrics
from cg.services.post_processing.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.post_processing.pacbio.run_file_manager.run_file_manager import (
PacBioRunFileManager,
)
from cg.utils.mapping import get_item_by_pattern_in_source


class PacBioHousekeeperService(PostProcessingHKService):
    """Service that stores the files of a PacBio run in Housekeeper."""

    def __init__(
        self,
        hk_api: HousekeeperAPI,
        file_manager: PacBioRunFileManager,
        metrics_parser: PacBioMetricsParser,
    ):
        super().__init__(hk_api=hk_api, file_manager=file_manager, metrics_parser=metrics_parser)

    def store_files_in_housekeeper(self, run_data: PacBioRunData) -> None:
        """Add every file to store for the run to its Housekeeper bundle with its tags.

        The metrics are parsed once per run and provide the bundle name of each file.
        """
        parsed_metrics: PacBioMetrics = self.metrics_parser.parse_metrics(run_data)
        files_to_store: list[Path] = self.file_manager.get_files_to_store(run_data)
        for file_path in files_to_store:
            bundle_info: PacBioFileData = self._create_bundle_info(
                file_path=file_path, parsed_metrics=parsed_metrics
            )
            self.hk_api.create_bundle_and_add_file_with_tags(
                bundle_name=bundle_info.bundle_name,
                file_path=bundle_info.file_path,
                tags=bundle_info.tags,
            )

    @staticmethod
    def _get_bundle_type_for_file(file_path: Path) -> str:
        """Return the bundle type mapped to the first pattern matching the file name."""
        return get_item_by_pattern_in_source(
            source=file_path.name, pattern_map=file_pattern_to_bundle_type
        )

    @staticmethod
    def _get_tags_for_file(file_path: Path) -> list[str]:
        """Return a copy of the tags mapped to the first pattern matching the file name."""
        mapped_tags: list[str] = get_item_by_pattern_in_source(
            source=file_path.name, pattern_map=file_pattern_to_tag
        )
        # The mapped list is the object stored in the module-level pattern map. Hand out
        # a copy so that callers adding tags do not modify the map for later files.
        return list(mapped_tags)

    @staticmethod
    def _add_tag_to_tags(tags: list[str], tag: str) -> list[str]:
        """Return a new list holding the given tags followed by the tag.

        The list passed in is left untouched.
        """
        return [*tags, tag]

    def _create_bundle_info(self, file_path: Path, parsed_metrics: PacBioMetrics) -> PacBioFileData:
        """Return the bundle name, path and tags to store the file with.

        SMRT cell files are stored in a bundle named after the cell id, all other files
        in a bundle named after the sample internal id. The bundle name is also added
        as the last tag of the file.
        """
        file_tags: list[str] = self._get_tags_for_file(file_path)
        if self._is_file_type_smrt_cell(file_path):
            bundle_name: str = parsed_metrics.dataset_metrics.cell_id
        else:
            bundle_name: str = parsed_metrics.dataset_metrics.sample_internal_id
        return PacBioFileData(
            bundle_name=bundle_name,
            file_path=file_path,
            tags=self._add_tag_to_tags(tags=file_tags, tag=bundle_name),
        )

    def _is_file_type_smrt_cell(self, file_path: Path) -> bool:
        """Return whether the file belongs in the SMRT cell bundle."""
        return self._get_bundle_type_for_file(file_path) == PacBioBundleTypes.SMRT_CELL
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _get_report_files(run_path: Path) -> list[Path]:
@staticmethod
def _find_hifi_files(run_path: Path) -> list[Path]:
"""Return the paths to the HiFi read files."""
hifi_dir = Path(run_path, PacBioDirsAndFiles.HIFI_READS_DIR)
hifi_dir = Path(run_path, PacBioDirsAndFiles.HIFI_READS)
bam_files: list[Path] = get_files_matching_pattern(
directory=hifi_dir, pattern=f"*{FileExtensions.BAM}*"
)
Expand Down
15 changes: 15 additions & 0 deletions cg/utils/mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Util functions that provide support in mapping objects."""

import re
from typing import Any

from cg.exc import CgError


def get_item_by_pattern_in_source(source: str, pattern_map: dict[str, Any]) -> Any:
    """
    Return the item mapped to the first pattern that is found in the source.
    Every key of the pattern map is used as a regular expression and searched for
    anywhere in the source. The keys are tried in the order they were added to the map.
    Raises:
        CgError: if none of the patterns is found in the source.
    """
    for pattern, item in pattern_map.items():
        if re.search(pattern, source):
            return item
    raise CgError(f"Could not find pattern for {source} in {pattern_map.keys()}.")
18 changes: 18 additions & 0 deletions tests/fixture_plugins/pacbio_fixtures/name_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import pytest

from cg.constants.pacbio import PacBioDirsAndFiles
from cg.services.post_processing.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.post_processing.pacbio.metrics_parser.models import PacBioMetrics


@pytest.fixture
Expand All @@ -26,3 +28,19 @@ def pac_bio_1_a01_cell_full_name() -> str:
def ccs_report_1_a01_name(pac_bio_1_a01_cell_full_name: str) -> str:
"""Return the name of a ccs report file."""
return f"{pac_bio_1_a01_cell_full_name}.{PacBioDirsAndFiles.CCS_REPORT_SUFFIX}"


@pytest.fixture
def expected_smrt_cell_bundle_name(
    pac_bio_metrics_parser: PacBioMetricsParser, expected_pac_bio_run_data
) -> str:
    """Return the cell id found in the parsed metrics of the expected run data."""
    metrics: PacBioMetrics = pac_bio_metrics_parser.parse_metrics(expected_pac_bio_run_data)
    cell_id: str = metrics.dataset_metrics.cell_id
    return cell_id


@pytest.fixture
def expexted_pac_bio_sample_name(
    pac_bio_metrics_parser: PacBioMetricsParser, expected_pac_bio_run_data
) -> str:
    """Return the sample internal id found in the parsed metrics of the expected run data."""
    metrics: PacBioMetrics = pac_bio_metrics_parser.parse_metrics(expected_pac_bio_run_data)
    sample_internal_id: str = metrics.dataset_metrics.sample_internal_id
    return sample_internal_id
2 changes: 1 addition & 1 deletion tests/fixture_plugins/pacbio_fixtures/path_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def pac_bio_smrt_cell_dir_1_a01(pac_bio_test_run_dir: Path, pac_bio_smrt_cell_na
@pytest.fixture
def pac_bio_hifi_reads_dir(pac_bio_smrt_cell_dir_1_a01: Path) -> Path:
"""Return the path to a PacBio HiFi reads directory."""
return Path(pac_bio_smrt_cell_dir_1_a01, PacBioDirsAndFiles.HIFI_READS_DIR)
return Path(pac_bio_smrt_cell_dir_1_a01, PacBioDirsAndFiles.HIFI_READS)


@pytest.fixture
Expand Down
32 changes: 27 additions & 5 deletions tests/fixture_plugins/pacbio_fixtures/service_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
"""Module for PacBio fixtures returning service objects."""

from pathlib import Path

import pytest

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.services.post_processing.pacbio.housekeeper_service.pacbio_houskeeper_service import (
PacBioHousekeeperService,
)
from cg.services.post_processing.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.post_processing.pacbio.run_file_manager.run_file_manager import (
PacBioRunFileManager,
)


@pytest.fixture
def pac_bio_run_file_manager() -> PacBioRunFileManager:
    """Return a PacBio run file manager."""
    file_manager = PacBioRunFileManager()
    return file_manager


@pytest.fixture
def pac_bio_metrics_parser(pac_bio_run_file_manager: PacBioRunFileManager) -> PacBioMetricsParser:
    """Return a PacBio metrics parser that uses the PacBio run file manager fixture."""
    metrics_parser = PacBioMetricsParser(file_manager=pac_bio_run_file_manager)
    return metrics_parser


@pytest.fixture
def pac_bio_metrics_parser(pac_bio_smrt_cell_dir_1_a01: Path) -> PacBioMetricsParser:
"""Return a PacBio metrics parser."""
return PacBioMetricsParser(pac_bio_smrt_cell_dir_1_a01)
def pac_bio_housekeeper_service(
real_housekeeper_api: HousekeeperAPI,
pac_bio_run_file_manager: PacBioRunFileManager,
pac_bio_metrics_parser: PacBioMetricsParser,
) -> PacBioHousekeeperService:
return PacBioHousekeeperService(
hk_api=real_housekeeper_api,
file_manager=pac_bio_run_file_manager,
metrics_parser=pac_bio_metrics_parser,
)
9 changes: 9 additions & 0 deletions tests/services/post_processing/pacbio/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@

import pytest

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.services.post_processing.pacbio.housekeeper_service.pacbio_houskeeper_service import (
PacBioHousekeeperService,
)
from cg.services.post_processing.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.post_processing.pacbio.metrics_parser.models import PacBioMetrics
from cg.services.post_processing.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.post_processing.pacbio.run_file_manager.run_file_manager import (
PacBioRunFileManager,
)


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Module for the PacBioHousekeeperService tests."""

from housekeeper.store.models import File

from cg.services.post_processing.pacbio.housekeeper_service.pacbio_houskeeper_service import (
PacBioHousekeeperService,
)
from cg.services.post_processing.pacbio.run_data_generator.run_data import PacBioRunData


def test_store_files_in_housekeeper(
    pac_bio_housekeeper_service: PacBioHousekeeperService,
    expected_pac_bio_run_data: PacBioRunData,
    expexted_pac_bio_sample_name: str,
    expected_smrt_cell_bundle_name: str,
):
    """Test that storing a PacBio run creates a SMRT cell and a sample bundle with files."""
    # GIVEN a PacBioRunData object and a PacBioHousekeeperService
    hk_api = pac_bio_housekeeper_service.hk_api

    # WHEN storing files in Housekeeper
    pac_bio_housekeeper_service.store_files_in_housekeeper(expected_pac_bio_run_data)

    # THEN a SMRT cell bundle is created
    assert hk_api.get_latest_bundle_version(expected_smrt_cell_bundle_name)

    # THEN a sample bundle is created
    assert hk_api.get_latest_bundle_version(expexted_pac_bio_sample_name)

    # THEN all expected files are listed under the smrt cell bundle
    smrt_bundle_files: list[File] = hk_api.get_files_from_latest_version(
        bundle_name=expected_smrt_cell_bundle_name
    )
    assert smrt_bundle_files

    # THEN all expected files are listed under the sample bundle
    # The files are fetched here, not the version: fetching the version again would
    # only repeat the bundle check above and never look at the stored files.
    sample_bundle_files: list[File] = hk_api.get_files_from_latest_version(
        bundle_name=expexted_pac_bio_sample_name
    )
    assert sample_bundle_files
Loading
Loading