Skip to content

Commit

Permalink
refactor(rsync api) (#3694)
Browse files Browse the repository at this point in the history
# Description

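Refactoring of the rsync API: `RsyncAPI` is replaced by `DeliveryRsyncService`, which callers now obtain from `CGConfig` instead of constructing it themselves.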
  • Loading branch information
ChrOertlin authored Sep 6, 2024
1 parent e089a7f commit c762c03
Show file tree
Hide file tree
Showing 18 changed files with 147 additions and 120 deletions.
23 changes: 4 additions & 19 deletions cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants.cli_options import DRY_RUN
from cg.constants.delivery import FileDeliveryOption
from cg.meta.rsync.rsync_api import RsyncAPI
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.models.cg_config import CGConfig
from cg.services.file_delivery.deliver_files_service.deliver_files_service import (
DeliverFilesService,
Expand Down Expand Up @@ -46,7 +46,7 @@ def rsync(context: CGConfig, ticket: str, dry_run: bool):
rsynced with this function to the customers inbox on the delivery server
"""
tb_api: TrailblazerAPI = context.trailblazer_api
rsync_api: RsyncAPI = RsyncAPI(config=context)
rsync_api: DeliveryRsyncService = context.delivery_rsync_service
slurm_id = rsync_api.run_rsync_for_ticket(ticket=ticket, dry_run=dry_run)
LOG.info(f"Rsync to the delivery server running as job {slurm_id}")
rsync_api.add_to_trailblazer_api(
Expand Down Expand Up @@ -74,14 +74,7 @@ def deliver_case(
Deliver all case files based on delivery type to the customer inbox on the HPC
"""
inbox: str = context.delivery_path
rsync_api: RsyncAPI = RsyncAPI(config=context)
service_builder = DeliveryServiceFactory(
store=context.status_db,
hk_api=context.housekeeper_api,
tb_service=context.trailblazer_api,
rsync_service=rsync_api,
analysis_service=context.analysis_service,
)
service_builder: DeliveryServiceFactory = context.delivery_service_factory
case: Case = context.status_db.get_case_by_internal_id(internal_id=case_id)
if not case:
LOG.error(f"Could not find case with id {case_id}")
Expand Down Expand Up @@ -110,15 +103,7 @@ def deliver_ticket(
Deliver all case files based on delivery type to the customer inbox on the HPC for cases connected to a ticket.
"""
inbox: str = context.delivery_path
rsync_api: RsyncAPI = RsyncAPI(config=context)
service_builder = DeliveryServiceFactory(
store=context.status_db,
hk_api=context.housekeeper_api,
tb_service=context.trailblazer_api,
rsync_service=rsync_api,
analysis_service=context.analysis_service,
)

service_builder: DeliveryServiceFactory = context.delivery_service_factory
cases: list[Case] = context.status_db.get_cases_by_ticket_id(ticket_id=ticket)
if not cases:
LOG.error(f"Could not find case connected to ticket {ticket}")
Expand Down
4 changes: 2 additions & 2 deletions cg/cli/workflow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from cg.constants.cli_options import DRY_RUN, SKIP_CONFIRMATION, FORCE, COMMENT
from cg.constants.observations import LOQUSDB_SUPPORTED_WORKFLOWS
from cg.exc import IlluminaRunsNeededError
from cg.meta.rsync import RsyncAPI
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.balsamic import BalsamicAnalysisAPI
from cg.meta.workflow.balsamic_pon import BalsamicPonAnalysisAPI
Expand All @@ -25,6 +24,7 @@
from cg.meta.workflow.mutant import MutantAnalysisAPI
from cg.meta.workflow.rnafusion import RnafusionAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.store.store import Store

ARGUMENT_BEFORE_STR = click.argument("before_str", type=str)
Expand Down Expand Up @@ -150,7 +150,7 @@ def rsync_past_run_dirs(
) -> None:
"""Remove deliver workflow commands."""

rsync_api: RsyncAPI = RsyncAPI(config=context)
rsync_api: DeliveryRsyncService = context.delivery_rsync_service

before: dt.datetime = parse_date(before_str)

Expand Down
1 change: 0 additions & 1 deletion cg/meta/rsync/__init__.py

This file was deleted.

5 changes: 4 additions & 1 deletion cg/meta/transfer/external_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from cg.constants import HK_FASTQ_TAGS, FileExtensions
from cg.constants.constants import CaseActions
from cg.meta.meta import MetaAPI
from cg.meta.rsync.sbatch import ERROR_RSYNC_FUNCTION, RSYNC_CONTENTS_COMMAND
from cg.services.file_delivery.rsync_service.sbatch import (
ERROR_RSYNC_FUNCTION,
RSYNC_CONTENTS_COMMAND,
)
from cg.meta.transfer.utils import are_all_fastq_valid
from cg.models.cg_config import CGConfig
from cg.models.slurm.sbatch import Sbatch
Expand Down
10 changes: 1 addition & 9 deletions cg/meta/upload/upload_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from cg.exc import AnalysisAlreadyUploadedError, AnalysisUploadError
from cg.meta.meta import MetaAPI
from cg.meta.rsync import RsyncAPI
from cg.meta.upload.error_handling import handle_delivery_type_errors
from cg.meta.upload.scout.uploadscoutapi import UploadScoutAPI
from cg.meta.workflow.analysis import AnalysisAPI
Expand Down Expand Up @@ -96,14 +95,7 @@ def verify_analysis_upload(case_obj: Case, restart: bool) -> None:
@handle_delivery_type_errors
def upload_files_to_customer_inbox(self, case: Case) -> None:
"""Uploads the analysis files to the customer inbox."""
rsync_service = RsyncAPI(config=self.config)
factory_service = DeliveryServiceFactory(
store=self.status_db,
hk_api=self.housekeeper_api,
tb_service=self.trailblazer_api,
rsync_service=rsync_service,
analysis_service=self.config.analysis_service,
)
factory_service: DeliveryServiceFactory = self.config.delivery_service_factory
delivery_service: DeliverFilesService = factory_service.build_delivery_service(
delivery_type=case.data_delivery,
workflow=case.data_analysis,
Expand Down
44 changes: 40 additions & 4 deletions cg/models/cg_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
)
from cg.services.file_delivery.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.services.file_delivery.rsync_service.models import RsyncDeliveryConfig
from cg.services.pdc_service.pdc_service import PdcService
from cg.services.run_devices.pacbio.data_storage_service.pacbio_store_service import (
PacBioStoreService,
Expand Down Expand Up @@ -283,12 +288,12 @@ class GisaidConfig(CommonAppConfig):


class DataDeliveryConfig(BaseModel):
    """Settings read from the `data-delivery` section of the CG config.

    Every field is a required string. Each one is declared exactly once and
    as an annotation (`name: str`); writing `name = str` would instead store
    the `str` type itself as the field's value.
    """

    account: str
    base_path: str
    covid_destination_path: str
    covid_source_path: str
    covid_report_path: str
    destination_path: str
    mail_user: str


Expand Down Expand Up @@ -382,6 +387,8 @@ class CGConfig(BaseModel):
data_delivery: DataDeliveryConfig = Field(None, alias="data-delivery")
data_flow: DataFlowConfig | None = None
delivery_api_: DeliveryAPI | None = None
delivery_rsync_service_: DeliveryRsyncService | None = None
delivery_service_factory_: DeliveryServiceFactory | None = None
demultiplex: DemultiplexConfig = None
demultiplex_api_: DemultiplexingAPI = None
encryption: Encryption | None = None
Expand Down Expand Up @@ -713,3 +720,32 @@ def delivery_api(self) -> DeliveryAPI:
@property
def sequencing_qc_service(self) -> SequencingQCService:
return SequencingQCService(self.status_db)

@property
def delivery_rsync_service(self) -> DeliveryRsyncService:
    """Return the delivery rsync service, building and caching it on first access."""
    cached = self.delivery_rsync_service_
    if cached is not None:
        return cached

    LOG.debug("Instantiating delivery rsync service")
    # The rsync settings model is populated from the data delivery config fields.
    config = RsyncDeliveryConfig(**self.data_delivery.dict())
    new_service = DeliveryRsyncService(
        delivery_path=self.delivery_path,
        rsync_config=config,
        status_db=self.status_db,
    )
    self.delivery_rsync_service_ = new_service
    return new_service

@property
def delivery_service_factory(self) -> DeliveryServiceFactory:
    """Return the delivery service factory, building and caching it on first access.

    The factory is assembled from the status database, the Housekeeper and
    Trailblazer APIs, the delivery rsync service and the analysis service
    exposed by this config object.
    """
    factory = self.delivery_service_factory_
    # `None` is the "not built yet" sentinel; compare against it explicitly so
    # an already cached object is never rebuilt just because it is falsy.
    if factory is None:
        LOG.debug("Instantiating delivery service factory")
        factory = DeliveryServiceFactory(
            store=self.status_db,
            hk_api=self.housekeeper_api,
            tb_service=self.trailblazer_api,
            rsync_service=self.delivery_rsync_service,
            analysis_service=self.analysis_service,
        )
        self.delivery_service_factory_ = factory
    return factory
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from cg.apps.tb.models import TrailblazerAnalysis
from cg.constants import Priority, Workflow
from cg.constants.tb import AnalysisTypes
from cg.meta.rsync import RsyncAPI
from cg.services.analysis_service.analysis_service import AnalysisService
from cg.services.file_delivery.fetch_file_service.fetch_delivery_files_service import (
FetchDeliveryFilesService,
Expand All @@ -18,6 +17,7 @@
from cg.services.file_delivery.move_files_service.move_delivery_files_service import (
MoveDeliveryFilesService,
)
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.store.exc import EntryNotFoundError
from cg.store.models import Case
from cg.store.store import Store
Expand All @@ -40,7 +40,7 @@ def __init__(
delivery_file_manager_service: FetchDeliveryFilesService,
move_file_service: MoveDeliveryFilesService,
file_formatter_service: DeliveryFileFormattingService,
rsync_service: RsyncAPI,
rsync_service: DeliveryRsyncService,
tb_service: TrailblazerAPI,
analysis_service: AnalysisService,
status_db: Store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.apps.tb import TrailblazerAPI
from cg.constants import Workflow, DataDelivery
from cg.meta.rsync import RsyncAPI
from cg.services.analysis_service.analysis_service import AnalysisService
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
Expand Down Expand Up @@ -49,6 +48,7 @@
from cg.services.file_delivery.move_files_service.move_delivery_files_service import (
MoveDeliveryFilesService,
)
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.store.store import Store


Expand All @@ -59,7 +59,7 @@ def __init__(
self,
store: Store,
hk_api: HousekeeperAPI,
rsync_service: RsyncAPI,
rsync_service: DeliveryRsyncService,
tb_service: TrailblazerAPI,
analysis_service: AnalysisService,
):
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,37 @@
from cg.constants.tb import AnalysisTypes
from cg.exc import CgError
from cg.io.controller import WriteFile
from cg.meta.meta import MetaAPI
from cg.meta.rsync.sbatch import (
from cg.services.file_delivery.rsync_service.models import RsyncDeliveryConfig
from cg.services.file_delivery.rsync_service.sbatch import (
COVID_RSYNC,
ERROR_RSYNC_FUNCTION,
RSYNC_COMMAND,
COVID_REPORT_RSYNC,
)
from cg.models.cg_config import CGConfig

from cg.models.slurm.sbatch import Sbatch
from cg.store.models import Case
from cg.store.store import Store

LOG = logging.getLogger(__name__)


class RsyncAPI(MetaAPI):
def __init__(self, config: CGConfig):
super().__init__(config)
self.delivery_path: str = config.delivery_path
self.destination_path: str = config.data_delivery.destination_path
self.covid_destination_path: str = config.data_delivery.covid_destination_path
self.covid_report_path: str = config.data_delivery.covid_report_path
self.base_path: Path = Path(config.data_delivery.base_path)
self.account: str = config.data_delivery.account
self.log_dir: Path = Path(config.data_delivery.base_path)
self.mail_user: str = config.data_delivery.mail_user
class DeliveryRsyncService:
def __init__(
self,
delivery_path: str,
rsync_config: RsyncDeliveryConfig,
status_db: Store,
):
self.status_db = status_db
self.delivery_path: str = delivery_path
self.destination_path: str = rsync_config.destination_path
self.covid_destination_path: str = rsync_config.covid_destination_path
self.covid_report_path: str = rsync_config.covid_report_path
self.base_path: Path = Path(rsync_config.base_path)
self.account: str = rsync_config.account
self.log_dir: Path = Path(rsync_config.base_path)
self.mail_user: str = rsync_config.mail_user
self.workflow: str = Workflow.RSYNC

@property
Expand Down
11 changes: 11 additions & 0 deletions cg/services/file_delivery/rsync_service/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pydantic import BaseModel


# Settings model handed to the delivery rsync service as its `rsync_config`.
# All fields are required strings. CGConfig fills this model from the fields
# of its data delivery config, so the field names here must match that model.
class RsyncDeliveryConfig(BaseModel):
# NOTE(review): presumably the account that rsync jobs are submitted under — confirm.
account: str
# The rsync service wraps this value in a Path for both its base path and its log dir.
base_path: str
covid_destination_path: str
# NOTE(review): not read by the visible part of the rsync service constructor — confirm usage.
covid_source_path: str
covid_report_path: str
destination_path: str
# NOTE(review): presumably the e-mail address notified about rsync jobs — confirm.
mail_user: str
File renamed without changes.
12 changes: 0 additions & 12 deletions tests/cli/upload/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
HkMipAnalysisTag,
)
from cg.io.controller import ReadFile
from cg.meta.rsync import RsyncAPI
from cg.meta.upload.scout.uploadscoutapi import UploadScoutAPI
from cg.meta.workflow.mip import MipAnalysisAPI
from cg.meta.workflow.mip_dna import MipDNAAnalysisAPI
Expand Down Expand Up @@ -196,17 +195,6 @@ def fastq_context(
cg_context: CGConfig,
) -> CGConfig:
"""Fastq context to use in cli"""

base_context.meta_apis["delivery_api"] = DeliverAPI(
store=base_context.status_db,
hk_api=base_context.housekeeper_api,
case_tags=PIPELINE_ANALYSIS_TAG_MAP[Workflow.FASTQ]["case_tags"],
sample_tags=PIPELINE_ANALYSIS_TAG_MAP[Workflow.FASTQ]["sample_tags"],
delivery_type="fastq",
project_base_path=Path(base_context.delivery_path),
fastq_file_service=FastqConcatenationService(),
)
base_context.meta_apis["rsync_api"] = RsyncAPI(cg_context)
base_context.trailblazer_api_ = trailblazer_api
return base_context

Expand Down
9 changes: 5 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from cg.io.controller import WriteFile
from cg.io.json import read_json, write_json
from cg.io.yaml import read_yaml, write_yaml
from cg.meta.rsync import RsyncAPI
from cg.meta.tar.tar import TarAPI
from cg.meta.transfer.external_data import ExternalDataAPI
from cg.meta.workflow.jasen import JasenAnalysisAPI
Expand All @@ -54,6 +53,7 @@
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
from cg.models.taxprofiler.taxprofiler import TaxprofilerParameters, TaxprofilerSampleSheetEntry
from cg.models.tomte.tomte import TomteParameters, TomteSampleSheetHeaders
from cg.services.file_delivery.rsync_service.delivery_rsync_service import DeliveryRsyncService
from cg.services.illumina.backup.encrypt_service import IlluminaRunEncryptionService
from cg.services.illumina.data_transfer.data_transfer_service import IlluminaDataTransferService
from cg.store.database import create_all_tables, drop_all_tables, initialize_database
Expand Down Expand Up @@ -606,9 +606,9 @@ def demultiplexing_api(


@pytest.fixture
def rsync_api(cg_context: CGConfig) -> RsyncAPI:
"""RsyncAPI fixture."""
return RsyncAPI(config=cg_context)
def delivery_rsync_service(cg_context: CGConfig) -> DeliveryRsyncService:
"""Delivery Rsync service fixture."""
return cg_context.delivery_rsync_service


@pytest.fixture
Expand Down Expand Up @@ -1956,6 +1956,7 @@ def context_config(
"account": "development",
"base_path": "/another/path",
"covid_destination_path": "server.name.se:/another/%s/foldername/",
"covid_source_path": "a/source/path",
"covid_report_path": "/folder_structure/%s/yet_another_folder/filename_%s_data_*.csv",
"destination_path": "server.name.se:/some",
"mail_user": email_address,
Expand Down
Empty file.
Empty file.
File renamed without changes.
Loading

0 comments on commit c762c03

Please sign in to comment.